remove task state tracking from the callback receiver

we don't have support for displaying these stats anyways, so there's
no point in using resources tracking them, especially for high-volume
installs
This commit is contained in:
Ryan Petrello 2020-09-16 13:34:22 -04:00
parent 1860a2f71d
commit 0df6409244
No known key found for this signature in database
GPG Key ID: F2AA5F2122351777

View File

@ -27,6 +27,12 @@ else:
logger = logging.getLogger('awx.main.dispatch')
class NoOpResultQueue(object):
def put(self, item):
pass
class PoolWorker(object):
'''
Used to track a worker child process and its pending and finished messages.
@ -56,11 +62,13 @@ class PoolWorker(object):
It is "idle" when self.managed_tasks is empty.
'''
def __init__(self, queue_size, target, args):
track_managed_tasks = False
def __init__(self, queue_size, target, args, **kwargs):
self.messages_sent = 0
self.messages_finished = 0
self.managed_tasks = collections.OrderedDict()
self.finished = MPQueue(queue_size)
self.finished = MPQueue(queue_size) if self.track_managed_tasks else NoOpResultQueue()
self.queue = MPQueue(queue_size)
self.process = Process(target=target, args=(self.queue, self.finished) + args)
self.process.daemon = True
@ -74,7 +82,8 @@ class PoolWorker(object):
if not body.get('uuid'):
body['uuid'] = str(uuid4())
uuid = body['uuid']
self.managed_tasks[uuid] = body
if self.track_managed_tasks:
self.managed_tasks[uuid] = body
self.queue.put(body, block=True, timeout=5)
self.messages_sent += 1
self.calculate_managed_tasks()
@ -111,6 +120,8 @@ class PoolWorker(object):
return str(self.process.exitcode)
def calculate_managed_tasks(self):
if not self.track_managed_tasks:
return
# look to see if any tasks were finished
finished = []
for _ in range(self.finished.qsize()):
@ -135,6 +146,8 @@ class PoolWorker(object):
@property
def current_task(self):
if not self.track_managed_tasks:
return None
self.calculate_managed_tasks()
# the task at [0] is the one that's running right now (or is about to
# be running)
@ -145,6 +158,8 @@ class PoolWorker(object):
@property
def orphaned_tasks(self):
if not self.track_managed_tasks:
return []
orphaned = []
if not self.alive:
# if this process had a running task that never finished,
@ -179,6 +194,11 @@ class PoolWorker(object):
return not self.busy
class StatefulPoolWorker(PoolWorker):
track_managed_tasks = True
class WorkerPool(object):
'''
Creates a pool of forked PoolWorkers.
@ -200,6 +220,7 @@ class WorkerPool(object):
)
'''
pool_cls = PoolWorker
debug_meta = ''
def __init__(self, min_workers=None, queue_size=None):
@ -225,7 +246,7 @@ class WorkerPool(object):
# for the DB and cache connections (that way lies race conditions)
django_connection.close()
django_cache.close()
worker = PoolWorker(self.queue_size, self.target, (idx,) + self.target_args)
worker = self.pool_cls(self.queue_size, self.target, (idx,) + self.target_args)
self.workers.append(worker)
try:
worker.start()
@ -293,6 +314,8 @@ class AutoscalePool(WorkerPool):
down based on demand
'''
pool_cls = StatefulPoolWorker
def __init__(self, *args, **kwargs):
self.max_workers = kwargs.pop('max_workers', None)
super(AutoscalePool, self).__init__(*args, **kwargs)