From 878e7ef49fcaf741c7e6f986ab51e574ca608269 Mon Sep 17 00:00:00 2001
From: AlanCoding
Date: Thu, 31 Aug 2017 17:37:40 -0400
Subject: [PATCH 1/2] reap isolated jobs

---
 awx/main/scheduler/__init__.py            | 72 +++++++++++++++++-------
 awx/main/tests/unit/test_task_manager.py  |  2 +-
 2 files changed, 52 insertions(+), 22 deletions(-)

diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py
index a6f477539c..5c713e6196 100644
--- a/awx/main/scheduler/__init__.py
+++ b/awx/main/scheduler/__init__.py
@@ -6,6 +6,7 @@ from datetime import datetime, timedelta
 import logging
 import uuid
 from sets import Set
+import itertools
 
 # Django
 from django.conf import settings
@@ -421,26 +422,40 @@ class TaskManager():
         if not found_acceptable_queue:
             logger.debug("%s couldn't be scheduled on graph, waiting for next cycle", task.log_format)
 
-    def fail_jobs_if_not_in_celery(self, node_jobs, active_tasks, celery_task_start_time):
+    def fail_jobs_if_not_in_celery(self, node_jobs, active_tasks, celery_task_start_time,
+                                   isolated=False):
         for task in node_jobs:
             if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
                 if isinstance(task, WorkflowJob):
                     continue
                 if task.modified > celery_task_start_time:
                     continue
-                task.status = 'failed'
-                task.job_explanation += ' '.join((
-                    'Task was marked as running in Tower but was not present in',
-                    'Celery, so it has been marked as failed.',
-                ))
+                new_status = 'failed'
+                if isolated:
+                    new_status = 'error'
+                task.status = new_status
+                if isolated:
+                    # TODO: cancel and reap artifacts of lost jobs from heartbeat
+                    task.job_explanation += ' '.join((
+                        'Task was marked as running in Tower but its'
+                        'controller management daemon was not present in',
+                        'Celery, so it has been marked as failed.',
+                        'Task may still be running, but contactability is unknown.'
+                    ))
+                else:
+                    task.job_explanation += ' '.join((
+                        'Task was marked as running in Tower but was not present in',
+                        'Celery, so it has been marked as failed.',
+                    ))
                 try:
                     task.save(update_fields=['status', 'job_explanation'])
                 except DatabaseError:
                     logger.error("Task {} DB error in marking failed. Job possibly deleted.".format(task.log_format))
                     continue
-                awx_tasks._send_notification_templates(task, 'failed')
-                task.websocket_emit_status('failed')
-                logger.error("Task {} has no record in celery. Marking as failed".format(task.log_format))
+                awx_tasks._send_notification_templates(task, new_status)
+                task.websocket_emit_status(new_status)
+                logger.error("{}Task {} has no record in celery. Marking as failed".format(
+                    'Isolated ' if isolated else '', task.log_format))
 
     def cleanup_inconsistent_celery_tasks(self):
         '''
@@ -471,26 +486,41 @@ class TaskManager():
         self.fail_jobs_if_not_in_celery(waiting_tasks, all_celery_task_ids, celery_task_start_time)
 
         for node, node_jobs in running_tasks.iteritems():
+            isolated = False
             if node in active_queues:
                 active_tasks = active_queues[node]
             else:
                 '''
-                Node task list not found in celery. If tower thinks the node is down
-                then fail all the jobs on the node.
+                Node task list not found in celery. We may branch into cases:
+                 - instance is unknown to tower, system is improperly configured
+                 - instance is reported as down, then fail all jobs on the node
+                 - instance is an isolated node, then check running tasks
+                   among all allowed controller nodes for management process
                 '''
-                try:
-                    instance = Instance.objects.get(hostname=node)
-                    if instance.capacity == 0:
-                        active_tasks = []
+                instance = Instance.objects.filter(hostname=node).first()
+
+                if instance is None:
+                    logger.error("Execution node Instance {} not found in database. "
+                                 "The node is currently executing jobs {}".format(
+                                     node, [j.log_format for j in node_jobs]))
+                    active_tasks = []
+                elif instance.capacity == 0:
+                    active_tasks = []
+                else:
+                    cids = instance.rampart_groups.filter(
+                        controller__isnull=False).values_list('id', flat=True)
+                    if cids:
+                        control_hosts = Instance.objects.filter(
+                            rampart_groups__in=cids).values_list('hostname', flat=True)
+                        active_tasks = set(itertools.chain(active_queues[host] for host in control_hosts))
+                        isolated = True
                     else:
                         continue
-                except Instance.DoesNotExist:
-                    logger.error("Execution node Instance {} not found in database. "
-                                 "The node is currently executing jobs {}".format(node,
-                                                                                  [j.log_format for j in node_jobs]))
-                    active_tasks = []
 
-            self.fail_jobs_if_not_in_celery(node_jobs, active_tasks, celery_task_start_time)
+            self.fail_jobs_if_not_in_celery(
+                node_jobs, active_tasks, celery_task_start_time,
+                isolated=isolated
+            )
 
     def calculate_capacity_consumed(self, tasks):
         self.graph = InstanceGroup.objects.capacity_values(tasks=tasks, graph=self.graph)
diff --git a/awx/main/tests/unit/test_task_manager.py b/awx/main/tests/unit/test_task_manager.py
index 65b7607bb4..24536adae5 100644
--- a/awx/main/tests/unit/test_task_manager.py
+++ b/awx/main/tests/unit/test_task_manager.py
@@ -21,7 +21,7 @@ class TestCleanupInconsistentCeleryTasks():
     @mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {}))
     @mock.patch.object(TaskManager, 'get_running_tasks', return_value=({'host1': [Job(id=2), Job(id=3),]}, []))
     @mock.patch.object(InstanceGroup.objects, 'prefetch_related', return_value=[])
-    @mock.patch.object(Instance.objects, 'get', side_effect=Instance.DoesNotExist)
+    @mock.patch.object(Instance.objects, 'prefetch_related', return_value=[])
     @mock.patch('awx.main.scheduler.logger')
     def test_instance_does_not_exist(self, logger_mock, *args):
         logger_mock.error = mock.MagicMock(side_effect=RuntimeError("mocked"))

From 8e1e60c18727d68d35440849cc1637696b4b7fa1 Mon Sep 17 00:00:00 2001
From: AlanCoding
Date: Fri, 8 Sep 2017 10:01:44 -0700
Subject: [PATCH 2/2] simplify isolated job reaping by checking all task ids

---
 awx/main/scheduler/__init__.py            | 18 ++++++------------
 awx/main/tests/unit/test_task_manager.py  |  2 +-
 2 files changed, 7 insertions(+), 13 deletions(-)

diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py
index 5c713e6196..670012dc57 100644
--- a/awx/main/scheduler/__init__.py
+++ b/awx/main/scheduler/__init__.py
@@ -6,7 +6,6 @@ from datetime import datetime, timedelta
 import logging
 import uuid
 from sets import Set
-import itertools
 
 # Django
 from django.conf import settings
@@ -437,7 +436,7 @@ class TaskManager():
                 if isolated:
                     # TODO: cancel and reap artifacts of lost jobs from heartbeat
                     task.job_explanation += ' '.join((
-                        'Task was marked as running in Tower but its'
+                        'Task was marked as running in Tower but its ',
                         'controller management daemon was not present in',
                         'Celery, so it has been marked as failed.',
                         'Task may still be running, but contactability is unknown.'
@@ -452,7 +451,7 @@ class TaskManager():
                 except DatabaseError:
                     logger.error("Task {} DB error in marking failed. Job possibly deleted.".format(task.log_format))
                     continue
-                awx_tasks._send_notification_templates(task, new_status)
+                awx_tasks._send_notification_templates(task, 'failed')
                 task.websocket_emit_status(new_status)
                 logger.error("{}Task {} has no record in celery. Marking as failed".format(
                     'Isolated ' if isolated else '', task.log_format))
@@ -506,16 +505,11 @@ class TaskManager():
                     active_tasks = []
                 elif instance.capacity == 0:
                     active_tasks = []
+                elif instance.rampart_groups.filter(controller__isnull=False).exists():
+                    active_tasks = all_celery_task_ids
+                    isolated = True
                 else:
-                    cids = instance.rampart_groups.filter(
-                        controller__isnull=False).values_list('id', flat=True)
-                    if cids:
-                        control_hosts = Instance.objects.filter(
-                            rampart_groups__in=cids).values_list('hostname', flat=True)
-                        active_tasks = set(itertools.chain(active_queues[host] for host in control_hosts))
-                        isolated = True
-                    else:
-                        continue
+                    continue
 
             self.fail_jobs_if_not_in_celery(
                 node_jobs, active_tasks, celery_task_start_time,
diff --git a/awx/main/tests/unit/test_task_manager.py b/awx/main/tests/unit/test_task_manager.py
index 24536adae5..1937b7b5ca 100644
--- a/awx/main/tests/unit/test_task_manager.py
+++ b/awx/main/tests/unit/test_task_manager.py
@@ -21,7 +21,7 @@ class TestCleanupInconsistentCeleryTasks():
     @mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {}))
     @mock.patch.object(TaskManager, 'get_running_tasks', return_value=({'host1': [Job(id=2), Job(id=3),]}, []))
     @mock.patch.object(InstanceGroup.objects, 'prefetch_related', return_value=[])
-    @mock.patch.object(Instance.objects, 'prefetch_related', return_value=[])
+    @mock.patch.object(Instance.objects, 'filter', return_value=mock.MagicMock(first=lambda: None))
     @mock.patch('awx.main.scheduler.logger')
    def test_instance_does_not_exist(self, logger_mock, *args):
         logger_mock.error = mock.MagicMock(side_effect=RuntimeError("mocked"))
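
Note on the resulting branch logic (illustrative, not part of either patch): after PATCH 2/2, the branch of cleanup_inconsistent_celery_tasks that handles a node missing from celery's active queues reduces to three cases: the instance is unknown to Tower, the instance is reported down (capacity 0), or the instance is isolated (it belongs to a rampart group with a controller), in which case its jobs are checked against every known celery task id rather than against per-controller queues. The sketch below restates that flow with plain stand-in objects; FakeInstance, has_controller, and the dict lookup are assumptions made for illustration, not the real AWX ORM API.

# Minimal sketch of the post-patch branch selection, with plain objects
# standing in for the AWX Django models (illustrative only).
class FakeInstance(object):
    def __init__(self, hostname, capacity=100, has_controller=False):
        self.hostname = hostname
        self.capacity = capacity
        # stands in for instance.rampart_groups.filter(controller__isnull=False).exists()
        self.has_controller = has_controller

def resolve_active_tasks(node, instances_by_hostname, all_celery_task_ids):
    """Return (active_tasks, isolated) for a node absent from celery's active
    queues, or None if its jobs should be left alone this cycle."""
    instance = instances_by_hostname.get(node)  # ~ Instance.objects.filter(hostname=node).first()
    if instance is None:
        # Unknown execution node: every job attributed to it is treated as lost.
        return [], False
    if instance.capacity == 0:
        # Node is reported down: fail all of its jobs.
        return [], False
    if instance.has_controller:
        # Isolated node: its controller management process may run on any
        # controller host, so compare against all known celery task ids.
        return all_celery_task_ids, True
    # Healthy, non-isolated node with no queue data: skip it this cycle.
    return None

# Example: an isolated node is compared against the full task id set.
instances = {'iso1': FakeInstance('iso1', capacity=50, has_controller=True)}
print(resolve_active_tasks('iso1', instances, {'abc-123', 'def-456'}))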
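
A related note on how the missing jobs are recorded (again illustrative, not part of the patches): with isolated=True, fail_jobs_if_not_in_celery sets status 'error' and a longer explanation, since only the controller management daemon is known to be absent and the remote task may still be running, while notification templates are still sent with 'failed' after PATCH 2/2. A rough, self-contained sketch of that decision, assuming a simplified job object in place of the real model and omitting saving, notifications, and websocket emission:

# Rough sketch of the status/explanation decision (illustrative only);
# SimpleJob is a stand-in for the real Job model.
class SimpleJob(object):
    def __init__(self, celery_task_id):
        self.celery_task_id = celery_task_id
        self.status = 'running'
        self.job_explanation = ''

def mark_if_lost(task, active_tasks, isolated=False):
    """Mark a task whose celery_task_id is missing; return True if it was marked."""
    if task.celery_task_id in active_tasks:
        return False
    # Isolated jobs get 'error' because only the controller daemon is known to
    # be gone; the task itself may still be running on the isolated node.
    task.status = 'error' if isolated else 'failed'
    if isolated:
        task.job_explanation += (
            'Task was marked as running in Tower but its controller '
            'management daemon was not present in Celery, so it has been '
            'marked as failed. Task may still be running, but contactability '
            'is unknown.')
    else:
        task.job_explanation += (
            'Task was marked as running in Tower but was not present in '
            'Celery, so it has been marked as failed.')
    return True

job = SimpleJob(celery_task_id='missing-id')
if mark_if_lost(job, active_tasks=set(), isolated=True):
    print(job.status)           # -> error
    print(job.job_explanation)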