diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py
index 6965416c94..34443b70b2 100644
--- a/awx/main/dispatch/worker/base.py
+++ b/awx/main/dispatch/worker/base.py
@@ -134,6 +134,13 @@ class AWXConsumerRedis(AWXConsumerBase):
 
 
 class AWXConsumerPG(AWXConsumerBase):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.pg_max_wait = settings.DISPATCHER_DB_DOWNTOWN_TOLLERANCE
+        # if no successful loops have run since startup, then we should fail right away
+        self.pg_is_down = True  # set so that we fail if we get database errors on startup
+        self.pg_down_time = time.time() - self.pg_max_wait  # allow no grace period
+
     def run(self, *args, **kwargs):
         super(AWXConsumerPG, self).run(*args, **kwargs)
 
@@ -150,11 +157,28 @@ class AWXConsumerPG(AWXConsumerBase):
                         init = True
                     for e in conn.events():
                         self.process_task(json.loads(e.payload))
+                        self.pg_is_down = False
                         if self.should_stop:
                             return
             except psycopg2.InterfaceError:
                 logger.warning("Stale Postgres message bus connection, reconnecting")
                 continue
+            except (db.DatabaseError, psycopg2.OperationalError):
+                # If we have attained steady-state operation, tolerate short-term database hiccups
+                if not self.pg_is_down:
+                    logger.exception(f"Error consuming new events from postgres, will retry for {self.pg_max_wait} s")
+                    self.pg_down_time = time.time()
+                    self.pg_is_down = True
+                if time.time() - self.pg_down_time > self.pg_max_wait:
+                    logger.warning(f"Postgres event consumer has not recovered in {self.pg_max_wait} s, exiting")
+                    raise
+                # Wait for a second before the next attempt, but still listen for any shutdown signals
+                for i in range(10):
+                    if self.should_stop:
+                        return
+                    time.sleep(0.1)
+                for conn in db.connections.all():
+                    conn.close_if_unusable_or_obsolete()
 
 
 class BaseWorker(object):
diff --git a/awx/main/tasks/callback.py b/awx/main/tasks/callback.py
index b1a4c450e5..f92a6028d2 100644
--- a/awx/main/tasks/callback.py
+++ b/awx/main/tasks/callback.py
@@ -32,9 +32,10 @@ class RunnerCallback:
         self.safe_env = {}
         self.event_ct = 0
         self.model = model
+        self.update_attempts = int(settings.DISPATCHER_DB_DOWNTOWN_TOLLERANCE / 5)
 
     def update_model(self, pk, _attempt=0, **updates):
-        return update_model(self.model, pk, _attempt=0, **updates)
+        return update_model(self.model, pk, _attempt=0, _max_attempts=self.update_attempts, **updates)
 
     def event_handler(self, event_data):
         #
@@ -147,7 +148,7 @@ class RunnerCallback:
         Ansible runner callback to tell the job when/if it is canceled
         """
         unified_job_id = self.instance.pk
-        self.instance.refresh_from_db()
+        self.instance = self.update_model(unified_job_id)
         if not self.instance:
             logger.error('unified job {} was deleted while running, canceling'.format(unified_job_id))
             return True
diff --git a/awx/main/tasks/jobs.py b/awx/main/tasks/jobs.py
index 6fb1613f0f..39702d513f 100644
--- a/awx/main/tasks/jobs.py
+++ b/awx/main/tasks/jobs.py
@@ -113,10 +113,11 @@ class BaseTask(object):
 
     def __init__(self):
         self.cleanup_paths = []
+        self.update_attempts = int(settings.DISPATCHER_DB_DOWNTOWN_TOLLERANCE / 5)
         self.runner_callback = self.callback_class(model=self.model)
 
     def update_model(self, pk, _attempt=0, **updates):
-        return update_model(self.model, pk, _attempt=0, **updates)
+        return update_model(self.model, pk, _attempt=0, _max_attempts=self.update_attempts, **updates)
 
     def get_path_to(self, *args):
         """
diff --git a/awx/main/utils/update_model.py b/awx/main/utils/update_model.py
index 95c261cd6f..73acefd1fc 100644
--- a/awx/main/utils/update_model.py
+++ b/awx/main/utils/update_model.py
@@ -1,4 +1,4 @@
-from django.db import transaction, DatabaseError
+from django.db import transaction, DatabaseError, InterfaceError
 
 import logging
 import time
@@ -7,7 +7,7 @@ import time
 logger = logging.getLogger('awx.main.tasks.utils')
 
 
-def update_model(model, pk, _attempt=0, **updates):
+def update_model(model, pk, _attempt=0, _max_attempts=5, **updates):
     """Reload the model instance from the database and update the
     given fields.
     """
@@ -27,14 +27,14 @@ def update_model(model, pk, _attempt=0, **updates):
                         update_fields.append('failed')
                 instance.save(update_fields=update_fields)
             return instance
-    except DatabaseError as e:
+    except (DatabaseError, InterfaceError) as e:
         # Log out the error to the debug logger.
         logger.debug('Database error updating %s, retrying in 5 seconds (retry #%d): %s', model._meta.object_name, _attempt + 1, e)
 
         # Attempt to retry the update, assuming we haven't already
         # tried too many times.
-        if _attempt < 5:
+        if _attempt < _max_attempts:
             time.sleep(5)
-            return update_model(model, pk, _attempt=_attempt + 1, **updates)
+            return update_model(model, pk, _attempt=_attempt + 1, _max_attempts=_max_attempts, **updates)
         else:
             logger.error('Failed to update %s after %d retries.', model._meta.object_name, _attempt)
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index f1da7c7061..e570727790 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -426,6 +426,9 @@ CLUSTER_NODE_HEARTBEAT_PERIOD = 60
 RECEPTOR_SERVICE_ADVERTISEMENT_PERIOD = 60  # https://github.com/ansible/receptor/blob/aa1d589e154d8a0cb99a220aff8f98faf2273be6/pkg/netceptor/netceptor.go#L34
 EXECUTION_NODE_REMEDIATION_CHECKS = 60 * 30  # once every 30 minutes check if an execution node errors have been resolved
+# Amount of time the dispatcher will try to reconnect to the database for jobs and for consuming new work
+DISPATCHER_DB_DOWNTOWN_TOLLERANCE = 40
+
 BROKER_URL = 'unix:///var/run/redis/redis.sock'
 
 CELERYBEAT_SCHEDULE = {
     'tower_scheduler': {'task': 'awx.main.tasks.system.awx_periodic_scheduler', 'schedule': timedelta(seconds=30), 'options': {'expires': 20}},
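
How the pieces relate, as a minimal sketch that is not part of the diff and only restates values shown in it: update_model() sleeps 5 seconds between retries, so dividing the new DISPATCHER_DB_DOWNTOWN_TOLLERANCE setting by 5 gives a retry budget covering roughly the same downtime window the dispatcher's event consumer tolerates.

    # Minimal sketch, assuming only what the diff shows: a 40 s tolerance
    # window and the hard-coded time.sleep(5) between update_model() retries.
    DISPATCHER_DB_DOWNTOWN_TOLLERANCE = 40  # seconds, from awx/settings/defaults.py
    RETRY_DELAY = 5  # seconds per retry inside update_model()

    update_attempts = int(DISPATCHER_DB_DOWNTOWN_TOLLERANCE / RETRY_DELAY)
    print(update_attempts)  # 8 -> 8 retries x 5 s ~= 40 s of tolerated database downtime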