Patches to make jobs robust to database restarts (#11905)

* Simple patches to make jobs robust to database restarts

* Add some wait time before retrying the loop after a DB error

* Apply dispatcher downtime setting to job updates, fix dispatcher bug

This resolves a bug where the pg_is_down property
  never had the right value:
  the loop is normally stuck in the conn.events() iterator,
  so it never recognized successful database interactions.
  This led to serial database outages terminating jobs.

A new setting for allowable PG downtime is shared with the task code:
  any call to update_model will use the _max_attempts parameter
  so that its patience aligns with the time the dispatcher
  waits when consuming new events (see the sketch at the end of this message).

* To avoid restart loops, handle DB errors on startup with prejudice

* If reconnecting consistently fails, exit with a non-zero code
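
As a rough, standalone illustration of the alignment described above (a sketch
based only on the numbers in this diff; the constant names mirror the code
below, but the snippet itself is plain Python, not AWX code):

    # 40 s of tolerated downtime and a 5 s sleep per retry come from this diff
    DISPATCHER_DB_DOWNTOWN_TOLLERANCE = 40  # seconds of allowed Postgres downtime
    RETRY_SLEEP = 5                         # update_model sleeps 5 s between attempts

    max_attempts = int(DISPATCHER_DB_DOWNTOWN_TOLLERANCE / RETRY_SLEEP)
    print(max_attempts)  # 8 -> 8 retries * 5 s ~= 40 s, the same window the
                         # dispatcher waits before giving up on the event loop
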
Alan Rominger 2022-03-30 09:14:20 -04:00 committed by GitHub
parent ef0f6ca248
commit 73e02e745a
5 changed files with 37 additions and 8 deletions


@@ -134,6 +134,13 @@ class AWXConsumerRedis(AWXConsumerBase):
class AWXConsumerPG(AWXConsumerBase):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.pg_max_wait = settings.DISPATCHER_DB_DOWNTOWN_TOLLERANCE
# if no successful loops have run since startup, then we should fail right away
self.pg_is_down = True # set so that we fail if we get database errors on startup
self.pg_down_time = time.time() - self.pg_max_wait # allow no grace period
def run(self, *args, **kwargs):
super(AWXConsumerPG, self).run(*args, **kwargs)
@@ -150,11 +157,28 @@ class AWXConsumerPG(AWXConsumerBase):
init = True
for e in conn.events():
self.process_task(json.loads(e.payload))
self.pg_is_down = False
if self.should_stop:
return
except psycopg2.InterfaceError:
logger.warning("Stale Postgres message bus connection, reconnecting")
continue
except (db.DatabaseError, psycopg2.OperationalError):
# If we have attained steady-state operation, tolerate short-term database hiccups
if not self.pg_is_down:
logger.exception(f"Error consuming new events from postgres, will retry for {self.pg_max_wait} s")
self.pg_down_time = time.time()
self.pg_is_down = True
if time.time() - self.pg_down_time > self.pg_max_wait:
logger.warning(f"Postgres event consumer has not recovered in {self.pg_max_wait} s, exiting")
raise
# Wait for a second before next attempt, but still listen for any shutdown signals
for i in range(10):
if self.should_stop:
return
time.sleep(0.1)
for conn in db.connections.all():
conn.close_if_unusable_or_obsolete()
class BaseWorker(object):
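
Distilled from the hunk above, a self-contained sketch of the recovery behavior
(consume_events, process, and should_stop are stand-ins for illustration, not
AWX code; AWX narrows the except clause to DatabaseError/OperationalError):

    import time

    PG_MAX_WAIT = 40  # mirrors DISPATCHER_DB_DOWNTOWN_TOLLERANCE


    def consume_forever(consume_events, process, should_stop):
        # Start pessimistic: until one loop iteration succeeds, any DB error is fatal.
        pg_is_down = True
        pg_down_time = time.time() - PG_MAX_WAIT  # no grace period before first success
        while True:
            try:
                for event in consume_events():  # stand-in for conn.events()
                    process(event)
                    pg_is_down = False          # proof of a healthy connection
                if should_stop():
                    return
            except Exception:
                if not pg_is_down:
                    pg_down_time = time.time()  # first failure after steady state
                    pg_is_down = True
                if time.time() - pg_down_time > PG_MAX_WAIT:
                    raise                       # outage outlived the tolerance; exit non-zero upstream
                # pause about a second, but keep honoring shutdown requests
                for _ in range(10):
                    if should_stop():
                        return
                    time.sleep(0.1)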


@@ -32,9 +32,10 @@ class RunnerCallback:
self.safe_env = {}
self.event_ct = 0
self.model = model
self.update_attempts = int(settings.DISPATCHER_DB_DOWNTOWN_TOLLERANCE / 5)
def update_model(self, pk, _attempt=0, **updates):
return update_model(self.model, pk, _attempt=0, **updates)
return update_model(self.model, pk, _attempt=0, _max_attempts=self.update_attempts, **updates)
def event_handler(self, event_data):
#
@@ -147,7 +148,7 @@ class RunnerCallback:
Ansible runner callback to tell the job when/if it is canceled
"""
unified_job_id = self.instance.pk
self.instance.refresh_from_db()
self.instance = self.update_model(unified_job_id)
if not self.instance:
logger.error('unified job {} was deleted while running, canceling'.format(unified_job_id))
return True
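
The point of the swap above is that refresh_from_db() raises on the first
transient error, while update_model() now retries for the configured window and
returns None if the row is gone or the retries are exhausted. A minimal sketch
of the resulting cancel check (reload_job and the cancel_flag attribute are
assumptions for illustration, not confirmed AWX internals):

    def cancel_callback(reload_job, job_id, log):
        # reload_job is any retrying reload; None means the job row is unreachable
        job = reload_job(job_id)
        if not job:
            log('unified job {} was deleted while running, canceling'.format(job_id))
            return True
        return getattr(job, 'cancel_flag', False)  # assumed attribute name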


@@ -113,10 +113,11 @@ class BaseTask(object):
def __init__(self):
self.cleanup_paths = []
self.update_attempts = int(settings.DISPATCHER_DB_DOWNTOWN_TOLLERANCE / 5)
self.runner_callback = self.callback_class(model=self.model)
def update_model(self, pk, _attempt=0, **updates):
return update_model(self.model, pk, _attempt=0, **updates)
return update_model(self.model, pk, _attempt=0, _max_attempts=self.update_attempts, **updates)
def get_path_to(self, *args):
"""


@@ -1,4 +1,4 @@
from django.db import transaction, DatabaseError
from django.db import transaction, DatabaseError, InterfaceError
import logging
import time
@@ -7,7 +7,7 @@ import time
logger = logging.getLogger('awx.main.tasks.utils')
def update_model(model, pk, _attempt=0, **updates):
def update_model(model, pk, _attempt=0, _max_attempts=5, **updates):
"""Reload the model instance from the database and update the
given fields.
"""
@@ -27,14 +27,14 @@ def update_model(model, pk, _attempt=0, **updates):
update_fields.append('failed')
instance.save(update_fields=update_fields)
return instance
except DatabaseError as e:
except (DatabaseError, InterfaceError) as e:
# Log out the error to the debug logger.
logger.debug('Database error updating %s, retrying in 5 seconds (retry #%d): %s', model._meta.object_name, _attempt + 1, e)
# Attempt to retry the update, assuming we haven't already
# tried too many times.
if _attempt < 5:
if _attempt < _max_attempts:
time.sleep(5)
return update_model(model, pk, _attempt=_attempt + 1, **updates)
return update_model(model, pk, _attempt=_attempt + 1, _max_attempts=_max_attempts, **updates)
else:
logger.error('Failed to update %s after %d retries.', model._meta.object_name, _attempt)
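
A framework-free sketch of the retry behavior update_model() has after this
change, with the attempt budget supplied by the caller instead of hard-coded
(written iteratively here for brevity; the names are illustrative only):

    import time


    def reload_with_retry(fetch, _max_attempts=5, _sleep=5):
        """Call fetch(); on a transient error, sleep and retry up to _max_attempts times."""
        for attempt in range(_max_attempts + 1):
            try:
                return fetch()
            except Exception as exc:  # AWX narrows this to (DatabaseError, InterfaceError)
                if attempt >= _max_attempts:
                    print('Failed to reload after %d retries.' % attempt)
                    return None
                print('Database error (retry #%d): %s, retrying in %d seconds' % (attempt + 1, exc, _sleep))
                time.sleep(_sleep)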


@@ -426,6 +426,9 @@ CLUSTER_NODE_HEARTBEAT_PERIOD = 60
RECEPTOR_SERVICE_ADVERTISEMENT_PERIOD = 60 # https://github.com/ansible/receptor/blob/aa1d589e154d8a0cb99a220aff8f98faf2273be6/pkg/netceptor/netceptor.go#L34
EXECUTION_NODE_REMEDIATION_CHECKS = 60 * 30 # once every 30 minutes, check whether an execution node's errors have been resolved
# Amount of time the dispatcher will keep trying to reconnect to the database, both for jobs and for consuming new work
DISPATCHER_DB_DOWNTOWN_TOLLERANCE = 40
BROKER_URL = 'unix:///var/run/redis/redis.sock'
CELERYBEAT_SCHEDULE = {
'tower_scheduler': {'task': 'awx.main.tasks.system.awx_periodic_scheduler', 'schedule': timedelta(seconds=30), 'options': {'expires': 20}},