Remove source_workflow_job and use workflow_job_id instead.

This commit is contained in:
Aaron Tan
2016-11-29 11:51:42 -05:00
parent 3e8e9480d1
commit a458bb3c89
6 changed files with 17 additions and 49 deletions

View File

@@ -18,8 +18,8 @@ from awx.main.scheduler.dag_workflow import WorkflowDAG
from awx.main.scheduler.dependency_graph import DependencyGraph
from awx.main.scheduler.partial import (
JobDict,
ProjectUpdateDict,
JobDict,
ProjectUpdateDict,
ProjectUpdateLatestDict,
InventoryUpdateDict,
InventoryUpdateLatestDict,
@@ -103,7 +103,7 @@ class TaskManager():
for task in all_sorted_tasks:
if type(task) is JobDict:
inventory_ids.add(task['inventory_id'])
for inventory_id in inventory_ids:
results.append((inventory_id, InventorySourceDict.filter_partial(inventory_id)))
@@ -116,11 +116,6 @@ class TaskManager():
for spawn_node in spawn_nodes:
kv = spawn_node.get_job_kwargs()
job = spawn_node.unified_job_template.create_unified_job(**kv)
# source_workflow_job is a job-specific field rather than a field copied from job
# template, therefore does not fit into the copy routine and should be put outside
# of create_unified_job.
job.source_workflow_job = workflow_job
job.save()
spawn_node.job = job
spawn_node.save()
can_start = job.signal_start(**kv)
@@ -179,10 +174,10 @@ class TaskManager():
'id': task['id'],
}
dependencies = [{'type': t.get_job_type_str(), 'id': t['id']} for t in dependent_tasks]
error_handler = handle_work_error.s(subtasks=[task_actual] + dependencies)
success_handler = handle_work_success.s(task_actual=task_actual)
job_obj = task.get_full()
job_obj.status = 'waiting'
@@ -206,7 +201,7 @@ class TaskManager():
job_obj.websocket_emit_status(job_obj.status)
if job_obj.status != 'failed':
job_obj.start_celery_task(opts, error_callback=error_handler, success_callback=success_handler)
connection.on_commit(post_commit)
def process_runnable_tasks(self, runnable_tasks):
@@ -284,7 +279,7 @@ class TaskManager():
if not self.graph.is_job_blocked(task):
dependencies = self.generate_dependencies(task)
self.process_dependencies(task, dependencies)
# Spawning deps might have blocked us
if not self.graph.is_job_blocked(task):
self.graph.add_job(task)
@@ -300,7 +295,7 @@ class TaskManager():
for task in all_running_sorted_tasks:
if (task['celery_task_id'] not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
# NOTE: Pull status again and make sure it didn't finish in
# NOTE: Pull status again and make sure it didn't finish in
# the meantime?
# TODO: try catch the getting of the job. The job COULD have been deleted
task_obj = task.get_full()
@@ -351,7 +346,7 @@ class TaskManager():
latest_inventory_updates = self.get_latest_inventory_update_tasks(all_sorted_tasks)
self.process_latest_inventory_updates(latest_inventory_updates)
inventory_id_sources = self.get_inventory_source_tasks(all_sorted_tasks)
self.process_inventory_sources(inventory_id_sources)
@@ -376,4 +371,3 @@ class TaskManager():
# Operations whose queries rely on modifications made during the atomic scheduling session
for wfj in WorkflowJob.objects.filter(id__in=finished_wfjs):
_send_notification_templates(wfj, 'succeeded' if wfj.status == 'successful' else 'failed')