mirror of
https://github.com/ansible/awx.git
synced 2026-05-17 14:27:42 -02:30
AC-1015 Changed where save_job_events task is started so that job events don't appear to be missing when they're stuck in the queue.
This commit is contained in:
@@ -26,7 +26,8 @@ import pexpect
|
|||||||
from kombu import Connection, Exchange, Queue
|
from kombu import Connection, Exchange, Queue
|
||||||
|
|
||||||
# Celery
|
# Celery
|
||||||
from celery import Task, task
|
from celery import Celery, Task, task
|
||||||
|
from celery.execute import send_task
|
||||||
|
|
||||||
# Django
|
# Django
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
@@ -476,6 +477,12 @@ class RunJob(BaseTask):
|
|||||||
'''
|
'''
|
||||||
if job.status in ('pending', 'waiting'):
|
if job.status in ('pending', 'waiting'):
|
||||||
job = self.update_model(job.pk, status='pending')
|
job = self.update_model(job.pk, status='pending')
|
||||||
|
# Start another task to process job events.
|
||||||
|
if settings.BROKER_URL.startswith('amqp://'):
|
||||||
|
app = Celery('tasks', broker=settings.BROKER_URL)
|
||||||
|
send_task('awx.main.tasks.save_job_events', kwargs={
|
||||||
|
'job_id': job.id,
|
||||||
|
}, serializer='json')
|
||||||
return True
|
return True
|
||||||
elif job.cancel_flag:
|
elif job.cancel_flag:
|
||||||
job = self.update_model(job.pk, status='canceled')
|
job = self.update_model(job.pk, status='canceled')
|
||||||
|
|||||||
@@ -51,10 +51,6 @@ except ImportError:
|
|||||||
# Kombu
|
# Kombu
|
||||||
from kombu import Connection, Exchange, Queue
|
from kombu import Connection, Exchange, Queue
|
||||||
|
|
||||||
# Celery
|
|
||||||
from celery import Celery
|
|
||||||
from celery.execute import send_task
|
|
||||||
|
|
||||||
|
|
||||||
class TokenAuth(requests.auth.AuthBase):
|
class TokenAuth(requests.auth.AuthBase):
|
||||||
|
|
||||||
@@ -96,12 +92,6 @@ class CallbackModule(object):
|
|||||||
def __del__(self):
|
def __del__(self):
|
||||||
self._cleanup_connection()
|
self._cleanup_connection()
|
||||||
|
|
||||||
def _start_save_job_events_task(self):
|
|
||||||
app = Celery('tasks', broker=self.broker_url)
|
|
||||||
send_task('awx.main.tasks.save_job_events', kwargs={
|
|
||||||
'job_id': self.job_id,
|
|
||||||
}, serializer='json')
|
|
||||||
|
|
||||||
def _publish_errback(self, exc, interval):
|
def _publish_errback(self, exc, interval):
|
||||||
if self.job_callback_debug:
|
if self.job_callback_debug:
|
||||||
print 'Publish Error: %r, retry in %s seconds, pid=%s' % (exc, interval, os.getpid())
|
print 'Publish Error: %r, retry in %s seconds, pid=%s' % (exc, interval, os.getpid())
|
||||||
@@ -194,8 +184,6 @@ class CallbackModule(object):
|
|||||||
if task and event not in self.EVENTS_WITHOUT_TASK:
|
if task and event not in self.EVENTS_WITHOUT_TASK:
|
||||||
event_data['task'] = task
|
event_data['task'] = task
|
||||||
if self.broker_url:
|
if self.broker_url:
|
||||||
if event == 'playbook_on_start':
|
|
||||||
self._start_save_job_events_task()
|
|
||||||
self._post_job_event_queue_msg(event, event_data)
|
self._post_job_event_queue_msg(event, event_data)
|
||||||
else:
|
else:
|
||||||
self._post_rest_api_event(event, event_data)
|
self._post_rest_api_event(event, event_data)
|
||||||
|
|||||||
Reference in New Issue
Block a user