mirror of
https://github.com/ansible/awx.git
synced 2026-03-22 11:25:08 -02:30
Merge pull request #2110 from chrismeyersfsu/improvement-remove_instance_group_queues2
remove rampart group queue subscription
This commit is contained in:
@@ -209,19 +209,19 @@ def handle_ha_toplogy_changes(self):
|
|||||||
logger.debug(six.text_type("Reconfigure celeryd queues task on host {}").format(self.request.hostname))
|
logger.debug(six.text_type("Reconfigure celeryd queues task on host {}").format(self.request.hostname))
|
||||||
awx_app = Celery('awx')
|
awx_app = Celery('awx')
|
||||||
awx_app.config_from_object('django.conf:settings')
|
awx_app.config_from_object('django.conf:settings')
|
||||||
instances, removed_queues, added_queues = register_celery_worker_queues(awx_app, self.request.hostname)
|
removed_queues, added_queues = register_celery_worker_queues(awx_app, self.request.hostname)
|
||||||
if len(removed_queues) + len(added_queues) > 0:
|
if len(removed_queues) + len(added_queues) > 0:
|
||||||
logger.info(six.text_type("Workers on tower node(s) '{}' removed from queues {} and added to queues {}")
|
logger.info(six.text_type("Workers on tower node '{}' removed from queues {} and added to queues {}")
|
||||||
.format([i.hostname for i in instances], removed_queues, added_queues))
|
.format(self.request.hostname, removed_queues, added_queues))
|
||||||
|
|
||||||
|
|
||||||
@worker_ready.connect
|
@worker_ready.connect
|
||||||
def handle_ha_toplogy_worker_ready(sender, **kwargs):
|
def handle_ha_toplogy_worker_ready(sender, **kwargs):
|
||||||
logger.debug(six.text_type("Configure celeryd queues task on host {}").format(sender.hostname))
|
logger.debug(six.text_type("Configure celeryd queues task on host {}").format(sender.hostname))
|
||||||
instances, removed_queues, added_queues = register_celery_worker_queues(sender.app, sender.hostname)
|
removed_queues, added_queues = register_celery_worker_queues(sender.app, sender.hostname)
|
||||||
if len(removed_queues) + len(added_queues) > 0:
|
if len(removed_queues) + len(added_queues) > 0:
|
||||||
logger.info(six.text_type("Workers on tower node(s) '{}' removed from queues {} and added to queues {}")
|
logger.info(six.text_type("Workers on tower node '{}' removed from queues {} and added to queues {}")
|
||||||
.format([i.hostname for i in instances], removed_queues, added_queues))
|
.format(sender.hostname, removed_queues, added_queues))
|
||||||
|
|
||||||
# Expedite the first hearbeat run so a node comes online quickly.
|
# Expedite the first hearbeat run so a node comes online quickly.
|
||||||
cluster_node_heartbeat.apply([])
|
cluster_node_heartbeat.apply([])
|
||||||
|
|||||||
@@ -18,7 +18,8 @@ from awx.main.utils.ha import (
|
|||||||
class TestAddRemoveCeleryWorkerQueues():
|
class TestAddRemoveCeleryWorkerQueues():
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def instance_generator(self, mocker):
|
def instance_generator(self, mocker):
|
||||||
def fn(groups=['east', 'west', 'north', 'south'], hostname='east-1'):
|
def fn(hostname='east-1'):
|
||||||
|
groups=['east', 'west', 'north', 'south']
|
||||||
instance = mocker.MagicMock()
|
instance = mocker.MagicMock()
|
||||||
instance.hostname = hostname
|
instance.hostname = hostname
|
||||||
instance.rampart_groups = mocker.MagicMock()
|
instance.rampart_groups = mocker.MagicMock()
|
||||||
@@ -40,29 +41,29 @@ class TestAddRemoveCeleryWorkerQueues():
|
|||||||
app.control.cancel_consumer = mocker.MagicMock()
|
app.control.cancel_consumer = mocker.MagicMock()
|
||||||
return app
|
return app
|
||||||
|
|
||||||
@pytest.mark.parametrize("broadcast_queues,static_queues,_worker_queues,groups,hostname,added_expected,removed_expected", [
|
@pytest.mark.parametrize("broadcast_queues,static_queues,_worker_queues,hostname,added_expected,removed_expected", [
|
||||||
(['tower_broadcast_all'], ['east', 'west'], ['east', 'west', 'east-1'], [], 'east-1', ['tower_broadcast_all_east-1'], []),
|
(['tower_broadcast_all'], ['east', 'west'], ['east', 'west', 'east-1'], 'east-1', ['tower_broadcast_all_east-1'], []),
|
||||||
([], [], ['east', 'west', 'east-1'], ['east', 'west'], 'east-1', [], []),
|
([], [], ['east', 'west', 'east-1'], 'east-1', [], ['east', 'west']),
|
||||||
([], [], ['east', 'west'], ['east', 'west'], 'east-1', ['east-1'], []),
|
([], [], ['east', 'west'], 'east-1', ['east-1'], ['east', 'west']),
|
||||||
([], [], [], ['east', 'west'], 'east-1', ['east', 'west', 'east-1'], []),
|
([], [], [], 'east-1', ['east-1'], []),
|
||||||
([], [], ['china', 'russia'], ['east', 'west'], 'east-1', ['east', 'west', 'east-1'], ['china', 'russia']),
|
([], [], ['china', 'russia'], 'east-1', [ 'east-1'], ['china', 'russia']),
|
||||||
])
|
])
|
||||||
def test__add_remove_celery_worker_queues_noop(self, mock_app,
|
def test__add_remove_celery_worker_queues_noop(self, mock_app,
|
||||||
instance_generator,
|
instance_generator,
|
||||||
worker_queues_generator,
|
worker_queues_generator,
|
||||||
broadcast_queues,
|
broadcast_queues,
|
||||||
static_queues, _worker_queues,
|
static_queues, _worker_queues,
|
||||||
groups, hostname,
|
hostname,
|
||||||
added_expected, removed_expected):
|
added_expected, removed_expected):
|
||||||
instance = instance_generator(groups=groups, hostname=hostname)
|
instance = instance_generator(hostname=hostname)
|
||||||
worker_queues = worker_queues_generator(_worker_queues)
|
worker_queues = worker_queues_generator(_worker_queues)
|
||||||
with nested(
|
with nested(
|
||||||
mock.patch('awx.main.utils.ha.settings.AWX_CELERY_QUEUES_STATIC', static_queues),
|
mock.patch('awx.main.utils.ha.settings.AWX_CELERY_QUEUES_STATIC', static_queues),
|
||||||
mock.patch('awx.main.utils.ha.settings.AWX_CELERY_BCAST_QUEUES_STATIC', broadcast_queues),
|
mock.patch('awx.main.utils.ha.settings.AWX_CELERY_BCAST_QUEUES_STATIC', broadcast_queues),
|
||||||
mock.patch('awx.main.utils.ha.settings.CLUSTER_HOST_ID', hostname)):
|
mock.patch('awx.main.utils.ha.settings.CLUSTER_HOST_ID', hostname)):
|
||||||
(added_queues, removed_queues) = _add_remove_celery_worker_queues(mock_app, [instance], worker_queues, hostname)
|
(added_queues, removed_queues) = _add_remove_celery_worker_queues(mock_app, instance, worker_queues, hostname)
|
||||||
assert set(added_queues) == set(added_expected)
|
assert set(added_expected) == set(added_queues)
|
||||||
assert set(removed_queues) == set(removed_expected)
|
assert set(removed_expected) == set(removed_queues)
|
||||||
|
|
||||||
|
|
||||||
class TestUpdateCeleryWorkerRouter():
|
class TestUpdateCeleryWorkerRouter():
|
||||||
|
|||||||
@@ -14,17 +14,13 @@ def construct_bcast_queue_name(common_name):
|
|||||||
return common_name.encode('utf8') + '_' + settings.CLUSTER_HOST_ID
|
return common_name.encode('utf8') + '_' + settings.CLUSTER_HOST_ID
|
||||||
|
|
||||||
|
|
||||||
def _add_remove_celery_worker_queues(app, controlled_instances, worker_queues, worker_name):
|
def _add_remove_celery_worker_queues(app, instance, worker_queues, worker_name):
|
||||||
removed_queues = []
|
removed_queues = []
|
||||||
added_queues = []
|
added_queues = []
|
||||||
ig_names = set()
|
|
||||||
hostnames = set([instance.hostname for instance in controlled_instances])
|
|
||||||
for instance in controlled_instances:
|
|
||||||
ig_names.update(instance.rampart_groups.values_list('name', flat=True))
|
|
||||||
worker_queue_names = set([q['name'] for q in worker_queues])
|
worker_queue_names = set([q['name'] for q in worker_queues])
|
||||||
|
|
||||||
bcast_queue_names = set([construct_bcast_queue_name(n) for n in settings.AWX_CELERY_BCAST_QUEUES_STATIC])
|
bcast_queue_names = set([construct_bcast_queue_name(n) for n in settings.AWX_CELERY_BCAST_QUEUES_STATIC])
|
||||||
all_queue_names = ig_names | hostnames | set(settings.AWX_CELERY_QUEUES_STATIC)
|
all_queue_names = set([instance.hostname]) | set(settings.AWX_CELERY_QUEUES_STATIC)
|
||||||
desired_queues = bcast_queue_names | (all_queue_names if instance.enabled else set())
|
desired_queues = bcast_queue_names | (all_queue_names if instance.enabled else set())
|
||||||
|
|
||||||
# Remove queues
|
# Remove queues
|
||||||
@@ -33,7 +29,7 @@ def _add_remove_celery_worker_queues(app, controlled_instances, worker_queues, w
|
|||||||
app.control.cancel_consumer(queue_name.encode("utf8"), reply=True, destination=[worker_name])
|
app.control.cancel_consumer(queue_name.encode("utf8"), reply=True, destination=[worker_name])
|
||||||
removed_queues.append(queue_name.encode("utf8"))
|
removed_queues.append(queue_name.encode("utf8"))
|
||||||
|
|
||||||
# Add queues for instance and instance groups
|
# Add queues for instances
|
||||||
for queue_name in all_queue_names:
|
for queue_name in all_queue_names:
|
||||||
if queue_name not in worker_queue_names:
|
if queue_name not in worker_queue_names:
|
||||||
app.control.add_consumer(queue_name.encode("utf8"), reply=True, destination=[worker_name])
|
app.control.add_consumer(queue_name.encode("utf8"), reply=True, destination=[worker_name])
|
||||||
@@ -72,18 +68,13 @@ class AWXCeleryRouter(object):
|
|||||||
|
|
||||||
def register_celery_worker_queues(app, celery_worker_name):
|
def register_celery_worker_queues(app, celery_worker_name):
|
||||||
instance = Instance.objects.me()
|
instance = Instance.objects.me()
|
||||||
controlled_instances = [instance]
|
|
||||||
if instance.is_controller():
|
|
||||||
controlled_instances.extend(Instance.objects.filter(
|
|
||||||
rampart_groups__controller__instances__hostname=instance.hostname
|
|
||||||
))
|
|
||||||
added_queues = []
|
added_queues = []
|
||||||
removed_queues = []
|
removed_queues = []
|
||||||
|
|
||||||
celery_host_queues = app.control.inspect([celery_worker_name]).active_queues()
|
celery_host_queues = app.control.inspect([celery_worker_name]).active_queues()
|
||||||
|
|
||||||
celery_worker_queues = celery_host_queues[celery_worker_name] if celery_host_queues else []
|
celery_worker_queues = celery_host_queues[celery_worker_name] if celery_host_queues else []
|
||||||
(added_queues, removed_queues) = _add_remove_celery_worker_queues(app, controlled_instances,
|
(added_queues, removed_queues) = _add_remove_celery_worker_queues(app, instance,
|
||||||
celery_worker_queues, celery_worker_name)
|
celery_worker_queues, celery_worker_name)
|
||||||
return (controlled_instances, removed_queues, added_queues)
|
return (removed_queues, added_queues)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user