diff --git a/awx/main/dispatch/control.py b/awx/main/dispatch/control.py index 83e2226012..5f081e84f2 100644 --- a/awx/main/dispatch/control.py +++ b/awx/main/dispatch/control.py @@ -15,19 +15,18 @@ class Control(object): services = ('dispatcher', 'callback_receiver') result = None - def __init__(self, service, queuename=None, routing_key=None): + def __init__(self, service, host=None): if service not in self.services: raise RuntimeError('{} must be in {}'.format(service, self.services)) self.service = service - self.queuename = queuename or get_local_queuename() - self.routing_key = routing_key or self.queuename - self.queue = Queue(self.queuename, Exchange(self.queuename), routing_key=self.routing_key) + self.queuename = host or get_local_queuename() + self.queue = Queue(self.queuename, Exchange(self.queuename), routing_key=self.queuename) def publish(self, msg, conn, **kwargs): producer = Producer( exchange=self.queue.exchange, channel=conn, - routing_key=self.routing_key + routing_key=self.queuename ) producer.publish(msg, expiration=5, **kwargs) diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py index 3fc502b33e..f5b92ca8f1 100644 --- a/awx/main/dispatch/pool.py +++ b/awx/main/dispatch/pool.py @@ -280,11 +280,6 @@ class WorkerPool(object): logger.exception('could not kill {}'.format(worker.pid)) - def cleanup(self): - for worker in self.workers: - worker.calculate_managed_tasks() - - class AutoscalePool(WorkerPool): ''' An extended pool implementation that automatically scales workers up and diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py index bc440b831e..e73ed4bade 100644 --- a/awx/main/dispatch/worker/base.py +++ b/awx/main/dispatch/worker/base.py @@ -56,18 +56,8 @@ class AWXConsumer(ConsumerMixin): @property def listening_on(self): - def qname(q): - if q.routing_key != q.name: - return ':'.join([q.name, q.routing_key]) - return q.name - - def qtype(q): - if q.exchange.type != 'direct': - return ' [{}]'.format(q.exchange.type) - return '' - return 'listening on {}'.format([ - '{}{}'.format(qname(q), qtype(q)) for q in self.queues + '{} [{}]'.format(q.name, q.exchange.type) for q in self.queues ]) def control(self, body, message): diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 8e706f5309..51608a8b7a 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -5,8 +5,6 @@ from django.conf import settings from django.core.management.base import BaseCommand from kombu import Exchange, Queue -from awx.main.dispatch import get_local_queuename -from awx.main.dispatch.control import Control from awx.main.dispatch.kombu import Connection from awx.main.dispatch.worker import AWXConsumer, CallbackBrokerWorker @@ -19,20 +17,7 @@ class Command(BaseCommand): ''' help = 'Launch the job callback receiver' - def add_arguments(self, parser): - parser.add_argument('--status', dest='status', action='store_true', - help='print the internal state of any running callback receiver') - def handle(self, *arg, **options): - control_routing_key = 'callback_receiver-{}-control'.format(get_local_queuename()) - if options.get('status'): - print(Control( - 'callback_receiver', - queuename=settings.CALLBACK_QUEUE, - routing_key=control_routing_key - ).status()) - return - with Connection(settings.BROKER_URL) as conn: consumer = None try: @@ -44,9 +29,8 @@ class Command(BaseCommand): Queue( settings.CALLBACK_QUEUE, Exchange(settings.CALLBACK_QUEUE, type='direct'), - routing_key=key + routing_key=settings.CALLBACK_QUEUE ) - for key in [settings.CALLBACK_QUEUE, control_routing_key] ] ) consumer.run() diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 398f8bbe88..3613ac4d34 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -1319,7 +1319,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique timeout = 5 try: running = self.celery_task_id in ControlDispatcher( - 'dispatcher', queuename=self.execution_node + 'dispatcher', self.execution_node ).running(timeout=timeout) except socket.timeout: logger.error('could not reach dispatcher on {} within {}s'.format(