From 805514990b4eb95065407c80311e10641ab9829b Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Mon, 1 Feb 2016 16:55:57 -0500 Subject: [PATCH] Changes to celery tasks to support success signals Linking in a success callback that will be invoked by our UnifiedJobs in the case they terminate normally. This is where we'll hook in the success notification type. --- awx/main/management/commands/run_task_system.py | 7 ++++--- awx/main/models/unified_jobs.py | 6 +++--- awx/main/tasks.py | 9 ++++++++- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/awx/main/management/commands/run_task_system.py b/awx/main/management/commands/run_task_system.py index d49dbf1669..5b5dd3bff0 100644 --- a/awx/main/management/commands/run_task_system.py +++ b/awx/main/management/commands/run_task_system.py @@ -15,7 +15,7 @@ from django.core.management.base import NoArgsCommand # AWX from awx.main.models import * # noqa from awx.main.queue import FifoQueue -from awx.main.tasks import handle_work_error +from awx.main.tasks import handle_work_error, handle_work_success from awx.main.utils import get_system_task_capacity # Celery @@ -265,14 +265,15 @@ def process_graph(graph, task_capacity): [{'type': graph.get_node_type(n['node_object']), 'id': n['node_object'].id} for n in node_dependencies] error_handler = handle_work_error.s(subtasks=dependent_nodes) - start_status = node_obj.start(error_callback=error_handler) + success_handler = handle_work_success.s(task_actual={'type': graph.get_node_type(node_obj), + 'id': node_obj.id}) + start_status = node_obj.start(error_callback=error_handler, success_callback=success_handler) if not start_status: node_obj.status = 'failed' if node_obj.job_explanation: node_obj.job_explanation += ' ' node_obj.job_explanation += 'Task failed pre-start check.' node_obj.save() - # TODO: Run error handler continue remaining_volume -= impact running_impact += impact diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 86ab0b3143..cd519af726 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -717,7 +717,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique tasks that might preclude creating one''' return [] - def start(self, error_callback, **kwargs): + def start(self, error_callback, success_callback, **kwargs): ''' Start the task running via Celery. ''' @@ -743,7 +743,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique # if field not in needed]) if 'extra_vars' in kwargs: self.handle_extra_data(kwargs['extra_vars']) - task_class().apply_async((self.pk,), opts, link_error=error_callback) + task_class().apply_async((self.pk,), opts, link_error=error_callback, link=success_callback) return True def signal_start(self, **kwargs): @@ -765,7 +765,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique # Sanity check: If we are running unit tests, then run synchronously. if getattr(settings, 'CELERY_UNIT_TEST', False): - return self.start(None, **kwargs) + return self.start(None, None, **kwargs) # Save the pending status, and inform the SocketIO listener. self.update_fields(start_args=json.dumps(kwargs), status='pending') diff --git a/awx/main/tasks.py b/awx/main/tasks.py index acfe2022ae..478bb6275c 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -52,7 +52,8 @@ from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, from awx.fact.utils.connection import test_mongo_connection __all__ = ['RunJob', 'RunSystemJob', 'RunProjectUpdate', 'RunInventoryUpdate', - 'RunAdHocCommand', 'handle_work_error', 'update_inventory_computed_fields'] + 'RunAdHocCommand', 'handle_work_error', 'handle_work_success', + 'update_inventory_computed_fields'] HIDDEN_PASSWORD = '**********' @@ -159,8 +160,14 @@ def mongodb_control(cmd): p = subprocess.Popen('sudo mongod --shutdown -f /etc/mongod.conf', shell=True) p.wait() +@task(bind=True) +def handle_work_success(self, result, task_actual): + # TODO: Perform Notification tasks + pass + @task(bind=True) def handle_work_error(self, task_id, subtasks=None): + # TODO: Perform Notification tasks print('Executing error task id %s, subtasks: %s' % (str(self.request.id), str(subtasks))) first_task = None