Compare commits

...

2 Commits

Author SHA1 Message Date
Gabe Muniz
3f83647600 Fix tests to fail when over drift over heartbeat time 2023-03-17 00:24:25 -04:00
Gabe Muniz
6461ecc762 Fix race with heartbeat and reaper logic 2023-03-16 19:16:51 -04:00
3 changed files with 19 additions and 12 deletions

View File

@@ -70,7 +70,7 @@ def reap_waiting(instance=None, status='failed', job_explanation=None, grace_per
reap_job(j, status, job_explanation=job_explanation) reap_job(j, status, job_explanation=job_explanation)
def reap(instance=None, status='failed', job_explanation=None, excluded_uuids=None): def reap(instance=None, status='failed', job_explanation=None, excluded_uuids=None, ref_time=None):
""" """
Reap all jobs in running for this instance. Reap all jobs in running for this instance.
""" """
@@ -80,7 +80,7 @@ def reap(instance=None, status='failed', job_explanation=None, excluded_uuids=No
hostname = instance.hostname hostname = instance.hostname
workflow_ctype_id = ContentType.objects.get_for_model(WorkflowJob).id workflow_ctype_id = ContentType.objects.get_for_model(WorkflowJob).id
jobs = UnifiedJob.objects.filter( jobs = UnifiedJob.objects.filter(
Q(status='running') & (Q(execution_node=hostname) | Q(controller_node=hostname)) & ~Q(polymorphic_ctype_id=workflow_ctype_id) Q(status='running', modified__lte=ref_time) & (Q(execution_node=hostname) | Q(controller_node=hostname)) & ~Q(polymorphic_ctype_id=workflow_ctype_id)
) )
if excluded_uuids: if excluded_uuids:
jobs = jobs.exclude(celery_task_id__in=excluded_uuids) jobs = jobs.exclude(celery_task_id__in=excluded_uuids)

View File

@@ -581,7 +581,7 @@ def cluster_node_heartbeat(dispatch_time=None, worker_tasks=None):
active_task_ids = [] active_task_ids = []
for task_list in worker_tasks.values(): for task_list in worker_tasks.values():
active_task_ids.extend(task_list) active_task_ids.extend(task_list)
reaper.reap(instance=this_inst, excluded_uuids=active_task_ids) reaper.reap(instance=this_inst, excluded_uuids=active_task_ids, ref_time=datetime.fromisoformat(dispatch_time))
if max(len(task_list) for task_list in worker_tasks.values()) <= 1: if max(len(task_list) for task_list in worker_tasks.values()) <= 1:
reaper.reap_waiting(instance=this_inst, excluded_uuids=active_task_ids, ref_time=datetime.fromisoformat(dispatch_time)) reaper.reap_waiting(instance=this_inst, excluded_uuids=active_task_ids, ref_time=datetime.fromisoformat(dispatch_time))

View File

@@ -337,6 +337,8 @@ class TestTaskPublisher:
yesterday = tz_now() - datetime.timedelta(days=1) yesterday = tz_now() - datetime.timedelta(days=1)
minute = tz_now() - datetime.timedelta(seconds=120)
now = tz_now()
@pytest.mark.django_db @pytest.mark.django_db
@@ -345,8 +347,8 @@ class TestJobReaper(object):
'status, execution_node, controller_node, modified, fail', 'status, execution_node, controller_node, modified, fail',
[ [
('running', '', '', None, False), # running, not assigned to the instance ('running', '', '', None, False), # running, not assigned to the instance
('running', 'awx', '', None, True), # running, has the instance as its execution_node ('running', 'awx', '', minute, True), # running, has the instance as its execution_node
('running', '', 'awx', None, True), # running, has the instance as its controller_node ('running', '', 'awx', minute, True), # running, has the instance as its controller_node
('waiting', '', '', None, False), # waiting, not assigned to the instance ('waiting', '', '', None, False), # waiting, not assigned to the instance
('waiting', 'awx', '', None, False), # waiting, was edited less than a minute ago ('waiting', 'awx', '', None, False), # waiting, was edited less than a minute ago
('waiting', '', 'awx', None, False), # waiting, was edited less than a minute ago ('waiting', '', 'awx', None, False), # waiting, was edited less than a minute ago
@@ -368,7 +370,7 @@ class TestJobReaper(object):
# we have to edit the modification time _without_ calling save() # we have to edit the modification time _without_ calling save()
# (because .save() overwrites it to _now_) # (because .save() overwrites it to _now_)
Job.objects.filter(id=j.id).update(modified=modified) Job.objects.filter(id=j.id).update(modified=modified)
reaper.reap(i) reaper.reap(i, ref_time=now)
reaper.reap_waiting(i) reaper.reap_waiting(i)
job = Job.objects.first() job = Job.objects.first()
if fail: if fail:
@@ -379,13 +381,15 @@ class TestJobReaper(object):
assert job.status == status assert job.status == status
@pytest.mark.parametrize( @pytest.mark.parametrize(
'excluded_uuids, fail', 'excluded_uuids, fail, modified',
[ [
(['abc123'], False), (['abc123'], False, None),
([], True), ([], False, None),
([], True, minute),
], ],
) )
def test_do_not_reap_excluded_uuids(self, excluded_uuids, fail): def test_do_not_reap_excluded_uuids(self, excluded_uuids, fail, modified):
"""Modified Test to account for ref_time in reap()"""
i = Instance(hostname='awx') i = Instance(hostname='awx')
i.save() i.save()
j = Job( j = Job(
@@ -396,10 +400,13 @@ class TestJobReaper(object):
celery_task_id='abc123', celery_task_id='abc123',
) )
j.save() j.save()
if modified:
Job.objects.filter(id=j.id).update(modified=modified)
# if the UUID is excluded, don't reap it # if the UUID is excluded, don't reap it
reaper.reap(i, excluded_uuids=excluded_uuids) reaper.reap(i, excluded_uuids=excluded_uuids, ref_time=now)
job = Job.objects.first() job = Job.objects.first()
if fail: if fail:
assert job.status == 'failed' assert job.status == 'failed'
assert 'marked as failed' in job.job_explanation assert 'marked as failed' in job.job_explanation
@@ -412,6 +419,6 @@ class TestJobReaper(object):
i.save() i.save()
j = WorkflowJob(status='running', execution_node='awx') j = WorkflowJob(status='running', execution_node='awx')
j.save() j.save()
reaper.reap(i) reaper.reap(i, ref_time=now)
assert WorkflowJob.objects.first().status == 'running' assert WorkflowJob.objects.first().status == 'running'