From 648d9165ff7491bc03a5ba023d27b82a74ac16d7 Mon Sep 17 00:00:00 2001 From: chris meyers Date: Mon, 30 Apr 2018 15:18:05 -0400 Subject: [PATCH] broadcast queues get a per-node stable queue name * Using Kombu's default Broadcast() constructor requires only one parameter. That parameter defines the exchange name; the queue name is randomly generated per node. * This causes problems if/when Celery enters an infinite restart loop: because the queue name is random, each restart creates yet another queue, so too many RabbitMQ queues get created and RabbitMQ runs out of memory (gracefully). * To remedy this, we tell Broadcast the queue name to use, which is derived from a constant plus the node name so that the per-node queue name is stable. --- awx/main/tasks.py | 4 ++-- awx/main/tests/unit/utils/test_ha.py | 29 ++++++++++++++++------------ awx/main/utils/ha.py | 28 +++++++++++++++++++-------- awx/settings/defaults.py | 6 +----- 4 files changed, 40 insertions(+), 27 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 31edcc38d7..ea3c187d05 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -189,7 +189,7 @@ def apply_cluster_membership_policies(self): handle_ha_toplogy_changes.apply([]) -@shared_task(queue='tower_broadcast_all', bind=True) +@shared_task(exchange='tower_broadcast_all', bind=True) def handle_setting_changes(self, setting_keys): orig_len = len(setting_keys) for i in range(orig_len): @@ -208,7 +208,7 @@ def handle_setting_changes(self, setting_keys): restart_local_services(['uwsgi']) -@shared_task(bind=True, queue='tower_broadcast_all') +@shared_task(bind=True, exchange='tower_broadcast_all') def handle_ha_toplogy_changes(self): (changed, instance) = Instance.objects.get_or_register() if changed: diff --git a/awx/main/tests/unit/utils/test_ha.py b/awx/main/tests/unit/utils/test_ha.py index edd44b7958..6432be7a32 100644 --- a/awx/main/tests/unit/utils/test_ha.py +++ b/awx/main/tests/unit/utils/test_ha.py @@ -6,6 +6,7 @@ # python import pytest import mock +from contextlib import nested # AWX from awx.main.utils.ha import 
( @@ -47,22 +48,26 @@ class TestAddRemoveCeleryWorkerQueues(): app.control.cancel_consumer = mocker.MagicMock() return app - @pytest.mark.parametrize("static_queues,_worker_queues,groups,hostname,added_expected,removed_expected", [ - (['east', 'west'], ['east', 'west', 'east-1'], [], 'east-1', [], []), - ([], ['east', 'west', 'east-1'], ['east', 'west'], 'east-1', [], []), - ([], ['east', 'west'], ['east', 'west'], 'east-1', ['east-1'], []), - ([], [], ['east', 'west'], 'east-1', ['east', 'west', 'east-1'], []), - ([], ['china', 'russia'], ['east', 'west'], 'east-1', ['east', 'west', 'east-1'], ['china', 'russia']), + @pytest.mark.parametrize("broadcast_queues,static_queues,_worker_queues,groups,hostname,added_expected,removed_expected", [ + (['tower_broadcast_all'], ['east', 'west'], ['east', 'west', 'east-1'], [], 'east-1', ['tower_broadcast_all_east-1'], []), + ([], [], ['east', 'west', 'east-1'], ['east', 'west'], 'east-1', [], []), + ([], [], ['east', 'west'], ['east', 'west'], 'east-1', ['east-1'], []), + ([], [], [], ['east', 'west'], 'east-1', ['east', 'west', 'east-1'], []), + ([], [], ['china', 'russia'], ['east', 'west'], 'east-1', ['east', 'west', 'east-1'], ['china', 'russia']), ]) def test__add_remove_celery_worker_queues_noop(self, mock_app, - instance_generator, - worker_queues_generator, - static_queues, _worker_queues, + instance_generator, + worker_queues_generator, + broadcast_queues, + static_queues, _worker_queues, groups, hostname, added_expected, removed_expected): instance = instance_generator(groups=groups, hostname=hostname) worker_queues = worker_queues_generator(_worker_queues) - with mock.patch('awx.main.utils.ha.settings.AWX_CELERY_QUEUES_STATIC', static_queues): + with nested( + mock.patch('awx.main.utils.ha.settings.AWX_CELERY_QUEUES_STATIC', static_queues), + mock.patch('awx.main.utils.ha.settings.AWX_CELERY_BCAST_QUEUES_STATIC', broadcast_queues), + mock.patch('awx.main.utils.ha.settings.CLUSTER_HOST_ID', hostname)): 
(added_queues, removed_queues) = _add_remove_celery_worker_queues(mock_app, [instance], worker_queues, hostname) assert set(added_queues) == set(added_expected) assert set(removed_queues) == set(removed_expected) @@ -71,11 +76,11 @@ class TestAddRemoveCeleryWorkerQueues(): class TestUpdateCeleryWorkerRoutes(): @pytest.mark.parametrize("is_controller,expected_routes", [ - (False, { + (False, { 'awx.main.tasks.cluster_node_heartbeat': {'queue': 'east-1', 'routing_key': 'east-1'}, 'awx.main.tasks.purge_old_stdout_files': {'queue': 'east-1', 'routing_key': 'east-1'} }), - (True, { + (True, { 'awx.main.tasks.cluster_node_heartbeat': {'queue': 'east-1', 'routing_key': 'east-1'}, 'awx.main.tasks.purge_old_stdout_files': {'queue': 'east-1', 'routing_key': 'east-1'}, 'awx.main.tasks.awx_isolated_heartbeat': {'queue': 'east-1', 'routing_key': 'east-1'}, diff --git a/awx/main/utils/ha.py b/awx/main/utils/ha.py index 93a7f8dd24..1e3bee15fd 100644 --- a/awx/main/utils/ha.py +++ b/awx/main/utils/ha.py @@ -10,6 +10,10 @@ from django.conf import settings from awx.main.models import Instance +def construct_bcast_queue_name(common_name): + return common_name.encode('utf8') + '_' + settings.CLUSTER_HOST_ID + + def _add_remove_celery_worker_queues(app, controlled_instances, worker_queues, worker_name): removed_queues = [] added_queues = [] @@ -19,17 +23,14 @@ def _add_remove_celery_worker_queues(app, controlled_instances, worker_queues, w ig_names.update(instance.rampart_groups.values_list('name', flat=True)) worker_queue_names = set([q['name'] for q in worker_queues]) + bcast_queue_names = set([construct_bcast_queue_name(n) for n in settings.AWX_CELERY_BCAST_QUEUES_STATIC]) all_queue_names = ig_names | hostnames | set(settings.AWX_CELERY_QUEUES_STATIC) # Remove queues that aren't in the instance group - for queue in worker_queues: - if queue['name'] in settings.AWX_CELERY_QUEUES_STATIC or \ - queue['alias'] in settings.AWX_CELERY_BCAST_QUEUES_STATIC: - continue - - if queue['name'] 
not in all_queue_names or not instance.enabled: - app.control.cancel_consumer(queue['name'].encode("utf8"), reply=True, destination=[worker_name]) - removed_queues.append(queue['name'].encode("utf8")) + for queue_name in worker_queue_names: + if queue_name not in all_queue_names | bcast_queue_names or not instance.enabled: + app.control.cancel_consumer(queue_name.encode("utf8"), reply=True, destination=[worker_name]) + removed_queues.append(queue_name.encode("utf8")) # Add queues for instance and instance groups for queue_name in all_queue_names: @@ -37,6 +38,17 @@ def _add_remove_celery_worker_queues(app, controlled_instances, worker_queues, w app.control.add_consumer(queue_name.encode("utf8"), reply=True, destination=[worker_name]) added_queues.append(queue_name.encode("utf8")) + # Add stable-named broadcast queues + for queue_name in settings.AWX_CELERY_BCAST_QUEUES_STATIC: + bcast_queue_name = construct_bcast_queue_name(queue_name) + if bcast_queue_name not in worker_queue_names: + app.control.add_consumer(bcast_queue_name, + exchange=queue_name.encode("utf8"), + exchange_type='fanout', + routing_key=queue_name.encode("utf8"), + reply=True) + added_queues.append(bcast_queue_name) + return (added_queues, removed_queues) diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 354c2b9e74..20c8a82a1f 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -9,8 +9,6 @@ import djcelery import six from datetime import timedelta -from kombu.common import Broadcast - # global settings from django.conf import global_settings # ugettext lazy @@ -466,9 +464,7 @@ CELERYD_POOL_RESTARTS = True CELERYD_AUTOSCALER = 'awx.main.utils.autoscale:DynamicAutoScaler' CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend' CELERY_IMPORTS = ('awx.main.scheduler.tasks',) -CELERY_QUEUES = ( - Broadcast('tower_broadcast_all'), -) +CELERY_QUEUES = () CELERY_ROUTES = {}