diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py index 703ad4f791..ea88c8ed8b 100644 --- a/awx/main/dispatch/pool.py +++ b/awx/main/dispatch/pool.py @@ -36,7 +36,6 @@ class WorkerPool(object): Example: pool = WorkerPool(workers_num=4) # spawn four worker processes - pool.init_workers(worker_instance.work_loop) """ pool_cls = PoolWorker diff --git a/awx/main/dispatch/worker/__init__.py b/awx/main/dispatch/worker/__init__.py index 1ea197bf9d..71203886eb 100644 --- a/awx/main/dispatch/worker/__init__.py +++ b/awx/main/dispatch/worker/__init__.py @@ -1,3 +1,2 @@ from .base import AWXConsumerRedis # noqa from .callback import CallbackBrokerWorker # noqa -from .task import TaskWorker # noqa diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py index cb59adee98..ca7691ca22 100644 --- a/awx/main/dispatch/worker/base.py +++ b/awx/main/dispatch/worker/base.py @@ -71,46 +71,3 @@ class AWXConsumerRedis(AWXConsumerBase): while True: time.sleep(60) - - -class BaseWorker(object): - def read(self): - raise NotImplemented() - - def work_loop(self, idx, *args): - ppid = os.getppid() - signal_handler = WorkerSignalHandler() - set_connection_name('worker') # set application_name to distinguish from other dispatcher processes - while not signal_handler.kill_now: - # if the parent PID changes, this process has been orphaned - # via e.g., segfault or sigkill, we should exit too - if os.getppid() != ppid: - break - try: - body = self.read() # this is only for the callback, only reading from redis. - if body == 'QUIT': - break - except QueueEmpty: - continue - except Exception: - logger.exception("Exception on worker {}, reconnecting: ".format(idx)) - continue - try: - for conn in db.connections.all(): - # If the database connection has a hiccup during the prior message, close it - # so we can establish a new connection - conn.close_if_unusable_or_obsolete() - self.perform_work(body, *args) - except Exception: - logger.exception(f'Unhandled exception in perform_work in worker pid={os.getpid()}') - - logger.debug('worker exiting gracefully pid:{}'.format(os.getpid())) - - def perform_work(self, body): - raise NotImplementedError() - - def on_start(self): - pass - - def on_stop(self): - pass diff --git a/awx/main/dispatch/worker/callback.py b/awx/main/dispatch/worker/callback.py index 37ce7aad39..b056cd5b92 100644 --- a/awx/main/dispatch/worker/callback.py +++ b/awx/main/dispatch/worker/callback.py @@ -26,7 +26,7 @@ from awx.main.models.events import emit_event_detail from awx.main.utils.profiling import AWXProfiler from awx.main.tasks.system import events_processed_hook import awx.main.analytics.subsystem_metrics as s_metrics -from .base import BaseWorker, WorkerSignalHandler +from .base import WorkerSignalHandler logger = logging.getLogger('awx.main.commands.run_callback_receiver') @@ -57,7 +57,7 @@ def job_stats_wrapup(job_identifier, event=None): logger.exception('Worker failed to save stats or emit notifications: Job {}'.format(job_identifier)) -class CallbackBrokerWorker(BaseWorker): +class CallbackBrokerWorker: """ A worker implementation that deserializes callback event data and persists it into the database. @@ -84,35 +84,6 @@ class CallbackBrokerWorker(BaseWorker): for key in self.redis.keys('awx_callback_receiver_statistics_*'): self.redis.delete(key) - def work_loop(self, idx, *args): - ppid = os.getppid() - signal_handler = WorkerSignalHandler() - set_connection_name('worker') # set application_name to distinguish from other dispatcher processes - while not signal_handler.kill_now: - # if the parent PID changes, this process has been orphaned - # via e.g., segfault or sigkill, we should exit too - if os.getppid() != ppid: - break - try: - body = self.read() # this is only for the callback, only reading from redis. - if body == 'QUIT': - break - except QueueEmpty: - continue - except Exception: - logger.exception("Exception on worker {}, reconnecting: ".format(idx)) - continue - try: - for conn in db.connections.all(): - # If the database connection has a hiccup during the prior message, close it - # so we can establish a new connection - conn.close_if_unusable_or_obsolete() - self.perform_work(body, *args) - except Exception: - logger.exception(f'Unhandled exception in perform_work in worker pid={os.getpid()}') - - logger.debug('worker exiting gracefully pid:{}'.format(os.getpid())) - @cached_property def pid(self): """This needs to be obtained after forking, or else it will give the parent process""" @@ -181,10 +152,37 @@ class CallbackBrokerWorker(BaseWorker): filepath = self.prof.stop() logger.error(f'profiling is disabled, wrote {filepath}') - def work_loop(self, *args, **kw): + def work_loop(self, idx, *args): if settings.AWX_CALLBACK_PROFILE: signal.signal(signal.SIGUSR1, self.toggle_profiling) - return super(CallbackBrokerWorker, self).work_loop(*args, **kw) + + ppid = os.getppid() + signal_handler = WorkerSignalHandler() + set_connection_name('worker') # set application_name to distinguish from other dispatcher processes + while not signal_handler.kill_now: + # if the parent PID changes, this process has been orphaned + # via e.g., segfault or sigkill, we should exit too + if os.getppid() != ppid: + break + try: + body = self.read() # this is only for the callback, only reading from redis. + if body == 'QUIT': + break + except QueueEmpty: + continue + except Exception: + logger.exception("Exception on worker {}, reconnecting: ".format(idx)) + continue + try: + for conn in db.connections.all(): + # If the database connection has a hiccup during the prior message, close it + # so we can establish a new connection + conn.close_if_unusable_or_obsolete() + self.perform_work(body, *args) + except Exception: + logger.exception(f'Unhandled exception in perform_work in worker pid={os.getpid()}') + + logger.debug('worker exiting gracefully pid:{}'.format(os.getpid())) def flush(self, force=False): now = tz_now()