From 0ae9fe3624a3225ef63ea75f0aa36a48ee2cd884 Mon Sep 17 00:00:00 2001 From: Seth Foster Date: Tue, 10 May 2022 17:19:07 -0400 Subject: [PATCH] if dependency fails, fail job in task manager --- awx/main/models/mixins.py | 23 +++++++++++------------ awx/main/scheduler/task_manager.py | 22 ++++++++++++++++++---- awx/main/tasks/system.py | 2 +- 3 files changed, 30 insertions(+), 17 deletions(-) diff --git a/awx/main/models/mixins.py b/awx/main/models/mixins.py index d6c5e4e980..34e05fa818 100644 --- a/awx/main/models/mixins.py +++ b/awx/main/models/mixins.py @@ -407,18 +407,11 @@ class TaskManagerUnifiedJobMixin(models.Model): def get_jobs_fail_chain(self): return [] - def dependent_jobs_finished(self): - return True - class TaskManagerJobMixin(TaskManagerUnifiedJobMixin): class Meta: abstract = True - def dependent_jobs_finished(self): - # if any dependent jobs are pending, waiting, or running, return False - return not any(j.status in ACTIVE_STATES for j in self.dependent_jobs.all()) - class TaskManagerUpdateOnLaunchMixin(TaskManagerUnifiedJobMixin): class Meta: @@ -430,7 +423,17 @@ class TaskManagerProjectUpdateMixin(TaskManagerUpdateOnLaunchMixin): abstract = True def get_jobs_fail_chain(self): - return list(self.unifiedjob_blocked_jobs.all()) + # project update can be a dependency of an inventory update, in which + # case we need to fail the job that may have spawned the inventory + # update. + # The inventory update will fail, but since it is not running it will + # not cascade fail to the job from the errback logic in apply_async. As + # such we should capture it here. + blocked_jobs = list(self.unifiedjob_blocked_jobs.all().prefetch_related("unifiedjob_blocked_jobs")) + other_tasks = [] + for b in blocked_jobs: + other_tasks += list(b.unifiedjob_blocked_jobs.all()) + return blocked_jobs + other_tasks class TaskManagerInventoryUpdateMixin(TaskManagerUpdateOnLaunchMixin): @@ -452,10 +455,6 @@ class TaskManagerInventoryUpdateMixin(TaskManagerUpdateOnLaunchMixin): other_updates.append(dep) return blocked_jobs + other_updates - def dependent_jobs_finished(self): - # if any dependent jobs are pending, waiting, or running, return False - return not any(j.status in ACTIVE_STATES for j in self.dependent_jobs.all()) - class ExecutionEnvironmentMixin(models.Model): class Meta: diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 8de9cf3546..3d80973840 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -34,6 +34,7 @@ from awx.main.utils.pglock import advisory_lock from awx.main.utils import get_type_for_model, task_manager_bulk_reschedule, schedule_task_manager from awx.main.utils.common import create_partition from awx.main.signals import disable_activity_stream +from awx.main.constants import ACTIVE_STATES from awx.main.scheduler.dependency_graph import DependencyGraph from awx.main.scheduler.task_manager_models import TaskManagerInstances from awx.main.scheduler.task_manager_models import TaskManagerInstanceGroups @@ -79,10 +80,23 @@ class TaskManager: if blocked_by: return blocked_by - if not task.dependent_jobs_finished(): - blocked_by = task.dependent_jobs.first() - if blocked_by: - return blocked_by + for dep in task.dependent_jobs.all(): + if dep.status in ACTIVE_STATES: + return dep + # if we detect a failed or error dependency, go ahead and fail this + # task. The errback on the dependency takes some time to trigger, + # and we don't want the task to enter running state if its + # dependency has failed or errored. + elif dep.status in ("error", "failed"): + task.status = 'failed' + task.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % ( + get_type_for_model(type(dep)), + dep.name, + dep.id, + ) + task.save(update_fields=['status', 'job_explanation']) + task.websocket_emit_status('failed') + return dep return None diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py index 274ff546f1..8c698609a5 100644 --- a/awx/main/tasks/system.py +++ b/awx/main/tasks/system.py @@ -695,7 +695,7 @@ def handle_work_error(task_id, *args, **kwargs): first_instance = instance first_instance_type = each_task['type'] - if instance.celery_task_id != task_id and not instance.cancel_flag and not instance.status == 'successful': + if instance.celery_task_id != task_id and not instance.cancel_flag and not instance.status in ('successful', 'failed'): instance.status = 'failed' instance.failed = True if not instance.job_explanation: