Mirror of https://github.com/ansible/awx.git (synced 2026-01-10 15:32:07 -03:30)

Merge pull request #256 from chrismeyersfsu/fix-7399

fix reaper for netsplit and offline nodes

Commit c0cff5e0d2
@@ -10,7 +10,7 @@ from sets import Set
# Django
from django.conf import settings
from django.core.cache import cache
from django.db import transaction, connection
from django.db import transaction, connection, DatabaseError
from django.utils.translation import ugettext_lazy as _
from django.utils.timezone import now as tz_now, utc
from django.db.models import Q
@@ -69,14 +69,43 @@ class TaskManager():

    '''
    Tasks that are running and SHOULD have a celery task.
    {
        'execution_node': [j1, j2,...],
        'execution_node': [j3],
        ...
    }
    '''
    def get_running_tasks(self):
        execution_nodes = {}
        now = tz_now()
        return list(UnifiedJob.objects.filter(Q(status='running') |
                    (Q(status='waiting', modified__lte=now - timedelta(seconds=60)))))
        jobs = UnifiedJob.objects.filter(Q(status='running') |
                                         Q(status='waiting', modified__lte=now - timedelta(seconds=60)))
        [execution_nodes.setdefault(j.execution_node, [j]).append(j) for j in jobs]
        return execution_nodes

    '''
    Tasks that are currently running in celery

    Transform:
    {
        "celery@ec2-54-204-222-62.compute-1.amazonaws.com": [],
        "celery@ec2-54-163-144-168.compute-1.amazonaws.com": [{
            ...
            "id": "5238466a-f8c7-43b3-9180-5b78e9da8304",
            ...
        }, {
            ...,
        }, ...]
    }

    to:
    {
        "ec2-54-204-222-62.compute-1.amazonaws.com": [
            "5238466a-f8c7-43b3-9180-5b78e9da8304",
            "5238466a-f8c7-43b3-9180-5b78e9da8306",
            ...
        ]
    }
    '''
    def get_active_tasks(self):
        inspector = inspect()
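The new get_running_tasks() groups reapable candidates by execution node instead of returning a flat list. As a minimal standalone sketch of that grouping shape (not AWX code; FakeJob, group_by_execution_node, and the host names are invented for illustration):

# Sketch of the grouping idea: jobs that are 'running', or stuck in 'waiting'
# for more than 60 seconds, are bucketed by the execution_node they were
# assigned to, which is the shape the reaper iterates over per node.
from collections import defaultdict


class FakeJob(object):
    def __init__(self, name, execution_node):
        self.name = name
        self.execution_node = execution_node


def group_by_execution_node(jobs):
    execution_nodes = defaultdict(list)
    for j in jobs:
        execution_nodes[j.execution_node].append(j)
    return dict(execution_nodes)


jobs = [FakeJob('j1', 'host1'), FakeJob('j2', 'host1'), FakeJob('j3', 'host2')]
# prints {'host1': ['j1', 'j2'], 'host2': ['j3']}
print({node: [j.name for j in js] for node, js in group_by_execution_node(jobs).items()})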
@@ -86,15 +115,23 @@ class TaskManager():
            logger.warn("Ignoring celery task inspector")
            active_task_queues = None

        active_tasks = set()
        queues = None

        if active_task_queues is not None:
            queues = {}
            for queue in active_task_queues:
                active_tasks = set()
                map(lambda at: active_tasks.add(at['id']), active_task_queues[queue])

                # celery worker name is of the form celery@myhost.com
                queue_name = queue.split('@')
                queue_name = queue_name[1 if len(queue_name) > 1 else 0]
                queues[queue_name] = active_tasks
        else:
            if not hasattr(settings, 'CELERY_UNIT_TEST'):
                return (None, None)

        return (active_task_queues, active_tasks)
        return (active_task_queues, queues)

    def get_latest_project_update_tasks(self, all_sorted_tasks):
        project_ids = Set()
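To make the Transform example from the get_active_tasks() docstring concrete, here is a small self-contained sketch of the same reshaping over hard-coded sample data (no live celery inspect() call is made; queues_from_inspect is a hypothetical helper name, not part of the commit):

# celery's inspect().active() maps worker names like "celery@hostname" to lists
# of task dicts; the reaper wants a plain {hostname: set_of_task_ids} lookup.
def queues_from_inspect(active_task_queues):
    queues = {}
    for queue, tasks in active_task_queues.items():
        active_tasks = set(task['id'] for task in tasks)
        # celery worker name is of the form celery@myhost.com
        parts = queue.split('@')
        queue_name = parts[1 if len(parts) > 1 else 0]
        queues[queue_name] = active_tasks
    return queues


sample = {
    'celery@ec2-54-204-222-62.compute-1.amazonaws.com': [],
    'celery@ec2-54-163-144-168.compute-1.amazonaws.com': [
        {'id': '5238466a-f8c7-43b3-9180-5b78e9da8304'},
    ],
}
# prints the hostname-keyed sets of task ids described in the docstring above
print(queues_from_inspect(sample))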
@@ -380,32 +417,55 @@ class TaskManager():

        logger.debug("Failing inconsistent running jobs.")
        celery_task_start_time = tz_now()
        active_task_queues, active_tasks = self.get_active_tasks()
        active_task_queues, active_queues = self.get_active_tasks()
        cache.set('last_celery_task_cleanup', tz_now())

        if active_tasks is None:
        if active_queues is None:
            logger.error('Failed to retrieve active tasks from celery')
            return None

        all_running_sorted_tasks = self.get_running_tasks()
        for task in all_running_sorted_tasks:

            if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
                # TODO: try catch the getting of the job. The job COULD have been deleted
                if isinstance(task, WorkflowJob):
                    continue
                if task.modified > celery_task_start_time:
                    continue
                task.status = 'failed'
                task.job_explanation += ' '.join((
                    'Task was marked as running in Tower but was not present in',
                    'Celery, so it has been marked as failed.',
                ))
                task.save()
                awx_tasks._send_notification_templates(task, 'failed')
                task.websocket_emit_status('failed')
                logger.error("%s appears orphaned... marking as failed", task.log_format)

        '''
        Only consider failing tasks on instances for which we obtained a task
        list from celery for.
        '''
        running_tasks = self.get_running_tasks()
        for node, node_jobs in running_tasks.iteritems():
            if node in active_queues:
                active_tasks = active_queues[node]
            else:
                '''
                Node task list not found in celery. If tower thinks the node is down
                then fail all the jobs on the node.
                '''
                try:
                    instance = Instance.objects.get(hostname=node)
                    if instance.capacity == 0:
                        active_tasks = []
                    else:
                        continue
                except Instance.DoesNotExist:
                    logger.error("Execution node Instance {} not found in database. "
                                 "The node is currently executing jobs {}".format(node, [str(j) for j in node_jobs]))
                    active_tasks = []
            for task in node_jobs:
                if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
                    if isinstance(task, WorkflowJob):
                        continue
                    if task.modified > celery_task_start_time:
                        continue
                    task.status = 'failed'
                    task.job_explanation += ' '.join((
                        'Task was marked as running in Tower but was not present in',
                        'Celery, so it has been marked as failed.',
                    ))
                    try:
                        task.save(update_fields=['status', 'job_explanation'])
                    except DatabaseError:
                        logger.error("Task {} DB error in marking failed. Job possibly deleted.".format(task.log_format))
                        continue
                    awx_tasks._send_notification_templates(task, 'failed')
                    task.websocket_emit_status('failed')
                    logger.error("Task {} has no record in celery. Marking as failed".format(task.log_format))

    def calculate_capacity_used(self, tasks):
        for rampart_group in self.graph:
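The heart of the fix is the per-node decision in cleanup_inconsistent_celery_tasks() above: jobs on a node are only reaped when celery returned a task list for that node, or when Tower itself considers the node gone. A plain-Python sketch of that decision, with instance_capacity standing in for the Instance lookup (tasks_to_compare_against is a hypothetical name, not part of the commit):

# Returns the task-id list to compare a node's jobs against, or None to skip
# the node entirely. A healthy node that simply did not answer celery's
# inspector (a netsplit) is skipped, so its jobs are not failed spuriously.
def tasks_to_compare_against(node, active_queues, instance_capacity):
    if node in active_queues:
        return active_queues[node]      # normal case: reap jobs missing from this set
    if instance_capacity is None:       # Instance.DoesNotExist in the real code
        return []                       # unknown node: everything on it is reapable
    if instance_capacity == 0:          # node marked offline by Tower
        return []
    return None                         # node looks healthy but unreachable: skip it


assert tasks_to_compare_against('host1', {'host1': ['t1']}, 100) == ['t1']
assert tasks_to_compare_against('host3_split', {}, 100) is None
assert tasks_to_compare_against('host4_offline', {}, 0) == []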
@@ -8,49 +8,10 @@ from django.utils.timezone import now as tz_now
from awx.main.scheduler import TaskManager
from awx.main.models import (
    Job,
    Instance,
)


@pytest.fixture
def all_jobs(mocker):
    now = tz_now()
    j1 = Job.objects.create(status='pending')
    j2 = Job.objects.create(status='waiting', celery_task_id='considered_j2')
    j3 = Job.objects.create(status='waiting', celery_task_id='considered_j3')
    j3.modified = now - timedelta(seconds=60)
    j3.save(update_fields=['modified'])
    j4 = Job.objects.create(status='running', celery_task_id='considered_j4')
    j5 = Job.objects.create(status='waiting', celery_task_id='reapable_j5')
    j5.modified = now - timedelta(seconds=60)
    j5.save(update_fields=['modified'])

    js = [j1, j2, j3, j4, j5]
    for j in js:
        j.save = mocker.Mock(wraps=j.save)
        j.websocket_emit_status = mocker.Mock()
    return js


@pytest.fixture
def considered_jobs(all_jobs):
    return all_jobs[2:4] + [all_jobs[4]]


@pytest.fixture
def reapable_jobs(all_jobs):
    return [all_jobs[4]]


@pytest.fixture
def unconsidered_jobs(all_jobs):
    return all_jobs[0:1]


@pytest.fixture
def active_tasks():
    return ([], ['considered_j2', 'considered_j3', 'considered_j4',])


@pytest.mark.django_db
def test_single_job_scheduler_launch(default_instance_group, job_template_factory, mocker):
    objects = job_template_factory('jt', organization='org1', project='proj',
@@ -258,41 +219,115 @@ def test_cleanup_interval():
    assert cache.get('last_celery_task_cleanup') == last_cleanup


@pytest.mark.django_db
@mock.patch('awx.main.tasks._send_notification_templates')
@mock.patch.object(TaskManager, 'get_active_tasks', lambda self: [[], []])
def test_cleanup_inconsistent_task(notify, active_tasks, considered_jobs, reapable_jobs, mocker):
    tm = TaskManager()
class TestReaper():
    @pytest.fixture
    def all_jobs(self, mocker):
        now = tz_now()

    tm.get_running_tasks = mocker.Mock(return_value=considered_jobs)
    tm.get_active_tasks = mocker.Mock(return_value=active_tasks)

    tm.cleanup_inconsistent_celery_tasks()

    for j in considered_jobs:
        if j not in reapable_jobs:
            j.save.assert_not_called()
        Instance.objects.create(hostname='host1', capacity=100)
        Instance.objects.create(hostname='host2', capacity=100)
        Instance.objects.create(hostname='host3_split', capacity=100)
        Instance.objects.create(hostname='host4_offline', capacity=0)

    for reaped_job in reapable_jobs:
        notify.assert_called_once_with(reaped_job, 'failed')
        reaped_job.websocket_emit_status.assert_called_once_with('failed')
        assert reaped_job.status == 'failed'
        assert reaped_job.job_explanation == (
            'Task was marked as running in Tower but was not present in Celery, so it has been marked as failed.'
        )
        j1 = Job.objects.create(status='pending', execution_node='host1')
        j2 = Job.objects.create(status='waiting', celery_task_id='considered_j2', execution_node='host1')
        j3 = Job.objects.create(status='waiting', celery_task_id='considered_j3', execution_node='host1')
        j3.modified = now - timedelta(seconds=60)
        j3.save(update_fields=['modified'])
        j4 = Job.objects.create(status='running', celery_task_id='considered_j4', execution_node='host1')
        j5 = Job.objects.create(status='waiting', celery_task_id='reapable_j5', execution_node='host1')
        j5.modified = now - timedelta(seconds=60)
        j5.save(update_fields=['modified'])
        j6 = Job.objects.create(status='waiting', celery_task_id='considered_j6', execution_node='host2')
        j6.modified = now - timedelta(seconds=60)
        j6.save(update_fields=['modified'])
        j7 = Job.objects.create(status='running', celery_task_id='considered_j7', execution_node='host2')
        j8 = Job.objects.create(status='running', celery_task_id='reapable_j7', execution_node='host2')
        j9 = Job.objects.create(status='waiting', celery_task_id='host3_j8', execution_node='host3_split')
        j9.modified = now - timedelta(seconds=60)
        j9.save(update_fields=['modified'])
        j10 = Job.objects.create(status='running', execution_node='host3_split')

        j11 = Job.objects.create(status='running', celery_task_id='host4_j11', execution_node='host4_offline')

@pytest.mark.django_db
def test_get_running_tasks(considered_jobs, reapable_jobs, unconsidered_jobs):
    tm = TaskManager()
        js = [j1, j2, j3, j4, j5, j6, j7, j8, j9, j10, j11]
        for j in js:
            j.save = mocker.Mock(wraps=j.save)
            j.websocket_emit_status = mocker.Mock()
        return js

    # Ensure the query grabs the expected jobs
    rt = tm.get_running_tasks()
    for j in considered_jobs:
        assert j in rt
    for j in reapable_jobs:
        assert j in rt
    for j in unconsidered_jobs:
        assert j in unconsidered_jobs
    @pytest.fixture
    def considered_jobs(self, all_jobs):
        return all_jobs[2:7] + [all_jobs[10]]

    @pytest.fixture
    def running_tasks(self, all_jobs):
        return {
            'host1': all_jobs[2:5],
            'host2': all_jobs[5:8],
            'host3_split': all_jobs[8:10],
            'host4_offline': [all_jobs[10]],
        }

    @pytest.fixture
    def reapable_jobs(self, all_jobs):
        return [all_jobs[4], all_jobs[7], all_jobs[10]]

    @pytest.fixture
    def unconsidered_jobs(self, all_jobs):
        return all_jobs[0:1] + all_jobs[5:7]

    @pytest.fixture
    def active_tasks(self):
        return ([], {
            'host1': ['considered_j2', 'considered_j3', 'considered_j4',],
            'host2': ['considered_j6', 'considered_j7'],
        })

    @pytest.mark.django_db
    @mock.patch('awx.main.tasks._send_notification_templates')
    @mock.patch.object(TaskManager, 'get_active_tasks', lambda self: ([], []))
    def test_cleanup_inconsistent_task(self, notify, active_tasks, considered_jobs, reapable_jobs, running_tasks, mocker):
        tm = TaskManager()

        tm.get_running_tasks = mocker.Mock(return_value=running_tasks)
        tm.get_active_tasks = mocker.Mock(return_value=active_tasks)

        tm.cleanup_inconsistent_celery_tasks()

        for j in considered_jobs:
            if j not in reapable_jobs:
                j.save.assert_not_called()

        assert notify.call_count == 3
        notify.assert_has_calls([mock.call(j, 'failed') for j in reapable_jobs], any_order=True)

        for j in reapable_jobs:
            j.websocket_emit_status.assert_called_once_with('failed')
            assert j.status == 'failed'
            assert j.job_explanation == (
                'Task was marked as running in Tower but was not present in Celery, so it has been marked as failed.'
            )

    @pytest.mark.django_db
    def test_get_running_tasks(self, all_jobs):
        tm = TaskManager()

        # Ensure the query grabs the expected jobs
        execution_nodes_jobs = tm.get_running_tasks()
        assert 'host1' in execution_nodes_jobs
        assert 'host2' in execution_nodes_jobs
        assert 'host3_split' in execution_nodes_jobs

        assert all_jobs[2] in execution_nodes_jobs['host1']
        assert all_jobs[3] in execution_nodes_jobs['host1']
        assert all_jobs[4] in execution_nodes_jobs['host1']

        assert all_jobs[5] in execution_nodes_jobs['host2']
        assert all_jobs[6] in execution_nodes_jobs['host2']
        assert all_jobs[7] in execution_nodes_jobs['host2']

        assert all_jobs[8] in execution_nodes_jobs['host3_split']
        assert all_jobs[9] in execution_nodes_jobs['host3_split']

        assert all_jobs[10] in execution_nodes_jobs['host4_offline']
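For readers following the fixtures in TestReaper above, a short restatement (not part of the diff) of why reapable_jobs ends up as j5, j8 and j11:

# Expected outcome of test_cleanup_inconsistent_task given the fixtures above:
#   host1 reported ['considered_j2', 'considered_j3', 'considered_j4']  -> j5 reaped
#   host2 reported ['considered_j6', 'considered_j7']                   -> j8 reaped
#   host3_split reported nothing but has capacity 100 (netsplit)        -> j9, j10 kept
#   host4_offline reported nothing and has capacity 0 (offline)         -> j11 reaped
reapable_ids = {'reapable_j5', 'reapable_j7', 'host4_j11'}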
awx/main/tests/unit/test_task_manager.py (new file, 52 lines)
@@ -0,0 +1,52 @@
# Copyright (c) 2017 Ansible by Red Hat
# All Rights Reserved.

import mock
import pytest

from django.utils.timezone import now as tz_now
from django.db import DatabaseError

from awx.main.scheduler import TaskManager
from awx.main.models import (
    Job,
    Instance,
    InstanceGroup,
)
from django.core.cache import cache


class TestCleanupInconsistentCeleryTasks():
    @mock.patch.object(cache, 'get', return_value=None)
    @mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {}))
    @mock.patch.object(TaskManager, 'get_running_tasks', return_value={'host1': [Job(id=2), Job(id=3),]})
    @mock.patch.object(InstanceGroup.objects, 'all', return_value=[])
    @mock.patch.object(Instance.objects, 'get', side_effect=Instance.DoesNotExist)
    @mock.patch('awx.main.scheduler.logger')
    def test_instance_does_not_exist(self, logger_mock, *args):
        logger_mock.error = mock.MagicMock(side_effect=RuntimeError("mocked"))
        tm = TaskManager()
        with pytest.raises(RuntimeError) as excinfo:
            tm.cleanup_inconsistent_celery_tasks()

        assert "mocked" in str(excinfo.value)
        logger_mock.error.assert_called_once_with("Execution node Instance host1 not found in database. "
                                                  "The node is currently executing jobs ['None-2-new', "
                                                  "'None-3-new']")

    @mock.patch.object(cache, 'get', return_value=None)
    @mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {'host1': []}))
    @mock.patch.object(InstanceGroup.objects, 'all', return_value=[])
    @mock.patch.object(TaskManager, 'get_running_tasks')
    @mock.patch('awx.main.scheduler.logger')
    def test_save_failed(self, logger_mock, get_running_tasks, *args):
        logger_mock.error = mock.MagicMock()
        job = Job(id=2, modified=tz_now(), status='running', celery_task_id='blah', execution_node='host1')
        job.websocket_emit_status = mock.MagicMock()
        get_running_tasks.return_value = {'host1': [job]}
        tm = TaskManager()

        with mock.patch.object(job, 'save', side_effect=DatabaseError):
            tm.cleanup_inconsistent_celery_tasks()
            job.save.assert_called_once()
            logger_mock.error.assert_called_once_with("Task job 2 (failed) DB error in marking failed. Job possibly deleted.")