diff --git a/awx/main/tasks/jobs.py b/awx/main/tasks/jobs.py index ce750489fa..93c1194366 100644 --- a/awx/main/tasks/jobs.py +++ b/awx/main/tasks/jobs.py @@ -17,7 +17,6 @@ import urllib.parse as urlparse # Django from django.conf import settings -from django.db import transaction # Shared code for the AWX platform from awx_plugins.interfaces._temporary_private_container_api import CONTAINER_ROOT, get_incontainer_path @@ -205,6 +204,7 @@ def dispatch_waiting_jobs(binder): if not kwargs: kwargs = {} binder.control('run', data={'task': serialize_task(uj._get_task_class()), 'args': [uj.id], 'kwargs': kwargs, 'uuid': uj.celery_task_id}) + UnifiedJob.objects.filter(pk=uj.pk, status='waiting').update(status='running', start_args='') class BaseTask(object): @@ -551,48 +551,32 @@ class BaseTask(object): def should_use_fact_cache(self): return False - def transition_status(self, pk: int) -> bool: - """Atomically transition status to running, if False returned, another process got it""" - with transaction.atomic(): - # Explanation of parts for the fetch: - # .values - avoid loading a full object, this is known to lead to deadlocks due to signals - # the signals load other related rows which another process may be locking, and happens in practice - # of=('self',) - keeps FK tables out of the lock list, another way deadlocks can happen - # .get - just load the single job - instance_data = UnifiedJob.objects.select_for_update(of=('self',)).values('status', 'cancel_flag').get(pk=pk) - - # If status is not waiting (obtained under lock) then this process does not have clearence to run - if instance_data['status'] == 'waiting': - if instance_data['cancel_flag']: - updated_status = 'canceled' - else: - updated_status = 'running' - # Explanation of the update: - # .filter - again, do not load the full object - # .update - a bulk update on just that one row, avoid loading unintended data - UnifiedJob.objects.filter(pk=pk).update(status=updated_status, start_args='') - elif instance_data['status'] == 'running': - logger.info(f'Job {pk} is being ran by another process, exiting') - return False - return True - @with_path_cleanup @with_signal_handling def run(self, pk, **kwargs): """ Run the job/task and capture its output. """ - if not self.instance: # Used to skip fetch for local runs - if not self.transition_status(pk): - logger.info(f'Job {pk} is being ran by another process, exiting') - return - # Load the instance - self.instance = self.update_model(pk) + if not self.instance: # Used to skip fetch for local runs + # Load the instance + self.instance = self.update_model(pk) + + # status should be "running" from dispatch_waiting_jobs, + # but may still be "waiting" if the worker picked this up before the status update landed. + if self.instance.status == 'waiting': + UnifiedJob.objects.filter(pk=pk).update(status="running", start_args='') + self.instance.refresh_from_db() + if self.instance.status != 'running': logger.error(f'Not starting {self.instance.status} task pk={pk} because its status "{self.instance.status}" is not expected') return + if self.instance.cancel_flag: + self.instance = self.update_model(pk, status='canceled') + self.instance.websocket_emit_status('canceled') + return + self.instance.websocket_emit_status("running") status, rc = 'error', None self.runner_callback.event_ct = 0 diff --git a/awx/main/tests/functional/tasks/test_tasks_jobs.py b/awx/main/tests/functional/tasks/test_tasks_jobs.py index 012ee20fdb..ffd5fce4e8 100644 --- a/awx/main/tests/functional/tasks/test_tasks_jobs.py +++ b/awx/main/tests/functional/tasks/test_tasks_jobs.py @@ -29,3 +29,30 @@ def test_cancel_flag_on_start(jt_linked, caplog): job = Job.objects.get(id=job.id) assert job.status == 'canceled' + + +@pytest.mark.django_db +def test_runjob_run_can_accept_waiting_status(jt_linked, mocker): + """Test that RunJob.run() can accept a job in 'waiting' status and transition it to 'running' + before the pre_run_hook is called""" + job = jt_linked.create_unified_job() + job.status = 'waiting' + job.save() + + status_at_pre_run = None + + def capture_status(instance, private_data_dir): + nonlocal status_at_pre_run + instance.refresh_from_db() + status_at_pre_run = instance.status + + mock_pre_run = mocker.patch.object(RunJob, 'pre_run_hook', side_effect=capture_status) + + task = RunJob() + try: + task.run(job.id) + except Exception: + pass + + mock_pre_run.assert_called_once() + assert status_at_pre_run == 'running'