mirror of
https://github.com/ansible/awx.git
synced 2026-05-07 01:17:37 -02:30
Send notifications for dependency failures (#14603)
* Send notifications for dependency failures
* Delete tests for deleted method
* Remove another test for removed method
This commit is contained in:
@@ -270,6 +270,9 @@ class WorkflowManager(TaskBase):
|
|||||||
job.status = 'failed'
|
job.status = 'failed'
|
||||||
job.save(update_fields=['status', 'job_explanation'])
|
job.save(update_fields=['status', 'job_explanation'])
|
||||||
job.websocket_emit_status('failed')
|
job.websocket_emit_status('failed')
|
||||||
|
# NOTE: sending notification templates here is slightly worse performance
|
||||||
|
# this is not yet optimized in the same way as for the TaskManager
|
||||||
|
job.send_notification_templates('failed')
|
||||||
ScheduleWorkflowManager().schedule()
|
ScheduleWorkflowManager().schedule()
|
||||||
|
|
||||||
# TODO: should we emit a status on the socket here similar to tasks.py awx_periodic_scheduler() ?
|
# TODO: should we emit a status on the socket here similar to tasks.py awx_periodic_scheduler() ?
|
||||||
@@ -430,6 +433,25 @@ class TaskManager(TaskBase):
|
|||||||
self.tm_models = TaskManagerModels()
|
self.tm_models = TaskManagerModels()
|
||||||
self.controlplane_ig = self.tm_models.instance_groups.controlplane_ig
|
self.controlplane_ig = self.tm_models.instance_groups.controlplane_ig
|
||||||
|
|
||||||
|
def process_job_dep_failures(self, task):
|
||||||
|
"""If job depends on a job that has failed, mark as failed and handle misc stuff."""
|
||||||
|
for dep in task.dependent_jobs.all():
|
||||||
|
# if we detect a failed or error dependency, go ahead and fail this task.
|
||||||
|
if dep.status in ("error", "failed"):
|
||||||
|
task.status = 'failed'
|
||||||
|
logger.warning(f'Previous task failed task: {task.id} dep: {dep.id} task manager')
|
||||||
|
task.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % (
|
||||||
|
get_type_for_model(type(dep)),
|
||||||
|
dep.name,
|
||||||
|
dep.id,
|
||||||
|
)
|
||||||
|
task.save(update_fields=['status', 'job_explanation'])
|
||||||
|
task.websocket_emit_status('failed')
|
||||||
|
self.pre_start_failed.append(task.id)
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
def job_blocked_by(self, task):
|
def job_blocked_by(self, task):
|
||||||
# TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph
|
# TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph
|
||||||
# in the old task manager this was handled as a method on each task object outside of the graph and
|
# in the old task manager this was handled as a method on each task object outside of the graph and
|
||||||
@@ -441,20 +463,6 @@ class TaskManager(TaskBase):
|
|||||||
for dep in task.dependent_jobs.all():
|
for dep in task.dependent_jobs.all():
|
||||||
if dep.status in ACTIVE_STATES:
|
if dep.status in ACTIVE_STATES:
|
||||||
return dep
|
return dep
|
||||||
# if we detect a failed or error dependency, go ahead and fail this
|
|
||||||
# task. The errback on the dependency takes some time to trigger,
|
|
||||||
# and we don't want the task to enter running state if its
|
|
||||||
# dependency has failed or errored.
|
|
||||||
elif dep.status in ("error", "failed"):
|
|
||||||
task.status = 'failed'
|
|
||||||
task.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % (
|
|
||||||
get_type_for_model(type(dep)),
|
|
||||||
dep.name,
|
|
||||||
dep.id,
|
|
||||||
)
|
|
||||||
task.save(update_fields=['status', 'job_explanation'])
|
|
||||||
task.websocket_emit_status('failed')
|
|
||||||
return dep
|
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
@@ -474,7 +482,6 @@ class TaskManager(TaskBase):
|
|||||||
if self.start_task_limit == 0:
|
if self.start_task_limit == 0:
|
||||||
# schedule another run immediately after this task manager
|
# schedule another run immediately after this task manager
|
||||||
ScheduleTaskManager().schedule()
|
ScheduleTaskManager().schedule()
|
||||||
from awx.main.tasks.system import handle_work_error, handle_work_success
|
|
||||||
|
|
||||||
task.status = 'waiting'
|
task.status = 'waiting'
|
||||||
|
|
||||||
@@ -485,7 +492,7 @@ class TaskManager(TaskBase):
|
|||||||
task.job_explanation += ' '
|
task.job_explanation += ' '
|
||||||
task.job_explanation += 'Task failed pre-start check.'
|
task.job_explanation += 'Task failed pre-start check.'
|
||||||
task.save()
|
task.save()
|
||||||
# TODO: run error handler to fail sub-tasks and send notifications
|
self.pre_start_failed.append(task.id)
|
||||||
else:
|
else:
|
||||||
if type(task) is WorkflowJob:
|
if type(task) is WorkflowJob:
|
||||||
task.status = 'running'
|
task.status = 'running'
|
||||||
@@ -507,19 +514,16 @@ class TaskManager(TaskBase):
|
|||||||
# apply_async does a NOTIFY to the channel dispatcher is listening to
|
# apply_async does a NOTIFY to the channel dispatcher is listening to
|
||||||
# postgres will treat this as part of the transaction, which is what we want
|
# postgres will treat this as part of the transaction, which is what we want
|
||||||
if task.status != 'failed' and type(task) is not WorkflowJob:
|
if task.status != 'failed' and type(task) is not WorkflowJob:
|
||||||
task_actual = {'type': get_type_for_model(type(task)), 'id': task.id}
|
|
||||||
task_cls = task._get_task_class()
|
task_cls = task._get_task_class()
|
||||||
task_cls.apply_async(
|
task_cls.apply_async(
|
||||||
[task.pk],
|
[task.pk],
|
||||||
opts,
|
opts,
|
||||||
queue=task.get_queue_name(),
|
queue=task.get_queue_name(),
|
||||||
uuid=task.celery_task_id,
|
uuid=task.celery_task_id,
|
||||||
callbacks=[{'task': handle_work_success.name, 'kwargs': {'task_actual': task_actual}}],
|
|
||||||
errbacks=[{'task': handle_work_error.name, 'kwargs': {'task_actual': task_actual}}],
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# In exception cases, like a job failing pre-start checks, we send the websocket status message
|
# In exception cases, like a job failing pre-start checks, we send the websocket status message.
|
||||||
# for jobs going into waiting, we omit this because of performance issues, as it should go to running quickly
|
# For jobs going into waiting, we omit this because of performance issues, as it should go to running quickly
|
||||||
if task.status != 'waiting':
|
if task.status != 'waiting':
|
||||||
task.websocket_emit_status(task.status) # adds to on_commit
|
task.websocket_emit_status(task.status) # adds to on_commit
|
||||||
|
|
||||||
@@ -540,6 +544,11 @@ class TaskManager(TaskBase):
|
|||||||
if self.timed_out():
|
if self.timed_out():
|
||||||
logger.warning("Task manager has reached time out while processing pending jobs, exiting loop early")
|
logger.warning("Task manager has reached time out while processing pending jobs, exiting loop early")
|
||||||
break
|
break
|
||||||
|
|
||||||
|
has_failed = self.process_job_dep_failures(task)
|
||||||
|
if has_failed:
|
||||||
|
continue
|
||||||
|
|
||||||
blocked_by = self.job_blocked_by(task)
|
blocked_by = self.job_blocked_by(task)
|
||||||
if blocked_by:
|
if blocked_by:
|
||||||
self.subsystem_metrics.inc(f"{self.prefix}_tasks_blocked", 1)
|
self.subsystem_metrics.inc(f"{self.prefix}_tasks_blocked", 1)
|
||||||
@@ -653,6 +662,11 @@ class TaskManager(TaskBase):
|
|||||||
reap_job(j, 'failed')
|
reap_job(j, 'failed')
|
||||||
|
|
||||||
def process_tasks(self):
|
def process_tasks(self):
|
||||||
|
# maintain a list of jobs that went to an early failure state,
|
||||||
|
# meaning the dispatcher never got these jobs,
|
||||||
|
# that means we have to handle notifications for those
|
||||||
|
self.pre_start_failed = []
|
||||||
|
|
||||||
running_tasks = [t for t in self.all_tasks if t.status in ['waiting', 'running']]
|
running_tasks = [t for t in self.all_tasks if t.status in ['waiting', 'running']]
|
||||||
self.process_running_tasks(running_tasks)
|
self.process_running_tasks(running_tasks)
|
||||||
self.subsystem_metrics.inc(f"{self.prefix}_running_processed", len(running_tasks))
|
self.subsystem_metrics.inc(f"{self.prefix}_running_processed", len(running_tasks))
|
||||||
@@ -662,6 +676,11 @@ class TaskManager(TaskBase):
|
|||||||
self.process_pending_tasks(pending_tasks)
|
self.process_pending_tasks(pending_tasks)
|
||||||
self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(pending_tasks))
|
self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(pending_tasks))
|
||||||
|
|
||||||
|
if self.pre_start_failed:
|
||||||
|
from awx.main.tasks.system import handle_failure_notifications
|
||||||
|
|
||||||
|
handle_failure_notifications.delay(self.pre_start_failed)
|
||||||
|
|
||||||
def timeout_approval_node(self, task):
|
def timeout_approval_node(self, task):
|
||||||
if self.timed_out():
|
if self.timed_out():
|
||||||
logger.warning("Task manager has reached time out while processing approval nodes, exiting loop early")
|
logger.warning("Task manager has reached time out while processing approval nodes, exiting loop early")
|
||||||
|
|||||||
@@ -74,6 +74,8 @@ from awx.main.utils.common import (
|
|||||||
extract_ansible_vars,
|
extract_ansible_vars,
|
||||||
get_awx_version,
|
get_awx_version,
|
||||||
create_partition,
|
create_partition,
|
||||||
|
ScheduleWorkflowManager,
|
||||||
|
ScheduleTaskManager,
|
||||||
)
|
)
|
||||||
from awx.conf.license import get_license
|
from awx.conf.license import get_license
|
||||||
from awx.main.utils.handlers import SpecialInventoryHandler
|
from awx.main.utils.handlers import SpecialInventoryHandler
|
||||||
@@ -450,6 +452,12 @@ class BaseTask(object):
|
|||||||
instance.ansible_version = ansible_version_info
|
instance.ansible_version = ansible_version_info
|
||||||
instance.save(update_fields=['ansible_version'])
|
instance.save(update_fields=['ansible_version'])
|
||||||
|
|
||||||
|
# Run task manager appropriately for speculative dependencies
|
||||||
|
if instance.unifiedjob_blocked_jobs.exists():
|
||||||
|
ScheduleTaskManager().schedule()
|
||||||
|
if instance.spawned_by_workflow:
|
||||||
|
ScheduleWorkflowManager().schedule()
|
||||||
|
|
||||||
def should_use_fact_cache(self):
|
def should_use_fact_cache(self):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|||||||
@@ -53,13 +53,7 @@ from awx.main.models import (
|
|||||||
from awx.main.constants import ACTIVE_STATES
|
from awx.main.constants import ACTIVE_STATES
|
||||||
from awx.main.dispatch.publish import task
|
from awx.main.dispatch.publish import task
|
||||||
from awx.main.dispatch import get_task_queuename, reaper
|
from awx.main.dispatch import get_task_queuename, reaper
|
||||||
from awx.main.utils.common import (
|
from awx.main.utils.common import ignore_inventory_computed_fields, ignore_inventory_group_removal
|
||||||
get_type_for_model,
|
|
||||||
ignore_inventory_computed_fields,
|
|
||||||
ignore_inventory_group_removal,
|
|
||||||
ScheduleWorkflowManager,
|
|
||||||
ScheduleTaskManager,
|
|
||||||
)
|
|
||||||
|
|
||||||
from awx.main.utils.reload import stop_local_services
|
from awx.main.utils.reload import stop_local_services
|
||||||
from awx.main.utils.pglock import advisory_lock
|
from awx.main.utils.pglock import advisory_lock
|
||||||
@@ -765,63 +759,19 @@ def awx_periodic_scheduler():
|
|||||||
emit_channel_notification('schedules-changed', dict(id=schedule.id, group_name="schedules"))
|
emit_channel_notification('schedules-changed', dict(id=schedule.id, group_name="schedules"))
|
||||||
|
|
||||||
|
|
||||||
def schedule_manager_success_or_error(instance):
|
|
||||||
if instance.unifiedjob_blocked_jobs.exists():
|
|
||||||
ScheduleTaskManager().schedule()
|
|
||||||
if instance.spawned_by_workflow:
|
|
||||||
ScheduleWorkflowManager().schedule()
|
|
||||||
|
|
||||||
|
|
||||||
@task(queue=get_task_queuename)
|
@task(queue=get_task_queuename)
|
||||||
def handle_work_success(task_actual):
|
def handle_failure_notifications(task_ids):
|
||||||
try:
|
"""A task-ified version of the method that sends notifications."""
|
||||||
instance = UnifiedJob.get_instance_by_type(task_actual['type'], task_actual['id'])
|
found_task_ids = set()
|
||||||
except ObjectDoesNotExist:
|
for instance in UnifiedJob.objects.filter(id__in=task_ids):
|
||||||
logger.warning('Missing {} `{}` in success callback.'.format(task_actual['type'], task_actual['id']))
|
found_task_ids.add(instance.id)
|
||||||
return
|
try:
|
||||||
if not instance:
|
instance.send_notification_templates('failed')
|
||||||
return
|
except Exception:
|
||||||
schedule_manager_success_or_error(instance)
|
logger.exception(f'Error preparing notifications for task {instance.id}')
|
||||||
|
deleted_tasks = set(task_ids) - found_task_ids
|
||||||
|
if deleted_tasks:
|
||||||
@task(queue=get_task_queuename)
|
logger.warning(f'Could not send notifications for {deleted_tasks} because they were not found in the database')
|
||||||
def handle_work_error(task_actual):
|
|
||||||
try:
|
|
||||||
instance = UnifiedJob.get_instance_by_type(task_actual['type'], task_actual['id'])
|
|
||||||
except ObjectDoesNotExist:
|
|
||||||
logger.warning('Missing {} `{}` in error callback.'.format(task_actual['type'], task_actual['id']))
|
|
||||||
return
|
|
||||||
if not instance:
|
|
||||||
return
|
|
||||||
|
|
||||||
subtasks = instance.get_jobs_fail_chain() # reverse of dependent_jobs mostly
|
|
||||||
logger.debug(f'Executing error task id {task_actual["id"]}, subtasks: {[subtask.id for subtask in subtasks]}')
|
|
||||||
|
|
||||||
deps_of_deps = {}
|
|
||||||
|
|
||||||
for subtask in subtasks:
|
|
||||||
if subtask.celery_task_id != instance.celery_task_id and not subtask.cancel_flag and not subtask.status in ('successful', 'failed'):
|
|
||||||
# If there are multiple in the dependency chain, A->B->C, and this was called for A, blame B for clarity
|
|
||||||
blame_job = deps_of_deps.get(subtask.id, instance)
|
|
||||||
subtask.status = 'failed'
|
|
||||||
subtask.failed = True
|
|
||||||
if not subtask.job_explanation:
|
|
||||||
subtask.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % (
|
|
||||||
get_type_for_model(type(blame_job)),
|
|
||||||
blame_job.name,
|
|
||||||
blame_job.id,
|
|
||||||
)
|
|
||||||
subtask.save()
|
|
||||||
subtask.websocket_emit_status("failed")
|
|
||||||
|
|
||||||
for sub_subtask in subtask.get_jobs_fail_chain():
|
|
||||||
deps_of_deps[sub_subtask.id] = subtask
|
|
||||||
|
|
||||||
# We only send 1 job complete message since all the job completion message
|
|
||||||
# handling does is trigger the scheduler. If we extend the functionality of
|
|
||||||
# what the job complete message handler does then we may want to send a
|
|
||||||
# completion event for each job here.
|
|
||||||
schedule_manager_success_or_error(instance)
|
|
||||||
|
|
||||||
|
|
||||||
@task(queue=get_task_queuename)
|
@task(queue=get_task_queuename)
|
||||||
|
|||||||
@@ -5,8 +5,8 @@ import tempfile
|
|||||||
import shutil
|
import shutil
|
||||||
|
|
||||||
from awx.main.tasks.jobs import RunJob
|
from awx.main.tasks.jobs import RunJob
|
||||||
from awx.main.tasks.system import execution_node_health_check, _cleanup_images_and_files, handle_work_error
|
from awx.main.tasks.system import execution_node_health_check, _cleanup_images_and_files
|
||||||
from awx.main.models import Instance, Job, InventoryUpdate, ProjectUpdate
|
from awx.main.models import Instance, Job
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
@@ -73,17 +73,3 @@ def test_does_not_run_reaped_job(mocker, mock_me):
|
|||||||
job.refresh_from_db()
|
job.refresh_from_db()
|
||||||
assert job.status == 'failed'
|
assert job.status == 'failed'
|
||||||
mock_run.assert_not_called()
|
mock_run.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
|
||||||
def test_handle_work_error_nested(project, inventory_source):
|
|
||||||
pu = ProjectUpdate.objects.create(status='failed', project=project, celery_task_id='1234')
|
|
||||||
iu = InventoryUpdate.objects.create(status='pending', inventory_source=inventory_source, source='scm')
|
|
||||||
job = Job.objects.create(status='pending')
|
|
||||||
iu.dependent_jobs.add(pu)
|
|
||||||
job.dependent_jobs.add(pu, iu)
|
|
||||||
handle_work_error({'type': 'project_update', 'id': pu.id})
|
|
||||||
iu.refresh_from_db()
|
|
||||||
job.refresh_from_db()
|
|
||||||
assert iu.job_explanation == f'Previous Task Failed: {{"job_type": "project_update", "job_name": "", "job_id": "{pu.id}"}}'
|
|
||||||
assert job.job_explanation == f'Previous Task Failed: {{"job_type": "inventory_update", "job_name": "", "job_id": "{iu.id}"}}'
|
|
||||||
|
|||||||
@@ -143,13 +143,6 @@ def test_send_notifications_job_id(mocker):
|
|||||||
assert UnifiedJob.objects.get.called_with(id=1)
|
assert UnifiedJob.objects.get.called_with(id=1)
|
||||||
|
|
||||||
|
|
||||||
def test_work_success_callback_missing_job():
|
|
||||||
task_data = {'type': 'project_update', 'id': 9999}
|
|
||||||
with mock.patch('django.db.models.query.QuerySet.get') as get_mock:
|
|
||||||
get_mock.side_effect = ProjectUpdate.DoesNotExist()
|
|
||||||
assert system.handle_work_success(task_data) is None
|
|
||||||
|
|
||||||
|
|
||||||
@mock.patch('awx.main.models.UnifiedJob.objects.get')
|
@mock.patch('awx.main.models.UnifiedJob.objects.get')
|
||||||
@mock.patch('awx.main.models.Notification.objects.filter')
|
@mock.patch('awx.main.models.Notification.objects.filter')
|
||||||
def test_send_notifications_list(mock_notifications_filter, mock_job_get, mocker):
|
def test_send_notifications_list(mock_notifications_filter, mock_job_get, mocker):
|
||||||
|
|||||||
Reference in New Issue
Block a user