diff --git a/awx/main/models/base.py b/awx/main/models/base.py index 0c3de0af7e..7baf81e93b 100644 --- a/awx/main/models/base.py +++ b/awx/main/models/base.py @@ -372,7 +372,9 @@ class CommonTask(PrimordialModel): def _get_passwords_needed_to_start(self): return [] - def start(self, **kwargs): + def start_signature(self, **kwargs): + from awx.main.tasks import handle_work_error + task_class = self._get_task_class() if not self.can_start: return False @@ -383,7 +385,13 @@ class CommonTask(PrimordialModel): self.status = 'pending' self.save(update_fields=['status']) transaction.commit() - task_result = task_class().delay(self.pk, **opts) + task_actual = task_class().si(self.pk, **opts) + return task_actual + + def start(self, **kwargs): + task_actual = self.start_signature(**kwargs) + # TODO: Callback for status + task_result = task_actual.delay() # Reload instance from database so we don't clobber results from task # (mainly from tests when using Django 1.4.x). instance = self.__class__.objects.get(pk=self.pk) diff --git a/awx/main/models/inventory.py b/awx/main/models/inventory.py index 87550cadc8..acd969b2e9 100644 --- a/awx/main/models/inventory.py +++ b/awx/main/models/inventory.py @@ -686,6 +686,12 @@ class InventorySource(PrimordialModel): # FIXME: Prevent update when another one is active! return bool(self.source) + def update_signature(self, **kwargs): + if self.can_update: + inventory_update = self.inventory_updates.create() + inventory_update_sig = inventory_update.start_signature() + return (inventory_update, inventory_update_sig) + def update(self, **kwargs): if self.can_update: inventory_update = self.inventory_updates.create() diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index d22099c676..6e53675363 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -18,6 +18,7 @@ import yaml # Django from django.conf import settings from django.db import models +from django.db import transaction from django.utils.translation import ugettext_lazy as _ from django.core.exceptions import ValidationError, NON_FIELD_ERRORS from django.core.urlresolvers import reverse @@ -30,6 +31,11 @@ from jsonfield import JSONField # AWX from awx.main.models.base import * +# Celery +from celery import chain + +logger = logging.getLogger('awx.main.models.jobs') + __all__ = ['JobTemplate', 'Job', 'JobHostSummary', 'JobEvent'] @@ -328,6 +334,65 @@ class Job(CommonTask): def processed_hosts(self): return self._get_hosts(job_host_summaries__processed__gt=0) + def start(self, **kwargs): + from awx.main.tasks import handle_work_error + task_class = self._get_task_class() + if not self.can_start: + return False + needed = self._get_passwords_needed_to_start() + opts = dict([(field, kwargs.get(field, '')) for field in needed]) + if not all(opts.values()): + return False + self.status = 'waiting' + self.save(update_fields=['status']) + transaction.commit() + + runnable_tasks = [] + run_tasks = [] + inventory_updates_actual = [] + project_update_actual = None + has_setup_failures = False + setup_failure_message = "" + + project = self.project + inventory = self.inventory + is_qs = inventory.inventory_sources.filter(active=True, update_on_launch=True) + if project.scm_update_on_launch: + project_update_details = project.update_signature() + if not project_update_details: + has_setup_failures = True + setup_failure_message = "Failed to check dependent project update task" + else: + runnable_tasks.append({'obj': project_update_details[0], + 'sig': project_update_details[1], + 'type': 'project_update'}) + if is_qs.count() and not has_setup_failures: + for inventory_source in is_qs: + inventory_update_details = inventory_source.update_signature() + if not inventory_update_details: + has_setup_failures = True + setup_failure_message = "Failed to check dependent inventory update task" + break + else: + runnable_tasks.append({'obj': inventory_update_details[0], + 'sig': inventory_update_details[1], + 'type': 'inventory_update'}) + if has_setup_failures: + for each_task in runnable_tasks: + obj = each_task['obj'] + obj.status = 'error' + obj.result_traceback = setup_failure_message + obj.save() + self.status = 'error' + self.result_traceback = setup_failure_message + self.save() + thisjob = {'type': 'job', 'id': self.id} + for idx in xrange(len(runnable_tasks)): + dependent_tasks = [{'type': r['type'], 'id': r['obj'].id} for r in runnable_tasks[idx:]] + [thisjob] + run_tasks.append(runnable_tasks[idx]['sig'].set(link_error=handle_work_error.s(subtasks=dependent_tasks))) + run_tasks.append(task_class().si(self.pk, **opts).set(link_error=handle_work_error.s(subtasks=[thisjob]))) + res = chain(run_tasks)() + return True class JobHostSummary(BaseModel): ''' diff --git a/awx/main/models/projects.py b/awx/main/models/projects.py index 7a531a9d10..4f36f00405 100644 --- a/awx/main/models/projects.py +++ b/awx/main/models/projects.py @@ -282,6 +282,12 @@ class Project(CommonModel): # FIXME: Prevent update when another one is active! return bool(self.scm_type)# and not self.current_update) + def update_signature(self, **kwargs): + if self.can_update: + project_update = self.project_updates.create() + project_update_sig = project_update.start_signature() + return (project_update, project_update_sig) + def update(self, **kwargs): if self.can_update: project_update = self.project_updates.create() diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 7b2702cea3..99d246010d 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -23,7 +23,7 @@ import uuid import pexpect # Celery -from celery import Task +from celery import Task, task # Django from django.conf import settings @@ -35,12 +35,44 @@ from django.utils.timezone import now from awx.main.models import Job, JobEvent, ProjectUpdate, InventoryUpdate from awx.main.utils import get_ansible_version, decrypt_field, update_scm_url -__all__ = ['RunJob', 'RunProjectUpdate', 'RunInventoryImport'] +__all__ = ['RunJob', 'RunProjectUpdate', 'RunInventoryUpdate', 'handle_work_error'] logger = logging.getLogger('awx.main.tasks') # FIXME: Cleanly cancel task when celery worker is stopped. +@task(bind=True) +def handle_work_error(self, task_id, subtasks=None): + print('Executing error task id %s, subtasks: %s' % (str(self.request.id), str(subtasks))) + first_task = None + first_task_type = '' + first_task_name = '' + if subtasks is not None: + for each_task in subtasks: + instance_name = '' + if each_task['type'] == 'project_update': + instance = ProjectUpdate.objects.get(id=each_task['id']) + instance_name = instance.project.name + elif each_task['type'] == 'inventory_update': + instance = InventoryUpdate.objects.get(id=each_task['id']) + instance_name = instance.inventory_source.inventory.name + elif each_task['type'] == 'job': + instance = Job.objects.get(id=each_task['id']) + instance_name = instance.job_template.name + else: + # Unknown task type + break + if first_task is None: + first_task = instance + first_task_type = each_task['type'] + first_task_name = instance_name + if instance.celery_task_id != task_id: + instance.status = 'failed' + instance.failed = True + instance.result_traceback = "Previous Task Failed: %s for %s with celery task id: %s" % \ + (first_task_type, first_task_name, task_id) + instance.save() + class BaseTask(Task): name = None @@ -108,6 +140,7 @@ class BaseTask(Task): 'yes': 'yes', 'no': 'no', '': '', + } def build_env(self, instance, **kwargs): @@ -204,15 +237,10 @@ class BaseTask(Task): if logfile_pos != logfile.tell(): logfile_pos = logfile.tell() last_stdout_update = time.time() - # Update instance status here (also updates modified timestamp, so - # we have a way to know the task is still running, otherwise the - # post_run_hook below would cancel long-running tasks that are - # really still active). - instance = self.update_model(instance.pk, status='running') + # NOTE: In case revoke doesn't have an affect if instance.cancel_flag: - child.close(True) - canceled = True - # FIXME: Find a way to determine if task is hung waiting at a prompt. + child.close(True) + canceled = True if idle_timeout and (time.time() - last_stdout_update) > idle_timeout: child.close(True) canceled = True @@ -234,40 +262,14 @@ class BaseTask(Task): return True def post_run_hook(self, instance, **kwargs): - ''' - Hook for actions to run after job/task has completed. - ''' - # Cleanup instances that appear to be stuck. - try: - stuck_task_timeout = int(getattr(settings, 'STUCK_TASK_TIMEOUT', 300)) - except (TypeError, ValueError): - stuck_task_timeout = 0 - if stuck_task_timeout <= 0: - return - # Never less than 30 seconds so we're not messing with active tasks. - stuck_task_timeout = max(stuck_task_timeout, 30) - cutoff = now() - datetime.timedelta(seconds=stuck_task_timeout) - qs = self.model.objects.filter(status__in=('new', 'waiting', 'running')) - qs = qs.filter(modified__lt=cutoff) - for obj in qs: - # If new, created but never started. If waiting or running, the - # modified timestamp should updated regularly, else the task is - # probably stuck. - # If pending, we could be started but celeryd is not running, or - # we're waiting for an open slot in celeryd -- in either case we - # shouldn't necessarily cancel the task. Slim chance that somehow - # the task was started, picked up by celery, but hit an error - # before we could update the status. - obj.status = 'canceled' - obj.result_traceback += '\nCanceled stuck %s.' % unicode(self.model._meta.verbose_name) - obj.save(update_fields=['status', 'result_traceback']) + pass @transaction.commit_on_success def run(self, pk, **kwargs): ''' Run the job/task and capture its output. ''' - instance = self.update_model(pk) + instance = self.update_model(pk, status='pending', celery_task_id=self.request.id) status, stdout, tb = 'error', '', '' output_replacements = [] try: @@ -305,6 +307,10 @@ class BaseTask(Task): result_traceback=tb, output_replacements=output_replacements) self.post_run_hook(instance, **kwargs) + if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'): + # Raising an exception will mark the job as 'failed' in celery + # and will stop a task chain from continuing to execute + raise Exception("Task %s(pk:%s) encountered an error" % (str(self.model.__class__), str(pk))) class RunJob(BaseTask): ''' @@ -431,6 +437,9 @@ class RunJob(BaseTask): (job.project.local_path, root)) return cwd + def get_idle_timeout(self): + return getattr(settings, 'JOB_RUN_IDLE_TIMEOUT', 300) + def get_password_prompts(self): d = super(RunJob, self).get_password_prompts() d[re.compile(r'^Enter passphrase for .*:\s*?$', re.M)] = 'ssh_key_unlock' @@ -444,98 +453,14 @@ class RunJob(BaseTask): ''' Hook for checking job before running. ''' - project_update = None - inventory_updates = None - while True: - pk = job.pk - if job.status in ('pending', 'waiting'): - project = job.project - pu_qs = project.project_updates.filter(status__in=('pending', 'running')) - inventory = job.inventory - base_iu_qs = InventoryUpdate.objects.filter(inventory_source__inventory=inventory) - iu_qs = base_iu_qs.filter(status__in=('pending', 'running')) - is_qs = inventory.inventory_sources.filter(active=True, update_on_launch=True) - # Refresh the current project_update instance (if set). - if project_update: - try: - project_update = project.project_updates.filter(pk=project_update.pk)[0] - except IndexError: - msg = 'Unable to check project update.' - job = self.update_model(pk, status='error', - result_traceback=msg) - return False - # Refresh the current inventory_update instance(s) (if set). - if inventory_updates: - inventory_update_pks = [x.pk for x in inventory_updates] - inventory_updates = list(base_iu_qs.filter(pk__in=inventory_update_pks)) - - # If the job needs to update the project first (and there is no - # specific project update defined). - if not project_update and project.scm_update_on_launch: - job = self.update_model(pk, status='waiting') - try: - project_update = pu_qs[0] - except IndexError: - project_update = project.update() - if not project_update: - msg = 'Unable to update project before launch.' - job = self.update_model(pk, status='error', - result_traceback=msg) - return False - #print 'job %d waiting on project update %d' % (pk, project_update.pk) - time.sleep(2.0) - # If the job needs to update any inventory first (and there are - # no current inventory updates pending). - elif inventory_updates is None and is_qs.count(): - job = self.update_model(pk, status='waiting') - inventory_updates = [] - msgs = [] - for inventory_source in is_qs: - try: - inventory_update = iu_qs.filter(inventory_source=inventory_source)[0] - except IndexError: - inventory_update = inventory_source.update() - if not inventory_update: - msgs.append('Unable to update inventory source %d before launch' % inventory_source.pk) - continue - inventory_updates.append(inventory_update) - if msgs: - msg = '\n'.join(msgs) - job = self.update_model(pk, status='error', - result_traceback=msg) - return False - time.sleep(2.0) - # If project update has failed, abort the job. - elif project_update and project_update.failed: - msg = 'Project update %d failed with status = %s.' % (project_update.pk, project_update.status) - job = self.update_model(pk, status='error', - result_traceback=msg) - return False - # If any inventory update has failed, abort the job. - elif inventory_updates and any([x.failed for x in inventory_updates]): - msgs = [] - for inventory_update in inventory_updates: - if inventory_update.failed: - msgs.append('Inventory update %d failed with status = %s.' % (inventory_update.pk, inventory_update.status)) - if msgs: - msg = '\n'.join(msgs) - job = self.update_model(pk, status='error', - result_traceback=msg) - return False - # Check if blocked by any other active project or inventory updates. - elif pu_qs.count() or iu_qs.count(): - #print 'job %d waiting on' % pk, pu_qs - job = self.update_model(pk, status='waiting') - time.sleep(4.0) - # Otherwise continue running the job. - else: - job = self.update_model(pk, status='pending') - return True - elif job.cancel_flag: - job = self.update_model(pk, status='canceled') - return False - else: - return False + if job.status in ('pending', 'waiting'): + job = self.update_model(job.pk, status='pending') + return True + elif job.cancel_flag: + job = self.update_model(job.pk, status='canceled') + return False + else: + return False def post_run_hook(self, job, **kwargs): ''' @@ -882,6 +807,9 @@ class RunInventoryUpdate(BaseTask): def build_cwd(self, inventory_update, **kwargs): return self.get_path_to('..', 'plugins', 'inventory') + def get_idle_timeout(self): + return getattr(settings, 'INVENTORY_UPDATE_IDLE_TIMEOUT', 300) + def pre_run_check(self, inventory_update, **kwargs): ''' Hook for checking inventory update before running. diff --git a/awx/main/tests/base.py b/awx/main/tests/base.py index fc96dc6366..23f851d6e9 100644 --- a/awx/main/tests/base.py +++ b/awx/main/tests/base.py @@ -52,6 +52,7 @@ class BaseTestMixin(object): # callbacks. if settings.BROKER_URL.startswith('amqp://'): settings.BROKER_URL = 'django://' + settings.CELERY_UNIT_TEST = True # Make temp job status directory for unit tests. job_status_dir = tempfile.mkdtemp() self._temp_project_dirs.append(job_status_dir) diff --git a/awx/main/tests/commands.py b/awx/main/tests/commands.py index 05dd37c665..b25d6a04d1 100644 --- a/awx/main/tests/commands.py +++ b/awx/main/tests/commands.py @@ -377,7 +377,7 @@ class CleanupJobsTest(BaseCommandMixin, BaseLiveServerTest): self.assertEqual(job.status, 'new') self.assertFalse(job.passwords_needed_to_start) self.assertTrue(job.start()) - self.assertEqual(job.status, 'pending') + self.assertEqual(job.status, 'waiting') job = Job.objects.get(pk=job.pk) self.assertEqual(job.status, 'successful') # With days=1, no jobs will be deleted. diff --git a/awx/main/tests/projects.py b/awx/main/tests/projects.py index 199be21531..19dd4777a4 100644 --- a/awx/main/tests/projects.py +++ b/awx/main/tests/projects.py @@ -1551,7 +1551,7 @@ class ProjectUpdatesTest(BaseTransactionTest): self.assertEqual(job.status, 'new') self.assertFalse(job.passwords_needed_to_start) self.assertTrue(job.start()) - self.assertEqual(job.status, 'pending') + self.assertEqual(job.status, 'waiting') job = Job.objects.get(pk=job.pk) self.assertTrue(job.status in ('successful', 'failed')) self.assertEqual(self.project.project_updates.count(), 3) diff --git a/awx/main/tests/tasks.py b/awx/main/tests/tasks.py index b781ab5fc3..63936c23d1 100644 --- a/awx/main/tests/tasks.py +++ b/awx/main/tests/tasks.py @@ -404,7 +404,7 @@ class RunJobTest(BaseCeleryTest): self.assertEqual(job.status, 'new') self.assertFalse(job.passwords_needed_to_start) self.assertTrue(job.start()) - self.assertEqual(job.status, 'pending') + self.assertEqual(job.status, 'waiting') job = Job.objects.get(pk=job.pk) self.check_job_result(job, 'successful') self.check_job_events(job, 'ok', 1, 2) @@ -433,7 +433,7 @@ class RunJobTest(BaseCeleryTest): self.assertEqual(job.status, 'new') self.assertFalse(job.passwords_needed_to_start) self.assertTrue(job.start()) - self.assertEqual(job.status, 'pending') + self.assertEqual(job.status, 'waiting') job = Job.objects.get(pk=job.pk) self.check_job_result(job, 'successful') self.check_job_events(job, 'skipped', 1, 2) @@ -461,7 +461,7 @@ class RunJobTest(BaseCeleryTest): self.assertEqual(job.status, 'new') self.assertFalse(job.passwords_needed_to_start) self.assertTrue(job.start()) - self.assertEqual(job.status, 'pending') + self.assertEqual(job.status, 'waiting') job = Job.objects.get(pk=job.pk) self.check_job_result(job, 'failed') self.check_job_events(job, 'failed', 1, 1) @@ -489,7 +489,7 @@ class RunJobTest(BaseCeleryTest): self.assertEqual(job.status, 'new') self.assertFalse(job.passwords_needed_to_start) self.assertTrue(job.start()) - self.assertEqual(job.status, 'pending') + self.assertEqual(job.status, 'waiting') job = Job.objects.get(pk=job.pk) self.check_job_result(job, 'successful') self.check_job_events(job, 'ok', 1, 1, check_ignore_errors=True) @@ -612,7 +612,7 @@ class RunJobTest(BaseCeleryTest): self.assertEqual(job.status, 'new') self.assertFalse(job.passwords_needed_to_start) self.assertTrue(job.start()) - self.assertEqual(job.status, 'pending') + self.assertEqual(job.status, 'waiting') job = Job.objects.get(pk=job.pk) # Since we don't actually run the task, the --check should indicate # everything is successful. @@ -653,7 +653,7 @@ class RunJobTest(BaseCeleryTest): self.assertFalse(job.passwords_needed_to_start) self.build_args_callback = self._cancel_job_callback self.assertTrue(job.start()) - self.assertEqual(job.status, 'pending') + self.assertEqual(job.status, 'waiting') job = Job.objects.get(pk=job.pk) self.check_job_result(job, 'canceled') self.assertEqual(job.cancel_flag, True) @@ -676,7 +676,7 @@ class RunJobTest(BaseCeleryTest): self.assertEqual(job.status, 'new') self.assertFalse(job.passwords_needed_to_start) self.assertTrue(job.start()) - self.assertEqual(job.status, 'pending') + self.assertEqual(job.status, 'waiting') job = Job.objects.get(pk=job.pk) self.check_job_result(job, 'successful') self.assertTrue('--forks=3' in self.run_job_args) @@ -687,7 +687,7 @@ class RunJobTest(BaseCeleryTest): job2 = self.create_test_job(job_template=job_template2) self.assertEqual(job2.status, 'new') self.assertTrue(job2.start()) - self.assertEqual(job2.status, 'pending') + self.assertEqual(job2.status, 'waiting') job2 = Job.objects.get(pk=job2.pk) self.check_job_result(job2, 'successful') # Test with extra_vars as YAML (should be converted to JSON in args). @@ -695,7 +695,7 @@ class RunJobTest(BaseCeleryTest): job3 = self.create_test_job(job_template=job_template3) self.assertEqual(job3.status, 'new') self.assertTrue(job3.start()) - self.assertEqual(job3.status, 'pending') + self.assertEqual(job3.status, 'waiting') job3 = Job.objects.get(pk=job3.pk) self.check_job_result(job3, 'successful') @@ -707,7 +707,7 @@ class RunJobTest(BaseCeleryTest): self.assertEqual(job.status, 'new') self.assertFalse(job.passwords_needed_to_start) self.assertTrue(job.start()) - self.assertEqual(job.status, 'pending') + self.assertEqual(job.status, 'waiting') job = Job.objects.get(pk=job.pk) self.assertTrue(len(job.job_args) > 1024) self.check_job_result(job, 'successful') @@ -720,7 +720,7 @@ class RunJobTest(BaseCeleryTest): self.assertEqual(job.status, 'new') self.assertFalse(job.passwords_needed_to_start) self.assertTrue(job.start()) - self.assertEqual(job.status, 'pending') + self.assertEqual(job.status, 'waiting') job = Job.objects.get(pk=job.pk) self.check_job_result(job, 'failed') self.assertTrue('-l' in self.run_job_args) @@ -733,7 +733,7 @@ class RunJobTest(BaseCeleryTest): self.assertEqual(job.status, 'new') self.assertFalse(job.passwords_needed_to_start) self.assertTrue(job.start()) - self.assertEqual(job.status, 'pending') + self.assertEqual(job.status, 'waiting') job = Job.objects.get(pk=job.pk) self.check_job_result(job, 'successful') self.assertTrue('ssh-agent' in self.run_job_args) @@ -746,7 +746,7 @@ class RunJobTest(BaseCeleryTest): self.assertEqual(job.status, 'new') self.assertFalse(job.passwords_needed_to_start) self.assertTrue(job.start()) - self.assertEqual(job.status, 'pending') + self.assertEqual(job.status, 'waiting') job = Job.objects.get(pk=job.pk) self.check_job_result(job, 'successful') self.assertTrue('-u' in self.run_job_args) @@ -763,7 +763,7 @@ class RunJobTest(BaseCeleryTest): self.assertFalse(job.start()) self.assertEqual(job.status, 'new') self.assertTrue(job.start(ssh_password='sshpass')) - self.assertEqual(job.status, 'pending') + self.assertEqual(job.status, 'waiting') job = Job.objects.get(pk=job.pk) self.check_job_result(job, 'successful') self.assertTrue('--ask-pass' in self.run_job_args) @@ -777,7 +777,7 @@ class RunJobTest(BaseCeleryTest): self.assertEqual(job.status, 'new') self.assertFalse(job.passwords_needed_to_start) self.assertTrue(job.start()) - self.assertEqual(job.status, 'pending') + self.assertEqual(job.status, 'waiting') job = Job.objects.get(pk=job.pk) # Job may fail if current user doesn't have password-less sudo # privileges, but we're mainly checking the command line arguments. @@ -796,7 +796,7 @@ class RunJobTest(BaseCeleryTest): self.assertFalse(job.start()) self.assertEqual(job.status, 'new') self.assertTrue(job.start(sudo_password='sudopass')) - self.assertEqual(job.status, 'pending') + self.assertEqual(job.status, 'waiting') job = Job.objects.get(pk=job.pk) # Job may fail if current user doesn't have password-less sudo # privileges, but we're mainly checking the command line arguments. @@ -811,7 +811,7 @@ class RunJobTest(BaseCeleryTest): self.assertEqual(job.status, 'new') self.assertFalse(job.passwords_needed_to_start) self.assertTrue(job.start()) - self.assertEqual(job.status, 'pending') + self.assertEqual(job.status, 'waiting') job = Job.objects.get(pk=job.pk) self.check_job_result(job, 'successful') self.assertTrue('ssh-agent' in self.run_job_args) @@ -825,7 +825,7 @@ class RunJobTest(BaseCeleryTest): self.assertEqual(job.status, 'new') self.assertFalse(job.passwords_needed_to_start) self.assertTrue(job.start()) - self.assertEqual(job.status, 'pending') + self.assertEqual(job.status, 'waiting') job = Job.objects.get(pk=job.pk) self.check_job_result(job, 'successful') self.assertTrue('ssh-agent' in self.run_job_args) @@ -840,7 +840,7 @@ class RunJobTest(BaseCeleryTest): self.assertEqual(job.status, 'new') self.assertFalse(job.passwords_needed_to_start) self.assertTrue(job.start()) - self.assertEqual(job.status, 'pending') + self.assertEqual(job.status, 'waiting') job = Job.objects.get(pk=job.pk) self.check_job_result(job, 'failed') self.assertTrue('ssh-agent' in self.run_job_args) @@ -858,7 +858,7 @@ class RunJobTest(BaseCeleryTest): self.assertFalse(job.start()) self.assertEqual(job.status, 'new') self.assertTrue(job.start(ssh_key_unlock=TEST_SSH_KEY_DATA_UNLOCK)) - self.assertEqual(job.status, 'pending') + self.assertEqual(job.status, 'waiting') job = Job.objects.get(pk=job.pk) self.check_job_result(job, 'successful') self.assertTrue('ssh-agent' in self.run_job_args) @@ -882,7 +882,7 @@ class RunJobTest(BaseCeleryTest): self.assertEqual(job.status, 'new') self.assertFalse(job.passwords_needed_to_start) self.assertTrue(job.start()) - self.assertEqual(job.status, 'pending') + self.assertEqual(job.status, 'waiting') job = Job.objects.get(pk=job.pk) self.check_job_result(job, 'successful') self.assertTrue(env_var1 in job.job_env) @@ -901,7 +901,7 @@ class RunJobTest(BaseCeleryTest): self.assertEqual(job.status, 'new') self.assertFalse(job.passwords_needed_to_start) self.assertTrue(job.start()) - self.assertEqual(job.status, 'pending') + self.assertEqual(job.status, 'waiting') job = Job.objects.get(pk=job.pk) self.check_job_result(job, 'successful') self.check_job_events(job, 'ok', 1, 1, async=True) @@ -929,7 +929,7 @@ class RunJobTest(BaseCeleryTest): self.assertEqual(job.status, 'new') self.assertFalse(job.passwords_needed_to_start) self.assertTrue(job.start()) - self.assertEqual(job.status, 'pending') + self.assertEqual(job.status, 'waiting') job = Job.objects.get(pk=job.pk) self.check_job_result(job, 'failed') self.check_job_events(job, 'failed', 1, 1, async=True) @@ -957,7 +957,7 @@ class RunJobTest(BaseCeleryTest): self.assertEqual(job.status, 'new') self.assertFalse(job.passwords_needed_to_start) self.assertTrue(job.start()) - self.assertEqual(job.status, 'pending') + self.assertEqual(job.status, 'waiting') job = Job.objects.get(pk=job.pk) self.check_job_result(job, 'failed') self.check_job_events(job, 'failed', 1, 1, async=True, @@ -986,7 +986,7 @@ class RunJobTest(BaseCeleryTest): self.assertEqual(job.status, 'new') self.assertFalse(job.passwords_needed_to_start) self.assertTrue(job.start()) - self.assertEqual(job.status, 'pending') + self.assertEqual(job.status, 'waiting') job = Job.objects.get(pk=job.pk) self.check_job_result(job, 'successful') self.check_job_events(job, 'ok', 1, 1, async=True, async_nowait=True) diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 1498d9ef27..fca79f1480 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -282,6 +282,7 @@ CELERYD_TASK_TIME_LIMIT = None CELERYD_TASK_SOFT_TIME_LIMIT = None CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' CELERYBEAT_MAX_LOOP_INTERVAL = 60 +CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend' # Any ANSIBLE_* settings will be passed to the subprocess environment by the # celery task. diff --git a/awx/ui/static/js/lists/InventoryHosts.js b/awx/ui/static/js/lists/InventoryHosts.js index b40cd9ea6b..c0f9ad40b0 100644 --- a/awx/ui/static/js/lists/InventoryHosts.js +++ b/awx/ui/static/js/lists/InventoryHosts.js @@ -118,4 +118,4 @@ angular.module('InventoryHostsDefinition', []) } }); - \ No newline at end of file + diff --git a/awx/ui/static/js/widgets/Stream.js b/awx/ui/static/js/widgets/Stream.js index 25baf67b82..fe34fc04d4 100644 --- a/awx/ui/static/js/widgets/Stream.js +++ b/awx/ui/static/js/widgets/Stream.js @@ -459,4 +459,4 @@ angular.module('StreamWidget', ['RestServices', 'Utilities', 'StreamListDefiniti } }]); - \ No newline at end of file +