mirror of
https://github.com/ansible/awx.git
synced 2026-02-28 00:08:44 -03:30
Improve transactional integrity for starting controller jobs in dispatcherd (#16300)
Remove SELECT FOR UPDATE from job dispatch to reduce transaction rollbacks
Move status transition from BaseTask.transition_status (which used
SELECT FOR UPDATE inside transaction.atomic()) into
dispatch_waiting_jobs. The new approach uses filter().update() which
is atomic at the database level without requiring explicit row locks,
reducing transaction contention and rollbacks observed in perfscale
testing.
The transition_status method was an artifact of the feature flag era
where we needed to support both old and new code paths. Since
dispatch_waiting_jobs is already a singleton
(on_duplicate='queue_one') scoped to the local node, the
de-duplication logic is unnecessary.
Status is updated after task submission to dispatcherd, so the job's
UUID is in the dispatch pipeline before being marked running —
preventing the reaper from incorrectly reaping jobs during the
handoff window. RunJob.run() handles the race where a worker picks
up the task before the status update lands by accepting the 'waiting'
status and transitioning it to 'running' itself.
Signed-off-by: Seth Foster <fosterbseth@gmail.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -17,7 +17,6 @@ import urllib.parse as urlparse
|
|||||||
|
|
||||||
# Django
|
# Django
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.db import transaction
|
|
||||||
|
|
||||||
# Shared code for the AWX platform
|
# Shared code for the AWX platform
|
||||||
from awx_plugins.interfaces._temporary_private_container_api import CONTAINER_ROOT, get_incontainer_path
|
from awx_plugins.interfaces._temporary_private_container_api import CONTAINER_ROOT, get_incontainer_path
|
||||||
@@ -205,6 +204,7 @@ def dispatch_waiting_jobs(binder):
|
|||||||
if not kwargs:
|
if not kwargs:
|
||||||
kwargs = {}
|
kwargs = {}
|
||||||
binder.control('run', data={'task': serialize_task(uj._get_task_class()), 'args': [uj.id], 'kwargs': kwargs, 'uuid': uj.celery_task_id})
|
binder.control('run', data={'task': serialize_task(uj._get_task_class()), 'args': [uj.id], 'kwargs': kwargs, 'uuid': uj.celery_task_id})
|
||||||
|
UnifiedJob.objects.filter(pk=uj.pk, status='waiting').update(status='running', start_args='')
|
||||||
|
|
||||||
|
|
||||||
class BaseTask(object):
|
class BaseTask(object):
|
||||||
@@ -551,48 +551,32 @@ class BaseTask(object):
|
|||||||
def should_use_fact_cache(self):
|
def should_use_fact_cache(self):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def transition_status(self, pk: int) -> bool:
    """Atomically transition the job's status from 'waiting' to 'running'.

    :param pk: primary key of the UnifiedJob row to claim.
    :returns: True if this process now owns the job (or the row was left in a
        state other than 'waiting'/'running' for the caller to inspect);
        False if another process already claimed it and is running it.
    """
    with transaction.atomic():
        # Explanation of parts for the fetch:
        # .values - avoid loading a full object, this is known to lead to deadlocks due to signals;
        #   the signals load other related rows which another process may be locking, and happens in practice
        # of=('self',) - keeps FK tables out of the lock list, another way deadlocks can happen
        # .get - just load the single job
        instance_data = UnifiedJob.objects.select_for_update(of=('self',)).values('status', 'cancel_flag').get(pk=pk)

        # If status is not waiting (obtained under lock) then this process does not have clearance to run
        if instance_data['status'] == 'waiting':
            # Honor a cancel that was requested before this worker got to start the job
            if instance_data['cancel_flag']:
                updated_status = 'canceled'
            else:
                updated_status = 'running'
            # Explanation of the update:
            # .filter - again, do not load the full object
            # .update - a bulk update on just that one row, avoid loading unintended data
            UnifiedJob.objects.filter(pk=pk).update(status=updated_status, start_args='')
        elif instance_data['status'] == 'running':
            # Another worker won the race under the row lock; bow out.
            logger.info(f'Job {pk} is being run by another process, exiting')
            return False
    return True
|
|
||||||
|
|
||||||
@with_path_cleanup
|
@with_path_cleanup
|
||||||
@with_signal_handling
|
@with_signal_handling
|
||||||
def run(self, pk, **kwargs):
|
def run(self, pk, **kwargs):
|
||||||
"""
|
"""
|
||||||
Run the job/task and capture its output.
|
Run the job/task and capture its output.
|
||||||
"""
|
"""
|
||||||
if not self.instance: # Used to skip fetch for local runs
|
|
||||||
if not self.transition_status(pk):
|
|
||||||
logger.info(f'Job {pk} is being ran by another process, exiting')
|
|
||||||
return
|
|
||||||
|
|
||||||
# Load the instance
|
if not self.instance: # Used to skip fetch for local runs
|
||||||
self.instance = self.update_model(pk)
|
# Load the instance
|
||||||
|
self.instance = self.update_model(pk)
|
||||||
|
|
||||||
|
# status should be "running" from dispatch_waiting_jobs,
|
||||||
|
# but may still be "waiting" if the worker picked this up before the status update landed.
|
||||||
|
if self.instance.status == 'waiting':
|
||||||
|
UnifiedJob.objects.filter(pk=pk).update(status="running", start_args='')
|
||||||
|
self.instance.refresh_from_db()
|
||||||
|
|
||||||
if self.instance.status != 'running':
|
if self.instance.status != 'running':
|
||||||
logger.error(f'Not starting {self.instance.status} task pk={pk} because its status "{self.instance.status}" is not expected')
|
logger.error(f'Not starting {self.instance.status} task pk={pk} because its status "{self.instance.status}" is not expected')
|
||||||
return
|
return
|
||||||
|
|
||||||
|
if self.instance.cancel_flag:
|
||||||
|
self.instance = self.update_model(pk, status='canceled')
|
||||||
|
self.instance.websocket_emit_status('canceled')
|
||||||
|
return
|
||||||
|
|
||||||
self.instance.websocket_emit_status("running")
|
self.instance.websocket_emit_status("running")
|
||||||
status, rc = 'error', None
|
status, rc = 'error', None
|
||||||
self.runner_callback.event_ct = 0
|
self.runner_callback.event_ct = 0
|
||||||
|
|||||||
@@ -29,3 +29,30 @@ def test_cancel_flag_on_start(jt_linked, caplog):
|
|||||||
|
|
||||||
job = Job.objects.get(id=job.id)
|
job = Job.objects.get(id=job.id)
|
||||||
assert job.status == 'canceled'
|
assert job.status == 'canceled'
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
def test_runjob_run_can_accept_waiting_status(jt_linked, mocker):
    """Test that RunJob.run() can accept a job in 'waiting' status and transition it to 'running'
    before the pre_run_hook is called"""
    unified_job = jt_linked.create_unified_job()
    unified_job.status = 'waiting'
    unified_job.save()

    observed_status = None

    # Record what the job's persisted status is at the moment pre_run_hook fires.
    def record_status(instance, private_data_dir):
        nonlocal observed_status
        instance.refresh_from_db()
        observed_status = instance.status

    patched_hook = mocker.patch.object(RunJob, 'pre_run_hook', side_effect=record_status)

    runner = RunJob()
    try:
        runner.run(unified_job.id)
    except Exception:
        # The run is expected to blow up after the hook; only the hook-time status matters here.
        pass

    patched_hook.assert_called_once()
    assert observed_status == 'running'
|
||||||
|
|||||||
Reference in New Issue
Block a user