converting from socketio to channels websocket

This commit is contained in:
Wayne Witzel III
2016-08-11 15:06:07 -04:00
parent 42aab8ab83
commit 4c8aaf1aed
11 changed files with 83 additions and 64 deletions

View File

@@ -72,8 +72,8 @@ from awx.api.permissions import * # noqa
from awx.api.renderers import * # noqa from awx.api.renderers import * # noqa
from awx.api.serializers import * # noqa from awx.api.serializers import * # noqa
from awx.api.metadata import RoleMetadata from awx.api.metadata import RoleMetadata
from awx.main.utils import emit_websocket_notification
from awx.main.conf import tower_settings from awx.main.conf import tower_settings
from awx.main.consumers import emit_channel_notification
logger = logging.getLogger('awx.api.views') logger = logging.getLogger('awx.api.views')
@@ -544,11 +544,7 @@ class AuthTokenView(APIView):
# Mark them as invalid and inform the user # Mark them as invalid and inform the user
invalid_tokens = AuthToken.get_tokens_over_limit(serializer.validated_data['user']) invalid_tokens = AuthToken.get_tokens_over_limit(serializer.validated_data['user'])
for t in invalid_tokens: for t in invalid_tokens:
# TODO: send socket notification emit_channel_notification('control-limit_reached', dict(reason=force_text(AuthToken.reason_long('limit_reached')), token_key=t.key))
emit_websocket_notification('/socket.io/control',
'limit_reached',
dict(reason=force_text(AuthToken.reason_long('limit_reached'))),
token_key=t.key)
t.invalidate(reason='limit_reached') t.invalidate(reason='limit_reached')
# Note: This header is normally added in the middleware whenever an # Note: This header is normally added in the middleware whenever an
@@ -3183,21 +3179,8 @@ class JobJobTasksList(BaseJobEventsList):
return ({'detail': 'Parent event not found.'}, -1, status.HTTP_404_NOT_FOUND) return ({'detail': 'Parent event not found.'}, -1, status.HTTP_404_NOT_FOUND)
parent_task = parent_task[0] parent_task = parent_task[0]
# Some events correspond to a playbook or task starting up,
# and these are what we're interested in here.
STARTING_EVENTS = ('playbook_on_task_start', 'playbook_on_setup') STARTING_EVENTS = ('playbook_on_task_start', 'playbook_on_setup')
queryset = JobEvent.get_startevent_queryset(parent_task, STARTING_EVENTS)
# We need to pull information about each start event.
#
# This is super tricky, because this table has a one-to-many
# relationship with itself (parent-child), and we're getting
# information for an arbitrary number of children. This means we
# need stats on grandchildren, sorted by child.
queryset = (JobEvent.objects.filter(parent__parent=parent_task,
parent__event__in=STARTING_EVENTS)
.values('parent__id', 'event', 'changed')
.annotate(num=Count('event'))
.order_by('parent__id'))
# The data above will come back in a list, but we are going to # The data above will come back in a list, but we are going to
# want to access it based on the parent id, so map it into a # want to access it based on the parent id, so map it into a

View File

@@ -1,13 +1,37 @@
import json
from channels import Group from channels import Group
from channels.sessions import channel_session from channels.sessions import channel_session
@channel_session @channel_session
def job_event_connect(message): def ws_disconnect(message):
job_id = message.content['path'].strip('/') for group in message.channel_session['groups']:
message.channel_session['job_id'] = job_id print("removing from group: {}".format(group))
Group("job_events-%s" % job_id).add(message.reply_channel) Group(group).discard(message.reply_channel)
def emit_channel_notification(event, payload): @channel_session
Group(event).send(payload) def ws_receive(message):
raw_data = message.content['text']
data = json.loads(raw_data)
if 'groups' in data:
groups = data['groups']
current_groups = message.channel_session.pop('groups') if 'groups' in message.channel_session else []
for group_name,v in groups.items():
if type(v) is list:
for oid in v:
name = '{}-{}'.format(group_name, oid)
print("listening to group: {}".format(name))
current_groups.append(name)
Group(name).add(message.reply_channel)
else:
print("listening to group: {}".format(group_name))
current_groups.append(name)
Group(group_name).add(message.reply_channel)
message.channel_session['groups'] = current_groups
def emit_channel_notification(group, payload):
print("sending message to group {}".format(group))
Group(group).send({"text": json.dumps(payload)})

View File

@@ -318,7 +318,7 @@ def rebuild_graph(message):
logger.debug("Active celery tasks: " + str(active_tasks)) logger.debug("Active celery tasks: " + str(active_tasks))
for task in list(running_tasks): for task in list(running_tasks):
if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')): if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
# NOTE: Pull status again and make sure it didn't finish in # NOTE: Pull status again and make sure it didn't finish in
# the meantime? # the meantime?
task.status = 'failed' task.status = 'failed'
task.job_explanation += ' '.join(( task.job_explanation += ' '.join((
@@ -326,7 +326,7 @@ def rebuild_graph(message):
'Celery, so it has been marked as failed.', 'Celery, so it has been marked as failed.',
)) ))
task.save() task.save()
task.socketio_emit_status("failed") task.websocket_emit_status("failed")
running_tasks.pop(running_tasks.index(task)) running_tasks.pop(running_tasks.index(task))
logger.error("Task %s appears orphaned... marking as failed" % task) logger.error("Task %s appears orphaned... marking as failed" % task)
@@ -340,7 +340,7 @@ def rebuild_graph(message):
task.status = 'failed' task.status = 'failed'
task.job_explanation += 'Task failed to generate dependencies: {}'.format(e) task.job_explanation += 'Task failed to generate dependencies: {}'.format(e)
task.save() task.save()
task.socketio_emit_status("failed") task.websocket_emit_status("failed")
continue continue
logger.debug("New dependencies: %s" % str(task_dependencies)) logger.debug("New dependencies: %s" % str(task_dependencies))
for dep in task_dependencies: for dep in task_dependencies:

View File

@@ -1223,7 +1223,7 @@ class InventoryUpdate(UnifiedJob, InventorySourceOptions, JobNotificationMixin):
from awx.main.tasks import RunInventoryUpdate from awx.main.tasks import RunInventoryUpdate
return RunInventoryUpdate return RunInventoryUpdate
def socketio_emit_data(self): def websocket_emit_data(self):
if self.inventory_source.group is not None: if self.inventory_source.group is not None:
return dict(group_id=self.inventory_source.group.id) return dict(group_id=self.inventory_source.group.id)
return {} return {}

View File

@@ -29,12 +29,13 @@ from awx.main.models.notifications import (
JobNotificationMixin, JobNotificationMixin,
) )
from awx.main.utils import decrypt_field, ignore_inventory_computed_fields from awx.main.utils import decrypt_field, ignore_inventory_computed_fields
from awx.main.utils import emit_websocket_notification
from awx.main.redact import PlainTextCleaner from awx.main.redact import PlainTextCleaner
from awx.main.conf import tower_settings from awx.main.conf import tower_settings
from awx.main.fields import ImplicitRoleField from awx.main.fields import ImplicitRoleField
from awx.main.models.mixins import ResourceMixin from awx.main.models.mixins import ResourceMixin
from awx.main.consumers import emit_channel_notification
logger = logging.getLogger('awx.main.models.jobs') logger = logging.getLogger('awx.main.models.jobs')
@@ -1259,11 +1260,10 @@ class JobEvent(CreatedModifiedModel):
if update_fields: if update_fields:
host_summary.save(update_fields=update_fields) host_summary.save(update_fields=update_fields)
job.inventory.update_computed_fields() job.inventory.update_computed_fields()
emit_websocket_notification('/socket.io/jobs', 'summary_complete', dict(unified_job_id=job.id)) emit_channel_notification('jobs-summary', dict(unified_job_id=job.id))
@classmethod @classmethod
def start_event_queryset(cls, parent_task, starting_events, ordering=None): def get_startevent_queryset(cls, parent_task, starting_events, ordering=None):
''' '''
We need to pull information about each start event. We need to pull information about each start event.
@@ -1369,7 +1369,7 @@ class SystemJob(UnifiedJob, SystemJobOptions, JobNotificationMixin):
from awx.main.tasks import RunSystemJob from awx.main.tasks import RunSystemJob
return RunSystemJob return RunSystemJob
def socketio_emit_data(self): def websocket_emit_data(self):
return {} return {}
def get_absolute_url(self): def get_absolute_url(self):

View File

@@ -408,7 +408,7 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin):
return True return True
return False return False
def socketio_emit_data(self): def websocket_emit_data(self):
return dict(project_id=self.project.id) return dict(project_id=self.project.id)
@property @property

View File

@@ -16,7 +16,8 @@ from jsonfield import JSONField
# AWX # AWX
from awx.main.models.base import * # noqa from awx.main.models.base import * # noqa
from awx.main.utils import ignore_inventory_computed_fields, emit_websocket_notification from awx.main.utils import ignore_inventory_computed_fields
from awx.main.consumers import emit_channel_notification
from django.core.urlresolvers import reverse from django.core.urlresolvers import reverse
logger = logging.getLogger('awx.main.models.schedule') logger = logging.getLogger('awx.main.models.schedule')
@@ -112,7 +113,7 @@ class Schedule(CommonModel):
self.dtend = make_aware(datetime.datetime.strptime(until_date, "%Y%m%dT%H%M%SZ"), get_default_timezone()) self.dtend = make_aware(datetime.datetime.strptime(until_date, "%Y%m%dT%H%M%SZ"), get_default_timezone())
if 'count' in self.rrule.lower(): if 'count' in self.rrule.lower():
self.dtend = future_rs[-1] self.dtend = future_rs[-1]
emit_websocket_notification('/socket.io/schedules', 'schedule_changed', dict(id=self.id)) emit_channel_notification('schedules-changed', dict(id=self.id))
with ignore_inventory_computed_fields(): with ignore_inventory_computed_fields():
self.unified_job_template.update_computed_fields() self.unified_job_template.update_computed_fields()

View File

@@ -32,8 +32,9 @@ from djcelery.models import TaskMeta
# AWX # AWX
from awx.main.models.base import * # noqa from awx.main.models.base import * # noqa
from awx.main.models.schedules import Schedule from awx.main.models.schedules import Schedule
from awx.main.utils import decrypt_field, emit_websocket_notification, _inventory_updates from awx.main.utils import decrypt_field, _inventory_updates
from awx.main.redact import UriCleaner, REPLACE_STR from awx.main.redact import UriCleaner
from awx.main.consumers import emit_channel_notification
__all__ = ['UnifiedJobTemplate', 'UnifiedJob'] __all__ = ['UnifiedJobTemplate', 'UnifiedJob']
@@ -774,14 +775,14 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
''' Given another task object determine if this task would be blocked by it ''' ''' Given another task object determine if this task would be blocked by it '''
raise NotImplementedError # Implement in subclass. raise NotImplementedError # Implement in subclass.
def socketio_emit_data(self): def websocket_emit_data(self):
''' Return extra data that should be included when submitting data to the browser over the websocket connection ''' ''' Return extra data that should be included when submitting data to the browser over the websocket connection '''
return {} return {}
def socketio_emit_status(self, status): def websocket_emit_status(self, status):
status_data = dict(unified_job_id=self.id, status=status) status_data = dict(unified_job_id=self.id, status=status)
status_data.update(self.socketio_emit_data()) status_data.update(self.websocket_emit_data())
emit_websocket_notification('/socket.io/jobs', 'status_changed', status_data) emit_channel_notification('jobs-status_changed', status_data)
def generate_dependencies(self, active_tasks): def generate_dependencies(self, active_tasks):
''' Generate any tasks that the current task might be dependent on given a list of active ''' Generate any tasks that the current task might be dependent on given a list of active
@@ -850,7 +851,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
# Save the pending status, and inform the SocketIO listener. # Save the pending status, and inform the SocketIO listener.
self.update_fields(start_args=json.dumps(kwargs), status='pending') self.update_fields(start_args=json.dumps(kwargs), status='pending')
self.socketio_emit_status("pending") self.websocket_emit_status("pending")
# Each type of unified job has a different Task class; get the # Each type of unified job has a different Task class; get the
# appropirate one. # appropirate one.
@@ -900,7 +901,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
instance.job_explanation = 'Forced cancel' instance.job_explanation = 'Forced cancel'
update_fields.append('job_explanation') update_fields.append('job_explanation')
instance.save(update_fields=update_fields) instance.save(update_fields=update_fields)
self.socketio_emit_status("canceled") self.websocket_emit_status("canceled")
except: # FIXME: Log this exception! except: # FIXME: Log this exception!
if settings.DEBUG: if settings.DEBUG:
raise raise
@@ -914,7 +915,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
self.status = 'canceled' self.status = 'canceled'
cancel_fields.append('status') cancel_fields.append('status')
self.save(update_fields=cancel_fields) self.save(update_fields=cancel_fields)
self.socketio_emit_status("canceled") self.websocket_emit_status("canceled")
if settings.BROKER_URL.startswith('amqp://'): if settings.BROKER_URL.startswith('amqp://'):
self._force_cancel() self._force_cancel()
return self.cancel_flag return self.cancel_flag

View File

@@ -2,5 +2,6 @@ from channels.routing import route
channel_routing = [ channel_routing = [
route("websocket.connect", "awx.main.consumers.job_event_connect", path=r'^/job_event/(?P<id>[a-zA-Z0-9_]+)/$'), route("websocket.disconnect", "awx.main.consumers.ws_disconnect", path=r'^/websocket/$'),
route("websocket.receive", "awx.main.consumers.ws_receive", path=r'^/websocket/$'),
] ]

View File

@@ -18,11 +18,13 @@ from crum.signals import current_user_getter
# AWX # AWX
from awx.main.models import * # noqa from awx.main.models import * # noqa
from awx.api.serializers import * # noqa from awx.api.serializers import * # noqa
from awx.main.utils import model_instance_diff, model_to_dict, camelcase_to_underscore, emit_websocket_notification from awx.main.utils import model_instance_diff, model_to_dict, camelcase_to_underscore
from awx.main.utils import ignore_inventory_computed_fields, ignore_inventory_group_removal, _inventory_updates from awx.main.utils import ignore_inventory_computed_fields, ignore_inventory_group_removal, _inventory_updates
from awx.main.tasks import update_inventory_computed_fields from awx.main.tasks import update_inventory_computed_fields
from awx.main.conf import tower_settings from awx.main.conf import tower_settings
from awx.main.consumers import emit_channel_notification
__all__ = [] __all__ = []
logger = logging.getLogger('awx.main.signals') logger = logging.getLogger('awx.main.signals')
@@ -33,13 +35,14 @@ logger = logging.getLogger('awx.main.signals')
def emit_job_event_detail(sender, **kwargs): def emit_job_event_detail(sender, **kwargs):
instance = kwargs['instance'] instance = kwargs['instance']
created = kwargs['created'] created = kwargs['created']
print("before created job_event_detail")
if created: if created:
event_serialized = JobEventSerializer(instance).data event_serialized = JobEventSerializer(instance).data
event_serialized['id'] = instance.id event_serialized['id'] = instance.id
event_serialized["created"] = event_serialized["created"].isoformat() event_serialized["created"] = event_serialized["created"].isoformat()
event_serialized["modified"] = event_serialized["modified"].isoformat() event_serialized["modified"] = event_serialized["modified"].isoformat()
event_serialized["event_name"] = instance.event event_serialized["event_name"] = instance.event
emit_websocket_notification('/socket.io/job_events', 'job_events-' + str(instance.job.id), event_serialized) emit_channel_notification('job_events-' + str(instance.job.id), event_serialized)
def emit_ad_hoc_command_event_detail(sender, **kwargs): def emit_ad_hoc_command_event_detail(sender, **kwargs):
instance = kwargs['instance'] instance = kwargs['instance']
@@ -50,7 +53,7 @@ def emit_ad_hoc_command_event_detail(sender, **kwargs):
event_serialized["created"] = event_serialized["created"].isoformat() event_serialized["created"] = event_serialized["created"].isoformat()
event_serialized["modified"] = event_serialized["modified"].isoformat() event_serialized["modified"] = event_serialized["modified"].isoformat()
event_serialized["event_name"] = instance.event event_serialized["event_name"] = instance.event
emit_websocket_notification('/socket.io/ad_hoc_command_events', 'ad_hoc_command_events-' + str(instance.ad_hoc_command_id), event_serialized) emit_channel_notification('ad_hoc_command_events-' + str(instance.ad_hoc_command_id), event_serialized)
def emit_update_inventory_computed_fields(sender, **kwargs): def emit_update_inventory_computed_fields(sender, **kwargs):
logger.debug("In update inventory computed fields") logger.debug("In update inventory computed fields")

View File

@@ -51,13 +51,13 @@ from awx.main.queue import FifoQueue
from awx.main.conf import tower_settings from awx.main.conf import tower_settings
from awx.main.task_engine import TaskSerializer, TASK_TIMEOUT_INTERVAL from awx.main.task_engine import TaskSerializer, TASK_TIMEOUT_INTERVAL
from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url, from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url,
emit_websocket_notification,
check_proot_installed, build_proot_temp_dir, wrap_args_with_proot) check_proot_installed, build_proot_temp_dir, wrap_args_with_proot)
from awx.main.consumers import emit_channel_notification
__all__ = ['RunJob', 'RunSystemJob', 'RunProjectUpdate', 'RunInventoryUpdate', __all__ = ['RunJob', 'RunSystemJob', 'RunProjectUpdate', 'RunInventoryUpdate',
'RunAdHocCommand', 'RunWorkflowJob', 'handle_work_error', 'RunAdHocCommand', 'RunWorkflowJob', 'handle_work_error',
'handle_work_success', 'update_inventory_computed_fields', 'handle_work_success', 'update_inventory_computed_fields',
'send_notifications', 'run_administrative_checks', 'send_notifications', 'run_administrative_checks',
'run_workflow_job'] 'run_workflow_job']
HIDDEN_PASSWORD = '**********' HIDDEN_PASSWORD = '**********'
@@ -176,8 +176,8 @@ def tower_periodic_scheduler(self):
new_unified_job.status = 'failed' new_unified_job.status = 'failed'
new_unified_job.job_explanation = "Scheduled job could not start because it was not in the right state or required manual credentials" new_unified_job.job_explanation = "Scheduled job could not start because it was not in the right state or required manual credentials"
new_unified_job.save(update_fields=['status', 'job_explanation']) new_unified_job.save(update_fields=['status', 'job_explanation'])
new_unified_job.socketio_emit_status("failed") new_unified_job.websocket_emit_status("failed")
emit_websocket_notification('/socket.io/schedules', 'schedule_changed', dict(id=schedule.id)) emit_channel_notification('schedules-changed', dict(id=schedule.id))
@task(queue='default') @task(queue='default')
def notify_task_runner(metadata_dict): def notify_task_runner(metadata_dict):
@@ -234,10 +234,16 @@ def handle_work_error(self, task_id, subtasks=None):
instance.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % \ instance.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % \
(first_instance_type, first_instance.name, first_instance.id) (first_instance_type, first_instance.name, first_instance.id)
instance.save() instance.save()
instance.socketio_emit_status("failed") instance.websocket_emit_status("failed")
notification_body = first_task.notification_data()
if first_instance: notification_subject = "{} #{} '{}' failed on Ansible Tower: {}".format(first_task_friendly_name,
_send_notification_templates(first_instance, 'failed') first_task_id,
smart_str(first_task_name),
notification_body['url'])
notification_body['friendly_name'] = first_task_friendly_name
send_notifications.delay([n.generate_notification(notification_subject, notification_body).id
for n in set(notification_templates.get('error', []) + notification_templates.get('any', []))],
job_id=first_task_id)
@task(queue='default') @task(queue='default')
def update_inventory_computed_fields(inventory_id, should_update_hosts=True): def update_inventory_computed_fields(inventory_id, should_update_hosts=True):
@@ -578,7 +584,7 @@ class BaseTask(Task):
''' '''
instance = self.update_model(pk, status='running', celery_task_id=self.request.id) instance = self.update_model(pk, status='running', celery_task_id=self.request.id)
instance.socketio_emit_status("running") instance.websocket_emit_status("running")
status, rc, tb = 'error', None, '' status, rc, tb = 'error', None, ''
output_replacements = [] output_replacements = []
try: try:
@@ -647,7 +653,7 @@ class BaseTask(Task):
instance = self.update_model(pk, status=status, result_traceback=tb, instance = self.update_model(pk, status=status, result_traceback=tb,
output_replacements=output_replacements) output_replacements=output_replacements)
self.post_run_hook(instance, **kwargs) self.post_run_hook(instance, **kwargs)
instance.socketio_emit_status(status) instance.websocket_emit_status(status)
if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'): if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'):
# Raising an exception will mark the job as 'failed' in celery # Raising an exception will mark the job as 'failed' in celery
# and will stop a task chain from continuing to execute # and will stop a task chain from continuing to execute
@@ -1665,7 +1671,7 @@ class RunSystemJob(BaseTask):
return settings.BASE_DIR return settings.BASE_DIR
class RunWorkflowJob(BaseTask): class RunWorkflowJob(BaseTask):
name = 'awx.main.tasks.run_workflow_job' name = 'awx.main.tasks.run_workflow_job'
model = WorkflowJob model = WorkflowJob