diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 055ab4c9c1..55416de5a5 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -958,6 +958,15 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique return (True, opts) def start_celery_task(self, opts, error_callback, success_callback, queue): + kwargs = { + 'link_error': error_callback, + 'link': success_callback, + 'queue': None, + 'task_id': None, + } + if not self.celery_task_id: + raise RuntimeError("Expected celery_task_id to be set on model.") + kwargs['task_id'] = self.celery_task_id task_class = self._get_task_class() from awx.main.models.ha import InstanceGroup ig = InstanceGroup.objects.get(name=queue) @@ -968,7 +977,8 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique args.append(isolated_instance.hostname) else: # proj & inv updates, system jobs run on controller queue = ig.controller.name - task_class().apply_async(args, opts, link_error=error_callback, link=success_callback, queue=queue) + kwargs['queue'] = queue + task_class().apply_async(args, opts, **kwargs) def start(self, error_callback, success_callback, **kwargs): ''' diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index 1af72ff4c1..11bb73a651 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -4,6 +4,7 @@ # Python from datetime import datetime, timedelta import logging +import uuid from sets import Set # Django @@ -12,6 +13,7 @@ from django.core.cache import cache from django.db import transaction, connection from django.utils.translation import ugettext_lazy as _ from django.utils.timezone import now as tz_now, utc +from django.db.models import Q # AWX from awx.main.models import * # noqa @@ -68,10 +70,10 @@ class TaskManager(): ''' Tasks that are running and SHOULD have a celery task. ''' - def get_running_tasks(self, all_tasks=None): - if all_tasks is None: - return self.get_tasks(status_list=('running',)) - return filter(lambda t: t.status == 'running', all_tasks) + def get_running_tasks(self): + now = tz_now() + return list(UnifiedJob.objects.filter(Q(status='running') | + (Q(status='waiting', modified__lte=now - timedelta(seconds=60))))) ''' Tasks that are currently running in celery @@ -216,6 +218,7 @@ class TaskManager(): task.instance_group = rampart_group logger.info('Submitting job {} to instance group {}.'.format(task.id, task.instance_group_id)) with disable_activity_stream(): + task.celery_task_id = uuid.uuid4() task.save() self.consume_capacity(task, rampart_group.name) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index bd6e999848..7d231c83f6 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -755,7 +755,7 @@ class BaseTask(LogErrorsTask): ''' Run the job/task and capture its output. ''' - instance = self.update_model(pk, status='running', celery_task_id='' if self.request.id is None else self.request.id) + instance = self.update_model(pk, status='running') instance.websocket_emit_status("running") status, rc, tb = 'error', None, '' diff --git a/awx/main/tests/functional/models/test_unified_job.py b/awx/main/tests/functional/models/test_unified_job.py index 28131ec385..3c2823c0d1 100644 --- a/awx/main/tests/functional/models/test_unified_job.py +++ b/awx/main/tests/functional/models/test_unified_job.py @@ -78,7 +78,8 @@ class TestIsolatedRuns: iso_ig.instances.create(hostname='iso1', capacity=50) i2 = iso_ig.instances.create(hostname='iso2', capacity=200) job = Job.objects.create( - instance_group=iso_ig + instance_group=iso_ig, + celery_task_id='something', ) mock_async = mock.MagicMock() @@ -91,7 +92,11 @@ class TestIsolatedRuns: with mock.patch.object(job, '_get_task_class') as task_class: task_class.return_value = MockTaskClass job.start_celery_task([], error_callback, success_callback, 'thepentagon') - mock_async.assert_called_with([job.id, 'iso2'], [], link_error=error_callback, link=success_callback, queue='thepentagon') + mock_async.assert_called_with([job.id, 'iso2'], [], + link_error=error_callback, + link=success_callback, + queue='thepentagon', + task_id='something') i2.capacity = 20 i2.save() @@ -99,4 +104,8 @@ class TestIsolatedRuns: with mock.patch.object(job, '_get_task_class') as task_class: task_class.return_value = MockTaskClass job.start_celery_task([], error_callback, success_callback, 'thepentagon') - mock_async.assert_called_with([job.id, 'iso1'], [], link_error=error_callback, link=success_callback, queue='thepentagon') + mock_async.assert_called_with([job.id, 'iso1'], [], + link_error=error_callback, + link=success_callback, + queue='thepentagon', + task_id='something') diff --git a/awx/main/tests/functional/task_management/test_scheduler.py b/awx/main/tests/functional/task_management/test_scheduler.py index cb1d689577..7fe41ca781 100644 --- a/awx/main/tests/functional/task_management/test_scheduler.py +++ b/awx/main/tests/functional/task_management/test_scheduler.py @@ -3,8 +3,52 @@ import mock from datetime import timedelta, datetime from django.core.cache import cache +from django.utils.timezone import now as tz_now from awx.main.scheduler import TaskManager +from awx.main.models import ( + Job, +) + + +@pytest.fixture +def all_jobs(mocker): + now = tz_now() + j1 = Job.objects.create(status='pending') + j2 = Job.objects.create(status='waiting', celery_task_id='considered_j2') + j3 = Job.objects.create(status='waiting', celery_task_id='considered_j3') + j3.modified = now - timedelta(seconds=60) + j3.save(update_fields=['modified']) + j4 = Job.objects.create(status='running', celery_task_id='considered_j4') + j5 = Job.objects.create(status='waiting', celery_task_id='reapable_j5') + j5.modified = now - timedelta(seconds=60) + j5.save(update_fields=['modified']) + + js = [j1, j2, j3, j4, j5] + for j in js: + j.save = mocker.Mock(wraps=j.save) + j.websocket_emit_status = mocker.Mock() + return js + + +@pytest.fixture +def considered_jobs(all_jobs): + return all_jobs[2:4] + [all_jobs[4]] + + +@pytest.fixture +def reapable_jobs(all_jobs): + return [all_jobs[4]] + + +@pytest.fixture +def unconsidered_jobs(all_jobs): + return all_jobs[0:1] + + +@pytest.fixture +def active_tasks(): + return ([], ['considered_j2', 'considered_j3', 'considered_j4',]) @pytest.mark.django_db @@ -217,15 +261,38 @@ def test_cleanup_interval(): @pytest.mark.django_db @mock.patch('awx.main.tasks._send_notification_templates') @mock.patch.object(TaskManager, 'get_active_tasks', lambda self: [[], []]) -@mock.patch.object(TaskManager, 'get_running_tasks') -def test_cleanup_inconsistent_task(get_running_tasks, notify): - orphaned_task = mock.Mock(job_explanation='') - get_running_tasks.return_value = [orphaned_task] - TaskManager().cleanup_inconsistent_celery_tasks() +def test_cleanup_inconsistent_task(notify, active_tasks, considered_jobs, reapable_jobs, mocker): + tm = TaskManager() + + tm.get_running_tasks = mocker.Mock(return_value=considered_jobs) + tm.get_active_tasks = mocker.Mock(return_value=active_tasks) + + tm.cleanup_inconsistent_celery_tasks() + + for j in considered_jobs: + if j not in reapable_jobs: + j.save.assert_not_called() + + for reaped_job in reapable_jobs: + notify.assert_called_once_with(reaped_job, 'failed') + reaped_job.websocket_emit_status.assert_called_once_with('failed') + assert reaped_job.status == 'failed' + assert reaped_job.job_explanation == ( + 'Task was marked as running in Tower but was not present in Celery, so it has been marked as failed.' + ) + + +@pytest.mark.django_db +def test_get_running_tasks(considered_jobs, reapable_jobs, unconsidered_jobs): + tm = TaskManager() + + # Ensure the query grabs the expected jobs + rt = tm.get_running_tasks() + for j in considered_jobs: + assert j in rt + for j in reapable_jobs: + assert j in rt + for j in unconsidered_jobs: + assert j in unconsidered_jobs + - notify.assert_called_once_with(orphaned_task, 'failed') - orphaned_task.websocket_emit_status.assert_called_once_with('failed') - assert orphaned_task.status == 'failed' - assert orphaned_task.job_explanation == ( - 'Task was marked as running in Tower but was not present in Celery, so it has been marked as failed.' - ) diff --git a/awx/main/tests/unit/test_tasks.py b/awx/main/tests/unit/test_tasks.py index 290f3ad243..7e888942c4 100644 --- a/awx/main/tests/unit/test_tasks.py +++ b/awx/main/tests/unit/test_tasks.py @@ -257,7 +257,7 @@ class TestGenericRun(TestJobExecution): with pytest.raises(Exception): self.task.run(self.pk) for c in [ - mock.call(self.pk, celery_task_id='', status='running'), + mock.call(self.pk, status='running'), mock.call(self.pk, output_replacements=[], result_traceback=mock.ANY, status='canceled') ]: assert c in self.task.update_model.call_args_list