mirror of
https://github.com/ansible/awx.git
synced 2026-05-20 07:17:40 -02:30
Changes to celery tasks to support success signals
Linking in a success callback that will be invoked by our UnifiedJobs in the case they terminate normally. This is where we'll hook in the success notification type.
This commit is contained in:
@@ -15,7 +15,7 @@ from django.core.management.base import NoArgsCommand
|
|||||||
# AWX
|
# AWX
|
||||||
from awx.main.models import * # noqa
|
from awx.main.models import * # noqa
|
||||||
from awx.main.queue import FifoQueue
|
from awx.main.queue import FifoQueue
|
||||||
from awx.main.tasks import handle_work_error
|
from awx.main.tasks import handle_work_error, handle_work_success
|
||||||
from awx.main.utils import get_system_task_capacity
|
from awx.main.utils import get_system_task_capacity
|
||||||
|
|
||||||
# Celery
|
# Celery
|
||||||
@@ -265,14 +265,15 @@ def process_graph(graph, task_capacity):
|
|||||||
[{'type': graph.get_node_type(n['node_object']),
|
[{'type': graph.get_node_type(n['node_object']),
|
||||||
'id': n['node_object'].id} for n in node_dependencies]
|
'id': n['node_object'].id} for n in node_dependencies]
|
||||||
error_handler = handle_work_error.s(subtasks=dependent_nodes)
|
error_handler = handle_work_error.s(subtasks=dependent_nodes)
|
||||||
start_status = node_obj.start(error_callback=error_handler)
|
success_handler = handle_work_success.s(task_actual={'type': graph.get_node_type(node_obj),
|
||||||
|
'id': node_obj.id})
|
||||||
|
start_status = node_obj.start(error_callback=error_handler, success_callback=success_handler)
|
||||||
if not start_status:
|
if not start_status:
|
||||||
node_obj.status = 'failed'
|
node_obj.status = 'failed'
|
||||||
if node_obj.job_explanation:
|
if node_obj.job_explanation:
|
||||||
node_obj.job_explanation += ' '
|
node_obj.job_explanation += ' '
|
||||||
node_obj.job_explanation += 'Task failed pre-start check.'
|
node_obj.job_explanation += 'Task failed pre-start check.'
|
||||||
node_obj.save()
|
node_obj.save()
|
||||||
# TODO: Run error handler
|
|
||||||
continue
|
continue
|
||||||
remaining_volume -= impact
|
remaining_volume -= impact
|
||||||
running_impact += impact
|
running_impact += impact
|
||||||
|
|||||||
@@ -717,7 +717,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
|
|||||||
tasks that might preclude creating one'''
|
tasks that might preclude creating one'''
|
||||||
return []
|
return []
|
||||||
|
|
||||||
def start(self, error_callback, **kwargs):
|
def start(self, error_callback, success_callback, **kwargs):
|
||||||
'''
|
'''
|
||||||
Start the task running via Celery.
|
Start the task running via Celery.
|
||||||
'''
|
'''
|
||||||
@@ -743,7 +743,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
|
|||||||
# if field not in needed])
|
# if field not in needed])
|
||||||
if 'extra_vars' in kwargs:
|
if 'extra_vars' in kwargs:
|
||||||
self.handle_extra_data(kwargs['extra_vars'])
|
self.handle_extra_data(kwargs['extra_vars'])
|
||||||
task_class().apply_async((self.pk,), opts, link_error=error_callback)
|
task_class().apply_async((self.pk,), opts, link_error=error_callback, link=success_callback)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def signal_start(self, **kwargs):
|
def signal_start(self, **kwargs):
|
||||||
@@ -765,7 +765,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
|
|||||||
|
|
||||||
# Sanity check: If we are running unit tests, then run synchronously.
|
# Sanity check: If we are running unit tests, then run synchronously.
|
||||||
if getattr(settings, 'CELERY_UNIT_TEST', False):
|
if getattr(settings, 'CELERY_UNIT_TEST', False):
|
||||||
return self.start(None, **kwargs)
|
return self.start(None, None, **kwargs)
|
||||||
|
|
||||||
# Save the pending status, and inform the SocketIO listener.
|
# Save the pending status, and inform the SocketIO listener.
|
||||||
self.update_fields(start_args=json.dumps(kwargs), status='pending')
|
self.update_fields(start_args=json.dumps(kwargs), status='pending')
|
||||||
|
|||||||
@@ -52,7 +52,8 @@ from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field,
|
|||||||
from awx.fact.utils.connection import test_mongo_connection
|
from awx.fact.utils.connection import test_mongo_connection
|
||||||
|
|
||||||
__all__ = ['RunJob', 'RunSystemJob', 'RunProjectUpdate', 'RunInventoryUpdate',
|
__all__ = ['RunJob', 'RunSystemJob', 'RunProjectUpdate', 'RunInventoryUpdate',
|
||||||
'RunAdHocCommand', 'handle_work_error', 'update_inventory_computed_fields']
|
'RunAdHocCommand', 'handle_work_error', 'handle_work_success',
|
||||||
|
'update_inventory_computed_fields']
|
||||||
|
|
||||||
HIDDEN_PASSWORD = '**********'
|
HIDDEN_PASSWORD = '**********'
|
||||||
|
|
||||||
@@ -159,8 +160,14 @@ def mongodb_control(cmd):
|
|||||||
p = subprocess.Popen('sudo mongod --shutdown -f /etc/mongod.conf', shell=True)
|
p = subprocess.Popen('sudo mongod --shutdown -f /etc/mongod.conf', shell=True)
|
||||||
p.wait()
|
p.wait()
|
||||||
|
|
||||||
|
@task(bind=True)
|
||||||
|
def handle_work_success(self, result, task_actual):
|
||||||
|
# TODO: Perform Notification tasks
|
||||||
|
pass
|
||||||
|
|
||||||
@task(bind=True)
|
@task(bind=True)
|
||||||
def handle_work_error(self, task_id, subtasks=None):
|
def handle_work_error(self, task_id, subtasks=None):
|
||||||
|
# TODO: Perform Notification tasks
|
||||||
print('Executing error task id %s, subtasks: %s' %
|
print('Executing error task id %s, subtasks: %s' %
|
||||||
(str(self.request.id), str(subtasks)))
|
(str(self.request.id), str(subtasks)))
|
||||||
first_task = None
|
first_task = None
|
||||||
|
|||||||
Reference in New Issue
Block a user