From 8e1e60c18727d68d35440849cc1637696b4b7fa1 Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Fri, 8 Sep 2017 10:01:44 -0700 Subject: [PATCH] simplify isolated job reaping by checking all task ids --- awx/main/scheduler/__init__.py | 18 ++++++------------ awx/main/tests/unit/test_task_manager.py | 2 +- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index 5c713e6196..670012dc57 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -6,7 +6,6 @@ from datetime import datetime, timedelta import logging import uuid from sets import Set -import itertools # Django from django.conf import settings @@ -437,7 +436,7 @@ class TaskManager(): if isolated: # TODO: cancel and reap artifacts of lost jobs from heartbeat task.job_explanation += ' '.join(( - 'Task was marked as running in Tower but its' + 'Task was marked as running in Tower but its ', 'controller management daemon was not present in', 'Celery, so it has been marked as failed.', 'Task may still be running, but contactability is unknown.' @@ -452,7 +451,7 @@ class TaskManager(): except DatabaseError: logger.error("Task {} DB error in marking failed. Job possibly deleted.".format(task.log_format)) continue - awx_tasks._send_notification_templates(task, new_status) + awx_tasks._send_notification_templates(task, 'failed') task.websocket_emit_status(new_status) logger.error("{}Task {} has no record in celery. Marking as failed".format( 'Isolated ' if isolated else '', task.log_format)) @@ -506,16 +505,11 @@ class TaskManager(): active_tasks = [] elif instance.capacity == 0: active_tasks = [] + elif instance.rampart_groups.filter(controller__isnull=False).exists(): + active_tasks = all_celery_task_ids + isolated = True else: - cids = instance.rampart_groups.filter( - controller__isnull=False).values_list('id', flat=True) - if cids: - control_hosts = Instance.objects.filter( - rampart_groups__in=cids).values_list('hostname', flat=True) - active_tasks = set(itertools.chain(active_queues[host] for host in control_hosts)) - isolated = True - else: - continue + continue self.fail_jobs_if_not_in_celery( node_jobs, active_tasks, celery_task_start_time, diff --git a/awx/main/tests/unit/test_task_manager.py b/awx/main/tests/unit/test_task_manager.py index 24536adae5..1937b7b5ca 100644 --- a/awx/main/tests/unit/test_task_manager.py +++ b/awx/main/tests/unit/test_task_manager.py @@ -21,7 +21,7 @@ class TestCleanupInconsistentCeleryTasks(): @mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {})) @mock.patch.object(TaskManager, 'get_running_tasks', return_value=({'host1': [Job(id=2), Job(id=3),]}, [])) @mock.patch.object(InstanceGroup.objects, 'prefetch_related', return_value=[]) - @mock.patch.object(Instance.objects, 'prefetch_related', return_value=[]) + @mock.patch.object(Instance.objects, 'filter', return_value=mock.MagicMock(first=lambda: None)) @mock.patch('awx.main.scheduler.logger') def test_instance_does_not_exist(self, logger_mock, *args): logger_mock.error = mock.MagicMock(side_effect=RuntimeError("mocked"))