mirror of
https://github.com/ansible/awx.git
synced 2026-03-19 01:47:31 -02:30
Fixes for data corruption/exception in cache usage
Specifically as it relates to serializers and job event writing at high speeds
This commit is contained in:
@@ -162,7 +162,10 @@ class SettingsWrapper(UserSettingsHolder):
|
|||||||
def _get_local(self, name):
|
def _get_local(self, name):
|
||||||
self._preload_cache()
|
self._preload_cache()
|
||||||
cache_key = Setting.get_cache_key(name)
|
cache_key = Setting.get_cache_key(name)
|
||||||
cache_value = cache.get(cache_key, empty)
|
try:
|
||||||
|
cache_value = cache.get(cache_key, empty)
|
||||||
|
except ValueError:
|
||||||
|
cache_value = empty
|
||||||
logger.debug('cache get(%r, %r) -> %r', cache_key, empty, cache_value)
|
logger.debug('cache get(%r, %r) -> %r', cache_key, empty, cache_value)
|
||||||
if cache_value == SETTING_CACHE_NOTSET:
|
if cache_value == SETTING_CACHE_NOTSET:
|
||||||
value = empty
|
value = empty
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ from django.conf import settings
|
|||||||
from django.core.management.base import NoArgsCommand
|
from django.core.management.base import NoArgsCommand
|
||||||
from django.db import connection as django_connection
|
from django.db import connection as django_connection
|
||||||
from django.db import DatabaseError
|
from django.db import DatabaseError
|
||||||
|
from django.core.cache import cache as django_cache
|
||||||
|
|
||||||
# AWX
|
# AWX
|
||||||
from awx.main.models import * # noqa
|
from awx.main.models import * # noqa
|
||||||
@@ -46,6 +47,7 @@ class CallbackBrokerWorker(ConsumerMixin):
|
|||||||
|
|
||||||
if use_workers:
|
if use_workers:
|
||||||
django_connection.close()
|
django_connection.close()
|
||||||
|
django_cache.close()
|
||||||
for idx in range(settings.JOB_EVENT_WORKERS):
|
for idx in range(settings.JOB_EVENT_WORKERS):
|
||||||
queue_actual = MPQueue(settings.JOB_EVENT_MAX_QUEUE_SIZE)
|
queue_actual = MPQueue(settings.JOB_EVENT_MAX_QUEUE_SIZE)
|
||||||
w = Process(target=self.callback_worker, args=(queue_actual, idx,))
|
w = Process(target=self.callback_worker, args=(queue_actual, idx,))
|
||||||
@@ -85,8 +87,9 @@ class CallbackBrokerWorker(ConsumerMixin):
|
|||||||
return queue_actual
|
return queue_actual
|
||||||
except Exception:
|
except Exception:
|
||||||
import traceback
|
import traceback
|
||||||
traceback.print_exc()
|
tb = traceback.format_exc()
|
||||||
logger.warn("Could not write to queue %s" % preferred_queue)
|
logger.warn("Could not write to queue %s" % preferred_queue)
|
||||||
|
logger.warn("Detail: {}".format(tb))
|
||||||
continue
|
continue
|
||||||
return None
|
return None
|
||||||
|
|
||||||
@@ -113,8 +116,9 @@ class CallbackBrokerWorker(ConsumerMixin):
|
|||||||
logger.error('Database Error Saving Job Event: {}'.format(e))
|
logger.error('Database Error Saving Job Event: {}'.format(e))
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
import traceback
|
import traceback
|
||||||
traceback.print_exc()
|
tb = traceback.format_exc()
|
||||||
logger.error('Callback Task Processor Raised Exception: %r', exc)
|
logger.error('Callback Task Processor Raised Exception: %r', exc)
|
||||||
|
logger.error('Detail: {}'.format(tb))
|
||||||
|
|
||||||
|
|
||||||
class Command(NoArgsCommand):
|
class Command(NoArgsCommand):
|
||||||
|
|||||||
@@ -1179,7 +1179,10 @@ class JobEvent(CreatedModifiedModel):
|
|||||||
# Try to find a parent event based on UUID.
|
# Try to find a parent event based on UUID.
|
||||||
if parent_event_uuid:
|
if parent_event_uuid:
|
||||||
cache_key = '{}_{}'.format(kwargs['job_id'], parent_event_uuid)
|
cache_key = '{}_{}'.format(kwargs['job_id'], parent_event_uuid)
|
||||||
parent_id = cache.get(cache_key)
|
try:
|
||||||
|
parent_id = cache.get(cache_key)
|
||||||
|
except Exception:
|
||||||
|
parent_id = None
|
||||||
if parent_id is None:
|
if parent_id is None:
|
||||||
parent_id = JobEvent.objects.filter(job_id=kwargs['job_id'], uuid=parent_event_uuid).only('id').values_list('id', flat=True).first()
|
parent_id = JobEvent.objects.filter(job_id=kwargs['job_id'], uuid=parent_event_uuid).only('id').values_list('id', flat=True).first()
|
||||||
if parent_id:
|
if parent_id:
|
||||||
|
|||||||
@@ -32,7 +32,7 @@ import pexpect
|
|||||||
|
|
||||||
# Celery
|
# Celery
|
||||||
from celery import Task, task
|
from celery import Task, task
|
||||||
from celery.signals import celeryd_init, worker_process_init
|
from celery.signals import celeryd_init, worker_ready
|
||||||
from celery import current_app
|
from celery import current_app
|
||||||
|
|
||||||
# Django
|
# Django
|
||||||
@@ -100,8 +100,9 @@ def _setup_tower_logger():
|
|||||||
logger = logging.getLogger('awx.main.tasks')
|
logger = logging.getLogger('awx.main.tasks')
|
||||||
|
|
||||||
|
|
||||||
@worker_process_init.connect
|
@worker_ready.connect
|
||||||
def task_set_logger_pre_run(*args, **kwargs):
|
def task_set_logger_pre_run(*args, **kwargs):
|
||||||
|
cache.close()
|
||||||
if settings.LOG_AGGREGATOR_ENABLED:
|
if settings.LOG_AGGREGATOR_ENABLED:
|
||||||
_setup_tower_logger()
|
_setup_tower_logger()
|
||||||
logger.debug('Custom Tower logger configured for worker process.')
|
logger.debug('Custom Tower logger configured for worker process.')
|
||||||
|
|||||||
Reference in New Issue
Block a user