From 6ede4567158616ba1f730bae2b7fb50a7cbb3882 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Fri, 18 Apr 2014 09:58:27 -0400 Subject: [PATCH] Send signals when jobs/tasks change states --- awx/main/management/commands/run_task_system.py | 3 ++- awx/main/models/unified_jobs.py | 4 +++- awx/main/tasks.py | 9 ++++++--- awx/main/utils.py | 16 +++++++++++++++- 4 files changed, 26 insertions(+), 6 deletions(-) diff --git a/awx/main/management/commands/run_task_system.py b/awx/main/management/commands/run_task_system.py index 4f4dc6c8fd..4859a23d9e 100644 --- a/awx/main/management/commands/run_task_system.py +++ b/awx/main/management/commands/run_task_system.py @@ -23,7 +23,7 @@ from django.utils.tzinfo import FixedOffset # AWX from awx.main.models import * from awx.main.tasks import handle_work_error -from awx.main.utils import get_system_task_capacity, decrypt_field +from awx.main.utils import get_system_task_capacity, decrypt_field, emit_websocket_notification # ZeroMQ import zmq @@ -175,6 +175,7 @@ def rebuild_graph(message): task.status = 'failed' task.job_explanation += "Task was marked as running in Tower but was not present in Celery so it has been marked as failed" task.save() + emit_websocket_notification('/socket.io/jobs', 'job_canceled', dict(unified_job_id=task.id)) running_tasks.pop(running_tasks.index(task)) print("Task %s appears orphaned... marking as failed" % task) diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index f10f238041..fb941369ef 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -33,7 +33,7 @@ from djcelery.models import TaskMeta # AWX from awx.main.models.base import * from awx.main.models.schedules import Schedule -from awx.main.utils import decrypt_field, get_type_for_model +from awx.main.utils import decrypt_field, get_type_for_model, emit_websocket_notification __all__ = ['UnifiedJobTemplate', 'UnifiedJob'] @@ -583,6 +583,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique if not all(opts.values()): return False self.update_fields(start_args=json.dumps(kwargs), status='pending') + emit_websocket_notification('/socket.io/jobs', 'job_started', dict(unified_job_id=self.id)) task_type = get_type_for_model(self) # notify_task_runner.delay(dict(task_type=task_type, id=self.id, metadata=kwargs)) return True @@ -631,6 +632,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique if not self.cancel_flag: self.cancel_flag = True self.save(update_fields=['cancel_flag']) + emit_websocket_notification('/socket.io/jobs', 'job_canceled', dict(unified_job_id=self.id)) if settings.BROKER_URL.startswith('amqp://'): self._force_cancel() return self.cancel_flag diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 55bc4023c6..3c315f02cd 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -37,7 +37,7 @@ from django.utils.timezone import now # AWX from awx.main.models import * # Job, JobEvent, ProjectUpdate, InventoryUpdate, Schedule, UnifiedJobTemplate -from awx.main.utils import get_ansible_version, decrypt_field, update_scm_url +from awx.main.utils import get_ansible_version, decrypt_field, update_scm_url, emit_websocket_notification __all__ = ['RunJob', 'RunProjectUpdate', 'RunInventoryUpdate', 'handle_work_error'] @@ -123,9 +123,10 @@ def handle_work_error(self, task_id, subtasks=None): instance.job_explanation = "Previous Task Failed: %s for %s with celery task id: %s" % \ (first_task_type, first_task_name, task_id) instance.save() + emit_websocket_notification('/socket.io/jobs', 'job_error', dict(unified_job_id=instance.id)) class BaseTask(Task): - + name = None model = None abstract = True @@ -333,6 +334,7 @@ class BaseTask(Task): ''' Run the job/task and capture its output. ''' + emit_websocket_notification('/socket.io/jobs', 'job_running', dict(unified_job_id=pk)) instance = self.update_model(pk, status='running', celery_task_id=self.request.id) status, tb = 'error', '' output_replacements = [] @@ -381,6 +383,7 @@ class BaseTask(Task): instance = self.update_model(pk, status=status, result_traceback=tb, output_replacements=output_replacements) self.post_run_hook(instance, **kwargs) + emit_websocket_notification('/socket.io/jobs', 'job_finished', dict(unified_job_id=pk)) if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'): # Raising an exception will mark the job as 'failed' in celery # and will stop a task chain from continuing to execute @@ -581,7 +584,7 @@ class RunJob(BaseTask): class RunProjectUpdate(BaseTask): - + name = 'awx.main.tasks.run_project_update' model = ProjectUpdate diff --git a/awx/main/utils.py b/awx/main/utils.py index 912f6603f2..c9d185b30b 100644 --- a/awx/main/utils.py +++ b/awx/main/utils.py @@ -16,6 +16,9 @@ from rest_framework.exceptions import ParseError, PermissionDenied # PyCrypto from Crypto.Cipher import AES +# ZeroMQ +import zmq + __all__ = ['get_object_or_400', 'get_object_or_403', 'camelcase_to_underscore', 'get_ansible_version', 'get_awx_version', 'update_scm_url', 'get_type_for_model', 'get_model_for_type'] @@ -162,7 +165,7 @@ def update_scm_url(scm_type, url, username=True, password=True, elif scm_type == 'git' and ':' in url: if url.count(':') > 1: raise ValueError('Invalid %s URL' % scm_type) - + modified_url = '/'.join(url.split(':', 1)) parts = urlparse.urlsplit('ssh://%s' % modified_url) # Handle local paths specified without file scheme (e.g. /path/to/foo). @@ -325,6 +328,9 @@ def get_model_for_type(type): return ct_model def get_system_task_capacity(): + ''' + Measure system memory and use it as a baseline for determining the system's capacity + ''' from django.conf import settings if hasattr(settings, 'SYSTEM_TASK_CAPACITY'): return settings.SYSTEM_TASK_CAPACITY @@ -334,3 +340,11 @@ def get_system_task_capacity(): if int(total_mem_value) <= 2048: return 50 return 50 + ((int(total_mem_value) / 1024) - 2) * 75 + +def emit_websocket_notification(endpoint, event, payload): + emit_context = zmq.Context() + emit_socket = emit_context.socket(zmq.PUSH) + emit_socket.connect(settings.SOCKETIO_NOTIFICATION_PORT) + payload['event'] = event + payload['endpoint'] = endpoint + emit_socket.send_json(payload);