simplify isolated job reaping by checking all task ids

This commit is contained in:
AlanCoding
2017-09-08 10:01:44 -07:00
parent 878e7ef49f
commit 8e1e60c187
2 changed files with 7 additions and 13 deletions

View File

@@ -6,7 +6,6 @@ from datetime import datetime, timedelta
import logging import logging
import uuid import uuid
from sets import Set from sets import Set
import itertools
# Django # Django
from django.conf import settings from django.conf import settings
@@ -437,7 +436,7 @@ class TaskManager():
if isolated: if isolated:
# TODO: cancel and reap artifacts of lost jobs from heartbeat # TODO: cancel and reap artifacts of lost jobs from heartbeat
task.job_explanation += ' '.join(( task.job_explanation += ' '.join((
'Task was marked as running in Tower but its' 'Task was marked as running in Tower but its ',
'controller management daemon was not present in', 'controller management daemon was not present in',
'Celery, so it has been marked as failed.', 'Celery, so it has been marked as failed.',
'Task may still be running, but contactability is unknown.' 'Task may still be running, but contactability is unknown.'
@@ -452,7 +451,7 @@ class TaskManager():
except DatabaseError: except DatabaseError:
logger.error("Task {} DB error in marking failed. Job possibly deleted.".format(task.log_format)) logger.error("Task {} DB error in marking failed. Job possibly deleted.".format(task.log_format))
continue continue
awx_tasks._send_notification_templates(task, new_status) awx_tasks._send_notification_templates(task, 'failed')
task.websocket_emit_status(new_status) task.websocket_emit_status(new_status)
logger.error("{}Task {} has no record in celery. Marking as failed".format( logger.error("{}Task {} has no record in celery. Marking as failed".format(
'Isolated ' if isolated else '', task.log_format)) 'Isolated ' if isolated else '', task.log_format))
@@ -506,16 +505,11 @@ class TaskManager():
active_tasks = [] active_tasks = []
elif instance.capacity == 0: elif instance.capacity == 0:
active_tasks = [] active_tasks = []
elif instance.rampart_groups.filter(controller__isnull=False).exists():
active_tasks = all_celery_task_ids
isolated = True
else: else:
cids = instance.rampart_groups.filter( continue
controller__isnull=False).values_list('id', flat=True)
if cids:
control_hosts = Instance.objects.filter(
rampart_groups__in=cids).values_list('hostname', flat=True)
active_tasks = set(itertools.chain(active_queues[host] for host in control_hosts))
isolated = True
else:
continue
self.fail_jobs_if_not_in_celery( self.fail_jobs_if_not_in_celery(
node_jobs, active_tasks, celery_task_start_time, node_jobs, active_tasks, celery_task_start_time,

View File

@@ -21,7 +21,7 @@ class TestCleanupInconsistentCeleryTasks():
@mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {})) @mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {}))
@mock.patch.object(TaskManager, 'get_running_tasks', return_value=({'host1': [Job(id=2), Job(id=3),]}, [])) @mock.patch.object(TaskManager, 'get_running_tasks', return_value=({'host1': [Job(id=2), Job(id=3),]}, []))
@mock.patch.object(InstanceGroup.objects, 'prefetch_related', return_value=[]) @mock.patch.object(InstanceGroup.objects, 'prefetch_related', return_value=[])
@mock.patch.object(Instance.objects, 'prefetch_related', return_value=[]) @mock.patch.object(Instance.objects, 'filter', return_value=mock.MagicMock(first=lambda: None))
@mock.patch('awx.main.scheduler.logger') @mock.patch('awx.main.scheduler.logger')
def test_instance_does_not_exist(self, logger_mock, *args): def test_instance_does_not_exist(self, logger_mock, *args):
logger_mock.error = mock.MagicMock(side_effect=RuntimeError("mocked")) logger_mock.error = mock.MagicMock(side_effect=RuntimeError("mocked"))