mirror of
https://github.com/ansible/awx.git
synced 2026-01-20 06:01:25 -03:30
celeryd attach to queues dynamically
* Based on the tower topology (Instance and InstanceGroup relationships), have celery dyamically listen to queues on boot * Add celery task capable of "refreshing" what queues each celeryd worker listens to. This will be used to support changes in the topology. * Cleaned up some celery task definitions. * Converged wrongly targeted job launch/finish messages to 'tower' queue, rather than a 1-off queue. * Dynamically route celery tasks destined for the local node * separate beat process add support for separate beat process
This commit is contained in:
parent
7bc3d85913
commit
c9ff3e99b8
2
Makefile
2
Makefile
@ -326,7 +326,7 @@ celeryd:
|
||||
@if [ "$(VENV_BASE)" ]; then \
|
||||
. $(VENV_BASE)/awx/bin/activate; \
|
||||
fi; \
|
||||
celery worker -A awx -l DEBUG -B -Ofair --autoscale=100,4 --schedule=$(CELERY_SCHEDULE_FILE) -Q tower_scheduler,tower_broadcast_all,$(COMPOSE_HOST),$(AWX_GROUP_QUEUES) -n celery@$(COMPOSE_HOST) --pidfile /tmp/celery_pid
|
||||
celery worker -A awx -l DEBUG -B -Ofair --autoscale=100,4 --schedule=$(CELERY_SCHEDULE_FILE) -Q tower_broadcast_all -n celery@$(COMPOSE_HOST) --pidfile /tmp/celery_pid
|
||||
|
||||
# Run to start the zeromq callback receiver
|
||||
receiver:
|
||||
|
||||
@ -63,6 +63,9 @@ class Instance(models.Model):
|
||||
grace_period = settings.AWX_ISOLATED_PERIODIC_CHECK * 2
|
||||
return self.modified < ref_time - timedelta(seconds=grace_period)
|
||||
|
||||
def is_controller(self):
|
||||
return Instance.objects.filter(rampart_groups__controller__instances=self).exists()
|
||||
|
||||
|
||||
class InstanceGroup(models.Model):
|
||||
"""A model representing a Queue/Group of AWX Instances."""
|
||||
|
||||
@ -21,12 +21,12 @@ class LogErrorsTask(Task):
|
||||
super(LogErrorsTask, self).on_failure(exc, task_id, args, kwargs, einfo)
|
||||
|
||||
|
||||
@shared_task
|
||||
@shared_task(base=LogErrorsTask)
|
||||
def run_job_launch(job_id):
|
||||
TaskManager().schedule()
|
||||
|
||||
|
||||
@shared_task
|
||||
@shared_task(base=LogErrorsTask)
|
||||
def run_job_complete(job_id):
|
||||
TaskManager().schedule()
|
||||
|
||||
|
||||
@ -26,7 +26,7 @@ except Exception:
|
||||
|
||||
# Celery
|
||||
from celery import Task, shared_task
|
||||
from celery.signals import celeryd_init, worker_process_init, worker_shutdown
|
||||
from celery.signals import celeryd_init, worker_process_init, worker_shutdown, worker_ready, beat_init
|
||||
|
||||
# Django
|
||||
from django.conf import settings
|
||||
@ -57,6 +57,7 @@ from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field,
|
||||
ignore_inventory_computed_fields, ignore_inventory_group_removal,
|
||||
get_type_for_model, extract_ansible_vars)
|
||||
from awx.main.utils.reload import restart_local_services, stop_local_services
|
||||
from awx.main.utils.ha import update_celery_worker_routes, register_celery_worker_queues
|
||||
from awx.main.utils.handlers import configure_external_logger
|
||||
from awx.main.consumers import emit_channel_notification
|
||||
from awx.conf import settings_registry
|
||||
@ -147,6 +148,37 @@ def handle_setting_changes(self, setting_keys):
|
||||
break
|
||||
|
||||
|
||||
@shared_task(bind=True, queue='tower_broadcast_all', base=LogErrorsTask)
|
||||
def handle_ha_toplogy_changes(self):
|
||||
instance = Instance.objects.me()
|
||||
logger.debug("Reconfigure celeryd queues task on host {}".format(self.request.hostname))
|
||||
(instance, removed_queues, added_queues) = register_celery_worker_queues(self.app, self.request.hostname)
|
||||
logger.info("Workers on tower node '{}' removed from queues {} and added to queues {}"
|
||||
.format(instance.hostname, removed_queues, added_queues))
|
||||
updated_routes = update_celery_worker_routes(instance, settings)
|
||||
logger.info("Worker on tower node '{}' updated celery routes {} all routes are now {}"
|
||||
.format(instance.hostname, updated_routes, self.app.conf.CELERY_ROUTES))
|
||||
|
||||
|
||||
@worker_ready.connect
|
||||
def handle_ha_toplogy_worker_ready(sender, **kwargs):
|
||||
logger.debug("Configure celeryd queues task on host {}".format(sender.hostname))
|
||||
(instance, removed_queues, added_queues) = register_celery_worker_queues(sender.app, sender.hostname)
|
||||
logger.info("Workers on tower node '{}' unsubscribed from queues {} and subscribed to queues {}"
|
||||
.format(instance.hostname, removed_queues, added_queues))
|
||||
|
||||
|
||||
@beat_init.connect
|
||||
@celeryd_init.connect
|
||||
def handle_update_celery_routes(sender=None, conf=None, **kwargs):
|
||||
conf = conf if conf else sender.app.conf
|
||||
logger.debug("Registering celery routes for {}".format(sender))
|
||||
instance = Instance.objects.me()
|
||||
added_routes = update_celery_worker_routes(instance, conf)
|
||||
logger.info("Workers on tower node '{}' added routes {} all routes are now {}"
|
||||
.format(instance.hostname, added_routes, conf.CELERY_ROUTES))
|
||||
|
||||
|
||||
@shared_task(queue='tower', base=LogErrorsTask)
|
||||
def send_notifications(notification_list, job_id=None):
|
||||
if not isinstance(notification_list, list):
|
||||
|
||||
100
awx/main/tests/unit/utils/test_ha.py
Normal file
100
awx/main/tests/unit/utils/test_ha.py
Normal file
@ -0,0 +1,100 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright (c) 2017 Ansible Tower by Red Hat
|
||||
# All Rights Reserved.
|
||||
|
||||
# python
|
||||
import pytest
|
||||
import mock
|
||||
|
||||
# AWX
|
||||
from awx.main.utils.ha import (
|
||||
_add_remove_celery_worker_queues,
|
||||
update_celery_worker_routes,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def conf():
|
||||
class Conf():
|
||||
CELERY_ROUTES = dict()
|
||||
CELERYBEAT_SCHEDULE = dict()
|
||||
return Conf()
|
||||
|
||||
|
||||
class TestAddRemoveCeleryWorkerQueues():
|
||||
@pytest.fixture
|
||||
def instance_generator(self, mocker):
|
||||
def fn(groups=['east', 'west', 'north', 'south'], hostname='east-1'):
|
||||
instance = mocker.MagicMock()
|
||||
instance.hostname = hostname
|
||||
instance.rampart_groups = mocker.MagicMock()
|
||||
instance.rampart_groups.values_list = mocker.MagicMock(return_value=groups)
|
||||
|
||||
return instance
|
||||
return fn
|
||||
|
||||
@pytest.fixture
|
||||
def worker_queues_generator(self, mocker):
|
||||
def fn(queues=['east', 'west']):
|
||||
return [dict(name=n, alias='') for n in queues]
|
||||
return fn
|
||||
|
||||
@pytest.fixture
|
||||
def mock_app(self, mocker):
|
||||
app = mocker.MagicMock()
|
||||
app.control = mocker.MagicMock()
|
||||
app.control.cancel_consumer = mocker.MagicMock()
|
||||
return app
|
||||
|
||||
@pytest.mark.parametrize("static_queues,_worker_queues,groups,hostname,added_expected,removed_expected", [
|
||||
(['east', 'west'], ['east', 'west', 'east-1'], [], 'east-1', [], []),
|
||||
([], ['east', 'west', 'east-1'], ['east', 'west'], 'east-1', [], []),
|
||||
([], ['east', 'west'], ['east', 'west'], 'east-1', ['east-1'], []),
|
||||
([], [], ['east', 'west'], 'east-1', ['east', 'west', 'east-1'], []),
|
||||
([], ['china', 'russia'], ['east', 'west'], 'east-1', ['east', 'west', 'east-1'], ['china', 'russia']),
|
||||
])
|
||||
def test__add_remove_celery_worker_queues_noop(self, mock_app,
|
||||
instance_generator,
|
||||
worker_queues_generator,
|
||||
static_queues, _worker_queues,
|
||||
groups, hostname,
|
||||
added_expected, removed_expected):
|
||||
instance = instance_generator(groups=groups, hostname=hostname)
|
||||
worker_queues = worker_queues_generator(_worker_queues)
|
||||
with mock.patch('awx.main.utils.ha.settings.AWX_CELERY_QUEUES_STATIC', static_queues):
|
||||
(added_queues, removed_queues) = _add_remove_celery_worker_queues(mock_app, instance, worker_queues, hostname)
|
||||
assert set(added_queues) == set(added_expected)
|
||||
assert set(removed_queues) == set(removed_expected)
|
||||
|
||||
|
||||
class TestUpdateCeleryWorkerRoutes():
|
||||
|
||||
@pytest.mark.parametrize("is_controller,expected_routes", [
|
||||
(False, {
|
||||
'awx.main.tasks.cluster_node_heartbeat': {'queue': 'east-1', 'routing_key': 'east-1'},
|
||||
'awx.main.tasks.purge_old_stdout_files': {'queue': 'east-1', 'routing_key': 'east-1'}
|
||||
}),
|
||||
(True, {
|
||||
'awx.main.tasks.cluster_node_heartbeat': {'queue': 'east-1', 'routing_key': 'east-1'},
|
||||
'awx.main.tasks.purge_old_stdout_files': {'queue': 'east-1', 'routing_key': 'east-1'},
|
||||
'awx.main.tasks.awx_isolated_heartbeat': {'queue': 'east-1', 'routing_key': 'east-1'},
|
||||
}),
|
||||
])
|
||||
def test_update_celery_worker_routes(self, mocker, conf, is_controller, expected_routes):
|
||||
instance = mocker.MagicMock()
|
||||
instance.hostname = 'east-1'
|
||||
instance.is_controller = mocker.MagicMock(return_value=is_controller)
|
||||
|
||||
assert update_celery_worker_routes(instance, conf) == expected_routes
|
||||
assert conf.CELERY_ROUTES == expected_routes
|
||||
|
||||
def test_update_celery_worker_routes_deleted(self, mocker, conf):
|
||||
instance = mocker.MagicMock()
|
||||
instance.hostname = 'east-1'
|
||||
instance.is_controller = mocker.MagicMock(return_value=False)
|
||||
conf.CELERY_ROUTES = {'awx.main.tasks.awx_isolated_heartbeat': 'foobar'}
|
||||
|
||||
update_celery_worker_routes(instance, conf)
|
||||
assert 'awx.main.tasks.awx_isolated_heartbeat' not in conf.CELERY_ROUTES
|
||||
|
||||
71
awx/main/utils/ha.py
Normal file
71
awx/main/utils/ha.py
Normal file
@ -0,0 +1,71 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright (c) 2017 Ansible Tower by Red Hat
|
||||
# All Rights Reserved.
|
||||
|
||||
# Django
|
||||
from django.conf import settings
|
||||
|
||||
# AWX
|
||||
from awx.main.models import Instance
|
||||
|
||||
|
||||
def _add_remove_celery_worker_queues(app, instance, worker_queues, worker_name):
|
||||
removed_queues = []
|
||||
added_queues = []
|
||||
ig_names = set(instance.rampart_groups.values_list('name', flat=True))
|
||||
worker_queue_names = set([q['name'] for q in worker_queues])
|
||||
|
||||
|
||||
# Remove queues that aren't in the instance group
|
||||
for queue in worker_queues:
|
||||
if queue['name'] in settings.AWX_CELERY_QUEUES_STATIC or \
|
||||
queue['alias'] in settings.AWX_CELERY_QUEUES_STATIC:
|
||||
continue
|
||||
|
||||
if queue['name'] not in ig_names | set([instance.hostname]):
|
||||
app.control.cancel_consumer(queue['name'], reply=True, destination=[worker_name])
|
||||
removed_queues.append(queue['name'])
|
||||
|
||||
# Add queues for instance and instance groups
|
||||
for queue_name in ig_names | set([instance.hostname]):
|
||||
if queue_name not in worker_queue_names:
|
||||
app.control.add_consumer(queue_name, reply=True, destination=[worker_name])
|
||||
added_queues.append(queue_name)
|
||||
|
||||
return (added_queues, removed_queues)
|
||||
|
||||
|
||||
def update_celery_worker_routes(instance, conf):
|
||||
tasks = [
|
||||
'awx.main.tasks.cluster_node_heartbeat',
|
||||
'awx.main.tasks.purge_old_stdout_files',
|
||||
]
|
||||
routes_updated = {}
|
||||
|
||||
# Instance is, effectively, a controller node
|
||||
if instance.is_controller():
|
||||
tasks.append('awx.main.tasks.awx_isolated_heartbeat')
|
||||
else:
|
||||
if 'awx.main.tasks.awx_isolated_heartbeat' in conf.CELERY_ROUTES:
|
||||
del conf.CELERY_ROUTES['awx.main.tasks.awx_isolated_heartbeat']
|
||||
|
||||
for t in tasks:
|
||||
conf.CELERY_ROUTES[t] = {'queue': instance.hostname, 'routing_key': instance.hostname}
|
||||
routes_updated[t] = conf.CELERY_ROUTES[t]
|
||||
|
||||
return routes_updated
|
||||
|
||||
|
||||
def register_celery_worker_queues(app, celery_worker_name):
|
||||
instance = Instance.objects.me()
|
||||
added_queues = []
|
||||
removed_queues = []
|
||||
|
||||
celery_host_queues = app.control.inspect([celery_worker_name]).active_queues()
|
||||
|
||||
celery_worker_queues = celery_host_queues[celery_worker_name] if celery_host_queues else []
|
||||
(added_queues, removed_queues) = _add_remove_celery_worker_queues(app, instance, celery_worker_queues, celery_worker_name)
|
||||
|
||||
return (instance, removed_queues, added_queues)
|
||||
|
||||
@ -392,6 +392,18 @@ EMAIL_HOST_USER = ''
|
||||
EMAIL_HOST_PASSWORD = ''
|
||||
EMAIL_USE_TLS = False
|
||||
|
||||
# The number of seconds to sleep between status checks for jobs running on isolated nodes
|
||||
AWX_ISOLATED_CHECK_INTERVAL = 30
|
||||
|
||||
# The timeout (in seconds) for launching jobs on isolated nodes
|
||||
AWX_ISOLATED_LAUNCH_TIMEOUT = 600
|
||||
|
||||
# Ansible connection timeout (in seconds) for communicating with isolated instances
|
||||
AWX_ISOLATED_CONNECTION_TIMEOUT = 10
|
||||
|
||||
# The time (in seconds) between the periodic isolated heartbeat status check
|
||||
AWX_ISOLATED_PERIODIC_CHECK = 600
|
||||
|
||||
# Memcached django cache configuration
|
||||
# CACHES = {
|
||||
# 'default': {
|
||||
@ -435,20 +447,12 @@ CELERY_BEAT_MAX_LOOP_INTERVAL = 60
|
||||
CELERY_RESULT_BACKEND = 'django-db'
|
||||
CELERY_IMPORTS = ('awx.main.scheduler.tasks',)
|
||||
CELERY_TASK_QUEUES = (
|
||||
Queue('default', Exchange('default'), routing_key='default'),
|
||||
Queue('tower', Exchange('tower'), routing_key='tower'),
|
||||
Queue('tower_scheduler', Exchange('scheduler', type='topic'), routing_key='tower_scheduler.job.#', durable=False),
|
||||
Broadcast('tower_broadcast_all')
|
||||
)
|
||||
CELERY_TASK_ROUTES = {
|
||||
'awx.main.scheduler.tasks.run_task_manager': {'queue': 'tower', 'routing_key': 'tower'},
|
||||
'awx.main.scheduler.tasks.run_job_launch': {'queue': 'tower_scheduler', 'routing_key': 'tower_scheduler.job.launch'},
|
||||
'awx.main.scheduler.tasks.run_job_complete': {'queue': 'tower_scheduler', 'routing_key': 'tower_scheduler.job.complete'},
|
||||
'awx.main.tasks.cluster_node_heartbeat': {'queue': 'default', 'routing_key': 'cluster.heartbeat'},
|
||||
'awx.main.tasks.purge_old_stdout_files': {'queue': 'default', 'routing_key': 'cluster.heartbeat'},
|
||||
}
|
||||
CELERY_TASK_ROUTES = {}
|
||||
|
||||
CELERY_BEAT_SCHEDULE = {
|
||||
CELERYBEAT_SCHEDULE = {
|
||||
'tower_scheduler': {
|
||||
'task': 'awx.main.tasks.awx_periodic_scheduler',
|
||||
'schedule': timedelta(seconds=30),
|
||||
@ -474,11 +478,21 @@ CELERY_BEAT_SCHEDULE = {
|
||||
'task_manager': {
|
||||
'task': 'awx.main.scheduler.tasks.run_task_manager',
|
||||
'schedule': timedelta(seconds=20),
|
||||
'options': {'expires': 20,}
|
||||
'options': {'expires': 20}
|
||||
},
|
||||
'isolated_heartbeat': {
|
||||
'task': 'awx.main.tasks.awx_isolated_heartbeat',
|
||||
'schedule': timedelta(seconds=AWX_ISOLATED_PERIODIC_CHECK),
|
||||
'options': {'expires': AWX_ISOLATED_PERIODIC_CHECK * 2},
|
||||
}
|
||||
}
|
||||
AWX_INCONSISTENT_TASK_INTERVAL = 60 * 3
|
||||
|
||||
# Celery queues that will always be listened to by celery workers
|
||||
# Note: Broadcast queues have unique, auto-generated names, with the alias
|
||||
# property value of the original queue name.
|
||||
AWX_CELERY_QUEUES_STATIC = ['tower_broadcast_all',]
|
||||
|
||||
# Django Caching Configuration
|
||||
if is_testing():
|
||||
CACHES = {
|
||||
@ -627,18 +641,6 @@ AWX_ANSIBLE_CALLBACK_PLUGINS = ""
|
||||
# Time at which an HA node is considered active
|
||||
AWX_ACTIVE_NODE_TIME = 7200
|
||||
|
||||
# The number of seconds to sleep between status checks for jobs running on isolated nodes
|
||||
AWX_ISOLATED_CHECK_INTERVAL = 30
|
||||
|
||||
# The timeout (in seconds) for launching jobs on isolated nodes
|
||||
AWX_ISOLATED_LAUNCH_TIMEOUT = 600
|
||||
|
||||
# Ansible connection timeout (in seconds) for communicating with isolated instances
|
||||
AWX_ISOLATED_CONNECTION_TIMEOUT = 10
|
||||
|
||||
# The time (in seconds) between the periodic isolated heartbeat status check
|
||||
AWX_ISOLATED_PERIODIC_CHECK = 600
|
||||
|
||||
# Enable Pendo on the UI, possible values are 'off', 'anonymous', and 'detailed'
|
||||
# Note: This setting may be overridden by database settings.
|
||||
PENDO_TRACKING_STATE = "off"
|
||||
|
||||
@ -138,15 +138,6 @@ except ImportError:
|
||||
sys.exit(1)
|
||||
|
||||
CLUSTER_HOST_ID = socket.gethostname()
|
||||
CELERY_TASK_ROUTES['awx.main.tasks.cluster_node_heartbeat'] = {'queue': CLUSTER_HOST_ID, 'routing_key': CLUSTER_HOST_ID}
|
||||
# Production only runs this schedule on controlling nodes
|
||||
# but development will just run it on all nodes
|
||||
CELERY_TASK_ROUTES['awx.main.tasks.awx_isolated_heartbeat'] = {'queue': CLUSTER_HOST_ID, 'routing_key': CLUSTER_HOST_ID}
|
||||
CELERY_BEAT_SCHEDULE['isolated_heartbeat'] = {
|
||||
'task': 'awx.main.tasks.awx_isolated_heartbeat',
|
||||
'schedule': timedelta(seconds = AWX_ISOLATED_PERIODIC_CHECK),
|
||||
'options': {'expires': AWX_ISOLATED_PERIODIC_CHECK * 2,}
|
||||
}
|
||||
|
||||
# Supervisor service name dictionary used for programatic restart
|
||||
SERVICE_NAME_DICT = {
|
||||
|
||||
@ -31,9 +31,6 @@ AWX_PROOT_ENABLED = False
|
||||
|
||||
CLUSTER_HOST_ID = "awx"
|
||||
SYSTEM_UUID = '00000000-0000-0000-0000-000000000000'
|
||||
CELERY_TASK_QUEUES += (Queue(CLUSTER_HOST_ID, Exchange(CLUSTER_HOST_ID), routing_key=CLUSTER_HOST_ID),)
|
||||
CELERY_TASK_ROUTES['awx.main.tasks.cluster_node_heartbeat'] = {'queue': CLUSTER_HOST_ID, 'routing_key': CLUSTER_HOST_ID}
|
||||
CELERY_TASK_ROUTES['awx.main.tasks.purge_old_stdout_files'] = {'queue': CLUSTER_HOST_ID, 'routing_key': CLUSTER_HOST_ID}
|
||||
|
||||
|
||||
###############################################################################
|
||||
|
||||
@ -3,8 +3,7 @@ nodaemon = True
|
||||
umask = 022
|
||||
|
||||
[program:celery]
|
||||
# TODO: Needs to be reworked to dynamically use instance group queues
|
||||
command = /var/lib/awx/venv/awx/bin/celery worker -A awx -l debug --autoscale=4 -Ofair -Q tower_scheduler,tower_broadcast_all,tower,%(host_node_name)s -n celery@localhost
|
||||
command = /var/lib/awx/venv/awx/bin/celery worker -A awx -l debug --autoscale=4 -Ofair -Q tower_broadcast_all -n celery@localhost
|
||||
directory = /var/lib/awx
|
||||
environment = LANGUAGE="en_US.UTF-8",LANG="en_US.UTF-8",LC_ALL="en_US.UTF-8",LC_CTYPE="en_US.UTF-8"
|
||||
#user = {{ aw_user }}
|
||||
|
||||
@ -4,7 +4,7 @@ minfds = 4096
|
||||
nodaemon=true
|
||||
|
||||
[program:celeryd]
|
||||
command = celery worker -A awx -l DEBUG -B -Ofair --autoscale=100,4 --schedule=/celerybeat-schedule -Q tower_scheduler,tower_broadcast_all,%(ENV_AWX_GROUP_QUEUES)s,%(ENV_HOSTNAME)s -n celery@%(ENV_HOSTNAME)s
|
||||
command = celery worker -A awx -l DEBUG -B -Ofair --autoscale=100,4 --schedule=/celerybeat-schedule -Q tower_broadcast_all -n celery@%(ENV_HOSTNAME)s
|
||||
autostart = true
|
||||
autorestart = true
|
||||
redirect_stderr=true
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user