mirror of
https://github.com/ansible/awx.git
synced 2026-02-27 15:58:45 -03:30
fix 2 data source inconsistency with failing tasks
* Do not "trust" the list of celery ids for database entries that were modified after the list of celery ids was retrieved. * Err on the side of caution and just let the next heartbeat celery killer try killing the task if it needs to be reaped.
This commit is contained in:
@@ -376,7 +376,7 @@ class TaskManager():
|
|||||||
if not found_acceptable_queue:
|
if not found_acceptable_queue:
|
||||||
logger.debug("Task {} couldn't be scheduled on graph, waiting for next cycle".format(task))
|
logger.debug("Task {} couldn't be scheduled on graph, waiting for next cycle".format(task))
|
||||||
|
|
||||||
def process_celery_tasks(self, active_tasks, all_running_sorted_tasks):
|
def process_celery_tasks(self, celery_task_start_time, active_tasks, all_running_sorted_tasks):
|
||||||
'''
|
'''
|
||||||
Rectify tower db <-> celery inconsistent view of jobs state
|
Rectify tower db <-> celery inconsistent view of jobs state
|
||||||
'''
|
'''
|
||||||
@@ -384,13 +384,9 @@ class TaskManager():
|
|||||||
|
|
||||||
if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
|
if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
|
||||||
# TODO: try catch the getting of the job. The job COULD have been deleted
|
# TODO: try catch the getting of the job. The job COULD have been deleted
|
||||||
# Ensure job did not finish running between the time we get the
|
|
||||||
# list of task id's from celery and now.
|
|
||||||
# Note: This is an actual fix, not a reduction in the time
|
|
||||||
# window that this can happen.
|
|
||||||
if isinstance(task, WorkflowJob):
|
if isinstance(task, WorkflowJob):
|
||||||
continue
|
continue
|
||||||
if task.status != 'running':
|
if task_obj.modified > celery_task_start_time:
|
||||||
continue
|
continue
|
||||||
task.status = 'failed'
|
task.status = 'failed'
|
||||||
task.job_explanation += ' '.join((
|
task.job_explanation += ' '.join((
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import json
|
|||||||
|
|
||||||
# Django
|
# Django
|
||||||
from django.db import transaction
|
from django.db import transaction
|
||||||
|
from django.utils.timezone import now as tz_now
|
||||||
|
|
||||||
# Celery
|
# Celery
|
||||||
from celery import task
|
from celery import task
|
||||||
@@ -48,6 +49,7 @@ def run_fail_inconsistent_running_jobs():
|
|||||||
return
|
return
|
||||||
|
|
||||||
scheduler = TaskManager()
|
scheduler = TaskManager()
|
||||||
|
celery_task_start_time = tz_now()
|
||||||
active_task_queues, active_tasks = scheduler.get_active_tasks()
|
active_task_queues, active_tasks = scheduler.get_active_tasks()
|
||||||
cache.set("active_celery_tasks", json.dumps(active_task_queues))
|
cache.set("active_celery_tasks", json.dumps(active_task_queues))
|
||||||
if active_tasks is None:
|
if active_tasks is None:
|
||||||
@@ -55,5 +57,5 @@ def run_fail_inconsistent_running_jobs():
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
all_running_sorted_tasks = scheduler.get_running_tasks()
|
all_running_sorted_tasks = scheduler.get_running_tasks()
|
||||||
scheduler.process_celery_tasks(active_tasks, all_running_sorted_tasks)
|
scheduler.process_celery_tasks(celery_task_start_time, active_tasks, all_running_sorted_tasks)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user