Merge pull request #9332 from ryanpetrello/guid-trace

add a per-request GUID and log as it travels through background services

Reviewed-by: https://github.com/apps/softwarefactory-project-zuul
This commit is contained in:
softwarefactory-project-zuul[bot]
2021-02-17 20:07:44 +00:00
committed by GitHub
13 changed files with 93 additions and 13 deletions

View File

@@ -6,6 +6,7 @@ from multiprocessing import Process
from django.conf import settings
from django.db import connections
from schedule import Scheduler
from django_guid.middleware import GuidMiddleware
from awx.main.dispatch.worker import TaskWorker
@@ -35,6 +36,7 @@ class Scheduler(Scheduler):
# If the database connection has a hiccup, re-establish a new
# connection
conn.close_if_unusable_or_obsolete()
GuidMiddleware.set_guid(GuidMiddleware._generate_guid())
self.run_pending()
except Exception:
logger.exception(

View File

@@ -16,6 +16,7 @@ from queue import Full as QueueFull, Empty as QueueEmpty
from django.conf import settings
from django.db import connection as django_connection, connections
from django.core.cache import cache as django_cache
from django_guid.middleware import GuidMiddleware
from jinja2 import Template
import psutil
@@ -445,6 +446,8 @@ class AutoscalePool(WorkerPool):
return super(AutoscalePool, self).up()
def write(self, preferred_queue, body):
if 'guid' in body:
GuidMiddleware.set_guid(body['guid'])
try:
# when the cluster heartbeat occurs, clean up internally
if isinstance(body, dict) and 'cluster_node_heartbeat' in body['task']:

View File

@@ -5,6 +5,7 @@ import json
from uuid import uuid4
from django.conf import settings
from django_guid.middleware import GuidMiddleware
from . import pg_bus_conn
@@ -83,6 +84,9 @@ class task:
'kwargs': kwargs,
'task': cls.name
}
guid = GuidMiddleware.get_guid()
if guid:
obj['guid'] = guid
obj.update(**kw)
if callable(queue):
queue = queue()

View File

@@ -9,6 +9,7 @@ from django.conf import settings
from django.utils.timezone import now as tz_now
from django.db import DatabaseError, OperationalError, connection as django_connection
from django.db.utils import InterfaceError, InternalError
from django_guid.middleware import GuidMiddleware
import psutil
@@ -152,6 +153,8 @@ class CallbackBrokerWorker(BaseWorker):
if body.get('event') == 'EOF':
try:
if 'guid' in body:
GuidMiddleware.set_guid(body['guid'])
final_counter = body.get('final_counter', 0)
logger.info('Event processing is finished for Job {}, sending notifications'.format(job_identifier))
# EOF events are sent when stdout for the running task is
@@ -176,6 +179,8 @@ class CallbackBrokerWorker(BaseWorker):
handle_success_and_failure_notifications.apply_async([uj.id])
except Exception:
logger.exception('Worker failed to emit notifications: Job {}'.format(job_identifier))
finally:
GuidMiddleware.set_guid('')
return
event = cls.create_from_data(**body)

View File

@@ -6,6 +6,8 @@ import traceback
from kubernetes.config import kube_config
from django_guid.middleware import GuidMiddleware
from awx.main.tasks import dispatch_startup, inform_cluster_of_shutdown
from .base import BaseWorker
@@ -52,6 +54,8 @@ class TaskWorker(BaseWorker):
uuid = body.get('uuid', '<unknown>')
args = body.get('args', [])
kwargs = body.get('kwargs', {})
if 'guid' in body:
GuidMiddleware.set_guid(body.pop('guid'))
_call = TaskWorker.resolve_callable(task)
if inspect.isclass(_call):
# the callable is a class, e.g., RunJob; instantiate and

View File

@@ -34,6 +34,7 @@ from django.contrib.auth.models import User
from django.utils.translation import ugettext_lazy as _, gettext_noop
from django.core.cache import cache
from django.core.exceptions import ObjectDoesNotExist
from django_guid.middleware import GuidMiddleware
# Kubernetes
from kubernetes.client.rest import ApiException
@@ -839,6 +840,7 @@ class BaseTask(object):
self.cleanup_paths = []
self.parent_workflow_job_id = None
self.host_map = {}
self.guid = GuidMiddleware.get_guid()
def update_model(self, pk, _attempt=0, **updates):
"""Reload the model instance from the database and update the
@@ -1274,6 +1276,9 @@ class BaseTask(object):
except json.JSONDecodeError:
pass
if 'event_data' in event_data:
event_data['event_data']['guid'] = self.guid
event_data.setdefault(self.event_data_key, self.instance.id)
self.dispatcher.dispatch(event_data)
self.event_ct += 1
@@ -1310,6 +1315,7 @@ class BaseTask(object):
event_data = {
'event': 'EOF',
'final_counter': self.event_ct,
'guid': self.guid,
}
event_data.setdefault(self.event_data_key, self.instance.id)
self.dispatcher.dispatch(event_data)

View File

@@ -527,7 +527,7 @@ class TestGenericRun():
task.instance = Job(pk=1, id=1)
task.event_ct = 17
task.finished_callback(None)
task.dispatcher.dispatch.assert_called_with({'event': 'EOF', 'final_counter': 17, 'job_id': 1})
task.dispatcher.dispatch.assert_called_with({'event': 'EOF', 'final_counter': 17, 'job_id': 1, 'guid': None})
def test_save_job_metadata(self, job, update_model_wrapper):
class MockMe():

View File

@@ -15,6 +15,10 @@ from django.apps import apps
from django.db import models
from django.conf import settings
from django_guid.log_filters import CorrelationId
from django_guid.middleware import GuidMiddleware
from awx import MODE
from awx.main.constants import LOGGER_BLOCKLIST
from awx.main.utils.common import get_search_fields
@@ -364,3 +368,14 @@ class SmartFilter(object):
return res[0].result
raise RuntimeError("Parsing the filter_string %s went terribly wrong" % filter_string)
class DefaultCorrelationId(CorrelationId):
def filter(self, record):
guid = GuidMiddleware.get_guid() or '-'
if MODE == 'development':
guid = guid[:8]
record.guid = guid
return True

View File

@@ -154,6 +154,9 @@ class LogstashFormatter(LogstashFormatterBase):
if kind == 'job_events' and raw_data.get('python_objects', {}).get('job_event'):
job_event = raw_data['python_objects']['job_event']
guid = job_event.event_data.pop('guid', None)
if guid:
data_for_log['guid'] = guid
for field_object in job_event._meta.fields:
if not field_object.__class__ or not field_object.__class__.__name__:

View File

@@ -285,6 +285,7 @@ INSTALLED_APPS = [
'polymorphic',
'taggit',
'social_django',
'django_guid',
'corsheaders',
'awx.conf',
'awx.main',
@@ -828,11 +829,14 @@ LOGGING = {
},
'dynamic_level_filter': {
'()': 'awx.main.utils.filters.DynamicLevelFilter'
}
},
'guid': {
'()': 'awx.main.utils.filters.DefaultCorrelationId'
},
},
'formatters': {
'simple': {
'format': '%(asctime)s %(levelname)-8s %(name)s %(message)s',
'format': '%(asctime)s %(levelname)-8s [%(guid)s] %(name)s %(message)s',
},
'json': {
'()': 'awx.main.utils.formatters.LogstashFormatter'
@@ -842,7 +846,7 @@ LOGGING = {
'format': '%(relativeSeconds)9.3f %(levelname)-8s %(message)s'
},
'dispatcher': {
'format': '%(asctime)s %(levelname)-8s %(name)s PID:%(process)d %(message)s',
'format': '%(asctime)s %(levelname)-8s [%(guid)s] %(name)s PID:%(process)d %(message)s',
},
'job_lifecycle': {
'()': 'awx.main.utils.formatters.JobLifeCycleFormatter',
@@ -852,7 +856,7 @@ LOGGING = {
'console': {
'()': 'logging.StreamHandler',
'level': 'DEBUG',
'filters': ['require_debug_true_or_test'],
'filters': ['require_debug_true_or_test', 'guid'],
'formatter': 'simple',
},
'null': {
@@ -872,33 +876,33 @@ LOGGING = {
'class': 'awx.main.utils.handlers.RSysLogHandler',
'formatter': 'json',
'address': '/var/run/awx-rsyslog/rsyslog.sock',
'filters': ['external_log_enabled', 'dynamic_level_filter'],
'filters': ['external_log_enabled', 'dynamic_level_filter', 'guid'],
},
'tower_warnings': {
# don't define a level here, it's set by settings.LOG_AGGREGATOR_LEVEL
'class': 'logging.handlers.WatchedFileHandler',
'filters': ['require_debug_false', 'dynamic_level_filter'],
'filters': ['require_debug_false', 'dynamic_level_filter', 'guid'],
'filename': os.path.join(LOG_ROOT, 'tower.log'),
'formatter':'simple',
},
'callback_receiver': {
# don't define a level here, it's set by settings.LOG_AGGREGATOR_LEVEL
'class': 'logging.handlers.WatchedFileHandler',
'filters': ['require_debug_false', 'dynamic_level_filter'],
'filters': ['require_debug_false', 'dynamic_level_filter', 'guid'],
'filename': os.path.join(LOG_ROOT, 'callback_receiver.log'),
'formatter':'simple',
},
'dispatcher': {
# don't define a level here, it's set by settings.LOG_AGGREGATOR_LEVEL
'class': 'logging.handlers.WatchedFileHandler',
'filters': ['require_debug_false', 'dynamic_level_filter'],
'filters': ['require_debug_false', 'dynamic_level_filter', 'guid'],
'filename': os.path.join(LOG_ROOT, 'dispatcher.log'),
'formatter':'dispatcher',
},
'wsbroadcast': {
# don't define a level here, it's set by settings.LOG_AGGREGATOR_LEVEL
'class': 'logging.handlers.WatchedFileHandler',
'filters': ['require_debug_false', 'dynamic_level_filter'],
'filters': ['require_debug_false', 'dynamic_level_filter', 'guid'],
'filename': os.path.join(LOG_ROOT, 'wsbroadcast.log'),
'formatter':'simple',
},
@@ -914,7 +918,7 @@ LOGGING = {
'task_system': {
# don't define a level here, it's set by settings.LOG_AGGREGATOR_LEVEL
'class': 'logging.handlers.WatchedFileHandler',
'filters': ['require_debug_false', 'dynamic_level_filter'],
'filters': ['require_debug_false', 'dynamic_level_filter', 'guid'],
'filename': os.path.join(LOG_ROOT, 'task_system.log'),
'formatter':'simple',
},
@@ -1094,6 +1098,7 @@ AWX_CALLBACK_PROFILE = False
AWX_CLEANUP_PATHS = True
MIDDLEWARE = [
'django_guid.middleware.GuidMiddleware',
'awx.main.middleware.TimingMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'awx.main.middleware.MigrationRanCheckMiddleware',
@@ -1134,3 +1139,7 @@ BROADCAST_WEBSOCKET_NEW_INSTANCE_POLL_RATE_SECONDS = 10
# How often websocket process will generate stats
BROADCAST_WEBSOCKET_STATS_POLL_RATE_SECONDS = 5
DJANGO_GUID = {
'GUID_HEADER_NAME': 'X-API-Request-Id',
}