Merge pull request #12442 from AlanCoding/waiting_reaper

Fix false reaper false-positives of waiting jobs that are waiting for worker
This commit is contained in:
Alan Rominger 2022-08-17 11:20:05 -04:00 committed by GitHub
commit cfc1255812
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 61 additions and 16 deletions

View File

@ -432,6 +432,10 @@ class AutoscalePool(WorkerPool):
idx = random.choice(range(len(self.workers)))
self.write(idx, m)
# if we are not in the dangerous situation of queue backup then clear old waiting jobs
if self.workers and max(len(w.managed_tasks) for w in self.workers) <= 1:
reaper.reap_waiting()
# if the database says a job is running on this node, but it's *not*,
# then reap it
running_uuids = []

View File

@ -24,13 +24,11 @@ def startup_reaping():
job_ids = []
for j in jobs:
job_ids.append(j.id)
j.status = 'failed'
j.start_args = ''
j.job_explanation += 'Task was marked as running at system start up. The system must have not shut down properly, so it has been marked as failed.'
j.save(update_fields=['status', 'start_args', 'job_explanation'])
if hasattr(j, 'send_notification_templates'):
j.send_notification_templates('failed')
j.websocket_emit_status('failed')
reap_job(
j,
'failed',
job_explanation='Task was marked as running at system start up. The system must have not shut down properly, so it has been marked as failed.',
)
if job_ids:
logger.error(f'Unified jobs {job_ids} were reaped on dispatch startup')
@ -54,9 +52,9 @@ def reap_job(j, status):
logger.error('{} is no longer running; reaping'.format(j.log_format))
def reap(instance=None, status='failed', excluded_uuids=[]):
def reap_waiting(instance=None, status='failed', grace_period=60):
"""
Reap all jobs in waiting|running for this instance.
Reap all jobs in waiting for this instance.
"""
me = instance
if me is None:
@ -66,11 +64,25 @@ def reap(instance=None, status='failed', excluded_uuids=[]):
logger.warning(f'Local instance is not registered, not running reaper: {e}')
return
now = tz_now()
jobs = UnifiedJob.objects.filter(status='waiting', modified__lte=now - timedelta(seconds=grace_period), controller_node=me.hostname)
for j in jobs:
reap_job(j, status)
def reap(instance=None, status='failed', excluded_uuids=[]):
"""
Reap all jobs in running for this instance.
"""
me = instance
if me is None:
try:
me = Instance.objects.me()
except RuntimeError as e:
logger.warning(f'Local instance is not registered, not running reaper: {e}')
return
workflow_ctype_id = ContentType.objects.get_for_model(WorkflowJob).id
jobs = UnifiedJob.objects.filter(
(Q(status='running') | Q(status='waiting', modified__lte=now - timedelta(seconds=60)))
& (Q(execution_node=me.hostname) | Q(controller_node=me.hostname))
& ~Q(polymorphic_ctype_id=workflow_ctype_id)
Q(status='running') & (Q(execution_node=me.hostname) | Q(controller_node=me.hostname)) & ~Q(polymorphic_ctype_id=workflow_ctype_id)
).exclude(celery_task_id__in=excluded_uuids)
for j in jobs:
reap_job(j, status)

View File

@ -36,6 +36,7 @@ from awx.main.constants import (
JOB_FOLDER_PREFIX,
MAX_ISOLATED_PATH_COLON_DELIMITER,
CONTAINER_VOLUMES_MOUNT_TYPES,
ACTIVE_STATES,
)
from awx.main.models import (
Instance,
@ -451,6 +452,11 @@ class BaseTask(object):
Run the job/task and capture its output.
"""
self.instance = self.model.objects.get(pk=pk)
if self.instance.status != 'canceled' and self.instance.cancel_flag:
self.instance = self.update_model(self.instance.pk, start_args='', status='canceled')
if self.instance.status not in ACTIVE_STATES:
# Prevent starting the job if it has been reaped or handled by another process.
raise RuntimeError(f'Not starting {self.instance.status} task pk={pk} because {self.instance.status} is not a valid active state')
if self.instance.execution_environment_id is None:
from awx.main.signals import disable_activity_stream

View File

@ -105,6 +105,7 @@ def dispatch_startup():
apply_cluster_membership_policies()
cluster_node_heartbeat()
reaper.startup_reaping()
reaper.reap_waiting(grace_period=0)
m = Metrics()
m.reset_values()
@ -116,6 +117,10 @@ def inform_cluster_of_shutdown():
try:
this_inst = Instance.objects.get(hostname=settings.CLUSTER_HOST_ID)
this_inst.mark_offline(update_last_seen=True, errors=_('Instance received normal shutdown signal'))
try:
reaper.reap_waiting(this_inst, grace_period=0)
except Exception:
logger.exception('failed to reap waiting jobs for {}'.format(this_inst.hostname))
logger.warning('Normal shutdown signal for instance {}, ' 'removed self from capacity pool.'.format(this_inst.hostname))
except Exception:
logger.exception('Encountered problem with normal shutdown signal.')
@ -533,6 +538,7 @@ def cluster_node_heartbeat():
for other_inst in lost_instances:
try:
reaper.reap(other_inst)
reaper.reap_waiting(this_inst, grace_period=0)
except Exception:
logger.exception('failed to reap jobs for {}'.format(other_inst.hostname))
try:

View File

@ -200,7 +200,8 @@ class TestAutoScaling:
# cleanup should scale down to 8 workers
with mock.patch('awx.main.dispatch.reaper.reap') as reap:
self.pool.cleanup()
with mock.patch('awx.main.dispatch.reaper.reap_waiting') as reap:
self.pool.cleanup()
reap.assert_called()
assert len(self.pool) == 2
@ -250,7 +251,8 @@ class TestAutoScaling:
# clean up and the dead worker
with mock.patch('awx.main.dispatch.reaper.reap') as reap:
self.pool.cleanup()
with mock.patch('awx.main.dispatch.reaper.reap_waiting') as reap:
self.pool.cleanup()
reap.assert_called()
assert len(self.pool) == 1
assert self.pool.workers[0].pid == alive_pid
@ -353,7 +355,7 @@ class TestJobReaper(object):
('waiting', '', '', None, False), # waiting, not assigned to the instance
('waiting', 'awx', '', None, False), # waiting, was edited less than a minute ago
('waiting', '', 'awx', None, False), # waiting, was edited less than a minute ago
('waiting', 'awx', '', yesterday, True), # waiting, assigned to the execution_node, stale
('waiting', 'awx', '', yesterday, False), # waiting, managed by another node, ignore
('waiting', '', 'awx', yesterday, True), # waiting, assigned to the controller_node, stale
],
)
@ -372,6 +374,7 @@ class TestJobReaper(object):
# (because .save() overwrites it to _now_)
Job.objects.filter(id=j.id).update(modified=modified)
reaper.reap(i)
reaper.reap_waiting(i)
job = Job.objects.first()
if fail:
assert job.status == 'failed'

View File

@ -4,6 +4,7 @@ import os
import tempfile
import shutil
from awx.main.tasks.jobs import RunJob
from awx.main.tasks.system import execution_node_health_check, _cleanup_images_and_files
from awx.main.models import Instance, Job
@ -61,3 +62,16 @@ def test_folder_cleanup_running_job(mock_job_folder, mock_me):
job.save(update_fields=['status'])
_cleanup_images_and_files(grace_period=0)
assert not os.path.exists(mock_job_folder) # job is finished and no grace period, should delete
@pytest.mark.django_db
def test_does_not_run_reaped_job(mocker, mock_me):
job = Job.objects.create(status='failed', job_explanation='This job has been reaped.')
mock_run = mocker.patch('awx.main.tasks.jobs.ansible_runner.interface.run')
try:
RunJob().run(job.id)
except Exception:
pass
job.refresh_from_db()
assert job.status == 'failed'
mock_run.assert_not_called()

View File

@ -500,7 +500,7 @@ class TestGenericRun:
with pytest.raises(Exception):
task.run(1)
for c in [mock.call(1, status='running', start_args=''), mock.call(1, status='canceled')]:
for c in [mock.call(1, start_args='', status='canceled')]:
assert c in task.update_model.call_args_list
def test_event_count(self, mock_me):