Fold the "fail inconsistent running jobs" cleanup into TaskManager.

What this patch does
  * awx/main/scheduler/__init__.py
      - replaces TaskManager.process_celery_tasks(...) with
        TaskManager.cleanup_inconsistent_celery_tasks(), which fetches the
        active celery tasks itself, stores them under the "active_celery_tasks"
        cache key and records its own run time under 'last_celery_task_cleanup'
      - the new method returns early while less than
        settings.AWX_INCONSISTENT_TASK_INTERVAL seconds have passed since the
        recorded run time
      - calls the new method after the `acquired` check and right before
        self._schedule()
      - imports awx.main.tasks as a module (awx_tasks) instead of importing
        _send_notification_templates by name
  * awx/main/scheduler/tasks.py
      - removes the run_fail_inconsistent_running_jobs celery task and the
        imports only it used
  * awx/main/tests/functional/task_management/test_scheduler.py
      - adds test_cleanup_interval and test_cleanup_inconsistent_task
  * awx/settings/defaults.py
      - removes the celery route and beat entry of the deleted task
      - adds AWX_INCONSISTENT_TASK_INTERVAL = 60 * 3

Review notes
  * Fixed: the interval check now uses timedelta.total_seconds().  The
    previous `.seconds` is only the seconds component of the delta (0..86399)
    and ignores whole days, so with the datetime.min fallback, or after a gap
    of more than a day, the cleanup could be skipped although the interval had
    long elapsed.
  * NOTE(review): line breaks, indentation and blank context lines were
    reconstructed from a whitespace-collapsed copy of this patch, guided by
    the hunk line counts.  Run `git apply --check` (possibly with
    --ignore-whitespace) against the target tree before applying.
  * NOTE(review): the -/+ pair for "from awx.main.scheduler import
    TaskManager" in scheduler/tasks.py presumably differed only in whitespace
    that is no longer recoverable.
  * NOTE(review): the `index` blob ids are carried over unchanged and
    therefore do not describe the post-image of scheduler/__init__.py after
    the total_seconds() fix.

diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py
index 194e4b0964..78382149bb 100644
--- a/awx/main/scheduler/__init__.py
+++ b/awx/main/scheduler/__init__.py
@@ -2,15 +2,17 @@
 # All Rights Reserved
 
 # Python
-from datetime import timedelta
+from datetime import datetime, timedelta
 import logging
+import json
 from sets import Set
 
 # Django
 from django.conf import settings
+from django.core.cache import cache
 from django.db import transaction, connection
 from django.utils.translation import ugettext_lazy as _
-from django.utils.timezone import now as tz_now
+from django.utils.timezone import now as tz_now, utc
 
 # AWX
 from awx.main.models import * # noqa
@@ -19,7 +21,7 @@ from awx.main.scheduler.dag_workflow import WorkflowDAG
 from awx.main.utils.pglock import advisory_lock
 
 from awx.main.scheduler.dependency_graph import DependencyGraph
-from awx.main.tasks import _send_notification_templates
+from awx.main import tasks as awx_tasks
 
 # Celery
 from celery.task.control import inspect
@@ -376,17 +378,32 @@ class TaskManager():
             if not found_acceptable_queue:
                 logger.debug("Task {} couldn't be scheduled on graph, waiting for next cycle".format(task))
 
-    def process_celery_tasks(self, celery_task_start_time, active_tasks, all_running_sorted_tasks):
+    def cleanup_inconsistent_celery_tasks(self):
         '''
         Rectify tower db <-> celery inconsistent view of jobs state
         '''
+        last_cleanup = cache.get('last_celery_task_cleanup') or datetime.min.replace(tzinfo=utc)
+        if (tz_now() - last_cleanup).total_seconds() < settings.AWX_INCONSISTENT_TASK_INTERVAL:
+            return
+
+        logger.debug("Failing inconsistent running jobs.")
+        celery_task_start_time = tz_now()
+        active_task_queues, active_tasks = self.get_active_tasks()
+        cache.set("active_celery_tasks", json.dumps(active_task_queues))
+        cache.set('last_celery_task_cleanup', tz_now())
+
+        if active_tasks is None:
+            logger.error('Failed to retrieve active tasks from celery')
+            return None
+
+        all_running_sorted_tasks = self.get_running_tasks()
         for task in all_running_sorted_tasks:
 
             if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
                 # TODO: try catch the getting of the job. The job COULD have been deleted
                 if isinstance(task, WorkflowJob):
                     continue
-                if task_obj.modified > celery_task_start_time:
+                if task.modified > celery_task_start_time:
                     continue
                 task.status = 'failed'
                 task.job_explanation += ' '.join((
@@ -394,7 +411,7 @@ class TaskManager():
                     'Celery, so it has been marked as failed.',
                 ))
                 task.save()
-                _send_notification_templates(task, 'failed')
+                awx_tasks._send_notification_templates(task, 'failed')
                 task.websocket_emit_status('failed')
                 logger.error("Task %s appears orphaned... marking as failed" % task)
 
@@ -460,8 +477,9 @@ class TaskManager():
                 if acquired is False:
                     return
 
+                self.cleanup_inconsistent_celery_tasks()
                 finished_wfjs = self._schedule()
 
             # Operations whose queries rely on modifications made during the atomic scheduling session
             for wfj in WorkflowJob.objects.filter(id__in=finished_wfjs):
-                _send_notification_templates(wfj, 'succeeded' if wfj.status == 'successful' else 'failed')
+                awx_tasks._send_notification_templates(wfj, 'succeeded' if wfj.status == 'successful' else 'failed')
diff --git a/awx/main/scheduler/tasks.py b/awx/main/scheduler/tasks.py
index 140c940924..48b80cdbb6 100644
--- a/awx/main/scheduler/tasks.py
+++ b/awx/main/scheduler/tasks.py
@@ -1,19 +1,12 @@
 
 # Python
 import logging
-import json
-
-# Django
-from django.db import transaction
-from django.utils.timezone import now as tz_now
-from awx.main.utils.pglock import advisory_lock
 
 # Celery
 from celery import task
 
 # AWX
-from awx.main.scheduler import TaskManager
-from django.core.cache import cache
+from awx.main.scheduler import TaskManager
 
 logger = logging.getLogger('awx.main.scheduler')
 
@@ -36,24 +29,3 @@ def run_job_complete(job_id):
 def run_task_manager():
     logger.debug("Running Tower task manager.")
     TaskManager().schedule()
-
-
-@task
-def run_fail_inconsistent_running_jobs():
-    logger.debug("Running task to fail inconsistent running jobs.")
-    with transaction.atomic():
-        # Lock
-        with advisory_lock('task_manager_lock', wait=False) as acquired:
-            if acquired is False:
-                return
-
-            scheduler = TaskManager()
-            celery_task_start_time = tz_now()
-            active_task_queues, active_tasks = scheduler.get_active_tasks()
-            cache.set("active_celery_tasks", json.dumps(active_task_queues))
-            if active_tasks is None:
-                # TODO: Failed to contact celery. We should surface this.
-                return None
-
-            all_running_sorted_tasks = scheduler.get_running_tasks()
-            scheduler.process_celery_tasks(celery_task_start_time, active_tasks, all_running_sorted_tasks)
diff --git a/awx/main/tests/functional/task_management/test_scheduler.py b/awx/main/tests/functional/task_management/test_scheduler.py
index c345eb85bb..15646dfe54 100644
--- a/awx/main/tests/functional/task_management/test_scheduler.py
+++ b/awx/main/tests/functional/task_management/test_scheduler.py
@@ -1,6 +1,9 @@
 import pytest
 import mock
-from datetime import timedelta
+from datetime import timedelta, datetime
+
+from django.core.cache import cache
+
 from awx.main.scheduler import TaskManager
 
 
@@ -198,3 +201,32 @@ def test_shared_dependencies_launch(default_instance_group, job_template_factory
     iu = [x for x in ii.inventory_updates.all()]
     assert len(pu) == 1
     assert len(iu) == 1
+
+
+@pytest.mark.django_db
+def test_cleanup_interval():
+    assert cache.get('last_celery_task_cleanup') is None
+
+    TaskManager().cleanup_inconsistent_celery_tasks()
+    last_cleanup = cache.get('last_celery_task_cleanup')
+    assert isinstance(last_cleanup, datetime)
+
+    TaskManager().cleanup_inconsistent_celery_tasks()
+    assert cache.get('last_celery_task_cleanup') == last_cleanup
+
+
+@pytest.mark.django_db
+@mock.patch('awx.main.tasks._send_notification_templates')
+@mock.patch.object(TaskManager, 'get_active_tasks', lambda self: [[], []])
+@mock.patch.object(TaskManager, 'get_running_tasks')
+def test_cleanup_inconsistent_task(get_running_tasks, notify):
+    orphaned_task = mock.Mock(job_explanation='')
+    get_running_tasks.return_value = [orphaned_task]
+    TaskManager().cleanup_inconsistent_celery_tasks()
+
+    notify.assert_called_once_with(orphaned_task, 'failed')
+    orphaned_task.websocket_emit_status.assert_called_once_with('failed')
+    assert orphaned_task.status == 'failed'
+    assert orphaned_task.job_explanation == (
+        'Task was marked as running in Tower but was not present in Celery, so it has been marked as failed.'
+    )
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index d48b50eca2..86e0956e3a 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -432,9 +432,7 @@ CELERY_QUEUES = (
     Queue('tower_scheduler', Exchange('scheduler', type='topic'), routing_key='tower_scheduler.job.#', durable=False),
     Broadcast('tower_broadcast_all')
 )
-CELERY_ROUTES = {'awx.main.scheduler.tasks.run_fail_inconsistent_running_jobs': {'queue': 'tower',
-                                                                                 'routing_key': 'tower'},
-                 'awx.main.scheduler.tasks.run_task_manager': {'queue': 'tower',
+CELERY_ROUTES = {'awx.main.scheduler.tasks.run_task_manager': {'queue': 'tower',
                                                                'routing_key': 'tower'},
                  'awx.main.scheduler.tasks.run_job_launch': {'queue': 'tower_scheduler',
                                                              'routing_key': 'tower_scheduler.job.launch'},
@@ -473,12 +471,8 @@ CELERYBEAT_SCHEDULE = {
         'schedule': timedelta(seconds=20),
         'options': {'expires': 20,}
     },
-    'task_fail_inconsistent_running_jobs': {
-        'task': 'awx.main.scheduler.tasks.run_fail_inconsistent_running_jobs',
-        'schedule': timedelta(seconds=30),
-        'options': {'expires': 20,}
-    },
 }
+AWX_INCONSISTENT_TASK_INTERVAL = 60 * 3
 
 # Django Caching Configuration
 if is_testing():