From 73e02e745aa3d76a43c75b25176bd9972cd82a26 Mon Sep 17 00:00:00 2001
From: Alan Rominger
Date: Wed, 30 Mar 2022 09:14:20 -0400
Subject: [PATCH] Patches to make jobs robust to database restarts (#11905)

* Simple patches to make jobs robust to database restarts

* Add some wait time before retrying loop due to DB error

* Apply dispatcher downtime setting to job updates, fix dispatcher bug

This resolves a bug where the pg_is_down property never had the right value.
The loop is normally stuck in the conn.events() iterator, so it never
recognized successful database interactions. This led to serial database
outages terminating jobs.

The new setting for allowable PG downtime is shared with task code. Any call
to update_model uses the _max_attempts parameter so that it aligns with the
patience time that the dispatcher respects when consuming new events.

* To avoid restart loops, handle DB errors on startup with prejudice

* If reconnect consistently fails, exit with non-zero code
---
 awx/main/dispatch/worker/base.py | 24 ++++++++++++++++++++++++
 awx/main/tasks/callback.py       |  5 +++--
 awx/main/tasks/jobs.py           |  3 ++-
 awx/main/utils/update_model.py   | 10 +++++-----
 awx/settings/defaults.py         |  3 +++
 5 files changed, 37 insertions(+), 8 deletions(-)

diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py
index 6965416c94..34443b70b2 100644
--- a/awx/main/dispatch/worker/base.py
+++ b/awx/main/dispatch/worker/base.py
@@ -134,6 +134,13 @@ class AWXConsumerRedis(AWXConsumerBase):
 
 
 class AWXConsumerPG(AWXConsumerBase):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.pg_max_wait = settings.DISPATCHER_DB_DOWNTOWN_TOLLERANCE
+        # if no successful loops have run since startup, then we should fail right away
+        self.pg_is_down = True  # set so that we fail if we get database errors on startup
+        self.pg_down_time = time.time() - self.pg_max_wait  # allow no grace period
+
     def run(self, *args, **kwargs):
         super(AWXConsumerPG, self).run(*args, **kwargs)
 
@@ -150,11 +157,28 @@ class AWXConsumerPG(AWXConsumerBase):
                         init = True
                     for e in conn.events():
                         self.process_task(json.loads(e.payload))
+                        self.pg_is_down = False
                     if self.should_stop:
                         return
             except psycopg2.InterfaceError:
                 logger.warning("Stale Postgres message bus connection, reconnecting")
                 continue
+            except (db.DatabaseError, psycopg2.OperationalError):
+                # If we have attained steady state operation, tolerate short-term database hiccups
+                if not self.pg_is_down:
+                    logger.exception(f"Error consuming new events from postgres, will retry for {self.pg_max_wait} s")
+                    self.pg_down_time = time.time()
+                    self.pg_is_down = True
+                if time.time() - self.pg_down_time > self.pg_max_wait:
+                    logger.warning(f"Postgres event consumer has not recovered in {self.pg_max_wait} s, exiting")
+                    raise
+                # Wait for a second before the next attempt, but still listen for any shutdown signals
+                for i in range(10):
+                    if self.should_stop:
+                        return
+                    time.sleep(0.1)
+                for conn in db.connections.all():
+                    conn.close_if_unusable_or_obsolete()
 
 
 class BaseWorker(object):
diff --git a/awx/main/tasks/callback.py b/awx/main/tasks/callback.py
index b1a4c450e5..f92a6028d2 100644
--- a/awx/main/tasks/callback.py
+++ b/awx/main/tasks/callback.py
@@ -32,9 +32,10 @@ class RunnerCallback:
         self.safe_env = {}
         self.event_ct = 0
         self.model = model
+        self.update_attempts = int(settings.DISPATCHER_DB_DOWNTOWN_TOLLERANCE / 5)
 
     def update_model(self, pk, _attempt=0, **updates):
-        return update_model(self.model, pk, _attempt=0, **updates)
+        return update_model(self.model, pk, _attempt=0, _max_attempts=self.update_attempts, **updates)
 
     def event_handler(self, event_data):
         #
@@ -147,7 +148,7 @@ class RunnerCallback:
         Ansible runner callback to tell the job when/if it is canceled
         """
         unified_job_id = self.instance.pk
-        self.instance.refresh_from_db()
+        self.instance = self.update_model(unified_job_id)
         if not self.instance:
             logger.error('unified job {} was deleted while running, canceling'.format(unified_job_id))
             return True
diff --git a/awx/main/tasks/jobs.py b/awx/main/tasks/jobs.py
index 6fb1613f0f..39702d513f 100644
--- a/awx/main/tasks/jobs.py
+++ b/awx/main/tasks/jobs.py
@@ -113,10 +113,11 @@ class BaseTask(object):
 
     def __init__(self):
         self.cleanup_paths = []
+        self.update_attempts = int(settings.DISPATCHER_DB_DOWNTOWN_TOLLERANCE / 5)
         self.runner_callback = self.callback_class(model=self.model)
 
     def update_model(self, pk, _attempt=0, **updates):
-        return update_model(self.model, pk, _attempt=0, **updates)
+        return update_model(self.model, pk, _attempt=0, _max_attempts=self.update_attempts, **updates)
 
     def get_path_to(self, *args):
         """
diff --git a/awx/main/utils/update_model.py b/awx/main/utils/update_model.py
index 95c261cd6f..73acefd1fc 100644
--- a/awx/main/utils/update_model.py
+++ b/awx/main/utils/update_model.py
@@ -1,4 +1,4 @@
-from django.db import transaction, DatabaseError
+from django.db import transaction, DatabaseError, InterfaceError
 
 import logging
 import time
@@ -7,7 +7,7 @@ import time
 logger = logging.getLogger('awx.main.tasks.utils')
 
 
-def update_model(model, pk, _attempt=0, **updates):
+def update_model(model, pk, _attempt=0, _max_attempts=5, **updates):
     """Reload the model instance from the database and update the
     given fields.
     """
@@ -27,14 +27,14 @@ def update_model(model, pk, _attempt=0, **updates):
                         update_fields.append('failed')
                 instance.save(update_fields=update_fields)
             return instance
-    except DatabaseError as e:
+    except (DatabaseError, InterfaceError) as e:
         # Log out the error to the debug logger.
         logger.debug('Database error updating %s, retrying in 5 seconds (retry #%d): %s', model._meta.object_name, _attempt + 1, e)
 
         # Attempt to retry the update, assuming we haven't already
         # tried too many times.
-        if _attempt < 5:
+        if _attempt < _max_attempts:
             time.sleep(5)
-            return update_model(model, pk, _attempt=_attempt + 1, **updates)
+            return update_model(model, pk, _attempt=_attempt + 1, _max_attempts=_max_attempts, **updates)
         else:
             logger.error('Failed to update %s after %d retries.', model._meta.object_name, _attempt)
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index f1da7c7061..e570727790 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -426,6 +426,9 @@ CLUSTER_NODE_HEARTBEAT_PERIOD = 60
 RECEPTOR_SERVICE_ADVERTISEMENT_PERIOD = 60  # https://github.com/ansible/receptor/blob/aa1d589e154d8a0cb99a220aff8f98faf2273be6/pkg/netceptor/netceptor.go#L34
 EXECUTION_NODE_REMEDIATION_CHECKS = 60 * 30  # once every 30 minutes check if an execution node errors have been resolved
+# Amount of time the dispatcher will keep trying to reconnect to the database, both for job updates and for consuming new work
+DISPATCHER_DB_DOWNTOWN_TOLLERANCE = 40
+
 BROKER_URL = 'unix:///var/run/redis/redis.sock'
 
 CELERYBEAT_SCHEDULE = {
     'tower_scheduler': {'task': 'awx.main.tasks.system.awx_periodic_scheduler', 'schedule': timedelta(seconds=30), 'options': {'expires': 20}},
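
For reference (not part of the patch above): a minimal standalone sketch of the
arithmetic the patch wires together, under the patch's defaults. update_model
sleeps 5 seconds between retries, so its retry budget is the shared
DISPATCHER_DB_DOWNTOWN_TOLLERANCE divided by 5, while the dispatcher loop
compares elapsed outage time directly against the same tolerance. The names
retry_db_call and DowntimeTracker are illustrative only, not AWX APIs.

import time

DISPATCHER_DB_DOWNTOWN_TOLLERANCE = 40  # seconds, mirrors awx/settings/defaults.py
RETRY_SLEEP = 5  # update_model waits 5 seconds between attempts

# update_model-style budget: 40 s tolerance / 5 s sleep = 8 retry attempts
MAX_ATTEMPTS = int(DISPATCHER_DB_DOWNTOWN_TOLLERANCE / RETRY_SLEEP)


def retry_db_call(func, *args, **kwargs):
    """Retry a database call until it succeeds or the downtime budget is spent."""
    for attempt in range(MAX_ATTEMPTS + 1):
        try:
            return func(*args, **kwargs)
        except ConnectionError as exc:  # stand-in for DatabaseError/InterfaceError
            if attempt >= MAX_ATTEMPTS:
                raise
            print(f"DB error on attempt {attempt + 1}, retrying in {RETRY_SLEEP} s: {exc}")
            time.sleep(RETRY_SLEEP)


class DowntimeTracker:
    """Dispatcher-style downtime window: tolerate errors only while the outage
    is younger than the tolerance. On startup the window is already expired, so
    an error before the first success is re-raised immediately, mirroring
    pg_is_down=True in AWXConsumerPG.__init__."""

    def __init__(self, max_wait=DISPATCHER_DB_DOWNTOWN_TOLLERANCE):
        self.max_wait = max_wait
        self.is_down = True
        self.down_since = time.time() - max_wait  # no grace period before first success

    def record_success(self):
        self.is_down = False

    def record_error(self):
        if not self.is_down:
            self.down_since = time.time()
            self.is_down = True
        if time.time() - self.down_since > self.max_wait:
            raise RuntimeError(f"database did not recover within {self.max_wait} s")

With these defaults, both a job's model updates and the dispatcher's event
consumer tolerate roughly 40 seconds of database downtime before giving up,
at which point the worker exits non-zero and the service manager restarts it.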