mirror of
https://github.com/ansible/awx.git
synced 2026-02-04 02:58:13 -03:30
add support for awx-manage run_callback_receiver --status
This commit is contained in:
@@ -15,18 +15,19 @@ class Control(object):
|
|||||||
services = ('dispatcher', 'callback_receiver')
|
services = ('dispatcher', 'callback_receiver')
|
||||||
result = None
|
result = None
|
||||||
|
|
||||||
def __init__(self, service, host=None):
|
def __init__(self, service, queuename=None, routing_key=None):
|
||||||
if service not in self.services:
|
if service not in self.services:
|
||||||
raise RuntimeError('{} must be in {}'.format(service, self.services))
|
raise RuntimeError('{} must be in {}'.format(service, self.services))
|
||||||
self.service = service
|
self.service = service
|
||||||
self.queuename = host or get_local_queuename()
|
self.queuename = queuename or get_local_queuename()
|
||||||
self.queue = Queue(self.queuename, Exchange(self.queuename), routing_key=self.queuename)
|
self.routing_key = routing_key or self.queuename
|
||||||
|
self.queue = Queue(self.queuename, Exchange(self.queuename), routing_key=self.routing_key)
|
||||||
|
|
||||||
def publish(self, msg, conn, **kwargs):
|
def publish(self, msg, conn, **kwargs):
|
||||||
producer = Producer(
|
producer = Producer(
|
||||||
exchange=self.queue.exchange,
|
exchange=self.queue.exchange,
|
||||||
channel=conn,
|
channel=conn,
|
||||||
routing_key=self.queuename
|
routing_key=self.routing_key
|
||||||
)
|
)
|
||||||
producer.publish(msg, expiration=5, **kwargs)
|
producer.publish(msg, expiration=5, **kwargs)
|
||||||
|
|
||||||
|
|||||||
@@ -280,6 +280,11 @@ class WorkerPool(object):
|
|||||||
logger.exception('could not kill {}'.format(worker.pid))
|
logger.exception('could not kill {}'.format(worker.pid))
|
||||||
|
|
||||||
|
|
||||||
|
def cleanup(self):
|
||||||
|
for worker in self.workers:
|
||||||
|
worker.calculate_managed_tasks()
|
||||||
|
|
||||||
|
|
||||||
class AutoscalePool(WorkerPool):
|
class AutoscalePool(WorkerPool):
|
||||||
'''
|
'''
|
||||||
An extended pool implementation that automatically scales workers up and
|
An extended pool implementation that automatically scales workers up and
|
||||||
|
|||||||
@@ -56,8 +56,18 @@ class AWXConsumer(ConsumerMixin):
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def listening_on(self):
|
def listening_on(self):
|
||||||
|
def qname(q):
|
||||||
|
if q.routing_key != q.name:
|
||||||
|
return ':'.join([q.name, q.routing_key])
|
||||||
|
return q.name
|
||||||
|
|
||||||
|
def qtype(q):
|
||||||
|
if q.exchange.type != 'direct':
|
||||||
|
return ' [{}]'.format(q.exchange.type)
|
||||||
|
return ''
|
||||||
|
|
||||||
return 'listening on {}'.format([
|
return 'listening on {}'.format([
|
||||||
'{} [{}]'.format(q.name, q.exchange.type) for q in self.queues
|
'{}{}'.format(qname(q), qtype(q)) for q in self.queues
|
||||||
])
|
])
|
||||||
|
|
||||||
def control(self, body, message):
|
def control(self, body, message):
|
||||||
|
|||||||
@@ -5,6 +5,8 @@ from django.conf import settings
|
|||||||
from django.core.management.base import BaseCommand
|
from django.core.management.base import BaseCommand
|
||||||
from kombu import Exchange, Queue
|
from kombu import Exchange, Queue
|
||||||
|
|
||||||
|
from awx.main.dispatch import get_local_queuename
|
||||||
|
from awx.main.dispatch.control import Control
|
||||||
from awx.main.dispatch.kombu import Connection
|
from awx.main.dispatch.kombu import Connection
|
||||||
from awx.main.dispatch.worker import AWXConsumer, CallbackBrokerWorker
|
from awx.main.dispatch.worker import AWXConsumer, CallbackBrokerWorker
|
||||||
|
|
||||||
@@ -17,7 +19,20 @@ class Command(BaseCommand):
|
|||||||
'''
|
'''
|
||||||
help = 'Launch the job callback receiver'
|
help = 'Launch the job callback receiver'
|
||||||
|
|
||||||
|
def add_arguments(self, parser):
|
||||||
|
parser.add_argument('--status', dest='status', action='store_true',
|
||||||
|
help='print the internal state of any running callback receiver')
|
||||||
|
|
||||||
def handle(self, *arg, **options):
|
def handle(self, *arg, **options):
|
||||||
|
control_routing_key = 'callback_receiver-{}-control'.format(get_local_queuename())
|
||||||
|
if options.get('status'):
|
||||||
|
print(Control(
|
||||||
|
'callback_receiver',
|
||||||
|
queuename=settings.CALLBACK_QUEUE,
|
||||||
|
routing_key=control_routing_key
|
||||||
|
).status())
|
||||||
|
return
|
||||||
|
|
||||||
with Connection(settings.BROKER_URL) as conn:
|
with Connection(settings.BROKER_URL) as conn:
|
||||||
consumer = None
|
consumer = None
|
||||||
try:
|
try:
|
||||||
@@ -29,8 +44,9 @@ class Command(BaseCommand):
|
|||||||
Queue(
|
Queue(
|
||||||
settings.CALLBACK_QUEUE,
|
settings.CALLBACK_QUEUE,
|
||||||
Exchange(settings.CALLBACK_QUEUE, type='direct'),
|
Exchange(settings.CALLBACK_QUEUE, type='direct'),
|
||||||
routing_key=settings.CALLBACK_QUEUE
|
routing_key=key
|
||||||
)
|
)
|
||||||
|
for key in [settings.CALLBACK_QUEUE, control_routing_key]
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
consumer.run()
|
consumer.run()
|
||||||
|
|||||||
@@ -1319,7 +1319,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
|
|||||||
timeout = 5
|
timeout = 5
|
||||||
try:
|
try:
|
||||||
running = self.celery_task_id in ControlDispatcher(
|
running = self.celery_task_id in ControlDispatcher(
|
||||||
'dispatcher', self.execution_node
|
'dispatcher', queuename=self.execution_node
|
||||||
).running(timeout=timeout)
|
).running(timeout=timeout)
|
||||||
except socket.timeout:
|
except socket.timeout:
|
||||||
logger.error('could not reach dispatcher on {} within {}s'.format(
|
logger.error('could not reach dispatcher on {} within {}s'.format(
|
||||||
|
|||||||
Reference in New Issue
Block a user