mirror of
https://github.com/ansible/awx.git
synced 2026-03-19 01:47:31 -02:30
use non-unicode queue names
* Use unicode InstanceGroup and queue names up until the point we actually create the queue * kombu add_consumers returns a dict with a value that contians the passed in queue name. Trouble is, the returned dict value is a string and not a unicode string and this results in an error.
This commit is contained in:
@@ -3,6 +3,9 @@
|
|||||||
# Copyright (c) 2017 Ansible Tower by Red Hat
|
# Copyright (c) 2017 Ansible Tower by Red Hat
|
||||||
# All Rights Reserved.
|
# All Rights Reserved.
|
||||||
|
|
||||||
|
# Python
|
||||||
|
import six
|
||||||
|
|
||||||
# Django
|
# Django
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
|
|
||||||
@@ -13,7 +16,7 @@ from awx.main.models import Instance
|
|||||||
def _add_remove_celery_worker_queues(app, controlled_instances, worker_queues, worker_name):
|
def _add_remove_celery_worker_queues(app, controlled_instances, worker_queues, worker_name):
|
||||||
removed_queues = []
|
removed_queues = []
|
||||||
added_queues = []
|
added_queues = []
|
||||||
ig_names = set(['tower_instance_router'])
|
ig_names = set([six.text_type('tower_instance_router')])
|
||||||
hostnames = set([instance.hostname for instance in controlled_instances])
|
hostnames = set([instance.hostname for instance in controlled_instances])
|
||||||
for instance in controlled_instances:
|
for instance in controlled_instances:
|
||||||
ig_names.update(instance.rampart_groups.values_list('name', flat=True))
|
ig_names.update(instance.rampart_groups.values_list('name', flat=True))
|
||||||
@@ -26,14 +29,14 @@ def _add_remove_celery_worker_queues(app, controlled_instances, worker_queues, w
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
if queue['name'] not in ig_names | hostnames or not instance.enabled:
|
if queue['name'] not in ig_names | hostnames or not instance.enabled:
|
||||||
app.control.cancel_consumer(queue['name'], reply=True, destination=[worker_name])
|
app.control.cancel_consumer(queue['name'].encode("utf8"), reply=True, destination=[worker_name])
|
||||||
removed_queues.append(queue['name'])
|
removed_queues.append(queue['name'].encode("utf8"))
|
||||||
|
|
||||||
# Add queues for instance and instance groups
|
# Add queues for instance and instance groups
|
||||||
for queue_name in ig_names | hostnames:
|
for queue_name in ig_names | hostnames:
|
||||||
if queue_name not in worker_queue_names:
|
if queue_name not in worker_queue_names:
|
||||||
app.control.add_consumer(queue_name, reply=True, destination=[worker_name])
|
app.control.add_consumer(queue_name.encode("utf8"), reply=True, destination=[worker_name])
|
||||||
added_queues.append(queue_name)
|
added_queues.append(queue_name.encode("utf8"))
|
||||||
|
|
||||||
return (added_queues, removed_queues)
|
return (added_queues, removed_queues)
|
||||||
|
|
||||||
@@ -52,7 +55,7 @@ def update_celery_worker_routes(instance, conf):
|
|||||||
del conf.CELERY_ROUTES['awx.main.tasks.awx_isolated_heartbeat']
|
del conf.CELERY_ROUTES['awx.main.tasks.awx_isolated_heartbeat']
|
||||||
|
|
||||||
for t in tasks:
|
for t in tasks:
|
||||||
conf.CELERY_ROUTES[t] = {'queue': instance.hostname, 'routing_key': instance.hostname}
|
conf.CELERY_ROUTES[t] = {'queue': instance.hostname.encode("utf8"), 'routing_key': instance.hostname.encode("utf8")}
|
||||||
routes_updated[t] = conf.CELERY_ROUTES[t]
|
routes_updated[t] = conf.CELERY_ROUTES[t]
|
||||||
|
|
||||||
return routes_updated
|
return routes_updated
|
||||||
|
|||||||
Reference in New Issue
Block a user