From 4c8aaf1aed000cc280f3b7d0f4b03b94a47e88c1 Mon Sep 17 00:00:00 2001 From: Wayne Witzel III Date: Thu, 11 Aug 2016 15:06:07 -0400 Subject: [PATCH] converting from socketio to channels websocket --- awx/api/views.py | 23 ++---------- awx/main/consumers.py | 36 +++++++++++++++---- .../management/commands/run_task_system.py | 6 ++-- awx/main/models/inventory.py | 2 +- awx/main/models/jobs.py | 10 +++--- awx/main/models/projects.py | 2 +- awx/main/models/schedules.py | 5 +-- awx/main/models/unified_jobs.py | 19 +++++----- awx/main/routing.py | 3 +- awx/main/signals.py | 9 +++-- awx/main/tasks.py | 32 ++++++++++------- 11 files changed, 83 insertions(+), 64 deletions(-) diff --git a/awx/api/views.py b/awx/api/views.py index 551bb814e9..6019ce62a2 100644 --- a/awx/api/views.py +++ b/awx/api/views.py @@ -72,8 +72,8 @@ from awx.api.permissions import * # noqa from awx.api.renderers import * # noqa from awx.api.serializers import * # noqa from awx.api.metadata import RoleMetadata -from awx.main.utils import emit_websocket_notification from awx.main.conf import tower_settings +from awx.main.consumers import emit_channel_notification logger = logging.getLogger('awx.api.views') @@ -544,11 +544,7 @@ class AuthTokenView(APIView): # Mark them as invalid and inform the user invalid_tokens = AuthToken.get_tokens_over_limit(serializer.validated_data['user']) for t in invalid_tokens: - # TODO: send socket notification - emit_websocket_notification('/socket.io/control', - 'limit_reached', - dict(reason=force_text(AuthToken.reason_long('limit_reached'))), - token_key=t.key) + emit_channel_notification('control-limit_reached', dict(reason=force_text(AuthToken.reason_long('limit_reached')), token_key=t.key)) t.invalidate(reason='limit_reached') # Note: This header is normally added in the middleware whenever an @@ -3183,21 +3179,8 @@ class JobJobTasksList(BaseJobEventsList): return ({'detail': 'Parent event not found.'}, -1, status.HTTP_404_NOT_FOUND) parent_task = parent_task[0] - # Some events correspond to a playbook or task starting up, - # and these are what we're interested in here. STARTING_EVENTS = ('playbook_on_task_start', 'playbook_on_setup') - - # We need to pull information about each start event. - # - # This is super tricky, because this table has a one-to-many - # relationship with itself (parent-child), and we're getting - # information for an arbitrary number of children. This means we - # need stats on grandchildren, sorted by child. - queryset = (JobEvent.objects.filter(parent__parent=parent_task, - parent__event__in=STARTING_EVENTS) - .values('parent__id', 'event', 'changed') - .annotate(num=Count('event')) - .order_by('parent__id')) + queryset = JobEvent.get_startevent_queryset(parent_task, STARTING_EVENTS) # The data above will come back in a list, but we are going to # want to access it based on the parent id, so map it into a diff --git a/awx/main/consumers.py b/awx/main/consumers.py index de196eb866..23ca874ed0 100644 --- a/awx/main/consumers.py +++ b/awx/main/consumers.py @@ -1,13 +1,37 @@ +import json + from channels import Group from channels.sessions import channel_session @channel_session -def job_event_connect(message): - job_id = message.content['path'].strip('/') - message.channel_session['job_id'] = job_id - Group("job_events-%s" % job_id).add(message.reply_channel) +def ws_disconnect(message): + for group in message.channel_session['groups']: + print("removing from group: {}".format(group)) + Group(group).discard(message.reply_channel) -def emit_channel_notification(event, payload): - Group(event).send(payload) +@channel_session +def ws_receive(message): + raw_data = message.content['text'] + data = json.loads(raw_data) + if 'groups' in data: + groups = data['groups'] + current_groups = message.channel_session.pop('groups') if 'groups' in message.channel_session else [] + for group_name,v in groups.items(): + if type(v) is list: + for oid in v: + name = '{}-{}'.format(group_name, oid) + print("listening to group: {}".format(name)) + current_groups.append(name) + Group(name).add(message.reply_channel) + else: + print("listening to group: {}".format(group_name)) + current_groups.append(name) + Group(group_name).add(message.reply_channel) + message.channel_session['groups'] = current_groups + + +def emit_channel_notification(group, payload): + print("sending message to group {}".format(group)) + Group(group).send({"text": json.dumps(payload)}) diff --git a/awx/main/management/commands/run_task_system.py b/awx/main/management/commands/run_task_system.py index 855491f08c..f68f35bad7 100644 --- a/awx/main/management/commands/run_task_system.py +++ b/awx/main/management/commands/run_task_system.py @@ -318,7 +318,7 @@ def rebuild_graph(message): logger.debug("Active celery tasks: " + str(active_tasks)) for task in list(running_tasks): if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')): - # NOTE: Pull status again and make sure it didn't finish in + # NOTE: Pull status again and make sure it didn't finish in # the meantime? task.status = 'failed' task.job_explanation += ' '.join(( @@ -326,7 +326,7 @@ def rebuild_graph(message): 'Celery, so it has been marked as failed.', )) task.save() - task.socketio_emit_status("failed") + task.websocket_emit_status("failed") running_tasks.pop(running_tasks.index(task)) logger.error("Task %s appears orphaned... marking as failed" % task) @@ -340,7 +340,7 @@ def rebuild_graph(message): task.status = 'failed' task.job_explanation += 'Task failed to generate dependencies: {}'.format(e) task.save() - task.socketio_emit_status("failed") + task.websocket_emit_status("failed") continue logger.debug("New dependencies: %s" % str(task_dependencies)) for dep in task_dependencies: diff --git a/awx/main/models/inventory.py b/awx/main/models/inventory.py index 0955a28667..576840f8cf 100644 --- a/awx/main/models/inventory.py +++ b/awx/main/models/inventory.py @@ -1223,7 +1223,7 @@ class InventoryUpdate(UnifiedJob, InventorySourceOptions, JobNotificationMixin): from awx.main.tasks import RunInventoryUpdate return RunInventoryUpdate - def socketio_emit_data(self): + def websocket_emit_data(self): if self.inventory_source.group is not None: return dict(group_id=self.inventory_source.group.id) return {} diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index 34adcb73a4..002b4d7e15 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -29,12 +29,13 @@ from awx.main.models.notifications import ( JobNotificationMixin, ) from awx.main.utils import decrypt_field, ignore_inventory_computed_fields -from awx.main.utils import emit_websocket_notification from awx.main.redact import PlainTextCleaner from awx.main.conf import tower_settings from awx.main.fields import ImplicitRoleField from awx.main.models.mixins import ResourceMixin +from awx.main.consumers import emit_channel_notification + logger = logging.getLogger('awx.main.models.jobs') @@ -1259,11 +1260,10 @@ class JobEvent(CreatedModifiedModel): if update_fields: host_summary.save(update_fields=update_fields) job.inventory.update_computed_fields() - emit_websocket_notification('/socket.io/jobs', 'summary_complete', dict(unified_job_id=job.id)) - + emit_channel_notification('jobs-summary', dict(unified_job_id=job.id)) @classmethod - def start_event_queryset(cls, parent_task, starting_events, ordering=None): + def get_startevent_queryset(cls, parent_task, starting_events, ordering=None): ''' We need to pull information about each start event. @@ -1369,7 +1369,7 @@ class SystemJob(UnifiedJob, SystemJobOptions, JobNotificationMixin): from awx.main.tasks import RunSystemJob return RunSystemJob - def socketio_emit_data(self): + def websocket_emit_data(self): return {} def get_absolute_url(self): diff --git a/awx/main/models/projects.py b/awx/main/models/projects.py index 85ca3ab2aa..c1349ed38b 100644 --- a/awx/main/models/projects.py +++ b/awx/main/models/projects.py @@ -408,7 +408,7 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin): return True return False - def socketio_emit_data(self): + def websocket_emit_data(self): return dict(project_id=self.project.id) @property diff --git a/awx/main/models/schedules.py b/awx/main/models/schedules.py index d9de8b394e..330b39f7b6 100644 --- a/awx/main/models/schedules.py +++ b/awx/main/models/schedules.py @@ -16,7 +16,8 @@ from jsonfield import JSONField # AWX from awx.main.models.base import * # noqa -from awx.main.utils import ignore_inventory_computed_fields, emit_websocket_notification +from awx.main.utils import ignore_inventory_computed_fields +from awx.main.consumers import emit_channel_notification from django.core.urlresolvers import reverse logger = logging.getLogger('awx.main.models.schedule') @@ -112,7 +113,7 @@ class Schedule(CommonModel): self.dtend = make_aware(datetime.datetime.strptime(until_date, "%Y%m%dT%H%M%SZ"), get_default_timezone()) if 'count' in self.rrule.lower(): self.dtend = future_rs[-1] - emit_websocket_notification('/socket.io/schedules', 'schedule_changed', dict(id=self.id)) + emit_channel_notification('schedules-changed', dict(id=self.id)) with ignore_inventory_computed_fields(): self.unified_job_template.update_computed_fields() diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 950b6fc99b..bdcedb2aab 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -32,8 +32,9 @@ from djcelery.models import TaskMeta # AWX from awx.main.models.base import * # noqa from awx.main.models.schedules import Schedule -from awx.main.utils import decrypt_field, emit_websocket_notification, _inventory_updates -from awx.main.redact import UriCleaner, REPLACE_STR +from awx.main.utils import decrypt_field, _inventory_updates +from awx.main.redact import UriCleaner +from awx.main.consumers import emit_channel_notification __all__ = ['UnifiedJobTemplate', 'UnifiedJob'] @@ -774,14 +775,14 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique ''' Given another task object determine if this task would be blocked by it ''' raise NotImplementedError # Implement in subclass. - def socketio_emit_data(self): + def websocket_emit_data(self): ''' Return extra data that should be included when submitting data to the browser over the websocket connection ''' return {} - def socketio_emit_status(self, status): + def websocket_emit_status(self, status): status_data = dict(unified_job_id=self.id, status=status) - status_data.update(self.socketio_emit_data()) - emit_websocket_notification('/socket.io/jobs', 'status_changed', status_data) + status_data.update(self.websocket_emit_data()) + emit_channel_notification('jobs-status_changed', status_data) def generate_dependencies(self, active_tasks): ''' Generate any tasks that the current task might be dependent on given a list of active @@ -850,7 +851,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique # Save the pending status, and inform the SocketIO listener. self.update_fields(start_args=json.dumps(kwargs), status='pending') - self.socketio_emit_status("pending") + self.websocket_emit_status("pending") # Each type of unified job has a different Task class; get the # appropirate one. @@ -900,7 +901,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique instance.job_explanation = 'Forced cancel' update_fields.append('job_explanation') instance.save(update_fields=update_fields) - self.socketio_emit_status("canceled") + self.websocket_emit_status("canceled") except: # FIXME: Log this exception! if settings.DEBUG: raise @@ -914,7 +915,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique self.status = 'canceled' cancel_fields.append('status') self.save(update_fields=cancel_fields) - self.socketio_emit_status("canceled") + self.websocket_emit_status("canceled") if settings.BROKER_URL.startswith('amqp://'): self._force_cancel() return self.cancel_flag diff --git a/awx/main/routing.py b/awx/main/routing.py index 6156a295a9..67a08ff1bd 100644 --- a/awx/main/routing.py +++ b/awx/main/routing.py @@ -2,5 +2,6 @@ from channels.routing import route channel_routing = [ - route("websocket.connect", "awx.main.consumers.job_event_connect", path=r'^/job_event/(?P[a-zA-Z0-9_]+)/$'), + route("websocket.disconnect", "awx.main.consumers.ws_disconnect", path=r'^/websocket/$'), + route("websocket.receive", "awx.main.consumers.ws_receive", path=r'^/websocket/$'), ] diff --git a/awx/main/signals.py b/awx/main/signals.py index 7389f01763..fe31082a88 100644 --- a/awx/main/signals.py +++ b/awx/main/signals.py @@ -18,11 +18,13 @@ from crum.signals import current_user_getter # AWX from awx.main.models import * # noqa from awx.api.serializers import * # noqa -from awx.main.utils import model_instance_diff, model_to_dict, camelcase_to_underscore, emit_websocket_notification +from awx.main.utils import model_instance_diff, model_to_dict, camelcase_to_underscore from awx.main.utils import ignore_inventory_computed_fields, ignore_inventory_group_removal, _inventory_updates from awx.main.tasks import update_inventory_computed_fields from awx.main.conf import tower_settings +from awx.main.consumers import emit_channel_notification + __all__ = [] logger = logging.getLogger('awx.main.signals') @@ -33,13 +35,14 @@ logger = logging.getLogger('awx.main.signals') def emit_job_event_detail(sender, **kwargs): instance = kwargs['instance'] created = kwargs['created'] + print("before created job_event_detail") if created: event_serialized = JobEventSerializer(instance).data event_serialized['id'] = instance.id event_serialized["created"] = event_serialized["created"].isoformat() event_serialized["modified"] = event_serialized["modified"].isoformat() event_serialized["event_name"] = instance.event - emit_websocket_notification('/socket.io/job_events', 'job_events-' + str(instance.job.id), event_serialized) + emit_channel_notification('job_events-' + str(instance.job.id), event_serialized) def emit_ad_hoc_command_event_detail(sender, **kwargs): instance = kwargs['instance'] @@ -50,7 +53,7 @@ def emit_ad_hoc_command_event_detail(sender, **kwargs): event_serialized["created"] = event_serialized["created"].isoformat() event_serialized["modified"] = event_serialized["modified"].isoformat() event_serialized["event_name"] = instance.event - emit_websocket_notification('/socket.io/ad_hoc_command_events', 'ad_hoc_command_events-' + str(instance.ad_hoc_command_id), event_serialized) + emit_channel_notification('ad_hoc_command_events-' + str(instance.ad_hoc_command_id), event_serialized) def emit_update_inventory_computed_fields(sender, **kwargs): logger.debug("In update inventory computed fields") diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 097dca517d..830804f217 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -51,13 +51,13 @@ from awx.main.queue import FifoQueue from awx.main.conf import tower_settings from awx.main.task_engine import TaskSerializer, TASK_TIMEOUT_INTERVAL from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url, - emit_websocket_notification, check_proot_installed, build_proot_temp_dir, wrap_args_with_proot) +from awx.main.consumers import emit_channel_notification __all__ = ['RunJob', 'RunSystemJob', 'RunProjectUpdate', 'RunInventoryUpdate', - 'RunAdHocCommand', 'RunWorkflowJob', 'handle_work_error', - 'handle_work_success', 'update_inventory_computed_fields', - 'send_notifications', 'run_administrative_checks', + 'RunAdHocCommand', 'RunWorkflowJob', 'handle_work_error', + 'handle_work_success', 'update_inventory_computed_fields', + 'send_notifications', 'run_administrative_checks', 'run_workflow_job'] HIDDEN_PASSWORD = '**********' @@ -176,8 +176,8 @@ def tower_periodic_scheduler(self): new_unified_job.status = 'failed' new_unified_job.job_explanation = "Scheduled job could not start because it was not in the right state or required manual credentials" new_unified_job.save(update_fields=['status', 'job_explanation']) - new_unified_job.socketio_emit_status("failed") - emit_websocket_notification('/socket.io/schedules', 'schedule_changed', dict(id=schedule.id)) + new_unified_job.websocket_emit_status("failed") + emit_channel_notification('schedules-changed', dict(id=schedule.id)) @task(queue='default') def notify_task_runner(metadata_dict): @@ -234,10 +234,16 @@ def handle_work_error(self, task_id, subtasks=None): instance.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % \ (first_instance_type, first_instance.name, first_instance.id) instance.save() - instance.socketio_emit_status("failed") - - if first_instance: - _send_notification_templates(first_instance, 'failed') + instance.websocket_emit_status("failed") + notification_body = first_task.notification_data() + notification_subject = "{} #{} '{}' failed on Ansible Tower: {}".format(first_task_friendly_name, + first_task_id, + smart_str(first_task_name), + notification_body['url']) + notification_body['friendly_name'] = first_task_friendly_name + send_notifications.delay([n.generate_notification(notification_subject, notification_body).id + for n in set(notification_templates.get('error', []) + notification_templates.get('any', []))], + job_id=first_task_id) @task(queue='default') def update_inventory_computed_fields(inventory_id, should_update_hosts=True): @@ -578,7 +584,7 @@ class BaseTask(Task): ''' instance = self.update_model(pk, status='running', celery_task_id=self.request.id) - instance.socketio_emit_status("running") + instance.websocket_emit_status("running") status, rc, tb = 'error', None, '' output_replacements = [] try: @@ -647,7 +653,7 @@ class BaseTask(Task): instance = self.update_model(pk, status=status, result_traceback=tb, output_replacements=output_replacements) self.post_run_hook(instance, **kwargs) - instance.socketio_emit_status(status) + instance.websocket_emit_status(status) if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'): # Raising an exception will mark the job as 'failed' in celery # and will stop a task chain from continuing to execute @@ -1665,7 +1671,7 @@ class RunSystemJob(BaseTask): return settings.BASE_DIR class RunWorkflowJob(BaseTask): - + name = 'awx.main.tasks.run_workflow_job' model = WorkflowJob