mirror of
https://github.com/ansible/awx.git
synced 2026-05-16 13:57:39 -02:30
celery task fail check now uses pglock
* Align locking used by celery task cleaner upper with regular task manager. * Uses pglock/advisory lock instead of abusing Instance table lock.
This commit is contained in:
@@ -5,16 +5,16 @@ import json
|
|||||||
|
|
||||||
# Django
|
# Django
|
||||||
from django.db import transaction
|
from django.db import transaction
|
||||||
from django.db.utils import DatabaseError
|
|
||||||
|
|
||||||
# Celery
|
# Celery
|
||||||
from celery import task
|
from celery import task
|
||||||
|
|
||||||
# AWX
|
# AWX
|
||||||
from awx.main.models import Instance
|
|
||||||
from awx.main.scheduler import TaskManager
|
from awx.main.scheduler import TaskManager
|
||||||
from django.core.cache import cache
|
from django.core.cache import cache
|
||||||
|
|
||||||
|
from django_pglocks import advisory_lock
|
||||||
|
|
||||||
logger = logging.getLogger('awx.main.scheduler')
|
logger = logging.getLogger('awx.main.scheduler')
|
||||||
|
|
||||||
# TODO: move logic to UnifiedJob model and use bind=True feature of celery.
|
# TODO: move logic to UnifiedJob model and use bind=True feature of celery.
|
||||||
@@ -43,8 +43,10 @@ def run_fail_inconsistent_running_jobs():
|
|||||||
logger.debug("Running task to fail inconsistent running jobs.")
|
logger.debug("Running task to fail inconsistent running jobs.")
|
||||||
with transaction.atomic():
|
with transaction.atomic():
|
||||||
# Lock
|
# Lock
|
||||||
try:
|
with advisory_lock('task_manager_lock', wait=False) as acquired:
|
||||||
Instance.objects.select_for_update(nowait=True).all()[0]
|
if acquired is False:
|
||||||
|
return
|
||||||
|
|
||||||
scheduler = TaskManager()
|
scheduler = TaskManager()
|
||||||
active_task_queues, active_tasks = scheduler.get_active_tasks()
|
active_task_queues, active_tasks = scheduler.get_active_tasks()
|
||||||
cache.set("active_celery_tasks", json.dumps(active_task_queues))
|
cache.set("active_celery_tasks", json.dumps(active_task_queues))
|
||||||
@@ -54,6 +56,4 @@ def run_fail_inconsistent_running_jobs():
|
|||||||
|
|
||||||
all_running_sorted_tasks = scheduler.get_running_tasks()
|
all_running_sorted_tasks = scheduler.get_running_tasks()
|
||||||
scheduler.process_celery_tasks(active_tasks, all_running_sorted_tasks)
|
scheduler.process_celery_tasks(active_tasks, all_running_sorted_tasks)
|
||||||
except DatabaseError:
|
|
||||||
return
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user