Split reaper for running and waiting jobs

Avoid running jobs that have already been reaped

Co-authored-by: Elijah DeLee <kdelee@redhat.com>

Remove unnecessary extra actions

Fix waiting jobs in other cases of reaping
Alan Rominger
2022-06-28 14:58:41 -04:00
parent 64157f7207
commit 278db2cdde
7 changed files with 61 additions and 16 deletions
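
The split leaves two entry points in awx.main.dispatch.reaper instead of one. A minimal usage sketch of the resulting surface, hedged against the signatures shown in the diffs below (uuids_in_flight is illustrative):

    from awx.main.dispatch import reaper

    # Clear jobs stuck in 'waiting' on this node; the default 60-second grace
    # period spares jobs a healthy dispatcher simply has not picked up yet.
    reaper.reap_waiting()

    # Clear jobs the database claims are 'running' here, sparing task UUIDs
    # that live workers still own.
    uuids_in_flight = []
    reaper.reap(excluded_uuids=uuids_in_flight)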

View File

@@ -432,6 +432,10 @@ class AutoscalePool(WorkerPool):
                 idx = random.choice(range(len(self.workers)))
                 self.write(idx, m)
+        # if we are not in the dangerous situation of queue backup then clear old waiting jobs
+        if self.workers and max(len(w.managed_tasks) for w in self.workers) <= 1:
+            reaper.reap_waiting()
         # if the database says a job is running on this node, but it's *not*,
         # then reap it
         running_uuids = []
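
The new guard only reaps waiting jobs when the pool is not backed up: if any worker has more than one managed task, jobs may legitimately sit in 'waiting' past the grace period. A hypothetical helper that restates the check:

    def queue_is_backed_up(workers):
        # Each worker's managed_tasks holds its current task plus anything
        # queued behind it, so a depth above 1 means messages are piling up
        # and 'waiting' jobs may just be stuck in line, not orphaned.
        return bool(workers) and max(len(w.managed_tasks) for w in workers) > 1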

View File

@@ -24,13 +24,11 @@ def startup_reaping():
     job_ids = []
     for j in jobs:
         job_ids.append(j.id)
-        j.status = 'failed'
-        j.start_args = ''
-        j.job_explanation += 'Task was marked as running at system start up. The system must have not shut down properly, so it has been marked as failed.'
-        j.save(update_fields=['status', 'start_args', 'job_explanation'])
-        if hasattr(j, 'send_notification_templates'):
-            j.send_notification_templates('failed')
-        j.websocket_emit_status('failed')
+        reap_job(
+            j,
+            'failed',
+            job_explanation='Task was marked as running at system start up. The system must have not shut down properly, so it has been marked as failed.',
+        )
     if job_ids:
         logger.error(f'Unified jobs {job_ids} were reaped on dispatch startup')
@@ -54,9 +52,9 @@ def reap_job(j, status):
     logger.error('{} is no longer running; reaping'.format(j.log_format))


-def reap(instance=None, status='failed', excluded_uuids=[]):
+def reap_waiting(instance=None, status='failed', grace_period=60):
     """
-    Reap all jobs in waiting|running for this instance.
+    Reap all jobs in waiting for this instance.
     """
     me = instance
     if me is None:
@@ -66,11 +64,25 @@ def reap(instance=None, status='failed', excluded_uuids=[]):
             logger.warning(f'Local instance is not registered, not running reaper: {e}')
             return
     now = tz_now()
+    jobs = UnifiedJob.objects.filter(status='waiting', modified__lte=now - timedelta(seconds=grace_period), controller_node=me.hostname)
+    for j in jobs:
+        reap_job(j, status)
+
+
+def reap(instance=None, status='failed', excluded_uuids=[]):
+    """
+    Reap all jobs in running for this instance.
+    """
+    me = instance
+    if me is None:
+        try:
+            me = Instance.objects.me()
+        except RuntimeError as e:
+            logger.warning(f'Local instance is not registered, not running reaper: {e}')
+            return
     workflow_ctype_id = ContentType.objects.get_for_model(WorkflowJob).id
     jobs = UnifiedJob.objects.filter(
-        (Q(status='running') | Q(status='waiting', modified__lte=now - timedelta(seconds=60)))
-        & (Q(execution_node=me.hostname) | Q(controller_node=me.hostname))
-        & ~Q(polymorphic_ctype_id=workflow_ctype_id)
+        Q(status='running') & (Q(execution_node=me.hostname) | Q(controller_node=me.hostname)) & ~Q(polymorphic_ctype_id=workflow_ctype_id)
     ).exclude(celery_task_id__in=excluded_uuids)
     for j in jobs:
         reap_job(j, status)
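
After the split, each reaper owns exactly one queryset. A condensed, comment-annotated restatement of the two filters above (names as defined in the functions they come from):

    # reap_waiting: only the controller node may claim its stale waiting jobs,
    # and only after the grace period, so a slow-but-alive dispatcher that has
    # not yet picked the job up is left alone.
    waiting = UnifiedJob.objects.filter(
        status='waiting',
        modified__lte=now - timedelta(seconds=grace_period),
        controller_node=me.hostname,
    )

    # reap: running jobs are reaped wherever this node appears, skipping
    # workflow jobs (which hold no worker slot) and task UUIDs that a live
    # worker still reports as in flight.
    running = UnifiedJob.objects.filter(
        Q(status='running')
        & (Q(execution_node=me.hostname) | Q(controller_node=me.hostname))
        & ~Q(polymorphic_ctype_id=workflow_ctype_id)
    ).exclude(celery_task_id__in=excluded_uuids)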

View File

@@ -36,6 +36,7 @@ from awx.main.constants import (
     JOB_FOLDER_PREFIX,
     MAX_ISOLATED_PATH_COLON_DELIMITER,
     CONTAINER_VOLUMES_MOUNT_TYPES,
+    ACTIVE_STATES,
 )
 from awx.main.models import (
     Instance,
@@ -451,6 +452,11 @@ class BaseTask(object):
         Run the job/task and capture its output.
         """
         self.instance = self.model.objects.get(pk=pk)
+        if self.instance.status != 'canceled' and self.instance.cancel_flag:
+            self.instance = self.update_model(self.instance.pk, start_args='', status='canceled')
+        if self.instance.status not in ACTIVE_STATES:
+            # Prevent starting the job if it has been reaped or handled by another process.
+            raise RuntimeError(f'Not starting task pk={pk} because its status of {self.instance.status} is not a valid active state')
         if self.instance.execution_environment_id is None:
             from awx.main.signals import disable_activity_stream
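
The guard means a job reaped to 'failed' (or otherwise finished) can no longer be resurrected by a late dispatch of its task message. A minimal sketch of the pattern, assuming ACTIVE_STATES covers the states a job may hold before and during execution:

    ACTIVE_STATES = ('new', 'pending', 'waiting', 'running')  # assumption for this sketch

    def guard_start(instance, pk):
        # A reaped job arrives here already marked 'failed'; refuse to start
        # it rather than revive work another process has given up on.
        if instance.status not in ACTIVE_STATES:
            raise RuntimeError(f'Not starting task pk={pk}: status {instance.status!r} is not active')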

View File

@@ -105,6 +105,7 @@ def dispatch_startup():
     apply_cluster_membership_policies()
     cluster_node_heartbeat()
     reaper.startup_reaping()
+    reaper.reap_waiting(grace_period=0)
     m = Metrics()
     m.reset_values()
@@ -116,6 +117,10 @@ def inform_cluster_of_shutdown():
     try:
         this_inst = Instance.objects.get(hostname=settings.CLUSTER_HOST_ID)
         this_inst.mark_offline(update_last_seen=True, errors=_('Instance received normal shutdown signal'))
+        try:
+            reaper.reap_waiting(this_inst, grace_period=0)
+        except Exception:
+            logger.exception('failed to reap waiting jobs for {}'.format(this_inst.hostname))
         logger.warning('Normal shutdown signal for instance {}, removed self from capacity pool.'.format(this_inst.hostname))
     except Exception:
         logger.exception('Encountered problem with normal shutdown signal.')
@@ -533,6 +538,7 @@ def cluster_node_heartbeat():
     for other_inst in lost_instances:
         try:
             reaper.reap(other_inst)
+            reaper.reap_waiting(other_inst, grace_period=0)
         except Exception:
             logger.exception('failed to reap jobs for {}'.format(other_inst.hostname))
     try:
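
All three new call sites pass grace_period=0 because no dispatcher can still consume the jobs: the node is starting fresh, shutting down, or lost. The heartbeat sweep, restated with that reasoning spelled out:

    # A lost node can neither finish its running jobs nor pick up its waiting
    # ones, so both reapers run immediately, with no grace period.
    for other_inst in lost_instances:
        try:
            reaper.reap(other_inst)
            reaper.reap_waiting(other_inst, grace_period=0)
        except Exception:
            logger.exception('failed to reap jobs for {}'.format(other_inst.hostname))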

View File

@@ -200,7 +200,8 @@ class TestAutoScaling:
         # cleanup should scale down to 8 workers
         with mock.patch('awx.main.dispatch.reaper.reap') as reap:
-            self.pool.cleanup()
+            with mock.patch('awx.main.dispatch.reaper.reap_waiting'):
+                self.pool.cleanup()
             reap.assert_called()
         assert len(self.pool) == 2
@@ -250,7 +251,8 @@ class TestAutoScaling:
         # clean up and the dead worker
         with mock.patch('awx.main.dispatch.reaper.reap') as reap:
-            self.pool.cleanup()
+            with mock.patch('awx.main.dispatch.reaper.reap_waiting'):
+                self.pool.cleanup()
             reap.assert_called()
         assert len(self.pool) == 1
         assert self.pool.workers[0].pid == alive_pid
@@ -353,7 +355,7 @@ class TestJobReaper(object):
             ('waiting', '', '', None, False),  # waiting, not assigned to the instance
             ('waiting', 'awx', '', None, False),  # waiting, was edited less than a minute ago
             ('waiting', '', 'awx', None, False),  # waiting, was edited less than a minute ago
-            ('waiting', 'awx', '', yesterday, True),  # waiting, assigned to the execution_node, stale
+            ('waiting', 'awx', '', yesterday, False),  # waiting, managed by another node, ignore
             ('waiting', '', 'awx', yesterday, True),  # waiting, assigned to the controller_node, stale
         ],
     )
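
Reading the table: each tuple is (status, execution_node, controller_node, modified, expect_reaped), with the column meaning inferred from the test body. The flipped row, annotated:

    # status     execution_node  controller_node  modified   expect_reaped
    ('waiting',  'awx',          '',              yesterday, False),
    # Before the split this row expected True; now a stale waiting job pinned
    # to our execution_node but another controller is left for that controller.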
@@ -372,6 +374,7 @@ class TestJobReaper(object):
         # (because .save() overwrites it to _now_)
         Job.objects.filter(id=j.id).update(modified=modified)
         reaper.reap(i)
+        reaper.reap_waiting(i)
         job = Job.objects.first()
         if fail:
             assert job.status == 'failed'

View File

@@ -4,6 +4,7 @@ import os
 import tempfile
 import shutil

+from awx.main.tasks.jobs import RunJob
 from awx.main.tasks.system import execution_node_health_check, _cleanup_images_and_files
 from awx.main.models import Instance, Job
@@ -61,3 +62,16 @@ def test_folder_cleanup_running_job(mock_job_folder, mock_me):
     job.save(update_fields=['status'])
     _cleanup_images_and_files(grace_period=0)
     assert not os.path.exists(mock_job_folder)  # job is finished and no grace period, should delete
+
+
+@pytest.mark.django_db
+def test_does_not_run_reaped_job(mocker, mock_me):
+    job = Job.objects.create(status='failed', job_explanation='This job has been reaped.')
+    mock_run = mocker.patch('awx.main.tasks.jobs.ansible_runner.interface.run')
+    try:
+        RunJob().run(job.id)
+    except Exception:
+        pass
+    job.refresh_from_db()
+    assert job.status == 'failed'
+    mock_run.assert_not_called()
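
Since the new guard in BaseTask.run raises RuntimeError for a non-active status, the broad try/except above could be tightened; a hypothetical variant of the same assertion:

    with pytest.raises(RuntimeError):
        RunJob().run(job.id)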

View File

@@ -500,7 +500,7 @@ class TestGenericRun:
         with pytest.raises(Exception):
             task.run(1)

-        for c in [mock.call(1, status='running', start_args=''), mock.call(1, status='canceled')]:
+        for c in [mock.call(1, start_args='', status='canceled')]:
             assert c in task.update_model.call_args_list

     def test_event_count(self, mock_me):