diff --git a/awx/main/consumers.py b/awx/main/consumers.py index bc7aa47089..03173ceffd 100644 --- a/awx/main/consumers.py +++ b/awx/main/consumers.py @@ -5,6 +5,7 @@ import logging import codecs import datetime import hmac +import asyncio from django.utils.encoding import force_bytes from django.utils.encoding import smart_str @@ -200,28 +201,66 @@ class EventConsumer(AsyncJsonWebsocketConsumer): await self.send(event['text']) -def emit_channel_notification(group, payload): +def run_sync(func): + event_loop = None try: - payload = json.dumps(payload, cls=DjangoJSONEncoder) + event_loop = asyncio.get_event_loop() + except RuntimeError: + event_loop = asyncio.new_event_loop() + asyncio.set_event_loop(event_loop) + return event_loop.run_until_complete(func) + + +def _dump_payload(payload): + try: + return json.dumps(payload, cls=DjangoJSONEncoder) except ValueError: - logger.error("Invalid payload emitting channel {} on topic: {}".format(group, payload)) + logger.error("Invalid payload to emit") + return None + + +async def emit_channel_notification_async(group, payload): + payload_dumped = _dump_payload(payload) + if payload_dumped is None: + return + + channel_layer = get_channel_layer() + await channel_layer.group_send( + group, + { + "type": "internal.message", + "text": payload_dumped + }, + ) + + await channel_layer.group_send( + BROADCAST_GROUP, + { + "type": "internal.message", + "text": wrap_broadcast_msg(group, payload_dumped), + }, + ) + + +def emit_channel_notification(group, payload): + payload_dumped = _dump_payload(payload) + if payload_dumped is None: return channel_layer = get_channel_layer() - async_to_sync(channel_layer.group_send)( + run_sync(channel_layer.group_send( group, { "type": "internal.message", - "text": payload + "text": payload_dumped }, - ) + )) - async_to_sync(channel_layer.group_send)( + run_sync(channel_layer.group_send( BROADCAST_GROUP, { "type": "internal.message", - "text": wrap_broadcast_msg(group, payload), + "text": wrap_broadcast_msg(group, payload_dumped), }, - ) - + ))