mirror of
https://github.com/ansible/awx.git
synced 2026-06-25 00:18:07 -02:30
[AAP-74497] Reset orphaned waiting jobs when controller node is deprovisioned (#16467)
Reset orphaned waiting jobs when controller node is deprovisioned Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -688,6 +688,17 @@ class TaskManager(TaskBase):
|
|||||||
logger.error(f'{j.execution_node} is not a registered instance; reaping {j.log_format}')
|
logger.error(f'{j.execution_node} is not a registered instance; reaping {j.log_format}')
|
||||||
reap_job(j, 'failed')
|
reap_job(j, 'failed')
|
||||||
|
|
||||||
|
# Reset waiting jobs whose controller_node was deprovisioned (e.g. K8s pod replaced).
|
||||||
|
# These jobs will never be picked up because no live node is listening for them.
|
||||||
|
registered_control_nodes = Instance.objects.filter(node_type__in=('control', 'hybrid')).values_list('hostname', flat=True)
|
||||||
|
orphaned_waiting = UnifiedJob.objects.filter(status='waiting').exclude(controller_node__in=registered_control_nodes)
|
||||||
|
for j in orphaned_waiting:
|
||||||
|
logger.warning(f'{j.controller_node} is not a registered instance; resetting {j.log_format} to pending')
|
||||||
|
j.status = 'pending'
|
||||||
|
j.controller_node = ''
|
||||||
|
j.execution_node = ''
|
||||||
|
j.save(update_fields=['status', 'controller_node', 'execution_node'])
|
||||||
|
|
||||||
def process_tasks(self):
|
def process_tasks(self):
|
||||||
# maintain a list of jobs that went to an early failure state,
|
# maintain a list of jobs that went to an early failure state,
|
||||||
# meaning the dispatcher never got these jobs,
|
# meaning the dispatcher never got these jobs,
|
||||||
|
|||||||
@@ -160,3 +160,38 @@ class TestJobReaper(object):
|
|||||||
assert job.started > ref_time
|
assert job.started > ref_time
|
||||||
assert job.status == 'running'
|
assert job.status == 'running'
|
||||||
assert job.job_explanation == ''
|
assert job.job_explanation == ''
|
||||||
|
|
||||||
|
def test_waiting_job_reset_when_controller_node_deprovisioned(self):
|
||||||
|
"""When a controller pod is replaced (e.g. K8s rollout), waiting jobs
|
||||||
|
assigned to the now-gone controller_node should be reset to pending
|
||||||
|
by the task manager so they can be re-dispatched."""
|
||||||
|
from awx.main.scheduler import TaskManager
|
||||||
|
|
||||||
|
live_inst = Instance(hostname='awx-task-live', node_type='control')
|
||||||
|
live_inst.save()
|
||||||
|
# No instance record for 'awx-task-dead' — it was already deprovisioned
|
||||||
|
job = Job.objects.create(status='waiting', controller_node='awx-task-dead', execution_node='')
|
||||||
|
|
||||||
|
tm = TaskManager()
|
||||||
|
tm.reap_jobs_from_orphaned_instances()
|
||||||
|
|
||||||
|
job.refresh_from_db()
|
||||||
|
assert job.status == 'pending'
|
||||||
|
assert job.controller_node == ''
|
||||||
|
assert job.execution_node == ''
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('node_type', ['control', 'hybrid'])
|
||||||
|
def test_waiting_job_not_reset_when_controller_node_alive(self, node_type):
|
||||||
|
"""Waiting jobs on a live control or hybrid node should not be touched."""
|
||||||
|
from awx.main.scheduler import TaskManager
|
||||||
|
|
||||||
|
live_inst = Instance(hostname='awx-task-live', node_type=node_type)
|
||||||
|
live_inst.save()
|
||||||
|
job = Job.objects.create(status='waiting', controller_node='awx-task-live', execution_node='')
|
||||||
|
|
||||||
|
tm = TaskManager()
|
||||||
|
tm.reap_jobs_from_orphaned_instances()
|
||||||
|
|
||||||
|
job.refresh_from_db()
|
||||||
|
assert job.status == 'waiting'
|
||||||
|
assert job.controller_node == 'awx-task-live'
|
||||||
|
|||||||
Reference in New Issue
Block a user