if dependency fails, fail job in task manager

This commit is contained in:
Seth Foster 2022-05-10 17:19:07 -04:00
parent 1b662fcca5
commit 0ae9fe3624
No known key found for this signature in database
GPG Key ID: 86E90D96F7184028
3 changed files with 30 additions and 17 deletions

View File

@ -407,18 +407,11 @@ class TaskManagerUnifiedJobMixin(models.Model):
def get_jobs_fail_chain(self):
return []
def dependent_jobs_finished(self):
return True
class TaskManagerJobMixin(TaskManagerUnifiedJobMixin):
class Meta:
abstract = True
def dependent_jobs_finished(self):
# if any dependent jobs are pending, waiting, or running, return False
return not any(j.status in ACTIVE_STATES for j in self.dependent_jobs.all())
class TaskManagerUpdateOnLaunchMixin(TaskManagerUnifiedJobMixin):
class Meta:
@ -430,7 +423,17 @@ class TaskManagerProjectUpdateMixin(TaskManagerUpdateOnLaunchMixin):
abstract = True
def get_jobs_fail_chain(self):
return list(self.unifiedjob_blocked_jobs.all())
# project update can be a dependency of an inventory update, in which
# case we need to fail the job that may have spawned the inventory
# update.
# The inventory update will fail, but since it is not running it will
# not cascade fail to the job from the errback logic in apply_async. As
# such we should capture it here.
blocked_jobs = list(self.unifiedjob_blocked_jobs.all().prefetch_related("unifiedjob_blocked_jobs"))
other_tasks = []
for b in blocked_jobs:
other_tasks += list(b.unifiedjob_blocked_jobs.all())
return blocked_jobs + other_tasks
class TaskManagerInventoryUpdateMixin(TaskManagerUpdateOnLaunchMixin):
@ -452,10 +455,6 @@ class TaskManagerInventoryUpdateMixin(TaskManagerUpdateOnLaunchMixin):
other_updates.append(dep)
return blocked_jobs + other_updates
def dependent_jobs_finished(self):
# if any dependent jobs are pending, waiting, or running, return False
return not any(j.status in ACTIVE_STATES for j in self.dependent_jobs.all())
class ExecutionEnvironmentMixin(models.Model):
class Meta:

View File

@ -34,6 +34,7 @@ from awx.main.utils.pglock import advisory_lock
from awx.main.utils import get_type_for_model, task_manager_bulk_reschedule, schedule_task_manager
from awx.main.utils.common import create_partition
from awx.main.signals import disable_activity_stream
from awx.main.constants import ACTIVE_STATES
from awx.main.scheduler.dependency_graph import DependencyGraph
from awx.main.scheduler.task_manager_models import TaskManagerInstances
from awx.main.scheduler.task_manager_models import TaskManagerInstanceGroups
@ -79,10 +80,23 @@ class TaskManager:
if blocked_by:
return blocked_by
if not task.dependent_jobs_finished():
blocked_by = task.dependent_jobs.first()
if blocked_by:
return blocked_by
for dep in task.dependent_jobs.all():
if dep.status in ACTIVE_STATES:
return dep
# if we detect a failed or error dependency, go ahead and fail this
# task. The errback on the dependency takes some time to trigger,
# and we don't want the task to enter running state if its
# dependency has failed or errored.
elif dep.status in ("error", "failed"):
task.status = 'failed'
task.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % (
get_type_for_model(type(dep)),
dep.name,
dep.id,
)
task.save(update_fields=['status', 'job_explanation'])
task.websocket_emit_status('failed')
return dep
return None

View File

@ -695,7 +695,7 @@ def handle_work_error(task_id, *args, **kwargs):
first_instance = instance
first_instance_type = each_task['type']
if instance.celery_task_id != task_id and not instance.cancel_flag and not instance.status == 'successful':
if instance.celery_task_id != task_id and not instance.cancel_flag and not instance.status in ('successful', 'failed'):
instance.status = 'failed'
instance.failed = True
if not instance.job_explanation: