mirror of
https://github.com/ansible/awx.git
synced 2026-03-24 20:35:02 -02:30
Revert "Merge pull request #5354 from ansible/jobs_killed_via_receptor_should_get_reaped"
This reverts commit8736858d80, reversing changes made to84e77c9db9.
This commit is contained in:
committed by
Shane McDonald
parent
8e9fc14b0e
commit
ecb84e090c
@@ -108,15 +108,7 @@ from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja
|
|||||||
from awx.main.utils.reload import stop_local_services
|
from awx.main.utils.reload import stop_local_services
|
||||||
from awx.main.utils.pglock import advisory_lock
|
from awx.main.utils.pglock import advisory_lock
|
||||||
from awx.main.utils.handlers import SpecialInventoryHandler
|
from awx.main.utils.handlers import SpecialInventoryHandler
|
||||||
from awx.main.utils.receptor import (
|
from awx.main.utils.receptor import get_receptor_ctl, worker_info, get_conn_type, get_tls_client, worker_cleanup, administrative_workunit_reaper
|
||||||
get_receptor_ctl,
|
|
||||||
worker_info,
|
|
||||||
get_conn_type,
|
|
||||||
get_tls_client,
|
|
||||||
worker_cleanup,
|
|
||||||
administrative_workunit_reaper,
|
|
||||||
RECEPTOR_ACTIVE_STATES,
|
|
||||||
)
|
|
||||||
from awx.main.consumers import emit_channel_notification
|
from awx.main.consumers import emit_channel_notification
|
||||||
from awx.main import analytics
|
from awx.main import analytics
|
||||||
from awx.conf import settings_registry
|
from awx.conf import settings_registry
|
||||||
@@ -655,37 +647,12 @@ def awx_receptor_workunit_reaper():
|
|||||||
receptor_ctl = get_receptor_ctl()
|
receptor_ctl = get_receptor_ctl()
|
||||||
receptor_work_list = receptor_ctl.simple_command("work list")
|
receptor_work_list = receptor_ctl.simple_command("work list")
|
||||||
|
|
||||||
# Release work units for inactive jobs
|
|
||||||
unit_ids = [id for id in receptor_work_list]
|
unit_ids = [id for id in receptor_work_list]
|
||||||
jobs_with_unreleased_receptor_units = UnifiedJob.objects.filter(work_unit_id__in=unit_ids).exclude(status__in=ACTIVE_STATES)
|
jobs_with_unreleased_receptor_units = UnifiedJob.objects.filter(work_unit_id__in=unit_ids).exclude(status__in=ACTIVE_STATES)
|
||||||
for job in jobs_with_unreleased_receptor_units:
|
for job in jobs_with_unreleased_receptor_units:
|
||||||
logger.debug(f"{job.log_format} is not active, reaping receptor work unit {job.work_unit_id}")
|
logger.debug(f"{job.log_format} is not active, reaping receptor work unit {job.work_unit_id}")
|
||||||
try:
|
receptor_ctl.simple_command(f"work cancel {job.work_unit_id}")
|
||||||
receptor_ctl.simple_command(f"work cancel {job.work_unit_id}")
|
receptor_ctl.simple_command(f"work release {job.work_unit_id}")
|
||||||
receptor_ctl.simple_command(f"work release {job.work_unit_id}")
|
|
||||||
except RuntimeError as exc:
|
|
||||||
if 'unknown work unit' not in str(exc):
|
|
||||||
logger.exception(f'Unexpected error releasing work unit {job.work_unit_id}')
|
|
||||||
|
|
||||||
# Cancel jobs missing active work units
|
|
||||||
active_unit_ids = [id for (id, data) in receptor_work_list.items() if data.get('StateName') in RECEPTOR_ACTIVE_STATES]
|
|
||||||
active_unit_ids.append('') # exclude jobs that have not yet started a receptor work unit
|
|
||||||
|
|
||||||
jobs_without_active_work_unit = UnifiedJob.objects.filter(status__in=ACTIVE_STATES, controller_node=settings.CLUSTER_HOST_ID).exclude(
|
|
||||||
work_unit_id__in=active_unit_ids
|
|
||||||
)
|
|
||||||
|
|
||||||
for job in jobs_without_active_work_unit:
|
|
||||||
job.cancel_flag = True
|
|
||||||
job.status = 'error'
|
|
||||||
if job.work_unit_id in receptor_work_list:
|
|
||||||
receptor_status = receptor_work_list[job.work_unit_id]['StateName']
|
|
||||||
logger.warn(f'Canceling {job.log_format} because of inactive work unit, data:\n{receptor_work_list[job.work_unit_id]}')
|
|
||||||
job.job_explanation = f'Canceled by reaper because receptor work unit reported a status of {receptor_status}'
|
|
||||||
else:
|
|
||||||
logger.warn(f'Canceling {job.log_format} because of missing work unit {job.work_unit_id}')
|
|
||||||
job.job_explanation = f'Canceled by reaper because receptor work unit could not be found for this job'
|
|
||||||
job.save(update_fields=['cancel_flag', 'status', 'job_explanation'])
|
|
||||||
|
|
||||||
administrative_workunit_reaper(receptor_work_list)
|
administrative_workunit_reaper(receptor_work_list)
|
||||||
|
|
||||||
@@ -3104,10 +3071,7 @@ class AWXReceptorJob:
|
|||||||
finally:
|
finally:
|
||||||
# Make sure to always release the work unit if we established it
|
# Make sure to always release the work unit if we established it
|
||||||
if self.unit_id is not None and settings.RECEPTOR_RELEASE_WORK:
|
if self.unit_id is not None and settings.RECEPTOR_RELEASE_WORK:
|
||||||
try:
|
receptor_ctl.simple_command(f"work release {self.unit_id}")
|
||||||
receptor_ctl.simple_command(f"work release {self.unit_id}")
|
|
||||||
except RuntimeError:
|
|
||||||
logger.exception(f"Unable to release work item {self.unit_id} from {self.task.instance.log_format}")
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def sign_work(self):
|
def sign_work(self):
|
||||||
@@ -3164,10 +3128,10 @@ class AWXReceptorJob:
|
|||||||
unit_status = receptor_ctl.simple_command(f'work status {self.unit_id}')
|
unit_status = receptor_ctl.simple_command(f'work status {self.unit_id}')
|
||||||
detail = unit_status.get('Detail', None)
|
detail = unit_status.get('Detail', None)
|
||||||
state_name = unit_status.get('StateName', None)
|
state_name = unit_status.get('StateName', None)
|
||||||
except RuntimeError:
|
except RuntimeError as e:
|
||||||
detail = ''
|
detail = ''
|
||||||
state_name = ''
|
state_name = ''
|
||||||
logger.exception(f'Unable to retrieve work status for {self.unit_id}')
|
logger.exception(e)
|
||||||
|
|
||||||
if 'exceeded quota' in detail:
|
if 'exceeded quota' in detail:
|
||||||
logger.warn(detail)
|
logger.warn(detail)
|
||||||
@@ -3175,26 +3139,20 @@ class AWXReceptorJob:
|
|||||||
logger.warn(f"Could not launch pod for {log_name}. Exceeded quota.")
|
logger.warn(f"Could not launch pod for {log_name}. Exceeded quota.")
|
||||||
self.task.update_model(self.task.instance.pk, status='pending')
|
self.task.update_model(self.task.instance.pk, status='pending')
|
||||||
return
|
return
|
||||||
|
|
||||||
# If ansible-runner ran, but an error occured at runtime, the traceback information
|
# If ansible-runner ran, but an error occured at runtime, the traceback information
|
||||||
# is saved via the status_handler passed in to the processor.
|
# is saved via the status_handler passed in to the processor.
|
||||||
if state_name == 'Succeeded':
|
if state_name == 'Succeeded':
|
||||||
return res
|
return res
|
||||||
|
|
||||||
if not (self.task.instance.result_traceback or self.task.instance.job_explanation):
|
if not self.task.instance.result_traceback:
|
||||||
try:
|
try:
|
||||||
resultsock = receptor_ctl.get_work_results(self.unit_id, return_sockfile=True)
|
resultsock = receptor_ctl.get_work_results(self.unit_id, return_sockfile=True)
|
||||||
lines = resultsock.readlines()
|
lines = resultsock.readlines()
|
||||||
self.task.instance.result_traceback = b"".join(lines).decode()
|
self.task.instance.result_traceback = b"".join(lines).decode()
|
||||||
self.task.instance.save(update_fields=['result_traceback'])
|
self.task.instance.save(update_fields=['result_traceback'])
|
||||||
except Exception:
|
except Exception:
|
||||||
resultsock.shutdown(socket.SHUT_RDWR)
|
|
||||||
resultfile.close()
|
|
||||||
raise RuntimeError(detail)
|
raise RuntimeError(detail)
|
||||||
|
|
||||||
resultsock.shutdown(socket.SHUT_RDWR)
|
|
||||||
resultfile.close()
|
|
||||||
|
|
||||||
time.sleep(3)
|
time.sleep(3)
|
||||||
return res
|
return res
|
||||||
|
|
||||||
@@ -3265,11 +3223,8 @@ class AWXReceptorJob:
|
|||||||
return processor_future.result()
|
return processor_future.result()
|
||||||
|
|
||||||
if self.task.cancel_callback():
|
if self.task.cancel_callback():
|
||||||
status = self.task.instance.status
|
|
||||||
if status == 'running':
|
|
||||||
status = 'canceled'
|
|
||||||
result = namedtuple('result', ['status', 'rc'])
|
result = namedtuple('result', ['status', 'rc'])
|
||||||
return result(status, 1)
|
return result('canceled', 1)
|
||||||
|
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
|
|||||||
@@ -1990,51 +1990,3 @@ def test_project_update_no_ee():
|
|||||||
task.build_env(job, {})
|
task.build_env(job, {})
|
||||||
|
|
||||||
assert 'The project could not sync because there is no Execution Environment' in str(e.value)
|
assert 'The project could not sync because there is no Execution Environment' in str(e.value)
|
||||||
|
|
||||||
|
|
||||||
@mock.patch('awx.main.tasks.administrative_workunit_reaper')
|
|
||||||
@mock.patch('awx.main.tasks.UnifiedJob.objects.filter')
|
|
||||||
@mock.patch('awx.main.tasks.get_receptor_ctl')
|
|
||||||
def test_awx_receptor_workunit_reaper_cancels_work_units_missing_active_jobs(
|
|
||||||
mock_get_receptor_ctl, mock_uj_objects, mock_administrative_workunit_reaper, mocker
|
|
||||||
):
|
|
||||||
mock_receptor_ctl = mocker.MagicMock()
|
|
||||||
mock_receptor_ctl.simple_command.return_value = {
|
|
||||||
'4PiSBdGz': {'Detail': 'Running: PID: 134', 'ExtraData': None, 'State': 1, 'StateName': 'Running', 'StdoutSize': 106, 'WorkType': 'ansible-runner'}
|
|
||||||
}
|
|
||||||
mock_get_receptor_ctl.return_value = mock_receptor_ctl
|
|
||||||
|
|
||||||
filtered_objects = mocker.MagicMock()
|
|
||||||
mock_uj_objects.return_value = filtered_objects
|
|
||||||
|
|
||||||
mock_finished_job = mocker.MagicMock(spec=UnifiedJob)
|
|
||||||
filtered_objects.exclude.side_effect = [[mock_finished_job], []]
|
|
||||||
|
|
||||||
tasks.awx_receptor_workunit_reaper()
|
|
||||||
assert mock_receptor_ctl.simple_command.called_with('work cancel 4PiSBdGz')
|
|
||||||
assert mock_receptor_ctl.simple_command.called_with('work release 4PiSBdGz')
|
|
||||||
|
|
||||||
assert mock_administrative_workunit_reaper.called
|
|
||||||
|
|
||||||
|
|
||||||
@mock.patch('awx.main.tasks.administrative_workunit_reaper')
|
|
||||||
@mock.patch('awx.main.tasks.UnifiedJob.objects.filter')
|
|
||||||
@mock.patch('awx.main.tasks.get_receptor_ctl')
|
|
||||||
def test_awx_receptor_workunit_reaper_reaps_jobs_missing_work_units(mock_get_receptor_ctl, mock_uj_objects, mock_administrative_workunit_reaper, mocker):
|
|
||||||
mock_receptor_ctl = mocker.MagicMock()
|
|
||||||
mock_receptor_ctl.simple_command.return_value = {}
|
|
||||||
mock_get_receptor_ctl.return_value = mock_receptor_ctl
|
|
||||||
|
|
||||||
filtered_objects = mocker.MagicMock()
|
|
||||||
mock_uj_objects.return_value = filtered_objects
|
|
||||||
|
|
||||||
mock_job = mocker.MagicMock(spec=UnifiedJob)
|
|
||||||
filtered_objects.exclude.side_effect = [[], [mock_job]]
|
|
||||||
|
|
||||||
tasks.awx_receptor_workunit_reaper()
|
|
||||||
assert mock_job.cancel_flag == True
|
|
||||||
assert mock_job.status == 'error'
|
|
||||||
assert mock_job.job_explanation == 'Canceled by reaper because receptor work unit could not be found for this job'
|
|
||||||
assert mock_job.save.called_once_with(update_fields=['cancel_flag', 'status', 'job_explanation'])
|
|
||||||
|
|
||||||
assert mock_administrative_workunit_reaper.called
|
|
||||||
|
|||||||
Reference in New Issue
Block a user