mirror of
https://github.com/ansible/awx.git
synced 2026-03-11 14:39:30 -02:30
Merge pull request #1638 from chrismeyersfsu/fix-isolated_heartbeat
control celery routes using celery router
This commit is contained in:
@@ -62,7 +62,7 @@ from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field,
|
|||||||
from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja
|
from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja
|
||||||
from awx.main.utils.reload import stop_local_services
|
from awx.main.utils.reload import stop_local_services
|
||||||
from awx.main.utils.pglock import advisory_lock
|
from awx.main.utils.pglock import advisory_lock
|
||||||
from awx.main.utils.ha import update_celery_worker_routes, register_celery_worker_queues
|
from awx.main.utils.ha import register_celery_worker_queues
|
||||||
from awx.main.consumers import emit_channel_notification
|
from awx.main.consumers import emit_channel_notification
|
||||||
from awx.conf import settings_registry
|
from awx.conf import settings_registry
|
||||||
|
|
||||||
@@ -205,39 +205,24 @@ def handle_ha_toplogy_changes(self):
|
|||||||
awx_app = Celery('awx')
|
awx_app = Celery('awx')
|
||||||
awx_app.config_from_object('django.conf:settings')
|
awx_app.config_from_object('django.conf:settings')
|
||||||
instances, removed_queues, added_queues = register_celery_worker_queues(awx_app, self.request.hostname)
|
instances, removed_queues, added_queues = register_celery_worker_queues(awx_app, self.request.hostname)
|
||||||
for instance in instances:
|
if len(removed_queues) + len(added_queues) > 0:
|
||||||
logger.info(six.text_type("Workers on tower node '{}' removed from queues {} and added to queues {}")
|
logger.info(six.text_type("Workers on tower node(s) '{}' removed from queues {} and added to queues {}")
|
||||||
.format(instance.hostname, removed_queues, added_queues))
|
.format([i.hostname for i in instances], removed_queues, added_queues))
|
||||||
updated_routes = update_celery_worker_routes(instance, settings)
|
|
||||||
logger.info(six.text_type("Worker on tower node '{}' updated celery routes {} all routes are now {}")
|
|
||||||
.format(instance.hostname, updated_routes, self.app.conf.CELERY_ROUTES))
|
|
||||||
|
|
||||||
|
|
||||||
@worker_ready.connect
|
@worker_ready.connect
|
||||||
def handle_ha_toplogy_worker_ready(sender, **kwargs):
|
def handle_ha_toplogy_worker_ready(sender, **kwargs):
|
||||||
logger.debug(six.text_type("Configure celeryd queues task on host {}").format(sender.hostname))
|
logger.debug(six.text_type("Configure celeryd queues task on host {}").format(sender.hostname))
|
||||||
instances, removed_queues, added_queues = register_celery_worker_queues(sender.app, sender.hostname)
|
instances, removed_queues, added_queues = register_celery_worker_queues(sender.app, sender.hostname)
|
||||||
for instance in instances:
|
if len(removed_queues) + len(added_queues) > 0:
|
||||||
logger.info(six.text_type("Workers on tower node '{}' unsubscribed from queues {} and subscribed to queues {}")
|
logger.info(six.text_type("Workers on tower node(s) '{}' removed from queues {} and added to queues {}")
|
||||||
.format(instance.hostname, removed_queues, added_queues))
|
.format([i.hostname for i in instances], removed_queues, added_queues))
|
||||||
|
|
||||||
# Expedite the first hearbeat run so a node comes online quickly.
|
# Expedite the first hearbeat run so a node comes online quickly.
|
||||||
cluster_node_heartbeat.apply([])
|
cluster_node_heartbeat.apply([])
|
||||||
apply_cluster_membership_policies.apply([])
|
apply_cluster_membership_policies.apply([])
|
||||||
|
|
||||||
|
|
||||||
@celeryd_init.connect
|
|
||||||
def handle_update_celery_routes(sender=None, conf=None, **kwargs):
|
|
||||||
conf = conf if conf else sender.app.conf
|
|
||||||
logger.debug(six.text_type("Registering celery routes for {}").format(sender))
|
|
||||||
(changed, instance) = Instance.objects.get_or_register()
|
|
||||||
if changed:
|
|
||||||
logger.info(six.text_type("Registered tower node '{}'").format(instance.hostname))
|
|
||||||
added_routes = update_celery_worker_routes(instance, conf)
|
|
||||||
logger.info(six.text_type("Workers on tower node '{}' added routes {} all routes are now {}")
|
|
||||||
.format(instance.hostname, added_routes, conf.CELERY_ROUTES))
|
|
||||||
|
|
||||||
|
|
||||||
@celeryd_after_setup.connect
|
@celeryd_after_setup.connect
|
||||||
def handle_update_celery_hostname(sender, instance, **kwargs):
|
def handle_update_celery_hostname(sender, instance, **kwargs):
|
||||||
(changed, tower_instance) = Instance.objects.get_or_register()
|
(changed, tower_instance) = Instance.objects.get_or_register()
|
||||||
|
|||||||
@@ -11,18 +11,10 @@ from contextlib import nested
|
|||||||
# AWX
|
# AWX
|
||||||
from awx.main.utils.ha import (
|
from awx.main.utils.ha import (
|
||||||
_add_remove_celery_worker_queues,
|
_add_remove_celery_worker_queues,
|
||||||
update_celery_worker_routes,
|
AWXCeleryRouter,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def conf():
|
|
||||||
class Conf():
|
|
||||||
CELERY_ROUTES = dict()
|
|
||||||
CELERYBEAT_SCHEDULE = dict()
|
|
||||||
return Conf()
|
|
||||||
|
|
||||||
|
|
||||||
class TestAddRemoveCeleryWorkerQueues():
|
class TestAddRemoveCeleryWorkerQueues():
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def instance_generator(self, mocker):
|
def instance_generator(self, mocker):
|
||||||
@@ -73,7 +65,7 @@ class TestAddRemoveCeleryWorkerQueues():
|
|||||||
assert set(removed_queues) == set(removed_expected)
|
assert set(removed_queues) == set(removed_expected)
|
||||||
|
|
||||||
|
|
||||||
class TestUpdateCeleryWorkerRoutes():
|
class TestUpdateCeleryWorkerRouter():
|
||||||
|
|
||||||
@pytest.mark.parametrize("is_controller,expected_routes", [
|
@pytest.mark.parametrize("is_controller,expected_routes", [
|
||||||
(False, {
|
(False, {
|
||||||
@@ -86,20 +78,16 @@ class TestUpdateCeleryWorkerRoutes():
|
|||||||
'awx.main.tasks.awx_isolated_heartbeat': {'queue': 'east-1', 'routing_key': 'east-1'},
|
'awx.main.tasks.awx_isolated_heartbeat': {'queue': 'east-1', 'routing_key': 'east-1'},
|
||||||
}),
|
}),
|
||||||
])
|
])
|
||||||
def test_update_celery_worker_routes(self, mocker, conf, is_controller, expected_routes):
|
def test_update_celery_worker_routes(self, mocker, is_controller, expected_routes):
|
||||||
instance = mocker.MagicMock()
|
def get_or_register():
|
||||||
instance.hostname = 'east-1'
|
instance = mock.MagicMock()
|
||||||
instance.is_controller = mocker.MagicMock(return_value=is_controller)
|
instance.hostname = 'east-1'
|
||||||
|
instance.is_controller = mock.MagicMock(return_value=is_controller)
|
||||||
|
return (False, instance)
|
||||||
|
|
||||||
assert update_celery_worker_routes(instance, conf) == expected_routes
|
with mock.patch('awx.main.models.Instance.objects.get_or_register', get_or_register):
|
||||||
assert conf.CELERY_ROUTES == expected_routes
|
router = AWXCeleryRouter()
|
||||||
|
|
||||||
def test_update_celery_worker_routes_deleted(self, mocker, conf):
|
for k,v in expected_routes.iteritems():
|
||||||
instance = mocker.MagicMock()
|
assert router.route_for_task(k) == v
|
||||||
instance.hostname = 'east-1'
|
|
||||||
instance.is_controller = mocker.MagicMock(return_value=False)
|
|
||||||
conf.CELERY_ROUTES = {'awx.main.tasks.awx_isolated_heartbeat': 'foobar'}
|
|
||||||
|
|
||||||
update_celery_worker_routes(instance, conf)
|
|
||||||
assert 'awx.main.tasks.awx_isolated_heartbeat' not in conf.CELERY_ROUTES
|
|
||||||
|
|
||||||
|
|||||||
@@ -52,24 +52,21 @@ def _add_remove_celery_worker_queues(app, controlled_instances, worker_queues, w
|
|||||||
return (added_queues, removed_queues)
|
return (added_queues, removed_queues)
|
||||||
|
|
||||||
|
|
||||||
def update_celery_worker_routes(instance, conf):
|
class AWXCeleryRouter(object):
|
||||||
tasks = [
|
def route_for_task(self, task, args=None, kwargs=None):
|
||||||
'awx.main.tasks.cluster_node_heartbeat',
|
(changed, instance) = Instance.objects.get_or_register()
|
||||||
'awx.main.tasks.purge_old_stdout_files',
|
tasks = [
|
||||||
]
|
'awx.main.tasks.cluster_node_heartbeat',
|
||||||
routes_updated = {}
|
'awx.main.tasks.purge_old_stdout_files',
|
||||||
# Instance is, effectively, a controller node
|
]
|
||||||
if instance.is_controller():
|
isolated_tasks = [
|
||||||
tasks.append('awx.main.tasks.awx_isolated_heartbeat')
|
'awx.main.tasks.awx_isolated_heartbeat',
|
||||||
else:
|
]
|
||||||
if 'awx.main.tasks.awx_isolated_heartbeat' in conf.CELERY_ROUTES:
|
if task in tasks:
|
||||||
del conf.CELERY_ROUTES['awx.main.tasks.awx_isolated_heartbeat']
|
return {'queue': instance.hostname.encode("utf8"), 'routing_key': instance.hostname.encode("utf8")}
|
||||||
|
|
||||||
for t in tasks:
|
if instance.is_controller() and task in isolated_tasks:
|
||||||
conf.CELERY_ROUTES[t] = {'queue': instance.hostname.encode("utf8"), 'routing_key': instance.hostname.encode("utf8")}
|
return {'queue': instance.hostname.encode("utf8"), 'routing_key': instance.hostname.encode("utf8")}
|
||||||
routes_updated[t] = conf.CELERY_ROUTES[t]
|
|
||||||
|
|
||||||
return routes_updated
|
|
||||||
|
|
||||||
|
|
||||||
def register_celery_worker_queues(app, celery_worker_name):
|
def register_celery_worker_queues(app, celery_worker_name):
|
||||||
|
|||||||
@@ -454,6 +454,9 @@ BROKER_POOL_LIMIT = None
|
|||||||
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
|
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
|
||||||
CELERY_EVENT_QUEUE_TTL = 5
|
CELERY_EVENT_QUEUE_TTL = 5
|
||||||
CELERY_DEFAULT_QUEUE = 'awx_private_queue'
|
CELERY_DEFAULT_QUEUE = 'awx_private_queue'
|
||||||
|
CELERY_DEFAULT_EXCHANGE = 'awx_private_queue'
|
||||||
|
CELERY_DEFAULT_ROUTING_KEY = 'awx_private_queue'
|
||||||
|
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
|
||||||
CELERY_TASK_SERIALIZER = 'json'
|
CELERY_TASK_SERIALIZER = 'json'
|
||||||
CELERY_RESULT_SERIALIZER = 'json'
|
CELERY_RESULT_SERIALIZER = 'json'
|
||||||
CELERY_ACCEPT_CONTENT = ['json']
|
CELERY_ACCEPT_CONTENT = ['json']
|
||||||
@@ -465,7 +468,7 @@ CELERYD_AUTOSCALER = 'awx.main.utils.autoscale:DynamicAutoScaler'
|
|||||||
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
|
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
|
||||||
CELERY_IMPORTS = ('awx.main.scheduler.tasks',)
|
CELERY_IMPORTS = ('awx.main.scheduler.tasks',)
|
||||||
CELERY_QUEUES = ()
|
CELERY_QUEUES = ()
|
||||||
CELERY_ROUTES = {}
|
CELERY_ROUTES = ('awx.main.utils.ha.AWXCeleryRouter',)
|
||||||
|
|
||||||
|
|
||||||
def log_celery_failure(*args):
|
def log_celery_failure(*args):
|
||||||
|
|||||||
Reference in New Issue
Block a user