switch the periodic scheduler to a child process (instead of a thread)

I have a hunch that our usage of a daemon thread is causing import lock
contention related to https://github.com/ansible/awx/issues/5617
We've encountered similar issues before with threads across dispatcher
processes at fork time, and cpython has had bugs like this in recent
history:

https://bugs.python.org/issue38884

My gut tells me this might be related.

The prior implementation - based on celerybeat - ran its code in
a process (not a thread), and the timing of that merge matches the
period of time we started noticing issues.

Currently testing it to see if it resolves some of the issues we're
seeing.
This commit is contained in:
Ryan Petrello
2020-02-26 16:06:49 -05:00
parent 154b9c36ac
commit 5364e78397
2 changed files with 28 additions and 25 deletions

View File

@@ -1,6 +1,7 @@
import logging import logging
import threading import os
import time import time
from multiprocessing import Process
from django.conf import settings from django.conf import settings
from django.db import connections from django.db import connections
@@ -14,33 +15,36 @@ logger = logging.getLogger('awx.main.dispatch.periodic')
class Scheduler(Scheduler): class Scheduler(Scheduler):
def run_continuously(self): def run_continuously(self):
cease_continuous_run = threading.Event()
idle_seconds = max( idle_seconds = max(
1, 1,
min(self.jobs).period.total_seconds() / 2 min(self.jobs).period.total_seconds() / 2
) )
class ScheduleThread(threading.Thread): def run():
@classmethod ppid = os.getppid()
def run(cls): logger.warn(f'periodic beat started')
while not cease_continuous_run.is_set(): while True:
try: if os.getppid() != ppid:
for conn in connections.all(): # if the parent PID changes, this process has been orphaned
# If the database connection has a hiccup, re-establish a new # via e.g., segfault or sigkill, we should exit too
# connection pid = os.getpid()
conn.close_if_unusable_or_obsolete() logger.warn(f'periodic beat exiting gracefully pid:{pid}')
self.run_pending() raise SystemExit()
except Exception: try:
logger.exception( for conn in connections.all():
'encountered an error while scheduling periodic tasks' # If the database connection has a hiccup, re-establish a new
) # connection
time.sleep(idle_seconds) conn.close_if_unusable_or_obsolete()
logger.debug('periodic thread exiting...') self.run_pending()
except Exception:
logger.exception(
'encountered an error while scheduling periodic tasks'
)
time.sleep(idle_seconds)
thread = ScheduleThread() process = Process(target=run)
thread.daemon = True process.daemon = True
thread.start() process.start()
return cease_continuous_run
def run_continuously(): def run_continuously():
@@ -49,4 +53,4 @@ def run_continuously():
apply_async = TaskWorker.resolve_callable(task['task']).apply_async apply_async = TaskWorker.resolve_callable(task['task']).apply_async
total_seconds = task['schedule'].total_seconds() total_seconds = task['schedule'].total_seconds()
scheduler.every(total_seconds).seconds.do(apply_async) scheduler.every(total_seconds).seconds.do(apply_async)
return scheduler.run_continuously() scheduler.run_continuously()

View File

@@ -53,7 +53,7 @@ class Command(BaseCommand):
# spawn a daemon thread to periodically enqueues scheduled tasks # spawn a daemon thread to periodically enqueues scheduled tasks
# (like the node heartbeat) # (like the node heartbeat)
cease_continuous_run = periodic.run_continuously() periodic.run_continuously()
reaper.reap() reaper.reap()
consumer = None consumer = None
@@ -87,7 +87,6 @@ class Command(BaseCommand):
) )
consumer.run() consumer.run()
except KeyboardInterrupt: except KeyboardInterrupt:
cease_continuous_run.set()
logger.debug('Terminating Task Dispatcher') logger.debug('Terminating Task Dispatcher')
if consumer: if consumer:
consumer.stop() consumer.stop()