mirror of
https://github.com/ansible/awx.git
synced 2026-03-04 18:21:03 -03:30
Prevent Dispatcher deadlock when Redis disappears (#14249)
This fixes https://github.com/ansible/awx/issues/14245, which has more information about the issue. This change addresses two problems: - A clashing signal handler: a callback was registered to fire when the task manager times out, but that callback was also being hit in cases where we didn't expect it. The dispatcher timeout now uses SIGUSR1, not SIGTERM. - A failure to report metrics should not make us crash, so that is now fixed as well. Signed-off-by: Rick Elrod <rick@elrod.me> Co-authored-by: Alan Rominger <arominge@redhat.com>
This commit is contained in:
@@ -417,16 +417,16 @@ class AutoscalePool(WorkerPool):
|
|||||||
# the task manager to never do more work
|
# the task manager to never do more work
|
||||||
current_task = w.current_task
|
current_task = w.current_task
|
||||||
if current_task and isinstance(current_task, dict):
|
if current_task and isinstance(current_task, dict):
|
||||||
endings = ['tasks.task_manager', 'tasks.dependency_manager', 'tasks.workflow_manager']
|
endings = ('tasks.task_manager', 'tasks.dependency_manager', 'tasks.workflow_manager')
|
||||||
current_task_name = current_task.get('task', '')
|
current_task_name = current_task.get('task', '')
|
||||||
if any(current_task_name.endswith(e) for e in endings):
|
if current_task_name.endswith(endings):
|
||||||
if 'started' not in current_task:
|
if 'started' not in current_task:
|
||||||
w.managed_tasks[current_task['uuid']]['started'] = time.time()
|
w.managed_tasks[current_task['uuid']]['started'] = time.time()
|
||||||
age = time.time() - current_task['started']
|
age = time.time() - current_task['started']
|
||||||
w.managed_tasks[current_task['uuid']]['age'] = age
|
w.managed_tasks[current_task['uuid']]['age'] = age
|
||||||
if age > self.task_manager_timeout:
|
if age > self.task_manager_timeout:
|
||||||
logger.error(f'{current_task_name} has held the advisory lock for {age}, sending SIGTERM to {w.pid}')
|
logger.error(f'{current_task_name} has held the advisory lock for {age}, sending SIGUSR1 to {w.pid}')
|
||||||
os.kill(w.pid, signal.SIGTERM)
|
os.kill(w.pid, signal.SIGUSR1)
|
||||||
|
|
||||||
for m in orphaned:
|
for m in orphaned:
|
||||||
# if all the workers are dead, spawn at least one
|
# if all the workers are dead, spawn at least one
|
||||||
|
|||||||
@@ -121,10 +121,9 @@ class AWXConsumerBase(object):
|
|||||||
if time.time() - self.last_stats > 1: # buffer stat recording to once per second
|
if time.time() - self.last_stats > 1: # buffer stat recording to once per second
|
||||||
try:
|
try:
|
||||||
self.redis.set(f'awx_{self.name}_statistics', self.pool.debug())
|
self.redis.set(f'awx_{self.name}_statistics', self.pool.debug())
|
||||||
self.last_stats = time.time()
|
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception(f"encountered an error communicating with redis to store {self.name} statistics")
|
logger.exception(f"encountered an error communicating with redis to store {self.name} statistics")
|
||||||
self.last_stats = time.time()
|
self.last_stats = time.time()
|
||||||
|
|
||||||
def run(self, *args, **kwargs):
|
def run(self, *args, **kwargs):
|
||||||
signal.signal(signal.SIGINT, self.stop)
|
signal.signal(signal.SIGINT, self.stop)
|
||||||
@@ -175,9 +174,12 @@ class AWXConsumerPG(AWXConsumerBase):
|
|||||||
|
|
||||||
# record subsystem metrics for the dispatcher
|
# record subsystem metrics for the dispatcher
|
||||||
if current_time - self.last_metrics_gather > 20:
|
if current_time - self.last_metrics_gather > 20:
|
||||||
self.pool.produce_subsystem_metrics(self.subsystem_metrics)
|
try:
|
||||||
self.subsystem_metrics.set('dispatcher_availability', self.listen_cumulative_time / (current_time - self.last_metrics_gather))
|
self.pool.produce_subsystem_metrics(self.subsystem_metrics)
|
||||||
self.subsystem_metrics.pipe_execute()
|
self.subsystem_metrics.set('dispatcher_availability', self.listen_cumulative_time / (current_time - self.last_metrics_gather))
|
||||||
|
self.subsystem_metrics.pipe_execute()
|
||||||
|
except Exception:
|
||||||
|
logger.exception(f"encountered an error trying to store {self.name} metrics")
|
||||||
self.listen_cumulative_time = 0.0
|
self.listen_cumulative_time = 0.0
|
||||||
self.last_metrics_gather = current_time
|
self.last_metrics_gather = current_time
|
||||||
|
|
||||||
@@ -250,8 +252,8 @@ class BaseWorker(object):
|
|||||||
break
|
break
|
||||||
except QueueEmpty:
|
except QueueEmpty:
|
||||||
continue
|
continue
|
||||||
except Exception as e:
|
except Exception:
|
||||||
logger.error("Exception on worker {}, restarting: ".format(idx) + str(e))
|
logger.exception("Exception on worker {}, reconnecting: ".format(idx))
|
||||||
continue
|
continue
|
||||||
try:
|
try:
|
||||||
for conn in db.connections.all():
|
for conn in db.connections.all():
|
||||||
|
|||||||
@@ -102,27 +102,33 @@ class TaskBase:
|
|||||||
|
|
||||||
def record_aggregate_metrics(self, *args):
|
def record_aggregate_metrics(self, *args):
|
||||||
if not is_testing():
|
if not is_testing():
|
||||||
# increment task_manager_schedule_calls regardless if the other
|
try:
|
||||||
# metrics are recorded
|
# increment task_manager_schedule_calls regardless if the other
|
||||||
s_metrics.Metrics(auto_pipe_execute=True).inc(f"{self.prefix}__schedule_calls", 1)
|
# metrics are recorded
|
||||||
# Only record metrics if the last time recording was more
|
s_metrics.Metrics(auto_pipe_execute=True).inc(f"{self.prefix}__schedule_calls", 1)
|
||||||
# than SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL ago.
|
# Only record metrics if the last time recording was more
|
||||||
# Prevents a short-duration task manager that runs directly after a
|
# than SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL ago.
|
||||||
# long task manager to override useful metrics.
|
# Prevents a short-duration task manager that runs directly after a
|
||||||
current_time = time.time()
|
# long task manager to override useful metrics.
|
||||||
time_last_recorded = current_time - self.subsystem_metrics.decode(f"{self.prefix}_recorded_timestamp")
|
current_time = time.time()
|
||||||
if time_last_recorded > settings.SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL:
|
time_last_recorded = current_time - self.subsystem_metrics.decode(f"{self.prefix}_recorded_timestamp")
|
||||||
logger.debug(f"recording {self.prefix} metrics, last recorded {time_last_recorded} seconds ago")
|
if time_last_recorded > settings.SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL:
|
||||||
self.subsystem_metrics.set(f"{self.prefix}_recorded_timestamp", current_time)
|
logger.debug(f"recording {self.prefix} metrics, last recorded {time_last_recorded} seconds ago")
|
||||||
self.subsystem_metrics.pipe_execute()
|
self.subsystem_metrics.set(f"{self.prefix}_recorded_timestamp", current_time)
|
||||||
else:
|
self.subsystem_metrics.pipe_execute()
|
||||||
logger.debug(f"skipping recording {self.prefix} metrics, last recorded {time_last_recorded} seconds ago")
|
else:
|
||||||
|
logger.debug(f"skipping recording {self.prefix} metrics, last recorded {time_last_recorded} seconds ago")
|
||||||
|
except Exception:
|
||||||
|
logger.exception(f"Error saving metrics for {self.prefix}")
|
||||||
|
|
||||||
def record_aggregate_metrics_and_exit(self, *args):
|
def record_aggregate_metrics_and_exit(self, *args):
|
||||||
self.record_aggregate_metrics()
|
self.record_aggregate_metrics()
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
def schedule(self):
|
def schedule(self):
|
||||||
|
# Always be able to restore the original signal handler if we finish
|
||||||
|
original_sigusr1 = signal.getsignal(signal.SIGUSR1)
|
||||||
|
|
||||||
# Lock
|
# Lock
|
||||||
with task_manager_bulk_reschedule():
|
with task_manager_bulk_reschedule():
|
||||||
with advisory_lock(f"{self.prefix}_lock", wait=False) as acquired:
|
with advisory_lock(f"{self.prefix}_lock", wait=False) as acquired:
|
||||||
@@ -131,9 +137,14 @@ class TaskBase:
|
|||||||
logger.debug(f"Not running {self.prefix} scheduler, another task holds lock")
|
logger.debug(f"Not running {self.prefix} scheduler, another task holds lock")
|
||||||
return
|
return
|
||||||
logger.debug(f"Starting {self.prefix} Scheduler")
|
logger.debug(f"Starting {self.prefix} Scheduler")
|
||||||
# if sigterm due to timeout, still record metrics
|
# if sigusr1 due to timeout, still record metrics
|
||||||
signal.signal(signal.SIGTERM, self.record_aggregate_metrics_and_exit)
|
signal.signal(signal.SIGUSR1, self.record_aggregate_metrics_and_exit)
|
||||||
self._schedule()
|
try:
|
||||||
|
self._schedule()
|
||||||
|
finally:
|
||||||
|
# Reset the signal handler back to the default just in case anything
|
||||||
|
# else uses the same signal for other purposes
|
||||||
|
signal.signal(signal.SIGUSR1, original_sigusr1)
|
||||||
commit_start = time.time()
|
commit_start = time.time()
|
||||||
|
|
||||||
if self.prefix == "task_manager":
|
if self.prefix == "task_manager":
|
||||||
|
|||||||
Reference in New Issue
Block a user