mirror of
https://github.com/ansible/awx.git
synced 2026-03-23 11:55:04 -02:30
Alternative for reaping lost jobs, in work unit reaper
This commit is contained in:
@@ -108,7 +108,15 @@ from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja
|
|||||||
from awx.main.utils.reload import stop_local_services
|
from awx.main.utils.reload import stop_local_services
|
||||||
from awx.main.utils.pglock import advisory_lock
|
from awx.main.utils.pglock import advisory_lock
|
||||||
from awx.main.utils.handlers import SpecialInventoryHandler
|
from awx.main.utils.handlers import SpecialInventoryHandler
|
||||||
from awx.main.utils.receptor import get_receptor_ctl, worker_info, get_conn_type, get_tls_client, worker_cleanup, administrative_workunit_reaper
|
from awx.main.utils.receptor import (
|
||||||
|
get_receptor_ctl,
|
||||||
|
worker_info,
|
||||||
|
get_conn_type,
|
||||||
|
get_tls_client,
|
||||||
|
worker_cleanup,
|
||||||
|
administrative_workunit_reaper,
|
||||||
|
RECEPTOR_ACTIVE_STATES,
|
||||||
|
)
|
||||||
from awx.main.consumers import emit_channel_notification
|
from awx.main.consumers import emit_channel_notification
|
||||||
from awx.main import analytics
|
from awx.main import analytics
|
||||||
from awx.conf import settings_registry
|
from awx.conf import settings_registry
|
||||||
@@ -647,8 +655,31 @@ def awx_receptor_workunit_reaper():
|
|||||||
jobs_with_unreleased_receptor_units = UnifiedJob.objects.filter(work_unit_id__in=unit_ids).exclude(status__in=ACTIVE_STATES)
|
jobs_with_unreleased_receptor_units = UnifiedJob.objects.filter(work_unit_id__in=unit_ids).exclude(status__in=ACTIVE_STATES)
|
||||||
for job in jobs_with_unreleased_receptor_units:
|
for job in jobs_with_unreleased_receptor_units:
|
||||||
logger.debug(f"{job.log_format} is not active, reaping receptor work unit {job.work_unit_id}")
|
logger.debug(f"{job.log_format} is not active, reaping receptor work unit {job.work_unit_id}")
|
||||||
receptor_ctl.simple_command(f"work cancel {job.work_unit_id}")
|
try:
|
||||||
receptor_ctl.simple_command(f"work release {job.work_unit_id}")
|
receptor_ctl.simple_command(f"work cancel {job.work_unit_id}")
|
||||||
|
receptor_ctl.simple_command(f"work release {job.work_unit_id}")
|
||||||
|
except RuntimeError as exc:
|
||||||
|
if 'unknown work unit' not in str(exc):
|
||||||
|
logger.exception(f'Unexpected error releasing work unit {job.work_unit_id}')
|
||||||
|
|
||||||
|
active_unit_ids = [id for (id, data) in receptor_work_list.items() if data.get('StateName') in RECEPTOR_ACTIVE_STATES]
|
||||||
|
|
||||||
|
jobs_without_active_work_unit = UnifiedJob.objects.filter(status__in=ACTIVE_STATES, controller_node=settings.CLUSTER_HOST_ID).exclude(
|
||||||
|
work_unit_id__in=active_unit_ids
|
||||||
|
)
|
||||||
|
|
||||||
|
for job in jobs_without_active_work_unit:
|
||||||
|
# TODO: skip if events came in recently
|
||||||
|
job.cancel_flag = True
|
||||||
|
job.status = 'error'
|
||||||
|
if job.work_unit_id in receptor_work_list:
|
||||||
|
receptor_status = receptor_work_list[job.work_unit_id]['StateName']
|
||||||
|
logger.warn(f'Canceling {job.log_format} because of inactive work unit, data:\n{receptor_work_list[job.work_unit_id]}')
|
||||||
|
job.job_explanation = f'Canceled by reaper because receptor work unit reported a status of {receptor_status}'
|
||||||
|
else:
|
||||||
|
logger.warn(f'Canceling {job.log_format} because of missing work unit {job.work_unit_id}')
|
||||||
|
job.job_explanation = f'Canceled by reaper because receptor work unit could not be found for this job'
|
||||||
|
job.save(update_fields=['cancel_flag', 'status', 'job_explanation'])
|
||||||
|
|
||||||
administrative_workunit_reaper(receptor_work_list)
|
administrative_workunit_reaper(receptor_work_list)
|
||||||
|
|
||||||
@@ -3066,7 +3097,7 @@ class AWXReceptorJob:
|
|||||||
try:
|
try:
|
||||||
receptor_ctl.simple_command(f"work release {self.unit_id}")
|
receptor_ctl.simple_command(f"work release {self.unit_id}")
|
||||||
except RuntimeError:
|
except RuntimeError:
|
||||||
logger.exception(f"Unable to release work item {self.unit_id}")
|
logger.exception(f"Unable to release work item {self.unit_id} from {self.task.instance.log_format}")
|
||||||
# If an error occured without the job itself failing, it could be a broken instance
|
# If an error occured without the job itself failing, it could be a broken instance
|
||||||
if self.work_type == 'ansible-runner' and ((res is None) or (getattr(res, 'rc', None) is None)):
|
if self.work_type == 'ansible-runner' and ((res is None) or (getattr(res, 'rc', None) is None)):
|
||||||
execution_node_health_check.delay(self.task.instance.execution_node)
|
execution_node_health_check.delay(self.task.instance.execution_node)
|
||||||
@@ -3221,24 +3252,16 @@ class AWXReceptorJob:
|
|||||||
|
|
||||||
@cleanup_new_process
|
@cleanup_new_process
|
||||||
def cancel_watcher(self, processor_future):
|
def cancel_watcher(self, processor_future):
|
||||||
receptor_ctl = get_receptor_ctl()
|
|
||||||
while True:
|
while True:
|
||||||
if processor_future.done():
|
if processor_future.done():
|
||||||
return processor_future.result()
|
return processor_future.result()
|
||||||
|
|
||||||
# cancel job if receptor no longer knows about work item
|
|
||||||
try:
|
|
||||||
receptor_ctl.simple_command(f'work status {self.unit_id}')
|
|
||||||
except RuntimeError:
|
|
||||||
self.task.instance.result_traceback = traceback.format_exc()
|
|
||||||
self.task.instance.save(update_fields=['result_traceback'])
|
|
||||||
|
|
||||||
result = namedtuple('result', ['status', 'rc'])
|
|
||||||
return result('error', 1)
|
|
||||||
|
|
||||||
if self.task.cancel_callback():
|
if self.task.cancel_callback():
|
||||||
|
status = self.task.instance.status
|
||||||
|
if status == 'running':
|
||||||
|
status = 'canceled'
|
||||||
result = namedtuple('result', ['status', 'rc'])
|
result = namedtuple('result', ['status', 'rc'])
|
||||||
return result('canceled', 1)
|
return result(status, 1)
|
||||||
|
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user