diff --git a/awx/api/serializers.py b/awx/api/serializers.py
index 42489bc1b0..7ef6efb74a 100644
--- a/awx/api/serializers.py
+++ b/awx/api/serializers.py
@@ -2229,13 +2229,19 @@ class WorkflowNodeSerializer(BaseSerializer):
 
     class Meta:
         model = WorkflowNode
-        fields = ('id', 'url', 'related', 'workflow_job_template', 'unified_job_template', 'success_nodes', 'failure_nodes', 'always_nodes',)
+        # TODO: workflow_job and job read-only
+        fields = ('id', 'url', 'related', 'workflow_job_template', 'unified_job_template', 'success_nodes', 'failure_nodes', 'always_nodes', 'job',)
 
     def get_related(self, obj):
         res = super(WorkflowNodeSerializer, self).get_related(obj)
-        res['workflow_job_template'] = reverse('api:workflow_job_template_detail', args=(obj.workflow_job_template.pk,))
+        if obj.workflow_job_template:
+            res['workflow_job_template'] = reverse('api:workflow_job_template_detail', args=(obj.workflow_job_template.pk,))
         if obj.unified_job_template:
             res['unified_job_template'] = obj.unified_job_template.get_absolute_url()
+        if obj.job:
+            res['job'] = reverse('api:job_detail', args=(obj.job.pk,))
+        if obj.workflow_job:
+            res['workflow_job'] = reverse('api:workflow_job_detail', args=(obj.workflow_job.pk,))
         res['success_nodes'] = reverse('api:workflow_node_success_nodes_list', args=(obj.pk,))
         res['failure_nodes'] = reverse('api:workflow_node_failure_nodes_list', args=(obj.pk,))
         res['always_nodes'] = reverse('api:workflow_node_always_nodes_list', args=(obj.pk,))
diff --git a/awx/api/views.py b/awx/api/views.py
index 69a38fde2f..ce63713707 100644
--- a/awx/api/views.py
+++ b/awx/api/views.py
@@ -2627,8 +2627,6 @@ class WorkflowNodeDetail(RetrieveUpdateDestroyAPIView):
 
     model = WorkflowNode
     serializer_class = WorkflowNodeDetailSerializer
-    parent_model = WorkflowJobTemplate
-    relationship = 'workflow_job_template'
     new_in_310 = True
 
 class WorkflowNodeChildrenBaseList(EnforceParentRelationshipMixin, SubListCreateAttachDetachAPIView):
@@ -2748,7 +2746,7 @@ class WorkflowJobWorkflowNodesList(SubListAPIView):
     always_allow_superuser = True # TODO: RBAC
     parent_model = WorkflowJob
     relationship = 'workflow_job_nodes'
-    parent_key = 'job'
+    parent_key = 'workflow_job'
 
 
 class SystemJobTemplateList(ListAPIView):
diff --git a/awx/main/management/commands/run_task_system.py b/awx/main/management/commands/run_task_system.py
index 0cdc3c1556..3e11b3511d 100644
--- a/awx/main/management/commands/run_task_system.py
+++ b/awx/main/management/commands/run_task_system.py
@@ -54,6 +54,8 @@ class SimpleDAG(object):
             type_str = "Inventory"
         elif type(obj) == ProjectUpdate:
             type_str = "Project"
+        elif type(obj) == WorkflowJob:
+            type_str = "Workflow"
         else:
             type_str = "Unknown"
         type_str += "%s" % str(obj.id)
@@ -68,10 +70,11 @@ class SimpleDAG(object):
                 short_string_obj(n['node_object']),
                 "red" if n['node_object'].status == 'running' else "black",
             )
-        for from_node, to_node in self.edges:
-            doc += "%s -> %s;\n" % (
+        for from_node, to_node, label in self.edges:
+            doc += "%s -> %s [ label=\"%s\" ];\n" % (
                 short_string_obj(self.nodes[from_node]['node_object']),
                 short_string_obj(self.nodes[to_node]['node_object']),
+                label,
             )
         doc += "}\n"
         gv_file = open('/tmp/graph.gv', 'w')
@@ -82,16 +85,16 @@ class SimpleDAG(object):
         if self.find_ord(obj) is None:
             self.nodes.append(dict(node_object=obj, metadata=metadata))
 
-    def add_edge(self, from_obj, to_obj):
+    def add_edge(self, from_obj, to_obj, label=None):
         from_obj_ord = self.find_ord(from_obj)
         to_obj_ord = self.find_ord(to_obj)
         if from_obj_ord is None or to_obj_ord is None:
             raise LookupError("Object not found")
-        self.edges.append((from_obj_ord, to_obj_ord))
+        self.edges.append((from_obj_ord, to_obj_ord, label))
 
     def add_edges(self, edgelist):
         for edge_pair in edgelist:
-            self.add_edge(edge_pair[0], edge_pair[1])
+            self.add_edge(edge_pair[0], edge_pair[1], edge_pair[2])
 
     def find_ord(self, obj):
         for idx in range(len(self.nodes)):
@@ -114,20 +117,28 @@ class SimpleDAG(object):
                 return "workflow_job"
         return "unknown"
 
-    def get_dependencies(self, obj):
+    def get_dependencies(self, obj, label=None):
         antecedents = []
         this_ord = self.find_ord(obj)
-        for node, dep in self.edges:
-            if node == this_ord:
-                antecedents.append(self.nodes[dep])
+        for node, dep, lbl in self.edges:
+            if label:
+                if node == this_ord and lbl == label:
+                    antecedents.append(self.nodes[dep])
+            else:
+                if node == this_ord:
+                    antecedents.append(self.nodes[dep])
         return antecedents
 
-    def get_dependents(self, obj):
+    def get_dependents(self, obj, label=None):
         decendents = []
         this_ord = self.find_ord(obj)
-        for node, dep in self.edges:
-            if dep == this_ord:
-                decendents.append(self.nodes[node])
+        for node, dep, lbl in self.edges:
+            if label:
+                if dep == this_ord and lbl == label:
+                    decendents.append(self.nodes[node])
+            else:
+                if dep == this_ord:
+                    decendents.append(self.nodes[node])
         return decendents
 
     def get_leaf_nodes(self):
@@ -144,6 +155,83 @@ class SimpleDAG(object):
                 roots.append(n)
         return roots
 
+class WorkflowDAG(SimpleDAG):
+    def __init__(self, workflow_job=None):
+        super(WorkflowDAG, self).__init__()
+        if workflow_job:
+            self._init_graph(workflow_job)
+
+    def _init_graph(self, workflow_job):
+        workflow_nodes = workflow_job.workflow_job_nodes.all()
+        for workflow_node in workflow_nodes:
+            self.add_node(workflow_node)
+
+        for node_type in ['success_nodes', 'failure_nodes', 'always_nodes']:
+            for workflow_node in workflow_nodes:
+                related_nodes = getattr(workflow_node, node_type).all()
+                for related_node in related_nodes:
+                    self.add_edge(workflow_node, related_node, node_type)
+
+    def bfs_nodes_to_run(self):
+        root_nodes = self.get_root_nodes()
+        nodes = root_nodes
+        nodes_found = []
+
+        for index, n in enumerate(nodes):
+            obj = n['node_object']
+            job = obj.job
+            print("\t\tExamining node %s job %s" % (obj, job))
+
+            if not job:
+                print("\t\tNo job for node %s" % obj)
+                nodes_found.append(n)
+            # Job is about to run or is running. Hold our horses and wait for
+            # the job to finish. We can't proceed down the graph path until we
+            # have the job result.
+            elif job.status not in ['failed', 'error', 'successful']:
+                print("\t\tJob status not 'failed' 'error' nor 'successful' %s" % job.status)
+                continue
+            elif job.status in ['failed', 'error']:
+                print("\t\tJob status is failed or error %s" % job.status)
+                children_failed = self.get_dependencies(obj, 'failure_nodes')
+                children_always = self.get_dependencies(obj, 'always_nodes')
+                children_all = children_failed + children_always
+                nodes.extend(children_all)
+            elif job.status in ['successful']:
+                print("\t\tJob status is successful %s" % job.status)
+                children_success = self.get_dependencies(obj, 'success_nodes')
+                nodes.extend(children_success)
+            else:
+                logger.warn("Incorrect graph structure")
+        return [n['node_object'] for n in nodes_found]
+
+    def is_workflow_done(self):
+        root_nodes = self.get_root_nodes()
+        nodes = root_nodes
+
+        for index, n in enumerate(nodes):
+            obj = n['node_object']
+            job = obj.job
+
+            if not job:
+                return False
+            # Job is about to run or is running. Hold our horses and wait for
+            # the job to finish. We can't proceed down the graph path until we
+            # have the job result.
+            elif job.status not in ['failed', 'error', 'successful']:
+                return False
+            elif job.status in ['failed', 'error']:
+                children_failed = self.get_dependencies(obj, 'failure_nodes')
+                children_always = self.get_dependencies(obj, 'always_nodes')
+                children_all = children_failed + children_always
+                nodes.extend(children_all)
+            elif job.status in ['successful']:
+                children_success = self.get_dependencies(obj, 'success_nodes')
+                nodes.extend(children_success)
+            else:
+                logger.warn("Incorrect graph structure")
+        return True
+
 def get_tasks():
     """Fetch all Tower tasks that are relevant to the task management system.
@@ -166,6 +254,33 @@ def get_tasks():
                          key=lambda task: task.created)
     return all_actions
 
+def get_running_workflow_jobs():
+    graph_workflow_jobs = [wf for wf in
+                           WorkflowJob.objects.filter(status='running')]
+    return graph_workflow_jobs
+
+def do_spawn_workflow_jobs():
+    workflow_jobs = get_running_workflow_jobs()
+    print("Set of workflow jobs to process %s" % workflow_jobs)
+    for workflow_job in workflow_jobs:
+        print("Building the dag")
+        dag = WorkflowDAG(workflow_job)
+        print("Imported the workflow job dag")
+        for n in dag.nodes:
+            print("\tWorkflow dag node %s" % n)
+        for f, to, label in dag.edges:
+            print("\tWorkflow dag edge <%s,%s,%s>" % (f, to, label))
+        spawn_nodes = dag.bfs_nodes_to_run()
+        for spawn_node in spawn_nodes:
+            print("Spawning job %s" % spawn_node)
+            # TODO: Inject job template template params as kwargs
+            kv = {}
+            job = spawn_node.unified_job_template.create_unified_job(**kv)
+            print("Started new job %s" % job.id)
+            spawn_node.job = job
+            spawn_node.save()
+            result = job.signal_start(**kv)
+
 def rebuild_graph(message):
     """Regenerate the task graph by refreshing known tasks from Tower, purging
     orphaned running tasks, and creating dependencies for new tasks before
@@ -182,6 +297,8 @@ def rebuild_graph(message):
         logger.warn("Ignoring celery task inspector")
         active_task_queues = None
 
+    do_spawn_workflow_jobs()
+
     all_sorted_tasks = get_tasks()
     if not len(all_sorted_tasks):
         return None
@@ -196,6 +313,7 @@ def rebuild_graph(message):
         # as a whole that celery appears to be down.
         if not hasattr(settings, 'CELERY_UNIT_TEST'):
             return None
+
     running_tasks = filter(lambda t: t.status == 'running', all_sorted_tasks)
     waiting_tasks = filter(lambda t: t.status != 'running', all_sorted_tasks)
     new_tasks = filter(lambda t: t.status == 'pending', all_sorted_tasks)
diff --git a/awx/main/migrations/0035_auto_20160831_2008.py b/awx/main/migrations/0035_auto_20160831_2008.py
new file mode 100644
index 0000000000..6297a29824
--- /dev/null
+++ b/awx/main/migrations/0035_auto_20160831_2008.py
@@ -0,0 +1,19 @@
+# -*- coding: utf-8 -*-
+from __future__ import unicode_literals
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('main', '0034_auto_20160830_1716'),
+    ]
+
+    operations = [
+        migrations.AlterField(
+            model_name='workflownode',
+            name='workflow_job_template',
+            field=models.ForeignKey(related_name='workflow_nodes', default=None, blank=True, to='main.WorkflowJobTemplate', null=True),
+        ),
+    ]
diff --git a/awx/main/migrations/0036_auto_20160831_2052.py b/awx/main/migrations/0036_auto_20160831_2052.py
new file mode 100644
index 0000000000..ad16af0e4a
--- /dev/null
+++ b/awx/main/migrations/0036_auto_20160831_2052.py
@@ -0,0 +1,25 @@
+# -*- coding: utf-8 -*-
+from __future__ import unicode_literals
+
+from django.db import migrations, models
+import django.db.models.deletion
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('main', '0035_auto_20160831_2008'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='workflownode',
+            name='workflow_job',
+            field=models.ForeignKey(related_name='workflow_job_nodes', on_delete=django.db.models.deletion.SET_NULL, default=None, blank=True, to='main.WorkflowJob', null=True),
+        ),
+        migrations.AlterField(
+            model_name='workflownode',
+            name='job',
+            field=models.ForeignKey(related_name='unified_job_nodes', on_delete=django.db.models.deletion.SET_NULL, default=None, blank=True, to='main.UnifiedJob', null=True),
+        ),
+    ]
diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py
index c77ed0c43d..cc764e48af 100644
--- a/awx/main/models/workflow.py
+++ b/awx/main/models/workflow.py
@@ -29,9 +29,13 @@ class WorkflowNode(CreatedModifiedModel):
     )
     '''
 
+    # TODO: Ensure the API forces workflow_job_template being set
     workflow_job_template = models.ForeignKey(
         'WorkflowJobTemplate',
         related_name='workflow_nodes',
+        blank=True,
+        null=True,
+        default=None,
         on_delete=models.CASCADE,
     )
     unified_job_template = models.ForeignKey(
@@ -60,9 +64,17 @@ class WorkflowNode(CreatedModifiedModel):
         blank=True,
         symmetrical=False,
     )
+    workflow_job = models.ForeignKey(
+        'WorkflowJob',
+        related_name='workflow_job_nodes',
+        blank=True,
+        null=True,
+        default=None,
+        on_delete=models.SET_NULL,
+    )
     job = models.ForeignKey(
         'UnifiedJob',
-        related_name='workflow_job_nodes',
+        related_name='unified_job_nodes',
         blank=True,
         null=True,
         default=None,
@@ -143,7 +155,9 @@ class WorkflowJobInheritNodesMixin(object):
         for old_node in old_nodes:
             new_node = WorkflowNode.objects.get(id=old_node.pk)
 
-            new_node.job = self
+            new_node.workflow_job = self
+            new_node.job = None
+            new_node.workflow_job_template = None
             new_node.pk = None
             new_node.save()
             new_nodes.append(new_node)
diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index 25479ae5ca..713874ba3a 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -1663,41 +1663,23 @@ class RunWorkflowJob(BaseTask):
     model = WorkflowJob
 
     def run(self, pk, **kwargs):
+        from awx.main.management.commands.run_task_system import WorkflowDAG
         '''
         Run the job/task and capture its output.
         '''
+        pass
         instance = self.update_model(pk, status='running', celery_task_id=self.request.id)
-        instance.socketio_emit_status("running")
-        status, rc, tb = 'error', None, ''
-        output_replacements = []
-        try:
-            if instance.cancel_flag:
-                instance = self.update_model(instance.pk, status='canceled')
-            if instance.status != 'running':
-                if hasattr(settings, 'CELERY_UNIT_TEST'):
-                    return
-                else:
-                    # Stop the task chain and prevent starting the job if it has
-                    # already been canceled.
-                    instance = self.update_model(pk)
-                    status = instance.status
-                    raise RuntimeError('not starting %s task' % instance.status)
-            #status, rc = self.run_pexpect(instance, args, cwd, env, kwargs['passwords'], stdout_handle)
-            # TODO: Do the workflow logic here
-        except Exception:
-            if status != 'canceled':
-                tb = traceback.format_exc()
-        status = 'successful'
-        instance = self.update_model(pk, status=status, result_traceback=tb)
-        instance.socketio_emit_status(status)
-        if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'):
-            # Raising an exception will mark the job as 'failed' in celery
-            # and will stop a task chain from continuing to execute
-            if status == 'canceled':
-                raise Exception("Task %s(pk:%s) was canceled (rc=%s)" % (str(self.model.__class__), str(pk), str(rc)))
-            else:
-                raise Exception("Task %s(pk:%s) encountered an error (rc=%s)" % (str(self.model.__class__), str(pk), str(rc)))
-        if not hasattr(settings, 'CELERY_UNIT_TEST'):
-            self.signal_finished(pk)
+
+        # FIXME: Detect workflow run completion
+        while True:
+            dag = WorkflowDAG(instance)
+            print("Deciding if workflow is done")
+            if dag.is_workflow_done():
+                # TODO: update with accurate finish status (i.e. canceled, error, etc.)
+                instance = self.update_model(instance.pk, status='success')
+                print("Workflow IS done")
+                return
+            time.sleep(1)
+        # TODO: Handle cancel
diff --git a/awx/main/tests/factories/fixtures.py b/awx/main/tests/factories/fixtures.py
index cdbfac6531..1f32d76739 100644
--- a/awx/main/tests/factories/fixtures.py
+++ b/awx/main/tests/factories/fixtures.py
@@ -177,4 +177,3 @@ def mk_workflow_node(workflow_job_template=None, unified_job_template=None,
     if persisted:
         workflow_node.save()
     return workflow_node
-
diff --git a/awx/main/tests/factories/tower.py b/awx/main/tests/factories/tower.py
index 3813bf2faa..953cb2d26e 100644
--- a/awx/main/tests/factories/tower.py
+++ b/awx/main/tests/factories/tower.py
@@ -366,14 +366,10 @@ def create_workflow_job_template(name, persisted=True, **kwargs):
         if type(i) is Job:
             jobs[i.pk] = i
         else:
-            # Fill in default survey answers
-            job_extra_vars = {}
-            for question in spec['spec']:
-                job_extra_vars[question['variable']] = question['default']
-            jobs[i] = mk_job(job_template=wfjt, extra_vars=job_extra_vars,
-                             persisted=persisted)
+            # TODO: Create the job
+            raise RuntimeError("Currently, only already created jobs are supported")
 
     return Objects(workflow_job_template=wfjt,
-                   #jobs=jobs,
+                   jobs=jobs,
                    survey=spec,)
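
Illustrative sketch (not part of the patch): the branching rule that WorkflowDAG.bfs_nodes_to_run() applies when deciding which workflow nodes still need a job spawned. FakeJob and FakeNode are hypothetical stand-ins for the AWX models, used only to make the rule runnable in isolation.

# Stand-alone sketch of the traversal rule; FakeJob/FakeNode are made up for illustration.
class FakeJob(object):
    def __init__(self, status):
        self.status = status

class FakeNode(object):
    def __init__(self, name, job=None):
        self.name, self.job = name, job
        self.children = {'success_nodes': [], 'failure_nodes': [], 'always_nodes': []}

def nodes_to_run(roots):
    # Mirrors the patched BFS: walk from the roots, following the edge label
    # that matches each finished job's status.
    queue, to_run = list(roots), []
    for node in queue:                    # appending to queue extends the walk
        if node.job is None:
            to_run.append(node)           # never started -> needs a job spawned
        elif node.job.status not in ('failed', 'error', 'successful'):
            continue                      # still running -> wait for the result
        elif node.job.status in ('failed', 'error'):
            queue.extend(node.children['failure_nodes'] + node.children['always_nodes'])
        else:                             # successful
            queue.extend(node.children['success_nodes'])
    return to_run

root = FakeNode('root', FakeJob('successful'))
on_success, on_failure = FakeNode('on_success'), FakeNode('on_failure')
root.children['success_nodes'].append(on_success)
root.children['failure_nodes'].append(on_failure)
print([n.name for n in nodes_to_run([root])])   # -> ['on_success']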