mirror of
https://github.com/ansible/awx.git
synced 2026-05-18 06:47:41 -02:30
Merge pull request #12527 from AlanCoding/offline_db
Further resiliency changes, specifically focused on case of database going offline
This commit is contained in:
@@ -80,7 +80,7 @@ def _ctit_db_wrapper(trans_safe=False):
|
|||||||
yield
|
yield
|
||||||
except DBError as exc:
|
except DBError as exc:
|
||||||
if trans_safe:
|
if trans_safe:
|
||||||
level = logger.exception
|
level = logger.warning
|
||||||
if isinstance(exc, ProgrammingError):
|
if isinstance(exc, ProgrammingError):
|
||||||
if 'relation' in str(exc) and 'does not exist' in str(exc):
|
if 'relation' in str(exc) and 'does not exist' in str(exc):
|
||||||
# this generally means we can't fetch Tower configuration
|
# this generally means we can't fetch Tower configuration
|
||||||
@@ -89,7 +89,7 @@ def _ctit_db_wrapper(trans_safe=False):
|
|||||||
# has come up *before* the database has finished migrating, and
|
# has come up *before* the database has finished migrating, and
|
||||||
# especially that the conf.settings table doesn't exist yet
|
# especially that the conf.settings table doesn't exist yet
|
||||||
level = logger.debug
|
level = logger.debug
|
||||||
level('Database settings are not available, using defaults.')
|
level(f'Database settings are not available, using defaults. error: {str(exc)}')
|
||||||
else:
|
else:
|
||||||
logger.exception('Error modifying something related to database settings.')
|
logger.exception('Error modifying something related to database settings.')
|
||||||
finally:
|
finally:
|
||||||
|
|||||||
@@ -313,7 +313,12 @@ class Metrics:
|
|||||||
self.previous_send_metrics.set(current_time)
|
self.previous_send_metrics.set(current_time)
|
||||||
self.previous_send_metrics.store_value(self.conn)
|
self.previous_send_metrics.store_value(self.conn)
|
||||||
finally:
|
finally:
|
||||||
lock.release()
|
try:
|
||||||
|
lock.release()
|
||||||
|
except Exception as exc:
|
||||||
|
# After system failures, we might throw redis.exceptions.LockNotOwnedError
|
||||||
|
# this is to avoid print a Traceback, and importantly, avoid raising an exception into parent context
|
||||||
|
logger.warning(f'Error releasing subsystem metrics redis lock, error: {str(exc)}')
|
||||||
|
|
||||||
def load_other_metrics(self, request):
|
def load_other_metrics(self, request):
|
||||||
# data received from other nodes are stored in their own keys
|
# data received from other nodes are stored in their own keys
|
||||||
|
|||||||
@@ -167,17 +167,28 @@ class CallbackBrokerWorker(BaseWorker):
|
|||||||
try:
|
try:
|
||||||
cls.objects.bulk_create(events)
|
cls.objects.bulk_create(events)
|
||||||
metrics_bulk_events_saved += len(events)
|
metrics_bulk_events_saved += len(events)
|
||||||
except Exception:
|
except Exception as exc:
|
||||||
|
logger.warning(f'Error in events bulk_create, will try indiviually up to 5 errors, error {str(exc)}')
|
||||||
# if an exception occurs, we should re-attempt to save the
|
# if an exception occurs, we should re-attempt to save the
|
||||||
# events one-by-one, because something in the list is
|
# events one-by-one, because something in the list is
|
||||||
# broken/stale
|
# broken/stale
|
||||||
|
consecutive_errors = 0
|
||||||
|
events_saved = 0
|
||||||
metrics_events_batch_save_errors += 1
|
metrics_events_batch_save_errors += 1
|
||||||
for e in events:
|
for e in events:
|
||||||
try:
|
try:
|
||||||
e.save()
|
e.save()
|
||||||
metrics_singular_events_saved += 1
|
events_saved += 1
|
||||||
except Exception:
|
consecutive_errors = 0
|
||||||
logger.exception('Database Error Saving Job Event')
|
except Exception as exc_indv:
|
||||||
|
consecutive_errors += 1
|
||||||
|
logger.info(f'Database Error Saving individual Job Event, error {str(exc_indv)}')
|
||||||
|
|
||||||
|
if consecutive_errors >= 5:
|
||||||
|
raise
|
||||||
|
metrics_singular_events_saved += events_saved
|
||||||
|
if events_saved == 0:
|
||||||
|
raise
|
||||||
metrics_duration_to_save = time.perf_counter() - metrics_duration_to_save
|
metrics_duration_to_save = time.perf_counter() - metrics_duration_to_save
|
||||||
for e in events:
|
for e in events:
|
||||||
if not getattr(e, '_skip_websocket_message', False):
|
if not getattr(e, '_skip_websocket_message', False):
|
||||||
@@ -257,17 +268,18 @@ class CallbackBrokerWorker(BaseWorker):
|
|||||||
try:
|
try:
|
||||||
self.flush(force=flush)
|
self.flush(force=flush)
|
||||||
break
|
break
|
||||||
except (OperationalError, InterfaceError, InternalError):
|
except (OperationalError, InterfaceError, InternalError) as exc:
|
||||||
if retries >= self.MAX_RETRIES:
|
if retries >= self.MAX_RETRIES:
|
||||||
logger.exception('Worker could not re-establish database connectivity, giving up on one or more events.')
|
logger.exception('Worker could not re-establish database connectivity, giving up on one or more events.')
|
||||||
return
|
return
|
||||||
delay = 60 * retries
|
delay = 60 * retries
|
||||||
logger.exception('Database Error Saving Job Event, retry #{i} in {delay} seconds:'.format(i=retries + 1, delay=delay))
|
logger.warning(f'Database Error Flushing Job Events, retry #{retries + 1} in {delay} seconds: {str(exc)}')
|
||||||
django_connection.close()
|
django_connection.close()
|
||||||
time.sleep(delay)
|
time.sleep(delay)
|
||||||
retries += 1
|
retries += 1
|
||||||
except DatabaseError:
|
except DatabaseError:
|
||||||
logger.exception('Database Error Saving Job Event')
|
logger.exception('Database Error Flushing Job Events')
|
||||||
|
django_connection.close()
|
||||||
break
|
break
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
tb = traceback.format_exc()
|
tb = traceback.format_exc()
|
||||||
|
|||||||
@@ -95,8 +95,13 @@ class Command(BaseCommand):
|
|||||||
# database migrations are still running
|
# database migrations are still running
|
||||||
from awx.main.models.ha import Instance
|
from awx.main.models.ha import Instance
|
||||||
|
|
||||||
executor = MigrationExecutor(connection)
|
try:
|
||||||
migrating = bool(executor.migration_plan(executor.loader.graph.leaf_nodes()))
|
executor = MigrationExecutor(connection)
|
||||||
|
migrating = bool(executor.migration_plan(executor.loader.graph.leaf_nodes()))
|
||||||
|
except Exception as exc:
|
||||||
|
logger.info(f'Error on startup of run_wsbroadcast (error: {exc}), retry in 10s...')
|
||||||
|
time.sleep(10)
|
||||||
|
return
|
||||||
|
|
||||||
# In containerized deployments, migrations happen in the task container,
|
# In containerized deployments, migrations happen in the task container,
|
||||||
# and the services running there don't start until migrations are
|
# and the services running there don't start until migrations are
|
||||||
|
|||||||
47
awx/main/tests/functional/utils/test_update_model.py
Normal file
47
awx/main/tests/functional/utils/test_update_model.py
Normal file
@@ -0,0 +1,47 @@
|
|||||||
|
import pytest
|
||||||
|
|
||||||
|
from django.db import DatabaseError
|
||||||
|
|
||||||
|
from awx.main.models.jobs import Job
|
||||||
|
from awx.main.utils.update_model import update_model
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def normal_job(deploy_jobtemplate):
|
||||||
|
return deploy_jobtemplate.create_unified_job()
|
||||||
|
|
||||||
|
|
||||||
|
class NewException(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_normal_get(normal_job):
|
||||||
|
mod_job = Job.objects.get(pk=normal_job.id)
|
||||||
|
mod_job.job_explanation = 'foobar'
|
||||||
|
mod_job.save(update_fields=['job_explanation'])
|
||||||
|
new_job = update_model(Job, normal_job.pk)
|
||||||
|
assert new_job.job_explanation == 'foobar'
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_exception(normal_job, mocker):
|
||||||
|
mocker.patch.object(Job.objects, 'get', side_effect=DatabaseError)
|
||||||
|
mocker.patch('awx.main.utils.update_model.time.sleep')
|
||||||
|
with pytest.raises(DatabaseError):
|
||||||
|
update_model(Job, normal_job.pk)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_unknown_exception(normal_job, mocker):
|
||||||
|
mocker.patch.object(Job.objects, 'get', side_effect=NewException)
|
||||||
|
mocker.patch('awx.main.utils.update_model.time.sleep')
|
||||||
|
with pytest.raises(NewException):
|
||||||
|
update_model(Job, normal_job.pk)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_deleted_job(normal_job):
|
||||||
|
job_pk = normal_job.pk
|
||||||
|
normal_job.delete()
|
||||||
|
assert update_model(Job, job_pk) is None
|
||||||
@@ -1,4 +1,5 @@
|
|||||||
from django.db import transaction, DatabaseError, InterfaceError
|
from django.db import transaction, DatabaseError, InterfaceError
|
||||||
|
from django.core.exceptions import ObjectDoesNotExist
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
import time
|
import time
|
||||||
@@ -32,6 +33,8 @@ def update_model(model, pk, _attempt=0, _max_attempts=5, select_for_update=False
|
|||||||
update_fields.append('failed')
|
update_fields.append('failed')
|
||||||
instance.save(update_fields=update_fields)
|
instance.save(update_fields=update_fields)
|
||||||
return instance
|
return instance
|
||||||
|
except ObjectDoesNotExist:
|
||||||
|
return None
|
||||||
except (DatabaseError, InterfaceError) as e:
|
except (DatabaseError, InterfaceError) as e:
|
||||||
# Log out the error to the debug logger.
|
# Log out the error to the debug logger.
|
||||||
logger.debug('Database error updating %s, retrying in 5 seconds (retry #%d): %s', model._meta.object_name, _attempt + 1, e)
|
logger.debug('Database error updating %s, retrying in 5 seconds (retry #%d): %s', model._meta.object_name, _attempt + 1, e)
|
||||||
@@ -45,4 +48,5 @@ def update_model(model, pk, _attempt=0, _max_attempts=5, select_for_update=False
|
|||||||
raise RuntimeError(f'Could not fetch {pk} because of receiving abort signal')
|
raise RuntimeError(f'Could not fetch {pk} because of receiving abort signal')
|
||||||
return update_model(model, pk, _attempt=_attempt + 1, _max_attempts=_max_attempts, **updates)
|
return update_model(model, pk, _attempt=_attempt + 1, _max_attempts=_max_attempts, **updates)
|
||||||
else:
|
else:
|
||||||
logger.error('Failed to update %s after %d retries.', model._meta.object_name, _attempt)
|
logger.warning(f'Failed to update {model._meta.object_name} pk={pk} after {_attempt} retries.')
|
||||||
|
raise
|
||||||
|
|||||||
Reference in New Issue
Block a user