From 1df47a2ddde607dcc7e7a9c6938b49f25067513d Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Wed, 16 Aug 2017 13:18:25 -0400 Subject: [PATCH] account for waiting tasks not having execution_nodes yet * Reap running tasks on non-netsplit nodes * Reap running tasks on known to be offline nodes * Reap waiting tasks with no celery id anywhere if waiting >= 60 seconds --- awx/main/scheduler/__init__.py | 59 ++++++++++++------- .../task_management/test_scheduler.py | 39 ++++++------ awx/main/tests/unit/test_task_manager.py | 4 +- 3 files changed, 61 insertions(+), 41 deletions(-) diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index abc7c167ee..2337170091 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -78,13 +78,18 @@ class TaskManager(): ''' def get_running_tasks(self): execution_nodes = {} + waiting_jobs = [] now = tz_now() workflow_ctype_id = ContentType.objects.get_for_model(WorkflowJob).id jobs = UnifiedJob.objects.filter((Q(status='running') | Q(status='waiting', modified__lte=now - timedelta(seconds=60))) & ~Q(polymorphic_ctype_id=workflow_ctype_id)) - [execution_nodes.setdefault(j.execution_node, [j]).append(j) for j in jobs] - return execution_nodes + for j in jobs: + if j.execution_node: + execution_nodes.setdefault(j.execution_node, [j]).append(j) + else: + waiting_jobs.append(j) + return (execution_nodes, waiting_jobs) ''' Tasks that are currently running in celery @@ -410,6 +415,27 @@ class TaskManager(): if not found_acceptable_queue: logger.debug("%s couldn't be scheduled on graph, waiting for next cycle", task.log_format) + def fail_jobs_if_not_in_celery(self, node_jobs, active_tasks, celery_task_start_time): + for task in node_jobs: + if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')): + if isinstance(task, WorkflowJob): + continue + if task.modified > celery_task_start_time: + continue + task.status = 'failed' + task.job_explanation += ' '.join(( + 'Task was marked as running in Tower but was not present in', + 'Celery, so it has been marked as failed.', + )) + try: + task.save(update_fields=['status', 'job_explanation']) + except DatabaseError: + logger.error("Task {} DB error in marking failed. Job possibly deleted.".format(task.log_format)) + continue + awx_tasks._send_notification_templates(task, 'failed') + task.websocket_emit_status('failed') + logger.error("Task {} has no record in celery. Marking as failed".format(task.log_format)) + def cleanup_inconsistent_celery_tasks(self): ''' Rectify tower db <-> celery inconsistent view of jobs state @@ -431,7 +457,13 @@ class TaskManager(): Only consider failing tasks on instances for which we obtained a task list from celery for. 
''' - running_tasks = self.get_running_tasks() + running_tasks, waiting_tasks = self.get_running_tasks() + all_celery_task_ids = [] + for node, node_jobs in active_queues.iteritems(): + all_celery_task_ids.extend(node_jobs) + + self.fail_jobs_if_not_in_celery(waiting_tasks, all_celery_task_ids, celery_task_start_time) + for node, node_jobs in running_tasks.iteritems(): if node in active_queues: active_tasks = active_queues[node] @@ -451,25 +483,8 @@ class TaskManager(): "The node is currently executing jobs {}".format(node, [j.log_format for j in node_jobs])) active_tasks = [] - for task in node_jobs: - if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')): - if isinstance(task, WorkflowJob): - continue - if task.modified > celery_task_start_time: - continue - task.status = 'failed' - task.job_explanation += ' '.join(( - 'Task was marked as running in Tower but was not present in', - 'Celery, so it has been marked as failed.', - )) - try: - task.save(update_fields=['status', 'job_explanation']) - except DatabaseError: - logger.error("Task {} DB error in marking failed. Job possibly deleted.".format(task.log_format)) - continue - awx_tasks._send_notification_templates(task, 'failed') - task.websocket_emit_status('failed') - logger.error("Task {} has no record in celery. Marking as failed".format(task.log_format)) + + self.fail_jobs_if_not_in_celery(node_jobs, active_tasks, celery_task_start_time) def calculate_capacity_used(self, tasks): for rampart_group in self.graph: diff --git a/awx/main/tests/functional/task_management/test_scheduler.py b/awx/main/tests/functional/task_management/test_scheduler.py index 01bb88d0a0..05de8b2a81 100644 --- a/awx/main/tests/functional/task_management/test_scheduler.py +++ b/awx/main/tests/functional/task_management/test_scheduler.py @@ -231,23 +231,23 @@ class TestReaper(): Instance.objects.create(hostname='host4_offline', capacity=0) j1 = Job.objects.create(status='pending', execution_node='host1') - j2 = Job.objects.create(status='waiting', celery_task_id='considered_j2', execution_node='host1') - j3 = Job.objects.create(status='waiting', celery_task_id='considered_j3', execution_node='host1') + j2 = Job.objects.create(status='waiting', celery_task_id='considered_j2') + j3 = Job.objects.create(status='waiting', celery_task_id='considered_j3') j3.modified = now - timedelta(seconds=60) j3.save(update_fields=['modified']) j4 = Job.objects.create(status='running', celery_task_id='considered_j4', execution_node='host1') - j5 = Job.objects.create(status='waiting', celery_task_id='reapable_j5', execution_node='host1') + j5 = Job.objects.create(status='waiting', celery_task_id='reapable_j5') j5.modified = now - timedelta(seconds=60) j5.save(update_fields=['modified']) - j6 = Job.objects.create(status='waiting', celery_task_id='considered_j6', execution_node='host2') + j6 = Job.objects.create(status='waiting', celery_task_id='considered_j6') j6.modified = now - timedelta(seconds=60) j6.save(update_fields=['modified']) j7 = Job.objects.create(status='running', celery_task_id='considered_j7', execution_node='host2') j8 = Job.objects.create(status='running', celery_task_id='reapable_j7', execution_node='host2') - j9 = Job.objects.create(status='waiting', celery_task_id='host3_j8', execution_node='host3_split') + j9 = Job.objects.create(status='waiting', celery_task_id='reapable_j8') j9.modified = now - timedelta(seconds=60) j9.save(update_fields=['modified']) - j10 = Job.objects.create(status='running', 
execution_node='host3_split') + j10 = Job.objects.create(status='running', celery_task_id='host3_j10', execution_node='host3_split') j11 = Job.objects.create(status='running', celery_task_id='host4_j11', execution_node='host4_offline') @@ -266,12 +266,16 @@ class TestReaper(): @pytest.fixture def running_tasks(self, all_jobs): return { - 'host1': all_jobs[2:5] + [all_jobs[11]], - 'host2': all_jobs[5:8], - 'host3_split': all_jobs[8:10], + 'host1': [all_jobs[3]], + 'host2': [all_jobs[7], all_jobs[8]], + 'host3_split': [all_jobs[9]], 'host4_offline': [all_jobs[10]], } + @pytest.fixture + def waiting_tasks(self, all_jobs): + return [all_jobs[2], all_jobs[4], all_jobs[5], all_jobs[8]] + @pytest.fixture def reapable_jobs(self, all_jobs): return [all_jobs[4], all_jobs[7], all_jobs[10]] @@ -290,10 +294,10 @@ class TestReaper(): @pytest.mark.django_db @mock.patch('awx.main.tasks._send_notification_templates') @mock.patch.object(TaskManager, 'get_active_tasks', lambda self: ([], [])) - def test_cleanup_inconsistent_task(self, notify, active_tasks, considered_jobs, reapable_jobs, running_tasks, mocker): + def test_cleanup_inconsistent_task(self, notify, active_tasks, considered_jobs, reapable_jobs, running_tasks, waiting_tasks, mocker): tm = TaskManager() - tm.get_running_tasks = mocker.Mock(return_value=running_tasks) + tm.get_running_tasks = mocker.Mock(return_value=(running_tasks, waiting_tasks)) tm.get_active_tasks = mocker.Mock(return_value=active_tasks) tm.cleanup_inconsistent_celery_tasks() @@ -302,7 +306,7 @@ class TestReaper(): if j not in reapable_jobs: j.save.assert_not_called() - assert notify.call_count == 3 + assert notify.call_count == 4 notify.assert_has_calls([mock.call(j, 'failed') for j in reapable_jobs], any_order=True) for j in reapable_jobs: @@ -317,22 +321,23 @@ class TestReaper(): tm = TaskManager() # Ensure the query grabs the expected jobs - execution_nodes_jobs = tm.get_running_tasks() + execution_nodes_jobs, waiting_jobs = tm.get_running_tasks() assert 'host1' in execution_nodes_jobs assert 'host2' in execution_nodes_jobs assert 'host3_split' in execution_nodes_jobs - assert all_jobs[2] in execution_nodes_jobs['host1'] assert all_jobs[3] in execution_nodes_jobs['host1'] - assert all_jobs[4] in execution_nodes_jobs['host1'] - assert all_jobs[5] in execution_nodes_jobs['host2'] assert all_jobs[6] in execution_nodes_jobs['host2'] assert all_jobs[7] in execution_nodes_jobs['host2'] - assert all_jobs[8] in execution_nodes_jobs['host3_split'] assert all_jobs[9] in execution_nodes_jobs['host3_split'] assert all_jobs[10] in execution_nodes_jobs['host4_offline'] assert all_jobs[11] not in execution_nodes_jobs['host1'] + + assert all_jobs[2] in waiting_jobs + assert all_jobs[4] in waiting_jobs + assert all_jobs[5] in waiting_jobs + assert all_jobs[8] in waiting_jobs diff --git a/awx/main/tests/unit/test_task_manager.py b/awx/main/tests/unit/test_task_manager.py index b479952e53..f76e77862b 100644 --- a/awx/main/tests/unit/test_task_manager.py +++ b/awx/main/tests/unit/test_task_manager.py @@ -19,7 +19,7 @@ from django.core.cache import cache class TestCleanupInconsistentCeleryTasks(): @mock.patch.object(cache, 'get', return_value=None) @mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {})) - @mock.patch.object(TaskManager, 'get_running_tasks', return_value={'host1': [Job(id=2), Job(id=3),]}) + @mock.patch.object(TaskManager, 'get_running_tasks', return_value=({'host1': [Job(id=2), Job(id=3),]}, [])) @mock.patch.object(InstanceGroup.objects, 'all', 
return_value=[]) @mock.patch.object(Instance.objects, 'get', side_effect=Instance.DoesNotExist) @mock.patch('awx.main.scheduler.logger') @@ -43,7 +43,7 @@ class TestCleanupInconsistentCeleryTasks(): logger_mock.error = mock.MagicMock() job = Job(id=2, modified=tz_now(), status='running', celery_task_id='blah', execution_node='host1') job.websocket_emit_status = mock.MagicMock() - get_running_tasks.return_value = {'host1': [job]} + get_running_tasks.return_value = ({'host1': [job]}, []) tm = TaskManager() with mock.patch.object(job, 'save', side_effect=DatabaseError):
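
A minimal, self-contained sketch of the reaping flow this patch implements: running jobs are grouped by execution_node, waiting jobs with no execution_node yet are checked against the celery task ids gathered from every node, and a job absent from celery (and not modified after the celery snapshot) is the one that gets reaped. The names below (Job namedtuple, split_running_and_waiting, reap_missing_from_celery) are simplified stand-ins, not the real AWX models or TaskManager methods.

    # Illustrative sketch only -- simplified stand-ins for the AWX models/manager.
    from collections import namedtuple
    from datetime import datetime, timedelta

    # Hypothetical trimmed-down job record (the real UnifiedJob carries far more state).
    Job = namedtuple('Job', ['id', 'status', 'celery_task_id', 'execution_node', 'modified'])


    def split_running_and_waiting(jobs):
        # Group jobs by execution_node; jobs without one (typically still
        # 'waiting') are returned separately, mirroring get_running_tasks().
        execution_nodes, waiting = {}, []
        for j in jobs:
            if j.execution_node:
                # setdefault with an empty list so each job is added exactly once
                execution_nodes.setdefault(j.execution_node, []).append(j)
            else:
                waiting.append(j)
        return execution_nodes, waiting


    def reap_missing_from_celery(jobs, active_task_ids, celery_task_start_time):
        # Jobs that would be marked failed: known to the DB but absent from
        # celery, and not modified after the celery snapshot was taken
        # (recently modified jobs get a grace period -- they may simply not
        # have reached celery yet).
        return [j for j in jobs
                if j.celery_task_id not in active_task_ids
                and j.modified <= celery_task_start_time]


    if __name__ == '__main__':
        now = datetime.now()
        celery_task_start_time = now  # anything modified after this is skipped
        jobs = [
            Job(1, 'running', 'tid-1', 'node1', now - timedelta(minutes=5)),
            Job(2, 'waiting', 'tid-2', None, now - timedelta(seconds=90)),
            Job(3, 'waiting', 'tid-3', None, now - timedelta(seconds=90)),
        ]
        by_node, waiting = split_running_and_waiting(jobs)
        # Waiting jobs are checked against the celery ids from *every* node;
        # pretend celery only reports 'tid-1' and 'tid-3'.
        all_ids = {'tid-1', 'tid-3'}
        print([j.id for j in reap_missing_from_celery(waiting, all_ids, celery_task_start_time)])  # -> [2]

The actual fail_jobs_if_not_in_celery in the patch additionally skips WorkflowJob rows and respects the IGNORE_CELERY_INSPECTOR setting, and it persists the failure (status, job_explanation, notifications, websocket emit); those pieces are omitted here for brevity.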