diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py index 92e8630148..a3083b0bb1 100644 --- a/awx/main/models/workflow.py +++ b/awx/main/models/workflow.py @@ -918,6 +918,17 @@ class WorkflowApproval(UnifiedJob, JobNotificationMixin): ScheduleWorkflowManager().schedule() return reverse('api:workflow_approval_deny', kwargs={'pk': self.pk}, request=request) + def cancel(self, job_explanation=None, is_chain=False): + # WorkflowApprovals have no dispatcher process (they wait for human + # input) and are excluded from TaskManager processing, so the base + # cancel() would only set cancel_flag without ever transitioning the + # status. We call super() for the flag, then transition directly. + has_already_canceled = bool(self.status == 'canceled') + super().cancel(job_explanation=job_explanation, is_chain=is_chain) + if self.status != 'canceled' and not has_already_canceled: + self.status = 'canceled' + self.save(update_fields=['status']) + def signal_start(self, **kwargs): can_start = super(WorkflowApproval, self).signal_start(**kwargs) self.started = self.created diff --git a/awx/main/scheduler/dag_workflow.py b/awx/main/scheduler/dag_workflow.py index c2afba68ad..644c799b54 100644 --- a/awx/main/scheduler/dag_workflow.py +++ b/awx/main/scheduler/dag_workflow.py @@ -122,8 +122,11 @@ class WorkflowDAG(SimpleDAG): if not job: continue elif job.can_cancel: - cancel_finished = False job.cancel() + # If the job is not yet in a terminal state after .cancel(), + # the TaskManager still needs to process it. + if job.status not in ('successful', 'failed', 'canceled', 'error'): + cancel_finished = False return cancel_finished def is_workflow_done(self): diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 5fc1c0b51c..d0da4e5f46 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -196,6 +196,10 @@ class WorkflowManager(TaskBase): workflow_job.start_args = '' # blank field to remove encrypted passwords workflow_job.save(update_fields=['status', 'start_args']) status_changed = True + else: + # Speed-up: schedule the task manager so it can process the + # canceled pending jobs without waiting for the next cycle. + ScheduleTaskManager().schedule() else: dnr_nodes = dag.mark_dnr_nodes() WorkflowJobNode.objects.bulk_update(dnr_nodes, ['do_not_run']) @@ -443,17 +447,29 @@ class TaskManager(TaskBase): self.controlplane_ig = self.tm_models.instance_groups.controlplane_ig def process_job_dep_failures(self, task): - """If job depends on a job that has failed, mark as failed and handle misc stuff.""" + """If job depends on a job that has failed or been canceled, mark as failed. + + Returns True if a dep failure was found, False otherwise. + """ for dep in task.dependent_jobs.all(): - # if we detect a failed or error dependency, go ahead and fail this task. - if dep.status in ("error", "failed"): + # if we detect a failed, error, or canceled dependency, go ahead and fail this task. + if dep.status in ("error", "failed", "canceled"): task.status = 'failed' - logger.warning(f'Previous task failed task: {task.id} dep: {dep.id} task manager') - task.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % ( - get_type_for_model(type(dep)), - dep.name, - dep.id, - ) + if dep.status == 'canceled': + logger.warning(f'Previous task canceled, failing task: {task.id} dep: {dep.id} task manager') + task.job_explanation = 'Previous Task Canceled: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % ( + get_type_for_model(type(dep)), + dep.name, + dep.id, + ) + ScheduleWorkflowManager().schedule() # speedup for dependency chains in workflow, on workflow cancel + else: + logger.warning(f'Previous task failed, failing task: {task.id} dep: {dep.id} task manager') + task.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % ( + get_type_for_model(type(dep)), + dep.name, + dep.id, + ) task.save(update_fields=['status', 'job_explanation']) task.websocket_emit_status('failed') self.pre_start_failed.append(task.id) @@ -545,8 +561,17 @@ class TaskManager(TaskBase): logger.warning("Task manager has reached time out while processing pending jobs, exiting loop early") break - has_failed = self.process_job_dep_failures(task) - if has_failed: + if task.cancel_flag: + logger.debug(f"Canceling pending task {task.log_format} because cancel_flag is set") + task.status = 'canceled' + task.job_explanation = gettext_noop("This job was canceled before it started.") + task.save(update_fields=['status', 'job_explanation']) + task.websocket_emit_status('canceled') + self.pre_start_failed.append(task.id) + ScheduleWorkflowManager().schedule() + continue + + if self.process_job_dep_failures(task): continue blocked_by = self.job_blocked_by(task) diff --git a/awx/main/tests/functional/task_management/test_cancel_dependency.py b/awx/main/tests/functional/task_management/test_cancel_dependency.py new file mode 100644 index 0000000000..f8fb82a9fb --- /dev/null +++ b/awx/main/tests/functional/task_management/test_cancel_dependency.py @@ -0,0 +1,274 @@ +# Generated by Claude Opus 4.6 (claude-opus-4-6) +# +# Test file for cancel + dependency chain behavior and workflow cancel propagation. +# +# These tests verify: +# +# 1. TaskManager.process_job_dep_failures() correctly distinguishes canceled vs +# failed dependencies in the job_explanation message. +# +# 2. TaskManager.process_pending_tasks() transitions pending jobs with +# cancel_flag=True directly to canceled status. +# +# 3. WorkflowManager + TaskManager together cancel all spawned jobs in a +# workflow and finalize the workflow as canceled. + +import pytest +from unittest import mock + +from awx.main.scheduler import TaskManager, DependencyManager, WorkflowManager +from awx.main.models import JobTemplate, ProjectUpdate, WorkflowApproval, WorkflowJobTemplate +from awx.main.models.workflow import WorkflowApprovalTemplate +from . import create_job + + +@pytest.fixture +def scm_on_launch_objects(job_template_factory): + """Create a job template with a project configured for scm_update_on_launch.""" + objects = job_template_factory( + 'jt', + organization='org1', + project='proj', + inventory='inv', + credential='cred', + ) + p = objects.project + p.scm_update_on_launch = True + p.scm_update_cache_timeout = 0 + p.save(skip_update=True) + return objects + + +def _create_job_with_dependency(objects): + """Create a pending job and run DependencyManager to produce its project update dependency. + + Returns (job, project_update). + """ + j = create_job(objects.job_template, dependencies_processed=False) + with mock.patch('awx.main.models.unified_jobs.UnifiedJobTemplate.update'): + DependencyManager().schedule() + assert j.dependent_jobs.count() == 1 + pu = j.dependent_jobs.first() + assert isinstance(pu.get_real_instance(), ProjectUpdate) + return j, pu + + +@pytest.mark.django_db +class TestCanceledDependencyFailsBlockedJob: + """When a dependency project update is canceled or failed, the task manager + should fail the blocked job via process_job_dep_failures.""" + + def test_canceled_dependency_fails_blocked_job(self, controlplane_instance_group, scm_on_launch_objects): + """A canceled dependency causes the blocked job to be failed with + a 'Previous Task Canceled' explanation.""" + j, pu = _create_job_with_dependency(scm_on_launch_objects) + + ProjectUpdate.objects.filter(pk=pu.pk).update(status='canceled', cancel_flag=True) + + with mock.patch("awx.main.scheduler.TaskManager.start_task"): + TaskManager().schedule() + + j.refresh_from_db() + assert j.status == 'failed' + assert 'Previous Task Canceled' in j.job_explanation + + def test_failed_dependency_fails_blocked_job(self, controlplane_instance_group, scm_on_launch_objects): + """A failed dependency causes the blocked job to be failed with + a 'Previous Task Failed' explanation.""" + j, pu = _create_job_with_dependency(scm_on_launch_objects) + + ProjectUpdate.objects.filter(pk=pu.pk).update(status='failed') + + with mock.patch("awx.main.scheduler.TaskManager.start_task"): + TaskManager().schedule() + + j.refresh_from_db() + assert j.status == 'failed' + assert 'Previous Task Failed' in j.job_explanation + + +@pytest.mark.django_db +class TestTaskManagerCancelsPendingJobsWithCancelFlag: + """When the task manager encounters pending jobs that have cancel_flag set, + it should transition them directly to canceled status.""" + + def test_pending_job_with_cancel_flag_is_canceled(self, controlplane_instance_group, job_template_factory): + """A pending job with cancel_flag=True is transitioned to canceled + by the task manager without being started.""" + objects = job_template_factory( + 'jt', + organization='org1', + project='proj', + inventory='inv', + credential='cred', + ) + j = create_job(objects.job_template) + j.cancel_flag = True + j.save(update_fields=['cancel_flag']) + + with mock.patch("awx.main.scheduler.TaskManager.start_task") as mock_start: + TaskManager().schedule() + + j.refresh_from_db() + assert j.status == 'canceled' + assert 'canceled before it started' in j.job_explanation + assert not mock_start.called + + def test_pending_job_without_cancel_flag_is_not_canceled(self, controlplane_instance_group, job_template_factory): + """A normal pending job without cancel_flag should not be canceled + by the task manager (sanity check).""" + objects = job_template_factory( + 'jt', + organization='org1', + project='proj', + inventory='inv', + credential='cred', + ) + j = create_job(objects.job_template) + + with mock.patch("awx.main.scheduler.TaskManager.start_task"): + TaskManager().schedule() + + j.refresh_from_db() + assert j.status != 'canceled' + + def test_multiple_pending_jobs_with_cancel_flag_bulk_canceled(self, controlplane_instance_group, job_template_factory): + """Multiple pending jobs with cancel_flag=True are all transitioned + to canceled in a single task manager cycle.""" + objects = job_template_factory( + 'jt', + organization='org1', + project='proj', + inventory='inv', + credential='cred', + ) + jt = objects.job_template + jt.allow_simultaneous = True + jt.save() + + jobs = [] + for _ in range(3): + j = create_job(jt) + j.cancel_flag = True + j.save(update_fields=['cancel_flag']) + jobs.append(j) + + with mock.patch("awx.main.scheduler.TaskManager.start_task") as mock_start: + TaskManager().schedule() + + for j in jobs: + j.refresh_from_db() + assert j.status == 'canceled', f"Job {j.id} should be canceled but is {j.status}" + assert 'canceled before it started' in j.job_explanation + assert not mock_start.called + + +@pytest.mark.django_db +class TestWorkflowCancelFinalizesWorkflow: + """When a workflow job is canceled, the WorkflowManager cancels spawned child + jobs (setting cancel_flag), the TaskManager transitions those pending jobs to + canceled, and a final WorkflowManager pass finalizes the workflow as canceled.""" + + def test_cancel_workflow_with_parallel_nodes(self, inventory, project, controlplane_instance_group): + """Create a workflow with parallel nodes, cancel it after one job is + running, and verify all jobs and the workflow reach canceled status.""" + jt = JobTemplate.objects.create(allow_simultaneous=False, inventory=inventory, project=project, playbook='helloworld.yml') + wfjt = WorkflowJobTemplate.objects.create(name='test-cancel-wf') + for _ in range(4): + wfjt.workflow_nodes.create(unified_job_template=jt) + + wj = wfjt.create_unified_job() + wj.signal_start() + + # TaskManager transitions workflow job to running via start_task + TaskManager().schedule() + wj.refresh_from_db() + assert wj.status == 'running' + + # WorkflowManager spawns jobs for all 4 nodes + WorkflowManager().schedule() + assert jt.jobs.count() == 4 + + # Simulate one job running (blocking the others via allow_simultaneous=False) + first_job = jt.jobs.order_by('created').first() + first_job.status = 'running' + first_job.celery_task_id = 'fake-task-id' + first_job.controller_node = 'test-node' + first_job.save(update_fields=['status', 'celery_task_id', 'controller_node']) + + # Cancel the workflow + wj.cancel_flag = True + wj.save(update_fields=['cancel_flag']) + + # WorkflowManager sees cancel_flag, calls cancel_node_jobs() which sets + # cancel_flag on all child jobs + with mock.patch('awx.main.models.unified_jobs.UnifiedJob.cancel_dispatcher_process'): + WorkflowManager().schedule() + + # The running job won't actually stop in tests (no dispatcher), simulate it + first_job.status = 'canceled' + first_job.save(update_fields=['status']) + + # TaskManager processes remaining pending jobs with cancel_flag set + with mock.patch("awx.main.scheduler.TaskManager.start_task") as mock_start: + DependencyManager().schedule() + TaskManager().schedule() + + for job in jt.jobs.all(): + job.refresh_from_db() + assert job.status == 'canceled', f"Job {job.id} should be canceled but is {job.status}" + assert not mock_start.called + + # Final WorkflowManager pass finalizes the workflow + WorkflowManager().schedule() + wj.refresh_from_db() + assert wj.status == 'canceled' + + def test_cancel_workflow_with_approval_node(self, controlplane_instance_group): + """Create a workflow with a pending approval node and a downstream job + node. Cancel the workflow and verify the approval is directly canceled + by the WorkflowManager (since approvals are excluded from TaskManager), + the downstream node is marked do_not_run, and the workflow finalizes.""" + approval_template = WorkflowApprovalTemplate.objects.create(name='test-approval', timeout=0) + wfjt = WorkflowJobTemplate.objects.create(name='test-cancel-approval-wf') + approval_node = wfjt.workflow_nodes.create(unified_job_template=approval_template) + + # Add a downstream node (just another approval to keep it simple) + downstream_template = WorkflowApprovalTemplate.objects.create(name='test-downstream', timeout=0) + downstream_node = wfjt.workflow_nodes.create(unified_job_template=downstream_template) + approval_node.success_nodes.add(downstream_node) + + wj = wfjt.create_unified_job() + wj.signal_start() + + # TaskManager transitions workflow to running + TaskManager().schedule() + wj.refresh_from_db() + assert wj.status == 'running' + + # WorkflowManager spawns the approval (root node only, downstream waits) + WorkflowManager().schedule() + assert WorkflowApproval.objects.filter(unified_job_node__workflow_job=wj).count() == 1 + + approval_job = WorkflowApproval.objects.get(unified_job_node__workflow_job=wj) + assert approval_job.status == 'pending' + + # Cancel the workflow + wj.cancel_flag = True + wj.save(update_fields=['cancel_flag']) + + # WorkflowManager should cancel the approval directly and mark + # the downstream node as do_not_run + WorkflowManager().schedule() + + approval_job.refresh_from_db() + assert approval_job.status == 'canceled', f"Approval should be canceled directly by WorkflowManager but is {approval_job.status}" + + # Downstream node should be marked do_not_run with no job spawned + downstream_wj_node = wj.workflow_nodes.get(unified_job_template=downstream_template) + assert downstream_wj_node.do_not_run is True + assert downstream_wj_node.job is None + + # Workflow should finalize as canceled in the same pass + wj.refresh_from_db() + assert wj.status == 'canceled'