From f3f9782c0b2c6819dbd4deff25140be38b5cb64e Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Wed, 5 Jul 2017 12:08:31 -0400 Subject: [PATCH] fix 2 data source inconcistency with failing tasks * Do not "trust" the list of celery ids for database entries that were modified after the list of celery ids was gotten. * err on the side of caution and just let the next heartbeat celery killer try killing the task if it needs to be reaped. --- awx/main/scheduler/__init__.py | 8 ++------ awx/main/scheduler/tasks.py | 4 +++- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index 2136a58fe8..5f309ce0bc 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -376,7 +376,7 @@ class TaskManager(): if not found_acceptable_queue: logger.debug("Task {} couldn't be scheduled on graph, waiting for next cycle".format(task)) - def process_celery_tasks(self, active_tasks, all_running_sorted_tasks): + def process_celery_tasks(self, celery_task_start_time, active_tasks, all_running_sorted_tasks): ''' Rectify tower db <-> celery inconsistent view of jobs state ''' @@ -384,13 +384,9 @@ class TaskManager(): if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')): # TODO: try catch the getting of the job. The job COULD have been deleted - # Ensure job did not finish running between the time we get the - # list of task id's from celery and now. - # Note: This is an actual fix, not a reduction in the time - # window that this can happen. if isinstance(task, WorkflowJob): continue - if task.status != 'running': + if task_obj.modified > celery_task_start_time: continue task.status = 'failed' task.job_explanation += ' '.join(( diff --git a/awx/main/scheduler/tasks.py b/awx/main/scheduler/tasks.py index 8e6b4ec836..d2200d8912 100644 --- a/awx/main/scheduler/tasks.py +++ b/awx/main/scheduler/tasks.py @@ -5,6 +5,7 @@ import json # Django from django.db import transaction +from django.utils.timezone import now as tz_now # Celery from celery import task @@ -48,6 +49,7 @@ def run_fail_inconsistent_running_jobs(): return scheduler = TaskManager() + celery_task_start_time = tz_now() active_task_queues, active_tasks = scheduler.get_active_tasks() cache.set("active_celery_tasks", json.dumps(active_task_queues)) if active_tasks is None: @@ -55,5 +57,5 @@ def run_fail_inconsistent_running_jobs(): return None all_running_sorted_tasks = scheduler.get_running_tasks() - scheduler.process_celery_tasks(active_tasks, all_running_sorted_tasks) + scheduler.process_celery_tasks(celery_task_start_time, active_tasks, all_running_sorted_tasks)