From c9ff3e99b8f84f16e4763b965a82365b234321db Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Tue, 26 Sep 2017 10:28:41 -0400 Subject: [PATCH] celeryd attach to queues dynamically * Based on the tower topology (Instance and InstanceGroup relationships), have celery dynamically listen to queues on boot * Add celery task capable of "refreshing" what queues each celeryd worker listens to. This will be used to support changes in the topology. * Cleaned up some celery task definitions. * Converged wrongly targeted job launch/finish messages to 'tower' queue, rather than a 1-off queue. * Dynamically route celery tasks destined for the local node * separate beat process add support for separate beat process --- Makefile | 2 +- awx/main/models/ha.py | 3 + awx/main/scheduler/tasks.py | 4 +- awx/main/tasks.py | 34 +++++- awx/main/tests/unit/utils/test_ha.py | 100 ++++++++++++++++++ awx/main/utils/ha.py | 71 +++++++++++++ awx/settings/defaults.py | 48 +++++---- awx/settings/development.py | 9 -- installer/image_build/files/settings.py | 3 - .../image_build/files/supervisor_task.conf | 3 +- tools/docker-compose/supervisor.conf | 2 +- 11 files changed, 237 insertions(+), 42 deletions(-) create mode 100644 awx/main/tests/unit/utils/test_ha.py create mode 100644 awx/main/utils/ha.py diff --git a/Makefile b/Makefile index 58479e0f19..874a4e52ee 100644 --- a/Makefile +++ b/Makefile @@ -326,7 +326,7 @@ celeryd: @if [ "$(VENV_BASE)" ]; then \ . 
$(VENV_BASE)/awx/bin/activate; \ fi; \ - celery worker -A awx -l DEBUG -B -Ofair --autoscale=100,4 --schedule=$(CELERY_SCHEDULE_FILE) -Q tower_scheduler,tower_broadcast_all,$(COMPOSE_HOST),$(AWX_GROUP_QUEUES) -n celery@$(COMPOSE_HOST) --pidfile /tmp/celery_pid + celery worker -A awx -l DEBUG -B -Ofair --autoscale=100,4 --schedule=$(CELERY_SCHEDULE_FILE) -Q tower_broadcast_all -n celery@$(COMPOSE_HOST) --pidfile /tmp/celery_pid # Run to start the zeromq callback receiver receiver: diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index f2e57f7a07..cb63beb126 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -63,6 +63,9 @@ class Instance(models.Model): grace_period = settings.AWX_ISOLATED_PERIODIC_CHECK * 2 return self.modified < ref_time - timedelta(seconds=grace_period) + def is_controller(self): + return Instance.objects.filter(rampart_groups__controller__instances=self).exists() + class InstanceGroup(models.Model): """A model representing a Queue/Group of AWX Instances.""" diff --git a/awx/main/scheduler/tasks.py b/awx/main/scheduler/tasks.py index 70d4c95354..89e36f6a93 100644 --- a/awx/main/scheduler/tasks.py +++ b/awx/main/scheduler/tasks.py @@ -21,12 +21,12 @@ class LogErrorsTask(Task): super(LogErrorsTask, self).on_failure(exc, task_id, args, kwargs, einfo) -@shared_task +@shared_task(base=LogErrorsTask) def run_job_launch(job_id): TaskManager().schedule() -@shared_task +@shared_task(base=LogErrorsTask) def run_job_complete(job_id): TaskManager().schedule() diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 1f74fb04a1..36274b926d 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -26,7 +26,7 @@ except Exception: # Celery from celery import Task, shared_task -from celery.signals import celeryd_init, worker_process_init, worker_shutdown +from celery.signals import celeryd_init, worker_process_init, worker_shutdown, worker_ready, beat_init # Django from django.conf import settings @@ -57,6 +57,7 @@ from awx.main.utils 
import (get_ansible_version, get_ssh_version, decrypt_field, ignore_inventory_computed_fields, ignore_inventory_group_removal, get_type_for_model, extract_ansible_vars) from awx.main.utils.reload import restart_local_services, stop_local_services +from awx.main.utils.ha import update_celery_worker_routes, register_celery_worker_queues from awx.main.utils.handlers import configure_external_logger from awx.main.consumers import emit_channel_notification from awx.conf import settings_registry @@ -147,6 +148,37 @@ def handle_setting_changes(self, setting_keys): break +@shared_task(bind=True, queue='tower_broadcast_all', base=LogErrorsTask) +def handle_ha_toplogy_changes(self): + instance = Instance.objects.me() + logger.debug("Reconfigure celeryd queues task on host {}".format(self.request.hostname)) + (instance, removed_queues, added_queues) = register_celery_worker_queues(self.app, self.request.hostname) + logger.info("Workers on tower node '{}' removed from queues {} and added to queues {}" + .format(instance.hostname, removed_queues, added_queues)) + updated_routes = update_celery_worker_routes(instance, settings) + logger.info("Worker on tower node '{}' updated celery routes {} all routes are now {}" + .format(instance.hostname, updated_routes, self.app.conf.CELERY_ROUTES)) + + +@worker_ready.connect +def handle_ha_toplogy_worker_ready(sender, **kwargs): + logger.debug("Configure celeryd queues task on host {}".format(sender.hostname)) + (instance, removed_queues, added_queues) = register_celery_worker_queues(sender.app, sender.hostname) + logger.info("Workers on tower node '{}' unsubscribed from queues {} and subscribed to queues {}" + .format(instance.hostname, removed_queues, added_queues)) + + +@beat_init.connect +@celeryd_init.connect +def handle_update_celery_routes(sender=None, conf=None, **kwargs): + conf = conf if conf else sender.app.conf + logger.debug("Registering celery routes for {}".format(sender)) + instance = Instance.objects.me() + added_routes = 
update_celery_worker_routes(instance, conf) + logger.info("Workers on tower node '{}' added routes {} all routes are now {}" + .format(instance.hostname, added_routes, conf.CELERY_ROUTES)) + + @shared_task(queue='tower', base=LogErrorsTask) def send_notifications(notification_list, job_id=None): if not isinstance(notification_list, list): diff --git a/awx/main/tests/unit/utils/test_ha.py b/awx/main/tests/unit/utils/test_ha.py new file mode 100644 index 0000000000..6bd1b856b9 --- /dev/null +++ b/awx/main/tests/unit/utils/test_ha.py @@ -0,0 +1,100 @@ +# -*- coding: utf-8 -*- + +# Copyright (c) 2017 Ansible Tower by Red Hat +# All Rights Reserved. + +# python +import pytest +import mock + +# AWX +from awx.main.utils.ha import ( + _add_remove_celery_worker_queues, + update_celery_worker_routes, +) + + +@pytest.fixture +def conf(): + class Conf(): + CELERY_ROUTES = dict() + CELERYBEAT_SCHEDULE = dict() + return Conf() + + +class TestAddRemoveCeleryWorkerQueues(): + @pytest.fixture + def instance_generator(self, mocker): + def fn(groups=['east', 'west', 'north', 'south'], hostname='east-1'): + instance = mocker.MagicMock() + instance.hostname = hostname + instance.rampart_groups = mocker.MagicMock() + instance.rampart_groups.values_list = mocker.MagicMock(return_value=groups) + + return instance + return fn + + @pytest.fixture + def worker_queues_generator(self, mocker): + def fn(queues=['east', 'west']): + return [dict(name=n, alias='') for n in queues] + return fn + + @pytest.fixture + def mock_app(self, mocker): + app = mocker.MagicMock() + app.control = mocker.MagicMock() + app.control.cancel_consumer = mocker.MagicMock() + return app + + @pytest.mark.parametrize("static_queues,_worker_queues,groups,hostname,added_expected,removed_expected", [ + (['east', 'west'], ['east', 'west', 'east-1'], [], 'east-1', [], []), + ([], ['east', 'west', 'east-1'], ['east', 'west'], 'east-1', [], []), + ([], ['east', 'west'], ['east', 'west'], 'east-1', ['east-1'], []), + ([], [], 
['east', 'west'], 'east-1', ['east', 'west', 'east-1'], []), + ([], ['china', 'russia'], ['east', 'west'], 'east-1', ['east', 'west', 'east-1'], ['china', 'russia']), + ]) + def test__add_remove_celery_worker_queues_noop(self, mock_app, + instance_generator, + worker_queues_generator, + static_queues, _worker_queues, + groups, hostname, + added_expected, removed_expected): + instance = instance_generator(groups=groups, hostname=hostname) + worker_queues = worker_queues_generator(_worker_queues) + with mock.patch('awx.main.utils.ha.settings.AWX_CELERY_QUEUES_STATIC', static_queues): + (added_queues, removed_queues) = _add_remove_celery_worker_queues(mock_app, instance, worker_queues, hostname) + assert set(added_queues) == set(added_expected) + assert set(removed_queues) == set(removed_expected) + + +class TestUpdateCeleryWorkerRoutes(): + + @pytest.mark.parametrize("is_controller,expected_routes", [ + (False, { + 'awx.main.tasks.cluster_node_heartbeat': {'queue': 'east-1', 'routing_key': 'east-1'}, + 'awx.main.tasks.purge_old_stdout_files': {'queue': 'east-1', 'routing_key': 'east-1'} + }), + (True, { + 'awx.main.tasks.cluster_node_heartbeat': {'queue': 'east-1', 'routing_key': 'east-1'}, + 'awx.main.tasks.purge_old_stdout_files': {'queue': 'east-1', 'routing_key': 'east-1'}, + 'awx.main.tasks.awx_isolated_heartbeat': {'queue': 'east-1', 'routing_key': 'east-1'}, + }), + ]) + def test_update_celery_worker_routes(self, mocker, conf, is_controller, expected_routes): + instance = mocker.MagicMock() + instance.hostname = 'east-1' + instance.is_controller = mocker.MagicMock(return_value=is_controller) + + assert update_celery_worker_routes(instance, conf) == expected_routes + assert conf.CELERY_ROUTES == expected_routes + + def test_update_celery_worker_routes_deleted(self, mocker, conf): + instance = mocker.MagicMock() + instance.hostname = 'east-1' + instance.is_controller = mocker.MagicMock(return_value=False) + conf.CELERY_ROUTES = 
{'awx.main.tasks.awx_isolated_heartbeat': 'foobar'} + + update_celery_worker_routes(instance, conf) + assert 'awx.main.tasks.awx_isolated_heartbeat' not in conf.CELERY_ROUTES + diff --git a/awx/main/utils/ha.py b/awx/main/utils/ha.py new file mode 100644 index 0000000000..9efb3e9cf3 --- /dev/null +++ b/awx/main/utils/ha.py @@ -0,0 +1,71 @@ +# -*- coding: utf-8 -*- + +# Copyright (c) 2017 Ansible Tower by Red Hat +# All Rights Reserved. + +# Django +from django.conf import settings + +# AWX +from awx.main.models import Instance + + +def _add_remove_celery_worker_queues(app, instance, worker_queues, worker_name): + removed_queues = [] + added_queues = [] + ig_names = set(instance.rampart_groups.values_list('name', flat=True)) + worker_queue_names = set([q['name'] for q in worker_queues]) + + + # Remove queues that aren't in the instance group + for queue in worker_queues: + if queue['name'] in settings.AWX_CELERY_QUEUES_STATIC or \ + queue['alias'] in settings.AWX_CELERY_QUEUES_STATIC: + continue + + if queue['name'] not in ig_names | set([instance.hostname]): + app.control.cancel_consumer(queue['name'], reply=True, destination=[worker_name]) + removed_queues.append(queue['name']) + + # Add queues for instance and instance groups + for queue_name in ig_names | set([instance.hostname]): + if queue_name not in worker_queue_names: + app.control.add_consumer(queue_name, reply=True, destination=[worker_name]) + added_queues.append(queue_name) + + return (added_queues, removed_queues) + + +def update_celery_worker_routes(instance, conf): + tasks = [ + 'awx.main.tasks.cluster_node_heartbeat', + 'awx.main.tasks.purge_old_stdout_files', + ] + routes_updated = {} + + # Instance is, effectively, a controller node + if instance.is_controller(): + tasks.append('awx.main.tasks.awx_isolated_heartbeat') + else: + if 'awx.main.tasks.awx_isolated_heartbeat' in conf.CELERY_ROUTES: + del conf.CELERY_ROUTES['awx.main.tasks.awx_isolated_heartbeat'] + + for t in tasks: + 
conf.CELERY_ROUTES[t] = {'queue': instance.hostname, 'routing_key': instance.hostname} + routes_updated[t] = conf.CELERY_ROUTES[t] + + return routes_updated + + +def register_celery_worker_queues(app, celery_worker_name): + instance = Instance.objects.me() + added_queues = [] + removed_queues = [] + + celery_host_queues = app.control.inspect([celery_worker_name]).active_queues() + + celery_worker_queues = celery_host_queues[celery_worker_name] if celery_host_queues else [] + (added_queues, removed_queues) = _add_remove_celery_worker_queues(app, instance, celery_worker_queues, celery_worker_name) + + return (instance, removed_queues, added_queues) + diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 2bcb8ee3d2..c97348e45e 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -392,6 +392,18 @@ EMAIL_HOST_USER = '' EMAIL_HOST_PASSWORD = '' EMAIL_USE_TLS = False +# The number of seconds to sleep between status checks for jobs running on isolated nodes +AWX_ISOLATED_CHECK_INTERVAL = 30 + +# The timeout (in seconds) for launching jobs on isolated nodes +AWX_ISOLATED_LAUNCH_TIMEOUT = 600 + +# Ansible connection timeout (in seconds) for communicating with isolated instances +AWX_ISOLATED_CONNECTION_TIMEOUT = 10 + +# The time (in seconds) between the periodic isolated heartbeat status check +AWX_ISOLATED_PERIODIC_CHECK = 600 + # Memcached django cache configuration # CACHES = { # 'default': { @@ -435,20 +447,12 @@ CELERY_BEAT_MAX_LOOP_INTERVAL = 60 CELERY_RESULT_BACKEND = 'django-db' CELERY_IMPORTS = ('awx.main.scheduler.tasks',) CELERY_TASK_QUEUES = ( - Queue('default', Exchange('default'), routing_key='default'), Queue('tower', Exchange('tower'), routing_key='tower'), - Queue('tower_scheduler', Exchange('scheduler', type='topic'), routing_key='tower_scheduler.job.#', durable=False), Broadcast('tower_broadcast_all') ) -CELERY_TASK_ROUTES = { - 'awx.main.scheduler.tasks.run_task_manager': {'queue': 'tower', 'routing_key': 'tower'}, - 
'awx.main.scheduler.tasks.run_job_launch': {'queue': 'tower_scheduler', 'routing_key': 'tower_scheduler.job.launch'}, - 'awx.main.scheduler.tasks.run_job_complete': {'queue': 'tower_scheduler', 'routing_key': 'tower_scheduler.job.complete'}, - 'awx.main.tasks.cluster_node_heartbeat': {'queue': 'default', 'routing_key': 'cluster.heartbeat'}, - 'awx.main.tasks.purge_old_stdout_files': {'queue': 'default', 'routing_key': 'cluster.heartbeat'}, -} +CELERY_TASK_ROUTES = {} -CELERY_BEAT_SCHEDULE = { +CELERYBEAT_SCHEDULE = { 'tower_scheduler': { 'task': 'awx.main.tasks.awx_periodic_scheduler', 'schedule': timedelta(seconds=30), @@ -474,11 +478,21 @@ CELERY_BEAT_SCHEDULE = { 'task_manager': { 'task': 'awx.main.scheduler.tasks.run_task_manager', 'schedule': timedelta(seconds=20), - 'options': {'expires': 20,} + 'options': {'expires': 20} }, + 'isolated_heartbeat': { + 'task': 'awx.main.tasks.awx_isolated_heartbeat', + 'schedule': timedelta(seconds=AWX_ISOLATED_PERIODIC_CHECK), + 'options': {'expires': AWX_ISOLATED_PERIODIC_CHECK * 2}, + } } AWX_INCONSISTENT_TASK_INTERVAL = 60 * 3 +# Celery queues that will always be listened to by celery workers +# Note: Broadcast queues have unique, auto-generated names, with the alias +# property value of the original queue name. 
+AWX_CELERY_QUEUES_STATIC = ['tower_broadcast_all',] + # Django Caching Configuration if is_testing(): CACHES = { @@ -627,18 +641,6 @@ AWX_ANSIBLE_CALLBACK_PLUGINS = "" # Time at which an HA node is considered active AWX_ACTIVE_NODE_TIME = 7200 -# The number of seconds to sleep between status checks for jobs running on isolated nodes -AWX_ISOLATED_CHECK_INTERVAL = 30 - -# The timeout (in seconds) for launching jobs on isolated nodes -AWX_ISOLATED_LAUNCH_TIMEOUT = 600 - -# Ansible connection timeout (in seconds) for communicating with isolated instances -AWX_ISOLATED_CONNECTION_TIMEOUT = 10 - -# The time (in seconds) between the periodic isolated heartbeat status check -AWX_ISOLATED_PERIODIC_CHECK = 600 - # Enable Pendo on the UI, possible values are 'off', 'anonymous', and 'detailed' # Note: This setting may be overridden by database settings. PENDO_TRACKING_STATE = "off" diff --git a/awx/settings/development.py b/awx/settings/development.py index 682cf21dd8..617c0b6745 100644 --- a/awx/settings/development.py +++ b/awx/settings/development.py @@ -138,15 +138,6 @@ except ImportError: sys.exit(1) CLUSTER_HOST_ID = socket.gethostname() -CELERY_TASK_ROUTES['awx.main.tasks.cluster_node_heartbeat'] = {'queue': CLUSTER_HOST_ID, 'routing_key': CLUSTER_HOST_ID} -# Production only runs this schedule on controlling nodes -# but development will just run it on all nodes -CELERY_TASK_ROUTES['awx.main.tasks.awx_isolated_heartbeat'] = {'queue': CLUSTER_HOST_ID, 'routing_key': CLUSTER_HOST_ID} -CELERY_BEAT_SCHEDULE['isolated_heartbeat'] = { - 'task': 'awx.main.tasks.awx_isolated_heartbeat', - 'schedule': timedelta(seconds = AWX_ISOLATED_PERIODIC_CHECK), - 'options': {'expires': AWX_ISOLATED_PERIODIC_CHECK * 2,} -} # Supervisor service name dictionary used for programatic restart SERVICE_NAME_DICT = { diff --git a/installer/image_build/files/settings.py b/installer/image_build/files/settings.py index d9a56df2f1..aac778aaba 100644 --- a/installer/image_build/files/settings.py +++ 
b/installer/image_build/files/settings.py @@ -31,9 +31,6 @@ AWX_PROOT_ENABLED = False CLUSTER_HOST_ID = "awx" SYSTEM_UUID = '00000000-0000-0000-0000-000000000000' -CELERY_TASK_QUEUES += (Queue(CLUSTER_HOST_ID, Exchange(CLUSTER_HOST_ID), routing_key=CLUSTER_HOST_ID),) -CELERY_TASK_ROUTES['awx.main.tasks.cluster_node_heartbeat'] = {'queue': CLUSTER_HOST_ID, 'routing_key': CLUSTER_HOST_ID} -CELERY_TASK_ROUTES['awx.main.tasks.purge_old_stdout_files'] = {'queue': CLUSTER_HOST_ID, 'routing_key': CLUSTER_HOST_ID} ############################################################################### diff --git a/installer/image_build/files/supervisor_task.conf b/installer/image_build/files/supervisor_task.conf index 857f941c96..ad49df4587 100644 --- a/installer/image_build/files/supervisor_task.conf +++ b/installer/image_build/files/supervisor_task.conf @@ -3,8 +3,7 @@ nodaemon = True umask = 022 [program:celery] -# TODO: Needs to be reworked to dynamically use instance group queues -command = /var/lib/awx/venv/awx/bin/celery worker -A awx -l debug --autoscale=4 -Ofair -Q tower_scheduler,tower_broadcast_all,tower,%(host_node_name)s -n celery@localhost +command = /var/lib/awx/venv/awx/bin/celery worker -A awx -l debug --autoscale=4 -Ofair -Q tower_broadcast_all -n celery@localhost directory = /var/lib/awx environment = LANGUAGE="en_US.UTF-8",LANG="en_US.UTF-8",LC_ALL="en_US.UTF-8",LC_CTYPE="en_US.UTF-8" #user = {{ aw_user }} diff --git a/tools/docker-compose/supervisor.conf b/tools/docker-compose/supervisor.conf index 7f72e269c4..b0700e1442 100644 --- a/tools/docker-compose/supervisor.conf +++ b/tools/docker-compose/supervisor.conf @@ -4,7 +4,7 @@ minfds = 4096 nodaemon=true [program:celeryd] -command = celery worker -A awx -l DEBUG -B -Ofair --autoscale=100,4 --schedule=/celerybeat-schedule -Q tower_scheduler,tower_broadcast_all,%(ENV_AWX_GROUP_QUEUES)s,%(ENV_HOSTNAME)s -n celery@%(ENV_HOSTNAME)s +command = celery worker -A awx -l DEBUG -B -Ofair --autoscale=100,4 
--schedule=/celerybeat-schedule -Q tower_broadcast_all -n celery@%(ENV_HOSTNAME)s autostart = true autorestart = true redirect_stderr=true