Update celery environ and tasks

This commit is contained in:
Wayne Witzel III 2017-11-09 17:21:19 -05:00
parent de376292ba
commit 14c5123fda
No known key found for this signature in database
GPG Key ID: B4F07BDC564D6301
6 changed files with 61 additions and 28 deletions

View File

@ -1,15 +1,17 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
from __future__ import absolute_import, unicode_literals
import os
import sys
import warnings
from pkg_resources import get_distribution
from .celery import app as celery_app
__version__ = get_distribution('awx').version
__all__ = ['__version__']
__all__ = ['__version__', 'celery_app']
# Check for the presence/absence of "devonly" module to determine if running
# from a source code checkout or release package.

23
awx/celery.py Normal file
View File

@ -0,0 +1,23 @@
# Copyright (c) 2017 Ansible, Inc.
# All Rights Reserved.

# Celery application bootstrap for AWX, following the standard
# "First steps with Django" layout: this module creates the project-wide
# Celery app that awx/__init__.py re-exports as `celery_app`.
from __future__ import absolute_import, unicode_literals
import os

from celery import Celery

# Detect whether we are running from a source checkout (the `awx.devonly`
# module only exists in development trees) and pick the matching Django
# settings module.
try:
    import awx.devonly # noqa
    MODE = 'development'
except ImportError: # pragma: no cover
    MODE = 'production'

# DJANGO_SETTINGS_MODULE must be set *before* config_from_object below,
# since 'django.conf:settings' is resolved lazily against it. setdefault
# lets an explicit environment override win.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'awx.settings.%s' % MODE)

app = Celery('awx')
# Read all CELERY_* settings from Django settings (Celery 4 namespace style).
app.config_from_object('django.conf:settings', namespace='CELERY')
# Discover tasks.py modules in every installed Django app.
app.autodiscover_tasks()

if __name__ == '__main__':
    app.start()

View File

@ -25,7 +25,7 @@ from django.contrib.contenttypes.models import ContentType
from polymorphic.models import PolymorphicModel
# Django-Celery
from djcelery.models import TaskMeta
from django_celery_results.models import TaskResult
# AWX
from awx.main.models.base import * # noqa
@ -88,7 +88,7 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, Notificatio
ALL_STATUS_CHOICES = OrderedDict(PROJECT_STATUS_CHOICES + INVENTORY_SOURCE_STATUS_CHOICES + JOB_TEMPLATE_STATUS_CHOICES + DEPRECATED_STATUS_CHOICES).items()
# NOTE: Working around a django-polymorphic issue: https://github.com/django-polymorphic/django-polymorphic/issues/229
_base_manager = models.Manager()
base_manager_name = 'base_objects'
class Meta:
app_label = 'main'
@ -438,7 +438,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
PASSWORD_FIELDS = ('start_args',)
# NOTE: Working around a django-polymorphic issue: https://github.com/django-polymorphic/django-polymorphic/issues/229
_base_manager = models.Manager()
base_manager_name = 'base_objects'
class Meta:
app_label = 'main'
@ -872,8 +872,8 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
def celery_task(self):
try:
if self.celery_task_id:
return TaskMeta.objects.get(task_id=self.celery_task_id)
except TaskMeta.DoesNotExist:
return TaskResult.objects.get(task_id=self.celery_task_id)
except TaskResult.DoesNotExist:
pass
def get_passwords_needed_to_start(self):
@ -1100,7 +1100,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
cancel_fields.append('job_explanation')
self.save(update_fields=cancel_fields)
self.websocket_emit_status("canceled")
if settings.BROKER_URL.startswith('amqp://'):
if settings.CELERY_BROKER_URL.startswith('amqp://'):
self._force_cancel()
return self.cancel_flag

View File

@ -17,7 +17,7 @@ __all__ = ['CallbackQueueDispatcher']
class CallbackQueueDispatcher(object):
def __init__(self):
self.callback_connection = getattr(settings, 'BROKER_URL', None)
self.callback_connection = getattr(settings, 'CELERY_BROKER_URL', None)
self.connection_queue = getattr(settings, 'CALLBACK_QUEUE', '')
self.connection = None
self.exchange = None

View File

@ -3,7 +3,7 @@
import logging
# Celery
from celery import Task, task
from celery import Task, shared_task
# AWX
from awx.main.scheduler import TaskManager
@ -21,17 +21,17 @@ class LogErrorsTask(Task):
super(LogErrorsTask, self).on_failure(exc, task_id, args, kwargs, einfo)
@task
@shared_task
def run_job_launch(job_id):
TaskManager().schedule()
@task
@shared_task
def run_job_complete(job_id):
TaskManager().schedule()
@task(base=LogErrorsTask)
@shared_task(base=LogErrorsTask)
def run_task_manager():
logger.debug("Running Tower task manager.")
TaskManager().schedule()

View File

@ -27,7 +27,7 @@ except Exception:
psutil = None
# Celery
from celery import Task, task
from celery import Task, shared_task
from celery.signals import celeryd_init, worker_process_init, worker_shutdown
# Django
@ -46,6 +46,7 @@ from crum import impersonate
# AWX
from awx import __version__ as awx_application_version
from awx import celery_app
from awx.main.constants import CLOUD_PROVIDERS, PRIVILEGE_ESCALATION_METHODS
from awx.main.models import * # noqa
from awx.main.models.unified_jobs import ACTIVE_STATES
@ -131,7 +132,7 @@ def inform_cluster_of_shutdown(*args, **kwargs):
logger.exception('Encountered problem with normal shutdown signal.')
@task(queue='tower_broadcast_all', bind=True, base=LogErrorsTask)
@shared_task(queue='tower_broadcast_all', bind=True, base=LogErrorsTask)
def handle_setting_changes(self, setting_keys):
orig_len = len(setting_keys)
for i in range(orig_len):
@ -148,7 +149,7 @@ def handle_setting_changes(self, setting_keys):
break
@task(queue='tower', base=LogErrorsTask)
@shared_task(queue='tower', base=LogErrorsTask)
def send_notifications(notification_list, job_id=None):
if not isinstance(notification_list, list):
raise TypeError("notification_list should be of type list")
@ -172,7 +173,7 @@ def send_notifications(notification_list, job_id=None):
notification.save()
@task(bind=True, queue='tower', base=LogErrorsTask)
@shared_task(bind=True, queue='tower', base=LogErrorsTask)
def run_administrative_checks(self):
logger.warn("Running administrative checks.")
if not settings.TOWER_ADMIN_ALERTS:
@ -194,13 +195,13 @@ def run_administrative_checks(self):
fail_silently=True)
@task(bind=True, queue='tower', base=LogErrorsTask)
@shared_task(bind=True, queue='tower', base=LogErrorsTask)
def cleanup_authtokens(self):
logger.warn("Cleaning up expired authtokens.")
AuthToken.objects.filter(expires__lt=now()).delete()
@task(bind=True, base=LogErrorsTask)
@shared_task(bind=True, base=LogErrorsTask)
def purge_old_stdout_files(self):
nowtime = time.time()
for f in os.listdir(settings.JOBOUTPUT_ROOT):
@ -209,7 +210,7 @@ def purge_old_stdout_files(self):
logger.info("Removing {}".format(os.path.join(settings.JOBOUTPUT_ROOT,f)))
@task(bind=True, base=LogErrorsTask)
@shared_task(bind=True, base=LogErrorsTask)
def cluster_node_heartbeat(self):
logger.debug("Cluster node heartbeat task.")
nowtime = now()
@ -262,7 +263,7 @@ def cluster_node_heartbeat(self):
logger.exception('Error marking {} as lost'.format(other_inst.hostname))
@task(bind=True, base=LogErrorsTask)
@shared_task(bind=True, base=LogErrorsTask)
def awx_isolated_heartbeat(self):
local_hostname = settings.CLUSTER_HOST_ID
logger.debug("Controlling node checking for any isolated management tasks.")
@ -286,7 +287,7 @@ def awx_isolated_heartbeat(self):
isolated_manager.IsolatedManager.health_check(isolated_instance_qs, awx_application_version)
@task(bind=True, queue='tower', base=LogErrorsTask)
@shared_task(bind=True, queue='tower', base=LogErrorsTask)
def awx_periodic_scheduler(self):
run_now = now()
state = TowerScheduleState.get_solo()
@ -340,7 +341,7 @@ def _send_notification_templates(instance, status_str):
job_id=instance.id)
@task(bind=True, queue='tower', base=LogErrorsTask)
@shared_task(bind=True, queue='tower', base=LogErrorsTask)
def handle_work_success(self, result, task_actual):
try:
instance = UnifiedJob.get_instance_by_type(task_actual['type'], task_actual['id'])
@ -356,7 +357,7 @@ def handle_work_success(self, result, task_actual):
run_job_complete.delay(instance.id)
@task(bind=True, queue='tower', base=LogErrorsTask)
@shared_task(bind=True, queue='tower', base=LogErrorsTask)
def handle_work_error(self, task_id, subtasks=None):
logger.debug('Executing error task id %s, subtasks: %s' % (str(self.request.id), str(subtasks)))
first_instance = None
@ -399,7 +400,7 @@ def handle_work_error(self, task_id, subtasks=None):
pass
@task(queue='tower', base=LogErrorsTask)
@shared_task(queue='tower', base=LogErrorsTask)
def update_inventory_computed_fields(inventory_id, should_update_hosts=True):
'''
Signal handler and wrapper around inventory.update_computed_fields to
@ -419,7 +420,7 @@ def update_inventory_computed_fields(inventory_id, should_update_hosts=True):
raise
@task(queue='tower', base=LogErrorsTask)
@shared_task(queue='tower', base=LogErrorsTask)
def update_host_smart_inventory_memberships():
try:
with transaction.atomic():
@ -435,7 +436,7 @@ def update_host_smart_inventory_memberships():
return
@task(bind=True, queue='tower', base=LogErrorsTask, max_retries=5)
@shared_task(bind=True, queue='tower', base=LogErrorsTask, max_retries=5)
def delete_inventory(self, inventory_id, user_id):
# Delete inventory as user
if user_id is None:
@ -1003,7 +1004,7 @@ class RunJob(BaseTask):
env['TOWER_HOST'] = settings.TOWER_URL_BASE
env['AWX_HOST'] = settings.TOWER_URL_BASE
env['CALLBACK_QUEUE'] = settings.CALLBACK_QUEUE
env['CALLBACK_CONNECTION'] = settings.BROKER_URL
env['CALLBACK_CONNECTION'] = settings.CELERY_BROKER_URL
env['CACHE'] = settings.CACHES['default']['LOCATION'] if 'LOCATION' in settings.CACHES['default'] else ''
if getattr(settings, 'JOB_CALLBACK_DEBUG', False):
env['JOB_CALLBACK_DEBUG'] = '2'
@ -2054,7 +2055,7 @@ class RunAdHocCommand(BaseTask):
env['ANSIBLE_LOAD_CALLBACK_PLUGINS'] = '1'
env['ANSIBLE_STDOUT_CALLBACK'] = 'minimal' # Hardcoded by Ansible for ad-hoc commands (either minimal or oneline).
env['CALLBACK_QUEUE'] = settings.CALLBACK_QUEUE
env['CALLBACK_CONNECTION'] = settings.BROKER_URL
env['CALLBACK_CONNECTION'] = settings.CELERY_BROKER_URL
env['ANSIBLE_SFTP_BATCH_MODE'] = 'False'
env['CACHE'] = settings.CACHES['default']['LOCATION'] if 'LOCATION' in settings.CACHES['default'] else ''
if getattr(settings, 'JOB_CALLBACK_DEBUG', False):
@ -2221,3 +2222,10 @@ class RunSystemJob(BaseTask):
def build_cwd(self, instance, **kwargs):
return settings.BASE_DIR
celery_app.register_task(RunJob())
celery_app.register_task(RunProjectUpdate())
celery_app.register_task(RunInventoryUpdate())
celery_app.register_task(RunAdHocCommand())
celery_app.register_task(RunSystemJob())