From 4810b9532d4b7522081dfee0181b7acbefe16b1e Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Tue, 22 Apr 2014 15:52:45 -0400 Subject: [PATCH] Generalize socketio sender from task and emit more data specific to the type of unified job sending the update --- awx/main/management/commands/run_task_system.py | 4 ++-- awx/main/models/inventory.py | 5 ++++- awx/main/models/projects.py | 5 ++++- awx/main/models/unified_jobs.py | 15 ++++++++++++--- awx/main/tasks.py | 10 +++++----- 5 files changed, 27 insertions(+), 12 deletions(-) diff --git a/awx/main/management/commands/run_task_system.py b/awx/main/management/commands/run_task_system.py index 3a88fb4124..6b22fe8621 100644 --- a/awx/main/management/commands/run_task_system.py +++ b/awx/main/management/commands/run_task_system.py @@ -23,7 +23,7 @@ from django.utils.tzinfo import FixedOffset # AWX from awx.main.models import * from awx.main.tasks import handle_work_error -from awx.main.utils import get_system_task_capacity, decrypt_field, emit_websocket_notification +from awx.main.utils import get_system_task_capacity, decrypt_field # ZeroMQ import zmq @@ -175,7 +175,7 @@ def rebuild_graph(message): task.status = 'failed' task.job_explanation += "Task was marked as running in Tower but was not present in Celery so it has been marked as failed" task.save() - emit_websocket_notification('/socket.io/jobs', 'status_changed', dict(unified_job_id=task.id, status='failed')) + task.socketio_emit_status("failed") running_tasks.pop(running_tasks.index(task)) print("Task %s appears orphaned... marking as failed" % task) diff --git a/awx/main/models/inventory.py b/awx/main/models/inventory.py index 5c85105ab4..1050bccf09 100644 --- a/awx/main/models/inventory.py +++ b/awx/main/models/inventory.py @@ -731,7 +731,7 @@ class InventoryUpdate(UnifiedJob, InventorySourceOptions): default=False, editable=False, ) - + @classmethod def _get_parent_field_name(cls): return 'inventory_source' @@ -741,6 +741,9 @@ class InventoryUpdate(UnifiedJob, InventorySourceOptions): from awx.main.tasks import RunInventoryUpdate return RunInventoryUpdate + def socketio_emit_data(self): + return dict(group_id=self.inventory_source.group.id) + def save(self, *args, **kwargs): update_fields = kwargs.get('update_fields', []) if bool('license' in self.result_stdout and diff --git a/awx/main/models/projects.py b/awx/main/models/projects.py index 6817a44f4e..857ba85f1b 100644 --- a/awx/main/models/projects.py +++ b/awx/main/models/projects.py @@ -48,7 +48,7 @@ class ProjectOptions(models.Model): ('hg', _('Mercurial')), ('svn', _('Subversion')), ] - + class Meta: abstract = True @@ -346,6 +346,9 @@ class ProjectUpdate(UnifiedJob, ProjectOptions): return True return False + def socketio_emit_data(self): + return dict(project_id=self.project.id) + @property def task_impact(self): return 20 diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index d8ba7e5e6c..2a0898facb 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -539,6 +539,15 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique ''' Given another task object determine if this task would be blocked by it ''' raise NotImplementedError # Implement in subclass. + def socketio_emit_data(self): + ''' Return extra data that should be included when submitting data to the browser over the websocket connection ''' + return {} + + def socketio_emit_status(self, status): + status_data = dict(unified_job_id=self.id, status=status) + status_data.update(self.socketio_emit_data()) + emit_websocket_notification('/socket.io/jobs', 'status_changed', status_data) + def generate_dependencies(self, active_tasks): ''' Generate any tasks that the current task might be dependent on given a list of active tasks that might preclude creating one''' @@ -583,7 +592,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique if not all(opts.values()): return False self.update_fields(start_args=json.dumps(kwargs), status='pending') - emit_websocket_notification('/socket.io/jobs', 'status_changed', dict(unified_job_id=self.id, status='pending')) + self.socketio_emit_status("pending") task_type = get_type_for_model(self) # notify_task_runner.delay(dict(task_type=task_type, id=self.id, metadata=kwargs)) return True @@ -623,7 +632,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique instance.job_explanation = 'Forced cancel' update_fields.append('job_explanation') instance.save(update_fields=update_fields) - emit_websocket_notification('/socket.io/jobs', 'status_changed', dict(unified_job_id=instance.id, status='canceled')) + self.socketio_emit_status("canceled") except: # FIXME: Log this exception! if settings.DEBUG: raise @@ -633,7 +642,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique if not self.cancel_flag: self.cancel_flag = True self.save(update_fields=['cancel_flag']) - emit_websocket_notification('/socket.io/jobs', 'status_changed', dict(unified_job_id=self.id, status='canceled')) + self.socketio_emit_status("canceled") if settings.BROKER_URL.startswith('amqp://'): self._force_cancel() return self.cancel_flag diff --git a/awx/main/tasks.py b/awx/main/tasks.py index aa2851a5d5..26a5bf2ef0 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -37,7 +37,7 @@ from django.utils.timezone import now # AWX from awx.main.models import * # Job, JobEvent, ProjectUpdate, InventoryUpdate, Schedule, UnifiedJobTemplate -from awx.main.utils import get_ansible_version, decrypt_field, update_scm_url, emit_websocket_notification +from awx.main.utils import get_ansible_version, decrypt_field, update_scm_url __all__ = ['RunJob', 'RunProjectUpdate', 'RunInventoryUpdate', 'handle_work_error'] @@ -84,7 +84,7 @@ def tower_periodic_scheduler(self): new_unified_job.status = 'failed' new_unified_job.job_explanation = "Scheduled job could not start because it was not in the right state or required manual credentials" new_unified_job.save(update_fields=['job_status', 'job_explanation']) - emit_websocket_notification('/socket.io/jobs', 'status_changed', dict(unified_job_id=new_unified_job.id, status='failed')) + new_unified_job.socketio_emit_status("failed") @task() def notify_task_runner(metadata_dict): @@ -124,7 +124,7 @@ def handle_work_error(self, task_id, subtasks=None): instance.job_explanation = "Previous Task Failed: %s for %s with celery task id: %s" % \ (first_task_type, first_task_name, task_id) instance.save() - emit_websocket_notification('/socket.io/jobs', 'status_changed', dict(unified_job_id=instance.id, status='failed')) + instance.socketio_emit_status("failed") class BaseTask(Task): @@ -335,8 +335,8 @@ class BaseTask(Task): ''' Run the job/task and capture its output. ''' - emit_websocket_notification('/socket.io/jobs', 'status_changed', dict(unified_job_id=pk, status='running')) instance = self.update_model(pk, status='running', celery_task_id=self.request.id) + instance.socketio_emit_status("running") status, tb = 'error', '' output_replacements = [] try: @@ -384,7 +384,7 @@ class BaseTask(Task): instance = self.update_model(pk, status=status, result_traceback=tb, output_replacements=output_replacements) self.post_run_hook(instance, **kwargs) - emit_websocket_notification('/socket.io/jobs', 'status_changed', dict(unified_job_id=pk, status=status)) + instance.socketio_emit_status(status) if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'): # Raising an exception will mark the job as 'failed' in celery # and will stop a task chain from continuing to execute