launch workflows

Chris Meyers
2016-09-01 14:11:20 -04:00
parent 2cfdee3b21
commit 39ac2c047b
9 changed files with 217 additions and 60 deletions


@@ -2229,13 +2229,19 @@ class WorkflowNodeSerializer(BaseSerializer):
 
     class Meta:
         model = WorkflowNode
-        fields = ('id', 'url', 'related', 'workflow_job_template', 'unified_job_template', 'success_nodes', 'failure_nodes', 'always_nodes',)
+        fields = ('id', 'url', 'related', 'workflow_job_template', 'unified_job_template', 'success_nodes', 'failure_nodes', 'always_nodes', 'job',)  # TODO: workflow_job and job read-only
 
     def get_related(self, obj):
         res = super(WorkflowNodeSerializer, self).get_related(obj)
-        res['workflow_job_template'] = reverse('api:workflow_job_template_detail', args=(obj.workflow_job_template.pk,))
+        if obj.workflow_job_template:
+            res['workflow_job_template'] = reverse('api:workflow_job_template_detail', args=(obj.workflow_job_template.pk,))
         if obj.unified_job_template:
             res['unified_job_template'] = obj.unified_job_template.get_absolute_url()
+        if obj.job:
+            res['job'] = reverse('api:job_detail', args=(obj.job.pk,))
+        if obj.workflow_job:
+            res['workflow_job'] = reverse('api:workflow_job_detail', args=(obj.workflow_job.pk,))
         res['success_nodes'] = reverse('api:workflow_node_success_nodes_list', args=(obj.pk,))
         res['failure_nodes'] = reverse('api:workflow_node_failure_nodes_list', args=(obj.pk,))
         res['always_nodes'] = reverse('api:workflow_node_always_nodes_list', args=(obj.pk,))
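To illustrate the guarded related-link pattern this hunk introduces, here is a minimal standalone sketch; reverse() is stubbed and the node class is hypothetical, so it runs outside Django:

    # Minimal sketch of the guarded related-links pattern; not the AWX serializer.
    def reverse(view_name, args=()):
        # Stubbed URL reversal so the example runs without Django.
        return '/api/v1/%s/%s/' % (view_name.split(':')[1], args[0])

    class FakeNode(object):
        def __init__(self, pk, job=None, workflow_job=None):
            self.pk, self.job, self.workflow_job = pk, job, workflow_job

    def get_related(obj):
        res = {}
        # Each now-optional FK contributes a link only when it is actually set.
        if obj.job:
            res['job'] = reverse('api:job_detail', args=(obj.job,))
        if obj.workflow_job:
            res['workflow_job'] = reverse('api:workflow_job_detail', args=(obj.workflow_job,))
        return res

    print(get_related(FakeNode(1, job=42)))  # {'job': '/api/v1/job_detail/42/'}
    print(get_related(FakeNode(2)))          # {} -- unset FKs render nothing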


@@ -2627,8 +2627,6 @@ class WorkflowNodeDetail(RetrieveUpdateDestroyAPIView):
 
     model = WorkflowNode
     serializer_class = WorkflowNodeDetailSerializer
-    parent_model = WorkflowJobTemplate
-    relationship = 'workflow_job_template'
     new_in_310 = True
 
 class WorkflowNodeChildrenBaseList(EnforceParentRelationshipMixin, SubListCreateAttachDetachAPIView):
 
@@ -2748,7 +2746,7 @@ class WorkflowJobWorkflowNodesList(SubListAPIView):
 
     always_allow_superuser = True # TODO: RBAC
     parent_model = WorkflowJob
     relationship = 'workflow_job_nodes'
-    parent_key = 'job'
+    parent_key = 'workflow_job'
 
 class SystemJobTemplateList(ListAPIView):
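For context, parent_key on a sublist view names the foreign key on the child that must point back at the parent; the fix above keeps it in step with the renamed workflow_job FK. A toy sketch of the filtering idea (not the actual SubListAPIView internals):

    # Toy illustration of parent_key-based filtering; names are hypothetical.
    class Node(object):
        def __init__(self, pk, workflow_job=None):
            self.pk, self.workflow_job = pk, workflow_job

    def sublist(children, parent, parent_key):
        # Keep only children whose parent_key FK points at this parent.
        return [c for c in children if getattr(c, parent_key) == parent]

    nodes = [Node(1, workflow_job='wj-1'), Node(2, workflow_job='wj-2')]
    print([n.pk for n in sublist(nodes, 'wj-1', 'workflow_job')])  # [1]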


@@ -54,6 +54,8 @@ class SimpleDAG(object):
             type_str = "Inventory"
         elif type(obj) == ProjectUpdate:
             type_str = "Project"
+        elif type(obj) == WorkflowJob:
+            type_str = "Workflow"
         else:
             type_str = "Unknown"
         type_str += "%s" % str(obj.id)
@@ -68,10 +70,11 @@ class SimpleDAG(object):
                 short_string_obj(n['node_object']),
                 "red" if n['node_object'].status == 'running' else "black",
             )
-        for from_node, to_node in self.edges:
-            doc += "%s -> %s;\n" % (
+        for from_node, to_node, label in self.edges:
+            doc += "%s -> %s [ label=\"%s\" ];\n" % (
                 short_string_obj(self.nodes[from_node]['node_object']),
                 short_string_obj(self.nodes[to_node]['node_object']),
+                label,
             )
         doc += "}\n"
         gv_file = open('/tmp/graph.gv', 'w')
@@ -82,16 +85,16 @@ class SimpleDAG(object):
         if self.find_ord(obj) is None:
             self.nodes.append(dict(node_object=obj, metadata=metadata))
 
-    def add_edge(self, from_obj, to_obj):
+    def add_edge(self, from_obj, to_obj, label=None):
         from_obj_ord = self.find_ord(from_obj)
         to_obj_ord = self.find_ord(to_obj)
         if from_obj_ord is None or to_obj_ord is None:
             raise LookupError("Object not found")
-        self.edges.append((from_obj_ord, to_obj_ord))
+        self.edges.append((from_obj_ord, to_obj_ord, label))
 
     def add_edges(self, edgelist):
         for edge_pair in edgelist:
-            self.add_edge(edge_pair[0], edge_pair[1])
+            self.add_edge(edge_pair[0], edge_pair[1], edge_pair[2])
 
     def find_ord(self, obj):
         for idx in range(len(self.nodes)):
@@ -114,20 +117,28 @@ class SimpleDAG(object):
             return "workflow_job"
         return "unknown"
 
-    def get_dependencies(self, obj):
+    def get_dependencies(self, obj, label=None):
         antecedents = []
         this_ord = self.find_ord(obj)
-        for node, dep in self.edges:
-            if node == this_ord:
-                antecedents.append(self.nodes[dep])
+        for node, dep, lbl in self.edges:
+            if label:
+                if node == this_ord and lbl == label:
+                    antecedents.append(self.nodes[dep])
+            else:
+                if node == this_ord:
+                    antecedents.append(self.nodes[dep])
         return antecedents
 
-    def get_dependents(self, obj):
+    def get_dependents(self, obj, label=None):
         decendents = []
         this_ord = self.find_ord(obj)
-        for node, dep in self.edges:
-            if dep == this_ord:
-                decendents.append(self.nodes[node])
+        for node, dep, lbl in self.edges:
+            if label:
+                if dep == this_ord and lbl == label:
+                    decendents.append(self.nodes[node])
+            else:
+                if dep == this_ord:
+                    decendents.append(self.nodes[node])
         return decendents
 
     def get_leaf_nodes(self):
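Edges are now (from, to, label) triples, and the dependency helpers can filter on the label. A compact standalone sketch of the same lookup behavior, using plain tuples instead of node ordinals:

    # Standalone sketch mirroring the label filter in get_dependencies().
    edges = [
        ('A', 'B', 'success_nodes'),
        ('A', 'C', 'failure_nodes'),
        ('A', 'D', 'always_nodes'),
    ]

    def get_dependencies(node, label=None):
        # With a label, only matching edges count; without one, every edge does.
        return [to for frm, to, lbl in edges
                if frm == node and (label is None or lbl == label)]

    print(get_dependencies('A'))                   # ['B', 'C', 'D']
    print(get_dependencies('A', 'failure_nodes'))  # ['C']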
@@ -144,6 +155,83 @@ class SimpleDAG(object):
                 roots.append(n)
         return roots
 
+class WorkflowDAG(SimpleDAG):
+    def __init__(self, workflow_job=None):
+        super(WorkflowDAG, self).__init__()
+        if workflow_job:
+            self._init_graph(workflow_job)
+
+    def _init_graph(self, workflow_job):
+        workflow_nodes = workflow_job.workflow_job_nodes.all()
+        for workflow_node in workflow_nodes:
+            self.add_node(workflow_node)
+
+        for node_type in ['success_nodes', 'failure_nodes', 'always_nodes']:
+            for workflow_node in workflow_nodes:
+                related_nodes = getattr(workflow_node, node_type).all()
+                for related_node in related_nodes:
+                    self.add_edge(workflow_node, related_node, node_type)
+
+    def bfs_nodes_to_run(self):
+        root_nodes = self.get_root_nodes()
+        nodes = root_nodes
+        nodes_found = []
+
+        for index, n in enumerate(nodes):
+            obj = n['node_object']
+            job = obj.job
+
+            print("\t\tExamining node %s job %s" % (obj, job))
+            if not job:
+                print("\t\tNo job for node %s" % obj)
+                nodes_found.append(n)
+            # Job is about to run or is running. Hold our horses and wait for
+            # the job to finish. We can't proceed down the graph path until we
+            # have the job result.
+            elif job.status not in ['failed', 'error', 'successful']:
+                print("\t\tJob status not 'failed' 'error' nor 'successful' %s" % job.status)
+                continue
+            elif job.status in ['failed', 'error']:
+                print("\t\tJob status is failed or error %s" % job.status)
+                children_failed = self.get_dependencies(obj, 'failure_nodes')
+                children_always = self.get_dependencies(obj, 'always_nodes')
+                children_all = children_failed + children_always
+                nodes.extend(children_all)
+            elif job.status in ['successful']:
+                print("\t\tJob status is successful %s" % job.status)
+                children_success = self.get_dependencies(obj, 'success_nodes')
+                nodes.extend(children_success)
+            else:
+                logger.warn("Incorrect graph structure")
+        return [n['node_object'] for n in nodes_found]
+
+    def is_workflow_done(self):
+        root_nodes = self.get_root_nodes()
+        nodes = root_nodes
+
+        for index, n in enumerate(nodes):
+            obj = n['node_object']
+            job = obj.job
+
+            if not job:
+                return False
+            # Job is about to run or is running. Hold our horses and wait for
+            # the job to finish. We can't proceed down the graph path until we
+            # have the job result.
+            elif job.status not in ['failed', 'error', 'successful']:
+                return False
+            elif job.status in ['failed', 'error']:
+                children_failed = self.get_dependencies(obj, 'failure_nodes')
+                children_always = self.get_dependencies(obj, 'always_nodes')
+                children_all = children_failed + children_always
+                nodes.extend(children_all)
+            elif job.status in ['successful']:
+                children_success = self.get_dependencies(obj, 'success_nodes')
+                nodes.extend(children_success)
+            else:
+                logger.warn("Incorrect graph structure")
+        return True
 
 def get_tasks():
     """Fetch all Tower tasks that are relevant to the task management
     system.
@@ -166,6 +254,33 @@ def get_tasks():
                               key=lambda task: task.created)
     return all_actions
 
+def get_running_workflow_jobs():
+    graph_workflow_jobs = [wf for wf in
+                           WorkflowJob.objects.filter(status='running')]
+    return graph_workflow_jobs
+
+def do_spawn_workflow_jobs():
+    workflow_jobs = get_running_workflow_jobs()
+    print("Set of workflow jobs to process %s" % workflow_jobs)
+    for workflow_job in workflow_jobs:
+        print("Building the dag")
+        dag = WorkflowDAG(workflow_job)
+        print("Imported the workflow job dag")
+        for n in dag.nodes:
+            print("\tWorkflow dag node %s" % n)
+        for f, to, label in dag.edges:
+            print("\tWorkflow dag edge <%s,%s,%s>" % (f, to, label))
+        spawn_nodes = dag.bfs_nodes_to_run()
+        for spawn_node in spawn_nodes:
+            print("Spawning job %s" % spawn_node)
+            # TODO: Inject job template template params as kwargs
+            kv = {}
+            job = spawn_node.unified_job_template.create_unified_job(**kv)
+            print("Started new job %s" % job.id)
+            spawn_node.job = job
+            spawn_node.save()
+            result = job.signal_start(**kv)
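The spawn step follows a create-link-start sequence: create the unified job from the node's template, persist the link on the node, and only then signal the job to start, so the graph always records the job before it can finish. A stub-based sketch of that ordering (class names are placeholders, not the AWX API):

    # Stub-based sketch of the create -> link -> signal_start ordering above.
    class StubJob(object):
        def __init__(self, id):
            self.id = id
        def signal_start(self, **kwargs):
            return True  # the real method enqueues the job with the task system

    class StubTemplate(object):
        def create_unified_job(self, **kwargs):
            return StubJob(id=99)  # the real method copies template fields to a job row

    class StubNode(object):
        unified_job_template = StubTemplate()
        job = None
        def save(self):
            pass  # persisted to the database in the real model

    node = StubNode()
    job = node.unified_job_template.create_unified_job()  # 1. create the job record
    node.job = job                                        # 2. link it to the node...
    node.save()                                           #    ...and persist the link
    result = job.signal_start()                           # 3. only then start it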
 def rebuild_graph(message):
     """Regenerate the task graph by refreshing known tasks from Tower, purging
     orphaned running tasks, and creating dependencies for new tasks before
@@ -182,6 +297,8 @@ def rebuild_graph(message):
         logger.warn("Ignoring celery task inspector")
         active_task_queues = None
 
+    do_spawn_workflow_jobs()
+
     all_sorted_tasks = get_tasks()
     if not len(all_sorted_tasks):
         return None
@@ -196,6 +313,7 @@ def rebuild_graph(message):
             # as a whole that celery appears to be down.
             if not hasattr(settings, 'CELERY_UNIT_TEST'):
                 return None
+
     running_tasks = filter(lambda t: t.status == 'running', all_sorted_tasks)
     waiting_tasks = filter(lambda t: t.status != 'running', all_sorted_tasks)
     new_tasks = filter(lambda t: t.status == 'pending', all_sorted_tasks)


@@ -0,0 +1,19 @@
+# -*- coding: utf-8 -*-
+from __future__ import unicode_literals
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('main', '0034_auto_20160830_1716'),
+    ]
+
+    operations = [
+        migrations.AlterField(
+            model_name='workflownode',
+            name='workflow_job_template',
+            field=models.ForeignKey(related_name='workflow_nodes', default=None, blank=True, to='main.WorkflowJobTemplate', null=True),
+        ),
+    ]


@@ -0,0 +1,25 @@
+# -*- coding: utf-8 -*-
+from __future__ import unicode_literals
+
+from django.db import migrations, models
+import django.db.models.deletion
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('main', '0035_auto_20160831_2008'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='workflownode',
+            name='workflow_job',
+            field=models.ForeignKey(related_name='workflow_job_nodes', on_delete=django.db.models.deletion.SET_NULL, default=None, blank=True, to='main.WorkflowJob', null=True),
+        ),
+        migrations.AlterField(
+            model_name='workflownode',
+            name='job',
+            field=models.ForeignKey(related_name='unified_job_nodes', on_delete=django.db.models.deletion.SET_NULL, default=None, blank=True, to='main.UnifiedJob', null=True),
+        ),
+    ]


@@ -29,9 +29,13 @@ class WorkflowNode(CreatedModifiedModel):
     )
     '''
+    # TODO: Ensure the API forces workflow_job_template being set
     workflow_job_template = models.ForeignKey(
         'WorkflowJobTemplate',
         related_name='workflow_nodes',
+        blank=True,
+        null=True,
+        default=None,
         on_delete=models.CASCADE,
     )
     unified_job_template = models.ForeignKey(
 
@@ -60,9 +64,17 @@ class WorkflowNode(CreatedModifiedModel):
         blank=True,
         symmetrical=False,
     )
+    workflow_job = models.ForeignKey(
+        'WorkflowJob',
+        related_name='workflow_job_nodes',
+        blank=True,
+        null=True,
+        default=None,
+        on_delete=models.SET_NULL,
+    )
     job = models.ForeignKey(
         'UnifiedJob',
-        related_name='workflow_job_nodes',
+        related_name='unified_job_nodes',
         blank=True,
         null=True,
         default=None,
 
@@ -143,7 +155,9 @@ class WorkflowJobInheritNodesMixin(object):
         for old_node in old_nodes:
             new_node = WorkflowNode.objects.get(id=old_node.pk)
-            new_node.job = self
+            new_node.workflow_job = self
+            new_node.job = None
+            new_node.workflow_job_template = None
             new_node.pk = None
             new_node.save()
             new_nodes.append(new_node)
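The copy loop relies on Django's idiom that saving a model instance with pk set to None performs an INSERT, so each template node is cloned into a fresh row owned by the running workflow job. A minimal sketch of the idiom with a fake model (not the real WorkflowNode):

    # Sketch of the "clear pk, then save" cloning idiom used by the mixin.
    import copy
    import itertools

    _pk_counter = itertools.count(1)

    class FakeNode(object):
        def __init__(self, workflow_job_template):
            self.pk = next(_pk_counter)
            self.workflow_job_template = workflow_job_template
            self.workflow_job = None
            self.job = None
        def save(self):
            if self.pk is None:  # emulate Django's INSERT-on-save for pk=None
                self.pk = next(_pk_counter)

    template_node = FakeNode(workflow_job_template='wfjt-1')

    new_node = copy.copy(template_node)
    new_node.workflow_job = 'workflow-job-7'  # hypothetical running workflow job
    new_node.job = None                       # no spawned job yet
    new_node.workflow_job_template = None     # job nodes do not point at the template
    new_node.pk = None
    new_node.save()

    assert new_node.pk != template_node.pk    # a new row; the template is untouched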


@@ -1663,41 +1663,23 @@ class RunWorkflowJob(BaseTask):
 
     model = WorkflowJob
 
     def run(self, pk, **kwargs):
+        from awx.main.management.commands.run_task_system import WorkflowDAG
         '''
         Run the job/task and capture its output.
         '''
-        pass
         instance = self.update_model(pk, status='running', celery_task_id=self.request.id)
         instance.socketio_emit_status("running")
-        status, rc, tb = 'error', None, ''
-        output_replacements = []
-        try:
-            if instance.cancel_flag:
-                instance = self.update_model(instance.pk, status='canceled')
-            if instance.status != 'running':
-                if hasattr(settings, 'CELERY_UNIT_TEST'):
-                    return
-                else:
-                    # Stop the task chain and prevent starting the job if it has
-                    # already been canceled.
-                    instance = self.update_model(pk)
-                    status = instance.status
-                    raise RuntimeError('not starting %s task' % instance.status)
-            #status, rc = self.run_pexpect(instance, args, cwd, env, kwargs['passwords'], stdout_handle)
-            # TODO: Do the workflow logic here
-        except Exception:
-            if status != 'canceled':
-                tb = traceback.format_exc()
-        status = 'successful'
-        instance = self.update_model(pk, status=status, result_traceback=tb)
-        instance.socketio_emit_status(status)
-        if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'):
-            # Raising an exception will mark the job as 'failed' in celery
-            # and will stop a task chain from continuing to execute
-            if status == 'canceled':
-                raise Exception("Task %s(pk:%s) was canceled (rc=%s)" % (str(self.model.__class__), str(pk), str(rc)))
-            else:
-                raise Exception("Task %s(pk:%s) encountered an error (rc=%s)" % (str(self.model.__class__), str(pk), str(rc)))
-        if not hasattr(settings, 'CELERY_UNIT_TEST'):
-            self.signal_finished(pk)
+
+        # FIXME: Detect workflow run completion
+        while True:
+            dag = WorkflowDAG(instance)
+            print("Deciding if workflow is done")
+            if dag.is_workflow_done():
+                # TODO: update with accurate finish status (i.e. canceled, error, etc.)
+                instance = self.update_model(instance.pk, status='success')
+                print("Workflow IS done")
+                return
+            time.sleep(1)
+            # TODO: Handle cancel
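The task body is now a poll loop: rebuild the DAG from the database, ask whether every reachable path has finished, and sleep otherwise; the FIXME acknowledges the busy-wait. A stub-based sketch of that shape, with canned is_workflow_done() answers standing in for the real DAG:

    # Sketch of the poll-until-done loop in RunWorkflowJob.run(); the DAG is faked.
    import time

    class FakeDAG(object):
        answers = iter([False, False, True])  # canned is_workflow_done() results
        def is_workflow_done(self):
            return next(self.answers)

    def run_workflow(poll_interval=0.01):
        while True:
            dag = FakeDAG()           # the real task rebuilds the DAG on every pass
            if dag.is_workflow_done():
                return 'success'      # TODO in the commit: map canceled/error too
            time.sleep(poll_interval)

    print(run_workflow())  # -> 'success' after two polls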


@@ -177,4 +177,3 @@ def mk_workflow_node(workflow_job_template=None, unified_job_template=None,
     if persisted:
         workflow_node.save()
     return workflow_node
-


@@ -366,14 +366,10 @@ def create_workflow_job_template(name, persisted=True, **kwargs):
             if type(i) is Job:
                 jobs[i.pk] = i
             else:
-                # Fill in default survey answers
-                job_extra_vars = {}
-                for question in spec['spec']:
-                    job_extra_vars[question['variable']] = question['default']
-                jobs[i] = mk_job(job_template=wfjt, extra_vars=job_extra_vars,
-                                 persisted=persisted)
+                # TODO: Create the job
+                raise RuntimeError("Currently, only already created jobs are supported")
 
     return Objects(workflow_job_template=wfjt,
-                   #jobs=jobs,
+                   jobs=jobs,
                    survey=spec,)