mirror of
https://github.com/ansible/awx.git
synced 2026-01-17 04:31:21 -03:30
Merge pull request #6093 from ryanpetrello/remove-beat-thread
switch the periodic scheduler to a background process (instead of a thread) to avoid a cpython bug Reviewed-by: https://github.com/apps/softwarefactory-project-zuul
This commit is contained in:
commit
5a164cae15
@ -1,6 +1,7 @@
|
||||
import logging
|
||||
import threading
|
||||
import os
|
||||
import time
|
||||
from multiprocessing import Process
|
||||
|
||||
from django.conf import settings
|
||||
from django.db import connections
|
||||
@ -14,33 +15,36 @@ logger = logging.getLogger('awx.main.dispatch.periodic')
|
||||
class Scheduler(Scheduler):
|
||||
|
||||
def run_continuously(self):
|
||||
cease_continuous_run = threading.Event()
|
||||
idle_seconds = max(
|
||||
1,
|
||||
min(self.jobs).period.total_seconds() / 2
|
||||
)
|
||||
|
||||
class ScheduleThread(threading.Thread):
|
||||
@classmethod
|
||||
def run(cls):
|
||||
while not cease_continuous_run.is_set():
|
||||
try:
|
||||
for conn in connections.all():
|
||||
# If the database connection has a hiccup, re-establish a new
|
||||
# connection
|
||||
conn.close_if_unusable_or_obsolete()
|
||||
self.run_pending()
|
||||
except Exception:
|
||||
logger.exception(
|
||||
'encountered an error while scheduling periodic tasks'
|
||||
)
|
||||
time.sleep(idle_seconds)
|
||||
logger.debug('periodic thread exiting...')
|
||||
def run():
|
||||
ppid = os.getppid()
|
||||
logger.warn(f'periodic beat started')
|
||||
while True:
|
||||
if os.getppid() != ppid:
|
||||
# if the parent PID changes, this process has been orphaned
|
||||
# via e.g., segfault or sigkill, we should exit too
|
||||
pid = os.getpid()
|
||||
logger.warn(f'periodic beat exiting gracefully pid:{pid}')
|
||||
raise SystemExit()
|
||||
try:
|
||||
for conn in connections.all():
|
||||
# If the database connection has a hiccup, re-establish a new
|
||||
# connection
|
||||
conn.close_if_unusable_or_obsolete()
|
||||
self.run_pending()
|
||||
except Exception:
|
||||
logger.exception(
|
||||
'encountered an error while scheduling periodic tasks'
|
||||
)
|
||||
time.sleep(idle_seconds)
|
||||
|
||||
thread = ScheduleThread()
|
||||
thread.daemon = True
|
||||
thread.start()
|
||||
return cease_continuous_run
|
||||
process = Process(target=run)
|
||||
process.daemon = True
|
||||
process.start()
|
||||
|
||||
|
||||
def run_continuously():
|
||||
@ -49,4 +53,4 @@ def run_continuously():
|
||||
apply_async = TaskWorker.resolve_callable(task['task']).apply_async
|
||||
total_seconds = task['schedule'].total_seconds()
|
||||
scheduler.every(total_seconds).seconds.do(apply_async)
|
||||
return scheduler.run_continuously()
|
||||
scheduler.run_continuously()
|
||||
|
||||
@ -53,7 +53,7 @@ class Command(BaseCommand):
|
||||
|
||||
# spawn a daemon thread to periodically enqueues scheduled tasks
|
||||
# (like the node heartbeat)
|
||||
cease_continuous_run = periodic.run_continuously()
|
||||
periodic.run_continuously()
|
||||
|
||||
reaper.reap()
|
||||
consumer = None
|
||||
@ -87,7 +87,6 @@ class Command(BaseCommand):
|
||||
)
|
||||
consumer.run()
|
||||
except KeyboardInterrupt:
|
||||
cease_continuous_run.set()
|
||||
logger.debug('Terminating Task Dispatcher')
|
||||
if consumer:
|
||||
consumer.stop()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user