From fb119671146b779a1f710872cccbae9e8adb93c7 Mon Sep 17 00:00:00 2001 From: chris meyers Date: Fri, 8 Jun 2018 13:46:58 -0400 Subject: [PATCH] remove isolated instance group queue listening --- awx/main/tasks.py | 12 ++++++------ awx/main/tests/unit/utils/test_ha.py | 2 +- awx/main/utils/ha.py | 14 ++++---------- 3 files changed, 11 insertions(+), 17 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index be10d3ef58..c0667d15b6 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -209,19 +209,19 @@ def handle_ha_toplogy_changes(self): logger.debug(six.text_type("Reconfigure celeryd queues task on host {}").format(self.request.hostname)) awx_app = Celery('awx') awx_app.config_from_object('django.conf:settings') - instances, removed_queues, added_queues = register_celery_worker_queues(awx_app, self.request.hostname) + removed_queues, added_queues = register_celery_worker_queues(awx_app, self.request.hostname) if len(removed_queues) + len(added_queues) > 0: - logger.info(six.text_type("Workers on tower node(s) '{}' removed from queues {} and added to queues {}") - .format([i.hostname for i in instances], removed_queues, added_queues)) + logger.info(six.text_type("Workers on tower node '{}' removed from queues {} and added to queues {}") + .format(self.request.hostname, removed_queues, added_queues)) @worker_ready.connect def handle_ha_toplogy_worker_ready(sender, **kwargs): logger.debug(six.text_type("Configure celeryd queues task on host {}").format(sender.hostname)) - instances, removed_queues, added_queues = register_celery_worker_queues(sender.app, sender.hostname) + removed_queues, added_queues = register_celery_worker_queues(sender.app, sender.hostname) if len(removed_queues) + len(added_queues) > 0: - logger.info(six.text_type("Workers on tower node(s) '{}' removed from queues {} and added to queues {}") - .format([i.hostname for i in instances], removed_queues, added_queues)) + logger.info(six.text_type("Workers on tower node '{}' removed from queues {} and added to queues {}") + .format(sender.hostname, removed_queues, added_queues)) # Expedite the first hearbeat run so a node comes online quickly. cluster_node_heartbeat.apply([]) diff --git a/awx/main/tests/unit/utils/test_ha.py b/awx/main/tests/unit/utils/test_ha.py index f73f8e908a..35c8005781 100644 --- a/awx/main/tests/unit/utils/test_ha.py +++ b/awx/main/tests/unit/utils/test_ha.py @@ -61,7 +61,7 @@ class TestAddRemoveCeleryWorkerQueues(): mock.patch('awx.main.utils.ha.settings.AWX_CELERY_QUEUES_STATIC', static_queues), mock.patch('awx.main.utils.ha.settings.AWX_CELERY_BCAST_QUEUES_STATIC', broadcast_queues), mock.patch('awx.main.utils.ha.settings.CLUSTER_HOST_ID', hostname)): - (added_queues, removed_queues) = _add_remove_celery_worker_queues(mock_app, [instance], worker_queues, hostname) + (added_queues, removed_queues) = _add_remove_celery_worker_queues(mock_app, instance, worker_queues, hostname) assert set(added_expected) == set(added_queues) assert set(removed_expected) == set(removed_queues) diff --git a/awx/main/utils/ha.py b/awx/main/utils/ha.py index dd629ca24d..1d9b6e08d3 100644 --- a/awx/main/utils/ha.py +++ b/awx/main/utils/ha.py @@ -14,14 +14,13 @@ def construct_bcast_queue_name(common_name): return common_name.encode('utf8') + '_' + settings.CLUSTER_HOST_ID -def _add_remove_celery_worker_queues(app, controlled_instances, worker_queues, worker_name): +def _add_remove_celery_worker_queues(app, instance, worker_queues, worker_name): removed_queues = [] added_queues = [] - hostnames = set([instance.hostname for instance in controlled_instances]) worker_queue_names = set([q['name'] for q in worker_queues]) bcast_queue_names = set([construct_bcast_queue_name(n) for n in settings.AWX_CELERY_BCAST_QUEUES_STATIC]) - all_queue_names = hostnames | set(settings.AWX_CELERY_QUEUES_STATIC) + all_queue_names = set([instance.hostname]) | set(settings.AWX_CELERY_QUEUES_STATIC) desired_queues = bcast_queue_names | (all_queue_names if instance.enabled else set()) # Remove queues @@ -69,18 +68,13 @@ class AWXCeleryRouter(object): def register_celery_worker_queues(app, celery_worker_name): instance = Instance.objects.me() - controlled_instances = [instance] - if instance.is_controller(): - controlled_instances.extend(Instance.objects.filter( - rampart_groups__controller__instances__hostname=instance.hostname - )) added_queues = [] removed_queues = [] celery_host_queues = app.control.inspect([celery_worker_name]).active_queues() celery_worker_queues = celery_host_queues[celery_worker_name] if celery_host_queues else [] - (added_queues, removed_queues) = _add_remove_celery_worker_queues(app, controlled_instances, + (added_queues, removed_queues) = _add_remove_celery_worker_queues(app, instance, celery_worker_queues, celery_worker_name) - return (controlled_instances, removed_queues, added_queues) + return (removed_queues, added_queues)