mirror of
https://github.com/ansible/awx.git
synced 2026-01-17 04:31:21 -03:30
remove isolated instance group queue listening
This commit is contained in:
parent
1cea20092c
commit
fb11967114
@ -209,19 +209,19 @@ def handle_ha_toplogy_changes(self):
|
||||
logger.debug(six.text_type("Reconfigure celeryd queues task on host {}").format(self.request.hostname))
|
||||
awx_app = Celery('awx')
|
||||
awx_app.config_from_object('django.conf:settings')
|
||||
instances, removed_queues, added_queues = register_celery_worker_queues(awx_app, self.request.hostname)
|
||||
removed_queues, added_queues = register_celery_worker_queues(awx_app, self.request.hostname)
|
||||
if len(removed_queues) + len(added_queues) > 0:
|
||||
logger.info(six.text_type("Workers on tower node(s) '{}' removed from queues {} and added to queues {}")
|
||||
.format([i.hostname for i in instances], removed_queues, added_queues))
|
||||
logger.info(six.text_type("Workers on tower node '{}' removed from queues {} and added to queues {}")
|
||||
.format(self.request.hostname, removed_queues, added_queues))
|
||||
|
||||
|
||||
@worker_ready.connect
|
||||
def handle_ha_toplogy_worker_ready(sender, **kwargs):
|
||||
logger.debug(six.text_type("Configure celeryd queues task on host {}").format(sender.hostname))
|
||||
instances, removed_queues, added_queues = register_celery_worker_queues(sender.app, sender.hostname)
|
||||
removed_queues, added_queues = register_celery_worker_queues(sender.app, sender.hostname)
|
||||
if len(removed_queues) + len(added_queues) > 0:
|
||||
logger.info(six.text_type("Workers on tower node(s) '{}' removed from queues {} and added to queues {}")
|
||||
.format([i.hostname for i in instances], removed_queues, added_queues))
|
||||
logger.info(six.text_type("Workers on tower node '{}' removed from queues {} and added to queues {}")
|
||||
.format(sender.hostname, removed_queues, added_queues))
|
||||
|
||||
# Expedite the first hearbeat run so a node comes online quickly.
|
||||
cluster_node_heartbeat.apply([])
|
||||
|
||||
@ -61,7 +61,7 @@ class TestAddRemoveCeleryWorkerQueues():
|
||||
mock.patch('awx.main.utils.ha.settings.AWX_CELERY_QUEUES_STATIC', static_queues),
|
||||
mock.patch('awx.main.utils.ha.settings.AWX_CELERY_BCAST_QUEUES_STATIC', broadcast_queues),
|
||||
mock.patch('awx.main.utils.ha.settings.CLUSTER_HOST_ID', hostname)):
|
||||
(added_queues, removed_queues) = _add_remove_celery_worker_queues(mock_app, [instance], worker_queues, hostname)
|
||||
(added_queues, removed_queues) = _add_remove_celery_worker_queues(mock_app, instance, worker_queues, hostname)
|
||||
assert set(added_expected) == set(added_queues)
|
||||
assert set(removed_expected) == set(removed_queues)
|
||||
|
||||
|
||||
@ -14,14 +14,13 @@ def construct_bcast_queue_name(common_name):
|
||||
return common_name.encode('utf8') + '_' + settings.CLUSTER_HOST_ID
|
||||
|
||||
|
||||
def _add_remove_celery_worker_queues(app, controlled_instances, worker_queues, worker_name):
|
||||
def _add_remove_celery_worker_queues(app, instance, worker_queues, worker_name):
|
||||
removed_queues = []
|
||||
added_queues = []
|
||||
hostnames = set([instance.hostname for instance in controlled_instances])
|
||||
worker_queue_names = set([q['name'] for q in worker_queues])
|
||||
|
||||
bcast_queue_names = set([construct_bcast_queue_name(n) for n in settings.AWX_CELERY_BCAST_QUEUES_STATIC])
|
||||
all_queue_names = hostnames | set(settings.AWX_CELERY_QUEUES_STATIC)
|
||||
all_queue_names = set([instance.hostname]) | set(settings.AWX_CELERY_QUEUES_STATIC)
|
||||
desired_queues = bcast_queue_names | (all_queue_names if instance.enabled else set())
|
||||
|
||||
# Remove queues
|
||||
@ -69,18 +68,13 @@ class AWXCeleryRouter(object):
|
||||
|
||||
def register_celery_worker_queues(app, celery_worker_name):
|
||||
instance = Instance.objects.me()
|
||||
controlled_instances = [instance]
|
||||
if instance.is_controller():
|
||||
controlled_instances.extend(Instance.objects.filter(
|
||||
rampart_groups__controller__instances__hostname=instance.hostname
|
||||
))
|
||||
added_queues = []
|
||||
removed_queues = []
|
||||
|
||||
celery_host_queues = app.control.inspect([celery_worker_name]).active_queues()
|
||||
|
||||
celery_worker_queues = celery_host_queues[celery_worker_name] if celery_host_queues else []
|
||||
(added_queues, removed_queues) = _add_remove_celery_worker_queues(app, controlled_instances,
|
||||
(added_queues, removed_queues) = _add_remove_celery_worker_queues(app, instance,
|
||||
celery_worker_queues, celery_worker_name)
|
||||
return (controlled_instances, removed_queues, added_queues)
|
||||
return (removed_queues, added_queues)
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user