From 9f745dd3b87896660513c60e207639eb954fda7d Mon Sep 17 00:00:00 2001 From: chris meyers Date: Tue, 1 May 2018 15:19:47 -0400 Subject: [PATCH] control celery routes using celery router * Each time a route is needed (i.e. when a task is sumitted to celery). The router will be queried. This is ideal. With the previous method we had to consider how a change in the routes would propogate to all celery workers and nodes. * fully describe the default awx queue * Our dynamic queue registration would correct awx_private_queue. However, we don't want celery to even create an "invalid"/extra queue-exchange-route. This change makes sure we don't create extranious things in rabbitmq. * reduce the cluster queue registration output. Only output when the queue registration list changes. --- awx/main/tasks.py | 29 ++++++---------------- awx/main/tests/unit/utils/test_ha.py | 36 ++++++++++------------------ awx/main/utils/ha.py | 31 +++++++++++------------- awx/settings/defaults.py | 5 +++- 4 files changed, 37 insertions(+), 64 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 22e6dff090..1e765dd709 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -61,7 +61,7 @@ from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja from awx.main.utils.reload import stop_local_services from awx.main.utils.pglock import advisory_lock -from awx.main.utils.ha import update_celery_worker_routes, register_celery_worker_queues +from awx.main.utils.ha import register_celery_worker_queues from awx.main.consumers import emit_channel_notification from awx.conf import settings_registry @@ -201,39 +201,24 @@ def handle_ha_toplogy_changes(self): awx_app = Celery('awx') awx_app.config_from_object('django.conf:settings') instances, removed_queues, added_queues = register_celery_worker_queues(awx_app, self.request.hostname) - for instance in instances: - logger.info(six.text_type("Workers on tower node '{}' removed from queues {} and added to queues {}") - .format(instance.hostname, removed_queues, added_queues)) - updated_routes = update_celery_worker_routes(instance, settings) - logger.info(six.text_type("Worker on tower node '{}' updated celery routes {} all routes are now {}") - .format(instance.hostname, updated_routes, self.app.conf.CELERY_ROUTES)) + if len(removed_queues) + len(added_queues) > 0: + logger.info(six.text_type("Workers on tower node(s) '{}' removed from queues {} and added to queues {}") + .format([i.hostname for i in instances], removed_queues, added_queues)) @worker_ready.connect def handle_ha_toplogy_worker_ready(sender, **kwargs): logger.debug(six.text_type("Configure celeryd queues task on host {}").format(sender.hostname)) instances, removed_queues, added_queues = register_celery_worker_queues(sender.app, sender.hostname) - for instance in instances: - logger.info(six.text_type("Workers on tower node '{}' unsubscribed from queues {} and subscribed to queues {}") - .format(instance.hostname, removed_queues, added_queues)) + if len(removed_queues) + len(added_queues) > 0: + logger.info(six.text_type("Workers on tower node(s) '{}' removed from queues {} and added to queues {}") + .format([i.hostname for i in instances], removed_queues, added_queues)) # Expedite the first hearbeat run so a node comes online quickly. cluster_node_heartbeat.apply([]) apply_cluster_membership_policies.apply([]) -@celeryd_init.connect -def handle_update_celery_routes(sender=None, conf=None, **kwargs): - conf = conf if conf else sender.app.conf - logger.debug(six.text_type("Registering celery routes for {}").format(sender)) - (changed, instance) = Instance.objects.get_or_register() - if changed: - logger.info(six.text_type("Registered tower node '{}'").format(instance.hostname)) - added_routes = update_celery_worker_routes(instance, conf) - logger.info(six.text_type("Workers on tower node '{}' added routes {} all routes are now {}") - .format(instance.hostname, added_routes, conf.CELERY_ROUTES)) - - @celeryd_after_setup.connect def handle_update_celery_hostname(sender, instance, **kwargs): (changed, tower_instance) = Instance.objects.get_or_register() diff --git a/awx/main/tests/unit/utils/test_ha.py b/awx/main/tests/unit/utils/test_ha.py index 6432be7a32..94cb7d3606 100644 --- a/awx/main/tests/unit/utils/test_ha.py +++ b/awx/main/tests/unit/utils/test_ha.py @@ -11,18 +11,10 @@ from contextlib import nested # AWX from awx.main.utils.ha import ( _add_remove_celery_worker_queues, - update_celery_worker_routes, + AWXCeleryRouter, ) -@pytest.fixture -def conf(): - class Conf(): - CELERY_ROUTES = dict() - CELERYBEAT_SCHEDULE = dict() - return Conf() - - class TestAddRemoveCeleryWorkerQueues(): @pytest.fixture def instance_generator(self, mocker): @@ -73,7 +65,7 @@ class TestAddRemoveCeleryWorkerQueues(): assert set(removed_queues) == set(removed_expected) -class TestUpdateCeleryWorkerRoutes(): +class TestUpdateCeleryWorkerRouter(): @pytest.mark.parametrize("is_controller,expected_routes", [ (False, { @@ -86,20 +78,16 @@ class TestUpdateCeleryWorkerRoutes(): 'awx.main.tasks.awx_isolated_heartbeat': {'queue': 'east-1', 'routing_key': 'east-1'}, }), ]) - def test_update_celery_worker_routes(self, mocker, conf, is_controller, expected_routes): - instance = mocker.MagicMock() - instance.hostname = 'east-1' - instance.is_controller = mocker.MagicMock(return_value=is_controller) + def test_update_celery_worker_routes(self, mocker, is_controller, expected_routes): + def get_or_register(): + instance = mock.MagicMock() + instance.hostname = 'east-1' + instance.is_controller = mock.MagicMock(return_value=is_controller) + return (False, instance) - assert update_celery_worker_routes(instance, conf) == expected_routes - assert conf.CELERY_ROUTES == expected_routes + with mock.patch('awx.main.models.Instance.objects.get_or_register', get_or_register): + router = AWXCeleryRouter() - def test_update_celery_worker_routes_deleted(self, mocker, conf): - instance = mocker.MagicMock() - instance.hostname = 'east-1' - instance.is_controller = mocker.MagicMock(return_value=False) - conf.CELERY_ROUTES = {'awx.main.tasks.awx_isolated_heartbeat': 'foobar'} - - update_celery_worker_routes(instance, conf) - assert 'awx.main.tasks.awx_isolated_heartbeat' not in conf.CELERY_ROUTES + for k,v in expected_routes.iteritems(): + assert router.route_for_task(k) == v diff --git a/awx/main/utils/ha.py b/awx/main/utils/ha.py index 1e3bee15fd..91b9a0c05b 100644 --- a/awx/main/utils/ha.py +++ b/awx/main/utils/ha.py @@ -52,24 +52,21 @@ def _add_remove_celery_worker_queues(app, controlled_instances, worker_queues, w return (added_queues, removed_queues) -def update_celery_worker_routes(instance, conf): - tasks = [ - 'awx.main.tasks.cluster_node_heartbeat', - 'awx.main.tasks.purge_old_stdout_files', - ] - routes_updated = {} - # Instance is, effectively, a controller node - if instance.is_controller(): - tasks.append('awx.main.tasks.awx_isolated_heartbeat') - else: - if 'awx.main.tasks.awx_isolated_heartbeat' in conf.CELERY_ROUTES: - del conf.CELERY_ROUTES['awx.main.tasks.awx_isolated_heartbeat'] +class AWXCeleryRouter(object): + def route_for_task(self, task, args=None, kwargs=None): + (changed, instance) = Instance.objects.get_or_register() + tasks = [ + 'awx.main.tasks.cluster_node_heartbeat', + 'awx.main.tasks.purge_old_stdout_files', + ] + isolated_tasks = [ + 'awx.main.tasks.awx_isolated_heartbeat', + ] + if task in tasks: + return {'queue': instance.hostname.encode("utf8"), 'routing_key': instance.hostname.encode("utf8")} - for t in tasks: - conf.CELERY_ROUTES[t] = {'queue': instance.hostname.encode("utf8"), 'routing_key': instance.hostname.encode("utf8")} - routes_updated[t] = conf.CELERY_ROUTES[t] - - return routes_updated + if instance.is_controller() and task in isolated_tasks: + return {'queue': instance.hostname.encode("utf8"), 'routing_key': instance.hostname.encode("utf8")} def register_celery_worker_queues(app, celery_worker_name): diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 4a7fc96358..b4db5c63e1 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -454,6 +454,9 @@ BROKER_POOL_LIMIT = None BROKER_URL = 'amqp://guest:guest@localhost:5672//' CELERY_EVENT_QUEUE_TTL = 5 CELERY_DEFAULT_QUEUE = 'awx_private_queue' +CELERY_DEFAULT_EXCHANGE = 'awx_private_queue' +CELERY_DEFAULT_ROUTING_KEY = 'awx_private_queue' +CELERY_DEFAULT_EXCHANGE_TYPE = 'direct' CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_ACCEPT_CONTENT = ['json'] @@ -465,7 +468,7 @@ CELERYD_AUTOSCALER = 'awx.main.utils.autoscale:DynamicAutoScaler' CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend' CELERY_IMPORTS = ('awx.main.scheduler.tasks',) CELERY_QUEUES = () -CELERY_ROUTES = {} +CELERY_ROUTES = ('awx.main.utils.ha.AWXCeleryRouter',) def log_celery_failure(*args):