diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py
index a6f477539c..670012dc57 100644
--- a/awx/main/scheduler/__init__.py
+++ b/awx/main/scheduler/__init__.py
@@ -421,26 +421,40 @@ class TaskManager():
         if not found_acceptable_queue:
             logger.debug("%s couldn't be scheduled on graph, waiting for next cycle", task.log_format)
 
-    def fail_jobs_if_not_in_celery(self, node_jobs, active_tasks, celery_task_start_time):
+    def fail_jobs_if_not_in_celery(self, node_jobs, active_tasks, celery_task_start_time,
+                                   isolated=False):
         for task in node_jobs:
             if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
                 if isinstance(task, WorkflowJob):
                     continue
                 if task.modified > celery_task_start_time:
                     continue
-                task.status = 'failed'
-                task.job_explanation += ' '.join((
-                    'Task was marked as running in Tower but was not present in',
-                    'Celery, so it has been marked as failed.',
-                ))
+                new_status = 'failed'
+                if isolated:
+                    new_status = 'error'
+                task.status = new_status
+                if isolated:
+                    # TODO: cancel and reap artifacts of lost jobs from heartbeat
+                    task.job_explanation += ' '.join((
+                        'Task was marked as running in Tower but its',
+                        'controller management daemon was not present in',
+                        'Celery, so it has been marked as errored.',
+                        'Task may still be running, but contactability is unknown.',
+                    ))
+                else:
+                    task.job_explanation += ' '.join((
+                        'Task was marked as running in Tower but was not present in',
+                        'Celery, so it has been marked as failed.',
+                    ))
                 try:
                     task.save(update_fields=['status', 'job_explanation'])
                 except DatabaseError:
                     logger.error("Task {} DB error in marking failed. Job possibly deleted.".format(task.log_format))
                     continue
                 awx_tasks._send_notification_templates(task, 'failed')
-                task.websocket_emit_status('failed')
-                logger.error("Task {} has no record in celery. Marking as failed".format(task.log_format))
+                task.websocket_emit_status(new_status)
+                logger.error("{}Task {} has no record in celery. Marking as {}.".format(
+                    'Isolated ' if isolated else '', task.log_format, new_status))
 
     def cleanup_inconsistent_celery_tasks(self):
         '''
@@ -471,26 +485,36 @@ class TaskManager():
             self.fail_jobs_if_not_in_celery(waiting_tasks, all_celery_task_ids, celery_task_start_time)
 
         for node, node_jobs in running_tasks.iteritems():
+            isolated = False
             if node in active_queues:
                 active_tasks = active_queues[node]
             else:
                 '''
-                Node task list not found in celery. If tower thinks the node is down
-                then fail all the jobs on the node.
+                Node task list not found in celery. There are three cases:
+                 - instance is unknown to tower: the system is improperly configured
+                 - instance is reported as down: fail all jobs on the node
+                 - instance is an isolated node: check running tasks among all
+                   allowed controller nodes for the management daemon
                 '''
-                try:
-                    instance = Instance.objects.get(hostname=node)
-                    if instance.capacity == 0:
-                        active_tasks = []
-                    else:
-                        continue
-                except Instance.DoesNotExist:
-                    logger.error("Execution node Instance {} not found in database. "
-                                 "The node is currently executing jobs {}".format(node,
-                                                                                  [j.log_format for j in node_jobs]))
-                    active_tasks = []
+                instance = Instance.objects.filter(hostname=node).first()
 
-            self.fail_jobs_if_not_in_celery(node_jobs, active_tasks, celery_task_start_time)
+                if instance is None:
+                    logger.error("Execution node Instance {} not found in database. "
+                                 "The node is currently executing jobs {}".format(
+                                     node, [j.log_format for j in node_jobs]))
+                    active_tasks = []
+                elif instance.capacity == 0:
+                    active_tasks = []
+                elif instance.rampart_groups.filter(controller__isnull=False).exists():
+                    active_tasks = all_celery_task_ids
+                    isolated = True
+                else:
+                    continue
+
+            self.fail_jobs_if_not_in_celery(
+                node_jobs, active_tasks, celery_task_start_time,
+                isolated=isolated
+            )
 
     def calculate_capacity_consumed(self, tasks):
         self.graph = InstanceGroup.objects.capacity_values(tasks=tasks, graph=self.graph)
diff --git a/awx/main/tests/unit/test_task_manager.py b/awx/main/tests/unit/test_task_manager.py
index 65b7607bb4..1937b7b5ca 100644
--- a/awx/main/tests/unit/test_task_manager.py
+++ b/awx/main/tests/unit/test_task_manager.py
@@ -21,7 +21,7 @@ class TestCleanupInconsistentCeleryTasks():
     @mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {}))
     @mock.patch.object(TaskManager, 'get_running_tasks', return_value=({'host1': [Job(id=2), Job(id=3),]}, []))
     @mock.patch.object(InstanceGroup.objects, 'prefetch_related', return_value=[])
-    @mock.patch.object(Instance.objects, 'get', side_effect=Instance.DoesNotExist)
+    @mock.patch.object(Instance.objects, 'filter', return_value=mock.MagicMock(first=lambda: None))
     @mock.patch('awx.main.scheduler.logger')
     def test_instance_does_not_exist(self, logger_mock, *args):
         logger_mock.error = mock.MagicMock(side_effect=RuntimeError("mocked"))