diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py index f5b38262ad..c37cd67763 100644 --- a/awx/main/dispatch/pool.py +++ b/awx/main/dispatch/pool.py @@ -27,6 +27,12 @@ else: logger = logging.getLogger('awx.main.dispatch') +class NoOpResultQueue(object): + + def put(self, item): + pass + + class PoolWorker(object): ''' Used to track a worker child process and its pending and finished messages. @@ -56,11 +62,13 @@ class PoolWorker(object): It is "idle" when self.managed_tasks is empty. ''' - def __init__(self, queue_size, target, args): + track_managed_tasks = False + + def __init__(self, queue_size, target, args, **kwargs): self.messages_sent = 0 self.messages_finished = 0 self.managed_tasks = collections.OrderedDict() - self.finished = MPQueue(queue_size) + self.finished = MPQueue(queue_size) if self.track_managed_tasks else NoOpResultQueue() self.queue = MPQueue(queue_size) self.process = Process(target=target, args=(self.queue, self.finished) + args) self.process.daemon = True @@ -74,7 +82,8 @@ class PoolWorker(object): if not body.get('uuid'): body['uuid'] = str(uuid4()) uuid = body['uuid'] - self.managed_tasks[uuid] = body + if self.track_managed_tasks: + self.managed_tasks[uuid] = body self.queue.put(body, block=True, timeout=5) self.messages_sent += 1 self.calculate_managed_tasks() @@ -111,6 +120,8 @@ class PoolWorker(object): return str(self.process.exitcode) def calculate_managed_tasks(self): + if not self.track_managed_tasks: + return # look to see if any tasks were finished finished = [] for _ in range(self.finished.qsize()): @@ -135,6 +146,8 @@ class PoolWorker(object): @property def current_task(self): + if not self.track_managed_tasks: + return None self.calculate_managed_tasks() # the task at [0] is the one that's running right now (or is about to # be running) @@ -145,6 +158,8 @@ class PoolWorker(object): @property def orphaned_tasks(self): + if not self.track_managed_tasks: + return [] orphaned = [] if not self.alive: # if this process had a running task that never finished, @@ -179,6 +194,11 @@ class PoolWorker(object): return not self.busy +class StatefulPoolWorker(PoolWorker): + + track_managed_tasks = True + + class WorkerPool(object): ''' Creates a pool of forked PoolWorkers. @@ -200,6 +220,7 @@ class WorkerPool(object): ) ''' + pool_cls = PoolWorker debug_meta = '' def __init__(self, min_workers=None, queue_size=None): @@ -225,7 +246,7 @@ class WorkerPool(object): # for the DB and cache connections (that way lies race conditions) django_connection.close() django_cache.close() - worker = PoolWorker(self.queue_size, self.target, (idx,) + self.target_args) + worker = self.pool_cls(self.queue_size, self.target, (idx,) + self.target_args) self.workers.append(worker) try: worker.start() @@ -293,6 +314,8 @@ class AutoscalePool(WorkerPool): down based on demand ''' + pool_cls = StatefulPoolWorker + def __init__(self, *args, **kwargs): self.max_workers = kwargs.pop('max_workers', None) super(AutoscalePool, self).__init__(*args, **kwargs)