Merge remote-tracking branch 'origin/celery-refactor'

* origin/celery-refactor: (33 commits)
  Fix spelling mistake
  Jobs begin in the waiting state
  Don't raise an exception at the end of a task if we are running unit tests
  Enable canceling on certain levels.... fix up some unit tests
  Check for invalid tasks and mark created tasks as failed when constructing task chains
  Fix merge spacing from conflicted merge
  Fix some bugs and show more error detail on a current task when a previous task fails
  Leave another TODO on the job runner, fix a misspelling on the project update hook
  Initial work towards the celery refactor... adjusting logic to allow building a worker chain... temporarily relax requirements on status checks
  Remove 'awx-' prefix when reporting version
  Explicitly include the egg-info in rpm packaging
  Fixing store references to point to http://www.ansible.com/ansible-pricing
  Update make develop command, version check after package rename.
  AC-992 Fix inventory import tests.
  AC-967 Fixed job name in the host status fly-out. Text now wraps rather than extending beyond boundaries of pop-over. Removed status label; just show red/green dot. Clicking on ID or red/green dot shows the status dialog. When dialog closes the window.resize() watcher is restored.
  AC-976 now using custom javascript to apply ellipsis to long group and host names. Fixed index.html title.
  Fix ansible-tower deb build
  Rename package awx to ansible-tower
  AC-976 add job template name to activity stream
  AC-979 using activity stream object arrays correctly. AC-980 fixed Action label on detail dialog.
  ...
Matthew Jones 2014-01-30 11:59:17 -05:00
commit a85e109da7
12 changed files with 176 additions and 161 deletions

View File

@ -372,7 +372,9 @@ class CommonTask(PrimordialModel):
def _get_passwords_needed_to_start(self):
return []
def start(self, **kwargs):
def start_signature(self, **kwargs):
from awx.main.tasks import handle_work_error
task_class = self._get_task_class()
if not self.can_start:
return False
@ -383,7 +385,13 @@ class CommonTask(PrimordialModel):
self.status = 'pending'
self.save(update_fields=['status'])
transaction.commit()
task_result = task_class().delay(self.pk, **opts)
task_actual = task_class().si(self.pk, **opts)
return task_actual
def start(self, **kwargs):
task_actual = self.start_signature(**kwargs)
# TODO: Callback for status
task_result = task_actual.delay()
# Reload instance from database so we don't clobber results from task
# (mainly from tests when using Django 1.4.x).
instance = self.__class__.objects.get(pk=self.pk)
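
For context on the hunk above, a minimal standalone sketch of the .si()-versus-.delay() distinction the new start_signature() relies on (Celery 3.x-era API; the add task, app name, and eager-mode setting are illustrative assumptions, not part of this changeset):

from celery import Celery

app = Celery('sketch')
app.conf.CELERY_ALWAYS_EAGER = True  # run tasks inline so no broker/worker is needed

@app.task
def add(x, y):
    return x + y

# .delay() dispatches immediately and returns a result handle.
print(add.delay(2, 3).get())   # 5

# .si() only builds an immutable signature; nothing runs until the
# signature is applied, so it can be stored or composed into a chain
# first and .delay()'d later, which is what start_signature() enables.
sig = add.si(2, 3)
print(sig.delay().get())       # 5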

View File

@ -686,6 +686,12 @@ class InventorySource(PrimordialModel):
# FIXME: Prevent update when another one is active!
return bool(self.source)
def update_signature(self, **kwargs):
if self.can_update:
inventory_update = self.inventory_updates.create()
inventory_update_sig = inventory_update.start_signature()
return (inventory_update, inventory_update_sig)
def update(self, **kwargs):
if self.can_update:
inventory_update = self.inventory_updates.create()

View File

@ -18,6 +18,7 @@ import yaml
# Django
from django.conf import settings
from django.db import models
from django.db import transaction
from django.utils.translation import ugettext_lazy as _
from django.core.exceptions import ValidationError, NON_FIELD_ERRORS
from django.core.urlresolvers import reverse
@ -30,6 +31,11 @@ from jsonfield import JSONField
# AWX
from awx.main.models.base import *
# Celery
from celery import chain
logger = logging.getLogger('awx.main.models.jobs')
__all__ = ['JobTemplate', 'Job', 'JobHostSummary', 'JobEvent']
@ -328,6 +334,65 @@ class Job(CommonTask):
def processed_hosts(self):
return self._get_hosts(job_host_summaries__processed__gt=0)
def start(self, **kwargs):
from awx.main.tasks import handle_work_error
task_class = self._get_task_class()
if not self.can_start:
return False
needed = self._get_passwords_needed_to_start()
opts = dict([(field, kwargs.get(field, '')) for field in needed])
if not all(opts.values()):
return False
self.status = 'waiting'
self.save(update_fields=['status'])
transaction.commit()
runnable_tasks = []
run_tasks = []
inventory_updates_actual = []
project_update_actual = None
has_setup_failures = False
setup_failure_message = ""
project = self.project
inventory = self.inventory
is_qs = inventory.inventory_sources.filter(active=True, update_on_launch=True)
if project.scm_update_on_launch:
project_update_details = project.update_signature()
if not project_update_details:
has_setup_failures = True
setup_failure_message = "Failed to check dependent project update task"
else:
runnable_tasks.append({'obj': project_update_details[0],
'sig': project_update_details[1],
'type': 'project_update'})
if is_qs.count() and not has_setup_failures:
for inventory_source in is_qs:
inventory_update_details = inventory_source.update_signature()
if not inventory_update_details:
has_setup_failures = True
setup_failure_message = "Failed to check dependent inventory update task"
break
else:
runnable_tasks.append({'obj': inventory_update_details[0],
'sig': inventory_update_details[1],
'type': 'inventory_update'})
if has_setup_failures:
for each_task in runnable_tasks:
obj = each_task['obj']
obj.status = 'error'
obj.result_traceback = setup_failure_message
obj.save()
self.status = 'error'
self.result_traceback = setup_failure_message
self.save()
thisjob = {'type': 'job', 'id': self.id}
for idx in xrange(len(runnable_tasks)):
dependent_tasks = [{'type': r['type'], 'id': r['obj'].id} for r in runnable_tasks[idx:]] + [thisjob]
run_tasks.append(runnable_tasks[idx]['sig'].set(link_error=handle_work_error.s(subtasks=dependent_tasks)))
run_tasks.append(task_class().si(self.pk, **opts).set(link_error=handle_work_error.s(subtasks=[thisjob])))
res = chain(run_tasks)()
return True
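
The dependency wiring in start() above follows the standard Celery chain-plus-errback pattern; a reduced sketch under the same assumptions (the project_update / inventory_update / run_playbook / mark_failed tasks, app name, and eager-mode setting are hypothetical stand-ins for the real ones):

from celery import Celery, chain

app = Celery('sketch')
app.conf.CELERY_ALWAYS_EAGER = True   # run inline here; production routes through a broker/worker

@app.task(bind=True)
def mark_failed(self, task_id, subtasks=None):
    # Stand-in for handle_work_error: flag each dependent record listed in
    # `subtasks` as failed when an upstream link errors out.
    print('task %s failed; dependents: %s' % (task_id, subtasks))

@app.task
def project_update(pk):
    return pk

@app.task
def inventory_update(pk):
    return pk

@app.task
def run_playbook(pk):
    return pk

thisjob = {'type': 'job', 'id': 3}
steps = [
    project_update.si(1).set(link_error=mark_failed.s(
        subtasks=[{'type': 'inventory_update', 'id': 2}, thisjob])),
    inventory_update.si(2).set(link_error=mark_failed.s(subtasks=[thisjob])),
    run_playbook.si(3).set(link_error=mark_failed.s(subtasks=[thisjob])),
]
chain(steps)()   # links execute in order; any failure skips the rest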
class JobHostSummary(BaseModel):
'''

View File

@ -282,6 +282,12 @@ class Project(CommonModel):
# FIXME: Prevent update when another one is active!
return bool(self.scm_type)# and not self.current_update)
def update_signature(self, **kwargs):
if self.can_update:
project_update = self.project_updates.create()
project_update_sig = project_update.start_signature()
return (project_update, project_update_sig)
def update(self, **kwargs):
if self.can_update:
project_update = self.project_updates.create()

View File

@ -23,7 +23,7 @@ import uuid
import pexpect
# Celery
from celery import Task
from celery import Task, task
# Django
from django.conf import settings
@ -35,12 +35,44 @@ from django.utils.timezone import now
from awx.main.models import Job, JobEvent, ProjectUpdate, InventoryUpdate
from awx.main.utils import get_ansible_version, decrypt_field, update_scm_url
__all__ = ['RunJob', 'RunProjectUpdate', 'RunInventoryImport']
__all__ = ['RunJob', 'RunProjectUpdate', 'RunInventoryUpdate', 'handle_work_error']
logger = logging.getLogger('awx.main.tasks')
# FIXME: Cleanly cancel task when celery worker is stopped.
@task(bind=True)
def handle_work_error(self, task_id, subtasks=None):
print('Executing error task id %s, subtasks: %s' % (str(self.request.id), str(subtasks)))
first_task = None
first_task_type = ''
first_task_name = ''
if subtasks is not None:
for each_task in subtasks:
instance_name = ''
if each_task['type'] == 'project_update':
instance = ProjectUpdate.objects.get(id=each_task['id'])
instance_name = instance.project.name
elif each_task['type'] == 'inventory_update':
instance = InventoryUpdate.objects.get(id=each_task['id'])
instance_name = instance.inventory_source.inventory.name
elif each_task['type'] == 'job':
instance = Job.objects.get(id=each_task['id'])
instance_name = instance.job_template.name
else:
# Unknown task type
break
if first_task is None:
first_task = instance
first_task_type = each_task['type']
first_task_name = instance_name
if instance.celery_task_id != task_id:
instance.status = 'failed'
instance.failed = True
instance.result_traceback = "Previous Task Failed: %s for %s with celery task id: %s" % \
(first_task_type, first_task_name, task_id)
instance.save()
class BaseTask(Task):
name = None
@ -108,6 +140,7 @@ class BaseTask(Task):
'yes': 'yes',
'no': 'no',
'': '',
}
def build_env(self, instance, **kwargs):
@ -204,15 +237,10 @@ class BaseTask(Task):
if logfile_pos != logfile.tell():
logfile_pos = logfile.tell()
last_stdout_update = time.time()
# Update instance status here (also updates modified timestamp, so
# we have a way to know the task is still running, otherwise the
# post_run_hook below would cancel long-running tasks that are
# really still active).
instance = self.update_model(instance.pk, status='running')
# NOTE: In case revoke doesn't have an effect
if instance.cancel_flag:
child.close(True)
canceled = True
# FIXME: Find a way to determine if task is hung waiting at a prompt.
child.close(True)
canceled = True
if idle_timeout and (time.time() - last_stdout_update) > idle_timeout:
child.close(True)
canceled = True
@ -234,40 +262,14 @@ class BaseTask(Task):
return True
def post_run_hook(self, instance, **kwargs):
'''
Hook for actions to run after job/task has completed.
'''
# Cleanup instances that appear to be stuck.
try:
stuck_task_timeout = int(getattr(settings, 'STUCK_TASK_TIMEOUT', 300))
except (TypeError, ValueError):
stuck_task_timeout = 0
if stuck_task_timeout <= 0:
return
# Never less than 30 seconds so we're not messing with active tasks.
stuck_task_timeout = max(stuck_task_timeout, 30)
cutoff = now() - datetime.timedelta(seconds=stuck_task_timeout)
qs = self.model.objects.filter(status__in=('new', 'waiting', 'running'))
qs = qs.filter(modified__lt=cutoff)
for obj in qs:
# If new, created but never started. If waiting or running, the
# modified timestamp should be updated regularly, else the task is
# probably stuck.
# If pending, we could be started but celeryd is not running, or
# we're waiting for an open slot in celeryd -- in either case we
# shouldn't necessarily cancel the task. Slim chance that somehow
# the task was started, picked up by celery, but hit an error
# before we could update the status.
obj.status = 'canceled'
obj.result_traceback += '\nCanceled stuck %s.' % unicode(self.model._meta.verbose_name)
obj.save(update_fields=['status', 'result_traceback'])
pass
@transaction.commit_on_success
def run(self, pk, **kwargs):
'''
Run the job/task and capture its output.
'''
instance = self.update_model(pk)
instance = self.update_model(pk, status='pending', celery_task_id=self.request.id)
status, stdout, tb = 'error', '', ''
output_replacements = []
try:
@ -305,6 +307,10 @@ class BaseTask(Task):
result_traceback=tb,
output_replacements=output_replacements)
self.post_run_hook(instance, **kwargs)
if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'):
# Raising an exception will mark the job as 'failed' in celery
# and will stop a task chain from continuing to execute
raise Exception("Task %s(pk:%s) encountered an error" % (str(self.model.__class__), str(pk)))
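
This raise is what lets the chain short-circuit: Celery marks the raising task as failed, skips the remaining links, and calls any link_error callback with the failed task's id, which is the contract handle_work_error relies on. A toy illustration of that behaviour (task names and the broker URL are assumptions; a running worker is needed for the dispatch at the bottom):

from celery import Celery, chain

# Illustrative broker URL; a real worker must be running for the
# dispatch below to execute.
app = Celery('sketch', broker='amqp://guest@localhost//')

@app.task
def step_one():
    raise RuntimeError('boom')   # puts this task into the FAILURE state

@app.task
def step_two():
    return 'never reached'       # the chain stops after a failed link

@app.task
def report_error(task_id):
    print('upstream task %s failed' % task_id)

if __name__ == '__main__':
    # step_two never executes because step_one raised; report_error is
    # invoked with the id of the failed task via link_error.
    chain(step_one.si().set(link_error=report_error.s()),
          step_two.si()).apply_async()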
class RunJob(BaseTask):
'''
@ -431,6 +437,9 @@ class RunJob(BaseTask):
(job.project.local_path, root))
return cwd
def get_idle_timeout(self):
return getattr(settings, 'JOB_RUN_IDLE_TIMEOUT', 300)
def get_password_prompts(self):
d = super(RunJob, self).get_password_prompts()
d[re.compile(r'^Enter passphrase for .*:\s*?$', re.M)] = 'ssh_key_unlock'
@ -444,98 +453,14 @@ class RunJob(BaseTask):
'''
Hook for checking job before running.
'''
project_update = None
inventory_updates = None
while True:
pk = job.pk
if job.status in ('pending', 'waiting'):
project = job.project
pu_qs = project.project_updates.filter(status__in=('pending', 'running'))
inventory = job.inventory
base_iu_qs = InventoryUpdate.objects.filter(inventory_source__inventory=inventory)
iu_qs = base_iu_qs.filter(status__in=('pending', 'running'))
is_qs = inventory.inventory_sources.filter(active=True, update_on_launch=True)
# Refresh the current project_update instance (if set).
if project_update:
try:
project_update = project.project_updates.filter(pk=project_update.pk)[0]
except IndexError:
msg = 'Unable to check project update.'
job = self.update_model(pk, status='error',
result_traceback=msg)
return False
# Refresh the current inventory_update instance(s) (if set).
if inventory_updates:
inventory_update_pks = [x.pk for x in inventory_updates]
inventory_updates = list(base_iu_qs.filter(pk__in=inventory_update_pks))
# If the job needs to update the project first (and there is no
# specific project update defined).
if not project_update and project.scm_update_on_launch:
job = self.update_model(pk, status='waiting')
try:
project_update = pu_qs[0]
except IndexError:
project_update = project.update()
if not project_update:
msg = 'Unable to update project before launch.'
job = self.update_model(pk, status='error',
result_traceback=msg)
return False
#print 'job %d waiting on project update %d' % (pk, project_update.pk)
time.sleep(2.0)
# If the job needs to update any inventory first (and there are
# no current inventory updates pending).
elif inventory_updates is None and is_qs.count():
job = self.update_model(pk, status='waiting')
inventory_updates = []
msgs = []
for inventory_source in is_qs:
try:
inventory_update = iu_qs.filter(inventory_source=inventory_source)[0]
except IndexError:
inventory_update = inventory_source.update()
if not inventory_update:
msgs.append('Unable to update inventory source %d before launch' % inventory_source.pk)
continue
inventory_updates.append(inventory_update)
if msgs:
msg = '\n'.join(msgs)
job = self.update_model(pk, status='error',
result_traceback=msg)
return False
time.sleep(2.0)
# If project update has failed, abort the job.
elif project_update and project_update.failed:
msg = 'Project update %d failed with status = %s.' % (project_update.pk, project_update.status)
job = self.update_model(pk, status='error',
result_traceback=msg)
return False
# If any inventory update has failed, abort the job.
elif inventory_updates and any([x.failed for x in inventory_updates]):
msgs = []
for inventory_update in inventory_updates:
if inventory_update.failed:
msgs.append('Inventory update %d failed with status = %s.' % (inventory_update.pk, inventory_update.status))
if msgs:
msg = '\n'.join(msgs)
job = self.update_model(pk, status='error',
result_traceback=msg)
return False
# Check if blocked by any other active project or inventory updates.
elif pu_qs.count() or iu_qs.count():
#print 'job %d waiting on' % pk, pu_qs
job = self.update_model(pk, status='waiting')
time.sleep(4.0)
# Otherwise continue running the job.
else:
job = self.update_model(pk, status='pending')
return True
elif job.cancel_flag:
job = self.update_model(pk, status='canceled')
return False
else:
return False
if job.status in ('pending', 'waiting'):
job = self.update_model(job.pk, status='pending')
return True
elif job.cancel_flag:
job = self.update_model(job.pk, status='canceled')
return False
else:
return False
def post_run_hook(self, job, **kwargs):
'''
@ -882,6 +807,9 @@ class RunInventoryUpdate(BaseTask):
def build_cwd(self, inventory_update, **kwargs):
return self.get_path_to('..', 'plugins', 'inventory')
def get_idle_timeout(self):
return getattr(settings, 'INVENTORY_UPDATE_IDLE_TIMEOUT', 300)
def pre_run_check(self, inventory_update, **kwargs):
'''
Hook for checking inventory update before running.

View File

@ -52,6 +52,7 @@ class BaseTestMixin(object):
# callbacks.
if settings.BROKER_URL.startswith('amqp://'):
settings.BROKER_URL = 'django://'
settings.CELERY_UNIT_TEST = True
# Make temp job status directory for unit tests.
job_status_dir = tempfile.mkdtemp()
self._temp_project_dirs.append(job_status_dir)

View File

@ -377,7 +377,7 @@ class CleanupJobsTest(BaseCommandMixin, BaseLiveServerTest):
self.assertEqual(job.status, 'new')
self.assertFalse(job.passwords_needed_to_start)
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
self.assertEqual(job.status, 'waiting')
job = Job.objects.get(pk=job.pk)
self.assertEqual(job.status, 'successful')
# With days=1, no jobs will be deleted.

View File

@ -1551,7 +1551,7 @@ class ProjectUpdatesTest(BaseTransactionTest):
self.assertEqual(job.status, 'new')
self.assertFalse(job.passwords_needed_to_start)
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
self.assertEqual(job.status, 'waiting')
job = Job.objects.get(pk=job.pk)
self.assertTrue(job.status in ('successful', 'failed'))
self.assertEqual(self.project.project_updates.count(), 3)

View File

@ -404,7 +404,7 @@ class RunJobTest(BaseCeleryTest):
self.assertEqual(job.status, 'new')
self.assertFalse(job.passwords_needed_to_start)
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
self.assertEqual(job.status, 'waiting')
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'successful')
self.check_job_events(job, 'ok', 1, 2)
@ -433,7 +433,7 @@ class RunJobTest(BaseCeleryTest):
self.assertEqual(job.status, 'new')
self.assertFalse(job.passwords_needed_to_start)
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
self.assertEqual(job.status, 'waiting')
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'successful')
self.check_job_events(job, 'skipped', 1, 2)
@ -461,7 +461,7 @@ class RunJobTest(BaseCeleryTest):
self.assertEqual(job.status, 'new')
self.assertFalse(job.passwords_needed_to_start)
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
self.assertEqual(job.status, 'waiting')
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'failed')
self.check_job_events(job, 'failed', 1, 1)
@ -489,7 +489,7 @@ class RunJobTest(BaseCeleryTest):
self.assertEqual(job.status, 'new')
self.assertFalse(job.passwords_needed_to_start)
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
self.assertEqual(job.status, 'waiting')
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'successful')
self.check_job_events(job, 'ok', 1, 1, check_ignore_errors=True)
@ -612,7 +612,7 @@ class RunJobTest(BaseCeleryTest):
self.assertEqual(job.status, 'new')
self.assertFalse(job.passwords_needed_to_start)
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
self.assertEqual(job.status, 'waiting')
job = Job.objects.get(pk=job.pk)
# Since we don't actually run the task, the --check should indicate
# everything is successful.
@ -653,7 +653,7 @@ class RunJobTest(BaseCeleryTest):
self.assertFalse(job.passwords_needed_to_start)
self.build_args_callback = self._cancel_job_callback
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
self.assertEqual(job.status, 'waiting')
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'canceled')
self.assertEqual(job.cancel_flag, True)
@ -676,7 +676,7 @@ class RunJobTest(BaseCeleryTest):
self.assertEqual(job.status, 'new')
self.assertFalse(job.passwords_needed_to_start)
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
self.assertEqual(job.status, 'waiting')
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'successful')
self.assertTrue('--forks=3' in self.run_job_args)
@ -687,7 +687,7 @@ class RunJobTest(BaseCeleryTest):
job2 = self.create_test_job(job_template=job_template2)
self.assertEqual(job2.status, 'new')
self.assertTrue(job2.start())
self.assertEqual(job2.status, 'pending')
self.assertEqual(job2.status, 'waiting')
job2 = Job.objects.get(pk=job2.pk)
self.check_job_result(job2, 'successful')
# Test with extra_vars as YAML (should be converted to JSON in args).
@ -695,7 +695,7 @@ class RunJobTest(BaseCeleryTest):
job3 = self.create_test_job(job_template=job_template3)
self.assertEqual(job3.status, 'new')
self.assertTrue(job3.start())
self.assertEqual(job3.status, 'pending')
self.assertEqual(job3.status, 'waiting')
job3 = Job.objects.get(pk=job3.pk)
self.check_job_result(job3, 'successful')
@ -707,7 +707,7 @@ class RunJobTest(BaseCeleryTest):
self.assertEqual(job.status, 'new')
self.assertFalse(job.passwords_needed_to_start)
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
self.assertEqual(job.status, 'waiting')
job = Job.objects.get(pk=job.pk)
self.assertTrue(len(job.job_args) > 1024)
self.check_job_result(job, 'successful')
@ -720,7 +720,7 @@ class RunJobTest(BaseCeleryTest):
self.assertEqual(job.status, 'new')
self.assertFalse(job.passwords_needed_to_start)
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
self.assertEqual(job.status, 'waiting')
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'failed')
self.assertTrue('-l' in self.run_job_args)
@ -733,7 +733,7 @@ class RunJobTest(BaseCeleryTest):
self.assertEqual(job.status, 'new')
self.assertFalse(job.passwords_needed_to_start)
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
self.assertEqual(job.status, 'waiting')
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'successful')
self.assertTrue('ssh-agent' in self.run_job_args)
@ -746,7 +746,7 @@ class RunJobTest(BaseCeleryTest):
self.assertEqual(job.status, 'new')
self.assertFalse(job.passwords_needed_to_start)
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
self.assertEqual(job.status, 'waiting')
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'successful')
self.assertTrue('-u' in self.run_job_args)
@ -763,7 +763,7 @@ class RunJobTest(BaseCeleryTest):
self.assertFalse(job.start())
self.assertEqual(job.status, 'new')
self.assertTrue(job.start(ssh_password='sshpass'))
self.assertEqual(job.status, 'pending')
self.assertEqual(job.status, 'waiting')
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'successful')
self.assertTrue('--ask-pass' in self.run_job_args)
@ -777,7 +777,7 @@ class RunJobTest(BaseCeleryTest):
self.assertEqual(job.status, 'new')
self.assertFalse(job.passwords_needed_to_start)
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
self.assertEqual(job.status, 'waiting')
job = Job.objects.get(pk=job.pk)
# Job may fail if current user doesn't have password-less sudo
# privileges, but we're mainly checking the command line arguments.
@ -796,7 +796,7 @@ class RunJobTest(BaseCeleryTest):
self.assertFalse(job.start())
self.assertEqual(job.status, 'new')
self.assertTrue(job.start(sudo_password='sudopass'))
self.assertEqual(job.status, 'pending')
self.assertEqual(job.status, 'waiting')
job = Job.objects.get(pk=job.pk)
# Job may fail if current user doesn't have password-less sudo
# privileges, but we're mainly checking the command line arguments.
@ -811,7 +811,7 @@ class RunJobTest(BaseCeleryTest):
self.assertEqual(job.status, 'new')
self.assertFalse(job.passwords_needed_to_start)
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
self.assertEqual(job.status, 'waiting')
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'successful')
self.assertTrue('ssh-agent' in self.run_job_args)
@ -825,7 +825,7 @@ class RunJobTest(BaseCeleryTest):
self.assertEqual(job.status, 'new')
self.assertFalse(job.passwords_needed_to_start)
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
self.assertEqual(job.status, 'waiting')
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'successful')
self.assertTrue('ssh-agent' in self.run_job_args)
@ -840,7 +840,7 @@ class RunJobTest(BaseCeleryTest):
self.assertEqual(job.status, 'new')
self.assertFalse(job.passwords_needed_to_start)
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
self.assertEqual(job.status, 'waiting')
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'failed')
self.assertTrue('ssh-agent' in self.run_job_args)
@ -858,7 +858,7 @@ class RunJobTest(BaseCeleryTest):
self.assertFalse(job.start())
self.assertEqual(job.status, 'new')
self.assertTrue(job.start(ssh_key_unlock=TEST_SSH_KEY_DATA_UNLOCK))
self.assertEqual(job.status, 'pending')
self.assertEqual(job.status, 'waiting')
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'successful')
self.assertTrue('ssh-agent' in self.run_job_args)
@ -882,7 +882,7 @@ class RunJobTest(BaseCeleryTest):
self.assertEqual(job.status, 'new')
self.assertFalse(job.passwords_needed_to_start)
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
self.assertEqual(job.status, 'waiting')
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'successful')
self.assertTrue(env_var1 in job.job_env)
@ -901,7 +901,7 @@ class RunJobTest(BaseCeleryTest):
self.assertEqual(job.status, 'new')
self.assertFalse(job.passwords_needed_to_start)
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
self.assertEqual(job.status, 'waiting')
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'successful')
self.check_job_events(job, 'ok', 1, 1, async=True)
@ -929,7 +929,7 @@ class RunJobTest(BaseCeleryTest):
self.assertEqual(job.status, 'new')
self.assertFalse(job.passwords_needed_to_start)
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
self.assertEqual(job.status, 'waiting')
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'failed')
self.check_job_events(job, 'failed', 1, 1, async=True)
@ -957,7 +957,7 @@ class RunJobTest(BaseCeleryTest):
self.assertEqual(job.status, 'new')
self.assertFalse(job.passwords_needed_to_start)
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
self.assertEqual(job.status, 'waiting')
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'failed')
self.check_job_events(job, 'failed', 1, 1, async=True,
@ -986,7 +986,7 @@ class RunJobTest(BaseCeleryTest):
self.assertEqual(job.status, 'new')
self.assertFalse(job.passwords_needed_to_start)
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
self.assertEqual(job.status, 'waiting')
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'successful')
self.check_job_events(job, 'ok', 1, 1, async=True, async_nowait=True)

View File

@ -282,6 +282,7 @@ CELERYD_TASK_TIME_LIMIT = None
CELERYD_TASK_SOFT_TIME_LIMIT = None
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
CELERYBEAT_MAX_LOOP_INTERVAL = 60
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
# Any ANSIBLE_* settings will be passed to the subprocess environment by the
# celery task.

View File

@ -118,4 +118,4 @@ angular.module('InventoryHostsDefinition', [])
}
});

View File

@ -459,4 +459,4 @@ angular.module('StreamWidget', ['RestServices', 'Utilities', 'StreamListDefiniti
}
}]);