fix flake8

This commit is contained in:
chris meyers
2020-02-21 15:36:27 -05:00
committed by Ryan Petrello
parent e25bd931a1
commit 093d204d19
12 changed files with 22 additions and 54 deletions

View File

@@ -41,6 +41,7 @@ class MetricsView(APIView):
return Response(metrics().decode('UTF-8')) return Response(metrics().decode('UTF-8'))
raise PermissionDenied() raise PermissionDenied()
class BroadcastWebsocketMetricsView(APIView): class BroadcastWebsocketMetricsView(APIView):
name = _('Broadcast Websockets') name = _('Broadcast Websockets')
swagger_topic = 'Broadcast Websockets' swagger_topic = 'Broadcast Websockets'

View File

@@ -1,8 +1,6 @@
import datetime import datetime
import os
import asyncio import asyncio
import logging import logging
import json
import aioredis import aioredis
import redis import redis
@@ -11,10 +9,7 @@ from prometheus_client import (
Gauge, Gauge,
Counter, Counter,
Enum, Enum,
Histogram,
Enum,
CollectorRegistry, CollectorRegistry,
parser,
) )
from django.conf import settings from django.conf import settings
@@ -50,7 +45,7 @@ class FixedSlidingWindow():
del self.buckets[k] del self.buckets[k]
def record(self, ts=datetime.datetime.now()): def record(self, ts=datetime.datetime.now()):
now_bucket = int((ts-datetime.datetime(1970,1,1)).total_seconds()) now_bucket = int((ts - datetime.datetime(1970,1,1)).total_seconds())
val = self.buckets.get(now_bucket, 0) val = self.buckets.get(now_bucket, 0)
self.buckets[now_bucket] = val + 1 self.buckets[now_bucket] = val + 1
@@ -118,15 +113,15 @@ class BroadcastWebsocketStats():
'Number of messages received, to be forwarded, by the broadcast websocket system', 'Number of messages received, to be forwarded, by the broadcast websocket system',
registry=self._registry) registry=self._registry)
self._messages_received = Gauge(f'awx_{self.remote_name}_messages_received', self._messages_received = Gauge(f'awx_{self.remote_name}_messages_received',
'Number of messages received, to be forwarded, by the broadcast websocket system, for the duration of the current connection', 'Number forwarded messages received by the broadcast websocket system, for the duration of the current connection',
registry=self._registry) registry=self._registry)
self._connection = Enum(f'awx_{self.remote_name}_connection', self._connection = Enum(f'awx_{self.remote_name}_connection',
'Websocket broadcast connection', 'Websocket broadcast connection',
states=['disconnected', 'connected'], states=['disconnected', 'connected'],
registry=self._registry) registry=self._registry)
self._connection_start = Gauge(f'awx_{self.remote_name}_connection_start', self._connection_start = Gauge(f'awx_{self.remote_name}_connection_start',
'Time the connection was established', 'Time the connection was established',
registry=self._registry) registry=self._registry)
self._messages_received_per_minute = Gauge(f'awx_{self.remote_name}_messages_received_per_minute', self._messages_received_per_minute = Gauge(f'awx_{self.remote_name}_messages_received_per_minute',
'Messages received per minute', 'Messages received per minute',
@@ -160,6 +155,5 @@ class BroadcastWebsocketStats():
def serialize(self): def serialize(self):
self.render() self.render()
data = {}
registry_data = generate_latest(self._registry).decode('UTF-8') registry_data = generate_latest(self._registry).decode('UTF-8')
return registry_data return registry_data

View File

@@ -1,15 +1,9 @@
import os
import json import json
import logging import logging
import codecs
import datetime import datetime
import hmac import hmac
import asyncio import asyncio
from django.utils.encoding import force_bytes
from django.utils.encoding import smart_str
from django.http.cookie import parse_cookie
from django.core.serializers.json import DjangoJSONEncoder from django.core.serializers.json import DjangoJSONEncoder
from django.conf import settings from django.conf import settings
from django.utils.encoding import force_bytes from django.utils.encoding import force_bytes
@@ -19,8 +13,6 @@ from channels.generic.websocket import AsyncJsonWebsocketConsumer
from channels.layers import get_channel_layer from channels.layers import get_channel_layer
from channels.db import database_sync_to_async from channels.db import database_sync_to_async
from asgiref.sync import async_to_sync
logger = logging.getLogger('awx.main.consumers') logger = logging.getLogger('awx.main.consumers')
XRF_KEY = '_auth_user_xrf' XRF_KEY = '_auth_user_xrf'
@@ -37,7 +29,7 @@ class WebsocketSecretAuthHelper:
@classmethod @classmethod
def construct_secret(cls): def construct_secret(cls):
nonce_serialized = "{}".format(int((datetime.datetime.utcnow()-datetime.datetime.fromtimestamp(0)).total_seconds())) nonce_serialized = "{}".format(int((datetime.datetime.utcnow() - datetime.datetime.fromtimestamp(0)).total_seconds()))
payload_dict = { payload_dict = {
'secret': settings.BROADCAST_WEBSOCKET_SECRET, 'secret': settings.BROADCAST_WEBSOCKET_SECRET,
'nonce': nonce_serialized 'nonce': nonce_serialized
@@ -53,8 +45,6 @@ class WebsocketSecretAuthHelper:
@classmethod @classmethod
def verify_secret(cls, s, nonce_tolerance=300): def verify_secret(cls, s, nonce_tolerance=300):
hex_decoder = codecs.getdecoder("hex_codec")
try: try:
(prefix, payload) = s.split(' ') (prefix, payload) = s.split(' ')
if prefix != 'HMAC-SHA256': if prefix != 'HMAC-SHA256':
@@ -82,7 +72,7 @@ class WebsocketSecretAuthHelper:
# Avoid timing attack and check the nonce after all the heavy lifting # Avoid timing attack and check the nonce after all the heavy lifting
now = datetime.datetime.utcnow() now = datetime.datetime.utcnow()
nonce_parsed = datetime.datetime.fromtimestamp(int(nonce_parsed)) nonce_parsed = datetime.datetime.fromtimestamp(int(nonce_parsed))
if (now-nonce_parsed).total_seconds() > nonce_tolerance: if (now - nonce_parsed).total_seconds() > nonce_tolerance:
raise ValueError("Potential replay attack or machine(s) time out of sync.") raise ValueError("Potential replay attack or machine(s) time out of sync.")
return True return True
@@ -160,9 +150,7 @@ class EventConsumer(AsyncJsonWebsocketConsumer):
XRF_KEY not in self.scope["session"] or XRF_KEY not in self.scope["session"] or
xrftoken != self.scope["session"][XRF_KEY] xrftoken != self.scope["session"][XRF_KEY]
): ):
logger.error( logger.error(f"access denied to channel, XRF mismatch for {user.username}")
"access denied to channel, XRF mismatch for {}".format(user.username)
)
await self.send_json({"error": "access denied to channel"}) await self.send_json({"error": "access denied to channel"})
return return

View File

@@ -73,9 +73,9 @@ class PubSub(object):
def pg_bus_conn(): def pg_bus_conn():
conf = settings.DATABASES['default'] conf = settings.DATABASES['default']
conn = psycopg2.connect(dbname=conf['NAME'], conn = psycopg2.connect(dbname=conf['NAME'],
host=conf['HOST'], host=conf['HOST'],
user=conf['USER'], user=conf['USER'],
password=conf['PASSWORD']) password=conf['PASSWORD'])
# Django connection.cursor().connection doesn't have autocommit=True on # Django connection.cursor().connection doesn't have autocommit=True on
conn.set_session(autocommit=True) conn.set_session(autocommit=True)
pubsub = PubSub(conn) pubsub = PubSub(conn)

View File

@@ -1,5 +1,4 @@
import logging import logging
import socket
import string import string
import random import random
import json import json

View File

@@ -2,13 +2,11 @@ import inspect
import logging import logging
import sys import sys
import json import json
import re
from uuid import uuid4 from uuid import uuid4
from django.conf import settings from django.conf import settings
from django.db import connection
from . import pg_bus_conn, get_local_queuename from . import pg_bus_conn
logger = logging.getLogger('awx.main.dispatch') logger = logging.getLogger('awx.main.dispatch')

View File

@@ -7,7 +7,6 @@ import signal
import sys import sys
import redis import redis
import json import json
import re
import psycopg2 import psycopg2
from uuid import UUID from uuid import UUID
from queue import Empty as QueueEmpty from queue import Empty as QueueEmpty

View File

@@ -5,7 +5,7 @@ import logging
from django.conf import settings from django.conf import settings
from django.core.cache import cache as django_cache from django.core.cache import cache as django_cache
from django.core.management.base import BaseCommand from django.core.management.base import BaseCommand
from django.db import connection as django_connection, connections from django.db import connection as django_connection
from awx.main.utils.handlers import AWXProxyHandler from awx.main.utils.handlers import AWXProxyHandler
from awx.main.dispatch import get_local_queuename, reaper from awx.main.dispatch import get_local_queuename, reaper

View File

@@ -4,7 +4,6 @@
# Python # Python
import json import json
import logging import logging
import os
import redis import redis
# Django # Django

View File

@@ -1,4 +1,3 @@
from django.urls import re_path
from django.conf.urls import url from django.conf.urls import url
from channels.auth import AuthMiddlewareStack from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter from channels.routing import ProtocolTypeRouter, URLRouter

View File

@@ -1,18 +1,12 @@
import os
import json import json
import logging import logging
import asyncio import asyncio
import datetime
import sys
import aiohttp import aiohttp
from aiohttp import client_exceptions from aiohttp import client_exceptions
from channels_redis.core import RedisChannelLayer
from channels.layers import get_channel_layer from channels.layers import get_channel_layer
from django.utils.encoding import force_bytes
from django.conf import settings from django.conf import settings
from django.apps import apps from django.apps import apps
from django.core.serializers.json import DjangoJSONEncoder from django.core.serializers.json import DjangoJSONEncoder
@@ -57,10 +51,10 @@ class WebsocketTask():
event_loop, event_loop,
stats: BroadcastWebsocketStats, stats: BroadcastWebsocketStats,
remote_host: str, remote_host: str,
remote_port: int=settings.BROADCAST_WEBSOCKET_PORT, remote_port: int = settings.BROADCAST_WEBSOCKET_PORT,
protocol: str=settings.BROADCAST_WEBSOCKET_PROTOCOL, protocol: str = settings.BROADCAST_WEBSOCKET_PROTOCOL,
verify_ssl: bool=settings.BROADCAST_WEBSOCKET_VERIFY_CERT, verify_ssl: bool = settings.BROADCAST_WEBSOCKET_VERIFY_CERT,
endpoint: str='broadcast'): endpoint: str = 'broadcast'):
self.name = name self.name = name
self.event_loop = event_loop self.event_loop = event_loop
self.stats = stats self.stats = stats
@@ -112,16 +106,16 @@ class WebsocketTask():
except client_exceptions.ClientConnectorError as e: except client_exceptions.ClientConnectorError as e:
logger.warn(f"Failed to connect to {self.remote_host}: '{e}'. Reconnecting ...") logger.warn(f"Failed to connect to {self.remote_host}: '{e}'. Reconnecting ...")
self.stats.record_connection_lost() self.stats.record_connection_lost()
self.start(attempt=attempt+1) self.start(attempt=attempt + 1)
except asyncio.TimeoutError: except asyncio.TimeoutError:
logger.warn(f"Timeout while trying to connect to {self.remote_host}. Reconnecting ...") logger.warn(f"Timeout while trying to connect to {self.remote_host}. Reconnecting ...")
self.stats.record_connection_lost() self.stats.record_connection_lost()
self.start(attempt=attempt+1) self.start(attempt=attempt + 1)
except Exception as e: except Exception as e:
# Early on, this is our canary. I'm not sure what exceptions we can really encounter. # Early on, this is our canary. I'm not sure what exceptions we can really encounter.
logger.warn(f"Websocket broadcast client exception {type(e)} {e}") logger.warn(f"Websocket broadcast client exception {type(e)} {e}")
self.stats.record_connection_lost() self.stats.record_connection_lost()
self.start(attempt=attempt+1) self.start(attempt=attempt + 1)
def start(self, attempt=0): def start(self, attempt=0):
self.async_task = self.event_loop.create_task(self.connect(attempt=attempt)) self.async_task = self.event_loop.create_task(self.connect(attempt=attempt))

View File

@@ -1,10 +1,8 @@
import time
import threading import threading
import logging import logging
import atexit import atexit
import json import json
import ssl import ssl
from datetime import datetime
from six.moves.queue import Queue, Empty from six.moves.queue import Queue, Empty
from six.moves.urllib.parse import urlparse from six.moves.urllib.parse import urlparse
@@ -187,7 +185,6 @@ class WSClient(object):
self._send(json.dumps(payload)) self._send(json.dumps(payload))
def unsubscribe(self, wait=True, timeout=10): def unsubscribe(self, wait=True, timeout=10):
time_start = datetime.now()
if wait: if wait:
# Other unnsubscribe events could have caused the edge to trigger. # Other unnsubscribe events could have caused the edge to trigger.
# This way the _next_ event will trigger our waiting. # This way the _next_ event will trigger our waiting.