Fix race with heartbeat and reaper logic

This commit is contained in:
Gabe Muniz 2023-03-16 19:16:51 -04:00
parent 3e6e0463b9
commit 6461ecc762
3 changed files with 17 additions and 10 deletions

View File

@ -70,7 +70,7 @@ def reap_waiting(instance=None, status='failed', job_explanation=None, grace_per
reap_job(j, status, job_explanation=job_explanation)
def reap(instance=None, status='failed', job_explanation=None, excluded_uuids=None):
def reap(instance=None, status='failed', job_explanation=None, excluded_uuids=None, ref_time=None):
"""
Reap all jobs in running for this instance.
"""
@ -80,7 +80,7 @@ def reap(instance=None, status='failed', job_explanation=None, excluded_uuids=No
hostname = instance.hostname
workflow_ctype_id = ContentType.objects.get_for_model(WorkflowJob).id
jobs = UnifiedJob.objects.filter(
Q(status='running') & (Q(execution_node=hostname) | Q(controller_node=hostname)) & ~Q(polymorphic_ctype_id=workflow_ctype_id)
Q(status='running', modified__lte=ref_time) & (Q(execution_node=hostname) | Q(controller_node=hostname)) & ~Q(polymorphic_ctype_id=workflow_ctype_id)
)
if excluded_uuids:
jobs = jobs.exclude(celery_task_id__in=excluded_uuids)

View File

@ -581,7 +581,7 @@ def cluster_node_heartbeat(dispatch_time=None, worker_tasks=None):
active_task_ids = []
for task_list in worker_tasks.values():
active_task_ids.extend(task_list)
reaper.reap(instance=this_inst, excluded_uuids=active_task_ids)
reaper.reap(instance=this_inst, excluded_uuids=active_task_ids, ref_time=datetime.fromisoformat(dispatch_time))
if max(len(task_list) for task_list in worker_tasks.values()) <= 1:
reaper.reap_waiting(instance=this_inst, excluded_uuids=active_task_ids, ref_time=datetime.fromisoformat(dispatch_time))

View File

@ -337,6 +337,8 @@ class TestTaskPublisher:
yesterday = tz_now() - datetime.timedelta(days=1)
minute = tz_now() - datetime.timedelta(seconds=120)
now = tz_now()
@pytest.mark.django_db
@ -368,7 +370,7 @@ class TestJobReaper(object):
# we have to edit the modification time _without_ calling save()
# (because .save() overwrites it to _now_)
Job.objects.filter(id=j.id).update(modified=modified)
reaper.reap(i)
reaper.reap(i, ref_time=now)
reaper.reap_waiting(i)
job = Job.objects.first()
if fail:
@ -379,13 +381,15 @@ class TestJobReaper(object):
assert job.status == status
@pytest.mark.parametrize(
'excluded_uuids, fail',
'excluded_uuids, fail, modified',
[
(['abc123'], False),
([], True),
(['abc123'], False, None),
([], False, None),
([], True, minute),
],
)
def test_do_not_reap_excluded_uuids(self, excluded_uuids, fail):
def test_do_not_reap_excluded_uuids(self, excluded_uuids, fail, modified):
"""Modified Test to account for ref_time in reap()"""
i = Instance(hostname='awx')
i.save()
j = Job(
@ -396,10 +400,13 @@ class TestJobReaper(object):
celery_task_id='abc123',
)
j.save()
if modified:
Job.objects.filter(id=j.id).update(modified=modified)
# if the UUID is excluded, don't reap it
reaper.reap(i, excluded_uuids=excluded_uuids)
reaper.reap(i, excluded_uuids=excluded_uuids, ref_time=now)
job = Job.objects.first()
if fail:
assert job.status == 'failed'
assert 'marked as failed' in job.job_explanation
@ -412,6 +419,6 @@ class TestJobReaper(object):
i.save()
j = WorkflowJob(status='running', execution_node='awx')
j.save()
reaper.reap(i)
reaper.reap(i, ref_time=now)
assert WorkflowJob.objects.first().status == 'running'