Merge pull request #1629 from chrismeyersfsu/fix-stable_broadcast_queue_name

broadcast queues get a per-node stable queue name
This commit is contained in:
Chris Meyers 2018-05-01 13:53:25 -04:00 committed by GitHub
commit 22c2bd0257
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 40 additions and 27 deletions

View File

@ -189,7 +189,7 @@ def apply_cluster_membership_policies(self):
handle_ha_toplogy_changes.apply([])
@shared_task(queue='tower_broadcast_all', bind=True)
@shared_task(exchange='tower_broadcast_all', bind=True)
def handle_setting_changes(self, setting_keys):
orig_len = len(setting_keys)
for i in range(orig_len):
@ -208,7 +208,7 @@ def handle_setting_changes(self, setting_keys):
restart_local_services(['uwsgi'])
@shared_task(bind=True, queue='tower_broadcast_all')
@shared_task(bind=True, exchange='tower_broadcast_all')
def handle_ha_toplogy_changes(self):
(changed, instance) = Instance.objects.get_or_register()
if changed:

View File

@ -6,6 +6,7 @@
# python
import pytest
import mock
from contextlib import nested
# AWX
from awx.main.utils.ha import (
@ -47,22 +48,26 @@ class TestAddRemoveCeleryWorkerQueues():
app.control.cancel_consumer = mocker.MagicMock()
return app
@pytest.mark.parametrize("static_queues,_worker_queues,groups,hostname,added_expected,removed_expected", [
(['east', 'west'], ['east', 'west', 'east-1'], [], 'east-1', [], []),
([], ['east', 'west', 'east-1'], ['east', 'west'], 'east-1', [], []),
([], ['east', 'west'], ['east', 'west'], 'east-1', ['east-1'], []),
([], [], ['east', 'west'], 'east-1', ['east', 'west', 'east-1'], []),
([], ['china', 'russia'], ['east', 'west'], 'east-1', ['east', 'west', 'east-1'], ['china', 'russia']),
@pytest.mark.parametrize("broadcast_queues,static_queues,_worker_queues,groups,hostname,added_expected,removed_expected", [
(['tower_broadcast_all'], ['east', 'west'], ['east', 'west', 'east-1'], [], 'east-1', ['tower_broadcast_all_east-1'], []),
([], [], ['east', 'west', 'east-1'], ['east', 'west'], 'east-1', [], []),
([], [], ['east', 'west'], ['east', 'west'], 'east-1', ['east-1'], []),
([], [], [], ['east', 'west'], 'east-1', ['east', 'west', 'east-1'], []),
([], [], ['china', 'russia'], ['east', 'west'], 'east-1', ['east', 'west', 'east-1'], ['china', 'russia']),
])
def test__add_remove_celery_worker_queues_noop(self, mock_app,
instance_generator,
worker_queues_generator,
static_queues, _worker_queues,
instance_generator,
worker_queues_generator,
broadcast_queues,
static_queues, _worker_queues,
groups, hostname,
added_expected, removed_expected):
instance = instance_generator(groups=groups, hostname=hostname)
worker_queues = worker_queues_generator(_worker_queues)
with mock.patch('awx.main.utils.ha.settings.AWX_CELERY_QUEUES_STATIC', static_queues):
with nested(
mock.patch('awx.main.utils.ha.settings.AWX_CELERY_QUEUES_STATIC', static_queues),
mock.patch('awx.main.utils.ha.settings.AWX_CELERY_BCAST_QUEUES_STATIC', broadcast_queues),
mock.patch('awx.main.utils.ha.settings.CLUSTER_HOST_ID', hostname)):
(added_queues, removed_queues) = _add_remove_celery_worker_queues(mock_app, [instance], worker_queues, hostname)
assert set(added_queues) == set(added_expected)
assert set(removed_queues) == set(removed_expected)
@ -71,11 +76,11 @@ class TestAddRemoveCeleryWorkerQueues():
class TestUpdateCeleryWorkerRoutes():
@pytest.mark.parametrize("is_controller,expected_routes", [
(False, {
(False, {
'awx.main.tasks.cluster_node_heartbeat': {'queue': 'east-1', 'routing_key': 'east-1'},
'awx.main.tasks.purge_old_stdout_files': {'queue': 'east-1', 'routing_key': 'east-1'}
}),
(True, {
(True, {
'awx.main.tasks.cluster_node_heartbeat': {'queue': 'east-1', 'routing_key': 'east-1'},
'awx.main.tasks.purge_old_stdout_files': {'queue': 'east-1', 'routing_key': 'east-1'},
'awx.main.tasks.awx_isolated_heartbeat': {'queue': 'east-1', 'routing_key': 'east-1'},

View File

@ -10,6 +10,10 @@ from django.conf import settings
from awx.main.models import Instance
def construct_bcast_queue_name(common_name):
return common_name.encode('utf8') + '_' + settings.CLUSTER_HOST_ID
def _add_remove_celery_worker_queues(app, controlled_instances, worker_queues, worker_name):
removed_queues = []
added_queues = []
@ -19,17 +23,14 @@ def _add_remove_celery_worker_queues(app, controlled_instances, worker_queues, w
ig_names.update(instance.rampart_groups.values_list('name', flat=True))
worker_queue_names = set([q['name'] for q in worker_queues])
bcast_queue_names = set([construct_bcast_queue_name(n) for n in settings.AWX_CELERY_BCAST_QUEUES_STATIC])
all_queue_names = ig_names | hostnames | set(settings.AWX_CELERY_QUEUES_STATIC)
# Remove queues that aren't in the instance group
for queue in worker_queues:
if queue['name'] in settings.AWX_CELERY_QUEUES_STATIC or \
queue['alias'] in settings.AWX_CELERY_BCAST_QUEUES_STATIC:
continue
if queue['name'] not in all_queue_names or not instance.enabled:
app.control.cancel_consumer(queue['name'].encode("utf8"), reply=True, destination=[worker_name])
removed_queues.append(queue['name'].encode("utf8"))
for queue_name in worker_queue_names:
if queue_name not in all_queue_names | bcast_queue_names or not instance.enabled:
app.control.cancel_consumer(queue_name.encode("utf8"), reply=True, destination=[worker_name])
removed_queues.append(queue_name.encode("utf8"))
# Add queues for instance and instance groups
for queue_name in all_queue_names:
@ -37,6 +38,17 @@ def _add_remove_celery_worker_queues(app, controlled_instances, worker_queues, w
app.control.add_consumer(queue_name.encode("utf8"), reply=True, destination=[worker_name])
added_queues.append(queue_name.encode("utf8"))
# Add stable-named broadcast queues
for queue_name in settings.AWX_CELERY_BCAST_QUEUES_STATIC:
bcast_queue_name = construct_bcast_queue_name(queue_name)
if bcast_queue_name not in worker_queue_names:
app.control.add_consumer(bcast_queue_name,
exchange=queue_name.encode("utf8"),
exchange_type='fanout',
routing_key=queue_name.encode("utf8"),
reply=True)
added_queues.append(bcast_queue_name)
return (added_queues, removed_queues)

View File

@ -9,8 +9,6 @@ import djcelery
import six
from datetime import timedelta
from kombu.common import Broadcast
# global settings
from django.conf import global_settings
# ugettext lazy
@ -466,9 +464,7 @@ CELERYD_POOL_RESTARTS = True
CELERYD_AUTOSCALER = 'awx.main.utils.autoscale:DynamicAutoScaler'
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
CELERY_IMPORTS = ('awx.main.scheduler.tasks',)
CELERY_QUEUES = (
Broadcast('tower_broadcast_all'),
)
CELERY_QUEUES = ()
CELERY_ROUTES = {}