diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py
index ba7427c03b..441c4e921b 100644
--- a/awx/main/scheduler/task_manager.py
+++ b/awx/main/scheduler/task_manager.py
@@ -270,6 +270,9 @@ class WorkflowManager(TaskBase):
                 job.status = 'failed'
                 job.save(update_fields=['status', 'job_explanation'])
                 job.websocket_emit_status('failed')
+                # NOTE: sending notification templates here has slightly worse performance;
+                # this is not yet optimized in the same way as for the TaskManager
+                job.send_notification_templates('failed')
                 ScheduleWorkflowManager().schedule()
                 # TODO: should we emit a status on the socket here similar to tasks.py awx_periodic_scheduler() ?

@@ -430,6 +433,25 @@ class TaskManager(TaskBase):
         self.tm_models = TaskManagerModels()
         self.controlplane_ig = self.tm_models.instance_groups.controlplane_ig

+    def process_job_dep_failures(self, task):
+        """If the job depends on a job that has failed or errored, mark it as failed and record it for notification handling."""
+        for dep in task.dependent_jobs.all():
+            # if we detect a failed or error dependency, go ahead and fail this task.
+            if dep.status in ("error", "failed"):
+                task.status = 'failed'
+                logger.warning(f'Task {task.id} failed because its dependency {dep.id} failed (task manager)')
+                task.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % (
+                    get_type_for_model(type(dep)),
+                    dep.name,
+                    dep.id,
+                )
+                task.save(update_fields=['status', 'job_explanation'])
+                task.websocket_emit_status('failed')
+                self.pre_start_failed.append(task.id)
+                return True
+
+        return False
+
     def job_blocked_by(self, task):
         # TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph
         # in the old task manager this was handled as a method on each task object outside of the graph and
@@ -441,20 +463,6 @@ class TaskManager(TaskBase):
         for dep in task.dependent_jobs.all():
             if dep.status in ACTIVE_STATES:
                 return dep
-            # if we detect a failed or error dependency, go ahead and fail this
-            # task. The errback on the dependency takes some time to trigger,
-            # and we don't want the task to enter running state if its
-            # dependency has failed or errored.
-            elif dep.status in ("error", "failed"):
-                task.status = 'failed'
-                task.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % (
-                    get_type_for_model(type(dep)),
-                    dep.name,
-                    dep.id,
-                )
-                task.save(update_fields=['status', 'job_explanation'])
-                task.websocket_emit_status('failed')
-                return dep

         return None

@@ -474,7 +482,6 @@ class TaskManager(TaskBase):
         if self.start_task_limit == 0:
             # schedule another run immediately after this task manager
             ScheduleTaskManager().schedule()
-        from awx.main.tasks.system import handle_work_error, handle_work_success

         task.status = 'waiting'

@@ -485,7 +492,7 @@ class TaskManager(TaskBase):
                 task.job_explanation += ' '
             task.job_explanation += 'Task failed pre-start check.'
             task.save()
-            # TODO: run error handler to fail sub-tasks and send notifications
+            self.pre_start_failed.append(task.id)
         else:
             if type(task) is WorkflowJob:
                 task.status = 'running'
@@ -507,19 +514,16 @@ class TaskManager(TaskBase):
         # apply_async does a NOTIFY to the channel dispatcher is listening to
         # postgres will treat this as part of the transaction, which is what we want
         if task.status != 'failed' and type(task) is not WorkflowJob:
-            task_actual = {'type': get_type_for_model(type(task)), 'id': task.id}
             task_cls = task._get_task_class()
             task_cls.apply_async(
                 [task.pk],
                 opts,
                 queue=task.get_queue_name(),
                 uuid=task.celery_task_id,
-                callbacks=[{'task': handle_work_success.name, 'kwargs': {'task_actual': task_actual}}],
-                errbacks=[{'task': handle_work_error.name, 'kwargs': {'task_actual': task_actual}}],
             )

-        # In exception cases, like a job failing pre-start checks, we send the websocket status message
-        # for jobs going into waiting, we omit this because of performance issues, as it should go to running quickly
+        # In exception cases, like a job failing pre-start checks, we send the websocket status message.
+        # For jobs going into waiting, we omit this because of performance issues, as it should go to running quickly.
         if task.status != 'waiting':
             task.websocket_emit_status(task.status)  # adds to on_commit

@@ -540,6 +544,11 @@ class TaskManager(TaskBase):
             if self.timed_out():
                 logger.warning("Task manager has reached time out while processing pending jobs, exiting loop early")
                 break
+
+            has_failed = self.process_job_dep_failures(task)
+            if has_failed:
+                continue
+
             blocked_by = self.job_blocked_by(task)
             if blocked_by:
                 self.subsystem_metrics.inc(f"{self.prefix}_tasks_blocked", 1)
@@ -653,6 +662,11 @@ class TaskManager(TaskBase):
                 reap_job(j, 'failed')

     def process_tasks(self):
+        # maintain a list of jobs that went to an early failure state,
+        # meaning the dispatcher never received these jobs,
+        # so we have to handle notifications for them here
+        self.pre_start_failed = []
+
         running_tasks = [t for t in self.all_tasks if t.status in ['waiting', 'running']]
         self.process_running_tasks(running_tasks)
         self.subsystem_metrics.inc(f"{self.prefix}_running_processed", len(running_tasks))
@@ -662,6 +676,11 @@ class TaskManager(TaskBase):
         self.process_pending_tasks(pending_tasks)
         self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(pending_tasks))

+        if self.pre_start_failed:
+            from awx.main.tasks.system import handle_failure_notifications
+
+            handle_failure_notifications.delay(self.pre_start_failed)
+
     def timeout_approval_node(self, task):
         if self.timed_out():
             logger.warning("Task manager has reached time out while processing approval nodes, exiting loop early")
             break
diff --git a/awx/main/tasks/jobs.py b/awx/main/tasks/jobs.py
index 31c95fd102..d244baa534 100644
--- a/awx/main/tasks/jobs.py
+++ b/awx/main/tasks/jobs.py
@@ -74,6 +74,8 @@ from awx.main.utils.common import (
     extract_ansible_vars,
     get_awx_version,
     create_partition,
+    ScheduleWorkflowManager,
+    ScheduleTaskManager,
 )
 from awx.conf.license import get_license
 from awx.main.utils.handlers import SpecialInventoryHandler
@@ -450,6 +452,12 @@ class BaseTask(object):
                 instance.ansible_version = ansible_version_info
                 instance.save(update_fields=['ansible_version'])

+        # Run task manager appropriately for speculative dependencies
+        if instance.unifiedjob_blocked_jobs.exists():
+            ScheduleTaskManager().schedule()
+        if instance.spawned_by_workflow:
+            ScheduleWorkflowManager().schedule()
+
     def should_use_fact_cache(self):
         return False

diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py
index da7341af36..b32ae7b5e5 100644
--- a/awx/main/tasks/system.py
+++ b/awx/main/tasks/system.py
@@ -53,13 +53,7 @@ from awx.main.models import (
 from awx.main.constants import ACTIVE_STATES
 from awx.main.dispatch.publish import task
 from awx.main.dispatch import get_task_queuename, reaper
-from awx.main.utils.common import (
-    get_type_for_model,
-    ignore_inventory_computed_fields,
-    ignore_inventory_group_removal,
-    ScheduleWorkflowManager,
-    ScheduleTaskManager,
-)
+from awx.main.utils.common import ignore_inventory_computed_fields, ignore_inventory_group_removal
 from awx.main.utils.reload import stop_local_services
 from awx.main.utils.pglock import advisory_lock
@@ -765,63 +759,19 @@ def awx_periodic_scheduler():
             emit_channel_notification('schedules-changed', dict(id=schedule.id, group_name="schedules"))


-def schedule_manager_success_or_error(instance):
-    if instance.unifiedjob_blocked_jobs.exists():
-        ScheduleTaskManager().schedule()
-    if instance.spawned_by_workflow:
-        ScheduleWorkflowManager().schedule()
-
-
 @task(queue=get_task_queuename)
-def handle_work_success(task_actual):
-    try:
-        instance = UnifiedJob.get_instance_by_type(task_actual['type'], task_actual['id'])
-    except ObjectDoesNotExist:
-        logger.warning('Missing {} `{}` in success callback.'.format(task_actual['type'], task_actual['id']))
-        return
-    if not instance:
-        return
-    schedule_manager_success_or_error(instance)
-
-
-@task(queue=get_task_queuename)
-def handle_work_error(task_actual):
-    try:
-        instance = UnifiedJob.get_instance_by_type(task_actual['type'], task_actual['id'])
-    except ObjectDoesNotExist:
-        logger.warning('Missing {} `{}` in error callback.'.format(task_actual['type'], task_actual['id']))
-        return
-    if not instance:
-        return
-
-    subtasks = instance.get_jobs_fail_chain()  # reverse of dependent_jobs mostly
-    logger.debug(f'Executing error task id {task_actual["id"]}, subtasks: {[subtask.id for subtask in subtasks]}')
-
-    deps_of_deps = {}
-
-    for subtask in subtasks:
-        if subtask.celery_task_id != instance.celery_task_id and not subtask.cancel_flag and not subtask.status in ('successful', 'failed'):
-            # If there are multiple in the dependency chain, A->B->C, and this was called for A, blame B for clarity
-            blame_job = deps_of_deps.get(subtask.id, instance)
-            subtask.status = 'failed'
-            subtask.failed = True
-            if not subtask.job_explanation:
-                subtask.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % (
-                    get_type_for_model(type(blame_job)),
-                    blame_job.name,
-                    blame_job.id,
-                )
-            subtask.save()
-            subtask.websocket_emit_status("failed")
-
-            for sub_subtask in subtask.get_jobs_fail_chain():
-                deps_of_deps[sub_subtask.id] = subtask
-
-    # We only send 1 job complete message since all the job completion message
-    # handling does is trigger the scheduler. If we extend the functionality of
-    # what the job complete message handler does then we may want to send a
-    # completion event for each job here.
-    schedule_manager_success_or_error(instance)
+def handle_failure_notifications(task_ids):
+    """A task-ified version of the method that sends notifications."""
+    found_task_ids = set()
+    for instance in UnifiedJob.objects.filter(id__in=task_ids):
+        found_task_ids.add(instance.id)
+        try:
+            instance.send_notification_templates('failed')
+        except Exception:
+            logger.exception(f'Error preparing notifications for task {instance.id}')
+    deleted_tasks = set(task_ids) - found_task_ids
+    if deleted_tasks:
+        logger.warning(f'Could not send notifications for {deleted_tasks} because they were not found in the database')


 @task(queue=get_task_queuename)
diff --git a/awx/main/tests/functional/test_tasks.py b/awx/main/tests/functional/test_tasks.py
index c4d0dac4e3..70de6317a4 100644
--- a/awx/main/tests/functional/test_tasks.py
+++ b/awx/main/tests/functional/test_tasks.py
@@ -5,8 +5,8 @@ import tempfile
 import shutil

 from awx.main.tasks.jobs import RunJob
-from awx.main.tasks.system import execution_node_health_check, _cleanup_images_and_files, handle_work_error
-from awx.main.models import Instance, Job, InventoryUpdate, ProjectUpdate
+from awx.main.tasks.system import execution_node_health_check, _cleanup_images_and_files
+from awx.main.models import Instance, Job


 @pytest.fixture
@@ -73,17 +73,3 @@ def test_does_not_run_reaped_job(mocker, mock_me):
     job.refresh_from_db()
     assert job.status == 'failed'
     mock_run.assert_not_called()
-
-
-@pytest.mark.django_db
-def test_handle_work_error_nested(project, inventory_source):
-    pu = ProjectUpdate.objects.create(status='failed', project=project, celery_task_id='1234')
-    iu = InventoryUpdate.objects.create(status='pending', inventory_source=inventory_source, source='scm')
-    job = Job.objects.create(status='pending')
-    iu.dependent_jobs.add(pu)
-    job.dependent_jobs.add(pu, iu)
-    handle_work_error({'type': 'project_update', 'id': pu.id})
-    iu.refresh_from_db()
-    job.refresh_from_db()
-    assert iu.job_explanation == f'Previous Task Failed: {{"job_type": "project_update", "job_name": "", "job_id": "{pu.id}"}}'
-    assert job.job_explanation == f'Previous Task Failed: {{"job_type": "inventory_update", "job_name": "", "job_id": "{iu.id}"}}'
diff --git a/awx/main/tests/unit/test_tasks.py b/awx/main/tests/unit/test_tasks.py
index 20ab56f270..aa1e63c906 100644
--- a/awx/main/tests/unit/test_tasks.py
+++ b/awx/main/tests/unit/test_tasks.py
@@ -143,13 +143,6 @@ def test_send_notifications_job_id(mocker):
     assert UnifiedJob.objects.get.called_with(id=1)


-def test_work_success_callback_missing_job():
-    task_data = {'type': 'project_update', 'id': 9999}
-    with mock.patch('django.db.models.query.QuerySet.get') as get_mock:
-        get_mock.side_effect = ProjectUpdate.DoesNotExist()
-        assert system.handle_work_success(task_data) is None
-
-
 @mock.patch('awx.main.models.UnifiedJob.objects.get')
 @mock.patch('awx.main.models.Notification.objects.filter')
 def test_send_notifications_list(mock_notifications_filter, mock_job_get, mocker):