From 5364e7839724026ad4ae49120c4b38dd8ab24326 Mon Sep 17 00:00:00 2001 From: Ryan Petrello Date: Wed, 26 Feb 2020 16:06:49 -0500 Subject: [PATCH] switch the periodic scheduler to a child process (instead of a thread) I have a hunch that our usage of a daemon thread is causing import lock contention related to https://github.com/ansible/awx/issues/5617 We've encountered similar issues before with threads across dispatcher processes at fork time, and cpython has had bugs like this in recent history: https://bugs.python.org/issue38884 My gut tells me this might be related. The prior implementation - based on celerybeat - ran its code in a process (not a thread), and the timing of that merge matches the period of time we started noticing issues. Currently testing it to see if it resolves some of the issues we're seeing. --- awx/main/dispatch/periodic.py | 50 ++++++++++--------- .../management/commands/run_dispatcher.py | 3 +- 2 files changed, 28 insertions(+), 25 deletions(-) diff --git a/awx/main/dispatch/periodic.py b/awx/main/dispatch/periodic.py index 4d839d0521..2ad6f7119c 100644 --- a/awx/main/dispatch/periodic.py +++ b/awx/main/dispatch/periodic.py @@ -1,6 +1,7 @@ import logging -import threading +import os import time +from multiprocessing import Process from django.conf import settings from django.db import connections @@ -14,33 +15,36 @@ logger = logging.getLogger('awx.main.dispatch.periodic') class Scheduler(Scheduler): def run_continuously(self): - cease_continuous_run = threading.Event() idle_seconds = max( 1, min(self.jobs).period.total_seconds() / 2 ) - class ScheduleThread(threading.Thread): - @classmethod - def run(cls): - while not cease_continuous_run.is_set(): - try: - for conn in connections.all(): - # If the database connection has a hiccup, re-establish a new - # connection - conn.close_if_unusable_or_obsolete() - self.run_pending() - except Exception: - logger.exception( - 'encountered an error while scheduling periodic tasks' - ) - time.sleep(idle_seconds) - logger.debug('periodic thread exiting...') + def run(): + ppid = os.getppid() + logger.warn(f'periodic beat started') + while True: + if os.getppid() != ppid: + # if the parent PID changes, this process has been orphaned + # via e.g., segfault or sigkill, we should exit too + pid = os.getpid() + logger.warn(f'periodic beat exiting gracefully pid:{pid}') + raise SystemExit() + try: + for conn in connections.all(): + # If the database connection has a hiccup, re-establish a new + # connection + conn.close_if_unusable_or_obsolete() + self.run_pending() + except Exception: + logger.exception( + 'encountered an error while scheduling periodic tasks' + ) + time.sleep(idle_seconds) - thread = ScheduleThread() - thread.daemon = True - thread.start() - return cease_continuous_run + process = Process(target=run) + process.daemon = True + process.start() def run_continuously(): @@ -49,4 +53,4 @@ def run_continuously(): apply_async = TaskWorker.resolve_callable(task['task']).apply_async total_seconds = task['schedule'].total_seconds() scheduler.every(total_seconds).seconds.do(apply_async) - return scheduler.run_continuously() + scheduler.run_continuously() diff --git a/awx/main/management/commands/run_dispatcher.py b/awx/main/management/commands/run_dispatcher.py index 77e1eefe58..9fd9c3256d 100644 --- a/awx/main/management/commands/run_dispatcher.py +++ b/awx/main/management/commands/run_dispatcher.py @@ -53,7 +53,7 @@ class Command(BaseCommand): # spawn a daemon thread to periodically enqueues scheduled tasks # (like the node heartbeat) - cease_continuous_run = periodic.run_continuously() + periodic.run_continuously() reaper.reap() consumer = None @@ -87,7 +87,6 @@ class Command(BaseCommand): ) consumer.run() except KeyboardInterrupt: - cease_continuous_run.set() logger.debug('Terminating Task Dispatcher') if consumer: consumer.stop()