Fix race with heartbeat and reaper logic (#13713)

* Fix race with heartbeat and reaper logic

* Fix tests to fail when over drift over heartbeat time

* replaced modified with started time for reap() code and added test

* fixed logic bug and cleaned up tests

* Added comments to tests to call out reasoning
This commit is contained in:
Gabriel Muniz
2023-03-17 14:24:31 -04:00
committed by GitHub
parent 3e6e0463b9
commit e15f4de0dd
3 changed files with 36 additions and 10 deletions

View File

@@ -70,7 +70,7 @@ def reap_waiting(instance=None, status='failed', job_explanation=None, grace_per
reap_job(j, status, job_explanation=job_explanation)
def reap(instance=None, status='failed', job_explanation=None, excluded_uuids=None):
def reap(instance=None, status='failed', job_explanation=None, excluded_uuids=None, ref_time=None):
"""
Reap all jobs in running for this instance.
"""
@@ -79,9 +79,11 @@ def reap(instance=None, status='failed', job_explanation=None, excluded_uuids=No
else:
hostname = instance.hostname
workflow_ctype_id = ContentType.objects.get_for_model(WorkflowJob).id
jobs = UnifiedJob.objects.filter(
Q(status='running') & (Q(execution_node=hostname) | Q(controller_node=hostname)) & ~Q(polymorphic_ctype_id=workflow_ctype_id)
)
base_Q = Q(status='running') & (Q(execution_node=hostname) | Q(controller_node=hostname)) & ~Q(polymorphic_ctype_id=workflow_ctype_id)
if ref_time:
jobs = UnifiedJob.objects.filter(base_Q & Q(started__lte=ref_time))
else:
jobs = UnifiedJob.objects.filter(base_Q)
if excluded_uuids:
jobs = jobs.exclude(celery_task_id__in=excluded_uuids)
for j in jobs: