mirror of
https://github.com/ansible/awx.git
synced 2026-05-18 06:47:41 -02:30
Merge pull request #6344 from chrismeyersfsu/redis-cleanup1
Redis cleanup1 Reviewed-by: https://github.com/apps/softwarefactory-project-zuul
This commit is contained in:
@@ -1,7 +1,5 @@
|
|||||||
import psycopg2
|
import psycopg2
|
||||||
import select
|
import select
|
||||||
import sys
|
|
||||||
import logging
|
|
||||||
|
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
|
|
||||||
@@ -9,10 +7,6 @@ from django.conf import settings
|
|||||||
|
|
||||||
|
|
||||||
NOT_READY = ([], [], [])
|
NOT_READY = ([], [], [])
|
||||||
if 'run_callback_receiver' in sys.argv:
|
|
||||||
logger = logging.getLogger('awx.main.commands.run_callback_receiver')
|
|
||||||
else:
|
|
||||||
logger = logging.getLogger('awx.main.dispatch')
|
|
||||||
|
|
||||||
|
|
||||||
def get_local_queuename():
|
def get_local_queuename():
|
||||||
@@ -36,25 +30,6 @@ class PubSub(object):
|
|||||||
with self.conn.cursor() as cur:
|
with self.conn.cursor() as cur:
|
||||||
cur.execute('SELECT pg_notify(%s, %s);', (channel, payload))
|
cur.execute('SELECT pg_notify(%s, %s);', (channel, payload))
|
||||||
|
|
||||||
def get_event(self, select_timeout=0):
|
|
||||||
# poll the connection, then return one event, if we have one. Else
|
|
||||||
# return None.
|
|
||||||
select.select([self.conn], [], [], select_timeout)
|
|
||||||
self.conn.poll()
|
|
||||||
if self.conn.notifies:
|
|
||||||
return self.conn.notifies.pop(0)
|
|
||||||
|
|
||||||
def get_events(self, select_timeout=0):
|
|
||||||
# Poll the connection and return all events, if there are any. Else
|
|
||||||
# return None.
|
|
||||||
select.select([self.conn], [], [], select_timeout) # redundant?
|
|
||||||
self.conn.poll()
|
|
||||||
events = []
|
|
||||||
while self.conn.notifies:
|
|
||||||
events.append(self.conn.notifies.pop(0))
|
|
||||||
if events:
|
|
||||||
return events
|
|
||||||
|
|
||||||
def events(self, select_timeout=5, yield_timeouts=False):
|
def events(self, select_timeout=5, yield_timeouts=False):
|
||||||
while True:
|
while True:
|
||||||
if select.select([self.conn], [], [], select_timeout) == NOT_READY:
|
if select.select([self.conn], [], [], select_timeout) == NOT_READY:
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
import logging
|
import logging
|
||||||
import string
|
import uuid
|
||||||
import random
|
|
||||||
import json
|
import json
|
||||||
|
|
||||||
from awx.main.dispatch import get_local_queuename
|
from awx.main.dispatch import get_local_queuename
|
||||||
@@ -21,10 +20,6 @@ class Control(object):
|
|||||||
self.service = service
|
self.service = service
|
||||||
self.queuename = host or get_local_queuename()
|
self.queuename = host or get_local_queuename()
|
||||||
|
|
||||||
def publish(self, msg, conn, **kwargs):
|
|
||||||
# TODO: delete this method??
|
|
||||||
raise RuntimeError("Publish called?!")
|
|
||||||
|
|
||||||
def status(self, *args, **kwargs):
|
def status(self, *args, **kwargs):
|
||||||
return self.control_with_reply('status', *args, **kwargs)
|
return self.control_with_reply('status', *args, **kwargs)
|
||||||
|
|
||||||
@@ -33,8 +28,7 @@ class Control(object):
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def generate_reply_queue_name(cls):
|
def generate_reply_queue_name(cls):
|
||||||
letters = string.ascii_lowercase
|
return f"reply_to_{str(uuid.uuid4()).replace('-','_')}"
|
||||||
return 'reply_to_{}'.format(''.join(random.choice(letters) for i in range(8)))
|
|
||||||
|
|
||||||
def control_with_reply(self, command, timeout=5):
|
def control_with_reply(self, command, timeout=5):
|
||||||
logger.warn('checking {} {} for {}'.format(self.service, command, self.queuename))
|
logger.warn('checking {} {} for {}'.format(self.service, command, self.queuename))
|
||||||
|
|||||||
@@ -74,7 +74,7 @@ class task:
|
|||||||
getattr(cls.queue, 'im_func', cls.queue)
|
getattr(cls.queue, 'im_func', cls.queue)
|
||||||
)
|
)
|
||||||
if not queue:
|
if not queue:
|
||||||
msg = f'{cls.name}: Queue value required and may not me None'
|
msg = f'{cls.name}: Queue value required and may not be None'
|
||||||
logger.error(msg)
|
logger.error(msg)
|
||||||
raise ValueError(msg)
|
raise ValueError(msg)
|
||||||
obj = {
|
obj = {
|
||||||
|
|||||||
@@ -126,7 +126,7 @@ class InstanceManager(models.Manager):
|
|||||||
instance = instance.get()
|
instance = instance.get()
|
||||||
if instance.ip_address != ip_address:
|
if instance.ip_address != ip_address:
|
||||||
instance.ip_address = ip_address
|
instance.ip_address = ip_address
|
||||||
instance.save()
|
instance.save(update_fields=['ip_address'])
|
||||||
return (True, instance)
|
return (True, instance)
|
||||||
else:
|
else:
|
||||||
return (False, instance)
|
return (False, instance)
|
||||||
|
|||||||
@@ -349,7 +349,7 @@ class TestTaskPublisher:
|
|||||||
def test_apply_async_queue_required(self):
|
def test_apply_async_queue_required(self):
|
||||||
with pytest.raises(ValueError) as e:
|
with pytest.raises(ValueError) as e:
|
||||||
message, queue = add.apply_async([2, 2])
|
message, queue = add.apply_async([2, 2])
|
||||||
assert "awx.main.tests.functional.test_dispatch.add: Queue value required and may not me None" == e.value.args[0]
|
assert "awx.main.tests.functional.test_dispatch.add: Queue value required and may not be None" == e.value.args[0]
|
||||||
|
|
||||||
def test_queue_defined_in_task_decorator(self):
|
def test_queue_defined_in_task_decorator(self):
|
||||||
message, queue = multiply.apply_async([2, 2])
|
message, queue = multiply.apply_async([2, 2])
|
||||||
|
|||||||
Reference in New Issue
Block a user