diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py index a255e168cd..d9a3f36324 100644 --- a/awx/main/dispatch/pool.py +++ b/awx/main/dispatch/pool.py @@ -432,6 +432,10 @@ class AutoscalePool(WorkerPool): idx = random.choice(range(len(self.workers))) self.write(idx, m) + # if we are not in the dangerous situation of queue backup then clear old waiting jobs + if self.workers and max(len(w.managed_tasks) for w in self.workers) <= 1: + reaper.reap_waiting() + # if the database says a job is running on this node, but it's *not*, # then reap it running_uuids = [] diff --git a/awx/main/dispatch/reaper.py b/awx/main/dispatch/reaper.py index ba752f4fe6..fc67b5762f 100644 --- a/awx/main/dispatch/reaper.py +++ b/awx/main/dispatch/reaper.py @@ -24,13 +24,11 @@ def startup_reaping(): job_ids = [] for j in jobs: job_ids.append(j.id) - j.status = 'failed' - j.start_args = '' - j.job_explanation += 'Task was marked as running at system start up. The system must have not shut down properly, so it has been marked as failed.' - j.save(update_fields=['status', 'start_args', 'job_explanation']) - if hasattr(j, 'send_notification_templates'): - j.send_notification_templates('failed') - j.websocket_emit_status('failed') + reap_job( + j, + 'failed', + job_explanation='Task was marked as running at system start up. The system must have not shut down properly, so it has been marked as failed.', + ) if job_ids: logger.error(f'Unified jobs {job_ids} were reaped on dispatch startup') @@ -54,9 +52,9 @@ def reap_job(j, status): logger.error('{} is no longer running; reaping'.format(j.log_format)) -def reap(instance=None, status='failed', excluded_uuids=[]): +def reap_waiting(instance=None, status='failed', grace_period=60): """ - Reap all jobs in waiting|running for this instance. + Reap all jobs in waiting for this instance. """ me = instance if me is None: @@ -66,11 +64,25 @@ def reap(instance=None, status='failed', excluded_uuids=[]): logger.warning(f'Local instance is not registered, not running reaper: {e}') return now = tz_now() + jobs = UnifiedJob.objects.filter(status='waiting', modified__lte=now - timedelta(seconds=grace_period), controller_node=me.hostname) + for j in jobs: + reap_job(j, status) + + +def reap(instance=None, status='failed', excluded_uuids=[]): + """ + Reap all jobs in running for this instance. + """ + me = instance + if me is None: + try: + me = Instance.objects.me() + except RuntimeError as e: + logger.warning(f'Local instance is not registered, not running reaper: {e}') + return workflow_ctype_id = ContentType.objects.get_for_model(WorkflowJob).id jobs = UnifiedJob.objects.filter( - (Q(status='running') | Q(status='waiting', modified__lte=now - timedelta(seconds=60))) - & (Q(execution_node=me.hostname) | Q(controller_node=me.hostname)) - & ~Q(polymorphic_ctype_id=workflow_ctype_id) + Q(status='running') & (Q(execution_node=me.hostname) | Q(controller_node=me.hostname)) & ~Q(polymorphic_ctype_id=workflow_ctype_id) ).exclude(celery_task_id__in=excluded_uuids) for j in jobs: reap_job(j, status) diff --git a/awx/main/tasks/jobs.py b/awx/main/tasks/jobs.py index a5f0a8de00..954ced13fd 100644 --- a/awx/main/tasks/jobs.py +++ b/awx/main/tasks/jobs.py @@ -36,6 +36,7 @@ from awx.main.constants import ( JOB_FOLDER_PREFIX, MAX_ISOLATED_PATH_COLON_DELIMITER, CONTAINER_VOLUMES_MOUNT_TYPES, + ACTIVE_STATES, ) from awx.main.models import ( Instance, @@ -451,6 +452,11 @@ class BaseTask(object): Run the job/task and capture its output. """ self.instance = self.model.objects.get(pk=pk) + if self.instance.status != 'canceled' and self.instance.cancel_flag: + self.instance = self.update_model(self.instance.pk, start_args='', status='canceled') + if self.instance.status not in ACTIVE_STATES: + # Prevent starting the job if it has been reaped or handled by another process. + raise RuntimeError(f'Not starting {self.instance.status} task pk={pk} because {self.instance.status} is not a valid active state') if self.instance.execution_environment_id is None: from awx.main.signals import disable_activity_stream diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py index 64b9623657..932c8abe5b 100644 --- a/awx/main/tasks/system.py +++ b/awx/main/tasks/system.py @@ -105,6 +105,7 @@ def dispatch_startup(): apply_cluster_membership_policies() cluster_node_heartbeat() reaper.startup_reaping() + reaper.reap_waiting(grace_period=0) m = Metrics() m.reset_values() @@ -116,6 +117,10 @@ def inform_cluster_of_shutdown(): try: this_inst = Instance.objects.get(hostname=settings.CLUSTER_HOST_ID) this_inst.mark_offline(update_last_seen=True, errors=_('Instance received normal shutdown signal')) + try: + reaper.reap_waiting(this_inst, grace_period=0) + except Exception: + logger.exception('failed to reap waiting jobs for {}'.format(this_inst.hostname)) logger.warning('Normal shutdown signal for instance {}, ' 'removed self from capacity pool.'.format(this_inst.hostname)) except Exception: logger.exception('Encountered problem with normal shutdown signal.') @@ -533,6 +538,7 @@ def cluster_node_heartbeat(): for other_inst in lost_instances: try: reaper.reap(other_inst) + reaper.reap_waiting(this_inst, grace_period=0) except Exception: logger.exception('failed to reap jobs for {}'.format(other_inst.hostname)) try: diff --git a/awx/main/tests/functional/test_dispatch.py b/awx/main/tests/functional/test_dispatch.py index a6fcb5b6ce..4b65726b84 100644 --- a/awx/main/tests/functional/test_dispatch.py +++ b/awx/main/tests/functional/test_dispatch.py @@ -200,7 +200,8 @@ class TestAutoScaling: # cleanup should scale down to 8 workers with mock.patch('awx.main.dispatch.reaper.reap') as reap: - self.pool.cleanup() + with mock.patch('awx.main.dispatch.reaper.reap_waiting') as reap: + self.pool.cleanup() reap.assert_called() assert len(self.pool) == 2 @@ -250,7 +251,8 @@ class TestAutoScaling: # clean up and the dead worker with mock.patch('awx.main.dispatch.reaper.reap') as reap: - self.pool.cleanup() + with mock.patch('awx.main.dispatch.reaper.reap_waiting') as reap: + self.pool.cleanup() reap.assert_called() assert len(self.pool) == 1 assert self.pool.workers[0].pid == alive_pid @@ -353,7 +355,7 @@ class TestJobReaper(object): ('waiting', '', '', None, False), # waiting, not assigned to the instance ('waiting', 'awx', '', None, False), # waiting, was edited less than a minute ago ('waiting', '', 'awx', None, False), # waiting, was edited less than a minute ago - ('waiting', 'awx', '', yesterday, True), # waiting, assigned to the execution_node, stale + ('waiting', 'awx', '', yesterday, False), # waiting, managed by another node, ignore ('waiting', '', 'awx', yesterday, True), # waiting, assigned to the controller_node, stale ], ) @@ -372,6 +374,7 @@ class TestJobReaper(object): # (because .save() overwrites it to _now_) Job.objects.filter(id=j.id).update(modified=modified) reaper.reap(i) + reaper.reap_waiting(i) job = Job.objects.first() if fail: assert job.status == 'failed' diff --git a/awx/main/tests/functional/test_tasks.py b/awx/main/tests/functional/test_tasks.py index 0e85dc389d..ce385cfced 100644 --- a/awx/main/tests/functional/test_tasks.py +++ b/awx/main/tests/functional/test_tasks.py @@ -4,6 +4,7 @@ import os import tempfile import shutil +from awx.main.tasks.jobs import RunJob from awx.main.tasks.system import execution_node_health_check, _cleanup_images_and_files from awx.main.models import Instance, Job @@ -61,3 +62,16 @@ def test_folder_cleanup_running_job(mock_job_folder, mock_me): job.save(update_fields=['status']) _cleanup_images_and_files(grace_period=0) assert not os.path.exists(mock_job_folder) # job is finished and no grace period, should delete + + +@pytest.mark.django_db +def test_does_not_run_reaped_job(mocker, mock_me): + job = Job.objects.create(status='failed', job_explanation='This job has been reaped.') + mock_run = mocker.patch('awx.main.tasks.jobs.ansible_runner.interface.run') + try: + RunJob().run(job.id) + except Exception: + pass + job.refresh_from_db() + assert job.status == 'failed' + mock_run.assert_not_called() diff --git a/awx/main/tests/unit/test_tasks.py b/awx/main/tests/unit/test_tasks.py index d91183efef..04fd42ec92 100644 --- a/awx/main/tests/unit/test_tasks.py +++ b/awx/main/tests/unit/test_tasks.py @@ -500,7 +500,7 @@ class TestGenericRun: with pytest.raises(Exception): task.run(1) - for c in [mock.call(1, status='running', start_args=''), mock.call(1, status='canceled')]: + for c in [mock.call(1, start_args='', status='canceled')]: assert c in task.update_model.call_args_list def test_event_count(self, mock_me):