mirror of
https://github.com/ansible/awx.git
synced 2026-03-13 15:09:32 -02:30
disabled instance stay subscribed to bcast queue
* A disabled node needs to stay subscribed to the broadcast queue because the work to re-subscribe the node to queues when the node is re-enabled is done over the broadcast queue.
This commit is contained in:
@@ -25,10 +25,11 @@ def _add_remove_celery_worker_queues(app, controlled_instances, worker_queues, w
|
|||||||
|
|
||||||
bcast_queue_names = set([construct_bcast_queue_name(n) for n in settings.AWX_CELERY_BCAST_QUEUES_STATIC])
|
bcast_queue_names = set([construct_bcast_queue_name(n) for n in settings.AWX_CELERY_BCAST_QUEUES_STATIC])
|
||||||
all_queue_names = ig_names | hostnames | set(settings.AWX_CELERY_QUEUES_STATIC)
|
all_queue_names = ig_names | hostnames | set(settings.AWX_CELERY_QUEUES_STATIC)
|
||||||
|
desired_queues = bcast_queue_names | (all_queue_names if instance.enabled else set())
|
||||||
|
|
||||||
# Remove queues that aren't in the instance group
|
# Remove queues
|
||||||
for queue_name in worker_queue_names:
|
for queue_name in worker_queue_names:
|
||||||
if queue_name not in all_queue_names | bcast_queue_names or not instance.enabled:
|
if queue_name not in desired_queues:
|
||||||
app.control.cancel_consumer(queue_name.encode("utf8"), reply=True, destination=[worker_name])
|
app.control.cancel_consumer(queue_name.encode("utf8"), reply=True, destination=[worker_name])
|
||||||
removed_queues.append(queue_name.encode("utf8"))
|
removed_queues.append(queue_name.encode("utf8"))
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user