workflow execution added

This commit is contained in:
Chris Meyers
2016-10-26 14:34:13 -04:00
parent fd8c641fa5
commit 4ef4b4709b
6 changed files with 68 additions and 67 deletions

View File

@@ -393,11 +393,6 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, JobNotificationMixin, Workflow
def _get_parent_field_name(cls):
return 'workflow_job_template'
@classmethod
def _get_task_class(cls):
from awx.main.tasks import RunWorkflowJob
return RunWorkflowJob
def _has_failed(self):
return False
@@ -426,11 +421,9 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, JobNotificationMixin, Workflow
def get_notification_friendly_name(self):
return "Workflow Job"
def start(self, *args, **kwargs):
(res, opts) = self.pre_start(**kwargs)
if res:
self.status = 'running'
self.save()
self.websocket_emit_status("running")
return res
'''
A WorkflowJob is a virtual job. It doesn't result in a celery task.
'''
def start_celery_task(self, opts, error_callback, success_callback):
return None

View File

@@ -26,6 +26,7 @@ from awx.main.scheduler.partial import (
InventorySourceDict,
SystemJobDict,
AdHocCommandDict,
WorkflowJobDict,
)
# Celery
@@ -47,15 +48,9 @@ class Scheduler():
project_updates = ProjectUpdateDict.filter_partial(status=status_list)
system_jobs = SystemJobDict.filter_partial(status=status_list)
ad_hoc_commands = AdHocCommandDict.filter_partial(status=status_list)
'''
graph_workflow_jobs = [wf for wf in
WorkflowJob.objects.filter(**kv)]
all_actions = sorted(graph_jobs + graph_ad_hoc_commands + graph_inventory_updates +
graph_project_updates + graph_system_jobs +
graph_workflow_jobs,
key=lambda task: task.created)
'''
all_actions = sorted(jobs + project_updates + inventory_updates + system_jobs + ad_hoc_commands,
workflow_jobs = WorkflowJobDict.filter_partial(status=status_list)
all_actions = sorted(jobs + project_updates + inventory_updates + system_jobs + ad_hoc_commands + workflow_jobs,
key=lambda task: task['created'])
return all_actions
@@ -111,7 +106,7 @@ class Scheduler():
job.status = 'failed'
job.job_explanation = "Workflow job could not start because it was not in the right state or required manual credentials"
job.save(update_fields=['status', 'job_explanation'])
job.websocket_emit_status("failed")
connection.on_commit(lambda: job.websocket_emit_status('failed'))
# TODO: should we emit a status on the socket here similar to tasks.py tower_periodic_scheduler() ?
#emit_websocket_notification('/socket.io/jobs', '', dict(id=))
@@ -122,12 +117,9 @@ class Scheduler():
dag = WorkflowDAG(workflow_job)
if dag.is_workflow_done():
# TODO: detect if wfj failed
if workflow_job._has_failed():
workflow_job.status = 'failed'
else:
workflow_job.status = 'successful'
workflow_job.status = 'completed'
workflow_job.save()
workflow_job.websocket_emit_status(workflow_job.status)
connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status))
def get_activate_tasks(self):
inspector = inspect()
@@ -153,6 +145,8 @@ class Scheduler():
def start_task(self, task, dependent_tasks=[]):
from awx.main.tasks import handle_work_error, handle_work_success
status_changed = False
# TODO: spawn inventory and project updates
task_actual = {
'type':task.get_job_type_str(),
@@ -164,21 +158,36 @@ class Scheduler():
success_handler = handle_work_success.s(task_actual=task_actual)
job_obj = task.get_full()
job_obj.status = 'waiting'
if job_obj.status == 'pending':
status_changed = True
job_obj.status = 'waiting'
(start_status, opts) = job_obj.pre_start()
if not start_status:
status_changed = True
job_obj.status = 'failed'
if job_obj.job_explanation:
job_obj.job_explanation += ' '
job_obj.job_explanation += 'Task failed pre-start check.'
job_obj.save()
# TODO: run error handler to fail sub-tasks and send notifications
return
else:
if type(job_obj) is WorkflowJob:
job_obj.status = 'running'
status_changed = True
self.consume_capacity(task)
if status_changed is True:
job_obj.save()
connection.on_commit(lambda: job_obj.start_celery_task(opts, error_callback=error_handler, success_callback=success_handler))
self.consume_capacity(task)
def post_commit():
if status_changed:
job_obj.websocket_emit_status(job_obj.status)
if job_obj.status != 'failed':
job_obj.start_celery_task(opts, error_callback=error_handler, success_callback=success_handler)
connection.on_commit(post_commit)
def process_runnable_tasks(self, runnable_tasks):
for i, task in enumerate(runnable_tasks):
@@ -281,7 +290,7 @@ class Scheduler():
'Celery, so it has been marked as failed.',
))
task_obj.save()
task_obj.websocket_emit_status("failed")
connection.on_commit(lambda: task_obj.websocket_emit_status('failed'))
all_sorted_tasks.pop(i)
logger.error("Task %s appears orphaned... marking as failed" % task)
@@ -323,28 +332,6 @@ class Scheduler():
pending_tasks = filter(lambda t: t['status'] == 'pending', all_sorted_tasks)
self.process_pending_tasks(pending_tasks)
'''
def do_graph_things():
# Rebuild graph
graph = SimpleDAG()
for task in running_tasks:
graph.add_node(task)
#for wait_task in waiting_tasks[:50]:
for wait_task in waiting_tasks:
node_dependencies = []
for node in graph:
if wait_task.is_blocked_by(node['node_object']):
node_dependencies.append(node['node_object'])
graph.add_node(wait_task)
for dependency in node_dependencies:
graph.add_edge(wait_task, dependency)
if settings.DEBUG:
graph.generate_graphviz_plot()
return graph
'''
#return do_graph_things()
def _schedule(self):
all_sorted_tasks = self.get_tasks()
if len(all_sorted_tasks) > 0:
@@ -359,23 +346,21 @@ class Scheduler():
inventory_id_sources = self.get_inventory_source_tasks(all_sorted_tasks)
self.process_inventory_sources(inventory_id_sources)
self.process_tasks(all_sorted_tasks)
running_workflow_tasks = self.get_running_workflow_jobs()
self.process_finished_workflow_jobs(running_workflow_tasks)
#print("Finished schedule()")
self.spawn_workflow_graph_jobs(running_workflow_tasks)
self.process_tasks(all_sorted_tasks)
def schedule(self):
with transaction.atomic():
#t1 = datetime.now()
# Lock
try:
Instance.objects.select_for_update(nowait=True).all()[0]
except DatabaseError:
return
#workflow_jobs = get_running_workflow_jobs()
#process_finished_workflow_jobs(workflow_jobs)
#spawn_workflow_graph_jobs(workflow_jobs)
'''
Get tasks known by celery
'''
@@ -387,10 +372,4 @@ class Scheduler():
'''
self._schedule()
# Unlock, due to transaction ending
#t2 = datetime.now()
#t_diff = t2 - t1
#print("schedule() time %s" % (t_diff.total_seconds()))

View File

@@ -7,6 +7,7 @@ from awx.main.scheduler.partial import (
InventoryUpdateDict,
SystemJobDict,
AdHocCommandDict,
WorkflowJobDict,
)
class DependencyGraph(object):
PROJECT_UPDATES = 'project_updates'
@@ -14,6 +15,7 @@ class DependencyGraph(object):
JOB_TEMPLATE_JOBS = 'job_template_jobs'
SYSTEM_JOB = 'system_job'
INVENTORY_SOURCE_UPDATES = 'inventory_source_updates'
WORKFLOW_JOB_TEMPLATES_JOBS = 'workflow_job_template_jobs'
LATEST_PROJECT_UPDATES = 'latest_project_updates'
LATEST_INVENTORY_UPDATES = 'latest_inventory_updates'
@@ -32,6 +34,8 @@ class DependencyGraph(object):
self.data[self.INVENTORY_SOURCE_UPDATES] = {}
# True / False
self.data[self.SYSTEM_JOB] = True
# workflow_job_template_id -> True / False
self.data[self.WORKFLOW_JOB_TEMPLATES_JOBS] = {}
# project_id -> latest ProjectUpdateLatestDict
self.data[self.LATEST_PROJECT_UPDATES] = {}
@@ -138,6 +142,9 @@ class DependencyGraph(object):
self.data[self.PROJECT_UPDATES][job['project_id']] = False
self.data[self.JOB_TEMPLATE_JOBS][job['job_template_id']] = False
def mark_workflow_job(self, job):
self.data[self.WORKFLOW_JOB_TEMPLATES_JOBS][job['workflow_job_template_id']] = False
def can_project_update_run(self, job):
return self.data[self.PROJECT_UPDATES].get(job['project_id'], True)
@@ -153,6 +160,9 @@ class DependencyGraph(object):
return True
return False
def can_workflow_job_run(self, job):
return self.data[self.WORKFLOW_JOB_TEMPLATES_JOBS].get(job['workflow_job_template_id'], True)
def can_system_job_run(self):
return self.data[self.SYSTEM_JOB]
@@ -170,6 +180,8 @@ class DependencyGraph(object):
return not self.can_system_job_run()
elif type(job) is AdHocCommandDict:
return not self.can_ad_hoc_command_run(job)
elif type(job) is WorkflowJobDict:
return not self.can_workflow_job_run(job)
def add_job(self, job):
if type(job) is ProjectUpdateDict:
@@ -179,6 +191,8 @@ class DependencyGraph(object):
self.mark_inventory_source_update(job['inventory_source_id'])
elif type(job) is JobDict:
self.mark_job_template_job(job)
elif type(job) is WorkflowJobDict:
self.mark_workflow_job(job)
elif type(job) is SystemJobDict:
self.mark_system_job()
elif type(job) is AdHocCommandDict:

View File

@@ -7,6 +7,7 @@ from awx.main.models import (
InventorySource,
SystemJob,
AdHocCommand,
WorkflowJob,
)
class PartialModelDict(object):
@@ -205,3 +206,15 @@ class AdHocCommandDict(PartialModelDict):
def task_impact(self):
return 20
class WorkflowJobDict(PartialModelDict):
FIELDS = (
'id', 'created', 'status', 'workflow_job_template_id',
)
model = WorkflowJob
def get_job_type_str(self):
return 'workflow_job'
def task_impact(self):
return 10

View File

@@ -35,6 +35,7 @@ def scheduler_factory(mocker, epoch):
raise RuntimeError("create_project_update should not be called")
mocker.patch.object(sched, 'get_tasks', return_value=tasks)
mocker.patch.object(sched, 'get_running_workflow_jobs', return_value=[])
mocker.patch.object(sched, 'get_inventory_source_tasks', return_value=inventory_sources)
mocker.patch.object(sched, 'get_latest_project_update_tasks', return_value=latest_project_updates)
mocker.patch.object(sched, 'get_latest_inventory_update_tasks', return_value=latest_inventory_updates)

View File

@@ -25,6 +25,7 @@ django-radius==1.0.0
djangorestframework==3.3.2
djangorestframework-yaml==1.0.2
django-split-settings==0.1.1
django-transaction-hooks==0.2
django-taggit==0.17.6
git+https://github.com/matburt/dm.xmlsec.binding.git@master#egg=dm.xmlsec.binding
dogpile.core==0.4.1