reap isolated jobs

AlanCoding 2017-08-31 17:37:40 -04:00
parent 56e9d7b8e2
commit 878e7ef49f
2 changed files with 52 additions and 22 deletions

View File

@@ -6,6 +6,7 @@ from datetime import datetime, timedelta
import logging
import uuid
from sets import Set
import itertools
# Django
from django.conf import settings
@@ -421,26 +422,40 @@ class TaskManager():
if not found_acceptable_queue:
logger.debug("%s couldn't be scheduled on graph, waiting for next cycle", task.log_format)
def fail_jobs_if_not_in_celery(self, node_jobs, active_tasks, celery_task_start_time):
def fail_jobs_if_not_in_celery(self, node_jobs, active_tasks, celery_task_start_time,
isolated=False):
for task in node_jobs:
if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
if isinstance(task, WorkflowJob):
continue
if task.modified > celery_task_start_time:
continue
task.status = 'failed'
task.job_explanation += ' '.join((
'Task was marked as running in Tower but was not present in',
'Celery, so it has been marked as failed.',
))
new_status = 'failed'
if isolated:
new_status = 'error'
task.status = new_status
if isolated:
# TODO: cancel and reap artifacts of lost jobs from heartbeat
task.job_explanation += ' '.join((
'Task was marked as running in Tower but its',
'controller management daemon was not present in',
'Celery, so it has been marked as failed.',
'Task may still be running, but contactability is unknown.'
))
else:
task.job_explanation += ' '.join((
'Task was marked as running in Tower but was not present in',
'Celery, so it has been marked as failed.',
))
try:
task.save(update_fields=['status', 'job_explanation'])
except DatabaseError:
logger.error("Task {} DB error in marking failed. Job possibly deleted.".format(task.log_format))
continue
awx_tasks._send_notification_templates(task, 'failed')
task.websocket_emit_status('failed')
logger.error("Task {} has no record in celery. Marking as failed".format(task.log_format))
awx_tasks._send_notification_templates(task, new_status)
task.websocket_emit_status(new_status)
logger.error("{}Task {} has no record in celery. Marking as failed".format(
'Isolated ' if isolated else '', task.log_format))
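The change above amounts to a single rule: a job that has vanished from celery is normally reaped to 'failed', but a job running on an isolated node is reaped to 'error', because only its controller-side management daemon is known to be gone and the remote playbook may still be running. A minimal standalone sketch of that rule follows; SimpleTask and reap_status are illustrative names, not AWX code.

# Illustrative only: mirrors the status selection in fail_jobs_if_not_in_celery().
from collections import namedtuple

SimpleTask = namedtuple('SimpleTask', ['celery_task_id', 'status'])


def reap_status(task, active_task_ids, isolated=False):
    """Return the status a lost task would be reaped to by the scheduler."""
    if task.celery_task_id in active_task_ids:
        return task.status  # still tracked by celery; leave it alone
    # Missing from celery: an isolated job may still be running remotely,
    # so it is marked 'error' rather than 'failed'.
    return 'error' if isolated else 'failed'


print(reap_status(SimpleTask('abc123', 'running'), set()))                 # failed
print(reap_status(SimpleTask('abc123', 'running'), set(), isolated=True))  # error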
def cleanup_inconsistent_celery_tasks(self):
'''
@@ -471,26 +486,41 @@ class TaskManager():
self.fail_jobs_if_not_in_celery(waiting_tasks, all_celery_task_ids, celery_task_start_time)
for node, node_jobs in running_tasks.iteritems():
isolated = False
if node in active_queues:
active_tasks = active_queues[node]
else:
'''
Node task list not found in celery. If tower thinks the node is down
then fail all the jobs on the node.
Node task list not found in celery. We may branch into cases:
- instance is unknown to tower, system is improperly configured
- instance is reported as down, then fail all jobs on the node
- instance is an isolated node, then check running tasks
among all allowed controller nodes for management process
'''
try:
instance = Instance.objects.get(hostname=node)
if instance.capacity == 0:
active_tasks = []
instance = Instance.objects.filter(hostname=node).first()
if instance is None:
logger.error("Execution node Instance {} not found in database. "
"The node is currently executing jobs {}".format(
node, [j.log_format for j in node_jobs]))
active_tasks = []
elif instance.capacity == 0:
active_tasks = []
else:
cids = instance.rampart_groups.filter(
controller__isnull=False).values_list('id', flat=True)
if cids:
control_hosts = Instance.objects.filter(
rampart_groups__in=cids).values_list('hostname', flat=True)
active_tasks = set(itertools.chain.from_iterable(active_queues.get(host, []) for host in control_hosts))
isolated = True
else:
continue
except Instance.DoesNotExist:
logger.error("Execution node Instance {} not found in database. "
"The node is currently executing jobs {}".format(node,
[j.log_format for j in node_jobs]))
active_tasks = []
self.fail_jobs_if_not_in_celery(node_jobs, active_tasks, celery_task_start_time)
self.fail_jobs_if_not_in_celery(
node_jobs, active_tasks, celery_task_start_time,
isolated=isolated
)
def calculate_capacity_consumed(self, tasks):
self.graph = InstanceGroup.objects.capacity_values(tasks=tasks, graph=self.graph)
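In the isolated branch above, the tasks considered active for an execution node are the union of the celery queues of every controller host that can manage it, resolved through the node's rampart_groups. The sketch below reproduces that aggregation with plain dicts standing in for the ORM lookups and for the get_active_tasks() output; the hostnames and task ids are made up.

import itertools

# Stand-ins for the celery inspection result and for the controller hostnames
# resolved from the isolated node's instance groups (illustrative values only).
active_queues = {
    'controller1': ['task-uuid-1', 'task-uuid-2'],
    'controller2': ['task-uuid-3'],
}
control_hosts = ['controller1', 'controller2', 'controller3']  # controller3 reported no queue

active_tasks = set(itertools.chain.from_iterable(
    active_queues.get(host, []) for host in control_hosts))
print(active_tasks)  # {'task-uuid-1', 'task-uuid-2', 'task-uuid-3'}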

View File

@@ -21,7 +21,7 @@ class TestCleanupInconsistentCeleryTasks():
@mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {}))
@mock.patch.object(TaskManager, 'get_running_tasks', return_value=({'host1': [Job(id=2), Job(id=3),]}, []))
@mock.patch.object(InstanceGroup.objects, 'prefetch_related', return_value=[])
@mock.patch.object(Instance.objects, 'get', side_effect=Instance.DoesNotExist)
@mock.patch.object(Instance.objects, 'prefetch_related', return_value=[])
@mock.patch('awx.main.scheduler.logger')
def test_instance_does_not_exist(self, logger_mock, *args):
logger_mock.error = mock.MagicMock(side_effect=RuntimeError("mocked"))
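The test above exercises the missing-instance branch. A possible companion test for the new isolated path, calling fail_jobs_if_not_in_celery() directly, is sketched below; it assumes the same pytest harness and model imports as the existing test, and the patch target for the notification helper ('awx.main.scheduler.awx_tasks._send_notification_templates') is inferred from the awx_tasks alias used in the scheduler, so it may need adjusting.

# Sketch only: a module-level test, though it could equally be a method on the class above.
from datetime import datetime

import mock

from awx.main.models import InstanceGroup
from awx.main.scheduler import TaskManager


@mock.patch.object(InstanceGroup.objects, 'prefetch_related', return_value=[])
@mock.patch('awx.main.scheduler.awx_tasks._send_notification_templates')
def test_isolated_job_marked_error(notify_mock, *args):
    # Last modified before the celery inspection started and absent from the
    # (empty) set of active controller tasks, so the job should be reaped.
    task = mock.Mock(celery_task_id='abc123', job_explanation='',
                     modified=datetime(2017, 1, 1))
    TaskManager().fail_jobs_if_not_in_celery(
        [task], active_tasks=[],
        celery_task_start_time=datetime(2017, 1, 2),
        isolated=True)
    assert task.status == 'error'
    notify_mock.assert_called_once_with(task, 'error')
    task.websocket_emit_status.assert_called_once_with('error')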