mirror of
https://github.com/ansible/awx.git
synced 2026-05-16 05:47:38 -02:30
Merge pull request #3657 from wwitzel3/jtabor-sockets
Switch to Django Channels
This commit is contained in:
39
awx/main/consumers.py
Normal file
39
awx/main/consumers.py
Normal file
@@ -0,0 +1,39 @@
|
||||
import json
|
||||
|
||||
from channels import Group
|
||||
from channels.sessions import channel_session
|
||||
|
||||
|
||||
def discard_groups(message):
|
||||
if 'groups' in message.channel_session:
|
||||
for group in message.channel_session['groups']:
|
||||
Group(group).discard(message.reply_channel)
|
||||
|
||||
@channel_session
|
||||
def ws_disconnect(message):
|
||||
discard_groups(message)
|
||||
|
||||
@channel_session
|
||||
def ws_receive(message):
|
||||
raw_data = message.content['text']
|
||||
data = json.loads(raw_data)
|
||||
|
||||
if 'groups' in data:
|
||||
discard_groups(message)
|
||||
groups = data['groups']
|
||||
current_groups = message.channel_session.pop('groups') if 'groups' in message.channel_session else []
|
||||
for group_name,v in groups.items():
|
||||
if type(v) is list:
|
||||
for oid in v:
|
||||
name = '{}-{}'.format(group_name, oid)
|
||||
current_groups.append(name)
|
||||
Group(name).add(message.reply_channel)
|
||||
else:
|
||||
current_groups.append(group_name)
|
||||
Group(group_name).add(message.reply_channel)
|
||||
message.channel_session['groups'] = current_groups
|
||||
|
||||
|
||||
def emit_channel_notification(group, payload):
|
||||
payload = json.dumps(payload)
|
||||
Group(group).send({"text": json.dumps(payload)})
|
||||
22
awx/main/migrations/0039_v310_channelgroup.py
Normal file
22
awx/main/migrations/0039_v310_channelgroup.py
Normal file
@@ -0,0 +1,22 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('main', '0038_v310_workflow_rbac_prompts'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='ChannelGroup',
|
||||
fields=[
|
||||
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
|
||||
('group', models.CharField(unique=True, max_length=200)),
|
||||
('channels', models.TextField()),
|
||||
],
|
||||
),
|
||||
]
|
||||
@@ -22,6 +22,7 @@ from awx.main.models.notifications import * # noqa
|
||||
from awx.main.models.fact import * # noqa
|
||||
from awx.main.models.label import * # noqa
|
||||
from awx.main.models.workflow import * # noqa
|
||||
from awx.main.models.channels import * # noqa
|
||||
|
||||
# Monkeypatch Django serializer to ignore django-taggit fields (which break
|
||||
# the dumpdata command; see https://github.com/alex/django-taggit/issues/155).
|
||||
|
||||
5
awx/main/models/channels.py
Normal file
5
awx/main/models/channels.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from django.db import models
|
||||
|
||||
class ChannelGroup(models.Model):
|
||||
group = models.CharField(max_length=200, unique=True)
|
||||
channels = models.TextField()
|
||||
@@ -1222,7 +1222,7 @@ class InventoryUpdate(UnifiedJob, InventorySourceOptions, JobNotificationMixin):
|
||||
from awx.main.tasks import RunInventoryUpdate
|
||||
return RunInventoryUpdate
|
||||
|
||||
def socketio_emit_data(self):
|
||||
def websocket_emit_data(self):
|
||||
if self.inventory_source.group is not None:
|
||||
return dict(group_id=self.inventory_source.group.id)
|
||||
return {}
|
||||
|
||||
@@ -29,12 +29,13 @@ from awx.main.models.notifications import (
|
||||
JobNotificationMixin,
|
||||
)
|
||||
from awx.main.utils import decrypt_field, ignore_inventory_computed_fields
|
||||
from awx.main.utils import emit_websocket_notification
|
||||
from awx.main.redact import PlainTextCleaner
|
||||
from awx.main.fields import ImplicitRoleField
|
||||
from awx.main.models.mixins import ResourceMixin
|
||||
from awx.main.models.base import PERM_INVENTORY_SCAN
|
||||
|
||||
from awx.main.consumers import emit_channel_notification
|
||||
|
||||
|
||||
logger = logging.getLogger('awx.main.models.jobs')
|
||||
|
||||
@@ -1270,11 +1271,10 @@ class JobEvent(CreatedModifiedModel):
|
||||
if update_fields:
|
||||
host_summary.save(update_fields=update_fields)
|
||||
job.inventory.update_computed_fields()
|
||||
emit_websocket_notification('/socket.io/jobs', 'summary_complete', dict(unified_job_id=job.id))
|
||||
|
||||
emit_channel_notification('jobs-summary', dict(group_name='jobs', unified_job_id=job.id))
|
||||
|
||||
@classmethod
|
||||
def start_event_queryset(cls, parent_task, starting_events, ordering=None):
|
||||
def get_startevent_queryset(cls, parent_task, starting_events, ordering=None):
|
||||
'''
|
||||
We need to pull information about each start event.
|
||||
|
||||
@@ -1380,7 +1380,7 @@ class SystemJob(UnifiedJob, SystemJobOptions, JobNotificationMixin):
|
||||
from awx.main.tasks import RunSystemJob
|
||||
return RunSystemJob
|
||||
|
||||
def socketio_emit_data(self):
|
||||
def websocket_emit_data(self):
|
||||
return {}
|
||||
|
||||
def get_absolute_url(self):
|
||||
@@ -1421,4 +1421,3 @@ class SystemJob(UnifiedJob, SystemJobOptions, JobNotificationMixin):
|
||||
|
||||
def get_notification_friendly_name(self):
|
||||
return "System Job"
|
||||
|
||||
|
||||
@@ -407,7 +407,7 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin):
|
||||
return True
|
||||
return False
|
||||
|
||||
def socketio_emit_data(self):
|
||||
def websocket_emit_data(self):
|
||||
return dict(project_id=self.project.id)
|
||||
|
||||
@property
|
||||
|
||||
@@ -16,7 +16,8 @@ from jsonfield import JSONField
|
||||
|
||||
# AWX
|
||||
from awx.main.models.base import * # noqa
|
||||
from awx.main.utils import ignore_inventory_computed_fields, emit_websocket_notification
|
||||
from awx.main.utils import ignore_inventory_computed_fields
|
||||
from awx.main.consumers import emit_channel_notification
|
||||
from django.core.urlresolvers import reverse
|
||||
|
||||
logger = logging.getLogger('awx.main.models.schedule')
|
||||
@@ -112,7 +113,7 @@ class Schedule(CommonModel):
|
||||
self.dtend = make_aware(datetime.datetime.strptime(until_date, "%Y%m%dT%H%M%SZ"), get_default_timezone())
|
||||
if 'count' in self.rrule.lower():
|
||||
self.dtend = future_rs[-1]
|
||||
emit_websocket_notification('/socket.io/schedules', 'schedule_changed', dict(id=self.id))
|
||||
emit_channel_notification('schedules-changed', dict(id=self.id, group_name='schedules'))
|
||||
with ignore_inventory_computed_fields():
|
||||
self.unified_job_template.update_computed_fields()
|
||||
|
||||
|
||||
@@ -32,8 +32,9 @@ from djcelery.models import TaskMeta
|
||||
# AWX
|
||||
from awx.main.models.base import * # noqa
|
||||
from awx.main.models.schedules import Schedule
|
||||
from awx.main.utils import decrypt_field, emit_websocket_notification, _inventory_updates
|
||||
from awx.main.redact import UriCleaner, REPLACE_STR
|
||||
from awx.main.utils import decrypt_field, _inventory_updates
|
||||
from awx.main.redact import UriCleaner
|
||||
from awx.main.consumers import emit_channel_notification
|
||||
|
||||
__all__ = ['UnifiedJobTemplate', 'UnifiedJob']
|
||||
|
||||
@@ -774,14 +775,15 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
|
||||
''' Given another task object determine if this task would be blocked by it '''
|
||||
raise NotImplementedError # Implement in subclass.
|
||||
|
||||
def socketio_emit_data(self):
|
||||
def websocket_emit_data(self):
|
||||
''' Return extra data that should be included when submitting data to the browser over the websocket connection '''
|
||||
return {}
|
||||
|
||||
def socketio_emit_status(self, status):
|
||||
def websocket_emit_status(self, status):
|
||||
status_data = dict(unified_job_id=self.id, status=status)
|
||||
status_data.update(self.socketio_emit_data())
|
||||
emit_websocket_notification('/socket.io/jobs', 'status_changed', status_data)
|
||||
status_data.update(self.websocket_emit_data())
|
||||
status_data['group_name'] = 'jobs'
|
||||
emit_channel_notification('jobs-status_changed', status_data)
|
||||
|
||||
def generate_dependencies(self, active_tasks):
|
||||
''' Generate any tasks that the current task might be dependent on given a list of active
|
||||
@@ -859,7 +861,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
|
||||
|
||||
# Save the pending status, and inform the SocketIO listener.
|
||||
self.update_fields(start_args=json.dumps(kwargs), status='pending')
|
||||
self.socketio_emit_status("pending")
|
||||
self.websocket_emit_status("pending")
|
||||
|
||||
from awx.main.scheduler.tasks import run_job_launch
|
||||
run_job_launch.delay(self.id)
|
||||
@@ -912,7 +914,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
|
||||
instance.job_explanation = 'Forced cancel'
|
||||
update_fields.append('job_explanation')
|
||||
instance.save(update_fields=update_fields)
|
||||
self.socketio_emit_status("canceled")
|
||||
self.websocket_emit_status("canceled")
|
||||
except: # FIXME: Log this exception!
|
||||
if settings.DEBUG:
|
||||
raise
|
||||
@@ -926,8 +928,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
|
||||
self.status = 'canceled'
|
||||
cancel_fields.append('status')
|
||||
self.save(update_fields=cancel_fields)
|
||||
self.socketio_emit_status("canceled")
|
||||
self.websocket_emit_status("canceled")
|
||||
if settings.BROKER_URL.startswith('amqp://'):
|
||||
self._force_cancel()
|
||||
return self.cancel_flag
|
||||
|
||||
|
||||
7
awx/main/routing.py
Normal file
7
awx/main/routing.py
Normal file
@@ -0,0 +1,7 @@
|
||||
from channels.routing import route
|
||||
|
||||
|
||||
channel_routing = [
|
||||
route("websocket.disconnect", "awx.main.consumers.ws_disconnect", path=r'^/websocket/$'),
|
||||
route("websocket.receive", "awx.main.consumers.ws_receive", path=r'^/websocket/$'),
|
||||
]
|
||||
@@ -62,7 +62,7 @@ def spawn_workflow_graph_jobs(workflow_jobs):
|
||||
job.status = 'failed'
|
||||
job.job_explanation = "Workflow job could not start because it was not in the right state or required manual credentials"
|
||||
job.save(update_fields=['status', 'job_explanation'])
|
||||
job.socketio_emit_status("failed")
|
||||
job.websocket_emit_status("failed")
|
||||
|
||||
# TODO: should we emit a status on the socket here similar to tasks.py tower_periodic_scheduler() ?
|
||||
#emit_websocket_notification('/socket.io/jobs', '', dict(id=))
|
||||
@@ -76,7 +76,7 @@ def process_finished_workflow_jobs(workflow_jobs):
|
||||
# TODO: detect if wfj failed
|
||||
workflow_job.status = 'completed'
|
||||
workflow_job.save()
|
||||
workflow_job.socketio_emit_status('completed')
|
||||
workflow_job.websocket_emit_status('completed')
|
||||
|
||||
def rebuild_graph():
|
||||
"""Regenerate the task graph by refreshing known tasks from Tower, purging
|
||||
@@ -120,7 +120,7 @@ def rebuild_graph():
|
||||
logger.debug("Active celery tasks: " + str(active_tasks))
|
||||
for task in list(running_celery_tasks):
|
||||
if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
|
||||
# NOTE: Pull status again and make sure it didn't finish in
|
||||
# NOTE: Pull status again and make sure it didn't finish in
|
||||
# the meantime?
|
||||
task.status = 'failed'
|
||||
task.job_explanation += ' '.join((
|
||||
@@ -128,8 +128,8 @@ def rebuild_graph():
|
||||
'Celery, so it has been marked as failed.',
|
||||
))
|
||||
task.save()
|
||||
task.socketio_emit_status("failed")
|
||||
running_tasks.pop(task)
|
||||
task.websocket_emit_status("failed")
|
||||
running_tasks.pop(running_tasks.index(task))
|
||||
logger.error("Task %s appears orphaned... marking as failed" % task)
|
||||
|
||||
# Create and process dependencies for new tasks
|
||||
@@ -142,7 +142,7 @@ def rebuild_graph():
|
||||
task.status = 'failed'
|
||||
task.job_explanation += 'Task failed to generate dependencies: {}'.format(e)
|
||||
task.save()
|
||||
task.socketio_emit_status("failed")
|
||||
task.websocket_emit_status("failed")
|
||||
continue
|
||||
logger.debug("New dependencies: %s" % str(task_dependencies))
|
||||
for dep in task_dependencies:
|
||||
@@ -202,7 +202,7 @@ def process_graph(graph, task_capacity):
|
||||
|
||||
node_type = graph.get_node_type(node_obj)
|
||||
if node_type == 'job':
|
||||
# clear dependencies because a job can block (not necessarily
|
||||
# clear dependencies because a job can block (not necessarily
|
||||
# depend) on other jobs that share the same job template
|
||||
node_dependencies = []
|
||||
|
||||
@@ -215,7 +215,7 @@ def process_graph(graph, task_capacity):
|
||||
node_obj.start()
|
||||
spawn_workflow_graph_jobs([node_obj])
|
||||
return process_graph(graph, task_capacity)
|
||||
|
||||
|
||||
dependent_nodes = [{'type': graph.get_node_type(node_obj), 'id': node_obj.id}] + \
|
||||
[{'type': graph.get_node_type(n['node_object']),
|
||||
'id': n['node_object'].id} for n in node_dependencies]
|
||||
|
||||
@@ -19,10 +19,12 @@ from crum.signals import current_user_getter
|
||||
# AWX
|
||||
from awx.main.models import * # noqa
|
||||
from awx.api.serializers import * # noqa
|
||||
from awx.main.utils import model_instance_diff, model_to_dict, camelcase_to_underscore, emit_websocket_notification
|
||||
from awx.main.utils import model_instance_diff, model_to_dict, camelcase_to_underscore
|
||||
from awx.main.utils import ignore_inventory_computed_fields, ignore_inventory_group_removal, _inventory_updates
|
||||
from awx.main.tasks import update_inventory_computed_fields
|
||||
|
||||
from awx.main.consumers import emit_channel_notification
|
||||
|
||||
__all__ = []
|
||||
|
||||
logger = logging.getLogger('awx.main.signals')
|
||||
@@ -33,13 +35,15 @@ logger = logging.getLogger('awx.main.signals')
|
||||
def emit_job_event_detail(sender, **kwargs):
|
||||
instance = kwargs['instance']
|
||||
created = kwargs['created']
|
||||
print("before created job_event_detail")
|
||||
if created:
|
||||
event_serialized = JobEventSerializer(instance).data
|
||||
event_serialized['id'] = instance.id
|
||||
event_serialized["created"] = event_serialized["created"].isoformat()
|
||||
event_serialized["modified"] = event_serialized["modified"].isoformat()
|
||||
event_serialized["event_name"] = instance.event
|
||||
emit_websocket_notification('/socket.io/job_events', 'job_events-' + str(instance.job.id), event_serialized)
|
||||
event_serialized["group_name"] = "job_events"
|
||||
emit_channel_notification('job_events-' + str(instance.job.id), event_serialized)
|
||||
|
||||
def emit_ad_hoc_command_event_detail(sender, **kwargs):
|
||||
instance = kwargs['instance']
|
||||
@@ -50,7 +54,8 @@ def emit_ad_hoc_command_event_detail(sender, **kwargs):
|
||||
event_serialized["created"] = event_serialized["created"].isoformat()
|
||||
event_serialized["modified"] = event_serialized["modified"].isoformat()
|
||||
event_serialized["event_name"] = instance.event
|
||||
emit_websocket_notification('/socket.io/ad_hoc_command_events', 'ad_hoc_command_events-' + str(instance.ad_hoc_command_id), event_serialized)
|
||||
event_serialized["group_name"] = "ad_hoc_command_events"
|
||||
emit_channel_notification('ad_hoc_command_events-' + str(instance.ad_hoc_command_id), event_serialized)
|
||||
|
||||
def emit_update_inventory_computed_fields(sender, **kwargs):
|
||||
logger.debug("In update inventory computed fields")
|
||||
|
||||
@@ -49,13 +49,13 @@ from awx.main.models import * # noqa
|
||||
from awx.main.models import UnifiedJob
|
||||
from awx.main.task_engine import TaskEnhancer
|
||||
from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url,
|
||||
emit_websocket_notification,
|
||||
check_proot_installed, build_proot_temp_dir, wrap_args_with_proot)
|
||||
from awx.main.consumers import emit_channel_notification
|
||||
|
||||
__all__ = ['RunJob', 'RunSystemJob', 'RunProjectUpdate', 'RunInventoryUpdate',
|
||||
'RunAdHocCommand', 'RunWorkflowJob', 'handle_work_error',
|
||||
'handle_work_success', 'update_inventory_computed_fields',
|
||||
'send_notifications', 'run_administrative_checks',
|
||||
'RunAdHocCommand', 'RunWorkflowJob', 'handle_work_error',
|
||||
'handle_work_success', 'update_inventory_computed_fields',
|
||||
'send_notifications', 'run_administrative_checks',
|
||||
'RunJobLaunch']
|
||||
|
||||
HIDDEN_PASSWORD = '**********'
|
||||
@@ -183,8 +183,8 @@ def tower_periodic_scheduler(self):
|
||||
new_unified_job.status = 'failed'
|
||||
new_unified_job.job_explanation = "Scheduled job could not start because it was not in the right state or required manual credentials"
|
||||
new_unified_job.save(update_fields=['status', 'job_explanation'])
|
||||
new_unified_job.socketio_emit_status("failed")
|
||||
emit_websocket_notification('/socket.io/schedules', 'schedule_changed', dict(id=schedule.id))
|
||||
new_unified_job.websocket_emit_status("failed")
|
||||
emit_channel_notification('schedules-changed', dict(id=schedule.id, group_name="schedules"))
|
||||
|
||||
def _send_notification_templates(instance, status_str):
|
||||
if status_str not in ['succeeded', 'failed']:
|
||||
@@ -237,11 +237,11 @@ def handle_work_error(self, task_id, subtasks=None):
|
||||
instance.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % \
|
||||
(first_instance_type, first_instance.name, first_instance.id)
|
||||
instance.save()
|
||||
instance.socketio_emit_status("failed")
|
||||
instance.websocket_emit_status("failed")
|
||||
|
||||
if first_instance:
|
||||
_send_notification_templates(first_instance, 'failed')
|
||||
|
||||
|
||||
# We only send 1 job complete message since all the job completion message
|
||||
# handling does is trigger the scheduler. If we extend the functionality of
|
||||
# what the job complete message handler does then we may want to send a
|
||||
@@ -590,7 +590,7 @@ class BaseTask(Task):
|
||||
'''
|
||||
instance = self.update_model(pk, status='running', celery_task_id=self.request.id)
|
||||
|
||||
instance.socketio_emit_status("running")
|
||||
instance.websocket_emit_status("running")
|
||||
status, rc, tb = 'error', None, ''
|
||||
output_replacements = []
|
||||
try:
|
||||
@@ -659,7 +659,7 @@ class BaseTask(Task):
|
||||
instance = self.update_model(pk, status=status, result_traceback=tb,
|
||||
output_replacements=output_replacements)
|
||||
self.post_run_hook(instance, **kwargs)
|
||||
instance.socketio_emit_status(status)
|
||||
instance.websocket_emit_status(status)
|
||||
if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'):
|
||||
# Raising an exception will mark the job as 'failed' in celery
|
||||
# and will stop a task chain from continuing to execute
|
||||
@@ -1678,7 +1678,7 @@ class RunSystemJob(BaseTask):
|
||||
|
||||
'''
|
||||
class RunWorkflowJob(BaseTask):
|
||||
|
||||
|
||||
name = 'awx.main.tasks.run_workflow_job'
|
||||
model = WorkflowJob
|
||||
|
||||
@@ -1691,14 +1691,14 @@ class RunWorkflowJob(BaseTask):
|
||||
# complete. Instead, the workflow job should return or never even run,
|
||||
# because all of the "launch logic" can be done schedule().
|
||||
|
||||
# However, other aspects of our system depend on a 1-1 relationship
|
||||
# However, other aspects of our system depend on a 1-1 relationship
|
||||
# between a Job and a Celery Task.
|
||||
#
|
||||
#
|
||||
# * If we let the workflow job task (RunWorkflowJob.run()) complete
|
||||
# then how do we trigger the handle_work_error and
|
||||
# then how do we trigger the handle_work_error and
|
||||
# handle_work_success subtasks?
|
||||
#
|
||||
# * How do we handle the recovery process? (i.e. there is an entry in
|
||||
# * How do we handle the recovery process? (i.e. there is an entry in
|
||||
# the database but not in celery).
|
||||
while True:
|
||||
dag = WorkflowDAG(instance)
|
||||
|
||||
@@ -491,19 +491,6 @@ def get_system_task_capacity():
|
||||
return 50 + ((int(total_mem_value) / 1024) - 2) * 75
|
||||
|
||||
|
||||
def emit_websocket_notification(endpoint, event, payload, token_key=None):
|
||||
from awx.main.socket_queue import Socket
|
||||
|
||||
try:
|
||||
with Socket('websocket', 'w', nowait=True, logger=logger) as websocket:
|
||||
if token_key:
|
||||
payload['token_key'] = token_key
|
||||
payload['event'] = event
|
||||
payload['endpoint'] = endpoint
|
||||
websocket.publish(payload)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
_inventory_updates = threading.local()
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user