From c850379222dc79a76316fe285ab41335ba5b71f0 Mon Sep 17 00:00:00 2001 From: Chris Church Date: Wed, 11 Dec 2013 00:18:39 -0500 Subject: [PATCH] AC-752 Work on using celery with rabbitmq, posting job events via celery. --- awx/main/models/base.py | 2 ++ awx/main/tasks.py | 34 +++++++++++++++++----- awx/plugins/callback/job_event_callback.py | 22 ++++++++++++-- awx/settings/defaults.py | 1 + 4 files changed, 49 insertions(+), 10 deletions(-) diff --git a/awx/main/models/base.py b/awx/main/models/base.py index 7c32ded8bf..0eb2751e0f 100644 --- a/awx/main/models/base.py +++ b/awx/main/models/base.py @@ -12,6 +12,7 @@ import yaml # Django from django.db import models +from django.db import transaction from django.core.exceptions import ValidationError from django.utils.translation import ugettext_lazy as _ from django.utils.timezone import now @@ -375,6 +376,7 @@ class CommonTask(PrimordialModel): return False self.status = 'pending' self.save(update_fields=['status']) + transaction.commit() task_result = task_class().delay(self.pk, **opts) # Reload instance from database so we don't clobber results from task # (mainly from tests when using Django 1.4.x). diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 55ee9a49e1..fa70802dda 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -32,7 +32,7 @@ from django.utils.datastructures import SortedDict from django.utils.timezone import now # AWX -from awx.main.models import Job, ProjectUpdate, InventoryUpdate +from awx.main.models import Job, JobEvent, ProjectUpdate, InventoryUpdate from awx.main.utils import get_ansible_version, decrypt_field, update_scm_url __all__ = ['RunJob', 'RunProjectUpdate', 'RunInventoryImport'] @@ -45,6 +45,7 @@ class BaseTask(Task): name = None model = None + abstract = True def update_model(self, pk, **updates): ''' @@ -335,12 +336,14 @@ class RunJob(BaseTask): env['ANSIBLE_CALLBACK_PLUGINS'] = plugin_dir env['REST_API_URL'] = settings.INTERNAL_API_URL env['REST_API_TOKEN'] = job.task_auth_token or '' + if settings.BROKER_URL.startswith('amqp://'): + env['BROKER_URL'] = settings.BROKER_URL # When using Ansible >= 1.3, allow the inventory script to include host # variables inline via ['_meta']['hostvars']. try: Version = distutils.version.StrictVersion - if Version( get_ansible_version()) >= Version('1.3'): + if Version(get_ansible_version()) >= Version('1.3'): env['INVENTORY_HOSTVARS'] = str(True) except ValueError: pass @@ -524,14 +527,29 @@ class RunJob(BaseTask): Hook for actions to run after job/task has completed. ''' super(RunJob, self).post_run_hook(job, **kwargs) - # Update job event fields after job has completed. - for job_event in job.job_events.order_by('pk'): - job_event.save(post_process=True) - + # Update job event fields after job has completed (only when using REST + # API callback). + if not settings.BROKER_URL.startswith('amqp://'): + for job_event in job.job_events.order_by('pk'): + job_event.save(post_process=True) + + +class SaveJobEvent(Task): + + name = 'awx.main.tasks.save_job_event' + + @transaction.commit_on_success + def run(self, *args, **kwargs): + for key in kwargs.keys(): + if key not in ('job_id', 'event', 'event_data'): + kwargs.pop(key) + job_event = JobEvent(**kwargs) + job_event.save(post_process=True) + class RunProjectUpdate(BaseTask): - name = 'run_project_update' + name = 'awx.main.tasks.run_project_update' model = ProjectUpdate def build_private_data(self, project_update, **kwargs): @@ -723,7 +741,7 @@ class RunProjectUpdate(BaseTask): class RunInventoryUpdate(BaseTask): - name = 'run_inventory_update' + name = 'awx.main.tasks.run_inventory_update' model = InventoryUpdate def build_private_data(self, inventory_update, **kwargs): diff --git a/awx/plugins/callback/job_event_callback.py b/awx/plugins/callback/job_event_callback.py index 4f0450c946..ba6f9cd8e1 100644 --- a/awx/plugins/callback/job_event_callback.py +++ b/awx/plugins/callback/job_event_callback.py @@ -47,6 +47,11 @@ except ImportError: sys.path.insert(0, local_site_packages) import requests +# Celery +from celery import Celery +from celery.execute import send_task + + class TokenAuth(requests.auth.AuthBase): def __init__(self, token): @@ -56,6 +61,7 @@ class TokenAuth(requests.auth.AuthBase): request.headers['Authorization'] = 'Token %s' % self.token return request + class CallbackModule(object): ''' Callback module for logging ansible-playbook job events via the REST API. @@ -78,8 +84,17 @@ class CallbackModule(object): def __init__(self): self.job_id = int(os.getenv('JOB_ID')) - self.base_url = os.getenv('REST_API_URL') + self.base_url = os.getenv('REST_API_URL', '') self.auth_token = os.getenv('REST_API_TOKEN', '') + self.broker_url = os.getenv('BROKER_URL', '') + + def _post_msg(self, event, event_data): + app = Celery('tasks', broker=self.broker_url) + send_task('awx.main.tasks.save_job_event', kwargs={ + 'job_id': self.job_id, + 'event': event, + 'event_data': event_data, + }, serializer='json') def _post_data(self, event, event_data): data = json.dumps({ @@ -110,7 +125,10 @@ class CallbackModule(object): task = getattr(getattr(self, 'task', None), 'name', '') if task and event not in self.EVENTS_WITHOUT_TASK: event_data['task'] = task - self._post_data(event, event_data) + if self.broker_url: + self._post_msg(event, event_data) + else: + self._post_data(event, event_data) def on_any(self, *args, **kwargs): pass diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 08671efd2f..9c6d284545 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -276,6 +276,7 @@ djcelery.setup_loader() BROKER_URL = 'django://' CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' +CELERY_ACCEPT_CONTENT = ['json'] CELERY_TRACK_STARTED = True CELERYD_TASK_TIME_LIMIT = None CELERYD_TASK_SOFT_TIME_LIMIT = None