AC-752 Work on using celery with rabbitmq, posting job events via celery.

This commit is contained in:
Chris Church
2013-12-11 00:18:39 -05:00
parent dfcedc42fe
commit c850379222
4 changed files with 49 additions and 10 deletions

View File

@@ -12,6 +12,7 @@ import yaml
# Django # Django
from django.db import models from django.db import models
from django.db import transaction
from django.core.exceptions import ValidationError from django.core.exceptions import ValidationError
from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext_lazy as _
from django.utils.timezone import now from django.utils.timezone import now
@@ -375,6 +376,7 @@ class CommonTask(PrimordialModel):
return False return False
self.status = 'pending' self.status = 'pending'
self.save(update_fields=['status']) self.save(update_fields=['status'])
transaction.commit()
task_result = task_class().delay(self.pk, **opts) task_result = task_class().delay(self.pk, **opts)
# Reload instance from database so we don't clobber results from task # Reload instance from database so we don't clobber results from task
# (mainly from tests when using Django 1.4.x). # (mainly from tests when using Django 1.4.x).

View File

@@ -32,7 +32,7 @@ from django.utils.datastructures import SortedDict
from django.utils.timezone import now from django.utils.timezone import now
# AWX # AWX
from awx.main.models import Job, ProjectUpdate, InventoryUpdate from awx.main.models import Job, JobEvent, ProjectUpdate, InventoryUpdate
from awx.main.utils import get_ansible_version, decrypt_field, update_scm_url from awx.main.utils import get_ansible_version, decrypt_field, update_scm_url
__all__ = ['RunJob', 'RunProjectUpdate', 'RunInventoryImport'] __all__ = ['RunJob', 'RunProjectUpdate', 'RunInventoryImport']
@@ -45,6 +45,7 @@ class BaseTask(Task):
name = None name = None
model = None model = None
abstract = True
def update_model(self, pk, **updates): def update_model(self, pk, **updates):
''' '''
@@ -335,12 +336,14 @@ class RunJob(BaseTask):
env['ANSIBLE_CALLBACK_PLUGINS'] = plugin_dir env['ANSIBLE_CALLBACK_PLUGINS'] = plugin_dir
env['REST_API_URL'] = settings.INTERNAL_API_URL env['REST_API_URL'] = settings.INTERNAL_API_URL
env['REST_API_TOKEN'] = job.task_auth_token or '' env['REST_API_TOKEN'] = job.task_auth_token or ''
if settings.BROKER_URL.startswith('amqp://'):
env['BROKER_URL'] = settings.BROKER_URL
# When using Ansible >= 1.3, allow the inventory script to include host # When using Ansible >= 1.3, allow the inventory script to include host
# variables inline via ['_meta']['hostvars']. # variables inline via ['_meta']['hostvars'].
try: try:
Version = distutils.version.StrictVersion Version = distutils.version.StrictVersion
if Version( get_ansible_version()) >= Version('1.3'): if Version(get_ansible_version()) >= Version('1.3'):
env['INVENTORY_HOSTVARS'] = str(True) env['INVENTORY_HOSTVARS'] = str(True)
except ValueError: except ValueError:
pass pass
@@ -524,14 +527,29 @@ class RunJob(BaseTask):
Hook for actions to run after job/task has completed. Hook for actions to run after job/task has completed.
''' '''
super(RunJob, self).post_run_hook(job, **kwargs) super(RunJob, self).post_run_hook(job, **kwargs)
# Update job event fields after job has completed. # Update job event fields after job has completed (only when using REST
for job_event in job.job_events.order_by('pk'): # API callback).
job_event.save(post_process=True) if not settings.BROKER_URL.startswith('amqp://'):
for job_event in job.job_events.order_by('pk'):
job_event.save(post_process=True)
class SaveJobEvent(Task):
name = 'awx.main.tasks.save_job_event'
@transaction.commit_on_success
def run(self, *args, **kwargs):
for key in kwargs.keys():
if key not in ('job_id', 'event', 'event_data'):
kwargs.pop(key)
job_event = JobEvent(**kwargs)
job_event.save(post_process=True)
class RunProjectUpdate(BaseTask): class RunProjectUpdate(BaseTask):
name = 'run_project_update' name = 'awx.main.tasks.run_project_update'
model = ProjectUpdate model = ProjectUpdate
def build_private_data(self, project_update, **kwargs): def build_private_data(self, project_update, **kwargs):
@@ -723,7 +741,7 @@ class RunProjectUpdate(BaseTask):
class RunInventoryUpdate(BaseTask): class RunInventoryUpdate(BaseTask):
name = 'run_inventory_update' name = 'awx.main.tasks.run_inventory_update'
model = InventoryUpdate model = InventoryUpdate
def build_private_data(self, inventory_update, **kwargs): def build_private_data(self, inventory_update, **kwargs):

View File

@@ -47,6 +47,11 @@ except ImportError:
sys.path.insert(0, local_site_packages) sys.path.insert(0, local_site_packages)
import requests import requests
# Celery
from celery import Celery
from celery.execute import send_task
class TokenAuth(requests.auth.AuthBase): class TokenAuth(requests.auth.AuthBase):
def __init__(self, token): def __init__(self, token):
@@ -56,6 +61,7 @@ class TokenAuth(requests.auth.AuthBase):
request.headers['Authorization'] = 'Token %s' % self.token request.headers['Authorization'] = 'Token %s' % self.token
return request return request
class CallbackModule(object): class CallbackModule(object):
''' '''
Callback module for logging ansible-playbook job events via the REST API. Callback module for logging ansible-playbook job events via the REST API.
@@ -78,8 +84,17 @@ class CallbackModule(object):
def __init__(self): def __init__(self):
self.job_id = int(os.getenv('JOB_ID')) self.job_id = int(os.getenv('JOB_ID'))
self.base_url = os.getenv('REST_API_URL') self.base_url = os.getenv('REST_API_URL', '')
self.auth_token = os.getenv('REST_API_TOKEN', '') self.auth_token = os.getenv('REST_API_TOKEN', '')
self.broker_url = os.getenv('BROKER_URL', '')
def _post_msg(self, event, event_data):
app = Celery('tasks', broker=self.broker_url)
send_task('awx.main.tasks.save_job_event', kwargs={
'job_id': self.job_id,
'event': event,
'event_data': event_data,
}, serializer='json')
def _post_data(self, event, event_data): def _post_data(self, event, event_data):
data = json.dumps({ data = json.dumps({
@@ -110,7 +125,10 @@ class CallbackModule(object):
task = getattr(getattr(self, 'task', None), 'name', '') task = getattr(getattr(self, 'task', None), 'name', '')
if task and event not in self.EVENTS_WITHOUT_TASK: if task and event not in self.EVENTS_WITHOUT_TASK:
event_data['task'] = task event_data['task'] = task
self._post_data(event, event_data) if self.broker_url:
self._post_msg(event, event_data)
else:
self._post_data(event, event_data)
def on_any(self, *args, **kwargs): def on_any(self, *args, **kwargs):
pass pass

View File

@@ -276,6 +276,7 @@ djcelery.setup_loader()
BROKER_URL = 'django://' BROKER_URL = 'django://'
CELERY_TASK_SERIALIZER = 'json' CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TRACK_STARTED = True CELERY_TRACK_STARTED = True
CELERYD_TASK_TIME_LIMIT = None CELERYD_TASK_TIME_LIMIT = None
CELERYD_TASK_SOFT_TIME_LIMIT = None CELERYD_TASK_SOFT_TIME_LIMIT = None