From 5287e5c111d92f8bdb574d06508ebd068bc0e5fe Mon Sep 17 00:00:00 2001
From: Aaron Tan
Date: Mon, 9 Oct 2017 15:04:30 -0400
Subject: [PATCH] Implement workflow job failure

Relates #264.

This PR proposes and implements a definition of the workflow job failure
state: a workflow job fails if at least one of the conditions below is
satisfied.

* At least one node ends in state `canceled` or `error`.
* At least one leaf node ends in state `failed`, but no child node is
  spawned to run (there is no error handler).

Signed-off-by: Aaron Tan
---
 awx/main/models/workflow.py                   |  3 -
 awx/main/scheduler/dag_workflow.py            | 46 +++++----
 awx/main/scheduler/task_manager.py            | 14 +--
 .../tests/functional/models/test_workflow.py  | 94 +++++++++++--------
 docs/workflow.md                              |  4 +
 5 files changed, 94 insertions(+), 67 deletions(-)

diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py
index 5b4e8c2869..30709db53b 100644
--- a/awx/main/models/workflow.py
+++ b/awx/main/models/workflow.py
@@ -468,9 +468,6 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio
     def _get_unified_job_template_class(cls):
         return WorkflowJobTemplate
 
-    def _has_failed(self):
-        return False
-
     def socketio_emit_data(self):
         return {}
 
diff --git a/awx/main/scheduler/dag_workflow.py b/awx/main/scheduler/dag_workflow.py
index 5fc716584a..aa1f8bd957 100644
--- a/awx/main/scheduler/dag_workflow.py
+++ b/awx/main/scheduler/dag_workflow.py
@@ -62,6 +62,7 @@ class WorkflowDAG(SimpleDAG):
     def is_workflow_done(self):
         root_nodes = self.get_root_nodes()
         nodes = root_nodes
+        is_failed = False
 
         for index, n in enumerate(nodes):
             obj = n['node_object']
@@ -69,24 +70,29 @@
 
             if obj.unified_job_template is None:
                 continue
-            if not job:
-                return False
-            # Job is about to run or is running. Hold our horses and wait for
-            # the job to finish. We can't proceed down the graph path until we
-            # have the job result.
-            elif job.status in ['canceled', 'error']:
-                continue
-            elif job.status not in ['failed', 'successful']:
-                return False
-            elif job.status == 'failed':
-                children_failed = self.get_dependencies(obj, 'failure_nodes')
-                children_always = self.get_dependencies(obj, 'always_nodes')
-                children_all = children_failed + children_always
-                nodes.extend(children_all)
-            elif job.status == 'successful':
-                children_success = self.get_dependencies(obj, 'success_nodes')
-                children_always = self.get_dependencies(obj, 'always_nodes')
-                children_all = children_success + children_always
-                nodes.extend(children_all)
-        return True
+            elif not job:
+                return False, False
+            children_success = self.get_dependencies(obj, 'success_nodes')
+            children_failed = self.get_dependencies(obj, 'failure_nodes')
+            children_always = self.get_dependencies(obj, 'always_nodes')
+            if not is_failed and job.status != 'successful':
+                children_all = children_success + children_failed + children_always
+                for child in children_all:
+                    if child['node_object'].job:
+                        break
+                else:
+                    is_failed = True if children_all else job.status in ['failed', 'canceled', 'error']
+
+            if job.status in ['canceled', 'error']:
+                continue
+            elif job.status == 'failed':
+                nodes.extend(children_failed + children_always)
+            elif job.status == 'successful':
+                nodes.extend(children_success + children_always)
+            else:
+                # Job is about to run or is running. Hold our horses and wait for
+                # the job to finish. We can't proceed down the graph path until we
+                # have the job result.
+                return False, False
+        return True, is_failed
 
diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py
index 7e012b60b3..0b4662bba3 100644
--- a/awx/main/scheduler/task_manager.py
+++ b/awx/main/scheduler/task_manager.py
@@ -218,12 +218,12 @@ class TaskManager():
                 workflow_job.save()
                 dag.cancel_node_jobs()
                 connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status))
-            elif dag.is_workflow_done():
+            else:
+                is_done, has_failed = dag.is_workflow_done()
+                if not is_done:
+                    continue
                 result.append(workflow_job.id)
-                if workflow_job._has_failed():
-                    workflow_job.status = 'failed'
-                else:
-                    workflow_job.status = 'successful'
+                workflow_job.status = 'failed' if has_failed else 'successful'
                 workflow_job.save()
                 connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status))
         return result
@@ -362,7 +362,7 @@ class TaskManager():
            return False
 
        '''
-       If the latest project update has a created time == job_created_time-1 
+       If the latest project update has a created time == job_created_time-1
        then consider the project update found. This is so we don't enter an infinite loop
        of updating the project when cache timeout is 0.
        '''
@@ -514,7 +514,7 @@ class TaskManager():
            return None
 
        '''
-       Only consider failing tasks on instances for which we obtained a task 
+       Only consider failing tasks on instances for which we obtained a task
        list from celery for.
        '''
        running_tasks, waiting_tasks = self.get_running_tasks()
diff --git a/awx/main/tests/functional/models/test_workflow.py b/awx/main/tests/functional/models/test_workflow.py
index 5cae3008e0..af6724d8b7 100644
--- a/awx/main/tests/functional/models/test_workflow.py
+++ b/awx/main/tests/functional/models/test_workflow.py
@@ -4,7 +4,7 @@ import pytest
 
 # AWX
 from awx.main.models.workflow import WorkflowJob, WorkflowJobNode, WorkflowJobTemplateNode, WorkflowJobTemplate
-from awx.main.models.jobs import Job
+from awx.main.models.jobs import JobTemplate, Job
 from awx.main.models.projects import ProjectUpdate
 from awx.main.scheduler.dag_workflow import WorkflowDAG
 
@@ -15,9 +15,28 @@ from django.core.exceptions import ValidationError
 
 @pytest.mark.django_db
 class TestWorkflowDAGFunctional(TransactionTestCase):
-    def workflow_job(self):
+    def workflow_job(self, states=['new', 'new', 'new', 'new', 'new']):
+        """
+        Workflow topology:
+               node[0]
+                /\
+              s/  \f
+              /    \
+           node[1] node[3]
+            /\
+          s/  \f
+          /    \
+        node[2] node[4]
+        """
         wfj = WorkflowJob.objects.create()
-        nodes = [WorkflowJobNode.objects.create(workflow_job=wfj) for i in range(0, 5)]
+        jt = JobTemplate.objects.create(name='test-jt')
+        nodes = [WorkflowJobNode.objects.create(workflow_job=wfj, unified_job_template=jt) for i in range(0, 5)]
+        for node, state in zip(nodes, states):
+            if state:
+                node.job = jt.create_job()
+                node.job.status = state
+                node.job.save()
+            node.save()
         nodes[0].success_nodes.add(nodes[1])
         nodes[1].success_nodes.add(nodes[2])
         nodes[0].failure_nodes.add(nodes[3])
@@ -35,6 +54,41 @@
         with self.assertNumQueries(4):
             dag._init_graph(wfj)
 
+    def test_workflow_done(self):
+        wfj = self.workflow_job(states=['failed', None, None, 'successful', None])
+        dag = WorkflowDAG(workflow_job=wfj)
+        is_done, has_failed = dag.is_workflow_done()
+        self.assertTrue(is_done)
+        self.assertFalse(has_failed)
+
+    def test_workflow_fails_for_unfinished_node(self):
+        wfj = self.workflow_job(states=['error', None, None, None, None])
+        dag = WorkflowDAG(workflow_job=wfj)
+        is_done, has_failed = dag.is_workflow_done()
+        self.assertTrue(is_done)
+        self.assertTrue(has_failed)
+
+    def test_workflow_fails_for_no_error_handler(self):
+        wfj = self.workflow_job(states=['successful', 'failed', None, None, None])
+        dag = WorkflowDAG(workflow_job=wfj)
+        is_done, has_failed = dag.is_workflow_done()
+        self.assertTrue(is_done)
+        self.assertTrue(has_failed)
+
+    def test_workflow_fails_leaf(self):
+        wfj = self.workflow_job(states=['successful', 'successful', 'failed', None, None])
+        dag = WorkflowDAG(workflow_job=wfj)
+        is_done, has_failed = dag.is_workflow_done()
+        self.assertTrue(is_done)
+        self.assertTrue(has_failed)
+
+    def test_workflow_not_finished(self):
+        wfj = self.workflow_job(states=['new', None, None, None, None])
+        dag = WorkflowDAG(workflow_job=wfj)
+        is_done, has_failed = dag.is_workflow_done()
+        self.assertFalse(is_done)
+        self.assertFalse(has_failed)
 
 
 @pytest.mark.django_db
 class TestWorkflowJob:
@@ -164,37 +218,3 @@ class TestWorkflowJobTemplate:
         wfjt2.validate_unique()
         wfjt2 = WorkflowJobTemplate(name='foo', organization=None)
         wfjt2.validate_unique()
-
-
-@pytest.mark.django_db
-class TestWorkflowJobFailure:
-    """
-    Tests to re-implement if workflow failure status is introduced in
-    a future Tower version.
-    """
-    @pytest.fixture
-    def wfj(self):
-        return WorkflowJob.objects.create(name='test-wf-job')
-
-    def test_workflow_not_failed_unran_job(self, wfj):
-        """
-        Test that an un-ran node will not mark workflow job as failed
-        """
-        WorkflowJobNode.objects.create(workflow_job=wfj)
-        assert not wfj._has_failed()
-
-    def test_workflow_not_failed_successful_job(self, wfj):
-        """
-        Test that a sucessful node will not mark workflow job as failed
-        """
-        job = Job.objects.create(name='test-job', status='successful')
-        WorkflowJobNode.objects.create(workflow_job=wfj, job=job)
-        assert not wfj._has_failed()
-
-    def test_workflow_not_failed_failed_job_but_okay(self, wfj):
-        """
-        Test that a failed node will not mark workflow job as failed
-        """
-        job = Job.objects.create(name='test-job', status='failed')
-        WorkflowJobNode.objects.create(workflow_job=wfj, job=job)
-        assert not wfj._has_failed()
diff --git a/docs/workflow.md b/docs/workflow.md
index 3bf94b1bef..d525cee3a7 100644
--- a/docs/workflow.md
+++ b/docs/workflow.md
@@ -48,6 +48,10 @@ Workflow job summary:
 
 Starting from Tower 3.2, Workflow jobs support simultaneous job runs just like that of ordinary jobs. It is controlled by `allow_simultaneous` field of underlying workflow job template. By default, simultaneous workflow job runs are disabled and users should be prudent in enabling this functionality. Because the performance boost of simultaneous workflow runs will only manifest when a large portion of jobs contained by a workflow allow simultaneous runs. Otherwise it is expected to have some long-running workflow jobs since its spawned jobs can be in pending state for a long time.
 
+Before Tower 3.3, the `failed` status of a workflow job was not defined. Starting from 3.3, a finished workflow job is marked as failed if at least one of the conditions below is satisfied:
+* At least one node ends in state `canceled` or `error`.
+* At least one leaf node ends in state `failed`, but no child node is spawned to run (there is no error handler).
+
 ### Workflow Copy and Relaunch
 
 Other than the normal way of creating workflow job templates, it is also possible to copy existing workflow job templates. The resulting new workflow job template will be mostly identical to the original, except for `name` field which will be appended a text to indicate it's a copy.
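
For illustration only, and not part of the patch itself: the failure rules stated in the commit message and in the docs/workflow.md hunk above can be condensed into a small standalone Python sketch. `Node` and `workflow_has_failed` are hypothetical stand-ins, not the WorkflowDAG implementation; a status of None models a node whose job was never spawned.

    class Node(object):
        def __init__(self, status=None, failure_nodes=None, always_nodes=None):
            self.status = status                      # None: no job was ever spawned
            self.failure_nodes = failure_nodes or []  # error handlers
            self.always_nodes = always_nodes or []


    def workflow_has_failed(nodes):
        for node in nodes:
            # Condition 1: a node ran into 'canceled' or 'error'.
            if node.status in ('canceled', 'error'):
                return True
            # Condition 2: a node finished 'failed' and none of its error
            # handlers (failure_nodes or always_nodes) was ever spawned;
            # this also covers a failed leaf with no handlers at all.
            if node.status == 'failed':
                handlers = node.failure_nodes + node.always_nodes
                if not any(h.status is not None for h in handlers):
                    return True
        return False


    # A handled failure does not fail the workflow ...
    handler = Node(status='successful')
    assert not workflow_has_failed([Node(status='failed', failure_nodes=[handler]), handler])
    # ... but an unhandled failure, a cancel, or an error does.
    assert workflow_has_failed([Node(status='failed')])
    assert workflow_has_failed([Node(status='canceled')])

Condition 2 loosely mirrors the `is_failed` check added to `is_workflow_done()` above: a non-successful node marks the workflow failed only when none of its child nodes ever received a job.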