mirror of
https://github.com/ansible/awx.git
synced 2026-03-09 13:39:27 -02:30
Add back hazmat for config and remove baseworker
* added back hazmat per @alancoding feedback around config * removed baseworker completely and refactored it into the callback worker
This commit is contained in:
@@ -36,7 +36,6 @@ class WorkerPool(object):
|
|||||||
|
|
||||||
Example:
|
Example:
|
||||||
pool = WorkerPool(workers_num=4) # spawn four worker processes
|
pool = WorkerPool(workers_num=4) # spawn four worker processes
|
||||||
pool.init_workers(worker_instance.work_loop)
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
pool_cls = PoolWorker
|
pool_cls = PoolWorker
|
||||||
|
|||||||
@@ -1,3 +1,2 @@
|
|||||||
from .base import AWXConsumerRedis # noqa
|
from .base import AWXConsumerRedis # noqa
|
||||||
from .callback import CallbackBrokerWorker # noqa
|
from .callback import CallbackBrokerWorker # noqa
|
||||||
from .task import TaskWorker # noqa
|
|
||||||
|
|||||||
@@ -71,46 +71,3 @@ class AWXConsumerRedis(AWXConsumerBase):
|
|||||||
|
|
||||||
while True:
|
while True:
|
||||||
time.sleep(60)
|
time.sleep(60)
|
||||||
|
|
||||||
|
|
||||||
class BaseWorker(object):
|
|
||||||
def read(self):
|
|
||||||
raise NotImplemented()
|
|
||||||
|
|
||||||
def work_loop(self, idx, *args):
|
|
||||||
ppid = os.getppid()
|
|
||||||
signal_handler = WorkerSignalHandler()
|
|
||||||
set_connection_name('worker') # set application_name to distinguish from other dispatcher processes
|
|
||||||
while not signal_handler.kill_now:
|
|
||||||
# if the parent PID changes, this process has been orphaned
|
|
||||||
# via e.g., segfault or sigkill, we should exit too
|
|
||||||
if os.getppid() != ppid:
|
|
||||||
break
|
|
||||||
try:
|
|
||||||
body = self.read() # this is only for the callback, only reading from redis.
|
|
||||||
if body == 'QUIT':
|
|
||||||
break
|
|
||||||
except QueueEmpty:
|
|
||||||
continue
|
|
||||||
except Exception:
|
|
||||||
logger.exception("Exception on worker {}, reconnecting: ".format(idx))
|
|
||||||
continue
|
|
||||||
try:
|
|
||||||
for conn in db.connections.all():
|
|
||||||
# If the database connection has a hiccup during the prior message, close it
|
|
||||||
# so we can establish a new connection
|
|
||||||
conn.close_if_unusable_or_obsolete()
|
|
||||||
self.perform_work(body, *args)
|
|
||||||
except Exception:
|
|
||||||
logger.exception(f'Unhandled exception in perform_work in worker pid={os.getpid()}')
|
|
||||||
|
|
||||||
logger.debug('worker exiting gracefully pid:{}'.format(os.getpid()))
|
|
||||||
|
|
||||||
def perform_work(self, body):
|
|
||||||
raise NotImplementedError()
|
|
||||||
|
|
||||||
def on_start(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def on_stop(self):
|
|
||||||
pass
|
|
||||||
|
|||||||
@@ -26,7 +26,7 @@ from awx.main.models.events import emit_event_detail
|
|||||||
from awx.main.utils.profiling import AWXProfiler
|
from awx.main.utils.profiling import AWXProfiler
|
||||||
from awx.main.tasks.system import events_processed_hook
|
from awx.main.tasks.system import events_processed_hook
|
||||||
import awx.main.analytics.subsystem_metrics as s_metrics
|
import awx.main.analytics.subsystem_metrics as s_metrics
|
||||||
from .base import BaseWorker, WorkerSignalHandler
|
from .base import WorkerSignalHandler
|
||||||
|
|
||||||
logger = logging.getLogger('awx.main.commands.run_callback_receiver')
|
logger = logging.getLogger('awx.main.commands.run_callback_receiver')
|
||||||
|
|
||||||
@@ -57,7 +57,7 @@ def job_stats_wrapup(job_identifier, event=None):
|
|||||||
logger.exception('Worker failed to save stats or emit notifications: Job {}'.format(job_identifier))
|
logger.exception('Worker failed to save stats or emit notifications: Job {}'.format(job_identifier))
|
||||||
|
|
||||||
|
|
||||||
class CallbackBrokerWorker(BaseWorker):
|
class CallbackBrokerWorker:
|
||||||
"""
|
"""
|
||||||
A worker implementation that deserializes callback event data and persists
|
A worker implementation that deserializes callback event data and persists
|
||||||
it into the database.
|
it into the database.
|
||||||
@@ -84,35 +84,6 @@ class CallbackBrokerWorker(BaseWorker):
|
|||||||
for key in self.redis.keys('awx_callback_receiver_statistics_*'):
|
for key in self.redis.keys('awx_callback_receiver_statistics_*'):
|
||||||
self.redis.delete(key)
|
self.redis.delete(key)
|
||||||
|
|
||||||
def work_loop(self, idx, *args):
|
|
||||||
ppid = os.getppid()
|
|
||||||
signal_handler = WorkerSignalHandler()
|
|
||||||
set_connection_name('worker') # set application_name to distinguish from other dispatcher processes
|
|
||||||
while not signal_handler.kill_now:
|
|
||||||
# if the parent PID changes, this process has been orphaned
|
|
||||||
# via e.g., segfault or sigkill, we should exit too
|
|
||||||
if os.getppid() != ppid:
|
|
||||||
break
|
|
||||||
try:
|
|
||||||
body = self.read() # this is only for the callback, only reading from redis.
|
|
||||||
if body == 'QUIT':
|
|
||||||
break
|
|
||||||
except QueueEmpty:
|
|
||||||
continue
|
|
||||||
except Exception:
|
|
||||||
logger.exception("Exception on worker {}, reconnecting: ".format(idx))
|
|
||||||
continue
|
|
||||||
try:
|
|
||||||
for conn in db.connections.all():
|
|
||||||
# If the database connection has a hiccup during the prior message, close it
|
|
||||||
# so we can establish a new connection
|
|
||||||
conn.close_if_unusable_or_obsolete()
|
|
||||||
self.perform_work(body, *args)
|
|
||||||
except Exception:
|
|
||||||
logger.exception(f'Unhandled exception in perform_work in worker pid={os.getpid()}')
|
|
||||||
|
|
||||||
logger.debug('worker exiting gracefully pid:{}'.format(os.getpid()))
|
|
||||||
|
|
||||||
@cached_property
|
@cached_property
|
||||||
def pid(self):
|
def pid(self):
|
||||||
"""This needs to be obtained after forking, or else it will give the parent process"""
|
"""This needs to be obtained after forking, or else it will give the parent process"""
|
||||||
@@ -181,10 +152,37 @@ class CallbackBrokerWorker(BaseWorker):
|
|||||||
filepath = self.prof.stop()
|
filepath = self.prof.stop()
|
||||||
logger.error(f'profiling is disabled, wrote {filepath}')
|
logger.error(f'profiling is disabled, wrote {filepath}')
|
||||||
|
|
||||||
def work_loop(self, *args, **kw):
|
def work_loop(self, idx, *args):
|
||||||
if settings.AWX_CALLBACK_PROFILE:
|
if settings.AWX_CALLBACK_PROFILE:
|
||||||
signal.signal(signal.SIGUSR1, self.toggle_profiling)
|
signal.signal(signal.SIGUSR1, self.toggle_profiling)
|
||||||
return super(CallbackBrokerWorker, self).work_loop(*args, **kw)
|
|
||||||
|
ppid = os.getppid()
|
||||||
|
signal_handler = WorkerSignalHandler()
|
||||||
|
set_connection_name('worker') # set application_name to distinguish from other dispatcher processes
|
||||||
|
while not signal_handler.kill_now:
|
||||||
|
# if the parent PID changes, this process has been orphaned
|
||||||
|
# via e.g., segfault or sigkill, we should exit too
|
||||||
|
if os.getppid() != ppid:
|
||||||
|
break
|
||||||
|
try:
|
||||||
|
body = self.read() # this is only for the callback, only reading from redis.
|
||||||
|
if body == 'QUIT':
|
||||||
|
break
|
||||||
|
except QueueEmpty:
|
||||||
|
continue
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Exception on worker {}, reconnecting: ".format(idx))
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
for conn in db.connections.all():
|
||||||
|
# If the database connection has a hiccup during the prior message, close it
|
||||||
|
# so we can establish a new connection
|
||||||
|
conn.close_if_unusable_or_obsolete()
|
||||||
|
self.perform_work(body, *args)
|
||||||
|
except Exception:
|
||||||
|
logger.exception(f'Unhandled exception in perform_work in worker pid={os.getpid()}')
|
||||||
|
|
||||||
|
logger.debug('worker exiting gracefully pid:{}'.format(os.getpid()))
|
||||||
|
|
||||||
def flush(self, force=False):
|
def flush(self, force=False):
|
||||||
now = tz_now()
|
now = tz_now()
|
||||||
|
|||||||
Reference in New Issue
Block a user