From 0e29f3617d66749c33b9ffbc94c389e124d13b95 Mon Sep 17 00:00:00 2001
From: Ryan Petrello
Date: Mon, 10 Jul 2017 14:03:07 -0400
Subject: [PATCH] periodically run orphaned task cleanup as part of the
 scheduler

Running orphaned task cleanup in its own scheduled task via celery-beat
causes racy lock contention between the cleanup task and the task
scheduler. Because the two run at similar intervals, this race condition
is fairly easy to hit. At best, the scheduler is regularly delayed by 20
seconds; depending on timing, task execution can be needlessly delayed by
a minute or more. At worst, the scheduler is never able to schedule tasks
at all.

This change implements the cleanup as a periodic block of code in the
scheduler itself that tracks its "last run" time in memcached; at most
one cleanup is performed per AWX_INCONSISTENT_TASK_INTERVAL (which
defaults to 180 seconds).

see: #6534
---
 awx/main/scheduler/__init__.py        | 32 +++++++++++++----
 awx/main/scheduler/tasks.py           | 30 +---------------
 .../task_management/test_scheduler.py | 34 ++++++++++++++++++-
 awx/settings/defaults.py              | 10 ++----
 4 files changed, 61 insertions(+), 45 deletions(-)

diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py
index 194e4b0964..78382149bb 100644
--- a/awx/main/scheduler/__init__.py
+++ b/awx/main/scheduler/__init__.py
@@ -2,15 +2,17 @@
 # All Rights Reserved

 # Python
-from datetime import timedelta
+from datetime import datetime, timedelta
 import logging
+import json
 from sets import Set

 # Django
 from django.conf import settings
+from django.core.cache import cache
 from django.db import transaction, connection
 from django.utils.translation import ugettext_lazy as _
-from django.utils.timezone import now as tz_now
+from django.utils.timezone import now as tz_now, utc

 # AWX
 from awx.main.models import * # noqa
@@ -19,7 +21,7 @@
 from awx.main.scheduler.dag_workflow import WorkflowDAG
 from awx.main.utils.pglock import advisory_lock
 from awx.main.scheduler.dependency_graph import DependencyGraph
-from awx.main.tasks import _send_notification_templates
+from awx.main import tasks as awx_tasks

 # Celery
 from celery.task.control import inspect
@@ -376,17 +378,32 @@ class TaskManager():
             if not found_acceptable_queue:
                 logger.debug("Task {} couldn't be scheduled on graph, waiting for next cycle".format(task))

-    def process_celery_tasks(self, celery_task_start_time, active_tasks, all_running_sorted_tasks):
+    def cleanup_inconsistent_celery_tasks(self):
         '''
         Rectify tower db <-> celery inconsistent view of jobs state
         '''
+        last_cleanup = cache.get('last_celery_task_cleanup') or datetime.min.replace(tzinfo=utc)
+        if (tz_now() - last_cleanup).seconds < settings.AWX_INCONSISTENT_TASK_INTERVAL:
+            return
+
+        logger.debug("Failing inconsistent running jobs.")
+        celery_task_start_time = tz_now()
+        active_task_queues, active_tasks = self.get_active_tasks()
+        cache.set("active_celery_tasks", json.dumps(active_task_queues))
+        cache.set('last_celery_task_cleanup', tz_now())
+
+        if active_tasks is None:
+            logger.error('Failed to retrieve active tasks from celery')
+            return None
+
+        all_running_sorted_tasks = self.get_running_tasks()
         for task in all_running_sorted_tasks:

             if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
                 # TODO: try catch the getting of the job. The job COULD have been deleted
                 if isinstance(task, WorkflowJob):
                     continue
-                if task_obj.modified > celery_task_start_time:
+                if task.modified > celery_task_start_time:
                     continue
                 task.status = 'failed'
                 task.job_explanation += ' '.join((
@@ -394,7 +411,7 @@ class TaskManager():
                     'Celery, so it has been marked as failed.',
                 ))
                 task.save()
-                _send_notification_templates(task, 'failed')
+                awx_tasks._send_notification_templates(task, 'failed')
                 task.websocket_emit_status('failed')
                 logger.error("Task %s appears orphaned... marking as failed" % task)

@@ -460,8 +477,9 @@ class TaskManager():
                 if acquired is False:
                     return

+                self.cleanup_inconsistent_celery_tasks()
                 finished_wfjs = self._schedule()

                 # Operations whose queries rely on modifications made during the atomic scheduling session
                 for wfj in WorkflowJob.objects.filter(id__in=finished_wfjs):
-                    _send_notification_templates(wfj, 'succeeded' if wfj.status == 'successful' else 'failed')
+                    awx_tasks._send_notification_templates(wfj, 'succeeded' if wfj.status == 'successful' else 'failed')
diff --git a/awx/main/scheduler/tasks.py b/awx/main/scheduler/tasks.py
index 140c940924..48b80cdbb6 100644
--- a/awx/main/scheduler/tasks.py
+++ b/awx/main/scheduler/tasks.py
@@ -1,19 +1,12 @@
 # Python
 import logging
-import json
-
-# Django
-from django.db import transaction
-from django.utils.timezone import now as tz_now
-from awx.main.utils.pglock import advisory_lock

 # Celery
 from celery import task

 # AWX
-from awx.main.scheduler import TaskManager
-from django.core.cache import cache
+from awx.main.scheduler import TaskManager

 logger = logging.getLogger('awx.main.scheduler')

@@ -36,24 +29,3 @@
 def run_task_manager():
     logger.debug("Running Tower task manager.")
     TaskManager().schedule()
-
-
-@task
-def run_fail_inconsistent_running_jobs():
-    logger.debug("Running task to fail inconsistent running jobs.")
-    with transaction.atomic():
-        # Lock
-        with advisory_lock('task_manager_lock', wait=False) as acquired:
-            if acquired is False:
-                return
-
-            scheduler = TaskManager()
-            celery_task_start_time = tz_now()
-            active_task_queues, active_tasks = scheduler.get_active_tasks()
-            cache.set("active_celery_tasks", json.dumps(active_task_queues))
-            if active_tasks is None:
-                # TODO: Failed to contact celery. We should surface this.
-                return None
-
-            all_running_sorted_tasks = scheduler.get_running_tasks()
-            scheduler.process_celery_tasks(celery_task_start_time, active_tasks, all_running_sorted_tasks)
diff --git a/awx/main/tests/functional/task_management/test_scheduler.py b/awx/main/tests/functional/task_management/test_scheduler.py
index c345eb85bb..15646dfe54 100644
--- a/awx/main/tests/functional/task_management/test_scheduler.py
+++ b/awx/main/tests/functional/task_management/test_scheduler.py
@@ -1,6 +1,9 @@
 import pytest
 import mock
-from datetime import timedelta
+from datetime import timedelta, datetime
+
+from django.core.cache import cache
+
 from awx.main.scheduler import TaskManager


@@ -198,3 +201,32 @@ def test_shared_dependencies_launch(default_instance_group, job_template_factory
     iu = [x for x in ii.inventory_updates.all()]
     assert len(pu) == 1
     assert len(iu) == 1
+
+
+@pytest.mark.django_db
+def test_cleanup_interval():
+    assert cache.get('last_celery_task_cleanup') is None
+
+    TaskManager().cleanup_inconsistent_celery_tasks()
+    last_cleanup = cache.get('last_celery_task_cleanup')
+    assert isinstance(last_cleanup, datetime)
+
+    TaskManager().cleanup_inconsistent_celery_tasks()
+    assert cache.get('last_celery_task_cleanup') == last_cleanup
+
+
+@pytest.mark.django_db
+@mock.patch('awx.main.tasks._send_notification_templates')
+@mock.patch.object(TaskManager, 'get_active_tasks', lambda self: [[], []])
+@mock.patch.object(TaskManager, 'get_running_tasks')
+def test_cleanup_inconsistent_task(get_running_tasks, notify):
+    orphaned_task = mock.Mock(job_explanation='')
+    get_running_tasks.return_value = [orphaned_task]
+    TaskManager().cleanup_inconsistent_celery_tasks()
+
+    notify.assert_called_once_with(orphaned_task, 'failed')
+    orphaned_task.websocket_emit_status.assert_called_once_with('failed')
+    assert orphaned_task.status == 'failed'
+    assert orphaned_task.job_explanation == (
+        'Task was marked as running in Tower but was not present in Celery, so it has been marked as failed.'
+    )
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index d48b50eca2..86e0956e3a 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -432,9 +432,7 @@ CELERY_QUEUES = (
     Queue('tower_scheduler', Exchange('scheduler', type='topic'), routing_key='tower_scheduler.job.#', durable=False),
     Broadcast('tower_broadcast_all')
 )
-CELERY_ROUTES = {'awx.main.scheduler.tasks.run_fail_inconsistent_running_jobs': {'queue': 'tower',
-                                                                                 'routing_key': 'tower'},
-                 'awx.main.scheduler.tasks.run_task_manager': {'queue': 'tower',
+CELERY_ROUTES = {'awx.main.scheduler.tasks.run_task_manager': {'queue': 'tower',
                                                                'routing_key': 'tower'},
                  'awx.main.scheduler.tasks.run_job_launch': {'queue': 'tower_scheduler',
                                                              'routing_key': 'tower_scheduler.job.launch'},
@@ -473,12 +471,8 @@
         'schedule': timedelta(seconds=20),
         'options': {'expires': 20,}
     },
-    'task_fail_inconsistent_running_jobs': {
-        'task': 'awx.main.scheduler.tasks.run_fail_inconsistent_running_jobs',
-        'schedule': timedelta(seconds=30),
-        'options': {'expires': 20,}
-    },
 }
+AWX_INCONSISTENT_TASK_INTERVAL = 60 * 3

 # Django Caching Configuration
 if is_testing():
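
For reference, here is a minimal, self-contained sketch of the cache-based
throttle that cleanup_inconsistent_celery_tasks() applies above. It is
illustrative only and not part of the patch: THROTTLE_INTERVAL and
run_cleanup_if_due() are hypothetical names standing in for
settings.AWX_INCONSISTENT_TASK_INTERVAL and the inline check at the top of
the cleanup method, and a configured Django cache backend is assumed.

    # Illustrative sketch; not part of the patch. Record the time of the last
    # cleanup in the Django cache and skip the work if it ran more recently
    # than the configured interval.
    from datetime import datetime

    from django.core.cache import cache
    from django.utils.timezone import now as tz_now, utc

    THROTTLE_INTERVAL = 60 * 3  # seconds; assumed stand-in for AWX_INCONSISTENT_TASK_INTERVAL


    def run_cleanup_if_due(cleanup):
        # Fall back to the earliest representable time so the first call always runs.
        last_run = cache.get('last_celery_task_cleanup') or datetime.min.replace(tzinfo=utc)
        # total_seconds() keeps the comparison correct even for gaps longer than a day.
        if (tz_now() - last_run).total_seconds() < THROTTLE_INTERVAL:
            return False
        cache.set('last_celery_task_cleanup', tz_now())
        cleanup()
        return True

Because the check runs inline at the start of each scheduler pass, the cost
of a skipped cleanup is a single cache read rather than a second celery-beat
task contending for the scheduler's advisory lock.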