reap waiting processes if crash

This commit is contained in:
Chris Meyers
2017-08-08 15:36:42 -04:00
parent 0c17744871
commit c3f24d878d
6 changed files with 110 additions and 21 deletions

View File

@@ -958,6 +958,15 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
return (True, opts) return (True, opts)
def start_celery_task(self, opts, error_callback, success_callback, queue): def start_celery_task(self, opts, error_callback, success_callback, queue):
kwargs = {
'link_error': error_callback,
'link': success_callback,
'queue': None,
'task_id': None,
}
if not self.celery_task_id:
raise RuntimeError("Expected celery_task_id to be set on model.")
kwargs['task_id'] = self.celery_task_id
task_class = self._get_task_class() task_class = self._get_task_class()
from awx.main.models.ha import InstanceGroup from awx.main.models.ha import InstanceGroup
ig = InstanceGroup.objects.get(name=queue) ig = InstanceGroup.objects.get(name=queue)
@@ -968,7 +977,8 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
args.append(isolated_instance.hostname) args.append(isolated_instance.hostname)
else: # proj & inv updates, system jobs run on controller else: # proj & inv updates, system jobs run on controller
queue = ig.controller.name queue = ig.controller.name
task_class().apply_async(args, opts, link_error=error_callback, link=success_callback, queue=queue) kwargs['queue'] = queue
task_class().apply_async(args, opts, **kwargs)
def start(self, error_callback, success_callback, **kwargs): def start(self, error_callback, success_callback, **kwargs):
''' '''

View File

@@ -4,6 +4,7 @@
# Python # Python
from datetime import datetime, timedelta from datetime import datetime, timedelta
import logging import logging
import uuid
from sets import Set from sets import Set
# Django # Django
@@ -12,6 +13,7 @@ from django.core.cache import cache
from django.db import transaction, connection from django.db import transaction, connection
from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext_lazy as _
from django.utils.timezone import now as tz_now, utc from django.utils.timezone import now as tz_now, utc
from django.db.models import Q
# AWX # AWX
from awx.main.models import * # noqa from awx.main.models import * # noqa
@@ -68,10 +70,10 @@ class TaskManager():
''' '''
Tasks that are running and SHOULD have a celery task. Tasks that are running and SHOULD have a celery task.
''' '''
def get_running_tasks(self, all_tasks=None): def get_running_tasks(self):
if all_tasks is None: now = tz_now()
return self.get_tasks(status_list=('running',)) return list(UnifiedJob.objects.filter(Q(status='running') |
return filter(lambda t: t.status == 'running', all_tasks) (Q(status='waiting', modified__lte=now - timedelta(seconds=60)))))
''' '''
Tasks that are currently running in celery Tasks that are currently running in celery
@@ -216,6 +218,7 @@ class TaskManager():
task.instance_group = rampart_group task.instance_group = rampart_group
logger.info('Submitting job {} to instance group {}.'.format(task.id, task.instance_group_id)) logger.info('Submitting job {} to instance group {}.'.format(task.id, task.instance_group_id))
with disable_activity_stream(): with disable_activity_stream():
task.celery_task_id = uuid.uuid4()
task.save() task.save()
self.consume_capacity(task, rampart_group.name) self.consume_capacity(task, rampart_group.name)

View File

@@ -755,7 +755,7 @@ class BaseTask(LogErrorsTask):
''' '''
Run the job/task and capture its output. Run the job/task and capture its output.
''' '''
instance = self.update_model(pk, status='running', celery_task_id='' if self.request.id is None else self.request.id) instance = self.update_model(pk, status='running')
instance.websocket_emit_status("running") instance.websocket_emit_status("running")
status, rc, tb = 'error', None, '' status, rc, tb = 'error', None, ''

View File

@@ -78,7 +78,8 @@ class TestIsolatedRuns:
iso_ig.instances.create(hostname='iso1', capacity=50) iso_ig.instances.create(hostname='iso1', capacity=50)
i2 = iso_ig.instances.create(hostname='iso2', capacity=200) i2 = iso_ig.instances.create(hostname='iso2', capacity=200)
job = Job.objects.create( job = Job.objects.create(
instance_group=iso_ig instance_group=iso_ig,
celery_task_id='something',
) )
mock_async = mock.MagicMock() mock_async = mock.MagicMock()
@@ -91,7 +92,11 @@ class TestIsolatedRuns:
with mock.patch.object(job, '_get_task_class') as task_class: with mock.patch.object(job, '_get_task_class') as task_class:
task_class.return_value = MockTaskClass task_class.return_value = MockTaskClass
job.start_celery_task([], error_callback, success_callback, 'thepentagon') job.start_celery_task([], error_callback, success_callback, 'thepentagon')
mock_async.assert_called_with([job.id, 'iso2'], [], link_error=error_callback, link=success_callback, queue='thepentagon') mock_async.assert_called_with([job.id, 'iso2'], [],
link_error=error_callback,
link=success_callback,
queue='thepentagon',
task_id='something')
i2.capacity = 20 i2.capacity = 20
i2.save() i2.save()
@@ -99,4 +104,8 @@ class TestIsolatedRuns:
with mock.patch.object(job, '_get_task_class') as task_class: with mock.patch.object(job, '_get_task_class') as task_class:
task_class.return_value = MockTaskClass task_class.return_value = MockTaskClass
job.start_celery_task([], error_callback, success_callback, 'thepentagon') job.start_celery_task([], error_callback, success_callback, 'thepentagon')
mock_async.assert_called_with([job.id, 'iso1'], [], link_error=error_callback, link=success_callback, queue='thepentagon') mock_async.assert_called_with([job.id, 'iso1'], [],
link_error=error_callback,
link=success_callback,
queue='thepentagon',
task_id='something')

View File

@@ -3,8 +3,52 @@ import mock
from datetime import timedelta, datetime from datetime import timedelta, datetime
from django.core.cache import cache from django.core.cache import cache
from django.utils.timezone import now as tz_now
from awx.main.scheduler import TaskManager from awx.main.scheduler import TaskManager
from awx.main.models import (
Job,
)
@pytest.fixture
def all_jobs(mocker):
now = tz_now()
j1 = Job.objects.create(status='pending')
j2 = Job.objects.create(status='waiting', celery_task_id='considered_j2')
j3 = Job.objects.create(status='waiting', celery_task_id='considered_j3')
j3.modified = now - timedelta(seconds=60)
j3.save(update_fields=['modified'])
j4 = Job.objects.create(status='running', celery_task_id='considered_j4')
j5 = Job.objects.create(status='waiting', celery_task_id='reapable_j5')
j5.modified = now - timedelta(seconds=60)
j5.save(update_fields=['modified'])
js = [j1, j2, j3, j4, j5]
for j in js:
j.save = mocker.Mock(wraps=j.save)
j.websocket_emit_status = mocker.Mock()
return js
@pytest.fixture
def considered_jobs(all_jobs):
return all_jobs[2:4] + [all_jobs[4]]
@pytest.fixture
def reapable_jobs(all_jobs):
return [all_jobs[4]]
@pytest.fixture
def unconsidered_jobs(all_jobs):
return all_jobs[0:1]
@pytest.fixture
def active_tasks():
return ([], ['considered_j2', 'considered_j3', 'considered_j4',])
@pytest.mark.django_db @pytest.mark.django_db
@@ -217,15 +261,38 @@ def test_cleanup_interval():
@pytest.mark.django_db @pytest.mark.django_db
@mock.patch('awx.main.tasks._send_notification_templates') @mock.patch('awx.main.tasks._send_notification_templates')
@mock.patch.object(TaskManager, 'get_active_tasks', lambda self: [[], []]) @mock.patch.object(TaskManager, 'get_active_tasks', lambda self: [[], []])
@mock.patch.object(TaskManager, 'get_running_tasks') def test_cleanup_inconsistent_task(notify, active_tasks, considered_jobs, reapable_jobs, mocker):
def test_cleanup_inconsistent_task(get_running_tasks, notify): tm = TaskManager()
orphaned_task = mock.Mock(job_explanation='')
get_running_tasks.return_value = [orphaned_task]
TaskManager().cleanup_inconsistent_celery_tasks()
notify.assert_called_once_with(orphaned_task, 'failed') tm.get_running_tasks = mocker.Mock(return_value=considered_jobs)
orphaned_task.websocket_emit_status.assert_called_once_with('failed') tm.get_active_tasks = mocker.Mock(return_value=active_tasks)
assert orphaned_task.status == 'failed'
assert orphaned_task.job_explanation == ( tm.cleanup_inconsistent_celery_tasks()
for j in considered_jobs:
if j not in reapable_jobs:
j.save.assert_not_called()
for reaped_job in reapable_jobs:
notify.assert_called_once_with(reaped_job, 'failed')
reaped_job.websocket_emit_status.assert_called_once_with('failed')
assert reaped_job.status == 'failed'
assert reaped_job.job_explanation == (
'Task was marked as running in Tower but was not present in Celery, so it has been marked as failed.' 'Task was marked as running in Tower but was not present in Celery, so it has been marked as failed.'
) )
@pytest.mark.django_db
def test_get_running_tasks(considered_jobs, reapable_jobs, unconsidered_jobs):
tm = TaskManager()
# Ensure the query grabs the expected jobs
rt = tm.get_running_tasks()
for j in considered_jobs:
assert j in rt
for j in reapable_jobs:
assert j in rt
for j in unconsidered_jobs:
assert j in unconsidered_jobs

View File

@@ -257,7 +257,7 @@ class TestGenericRun(TestJobExecution):
with pytest.raises(Exception): with pytest.raises(Exception):
self.task.run(self.pk) self.task.run(self.pk)
for c in [ for c in [
mock.call(self.pk, celery_task_id='', status='running'), mock.call(self.pk, status='running'),
mock.call(self.pk, output_replacements=[], result_traceback=mock.ANY, status='canceled') mock.call(self.pk, output_replacements=[], result_traceback=mock.ANY, status='canceled')
]: ]:
assert c in self.task.update_model.call_args_list assert c in self.task.update_model.call_args_list