From 4ef4b4709b25a43809b57073aac3caf9c11485e3 Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Wed, 26 Oct 2016 14:34:13 -0400 Subject: [PATCH] workflow execution added --- awx/main/models/workflow.py | 17 ++--- awx/main/scheduler/__init__.py | 89 +++++++++-------------- awx/main/scheduler/dependency_graph.py | 14 ++++ awx/main/scheduler/partial.py | 13 ++++ awx/main/tests/unit/scheduler/conftest.py | 1 + requirements/requirements.txt | 1 + 6 files changed, 68 insertions(+), 67 deletions(-) diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py index a4f02deef2..318d32ff48 100644 --- a/awx/main/models/workflow.py +++ b/awx/main/models/workflow.py @@ -393,11 +393,6 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, JobNotificationMixin, Workflow def _get_parent_field_name(cls): return 'workflow_job_template' - @classmethod - def _get_task_class(cls): - from awx.main.tasks import RunWorkflowJob - return RunWorkflowJob - def _has_failed(self): return False @@ -426,11 +421,9 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, JobNotificationMixin, Workflow def get_notification_friendly_name(self): return "Workflow Job" - def start(self, *args, **kwargs): - (res, opts) = self.pre_start(**kwargs) - if res: - self.status = 'running' - self.save() - self.websocket_emit_status("running") - return res + ''' + A WorkflowJob is a virtual job. It doesn't result in a celery task. + ''' + def start_celery_task(self, opts, error_callback, success_callback): + return None diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index ca79fb8aca..cf5fbecddc 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -26,6 +26,7 @@ from awx.main.scheduler.partial import ( InventorySourceDict, SystemJobDict, AdHocCommandDict, + WorkflowJobDict, ) # Celery @@ -47,15 +48,9 @@ class Scheduler(): project_updates = ProjectUpdateDict.filter_partial(status=status_list) system_jobs = SystemJobDict.filter_partial(status=status_list) ad_hoc_commands = AdHocCommandDict.filter_partial(status=status_list) - ''' - graph_workflow_jobs = [wf for wf in - WorkflowJob.objects.filter(**kv)] - all_actions = sorted(graph_jobs + graph_ad_hoc_commands + graph_inventory_updates + - graph_project_updates + graph_system_jobs + - graph_workflow_jobs, - key=lambda task: task.created) - ''' - all_actions = sorted(jobs + project_updates + inventory_updates + system_jobs + ad_hoc_commands, + workflow_jobs = WorkflowJobDict.filter_partial(status=status_list) + + all_actions = sorted(jobs + project_updates + inventory_updates + system_jobs + ad_hoc_commands + workflow_jobs, key=lambda task: task['created']) return all_actions @@ -111,7 +106,7 @@ class Scheduler(): job.status = 'failed' job.job_explanation = "Workflow job could not start because it was not in the right state or required manual credentials" job.save(update_fields=['status', 'job_explanation']) - job.websocket_emit_status("failed") + connection.on_commit(lambda: job.websocket_emit_status('failed')) # TODO: should we emit a status on the socket here similar to tasks.py tower_periodic_scheduler() ? #emit_websocket_notification('/socket.io/jobs', '', dict(id=)) @@ -122,12 +117,9 @@ class Scheduler(): dag = WorkflowDAG(workflow_job) if dag.is_workflow_done(): # TODO: detect if wfj failed - if workflow_job._has_failed(): - workflow_job.status = 'failed' - else: - workflow_job.status = 'successful' + workflow_job.status = 'completed' workflow_job.save() - workflow_job.websocket_emit_status(workflow_job.status) + connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status)) def get_activate_tasks(self): inspector = inspect() @@ -153,6 +145,8 @@ class Scheduler(): def start_task(self, task, dependent_tasks=[]): from awx.main.tasks import handle_work_error, handle_work_success + status_changed = False + # TODO: spawn inventory and project updates task_actual = { 'type':task.get_job_type_str(), @@ -164,21 +158,36 @@ class Scheduler(): success_handler = handle_work_success.s(task_actual=task_actual) job_obj = task.get_full() - job_obj.status = 'waiting' + if job_obj.status == 'pending': + status_changed = True + job_obj.status = 'waiting' (start_status, opts) = job_obj.pre_start() if not start_status: + status_changed = True job_obj.status = 'failed' if job_obj.job_explanation: job_obj.job_explanation += ' ' job_obj.job_explanation += 'Task failed pre-start check.' job_obj.save() # TODO: run error handler to fail sub-tasks and send notifications - return + else: + if type(job_obj) is WorkflowJob: + job_obj.status = 'running' + status_changed = True - self.consume_capacity(task) + if status_changed is True: + job_obj.save() - connection.on_commit(lambda: job_obj.start_celery_task(opts, error_callback=error_handler, success_callback=success_handler)) + self.consume_capacity(task) + + def post_commit(): + if status_changed: + job_obj.websocket_emit_status(job_obj.status) + if job_obj.status != 'failed': + job_obj.start_celery_task(opts, error_callback=error_handler, success_callback=success_handler) + + connection.on_commit(post_commit) def process_runnable_tasks(self, runnable_tasks): for i, task in enumerate(runnable_tasks): @@ -281,7 +290,7 @@ class Scheduler(): 'Celery, so it has been marked as failed.', )) task_obj.save() - task_obj.websocket_emit_status("failed") + connection.on_commit(lambda: task_obj.websocket_emit_status('failed')) all_sorted_tasks.pop(i) logger.error("Task %s appears orphaned... marking as failed" % task) @@ -323,28 +332,6 @@ class Scheduler(): pending_tasks = filter(lambda t: t['status'] == 'pending', all_sorted_tasks) self.process_pending_tasks(pending_tasks) - - ''' - def do_graph_things(): - # Rebuild graph - graph = SimpleDAG() - for task in running_tasks: - graph.add_node(task) - #for wait_task in waiting_tasks[:50]: - for wait_task in waiting_tasks: - node_dependencies = [] - for node in graph: - if wait_task.is_blocked_by(node['node_object']): - node_dependencies.append(node['node_object']) - graph.add_node(wait_task) - for dependency in node_dependencies: - graph.add_edge(wait_task, dependency) - if settings.DEBUG: - graph.generate_graphviz_plot() - return graph - ''' - #return do_graph_things() - def _schedule(self): all_sorted_tasks = self.get_tasks() if len(all_sorted_tasks) > 0: @@ -359,23 +346,21 @@ class Scheduler(): inventory_id_sources = self.get_inventory_source_tasks(all_sorted_tasks) self.process_inventory_sources(inventory_id_sources) - self.process_tasks(all_sorted_tasks) + running_workflow_tasks = self.get_running_workflow_jobs() + self.process_finished_workflow_jobs(running_workflow_tasks) - #print("Finished schedule()") + self.spawn_workflow_graph_jobs(running_workflow_tasks) + + self.process_tasks(all_sorted_tasks) def schedule(self): with transaction.atomic(): - #t1 = datetime.now() # Lock try: Instance.objects.select_for_update(nowait=True).all()[0] except DatabaseError: return - #workflow_jobs = get_running_workflow_jobs() - #process_finished_workflow_jobs(workflow_jobs) - #spawn_workflow_graph_jobs(workflow_jobs) - ''' Get tasks known by celery ''' @@ -387,10 +372,4 @@ class Scheduler(): ''' self._schedule() - # Unlock, due to transaction ending - #t2 = datetime.now() - #t_diff = t2 - t1 - #print("schedule() time %s" % (t_diff.total_seconds())) - - diff --git a/awx/main/scheduler/dependency_graph.py b/awx/main/scheduler/dependency_graph.py index 3142699077..edd49c98a9 100644 --- a/awx/main/scheduler/dependency_graph.py +++ b/awx/main/scheduler/dependency_graph.py @@ -7,6 +7,7 @@ from awx.main.scheduler.partial import ( InventoryUpdateDict, SystemJobDict, AdHocCommandDict, + WorkflowJobDict, ) class DependencyGraph(object): PROJECT_UPDATES = 'project_updates' @@ -14,6 +15,7 @@ class DependencyGraph(object): JOB_TEMPLATE_JOBS = 'job_template_jobs' SYSTEM_JOB = 'system_job' INVENTORY_SOURCE_UPDATES = 'inventory_source_updates' + WORKFLOW_JOB_TEMPLATES_JOBS = 'workflow_job_template_jobs' LATEST_PROJECT_UPDATES = 'latest_project_updates' LATEST_INVENTORY_UPDATES = 'latest_inventory_updates' @@ -32,6 +34,8 @@ class DependencyGraph(object): self.data[self.INVENTORY_SOURCE_UPDATES] = {} # True / False self.data[self.SYSTEM_JOB] = True + # workflow_job_template_id -> True / False + self.data[self.WORKFLOW_JOB_TEMPLATES_JOBS] = {} # project_id -> latest ProjectUpdateLatestDict self.data[self.LATEST_PROJECT_UPDATES] = {} @@ -138,6 +142,9 @@ class DependencyGraph(object): self.data[self.PROJECT_UPDATES][job['project_id']] = False self.data[self.JOB_TEMPLATE_JOBS][job['job_template_id']] = False + def mark_workflow_job(self, job): + self.data[self.WORKFLOW_JOB_TEMPLATES_JOBS][job['workflow_job_template_id']] = False + def can_project_update_run(self, job): return self.data[self.PROJECT_UPDATES].get(job['project_id'], True) @@ -153,6 +160,9 @@ class DependencyGraph(object): return True return False + def can_workflow_job_run(self, job): + return self.data[self.WORKFLOW_JOB_TEMPLATES_JOBS].get(job['workflow_job_template_id'], True) + def can_system_job_run(self): return self.data[self.SYSTEM_JOB] @@ -170,6 +180,8 @@ class DependencyGraph(object): return not self.can_system_job_run() elif type(job) is AdHocCommandDict: return not self.can_ad_hoc_command_run(job) + elif type(job) is WorkflowJobDict: + return not self.can_workflow_job_run(job) def add_job(self, job): if type(job) is ProjectUpdateDict: @@ -179,6 +191,8 @@ class DependencyGraph(object): self.mark_inventory_source_update(job['inventory_source_id']) elif type(job) is JobDict: self.mark_job_template_job(job) + elif type(job) is WorkflowJobDict: + self.mark_workflow_job(job) elif type(job) is SystemJobDict: self.mark_system_job() elif type(job) is AdHocCommandDict: diff --git a/awx/main/scheduler/partial.py b/awx/main/scheduler/partial.py index a92c5c7bd6..576b66a9c3 100644 --- a/awx/main/scheduler/partial.py +++ b/awx/main/scheduler/partial.py @@ -7,6 +7,7 @@ from awx.main.models import ( InventorySource, SystemJob, AdHocCommand, + WorkflowJob, ) class PartialModelDict(object): @@ -205,3 +206,15 @@ class AdHocCommandDict(PartialModelDict): def task_impact(self): return 20 +class WorkflowJobDict(PartialModelDict): + FIELDS = ( + 'id', 'created', 'status', 'workflow_job_template_id', + ) + model = WorkflowJob + + def get_job_type_str(self): + return 'workflow_job' + + def task_impact(self): + return 10 + diff --git a/awx/main/tests/unit/scheduler/conftest.py b/awx/main/tests/unit/scheduler/conftest.py index 2fd84474f7..cec68b1ef7 100644 --- a/awx/main/tests/unit/scheduler/conftest.py +++ b/awx/main/tests/unit/scheduler/conftest.py @@ -35,6 +35,7 @@ def scheduler_factory(mocker, epoch): raise RuntimeError("create_project_update should not be called") mocker.patch.object(sched, 'get_tasks', return_value=tasks) + mocker.patch.object(sched, 'get_running_workflow_jobs', return_value=[]) mocker.patch.object(sched, 'get_inventory_source_tasks', return_value=inventory_sources) mocker.patch.object(sched, 'get_latest_project_update_tasks', return_value=latest_project_updates) mocker.patch.object(sched, 'get_latest_inventory_update_tasks', return_value=latest_inventory_updates) diff --git a/requirements/requirements.txt b/requirements/requirements.txt index fb885a8842..5f2448d9e6 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -25,6 +25,7 @@ django-radius==1.0.0 djangorestframework==3.3.2 djangorestframework-yaml==1.0.2 django-split-settings==0.1.1 +django-transaction-hooks==0.2 django-taggit==0.17.6 git+https://github.com/matburt/dm.xmlsec.binding.git@master#egg=dm.xmlsec.binding dogpile.core==0.4.1