add a per-request GUID and log as it travels through background services

see: https://github.com/ansible/awx/issues/9329
This commit is contained in:
Ryan Petrello
2021-02-16 12:33:54 -05:00
parent b4956de6e4
commit 3cc3cf1f80
13 changed files with 93 additions and 13 deletions

View File

@@ -6,6 +6,7 @@ from multiprocessing import Process
from django.conf import settings
from django.db import connections
from schedule import Scheduler
from django_guid.middleware import GuidMiddleware
from awx.main.dispatch.worker import TaskWorker
@@ -35,6 +36,7 @@ class Scheduler(Scheduler):
# If the database connection has a hiccup, re-establish a new
# connection
conn.close_if_unusable_or_obsolete()
GuidMiddleware.set_guid(GuidMiddleware._generate_guid())
self.run_pending()
except Exception:
logger.exception(

View File

@@ -16,6 +16,7 @@ from queue import Full as QueueFull, Empty as QueueEmpty
from django.conf import settings
from django.db import connection as django_connection, connections
from django.core.cache import cache as django_cache
from django_guid.middleware import GuidMiddleware
from jinja2 import Template
import psutil
@@ -445,6 +446,8 @@ class AutoscalePool(WorkerPool):
return super(AutoscalePool, self).up()
def write(self, preferred_queue, body):
if 'guid' in body:
GuidMiddleware.set_guid(body['guid'])
try:
# when the cluster heartbeat occurs, clean up internally
if isinstance(body, dict) and 'cluster_node_heartbeat' in body['task']:

View File

@@ -5,6 +5,7 @@ import json
from uuid import uuid4
from django.conf import settings
from django_guid.middleware import GuidMiddleware
from . import pg_bus_conn
@@ -83,6 +84,9 @@ class task:
'kwargs': kwargs,
'task': cls.name
}
guid = GuidMiddleware.get_guid()
if guid:
obj['guid'] = guid
obj.update(**kw)
if callable(queue):
queue = queue()

View File

@@ -9,6 +9,7 @@ from django.conf import settings
from django.utils.timezone import now as tz_now
from django.db import DatabaseError, OperationalError, connection as django_connection
from django.db.utils import InterfaceError, InternalError
from django_guid.middleware import GuidMiddleware
import psutil
@@ -152,6 +153,8 @@ class CallbackBrokerWorker(BaseWorker):
if body.get('event') == 'EOF':
try:
if 'guid' in body:
GuidMiddleware.set_guid(body['guid'])
final_counter = body.get('final_counter', 0)
logger.info('Event processing is finished for Job {}, sending notifications'.format(job_identifier))
# EOF events are sent when stdout for the running task is
@@ -176,6 +179,8 @@ class CallbackBrokerWorker(BaseWorker):
handle_success_and_failure_notifications.apply_async([uj.id])
except Exception:
logger.exception('Worker failed to emit notifications: Job {}'.format(job_identifier))
finally:
GuidMiddleware.set_guid('')
return
event = cls.create_from_data(**body)

View File

@@ -6,6 +6,8 @@ import traceback
from kubernetes.config import kube_config
from django_guid.middleware import GuidMiddleware
from awx.main.tasks import dispatch_startup, inform_cluster_of_shutdown
from .base import BaseWorker
@@ -52,6 +54,8 @@ class TaskWorker(BaseWorker):
uuid = body.get('uuid', '<unknown>')
args = body.get('args', [])
kwargs = body.get('kwargs', {})
if 'guid' in body:
GuidMiddleware.set_guid(body.pop('guid'))
_call = TaskWorker.resolve_callable(task)
if inspect.isclass(_call):
# the callable is a class, e.g., RunJob; instantiate and