mirror of
https://github.com/ansible/awx.git
synced 2026-05-20 07:17:40 -02:30
control celery routes using celery router
* Each time a route is needed (i.e. when a task is sumitted to celery). The router will be queried. This is ideal. With the previous method we had to consider how a change in the routes would propogate to all celery workers and nodes. * fully describe the default awx queue * Our dynamic queue registration would correct awx_private_queue. However, we don't want celery to even create an "invalid"/extra queue-exchange-route. This change makes sure we don't create extranious things in rabbitmq. * reduce the cluster queue registration output. Only output when the queue registration list changes.
This commit is contained in:
@@ -61,7 +61,7 @@ from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field,
|
||||
from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja
|
||||
from awx.main.utils.reload import stop_local_services
|
||||
from awx.main.utils.pglock import advisory_lock
|
||||
from awx.main.utils.ha import update_celery_worker_routes, register_celery_worker_queues
|
||||
from awx.main.utils.ha import register_celery_worker_queues
|
||||
from awx.main.consumers import emit_channel_notification
|
||||
from awx.conf import settings_registry
|
||||
|
||||
@@ -201,39 +201,24 @@ def handle_ha_toplogy_changes(self):
|
||||
awx_app = Celery('awx')
|
||||
awx_app.config_from_object('django.conf:settings')
|
||||
instances, removed_queues, added_queues = register_celery_worker_queues(awx_app, self.request.hostname)
|
||||
for instance in instances:
|
||||
logger.info(six.text_type("Workers on tower node '{}' removed from queues {} and added to queues {}")
|
||||
.format(instance.hostname, removed_queues, added_queues))
|
||||
updated_routes = update_celery_worker_routes(instance, settings)
|
||||
logger.info(six.text_type("Worker on tower node '{}' updated celery routes {} all routes are now {}")
|
||||
.format(instance.hostname, updated_routes, self.app.conf.CELERY_ROUTES))
|
||||
if len(removed_queues) + len(added_queues) > 0:
|
||||
logger.info(six.text_type("Workers on tower node(s) '{}' removed from queues {} and added to queues {}")
|
||||
.format([i.hostname for i in instances], removed_queues, added_queues))
|
||||
|
||||
|
||||
@worker_ready.connect
|
||||
def handle_ha_toplogy_worker_ready(sender, **kwargs):
|
||||
logger.debug(six.text_type("Configure celeryd queues task on host {}").format(sender.hostname))
|
||||
instances, removed_queues, added_queues = register_celery_worker_queues(sender.app, sender.hostname)
|
||||
for instance in instances:
|
||||
logger.info(six.text_type("Workers on tower node '{}' unsubscribed from queues {} and subscribed to queues {}")
|
||||
.format(instance.hostname, removed_queues, added_queues))
|
||||
if len(removed_queues) + len(added_queues) > 0:
|
||||
logger.info(six.text_type("Workers on tower node(s) '{}' removed from queues {} and added to queues {}")
|
||||
.format([i.hostname for i in instances], removed_queues, added_queues))
|
||||
|
||||
# Expedite the first hearbeat run so a node comes online quickly.
|
||||
cluster_node_heartbeat.apply([])
|
||||
apply_cluster_membership_policies.apply([])
|
||||
|
||||
|
||||
@celeryd_init.connect
|
||||
def handle_update_celery_routes(sender=None, conf=None, **kwargs):
|
||||
conf = conf if conf else sender.app.conf
|
||||
logger.debug(six.text_type("Registering celery routes for {}").format(sender))
|
||||
(changed, instance) = Instance.objects.get_or_register()
|
||||
if changed:
|
||||
logger.info(six.text_type("Registered tower node '{}'").format(instance.hostname))
|
||||
added_routes = update_celery_worker_routes(instance, conf)
|
||||
logger.info(six.text_type("Workers on tower node '{}' added routes {} all routes are now {}")
|
||||
.format(instance.hostname, added_routes, conf.CELERY_ROUTES))
|
||||
|
||||
|
||||
@celeryd_after_setup.connect
|
||||
def handle_update_celery_hostname(sender, instance, **kwargs):
|
||||
(changed, tower_instance) = Instance.objects.get_or_register()
|
||||
|
||||
Reference in New Issue
Block a user