From d615e2e9ff4ae63803b02c3a338271cfdf6585f9 Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Wed, 16 Aug 2017 08:30:32 -0400 Subject: [PATCH] do not include workflow jobs for reaping * Workflow jobs are virtual jobs that don't actually run. Thus they won't have a celery id and aren't candidates for the generic reaping. * Better error logging when Instance not found in reaping code. --- awx/main/scheduler/__init__.py | 10 +++++++--- .../tests/functional/task_management/test_scheduler.py | 9 +++++++-- awx/main/tests/unit/test_task_manager.py | 4 ++-- 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index cdd9a55f3b..abc7c167ee 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -14,6 +14,7 @@ from django.db import transaction, connection, DatabaseError from django.utils.translation import ugettext_lazy as _ from django.utils.timezone import now as tz_now, utc from django.db.models import Q +from django.contrib.contenttypes.models import ContentType # AWX from awx.main.models import * # noqa @@ -78,8 +79,10 @@ class TaskManager(): def get_running_tasks(self): execution_nodes = {} now = tz_now() - jobs = UnifiedJob.objects.filter(Q(status='running') | - Q(status='waiting', modified__lte=now - timedelta(seconds=60))) + workflow_ctype_id = ContentType.objects.get_for_model(WorkflowJob).id + jobs = UnifiedJob.objects.filter((Q(status='running') | + Q(status='waiting', modified__lte=now - timedelta(seconds=60))) & + ~Q(polymorphic_ctype_id=workflow_ctype_id)) [execution_nodes.setdefault(j.execution_node, [j]).append(j) for j in jobs] return execution_nodes @@ -445,7 +448,8 @@ class TaskManager(): continue except Instance.DoesNotExist: logger.error("Execution node Instance {} not found in database. " - "The node is currently executing jobs {}".format(node, [str(j) for j in node_jobs])) + "The node is currently executing jobs {}".format(node, + [j.log_format for j in node_jobs])) active_tasks = [] for task in node_jobs: if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')): diff --git a/awx/main/tests/functional/task_management/test_scheduler.py b/awx/main/tests/functional/task_management/test_scheduler.py index 335505a65e..01bb88d0a0 100644 --- a/awx/main/tests/functional/task_management/test_scheduler.py +++ b/awx/main/tests/functional/task_management/test_scheduler.py @@ -9,6 +9,7 @@ from awx.main.scheduler import TaskManager from awx.main.models import ( Job, Instance, + WorkflowJob, ) @@ -250,7 +251,9 @@ class TestReaper(): j11 = Job.objects.create(status='running', celery_task_id='host4_j11', execution_node='host4_offline') - js = [j1, j2, j3, j4, j5, j6, j7, j8, j9, j10, j11] + j12 = WorkflowJob.objects.create(status='running', celery_task_id='workflow_job', execution_node='host1') + + js = [j1, j2, j3, j4, j5, j6, j7, j8, j9, j10, j11, j12] for j in js: j.save = mocker.Mock(wraps=j.save) j.websocket_emit_status = mocker.Mock() @@ -263,7 +266,7 @@ class TestReaper(): @pytest.fixture def running_tasks(self, all_jobs): return { - 'host1': all_jobs[2:5], + 'host1': all_jobs[2:5] + [all_jobs[11]], 'host2': all_jobs[5:8], 'host3_split': all_jobs[8:10], 'host4_offline': [all_jobs[10]], @@ -331,3 +334,5 @@ class TestReaper(): assert all_jobs[9] in execution_nodes_jobs['host3_split'] assert all_jobs[10] in execution_nodes_jobs['host4_offline'] + + assert all_jobs[11] not in execution_nodes_jobs['host1'] diff --git a/awx/main/tests/unit/test_task_manager.py b/awx/main/tests/unit/test_task_manager.py index fc0be720c8..b479952e53 100644 --- a/awx/main/tests/unit/test_task_manager.py +++ b/awx/main/tests/unit/test_task_manager.py @@ -31,8 +31,8 @@ class TestCleanupInconsistentCeleryTasks(): assert "mocked" in str(excinfo.value) logger_mock.error.assert_called_once_with("Execution node Instance host1 not found in database. " - "The node is currently executing jobs ['None-2-new', " - "'None-3-new']") + "The node is currently executing jobs ['job 2 (new)', " + "'job 3 (new)']") @mock.patch.object(cache, 'get', return_value=None) @mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {'host1': []}))