From abdd91bd1f7f0f04d721830bf86d819031aa7ef6 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Mon, 23 Jan 2017 14:39:03 -0500 Subject: [PATCH] Fixes for data corruption/exception in cache usage Specifically as it relates to serializers and job event writing at high speeds --- awx/conf/settings.py | 5 ++++- awx/main/management/commands/run_callback_receiver.py | 8 ++++++-- awx/main/models/jobs.py | 5 ++++- awx/main/tasks.py | 5 +++-- 4 files changed, 17 insertions(+), 6 deletions(-) diff --git a/awx/conf/settings.py b/awx/conf/settings.py index de8c82e1c5..d5e379ba9f 100644 --- a/awx/conf/settings.py +++ b/awx/conf/settings.py @@ -162,7 +162,10 @@ class SettingsWrapper(UserSettingsHolder): def _get_local(self, name): self._preload_cache() cache_key = Setting.get_cache_key(name) - cache_value = cache.get(cache_key, empty) + try: + cache_value = cache.get(cache_key, empty) + except ValueError: + cache_value = empty logger.debug('cache get(%r, %r) -> %r', cache_key, empty, cache_value) if cache_value == SETTING_CACHE_NOTSET: value = empty diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 314650283d..9da8ac9bd0 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -17,6 +17,7 @@ from django.conf import settings from django.core.management.base import NoArgsCommand from django.db import connection as django_connection from django.db import DatabaseError +from django.core.cache import cache as django_cache # AWX from awx.main.models import * # noqa @@ -46,6 +47,7 @@ class CallbackBrokerWorker(ConsumerMixin): if use_workers: django_connection.close() + django_cache.close() for idx in range(settings.JOB_EVENT_WORKERS): queue_actual = MPQueue(settings.JOB_EVENT_MAX_QUEUE_SIZE) w = Process(target=self.callback_worker, args=(queue_actual, idx,)) @@ -85,8 +87,9 @@ class CallbackBrokerWorker(ConsumerMixin): return queue_actual except Exception: import traceback - traceback.print_exc() + tb = traceback.format_exc() logger.warn("Could not write to queue %s" % preferred_queue) + logger.warn("Detail: {}".format(tb)) continue return None @@ -113,8 +116,9 @@ class CallbackBrokerWorker(ConsumerMixin): logger.error('Database Error Saving Job Event: {}'.format(e)) except Exception as exc: import traceback - traceback.print_exc() + tb = traceback.format_exc() logger.error('Callback Task Processor Raised Exception: %r', exc) + logger.error('Detail: {}'.format(tb)) class Command(NoArgsCommand): diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index b141843b96..7ba44c6a6c 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -1179,7 +1179,10 @@ class JobEvent(CreatedModifiedModel): # Try to find a parent event based on UUID. if parent_event_uuid: cache_key = '{}_{}'.format(kwargs['job_id'], parent_event_uuid) - parent_id = cache.get(cache_key) + try: + parent_id = cache.get(cache_key) + except Exception: + parent_id = None if parent_id is None: parent_id = JobEvent.objects.filter(job_id=kwargs['job_id'], uuid=parent_event_uuid).only('id').values_list('id', flat=True).first() if parent_id: diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 86b36c2e16..398ba39be0 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -32,7 +32,7 @@ import pexpect # Celery from celery import Task, task -from celery.signals import celeryd_init, worker_process_init +from celery.signals import celeryd_init, worker_ready from celery import current_app # Django @@ -100,8 +100,9 @@ def _setup_tower_logger(): logger = logging.getLogger('awx.main.tasks') -@worker_process_init.connect +@worker_ready.connect def task_set_logger_pre_run(*args, **kwargs): + cache.close() if settings.LOG_AGGREGATOR_ENABLED: _setup_tower_logger() logger.debug('Custom Tower logger configured for worker process.')