mirror of
https://github.com/ansible/awx.git
synced 2026-05-07 01:17:37 -02:30
account for waiting tasks not having execution_nodes yet
* Reap running tasks on non-netsplit nodes * Reap running tasks on known to be offline nodes * Reap waiting tasks with no celery id anywhere if waiting >= 60 seconds
This commit is contained in:
@@ -78,13 +78,18 @@ class TaskManager():
|
|||||||
'''
|
'''
|
||||||
def get_running_tasks(self):
|
def get_running_tasks(self):
|
||||||
execution_nodes = {}
|
execution_nodes = {}
|
||||||
|
waiting_jobs = []
|
||||||
now = tz_now()
|
now = tz_now()
|
||||||
workflow_ctype_id = ContentType.objects.get_for_model(WorkflowJob).id
|
workflow_ctype_id = ContentType.objects.get_for_model(WorkflowJob).id
|
||||||
jobs = UnifiedJob.objects.filter((Q(status='running') |
|
jobs = UnifiedJob.objects.filter((Q(status='running') |
|
||||||
Q(status='waiting', modified__lte=now - timedelta(seconds=60))) &
|
Q(status='waiting', modified__lte=now - timedelta(seconds=60))) &
|
||||||
~Q(polymorphic_ctype_id=workflow_ctype_id))
|
~Q(polymorphic_ctype_id=workflow_ctype_id))
|
||||||
[execution_nodes.setdefault(j.execution_node, [j]).append(j) for j in jobs]
|
for j in jobs:
|
||||||
return execution_nodes
|
if j.execution_node:
|
||||||
|
execution_nodes.setdefault(j.execution_node, [j]).append(j)
|
||||||
|
else:
|
||||||
|
waiting_jobs.append(j)
|
||||||
|
return (execution_nodes, waiting_jobs)
|
||||||
|
|
||||||
'''
|
'''
|
||||||
Tasks that are currently running in celery
|
Tasks that are currently running in celery
|
||||||
@@ -410,6 +415,27 @@ class TaskManager():
|
|||||||
if not found_acceptable_queue:
|
if not found_acceptable_queue:
|
||||||
logger.debug("%s couldn't be scheduled on graph, waiting for next cycle", task.log_format)
|
logger.debug("%s couldn't be scheduled on graph, waiting for next cycle", task.log_format)
|
||||||
|
|
||||||
|
def fail_jobs_if_not_in_celery(self, node_jobs, active_tasks, celery_task_start_time):
|
||||||
|
for task in node_jobs:
|
||||||
|
if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
|
||||||
|
if isinstance(task, WorkflowJob):
|
||||||
|
continue
|
||||||
|
if task.modified > celery_task_start_time:
|
||||||
|
continue
|
||||||
|
task.status = 'failed'
|
||||||
|
task.job_explanation += ' '.join((
|
||||||
|
'Task was marked as running in Tower but was not present in',
|
||||||
|
'Celery, so it has been marked as failed.',
|
||||||
|
))
|
||||||
|
try:
|
||||||
|
task.save(update_fields=['status', 'job_explanation'])
|
||||||
|
except DatabaseError:
|
||||||
|
logger.error("Task {} DB error in marking failed. Job possibly deleted.".format(task.log_format))
|
||||||
|
continue
|
||||||
|
awx_tasks._send_notification_templates(task, 'failed')
|
||||||
|
task.websocket_emit_status('failed')
|
||||||
|
logger.error("Task {} has no record in celery. Marking as failed".format(task.log_format))
|
||||||
|
|
||||||
def cleanup_inconsistent_celery_tasks(self):
|
def cleanup_inconsistent_celery_tasks(self):
|
||||||
'''
|
'''
|
||||||
Rectify tower db <-> celery inconsistent view of jobs state
|
Rectify tower db <-> celery inconsistent view of jobs state
|
||||||
@@ -431,7 +457,13 @@ class TaskManager():
|
|||||||
Only consider failing tasks on instances for which we obtained a task
|
Only consider failing tasks on instances for which we obtained a task
|
||||||
list from celery for.
|
list from celery for.
|
||||||
'''
|
'''
|
||||||
running_tasks = self.get_running_tasks()
|
running_tasks, waiting_tasks = self.get_running_tasks()
|
||||||
|
all_celery_task_ids = []
|
||||||
|
for node, node_jobs in active_queues.iteritems():
|
||||||
|
all_celery_task_ids.extend(node_jobs)
|
||||||
|
|
||||||
|
self.fail_jobs_if_not_in_celery(waiting_tasks, all_celery_task_ids, celery_task_start_time)
|
||||||
|
|
||||||
for node, node_jobs in running_tasks.iteritems():
|
for node, node_jobs in running_tasks.iteritems():
|
||||||
if node in active_queues:
|
if node in active_queues:
|
||||||
active_tasks = active_queues[node]
|
active_tasks = active_queues[node]
|
||||||
@@ -451,25 +483,8 @@ class TaskManager():
|
|||||||
"The node is currently executing jobs {}".format(node,
|
"The node is currently executing jobs {}".format(node,
|
||||||
[j.log_format for j in node_jobs]))
|
[j.log_format for j in node_jobs]))
|
||||||
active_tasks = []
|
active_tasks = []
|
||||||
for task in node_jobs:
|
|
||||||
if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
|
self.fail_jobs_if_not_in_celery(node_jobs, active_tasks, celery_task_start_time)
|
||||||
if isinstance(task, WorkflowJob):
|
|
||||||
continue
|
|
||||||
if task.modified > celery_task_start_time:
|
|
||||||
continue
|
|
||||||
task.status = 'failed'
|
|
||||||
task.job_explanation += ' '.join((
|
|
||||||
'Task was marked as running in Tower but was not present in',
|
|
||||||
'Celery, so it has been marked as failed.',
|
|
||||||
))
|
|
||||||
try:
|
|
||||||
task.save(update_fields=['status', 'job_explanation'])
|
|
||||||
except DatabaseError:
|
|
||||||
logger.error("Task {} DB error in marking failed. Job possibly deleted.".format(task.log_format))
|
|
||||||
continue
|
|
||||||
awx_tasks._send_notification_templates(task, 'failed')
|
|
||||||
task.websocket_emit_status('failed')
|
|
||||||
logger.error("Task {} has no record in celery. Marking as failed".format(task.log_format))
|
|
||||||
|
|
||||||
def calculate_capacity_used(self, tasks):
|
def calculate_capacity_used(self, tasks):
|
||||||
for rampart_group in self.graph:
|
for rampart_group in self.graph:
|
||||||
|
|||||||
@@ -231,23 +231,23 @@ class TestReaper():
|
|||||||
Instance.objects.create(hostname='host4_offline', capacity=0)
|
Instance.objects.create(hostname='host4_offline', capacity=0)
|
||||||
|
|
||||||
j1 = Job.objects.create(status='pending', execution_node='host1')
|
j1 = Job.objects.create(status='pending', execution_node='host1')
|
||||||
j2 = Job.objects.create(status='waiting', celery_task_id='considered_j2', execution_node='host1')
|
j2 = Job.objects.create(status='waiting', celery_task_id='considered_j2')
|
||||||
j3 = Job.objects.create(status='waiting', celery_task_id='considered_j3', execution_node='host1')
|
j3 = Job.objects.create(status='waiting', celery_task_id='considered_j3')
|
||||||
j3.modified = now - timedelta(seconds=60)
|
j3.modified = now - timedelta(seconds=60)
|
||||||
j3.save(update_fields=['modified'])
|
j3.save(update_fields=['modified'])
|
||||||
j4 = Job.objects.create(status='running', celery_task_id='considered_j4', execution_node='host1')
|
j4 = Job.objects.create(status='running', celery_task_id='considered_j4', execution_node='host1')
|
||||||
j5 = Job.objects.create(status='waiting', celery_task_id='reapable_j5', execution_node='host1')
|
j5 = Job.objects.create(status='waiting', celery_task_id='reapable_j5')
|
||||||
j5.modified = now - timedelta(seconds=60)
|
j5.modified = now - timedelta(seconds=60)
|
||||||
j5.save(update_fields=['modified'])
|
j5.save(update_fields=['modified'])
|
||||||
j6 = Job.objects.create(status='waiting', celery_task_id='considered_j6', execution_node='host2')
|
j6 = Job.objects.create(status='waiting', celery_task_id='considered_j6')
|
||||||
j6.modified = now - timedelta(seconds=60)
|
j6.modified = now - timedelta(seconds=60)
|
||||||
j6.save(update_fields=['modified'])
|
j6.save(update_fields=['modified'])
|
||||||
j7 = Job.objects.create(status='running', celery_task_id='considered_j7', execution_node='host2')
|
j7 = Job.objects.create(status='running', celery_task_id='considered_j7', execution_node='host2')
|
||||||
j8 = Job.objects.create(status='running', celery_task_id='reapable_j7', execution_node='host2')
|
j8 = Job.objects.create(status='running', celery_task_id='reapable_j7', execution_node='host2')
|
||||||
j9 = Job.objects.create(status='waiting', celery_task_id='host3_j8', execution_node='host3_split')
|
j9 = Job.objects.create(status='waiting', celery_task_id='reapable_j8')
|
||||||
j9.modified = now - timedelta(seconds=60)
|
j9.modified = now - timedelta(seconds=60)
|
||||||
j9.save(update_fields=['modified'])
|
j9.save(update_fields=['modified'])
|
||||||
j10 = Job.objects.create(status='running', execution_node='host3_split')
|
j10 = Job.objects.create(status='running', celery_task_id='host3_j10', execution_node='host3_split')
|
||||||
|
|
||||||
j11 = Job.objects.create(status='running', celery_task_id='host4_j11', execution_node='host4_offline')
|
j11 = Job.objects.create(status='running', celery_task_id='host4_j11', execution_node='host4_offline')
|
||||||
|
|
||||||
@@ -266,12 +266,16 @@ class TestReaper():
|
|||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def running_tasks(self, all_jobs):
|
def running_tasks(self, all_jobs):
|
||||||
return {
|
return {
|
||||||
'host1': all_jobs[2:5] + [all_jobs[11]],
|
'host1': [all_jobs[3]],
|
||||||
'host2': all_jobs[5:8],
|
'host2': [all_jobs[7], all_jobs[8]],
|
||||||
'host3_split': all_jobs[8:10],
|
'host3_split': [all_jobs[9]],
|
||||||
'host4_offline': [all_jobs[10]],
|
'host4_offline': [all_jobs[10]],
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def waiting_tasks(self, all_jobs):
|
||||||
|
return [all_jobs[2], all_jobs[4], all_jobs[5], all_jobs[8]]
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def reapable_jobs(self, all_jobs):
|
def reapable_jobs(self, all_jobs):
|
||||||
return [all_jobs[4], all_jobs[7], all_jobs[10]]
|
return [all_jobs[4], all_jobs[7], all_jobs[10]]
|
||||||
@@ -290,10 +294,10 @@ class TestReaper():
|
|||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
@mock.patch('awx.main.tasks._send_notification_templates')
|
@mock.patch('awx.main.tasks._send_notification_templates')
|
||||||
@mock.patch.object(TaskManager, 'get_active_tasks', lambda self: ([], []))
|
@mock.patch.object(TaskManager, 'get_active_tasks', lambda self: ([], []))
|
||||||
def test_cleanup_inconsistent_task(self, notify, active_tasks, considered_jobs, reapable_jobs, running_tasks, mocker):
|
def test_cleanup_inconsistent_task(self, notify, active_tasks, considered_jobs, reapable_jobs, running_tasks, waiting_tasks, mocker):
|
||||||
tm = TaskManager()
|
tm = TaskManager()
|
||||||
|
|
||||||
tm.get_running_tasks = mocker.Mock(return_value=running_tasks)
|
tm.get_running_tasks = mocker.Mock(return_value=(running_tasks, waiting_tasks))
|
||||||
tm.get_active_tasks = mocker.Mock(return_value=active_tasks)
|
tm.get_active_tasks = mocker.Mock(return_value=active_tasks)
|
||||||
|
|
||||||
tm.cleanup_inconsistent_celery_tasks()
|
tm.cleanup_inconsistent_celery_tasks()
|
||||||
@@ -302,7 +306,7 @@ class TestReaper():
|
|||||||
if j not in reapable_jobs:
|
if j not in reapable_jobs:
|
||||||
j.save.assert_not_called()
|
j.save.assert_not_called()
|
||||||
|
|
||||||
assert notify.call_count == 3
|
assert notify.call_count == 4
|
||||||
notify.assert_has_calls([mock.call(j, 'failed') for j in reapable_jobs], any_order=True)
|
notify.assert_has_calls([mock.call(j, 'failed') for j in reapable_jobs], any_order=True)
|
||||||
|
|
||||||
for j in reapable_jobs:
|
for j in reapable_jobs:
|
||||||
@@ -317,22 +321,23 @@ class TestReaper():
|
|||||||
tm = TaskManager()
|
tm = TaskManager()
|
||||||
|
|
||||||
# Ensure the query grabs the expected jobs
|
# Ensure the query grabs the expected jobs
|
||||||
execution_nodes_jobs = tm.get_running_tasks()
|
execution_nodes_jobs, waiting_jobs = tm.get_running_tasks()
|
||||||
assert 'host1' in execution_nodes_jobs
|
assert 'host1' in execution_nodes_jobs
|
||||||
assert 'host2' in execution_nodes_jobs
|
assert 'host2' in execution_nodes_jobs
|
||||||
assert 'host3_split' in execution_nodes_jobs
|
assert 'host3_split' in execution_nodes_jobs
|
||||||
|
|
||||||
assert all_jobs[2] in execution_nodes_jobs['host1']
|
|
||||||
assert all_jobs[3] in execution_nodes_jobs['host1']
|
assert all_jobs[3] in execution_nodes_jobs['host1']
|
||||||
assert all_jobs[4] in execution_nodes_jobs['host1']
|
|
||||||
|
|
||||||
assert all_jobs[5] in execution_nodes_jobs['host2']
|
|
||||||
assert all_jobs[6] in execution_nodes_jobs['host2']
|
assert all_jobs[6] in execution_nodes_jobs['host2']
|
||||||
assert all_jobs[7] in execution_nodes_jobs['host2']
|
assert all_jobs[7] in execution_nodes_jobs['host2']
|
||||||
|
|
||||||
assert all_jobs[8] in execution_nodes_jobs['host3_split']
|
|
||||||
assert all_jobs[9] in execution_nodes_jobs['host3_split']
|
assert all_jobs[9] in execution_nodes_jobs['host3_split']
|
||||||
|
|
||||||
assert all_jobs[10] in execution_nodes_jobs['host4_offline']
|
assert all_jobs[10] in execution_nodes_jobs['host4_offline']
|
||||||
|
|
||||||
assert all_jobs[11] not in execution_nodes_jobs['host1']
|
assert all_jobs[11] not in execution_nodes_jobs['host1']
|
||||||
|
|
||||||
|
assert all_jobs[2] in waiting_jobs
|
||||||
|
assert all_jobs[4] in waiting_jobs
|
||||||
|
assert all_jobs[5] in waiting_jobs
|
||||||
|
assert all_jobs[8] in waiting_jobs
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ from django.core.cache import cache
|
|||||||
class TestCleanupInconsistentCeleryTasks():
|
class TestCleanupInconsistentCeleryTasks():
|
||||||
@mock.patch.object(cache, 'get', return_value=None)
|
@mock.patch.object(cache, 'get', return_value=None)
|
||||||
@mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {}))
|
@mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {}))
|
||||||
@mock.patch.object(TaskManager, 'get_running_tasks', return_value={'host1': [Job(id=2), Job(id=3),]})
|
@mock.patch.object(TaskManager, 'get_running_tasks', return_value=({'host1': [Job(id=2), Job(id=3),]}, []))
|
||||||
@mock.patch.object(InstanceGroup.objects, 'all', return_value=[])
|
@mock.patch.object(InstanceGroup.objects, 'all', return_value=[])
|
||||||
@mock.patch.object(Instance.objects, 'get', side_effect=Instance.DoesNotExist)
|
@mock.patch.object(Instance.objects, 'get', side_effect=Instance.DoesNotExist)
|
||||||
@mock.patch('awx.main.scheduler.logger')
|
@mock.patch('awx.main.scheduler.logger')
|
||||||
@@ -43,7 +43,7 @@ class TestCleanupInconsistentCeleryTasks():
|
|||||||
logger_mock.error = mock.MagicMock()
|
logger_mock.error = mock.MagicMock()
|
||||||
job = Job(id=2, modified=tz_now(), status='running', celery_task_id='blah', execution_node='host1')
|
job = Job(id=2, modified=tz_now(), status='running', celery_task_id='blah', execution_node='host1')
|
||||||
job.websocket_emit_status = mock.MagicMock()
|
job.websocket_emit_status = mock.MagicMock()
|
||||||
get_running_tasks.return_value = {'host1': [job]}
|
get_running_tasks.return_value = ({'host1': [job]}, [])
|
||||||
tm = TaskManager()
|
tm = TaskManager()
|
||||||
|
|
||||||
with mock.patch.object(job, 'save', side_effect=DatabaseError):
|
with mock.patch.object(job, 'save', side_effect=DatabaseError):
|
||||||
|
|||||||
Reference in New Issue
Block a user