fail all jobs on an offline node

This commit is contained in:
Chris Meyers
2017-08-14 15:41:33 -04:00
parent 9314db646b
commit de82707581
3 changed files with 141 additions and 47 deletions

View File

@@ -10,7 +10,7 @@ from sets import Set
# Django # Django
from django.conf import settings from django.conf import settings
from django.core.cache import cache from django.core.cache import cache
from django.db import transaction, connection from django.db import transaction, connection, DatabaseError
from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext_lazy as _
from django.utils.timezone import now as tz_now, utc from django.utils.timezone import now as tz_now, utc
from django.db.models import Q from django.db.models import Q
@@ -78,13 +78,9 @@ class TaskManager():
def get_running_tasks(self): def get_running_tasks(self):
execution_nodes = {} execution_nodes = {}
now = tz_now() now = tz_now()
jobs = list(UnifiedJob.objects.filter(Q(status='running') | jobs = UnifiedJob.objects.filter(Q(status='running') |
(Q(status='waiting', modified__lte=now - timedelta(seconds=60))))) Q(status='waiting', modified__lte=now - timedelta(seconds=60)))
for j in jobs: [execution_nodes.setdefault(j.execution_node, [j]).append(j) for j in jobs]
if j.execution_node in execution_nodes:
execution_nodes[j.execution_node].append(j)
elif j.execution_node not in execution_nodes:
execution_nodes[j.execution_node] = [j]
return execution_nodes return execution_nodes
''' '''
@@ -93,17 +89,21 @@ class TaskManager():
Transform: Transform:
{ {
"celery@ec2-54-204-222-62.compute-1.amazonaws.com": [], "celery@ec2-54-204-222-62.compute-1.amazonaws.com": [],
"celery@ec2-54-163-144-168.compute-1.amazonaws.com": [{ "celery@ec2-54-163-144-168.compute-1.amazonaws.com": [{
... ...
"id": "5238466a-f8c7-43b3-9180-5b78e9da8304", "id": "5238466a-f8c7-43b3-9180-5b78e9da8304",
... ...
}] }, {
...,
}, ...]
} }
to: to:
{ {
"celery@ec2-54-204-222-62.compute-1.amazonaws.com": [ "ec2-54-204-222-62.compute-1.amazonaws.com": [
"5238466a-f8c7-43b3-9180-5b78e9da8304", "5238466a-f8c7-43b3-9180-5b78e9da8304",
"5238466a-f8c7-43b3-9180-5b78e9da8306",
...
] ]
} }
''' '''
@@ -123,12 +123,9 @@ class TaskManager():
active_tasks = set() active_tasks = set()
map(lambda at: active_tasks.add(at['id']), active_task_queues[queue]) map(lambda at: active_tasks.add(at['id']), active_task_queues[queue])
# queue is of the form celery@myhost.com # celery worker name is of the form celery@myhost.com
queue_name = queue.split('@') queue_name = queue.split('@')
if len(queue_name) > 1: queue_name = queue_name[1 if len(queue_name) > 1 else 0]
queue_name = queue_name[1]
else:
queue_name = queue_name[0]
queues[queue_name] = active_tasks queues[queue_name] = active_tasks
else: else:
if not hasattr(settings, 'CELERY_UNIT_TEST'): if not hasattr(settings, 'CELERY_UNIT_TEST'):
@@ -431,14 +428,27 @@ class TaskManager():
Only consider failing tasks on instances for which we obtained a task Only consider failing tasks on instances for which we obtained a task
list from celery for. list from celery for.
''' '''
execution_nodes_jobs = self.get_running_tasks() running_tasks = self.get_running_tasks()
for node, node_jobs in execution_nodes_jobs.iteritems(): for node, node_jobs in running_tasks.iteritems():
if node not in active_queues: if node in active_queues:
continue active_tasks = active_queues[node]
active_tasks = active_queues[node] else:
'''
Node task list not found in celery. If tower thinks the node is down
then fail all the jobs on the node.
'''
try:
instance = Instance.objects.get(hostname=node)
if instance.capacity == 0:
active_tasks = []
else:
continue
except Instance.DoesNotExist:
logger.error("Execution node Instance {} not found in database. "
"The node is currently executing jobs {}".format(node, [str(j) for j in node_jobs]))
active_tasks = []
for task in node_jobs: for task in node_jobs:
if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')): if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
# TODO: try catch the getting of the job. The job COULD have been deleted
if isinstance(task, WorkflowJob): if isinstance(task, WorkflowJob):
continue continue
if task.modified > celery_task_start_time: if task.modified > celery_task_start_time:
@@ -448,10 +458,14 @@ class TaskManager():
'Task was marked as running in Tower but was not present in', 'Task was marked as running in Tower but was not present in',
'Celery, so it has been marked as failed.', 'Celery, so it has been marked as failed.',
)) ))
task.save() try:
task.save(update_fields=['status', 'job_explanation'])
except DatabaseError:
logger.error("Task {} DB error in marking failed. Job possibly deleted.".format(task.log_format))
continue
awx_tasks._send_notification_templates(task, 'failed') awx_tasks._send_notification_templates(task, 'failed')
task.websocket_emit_status('failed') task.websocket_emit_status('failed')
logger.error("Task %s appears orphaned... marking as failed" % task) logger.error("Task {} has no record in celery. Marking as failed".format(task.log_format))
def calculate_capacity_used(self, tasks): def calculate_capacity_used(self, tasks):
for rampart_group in self.graph: for rampart_group in self.graph:

View File

@@ -8,6 +8,7 @@ from django.utils.timezone import now as tz_now
from awx.main.scheduler import TaskManager from awx.main.scheduler import TaskManager
from awx.main.models import ( from awx.main.models import (
Job, Job,
Instance,
) )
@@ -223,21 +224,33 @@ class TestReaper():
def all_jobs(self, mocker): def all_jobs(self, mocker):
now = tz_now() now = tz_now()
Instance.objects.create(hostname='host1', capacity=100)
Instance.objects.create(hostname='host2', capacity=100)
Instance.objects.create(hostname='host3_split', capacity=100)
Instance.objects.create(hostname='host4_offline', capacity=0)
j1 = Job.objects.create(status='pending', execution_node='host1') j1 = Job.objects.create(status='pending', execution_node='host1')
j2 = Job.objects.create(status='waiting', celery_task_id='considered_j2', execution_node='host1') j2 = Job.objects.create(status='waiting', celery_task_id='considered_j2', execution_node='host1')
j3 = Job.objects.create(status='waiting', celery_task_id='considered_j3', execution_node='host1') j3 = Job.objects.create(status='waiting', celery_task_id='considered_j3', execution_node='host1')
j3.modified = now - timedelta(seconds=60) j3.modified = now - timedelta(seconds=60)
j3.save(update_fields=['modified']) j3.save(update_fields=['modified'])
j4 = Job.objects.create(status='running', celery_task_id='considered_j4', execution_node='host1') j4 = Job.objects.create(status='running', celery_task_id='considered_j4', execution_node='host1')
j5 = Job.objects.create(status='waiting', celery_task_id='reapable_j5', execution_node='host2') j5 = Job.objects.create(status='waiting', celery_task_id='reapable_j5', execution_node='host1')
j5.modified = now - timedelta(seconds=60) j5.modified = now - timedelta(seconds=60)
j5.save(update_fields=['modified']) j5.save(update_fields=['modified'])
j6 = Job.objects.create(status='waiting', celery_task_id='host2_j6', execution_node='host2_split') j6 = Job.objects.create(status='waiting', celery_task_id='considered_j6', execution_node='host2')
j6.modified = now - timedelta(seconds=60) j6.modified = now - timedelta(seconds=60)
j6.save(update_fields=['modified']) j6.save(update_fields=['modified'])
j7 = Job.objects.create(status='running', celery_task_id='host2_j6', execution_node='host2_split') j7 = Job.objects.create(status='running', celery_task_id='considered_j7', execution_node='host2')
j8 = Job.objects.create(status='running', celery_task_id='reapable_j7', execution_node='host2')
j9 = Job.objects.create(status='waiting', celery_task_id='host3_j8', execution_node='host3_split')
j9.modified = now - timedelta(seconds=60)
j9.save(update_fields=['modified'])
j10 = Job.objects.create(status='running', execution_node='host3_split')
js = [j1, j2, j3, j4, j5, j6, j7] j11 = Job.objects.create(status='running', celery_task_id='host4_j11', execution_node='host4_offline')
js = [j1, j2, j3, j4, j5, j6, j7, j8, j9, j10, j11]
for j in js: for j in js:
j.save = mocker.Mock(wraps=j.save) j.save = mocker.Mock(wraps=j.save)
j.websocket_emit_status = mocker.Mock() j.websocket_emit_status = mocker.Mock()
@@ -245,11 +258,20 @@ class TestReaper():
@pytest.fixture @pytest.fixture
def considered_jobs(self, all_jobs): def considered_jobs(self, all_jobs):
return all_jobs[2:4] + [all_jobs[4]] return all_jobs[2:7] + [all_jobs[10]]
@pytest.fixture
def running_tasks(self, all_jobs):
return {
'host1': all_jobs[2:5],
'host2': all_jobs[5:8],
'host3_split': all_jobs[8:10],
'host4_offline': [all_jobs[10]],
}
@pytest.fixture @pytest.fixture
def reapable_jobs(self, all_jobs): def reapable_jobs(self, all_jobs):
return [all_jobs[4]] return [all_jobs[4], all_jobs[7], all_jobs[10]]
@pytest.fixture @pytest.fixture
def unconsidered_jobs(self, all_jobs): def unconsidered_jobs(self, all_jobs):
@@ -259,16 +281,16 @@ class TestReaper():
def active_tasks(self): def active_tasks(self):
return ([], { return ([], {
'host1': ['considered_j2', 'considered_j3', 'considered_j4',], 'host1': ['considered_j2', 'considered_j3', 'considered_j4',],
'host2_split': ['host2_j6', 'host2_j7'], 'host2': ['considered_j6', 'considered_j7'],
}) })
@pytest.mark.django_db @pytest.mark.django_db
@mock.patch('awx.main.tasks._send_notification_templates') @mock.patch('awx.main.tasks._send_notification_templates')
@mock.patch.object(TaskManager, 'get_active_tasks', lambda self: ([], [])) @mock.patch.object(TaskManager, 'get_active_tasks', lambda self: ([], []))
def test_cleanup_inconsistent_task(self, notify, active_tasks, considered_jobs, reapable_jobs, mocker): def test_cleanup_inconsistent_task(self, notify, active_tasks, considered_jobs, reapable_jobs, running_tasks, mocker):
tm = TaskManager() tm = TaskManager()
#tm.get_running_tasks = mocker.Mock(return_value=considered_jobs) tm.get_running_tasks = mocker.Mock(return_value=running_tasks)
tm.get_active_tasks = mocker.Mock(return_value=active_tasks) tm.get_active_tasks = mocker.Mock(return_value=active_tasks)
tm.cleanup_inconsistent_celery_tasks() tm.cleanup_inconsistent_celery_tasks()
@@ -277,15 +299,16 @@ class TestReaper():
if j not in reapable_jobs: if j not in reapable_jobs:
j.save.assert_not_called() j.save.assert_not_called()
for reaped_job in reapable_jobs: assert notify.call_count == 3
notify.assert_called_once_with(reaped_job, 'failed') notify.assert_has_calls([mock.call(j, 'failed') for j in reapable_jobs], any_order=True)
reaped_job.websocket_emit_status.assert_called_once_with('failed')
assert reaped_job.status == 'failed' for j in reapable_jobs:
assert reaped_job.job_explanation == ( j.websocket_emit_status.assert_called_once_with('failed')
assert j.status == 'failed'
assert j.job_explanation == (
'Task was marked as running in Tower but was not present in Celery, so it has been marked as failed.' 'Task was marked as running in Tower but was not present in Celery, so it has been marked as failed.'
) )
@pytest.mark.django_db @pytest.mark.django_db
def test_get_running_tasks(self, all_jobs): def test_get_running_tasks(self, all_jobs):
tm = TaskManager() tm = TaskManager()
@@ -293,13 +316,18 @@ class TestReaper():
# Ensure the query grabs the expected jobs # Ensure the query grabs the expected jobs
execution_nodes_jobs = tm.get_running_tasks() execution_nodes_jobs = tm.get_running_tasks()
assert 'host1' in execution_nodes_jobs assert 'host1' in execution_nodes_jobs
assert 'host2_split' in execution_nodes_jobs assert 'host2' in execution_nodes_jobs
assert 'host3_split' in execution_nodes_jobs
assert all_jobs[1] in execution_nodes_jobs['host1']
assert all_jobs[2] in execution_nodes_jobs['host1'] assert all_jobs[2] in execution_nodes_jobs['host1']
assert all_jobs[3] in execution_nodes_jobs['host1'] assert all_jobs[3] in execution_nodes_jobs['host1']
assert all_jobs[4] in execution_nodes_jobs['host1'] assert all_jobs[4] in execution_nodes_jobs['host1']
assert all_jobs[5] in execution_nodes_jobs['host2_split']
assert all_jobs[6] in execution_nodes_jobs['host2_split']
assert all_jobs[5] in execution_nodes_jobs['host2']
assert all_jobs[6] in execution_nodes_jobs['host2']
assert all_jobs[7] in execution_nodes_jobs['host2']
assert all_jobs[8] in execution_nodes_jobs['host3_split']
assert all_jobs[9] in execution_nodes_jobs['host3_split']
assert all_jobs[10] in execution_nodes_jobs['host4_offline']

View File

@@ -0,0 +1,52 @@
# Copyright (c) 2017 Ansible by Red Hat
# All Rights Reserved.
import mock
import pytest
from django.utils.timezone import now as tz_now
from django.db import DatabaseError
from awx.main.scheduler import TaskManager
from awx.main.models import (
Job,
Instance,
InstanceGroup,
)
from django.core.cache import cache
class TestCleanupInconsistentCeleryTasks():
@mock.patch.object(cache, 'get', return_value=None)
@mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {}))
@mock.patch.object(TaskManager, 'get_running_tasks', return_value={'host1': [Job(id=2), Job(id=3),]})
@mock.patch.object(InstanceGroup.objects, 'all', return_value=[])
@mock.patch.object(Instance.objects, 'get', side_effect=Instance.DoesNotExist)
@mock.patch('awx.main.scheduler.logger')
def test_instance_does_not_exist(self, logger_mock, *args):
logger_mock.error = mock.MagicMock(side_effect=RuntimeError("mocked"))
tm = TaskManager()
with pytest.raises(RuntimeError) as excinfo:
tm.cleanup_inconsistent_celery_tasks()
assert "mocked" in str(excinfo.value)
logger_mock.error.assert_called_once_with("Execution node Instance host1 not found in database. "
"The node is currently executing jobs ['None-2-new', "
"'None-3-new']")
@mock.patch.object(cache, 'get', return_value=None)
@mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {'host1': []}))
@mock.patch.object(InstanceGroup.objects, 'all', return_value=[])
@mock.patch.object(TaskManager, 'get_running_tasks')
@mock.patch('awx.main.scheduler.logger')
def test_save_failed(self, logger_mock, get_running_tasks, *args):
logger_mock.error = mock.MagicMock()
job = Job(id=2, modified=tz_now(), status='running', celery_task_id='blah', execution_node='host1')
job.websocket_emit_status = mock.MagicMock()
get_running_tasks.return_value = {'host1': [job]}
tm = TaskManager()
with mock.patch.object(job, 'save', side_effect=DatabaseError):
tm.cleanup_inconsistent_celery_tasks()
job.save.assert_called_once()
logger_mock.error.assert_called_once_with("Task job 2 (failed) DB error in marking failed. Job possibly deleted.")