diff --git a/awx/__init__.py b/awx/__init__.py index b35364ce35..aa38587de3 100644 --- a/awx/__init__.py +++ b/awx/__init__.py @@ -1,15 +1,17 @@ # Copyright (c) 2015 Ansible, Inc. # All Rights Reserved. +from __future__ import absolute_import, unicode_literals import os import sys import warnings from pkg_resources import get_distribution +from .celery import app as celery_app __version__ = get_distribution('awx').version -__all__ = ['__version__'] +__all__ = ['__version__', 'celery_app'] # Check for the presence/absence of "devonly" module to determine if running # from a source code checkout or release packaage. diff --git a/awx/celery.py b/awx/celery.py new file mode 100644 index 0000000000..d1854d9487 --- /dev/null +++ b/awx/celery.py @@ -0,0 +1,23 @@ +# Copyright (c) 2017 Ansible, Inc. +# All Rights Reserved. + +from __future__ import absolute_import, unicode_literals + +import os +from celery import Celery + + +try: + import awx.devonly # noqa + MODE = 'development' +except ImportError: # pragma: no cover + MODE = 'production' + +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'awx.settings.%s' % MODE) + +app = Celery('awx') +app.config_from_object('django.conf:settings', namespace='CELERY') +app.autodiscover_tasks() + +if __name__ == '__main__': + app.start() diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 78c4d07137..5f9c773c27 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -25,7 +25,7 @@ from django.contrib.contenttypes.models import ContentType from polymorphic.models import PolymorphicModel # Django-Celery -from djcelery.models import TaskMeta +from django_celery_results.models import TaskResult # AWX from awx.main.models.base import * # noqa @@ -88,7 +88,7 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, Notificatio ALL_STATUS_CHOICES = OrderedDict(PROJECT_STATUS_CHOICES + INVENTORY_SOURCE_STATUS_CHOICES + JOB_TEMPLATE_STATUS_CHOICES + DEPRECATED_STATUS_CHOICES).items() # NOTE: Working around a django-polymorphic issue: https://github.com/django-polymorphic/django-polymorphic/issues/229 - _base_manager = models.Manager() + base_manager_name = 'base_objects' class Meta: app_label = 'main' @@ -438,7 +438,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique PASSWORD_FIELDS = ('start_args',) # NOTE: Working around a django-polymorphic issue: https://github.com/django-polymorphic/django-polymorphic/issues/229 - _base_manager = models.Manager() + base_manager_name = 'base_objects' class Meta: app_label = 'main' @@ -872,8 +872,8 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique def celery_task(self): try: if self.celery_task_id: - return TaskMeta.objects.get(task_id=self.celery_task_id) - except TaskMeta.DoesNotExist: + return TaskResult.objects.get(task_id=self.celery_task_id) + except TaskResult.DoesNotExist: pass def get_passwords_needed_to_start(self): @@ -1100,7 +1100,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique cancel_fields.append('job_explanation') self.save(update_fields=cancel_fields) self.websocket_emit_status("canceled") - if settings.BROKER_URL.startswith('amqp://'): + if settings.CELERY_BROKER_URL.startswith('amqp://'): self._force_cancel() return self.cancel_flag diff --git a/awx/main/queue.py b/awx/main/queue.py index 03a7b2a2cf..401c73f831 100644 --- a/awx/main/queue.py +++ b/awx/main/queue.py @@ -17,7 +17,7 @@ __all__ = ['CallbackQueueDispatcher'] class CallbackQueueDispatcher(object): def __init__(self): - self.callback_connection = getattr(settings, 'BROKER_URL', None) + self.callback_connection = getattr(settings, 'CELERY_BROKER_URL', None) self.connection_queue = getattr(settings, 'CALLBACK_QUEUE', '') self.connection = None self.exchange = None diff --git a/awx/main/scheduler/tasks.py b/awx/main/scheduler/tasks.py index ae97f367be..70d4c95354 100644 --- a/awx/main/scheduler/tasks.py +++ b/awx/main/scheduler/tasks.py @@ -3,7 +3,7 @@ import logging # Celery -from celery import Task, task +from celery import Task, shared_task # AWX from awx.main.scheduler import TaskManager @@ -21,17 +21,17 @@ class LogErrorsTask(Task): super(LogErrorsTask, self).on_failure(exc, task_id, args, kwargs, einfo) -@task +@shared_task def run_job_launch(job_id): TaskManager().schedule() -@task +@shared_task def run_job_complete(job_id): TaskManager().schedule() -@task(base=LogErrorsTask) +@shared_task(base=LogErrorsTask) def run_task_manager(): logger.debug("Running Tower task manager.") TaskManager().schedule() diff --git a/awx/main/tasks.py b/awx/main/tasks.py index c9f768e8a8..ce75f775dc 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -27,7 +27,7 @@ except Exception: psutil = None # Celery -from celery import Task, task +from celery import Task, shared_task from celery.signals import celeryd_init, worker_process_init, worker_shutdown # Django @@ -46,6 +46,7 @@ from crum import impersonate # AWX from awx import __version__ as awx_application_version +from awx import celery_app from awx.main.constants import CLOUD_PROVIDERS, PRIVILEGE_ESCALATION_METHODS from awx.main.models import * # noqa from awx.main.models.unified_jobs import ACTIVE_STATES @@ -131,7 +132,7 @@ def inform_cluster_of_shutdown(*args, **kwargs): logger.exception('Encountered problem with normal shutdown signal.') -@task(queue='tower_broadcast_all', bind=True, base=LogErrorsTask) +@shared_task(queue='tower_broadcast_all', bind=True, base=LogErrorsTask) def handle_setting_changes(self, setting_keys): orig_len = len(setting_keys) for i in range(orig_len): @@ -148,7 +149,7 @@ def handle_setting_changes(self, setting_keys): break -@task(queue='tower', base=LogErrorsTask) +@shared_task(queue='tower', base=LogErrorsTask) def send_notifications(notification_list, job_id=None): if not isinstance(notification_list, list): raise TypeError("notification_list should be of type list") @@ -172,7 +173,7 @@ def send_notifications(notification_list, job_id=None): notification.save() -@task(bind=True, queue='tower', base=LogErrorsTask) +@shared_task(bind=True, queue='tower', base=LogErrorsTask) def run_administrative_checks(self): logger.warn("Running administrative checks.") if not settings.TOWER_ADMIN_ALERTS: @@ -194,13 +195,13 @@ def run_administrative_checks(self): fail_silently=True) -@task(bind=True, queue='tower', base=LogErrorsTask) +@shared_task(bind=True, queue='tower', base=LogErrorsTask) def cleanup_authtokens(self): logger.warn("Cleaning up expired authtokens.") AuthToken.objects.filter(expires__lt=now()).delete() -@task(bind=True, base=LogErrorsTask) +@shared_task(bind=True, base=LogErrorsTask) def purge_old_stdout_files(self): nowtime = time.time() for f in os.listdir(settings.JOBOUTPUT_ROOT): @@ -209,7 +210,7 @@ def purge_old_stdout_files(self): logger.info("Removing {}".format(os.path.join(settings.JOBOUTPUT_ROOT,f))) -@task(bind=True, base=LogErrorsTask) +@shared_task(bind=True, base=LogErrorsTask) def cluster_node_heartbeat(self): logger.debug("Cluster node heartbeat task.") nowtime = now() @@ -262,7 +263,7 @@ def cluster_node_heartbeat(self): logger.exception('Error marking {} as lost'.format(other_inst.hostname)) -@task(bind=True, base=LogErrorsTask) +@shared_task(bind=True, base=LogErrorsTask) def awx_isolated_heartbeat(self): local_hostname = settings.CLUSTER_HOST_ID logger.debug("Controlling node checking for any isolated management tasks.") @@ -286,7 +287,7 @@ def awx_isolated_heartbeat(self): isolated_manager.IsolatedManager.health_check(isolated_instance_qs, awx_application_version) -@task(bind=True, queue='tower', base=LogErrorsTask) +@shared_task(bind=True, queue='tower', base=LogErrorsTask) def awx_periodic_scheduler(self): run_now = now() state = TowerScheduleState.get_solo() @@ -340,7 +341,7 @@ def _send_notification_templates(instance, status_str): job_id=instance.id) -@task(bind=True, queue='tower', base=LogErrorsTask) +@shared_task(bind=True, queue='tower', base=LogErrorsTask) def handle_work_success(self, result, task_actual): try: instance = UnifiedJob.get_instance_by_type(task_actual['type'], task_actual['id']) @@ -356,7 +357,7 @@ def handle_work_success(self, result, task_actual): run_job_complete.delay(instance.id) -@task(bind=True, queue='tower', base=LogErrorsTask) +@shared_task(bind=True, queue='tower', base=LogErrorsTask) def handle_work_error(self, task_id, subtasks=None): logger.debug('Executing error task id %s, subtasks: %s' % (str(self.request.id), str(subtasks))) first_instance = None @@ -399,7 +400,7 @@ def handle_work_error(self, task_id, subtasks=None): pass -@task(queue='tower', base=LogErrorsTask) +@shared_task(queue='tower', base=LogErrorsTask) def update_inventory_computed_fields(inventory_id, should_update_hosts=True): ''' Signal handler and wrapper around inventory.update_computed_fields to @@ -419,7 +420,7 @@ def update_inventory_computed_fields(inventory_id, should_update_hosts=True): raise -@task(queue='tower', base=LogErrorsTask) +@shared_task(queue='tower', base=LogErrorsTask) def update_host_smart_inventory_memberships(): try: with transaction.atomic(): @@ -435,7 +436,7 @@ def update_host_smart_inventory_memberships(): return -@task(bind=True, queue='tower', base=LogErrorsTask, max_retries=5) +@shared_task(bind=True, queue='tower', base=LogErrorsTask, max_retries=5) def delete_inventory(self, inventory_id, user_id): # Delete inventory as user if user_id is None: @@ -1003,7 +1004,7 @@ class RunJob(BaseTask): env['TOWER_HOST'] = settings.TOWER_URL_BASE env['AWX_HOST'] = settings.TOWER_URL_BASE env['CALLBACK_QUEUE'] = settings.CALLBACK_QUEUE - env['CALLBACK_CONNECTION'] = settings.BROKER_URL + env['CALLBACK_CONNECTION'] = settings.CELERY_BROKER_URL env['CACHE'] = settings.CACHES['default']['LOCATION'] if 'LOCATION' in settings.CACHES['default'] else '' if getattr(settings, 'JOB_CALLBACK_DEBUG', False): env['JOB_CALLBACK_DEBUG'] = '2' @@ -2054,7 +2055,7 @@ class RunAdHocCommand(BaseTask): env['ANSIBLE_LOAD_CALLBACK_PLUGINS'] = '1' env['ANSIBLE_STDOUT_CALLBACK'] = 'minimal' # Hardcoded by Ansible for ad-hoc commands (either minimal or oneline). env['CALLBACK_QUEUE'] = settings.CALLBACK_QUEUE - env['CALLBACK_CONNECTION'] = settings.BROKER_URL + env['CALLBACK_CONNECTION'] = settings.CELERY_BROKER_URL env['ANSIBLE_SFTP_BATCH_MODE'] = 'False' env['CACHE'] = settings.CACHES['default']['LOCATION'] if 'LOCATION' in settings.CACHES['default'] else '' if getattr(settings, 'JOB_CALLBACK_DEBUG', False): @@ -2221,3 +2222,10 @@ class RunSystemJob(BaseTask): def build_cwd(self, instance, **kwargs): return settings.BASE_DIR + + +celery_app.register_task(RunJob()) +celery_app.register_task(RunProjectUpdate()) +celery_app.register_task(RunInventoryUpdate()) +celery_app.register_task(RunAdHocCommand()) +celery_app.register_task(RunSystemJob())