diff --git a/awx/main/dispatch/reaper.py b/awx/main/dispatch/reaper.py
index 1b3c497612..3b66dcba3f 100644
--- a/awx/main/dispatch/reaper.py
+++ b/awx/main/dispatch/reaper.py
@@ -70,7 +70,7 @@ def reap_waiting(instance=None, status='failed', job_explanation=None, grace_per
         reap_job(j, status, job_explanation=job_explanation)
 
 
-def reap(instance=None, status='failed', job_explanation=None, excluded_uuids=None):
+def reap(instance=None, status='failed', job_explanation=None, excluded_uuids=None, ref_time=None):
     """
     Reap all jobs in running for this instance.
     """
@@ -79,9 +79,11 @@ def reap(instance=None, status='failed', job_explanation=None, excluded_uuids=No
     else:
         hostname = instance.hostname
     workflow_ctype_id = ContentType.objects.get_for_model(WorkflowJob).id
-    jobs = UnifiedJob.objects.filter(
-        Q(status='running') & (Q(execution_node=hostname) | Q(controller_node=hostname)) & ~Q(polymorphic_ctype_id=workflow_ctype_id)
-    )
+    base_Q = Q(status='running') & (Q(execution_node=hostname) | Q(controller_node=hostname)) & ~Q(polymorphic_ctype_id=workflow_ctype_id)
+    if ref_time:
+        jobs = UnifiedJob.objects.filter(base_Q & Q(started__lte=ref_time))
+    else:
+        jobs = UnifiedJob.objects.filter(base_Q)
     if excluded_uuids:
         jobs = jobs.exclude(celery_task_id__in=excluded_uuids)
     for j in jobs:
diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py
index 482c168af2..ddff6b4ec4 100644
--- a/awx/main/tasks/system.py
+++ b/awx/main/tasks/system.py
@@ -581,7 +581,7 @@ def cluster_node_heartbeat(dispatch_time=None, worker_tasks=None):
         active_task_ids = []
         for task_list in worker_tasks.values():
             active_task_ids.extend(task_list)
-        reaper.reap(instance=this_inst, excluded_uuids=active_task_ids)
+        reaper.reap(instance=this_inst, excluded_uuids=active_task_ids, ref_time=datetime.fromisoformat(dispatch_time))
         if max(len(task_list) for task_list in worker_tasks.values()) <= 1:
             reaper.reap_waiting(instance=this_inst, excluded_uuids=active_task_ids, ref_time=datetime.fromisoformat(dispatch_time))
 
diff --git a/awx/main/tests/functional/test_dispatch.py b/awx/main/tests/functional/test_dispatch.py
index c93b6a7d6a..6a7c8d624d 100644
--- a/awx/main/tests/functional/test_dispatch.py
+++ b/awx/main/tests/functional/test_dispatch.py
@@ -337,6 +337,8 @@ class TestTaskPublisher:
 
 
 yesterday = tz_now() - datetime.timedelta(days=1)
+minute = tz_now() - datetime.timedelta(seconds=120)
+now = tz_now()
 
 
 @pytest.mark.django_db
@@ -379,13 +381,15 @@ class TestJobReaper(object):
         assert job.status == status
 
     @pytest.mark.parametrize(
-        'excluded_uuids, fail',
+        'excluded_uuids, fail, started',
         [
-            (['abc123'], False),
-            ([], True),
+            (['abc123'], False, None),
+            ([], False, None),
+            ([], True, minute),
         ],
     )
-    def test_do_not_reap_excluded_uuids(self, excluded_uuids, fail):
+    def test_do_not_reap_excluded_uuids(self, excluded_uuids, fail, started):
+        """Modified test to account for ref_time in reap()"""
         i = Instance(hostname='awx')
         i.save()
         j = Job(
@@ -396,10 +400,13 @@ class TestJobReaper(object):
             celery_task_id='abc123',
         )
         j.save()
+        if started:
+            Job.objects.filter(id=j.id).update(started=started)
 
         # if the UUID is excluded, don't reap it
-        reaper.reap(i, excluded_uuids=excluded_uuids)
+        reaper.reap(i, excluded_uuids=excluded_uuids, ref_time=now)
         job = Job.objects.first()
+
         if fail:
             assert job.status == 'failed'
             assert 'marked as failed' in job.job_explanation
@@ -415,3 +422,20 @@ class TestJobReaper(object):
             reaper.reap(i)
 
         assert WorkflowJob.objects.first().status == 'running'
+
+    def test_should_not_reap_new(self):
+        """
+        This test is designed specifically to ensure that jobs launched after the dispatcher has provided its list of UUIDs are not reaped.
+        The scenario is inherently racy, and this test is written with that in mind.
+        """
+        i = Instance(hostname='awx')
+        # ref_time is set to 10 seconds in the past to mimic a job being launched inside the heartbeat window.
+        ref_time = tz_now() - datetime.timedelta(seconds=10)
+        # create the job at the current time, i.e. after ref_time
+        job = Job.objects.create(status='running', controller_node=i.hostname)
+        reaper.reap(i, ref_time=ref_time)
+        # explicitly refresh from the DB so we assert against the job's current state
+        job.refresh_from_db()
+        assert job.started > ref_time
+        assert job.status == 'running'
+        assert job.job_explanation == ''
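
Note (not part of the patch): a minimal sketch of how the new `ref_time` argument is meant to be used by a caller. The call to `reaper.reap(...)` with `datetime.fromisoformat(dispatch_time)` mirrors the `cluster_node_heartbeat` change above; the helper name `reap_stale_running_jobs` and its signature are illustrative only, and the imports assume the usual AWX module layout.

```python
# Illustrative sketch only -- not part of the patch.
# Assumes AWX's reaper module and Instance model; the helper name is hypothetical.
from datetime import datetime

from awx.main.dispatch import reaper
from awx.main.models import Instance


def reap_stale_running_jobs(this_inst: Instance, dispatch_time: str, active_task_ids: list):
    # dispatch_time is the ISO-8601 timestamp captured when worker_tasks was
    # snapshotted into active_task_ids. Passing it as ref_time means reap()
    # only considers jobs with started <= ref_time, so a job launched after
    # the snapshot (and therefore absent from active_task_ids) is left alone.
    reaper.reap(
        instance=this_inst,
        excluded_uuids=active_task_ids,
        ref_time=datetime.fromisoformat(dispatch_time),
    )
```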