Merge pull request #382 from AlanCoding/isolated_reaper

reap isolated jobs
Alan Rominger authored on 2017-09-08 12:41:03 -07:00, committed by GitHub
2 changed files with 47 additions and 23 deletions


@@ -421,26 +421,40 @@ class TaskManager():
             if not found_acceptable_queue:
                 logger.debug("%s couldn't be scheduled on graph, waiting for next cycle", task.log_format)
 
-    def fail_jobs_if_not_in_celery(self, node_jobs, active_tasks, celery_task_start_time):
+    def fail_jobs_if_not_in_celery(self, node_jobs, active_tasks, celery_task_start_time,
+                                   isolated=False):
         for task in node_jobs:
             if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
                 if isinstance(task, WorkflowJob):
                     continue
                 if task.modified > celery_task_start_time:
                     continue
-                task.status = 'failed'
-                task.job_explanation += ' '.join((
-                    'Task was marked as running in Tower but was not present in',
-                    'Celery, so it has been marked as failed.',
-                ))
+                new_status = 'failed'
+                if isolated:
+                    new_status = 'error'
+                task.status = new_status
+                if isolated:
+                    # TODO: cancel and reap artifacts of lost jobs from heartbeat
+                    task.job_explanation += ' '.join((
+                        'Task was marked as running in Tower but its ',
+                        'controller management daemon was not present in',
+                        'Celery, so it has been marked as failed.',
+                        'Task may still be running, but contactability is unknown.'
+                    ))
+                else:
+                    task.job_explanation += ' '.join((
+                        'Task was marked as running in Tower but was not present in',
+                        'Celery, so it has been marked as failed.',
+                    ))
                 try:
                     task.save(update_fields=['status', 'job_explanation'])
                 except DatabaseError:
                     logger.error("Task {} DB error in marking failed. Job possibly deleted.".format(task.log_format))
                     continue
                 awx_tasks._send_notification_templates(task, 'failed')
-                task.websocket_emit_status('failed')
-                logger.error("Task {} has no record in celery. Marking as failed".format(task.log_format))
+                task.websocket_emit_status(new_status)
+                logger.error("{}Task {} has no record in celery. Marking as failed".format(
+                    'Isolated ' if isolated else '', task.log_format))
 
     def cleanup_inconsistent_celery_tasks(self):
         '''
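
For a side-by-side view of the two reaping behaviours this hunk introduces, the following standalone sketch (a hypothetical helper, not code from the diff or the AWX codebase) reproduces the status/explanation selection: isolated jobs are reaped to 'error' with a softer explanation, while regular jobs keep the original 'failed' handling.

# Illustrative sketch only; mirrors the isolated/regular branch added above.
def reaper_status_and_explanation(isolated):
    """Return the (status, job_explanation) pair the reaper would assign."""
    if isolated:
        # Only the controller-side management daemon is known to be missing;
        # the job itself may still be running on the isolated node.
        return 'error', ' '.join((
            'Task was marked as running in Tower but its ',
            'controller management daemon was not present in',
            'Celery, so it has been marked as failed.',
            'Task may still be running, but contactability is unknown.'
        ))
    # The task itself is missing from Celery, so it is treated as dead.
    return 'failed', ' '.join((
        'Task was marked as running in Tower but was not present in',
        'Celery, so it has been marked as failed.',
    ))

assert reaper_status_and_explanation(True)[0] == 'error'
assert reaper_status_and_explanation(False)[0] == 'failed'
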
@@ -471,26 +485,36 @@ class TaskManager():
         self.fail_jobs_if_not_in_celery(waiting_tasks, all_celery_task_ids, celery_task_start_time)
 
         for node, node_jobs in running_tasks.iteritems():
+            isolated = False
             if node in active_queues:
                 active_tasks = active_queues[node]
             else:
                 '''
-                Node task list not found in celery. If tower thinks the node is down
-                then fail all the jobs on the node.
+                Node task list not found in celery. We may branch into cases:
+                 - instance is unknown to tower, system is improperly configured
+                 - instance is reported as down, then fail all jobs on the node
+                 - instance is an isolated node, then check running tasks
+                   among all allowed controller nodes for management process
                 '''
-                try:
-                    instance = Instance.objects.get(hostname=node)
-                    if instance.capacity == 0:
-                        active_tasks = []
-                    else:
-                        continue
-                except Instance.DoesNotExist:
-                    logger.error("Execution node Instance {} not found in database. "
-                                 "The node is currently executing jobs {}".format(node,
-                                                                                  [j.log_format for j in node_jobs]))
-                    active_tasks = []
+                instance = Instance.objects.filter(hostname=node).first()
+
+                if instance is None:
+                    logger.error("Execution node Instance {} not found in database. "
+                                 "The node is currently executing jobs {}".format(
+                                     node, [j.log_format for j in node_jobs]))
+                    active_tasks = []
+                elif instance.capacity == 0:
+                    active_tasks = []
+                elif instance.rampart_groups.filter(controller__isnull=False).exists():
+                    active_tasks = all_celery_task_ids
+                    isolated = True
+                else:
+                    continue
 
-            self.fail_jobs_if_not_in_celery(node_jobs, active_tasks, celery_task_start_time)
+            self.fail_jobs_if_not_in_celery(
+                node_jobs, active_tasks, celery_task_start_time,
+                isolated=isolated
+            )
 
     def calculate_capacity_consumed(self, tasks):
         self.graph = InstanceGroup.objects.capacity_values(tasks=tasks, graph=self.graph)
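
The new else-branch is effectively a four-way decision table for a node whose task queue did not show up in Celery. As a sketch (a hypothetical pure function, not code from the diff; instance stands for Instance.objects.filter(hostname=node).first(), and node_is_isolated for the rampart_groups controller check), it reads:

# Illustrative sketch only; None means "skip this node this cycle".
def reaper_plan_for_missing_queue(instance, node_is_isolated, all_celery_task_ids):
    """Return (active_tasks, isolated) for fail_jobs_if_not_in_celery, or None to skip."""
    if instance is None:
        # Node is unknown to Tower: misconfiguration, reap against nothing.
        return [], False
    if instance.capacity == 0:
        # Tower already reports the node as down: fail every job on it.
        return [], False
    if node_is_isolated:
        # Isolated node: its controller daemons may run on any controller
        # instance, so compare against every Celery task id we know about.
        return all_celery_task_ids, True
    # Healthy regular node whose queue is simply missing: leave it alone.
    return None

In the diff this corresponds to active_tasks being set to [] or all_celery_task_ids, plus the isolated flag that is then forwarded to fail_jobs_if_not_in_celery.
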


@@ -21,7 +21,7 @@ class TestCleanupInconsistentCeleryTasks():
     @mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {}))
     @mock.patch.object(TaskManager, 'get_running_tasks', return_value=({'host1': [Job(id=2), Job(id=3),]}, []))
     @mock.patch.object(InstanceGroup.objects, 'prefetch_related', return_value=[])
-    @mock.patch.object(Instance.objects, 'get', side_effect=Instance.DoesNotExist)
+    @mock.patch.object(Instance.objects, 'filter', return_value=mock.MagicMock(first=lambda: None))
     @mock.patch('awx.main.scheduler.logger')
     def test_instance_does_not_exist(self, logger_mock, *args):
         logger_mock.error = mock.MagicMock(side_effect=RuntimeError("mocked"))
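
The only change in the test is the patch target: Instance.objects.get raising Instance.DoesNotExist becomes Instance.objects.filter returning a stand-in whose .first() is None, matching the scheduler change above. A quick illustration of what the patched call yields, assuming the same standalone mock library the decorators already use:

import mock

# What the scheduler sees after the patch: any filter(...) call returns this
# object, and .first() gives None, i.e. no Instance row exists for that hostname.
fake_queryset = mock.MagicMock(first=lambda: None)
assert fake_queryset.first() is None
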