Fix job cancel chain bugs (#16325)

* Fix job cancel chain bugs

* Early relief valve for canceled jobs, ATF related changes

* Add test and fix for approval nodes as well

* Revert unwanted change

* Refactor workflow approval nodes to make it more clean

* Revert data structure changes

* Delete local utility file

* Review comment addressing

* Use canceled status in websocket

* Delete slop

* Add agent marker

* Bugbot comment about status websocket mismatch
This commit is contained in:
Alan Rominger
2026-03-18 12:08:27 -04:00
committed by GitHub
parent 679e48cbe8
commit 0aaca1bffd
4 changed files with 325 additions and 12 deletions

View File

@@ -918,6 +918,17 @@ class WorkflowApproval(UnifiedJob, JobNotificationMixin):
ScheduleWorkflowManager().schedule()
return reverse('api:workflow_approval_deny', kwargs={'pk': self.pk}, request=request)
    def cancel(self, job_explanation=None, is_chain=False):
        """Cancel this approval, forcing the status transition to 'canceled'.

        WorkflowApprovals have no dispatcher process (they wait for human
        input) and are excluded from TaskManager processing, so the base
        cancel() would only set cancel_flag without ever transitioning the
        status. We call super() for the flag, then transition directly.

        :param job_explanation: optional explanation forwarded to the base
            cancel() implementation.
        :param is_chain: forwarded to the base cancel(); presumably marks
            this call as part of a dependency-chain cancel — confirm against
            UnifiedJob.cancel().
        """
        # Remember whether we were already canceled before super() runs, so
        # we never re-save (or re-transition) an already-canceled approval.
        has_already_canceled = bool(self.status == 'canceled')
        super().cancel(job_explanation=job_explanation, is_chain=is_chain)
        # Force the transition only if neither the prior state nor super()
        # already reached 'canceled'; avoids a redundant save.
        if self.status != 'canceled' and not has_already_canceled:
            self.status = 'canceled'
            self.save(update_fields=['status'])
def signal_start(self, **kwargs):
can_start = super(WorkflowApproval, self).signal_start(**kwargs)
self.started = self.created

View File

@@ -122,8 +122,11 @@ class WorkflowDAG(SimpleDAG):
if not job:
continue
elif job.can_cancel:
cancel_finished = False
job.cancel()
# If the job is not yet in a terminal state after .cancel(),
# the TaskManager still needs to process it.
if job.status not in ('successful', 'failed', 'canceled', 'error'):
cancel_finished = False
return cancel_finished
def is_workflow_done(self):

View File

@@ -196,6 +196,10 @@ class WorkflowManager(TaskBase):
workflow_job.start_args = '' # blank field to remove encrypted passwords
workflow_job.save(update_fields=['status', 'start_args'])
status_changed = True
else:
# Speed-up: schedule the task manager so it can process the
# canceled pending jobs without waiting for the next cycle.
ScheduleTaskManager().schedule()
else:
dnr_nodes = dag.mark_dnr_nodes()
WorkflowJobNode.objects.bulk_update(dnr_nodes, ['do_not_run'])
@@ -443,17 +447,29 @@ class TaskManager(TaskBase):
self.controlplane_ig = self.tm_models.instance_groups.controlplane_ig
def process_job_dep_failures(self, task):
"""If job depends on a job that has failed, mark as failed and handle misc stuff."""
"""If job depends on a job that has failed or been canceled, mark as failed.
Returns True if a dep failure was found, False otherwise.
"""
for dep in task.dependent_jobs.all():
# if we detect a failed or error dependency, go ahead and fail this task.
if dep.status in ("error", "failed"):
# if we detect a failed, error, or canceled dependency, go ahead and fail this task.
if dep.status in ("error", "failed", "canceled"):
task.status = 'failed'
logger.warning(f'Previous task failed task: {task.id} dep: {dep.id} task manager')
task.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % (
get_type_for_model(type(dep)),
dep.name,
dep.id,
)
if dep.status == 'canceled':
logger.warning(f'Previous task canceled, failing task: {task.id} dep: {dep.id} task manager')
task.job_explanation = 'Previous Task Canceled: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % (
get_type_for_model(type(dep)),
dep.name,
dep.id,
)
ScheduleWorkflowManager().schedule() # speedup for dependency chains in workflow, on workflow cancel
else:
logger.warning(f'Previous task failed, failing task: {task.id} dep: {dep.id} task manager')
task.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % (
get_type_for_model(type(dep)),
dep.name,
dep.id,
)
task.save(update_fields=['status', 'job_explanation'])
task.websocket_emit_status('failed')
self.pre_start_failed.append(task.id)
@@ -545,8 +561,17 @@ class TaskManager(TaskBase):
logger.warning("Task manager has reached time out while processing pending jobs, exiting loop early")
break
has_failed = self.process_job_dep_failures(task)
if has_failed:
if task.cancel_flag:
logger.debug(f"Canceling pending task {task.log_format} because cancel_flag is set")
task.status = 'canceled'
task.job_explanation = gettext_noop("This job was canceled before it started.")
task.save(update_fields=['status', 'job_explanation'])
task.websocket_emit_status('canceled')
self.pre_start_failed.append(task.id)
ScheduleWorkflowManager().schedule()
continue
if self.process_job_dep_failures(task):
continue
blocked_by = self.job_blocked_by(task)

View File

@@ -0,0 +1,274 @@
# Generated by Claude Opus 4.6 (claude-opus-4-6)
#
# Test file for cancel + dependency chain behavior and workflow cancel propagation.
#
# These tests verify:
#
# 1. TaskManager.process_job_dep_failures() correctly distinguishes canceled vs
# failed dependencies in the job_explanation message.
#
# 2. TaskManager.process_pending_tasks() transitions pending jobs with
# cancel_flag=True directly to canceled status.
#
# 3. WorkflowManager + TaskManager together cancel all spawned jobs in a
# workflow and finalize the workflow as canceled.
import pytest
from unittest import mock
from awx.main.scheduler import TaskManager, DependencyManager, WorkflowManager
from awx.main.models import JobTemplate, ProjectUpdate, WorkflowApproval, WorkflowJobTemplate
from awx.main.models.workflow import WorkflowApprovalTemplate
from . import create_job
@pytest.fixture
def scm_on_launch_objects(job_template_factory):
    """Job template whose project is configured for scm_update_on_launch."""
    created = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred')
    scm_project = created.project
    # Force a project-update dependency on every launch, with no cache reuse.
    scm_project.scm_update_on_launch = True
    scm_project.scm_update_cache_timeout = 0
    scm_project.save(skip_update=True)
    return created
def _create_job_with_dependency(objects):
    """Create a pending job and run DependencyManager to produce its project
    update dependency.

    Returns (job, project_update).
    """
    job = create_job(objects.job_template, dependencies_processed=False)
    # Patch out the real SCM update launch; DependencyManager still records
    # the dependency relationship.
    with mock.patch('awx.main.models.unified_jobs.UnifiedJobTemplate.update'):
        DependencyManager().schedule()
    dependent = job.dependent_jobs.all()
    assert dependent.count() == 1
    project_update = dependent.first()
    assert isinstance(project_update.get_real_instance(), ProjectUpdate)
    return job, project_update
@pytest.mark.django_db
class TestCanceledDependencyFailsBlockedJob:
    """When a dependency project update is canceled or failed, the task manager
    should fail the blocked job via process_job_dep_failures."""

    def test_canceled_dependency_fails_blocked_job(self, controlplane_instance_group, scm_on_launch_objects):
        """A canceled dependency causes the blocked job to be failed with
        a 'Previous Task Canceled' explanation."""
        job, update = _create_job_with_dependency(scm_on_launch_objects)
        # Flip the dependency straight to canceled at the DB level.
        ProjectUpdate.objects.filter(pk=update.pk).update(status='canceled', cancel_flag=True)
        with mock.patch("awx.main.scheduler.TaskManager.start_task"):
            TaskManager().schedule()
        job.refresh_from_db()
        assert job.status == 'failed'
        assert 'Previous Task Canceled' in job.job_explanation

    def test_failed_dependency_fails_blocked_job(self, controlplane_instance_group, scm_on_launch_objects):
        """A failed dependency causes the blocked job to be failed with
        a 'Previous Task Failed' explanation."""
        job, update = _create_job_with_dependency(scm_on_launch_objects)
        # Flip the dependency straight to failed at the DB level.
        ProjectUpdate.objects.filter(pk=update.pk).update(status='failed')
        with mock.patch("awx.main.scheduler.TaskManager.start_task"):
            TaskManager().schedule()
        job.refresh_from_db()
        assert job.status == 'failed'
        assert 'Previous Task Failed' in job.job_explanation
@pytest.mark.django_db
class TestTaskManagerCancelsPendingJobsWithCancelFlag:
    """When the task manager encounters pending jobs that have cancel_flag set,
    it should transition them directly to canceled status."""

    @staticmethod
    def _make_job_template(job_template_factory):
        """Build the standard jt/org/proj/inv/cred objects and return the job
        template; this setup was previously duplicated in all three tests."""
        objects = job_template_factory(
            'jt',
            organization='org1',
            project='proj',
            inventory='inv',
            credential='cred',
        )
        return objects.job_template

    def test_pending_job_with_cancel_flag_is_canceled(self, controlplane_instance_group, job_template_factory):
        """A pending job with cancel_flag=True is transitioned to canceled
        by the task manager without being started."""
        jt = self._make_job_template(job_template_factory)
        j = create_job(jt)
        j.cancel_flag = True
        j.save(update_fields=['cancel_flag'])
        with mock.patch("awx.main.scheduler.TaskManager.start_task") as mock_start:
            TaskManager().schedule()
        j.refresh_from_db()
        assert j.status == 'canceled'
        assert 'canceled before it started' in j.job_explanation
        # The task manager must never hand a flagged job to the dispatcher.
        assert not mock_start.called

    def test_pending_job_without_cancel_flag_is_not_canceled(self, controlplane_instance_group, job_template_factory):
        """A normal pending job without cancel_flag should not be canceled
        by the task manager (sanity check)."""
        jt = self._make_job_template(job_template_factory)
        j = create_job(jt)
        with mock.patch("awx.main.scheduler.TaskManager.start_task"):
            TaskManager().schedule()
        j.refresh_from_db()
        assert j.status != 'canceled'

    def test_multiple_pending_jobs_with_cancel_flag_bulk_canceled(self, controlplane_instance_group, job_template_factory):
        """Multiple pending jobs with cancel_flag=True are all transitioned
        to canceled in a single task manager cycle."""
        jt = self._make_job_template(job_template_factory)
        # allow_simultaneous so all three jobs are pending at once rather
        # than blocking each other.
        jt.allow_simultaneous = True
        jt.save()
        jobs = []
        for _ in range(3):
            j = create_job(jt)
            j.cancel_flag = True
            j.save(update_fields=['cancel_flag'])
            jobs.append(j)
        with mock.patch("awx.main.scheduler.TaskManager.start_task") as mock_start:
            TaskManager().schedule()
        for j in jobs:
            j.refresh_from_db()
            assert j.status == 'canceled', f"Job {j.id} should be canceled but is {j.status}"
            assert 'canceled before it started' in j.job_explanation
        assert not mock_start.called
@pytest.mark.django_db
class TestWorkflowCancelFinalizesWorkflow:
    """When a workflow job is canceled, the WorkflowManager cancels spawned child
    jobs (setting cancel_flag), the TaskManager transitions those pending jobs to
    canceled, and a final WorkflowManager pass finalizes the workflow as canceled."""

    def test_cancel_workflow_with_parallel_nodes(self, inventory, project, controlplane_instance_group):
        """Create a workflow with parallel nodes, cancel it after one job is
        running, and verify all jobs and the workflow reach canceled status."""
        jt = JobTemplate.objects.create(allow_simultaneous=False, inventory=inventory, project=project, playbook='helloworld.yml')
        wfjt = WorkflowJobTemplate.objects.create(name='test-cancel-wf')
        # Four sibling nodes all pointing at the same (non-simultaneous) JT.
        for _ in range(4):
            wfjt.workflow_nodes.create(unified_job_template=jt)
        wj = wfjt.create_unified_job()
        wj.signal_start()
        # TaskManager transitions workflow job to running via start_task
        TaskManager().schedule()
        wj.refresh_from_db()
        assert wj.status == 'running'
        # WorkflowManager spawns jobs for all 4 nodes
        WorkflowManager().schedule()
        assert jt.jobs.count() == 4
        # Simulate one job running (blocking the others via allow_simultaneous=False)
        first_job = jt.jobs.order_by('created').first()
        first_job.status = 'running'
        first_job.celery_task_id = 'fake-task-id'
        first_job.controller_node = 'test-node'
        first_job.save(update_fields=['status', 'celery_task_id', 'controller_node'])
        # Cancel the workflow
        wj.cancel_flag = True
        wj.save(update_fields=['cancel_flag'])
        # WorkflowManager sees cancel_flag, calls cancel_node_jobs() which sets
        # cancel_flag on all child jobs. The dispatcher-process cancel is
        # patched out because there is no real dispatcher in unit tests.
        with mock.patch('awx.main.models.unified_jobs.UnifiedJob.cancel_dispatcher_process'):
            WorkflowManager().schedule()
        # The running job won't actually stop in tests (no dispatcher), simulate it
        first_job.status = 'canceled'
        first_job.save(update_fields=['status'])
        # TaskManager processes remaining pending jobs with cancel_flag set
        with mock.patch("awx.main.scheduler.TaskManager.start_task") as mock_start:
            DependencyManager().schedule()
            TaskManager().schedule()
        for job in jt.jobs.all():
            job.refresh_from_db()
            assert job.status == 'canceled', f"Job {job.id} should be canceled but is {job.status}"
        # None of the canceled jobs should ever have been started.
        assert not mock_start.called
        # Final WorkflowManager pass finalizes the workflow
        WorkflowManager().schedule()
        wj.refresh_from_db()
        assert wj.status == 'canceled'

    def test_cancel_workflow_with_approval_node(self, controlplane_instance_group):
        """Create a workflow with a pending approval node and a downstream job
        node. Cancel the workflow and verify the approval is directly canceled
        by the WorkflowManager (since approvals are excluded from TaskManager),
        the downstream node is marked do_not_run, and the workflow finalizes."""
        approval_template = WorkflowApprovalTemplate.objects.create(name='test-approval', timeout=0)
        wfjt = WorkflowJobTemplate.objects.create(name='test-cancel-approval-wf')
        approval_node = wfjt.workflow_nodes.create(unified_job_template=approval_template)
        # Add a downstream node (just another approval to keep it simple)
        downstream_template = WorkflowApprovalTemplate.objects.create(name='test-downstream', timeout=0)
        downstream_node = wfjt.workflow_nodes.create(unified_job_template=downstream_template)
        approval_node.success_nodes.add(downstream_node)
        wj = wfjt.create_unified_job()
        wj.signal_start()
        # TaskManager transitions workflow to running
        TaskManager().schedule()
        wj.refresh_from_db()
        assert wj.status == 'running'
        # WorkflowManager spawns the approval (root node only, downstream waits)
        WorkflowManager().schedule()
        assert WorkflowApproval.objects.filter(unified_job_node__workflow_job=wj).count() == 1
        approval_job = WorkflowApproval.objects.get(unified_job_node__workflow_job=wj)
        assert approval_job.status == 'pending'
        # Cancel the workflow
        wj.cancel_flag = True
        wj.save(update_fields=['cancel_flag'])
        # WorkflowManager should cancel the approval directly and mark
        # the downstream node as do_not_run
        WorkflowManager().schedule()
        approval_job.refresh_from_db()
        assert approval_job.status == 'canceled', f"Approval should be canceled directly by WorkflowManager but is {approval_job.status}"
        # Downstream node should be marked do_not_run with no job spawned
        downstream_wj_node = wj.workflow_nodes.get(unified_job_template=downstream_template)
        assert downstream_wj_node.do_not_run is True
        assert downstream_wj_node.job is None
        # Workflow should finalize as canceled in the same pass
        wj.refresh_from_db()
        assert wj.status == 'canceled'