From dba83674a24d38c8102a868f00b4f7a2c4d45532 Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Thu, 21 Sep 2017 14:38:58 -0400 Subject: [PATCH] fix task manager running task structures - prevent dual-entry for first item in running jobs due to setdefault syntax - fix issue where queues (celery tasks) only returned the last item in the inspector output due to looping problem this caused reaper bugs in production --- awx/main/scheduler/task_manager.py | 10 +++++----- awx/main/tests/unit/test_task_manager.py | 17 +++++++++++++++++ 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 2e96661159..c78b96b3f6 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -97,7 +97,7 @@ class TaskManager(): ~Q(polymorphic_ctype_id=workflow_ctype_id)) for j in jobs: if j.execution_node: - execution_nodes.setdefault(j.execution_node, [j]).append(j) + execution_nodes.setdefault(j.execution_node, []).append(j) else: waiting_jobs.append(j) return (execution_nodes, waiting_jobs) @@ -142,10 +142,10 @@ class TaskManager(): active_tasks = set() map(lambda at: active_tasks.add(at['id']), active_task_queues[queue]) - # celery worker name is of the form celery@myhost.com - queue_name = queue.split('@') - queue_name = queue_name[1 if len(queue_name) > 1 else 0] - queues[queue_name] = active_tasks + # celery worker name is of the form celery@myhost.com + queue_name = queue.split('@') + queue_name = queue_name[1 if len(queue_name) > 1 else 0] + queues[queue_name] = active_tasks else: if not hasattr(settings, 'CELERY_UNIT_TEST'): return (None, None) diff --git a/awx/main/tests/unit/test_task_manager.py b/awx/main/tests/unit/test_task_manager.py index 9e8066c8a1..83a5fcb0ce 100644 --- a/awx/main/tests/unit/test_task_manager.py +++ b/awx/main/tests/unit/test_task_manager.py @@ -50,3 +50,20 @@ class TestCleanupInconsistentCeleryTasks(): tm.cleanup_inconsistent_celery_tasks() job.save.assert_called_once() logger_mock.error.assert_called_once_with("Task job 2 (failed) DB error in marking failed. Job possibly deleted.") + + @mock.patch.object(InstanceGroup.objects, 'prefetch_related', return_value=[]) + @mock.patch('awx.main.scheduler.task_manager.inspect') + def test_multiple_active_instances_sanity_check(self, inspect_mock, *args): + class MockInspector: + pass + + mock_inspector = MockInspector() + mock_inspector.active = lambda: { + 'celery@host1': [], + 'celery@host2': [] + } + inspect_mock.return_value = mock_inspector + tm = TaskManager() + active_task_queues, queues = tm.get_active_tasks() + assert 'host1' in queues + assert 'host2' in queues