periodically run orphaned task cleanup as part of the scheduler

Running orphaned task cleanup as its own scheduled task via celery-beat
causes racy lock contention between the cleanup task and the task
scheduler. Because the two run at similar intervals, this race
condition is fairly easy to hit. At best, it results in situations
where the scheduler is regularly delayed by 20s; depending on timing,
it can needlessly delay task execution by a minute or more. At worst,
it can result in situations where the scheduler is never able to
schedule tasks at all.
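
For context, both beat entry points funnel through the same
non-blocking Postgres advisory lock, so whichever fires second gives up
and waits for its next tick. A condensed sketch of the pre-change shape
(lock name and beat intervals taken from the diff below; the actual
work bodies are elided):

    # Condensed sketch of the old celery-beat layout. Whichever task
    # fires second finds the advisory lock held and bails until its next
    # beat tick, so scheduling can slip by 20s per collision.
    from awx.main.utils.pglock import advisory_lock


    def run_task_manager():  # celery-beat, every 20 seconds
        with advisory_lock('task_manager_lock', wait=False) as acquired:
            if acquired is False:
                return  # cleanup holds the lock; try again next tick
            pass  # TaskManager().schedule() work happens here


    def run_fail_inconsistent_running_jobs():  # celery-beat, every 30 seconds
        with advisory_lock('task_manager_lock', wait=False) as acquired:
            if acquired is False:
                return  # scheduler holds the lock; try again next tick
            pass  # orphaned-task cleanup work happens here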

This change implements the cleanup as a periodic block of code in the
scheduler itself that tracks its "last run" time in memcached; by
default, it performs a cleanup every three minutes (the new
AWX_INCONSISTENT_TASK_INTERVAL setting, 60 * 3 seconds).
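
The gate itself is just a timestamp check against the cache at the top
of each scheduler pass. A minimal standalone sketch of the pattern,
assuming a memcached-backed Django cache (do_cleanup() is a
hypothetical stand-in for the orphan-failing logic):

    # Minimal sketch of the cache-based "last run" gate.
    from datetime import datetime

    from django.core.cache import cache
    from django.utils.timezone import now as tz_now, utc

    CLEANUP_INTERVAL = 60 * 3  # seconds, mirroring AWX_INCONSISTENT_TASK_INTERVAL


    def do_cleanup():
        pass  # hypothetical stand-in for the real orphan-failing logic


    def maybe_cleanup():
        # datetime.min is the "never ran" sentinel for a cold cache
        last = cache.get('last_celery_task_cleanup') or datetime.min.replace(tzinfo=utc)
        if (tz_now() - last).total_seconds() < CLEANUP_INTERVAL:
            return  # ran recently; skip this scheduler pass
        cache.set('last_celery_task_cleanup', tz_now())
        do_cleanup()

(The sketch uses total_seconds() rather than timedelta.seconds, which
wraps at day boundaries.)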

see: #6534
Ryan Petrello 2017-07-10 14:03:07 -04:00
parent 35e28e9347
commit 0e29f3617d
4 changed files with 61 additions and 45 deletions


@@ -2,15 +2,17 @@
 # All Rights Reserved

 # Python
-from datetime import timedelta
+from datetime import datetime, timedelta
 import logging
+import json

 # Django
 from django.conf import settings
+from django.core.cache import cache
 from django.db import transaction, connection
 from django.utils.translation import ugettext_lazy as _
-from django.utils.timezone import now as tz_now
+from django.utils.timezone import now as tz_now, utc

 # AWX
 from awx.main.models import * # noqa
@@ -19,7 +21,7 @@ from awx.main.scheduler.dag_workflow import WorkflowDAG
 from awx.main.utils.pglock import advisory_lock
 from awx.main.scheduler.dependency_graph import DependencyGraph
-from awx.main.tasks import _send_notification_templates
+from awx.main import tasks as awx_tasks

 # Celery
 from celery.task.control import inspect
@@ -376,17 +378,32 @@ class TaskManager():
            if not found_acceptable_queue:
                logger.debug("Task {} couldn't be scheduled on graph, waiting for next cycle".format(task))

-    def process_celery_tasks(self, celery_task_start_time, active_tasks, all_running_sorted_tasks):
+    def cleanup_inconsistent_celery_tasks(self):
+        '''
+        Rectify tower db <-> celery inconsistent view of jobs state
+        '''
+        last_cleanup = cache.get('last_celery_task_cleanup') or datetime.min.replace(tzinfo=utc)
+        if (tz_now() - last_cleanup).seconds < settings.AWX_INCONSISTENT_TASK_INTERVAL:
+            return
+
+        logger.debug("Failing inconsistent running jobs.")
+        celery_task_start_time = tz_now()
+        active_task_queues, active_tasks = self.get_active_tasks()
+        cache.set("active_celery_tasks", json.dumps(active_task_queues))
+        cache.set('last_celery_task_cleanup', tz_now())
+
+        if active_tasks is None:
+            logger.error('Failed to retrieve active tasks from celery')
+            return None
+
+        all_running_sorted_tasks = self.get_running_tasks()
         for task in all_running_sorted_tasks:
             if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
                 # TODO: try catch the getting of the job. The job COULD have been deleted
                 if isinstance(task, WorkflowJob):
                     continue
-                if task_obj.modified > celery_task_start_time:
+                if task.modified > celery_task_start_time:
                     continue
                 task.status = 'failed'
                 task.job_explanation += ' '.join((
@@ -394,7 +411,7 @@ class TaskManager():
                     'Celery, so it has been marked as failed.',
                 ))
                 task.save()
-                _send_notification_templates(task, 'failed')
+                awx_tasks._send_notification_templates(task, 'failed')
                 task.websocket_emit_status('failed')
                 logger.error("Task %s appears orphaned... marking as failed" % task)
@@ -460,8 +477,9 @@ class TaskManager():
                 if acquired is False:
                     return

+                self.cleanup_inconsistent_celery_tasks()
                 finished_wfjs = self._schedule()

                 # Operations whose queries rely on modifications made during the atomic scheduling session
                 for wfj in WorkflowJob.objects.filter(id__in=finished_wfjs):
-                    _send_notification_templates(wfj, 'succeeded' if wfj.status == 'successful' else 'failed')
+                    awx_tasks._send_notification_templates(wfj, 'succeeded' if wfj.status == 'successful' else 'failed')


@@ -1,19 +1,12 @@
 # Python
 import logging
-import json
-
-# Django
-from django.db import transaction
-from django.utils.timezone import now as tz_now
-from awx.main.utils.pglock import advisory_lock

 # Celery
 from celery import task

 # AWX
 from awx.main.scheduler import TaskManager
-from django.core.cache import cache

 logger = logging.getLogger('awx.main.scheduler')
@@ -36,24 +29,3 @@ def run_job_complete(job_id):
 def run_task_manager():
     logger.debug("Running Tower task manager.")
     TaskManager().schedule()
-
-
-@task
-def run_fail_inconsistent_running_jobs():
-    logger.debug("Running task to fail inconsistent running jobs.")
-    with transaction.atomic():
-        # Lock
-        with advisory_lock('task_manager_lock', wait=False) as acquired:
-            if acquired is False:
-                return
-
-            scheduler = TaskManager()
-            celery_task_start_time = tz_now()
-            active_task_queues, active_tasks = scheduler.get_active_tasks()
-            cache.set("active_celery_tasks", json.dumps(active_task_queues))
-            if active_tasks is None:
-                # TODO: Failed to contact celery. We should surface this.
-                return None
-
-            all_running_sorted_tasks = scheduler.get_running_tasks()
-            scheduler.process_celery_tasks(celery_task_start_time, active_tasks, all_running_sorted_tasks)


@@ -1,6 +1,9 @@
 import pytest
 import mock
-from datetime import timedelta
+from datetime import timedelta, datetime
+
+from django.core.cache import cache

 from awx.main.scheduler import TaskManager
@@ -198,3 +201,32 @@ def test_shared_dependencies_launch(default_instance_group, job_template_factory
     iu = [x for x in ii.inventory_updates.all()]
     assert len(pu) == 1
     assert len(iu) == 1
+
+
+@pytest.mark.django_db
+def test_cleanup_interval():
+    assert cache.get('last_celery_task_cleanup') is None
+
+    TaskManager().cleanup_inconsistent_celery_tasks()
+    last_cleanup = cache.get('last_celery_task_cleanup')
+    assert isinstance(last_cleanup, datetime)
+
+    TaskManager().cleanup_inconsistent_celery_tasks()
+    assert cache.get('last_celery_task_cleanup') == last_cleanup
+
+
+@pytest.mark.django_db
+@mock.patch('awx.main.tasks._send_notification_templates')
+@mock.patch.object(TaskManager, 'get_active_tasks', lambda self: [[], []])
+@mock.patch.object(TaskManager, 'get_running_tasks')
+def test_cleanup_inconsistent_task(get_running_tasks, notify):
+    orphaned_task = mock.Mock(job_explanation='')
+    get_running_tasks.return_value = [orphaned_task]
+
+    TaskManager().cleanup_inconsistent_celery_tasks()
+    notify.assert_called_once_with(orphaned_task, 'failed')
+    orphaned_task.websocket_emit_status.assert_called_once_with('failed')
+    assert orphaned_task.status == 'failed'
+    assert orphaned_task.job_explanation == (
+        'Task was marked as running in Tower but was not present in Celery, so it has been marked as failed.'
+    )


@@ -432,9 +432,7 @@ CELERY_QUEUES = (
     Queue('tower_scheduler', Exchange('scheduler', type='topic'), routing_key='tower_scheduler.job.#', durable=False),
     Broadcast('tower_broadcast_all')
 )
-CELERY_ROUTES = {'awx.main.scheduler.tasks.run_fail_inconsistent_running_jobs': {'queue': 'tower',
-                                                                                 'routing_key': 'tower'},
-                 'awx.main.scheduler.tasks.run_task_manager': {'queue': 'tower',
+CELERY_ROUTES = {'awx.main.scheduler.tasks.run_task_manager': {'queue': 'tower',
                                                                'routing_key': 'tower'},
                 'awx.main.scheduler.tasks.run_job_launch': {'queue': 'tower_scheduler',
                                                             'routing_key': 'tower_scheduler.job.launch'},
@@ -473,12 +471,8 @@ CELERYBEAT_SCHEDULE = {
         'schedule': timedelta(seconds=20),
         'options': {'expires': 20,}
     },
-    'task_fail_inconsistent_running_jobs': {
-        'task': 'awx.main.scheduler.tasks.run_fail_inconsistent_running_jobs',
-        'schedule': timedelta(seconds=30),
-        'options': {'expires': 20,}
-    },
 }
+AWX_INCONSISTENT_TASK_INTERVAL = 60 * 3

 # Django Caching Configuration
 if is_testing():