add a custom DB backend that provides system-level SQL profiling
Run this command on _any_ node in an awx cluster:

    $ awx-manage profile_sql --threshold=2.0 --minutes=1

...and for 1 minute, the timing of _every_ SQL query in _every_ awx Python
process that uses the Django ORM will be measured. Queries that run longer
than the threshold (in this example, 2 seconds) will be written to a
per-process sqlite database in /var/lib/awx/profile; each recorded row
includes an EXPLAIN VERBOSE for the query and the full Python stack that led
to that SQL query's execution. This covers not just WSGI requests, but also
background processes like the runworker and dispatcher.

    $ awx-manage profile_sql --threshold=0

...can be used to disable profiling again (if you don't want to wait for the
minute to expire).
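The captured rows can then be inspected with stock sqlite3. A minimal sketch,
assuming a uwsgi worker recorded some queries (the actual filename is
per-process, derived from the process name as shown in base.py below):

    import sqlite3

    # hypothetical path; one .sqlite file is written per recorded process
    conn = sqlite3.connect('/var/lib/awx/profile/uwsgi.sqlite')
    for duration, sql in conn.execute(
        'SELECT time, sql FROM queries ORDER BY time DESC LIMIT 5;'
    ):
        print('{:.2f}s {}'.format(duration, sql))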
This commit is contained in:
parent 808ed74700
commit eed94b641e

0  awx/main/db/__init__.py  Normal file

0  awx/main/db/profiled_pg/__init__.py  Normal file

155  awx/main/db/profiled_pg/base.py  Normal file
@@ -0,0 +1,155 @@
import os
import pkg_resources
import sqlite3
import sys
import traceback
import uuid

from django.core.cache import cache
from django.core.cache.backends.locmem import LocMemCache
from django.db.backends.postgresql.base import DatabaseWrapper as BaseDatabaseWrapper

from awx.main.utils import memoize


__loc__ = LocMemCache(str(uuid.uuid4()), {})
__all__ = ['DatabaseWrapper']


class RecordedQueryLog(object):

    def __init__(self, log, db, dest='/var/lib/awx/profile'):
        self.log = log
        self.db = db
        self.dest = dest
        try:
            self.threshold = cache.get('awx-profile-sql-threshold')
        except Exception:
            # if we can't reach memcached, just assume profiling's off
            self.threshold = None

    def append(self, query):
        ret = self.log.append(query)
        try:
            self.write(query)
        except Exception:
            # not sure what else to do here - we can't really safely
            # *use* our loggers because it'll just generate more DB queries
            # and potentially recurse into this state again
            _, _, tb = sys.exc_info()
            traceback.print_tb(tb)
        return ret

    def write(self, query):
        if self.threshold is None:
            return
        seconds = float(query['time'])

        # if the query is slow enough...
        if seconds >= self.threshold:
            sql = query['sql']
            if sql.startswith('EXPLAIN'):
                return

            # build a printable Python stack
            bt = ' '.join(traceback.format_stack())

            # and re-run the same query w/ EXPLAIN
            explain = ''
            cursor = self.db.cursor()
            cursor.execute('EXPLAIN VERBOSE {}'.format(sql))
            for line in cursor.fetchall():
                explain += line[0] + '\n'

            # write a row of data into a per-PID sqlite database
            if not os.path.isdir(self.dest):
                os.makedirs(self.dest)
            progname = ' '.join(sys.argv)
            for match in ('uwsgi', 'dispatcher', 'callback_receiver', 'runworker'):
                if match in progname:
                    progname = match
                    break
            else:
                progname = os.path.basename(sys.argv[0])
            filepath = os.path.join(
                self.dest,
                '{}.sqlite'.format(progname)
            )
            version = pkg_resources.get_distribution('awx').version
            log = sqlite3.connect(filepath, timeout=3)
            log.execute(
                'CREATE TABLE IF NOT EXISTS queries ('
                '  id INTEGER PRIMARY KEY,'
                '  version TEXT,'
                '  pid INTEGER,'
                '  stamp DATETIME DEFAULT CURRENT_TIMESTAMP,'
                '  argv REAL,'
                '  time REAL,'
                '  sql TEXT,'
                '  explain TEXT,'
                '  bt TEXT'
                ');'
            )
            log.commit()
            log.execute(
                'INSERT INTO queries (pid, version, argv, time, sql, explain, bt) '
                'VALUES (?, ?, ?, ?, ?, ?, ?);',
                (os.getpid(), version, ' '.join(sys.argv), seconds, sql, explain, bt)
            )
            log.commit()

    def __len__(self):
        return len(self.log)

    def __iter__(self):
        return iter(self.log)

    def __getattr__(self, attr):
        return getattr(self.log, attr)


class DatabaseWrapper(BaseDatabaseWrapper):
    """
    This is a special subclass of Django's postgres DB backend which - based
    on the value of a special flag in memcached - captures slow queries and
    writes profile and Python stack metadata to disk.
    """

    def __init__(self, *args, **kwargs):
        super(DatabaseWrapper, self).__init__(*args, **kwargs)
        # Django's default base wrapper implementation has `queries_log`,
        # which is a `collections.deque` that every query is appended to
        #
        # this line wraps the deque with a proxy that can capture each query
        # and - if it's slow enough - record profiling metadata to the file
        # system for debugging purposes
        self.queries_log = RecordedQueryLog(self.queries_log, self)

    @property
    @memoize(ttl=1, cache=__loc__)
    def force_debug_cursor(self):
        # in Django's base DB implementation, `self.force_debug_cursor` is just
        # a simple boolean, and this value is used to signal to Django that it
        # should record queries into `self.queries_log` as they're executed (this
        # is the same mechanism used by libraries like the django-debug-toolbar)
        #
        # in _this_ implementation, we represent it as a property which will
        # check memcache for a special flag to be set (when the flag is set, it
        # means we should start recording queries because somebody called
        # `awx-manage profile_sql`)
        #
        # it's worth noting that this property is wrapped w/ @memoize because
        # Django references this attribute _constantly_ (in particular, once
        # per executed query); doing a memcached.get() _at most_ once per
        # second is a good enough window to detect when profiling is turned
        # on/off by a system administrator
        try:
            threshold = cache.get('awx-profile-sql-threshold')
        except Exception:
            # if we can't reach memcached, just assume profiling's off
            threshold = None
        self.queries_log.threshold = threshold
        return threshold is not None

    @force_debug_cursor.setter
    def force_debug_cursor(self, v):
        return
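For reference, the entries Django appends to `queries_log` when
`force_debug_cursor` is truthy are plain dicts - the same structure exposed
as `connection.queries` - and Django reports `time` as a string, which is why
`write()` calls `float(query['time'])`. An illustrative entry:

    # shape of a logged query entry as produced by Django's CursorDebugWrapper
    # (values here are illustrative)
    entry = {'sql': 'SELECT * FROM main_job', 'time': '0.002'}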
21  awx/main/management/commands/profile_sql.py  Normal file
@@ -0,0 +1,21 @@
from django.core.management.base import BaseCommand

from awx.main.tasks import profile_sql


class Command(BaseCommand):
    """
    Enable or disable SQL profiling across all Python processes.
    SQL profile data will be recorded at /var/lib/awx/profile/
    """

    def add_arguments(self, parser):
        parser.add_argument('--threshold', dest='threshold', type=float, default=2.0,
                            help='The minimum query duration in seconds (default=2). Use 0 to disable.')
        parser.add_argument('--minutes', dest='minutes', type=float, default=5,
                            help='How long to record for in minutes (default=5)')

    def handle(self, **options):
        profile_sql.delay(
            threshold=options['threshold'], minutes=options['minutes']
        )
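The command is a thin wrapper around the broadcast task defined below; the
equivalent call from a Django shell on any node would be (a sketch):

    from awx.main.tasks import profile_sql

    # publishes to the fanout exchange, so every node applies the flag
    profile_sql.delay(threshold=2.0, minutes=1)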
@@ -57,7 +57,7 @@ class TimingMiddleware(threading.local):

     def save_profile_file(self, request):
         if not os.path.isdir(self.dest):
             os.makedirs(self.dest)
-        filename = '%.3fs-%s' % (pstats.Stats(self.prof).total_tt, uuid.uuid4())
+        filename = '%.3fs-%s.pstats' % (pstats.Stats(self.prof).total_tt, uuid.uuid4())
         filepath = os.path.join(self.dest, filename)
         with open(filepath, 'w') as f:
             f.write('%s %s\n' % (request.method, request.get_full_path()))
@@ -277,6 +277,20 @@ def delete_project_files(project_path):
             logger.exception('Could not remove lock file {}'.format(lock_file))


+@task(queue='tower_broadcast_all', exchange_type='fanout')
+def profile_sql(threshold=1, minutes=1):
+    if threshold == 0:
+        cache.delete('awx-profile-sql-threshold')
+        logger.error('SQL PROFILING DISABLED')
+    else:
+        cache.set(
+            'awx-profile-sql-threshold',
+            threshold,
+            timeout=minutes * 60
+        )
+        logger.error('SQL QUERIES >={}s ENABLED FOR {} MINUTE(S)'.format(threshold, minutes))
+
+
 @task()
 def send_notifications(notification_list, job_id=None):
     if not isinstance(notification_list, list):
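Under the hood the on/off switch is just a key in the shared cache. A minimal
sketch of the contract between this task and the DB backend (names taken from
this diff; `cache` is django.core.cache.cache):

    from django.core.cache import cache

    cache.set('awx-profile-sql-threshold', 2.0, timeout=60)  # enable for one minute
    cache.get('awx-profile-sql-threshold')                   # 2.0 - backend records queries >= 2s
    cache.delete('awx-profile-sql-threshold')                # disable immediately
    cache.get('awx-profile-sql-threshold')                   # None - profiling off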
154  awx/main/tests/unit/test_db.py  Normal file
@@ -0,0 +1,154 @@
import collections
import os
import sqlite3
import sys
import unittest.mock

import pytest

import awx
from awx.main.db.profiled_pg.base import RecordedQueryLog


QUERY = {
    'sql': 'SELECT * FROM main_job',
    'time': '.01'
}
EXPLAIN = 'Seq Scan on public.main_job (cost=0.00..1.18 rows=18 width=86)'


class FakeDatabase():

    def __init__(self):
        self._cursor = unittest.mock.Mock(spec_set=['execute', 'fetchall'])
        self._cursor.fetchall.return_value = [(EXPLAIN,)]

    def cursor(self):
        return self._cursor

    @property
    def execute_calls(self):
        return self._cursor.execute.call_args_list


# all of these should still be valid operations we should proxy through
# to the underlying deque object
def test_deque_appendleft():
    log = RecordedQueryLog(collections.deque(), FakeDatabase())
    log.appendleft(QUERY)
    assert len(log) == 1


def test_deque_count():
    log = RecordedQueryLog(collections.deque(), FakeDatabase())
    log.append(QUERY)
    assert log.count('x') == 0
    assert log.count(QUERY) == 1


def test_deque_clear():
    log = RecordedQueryLog(collections.deque(), FakeDatabase())
    log.append(QUERY)
    log.clear()
    assert len(log) == 0


@pytest.mark.parametrize('method', ['extend', 'extendleft'])
def test_deque_extend(method):
    log = RecordedQueryLog(collections.deque(), FakeDatabase())
    getattr(log, method)([QUERY])
    assert len(log) == 1


@pytest.mark.parametrize('method', ['pop', 'popleft'])
def test_deque_pop(method):
    log = RecordedQueryLog(collections.deque(), FakeDatabase())
    log.append(QUERY)
    getattr(log, method)()
    assert len(log) == 0


def test_deque_remove():
    log = RecordedQueryLog(collections.deque(), FakeDatabase())
    log.append(QUERY)
    log.remove(QUERY)
    assert len(log) == 0


def test_deque_reverse():
    log = RecordedQueryLog(collections.deque(), FakeDatabase())
    log.append(QUERY)
    log.reverse()
    assert len(log) == 1


def test_sql_not_recorded_by_default():
    db = FakeDatabase()
    log = RecordedQueryLog(collections.deque(maxlen=100), db)
    assert log.maxlen == 100
    assert log.threshold is None
    log.append(QUERY)
    assert len(log) == 1
    assert [x for x in log] == [QUERY]
    assert db.execute_calls == []


def test_sql_below_threshold():
    db = FakeDatabase()
    log = RecordedQueryLog(collections.deque(maxlen=100), db)
    log.threshold = 1
    log.append(QUERY)
    assert len(log) == 1
    assert [x for x in log] == [QUERY]
    assert db.execute_calls == []


def test_sqlite_failure(tmpdir):
    tmpdir = str(tmpdir)
    db = FakeDatabase()
    db._cursor.execute.side_effect = OSError
    log = RecordedQueryLog(collections.deque(maxlen=100), db, dest=tmpdir)
    log.threshold = 0.00000001
    log.append(QUERY)
    assert len(log) == 1
    assert os.listdir(tmpdir) == []


def test_sql_above_threshold(tmpdir):
    tmpdir = str(tmpdir)
    db = FakeDatabase()
    log = RecordedQueryLog(collections.deque(maxlen=100), db, dest=tmpdir)
    log.threshold = 0.00000001

    for _ in range(5):
        log.append(QUERY)

    assert len(log) == 5
    assert len(db.execute_calls) == 5
    for _call in db.execute_calls:
        args, kw = _call
        assert args == ('EXPLAIN VERBOSE {}'.format(QUERY['sql']),)

    path = os.path.join(
        tmpdir,
        '{}.sqlite'.format(os.path.basename(sys.argv[0]))
    )
    assert os.path.exists(path)

    # verify the results
    def dict_factory(cursor, row):
        d = {}
        for idx, col in enumerate(cursor.description):
            d[col[0]] = row[idx]
        return d
    cursor = sqlite3.connect(path)
    cursor.row_factory = dict_factory
    queries_logged = cursor.execute('SELECT * FROM queries').fetchall()
    assert len(queries_logged) == 5
    for q in queries_logged:
        assert q['pid'] == os.getpid()
        assert q['version'] == awx.__version__
        assert q['time'] == 0.01
        assert q['sql'] == QUERY['sql']
        assert EXPLAIN in q['explain']
        assert 'test_sql_above_threshold' in q['bt']
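The hand-rolled `dict_factory` above is the recipe from the sqlite3 docs;
sqlite3.Row is a built-in alternative that gives access by column name (a
sketch, using the same `path` computed in the test):

    import sqlite3

    conn = sqlite3.connect(path)
    conn.row_factory = sqlite3.Row
    row = conn.execute('SELECT * FROM queries LIMIT 1;').fetchone()
    row['sql']  # columns are addressable by name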
@@ -128,13 +128,13 @@ def get_memoize_cache():
     return cache


-def memoize(ttl=60, cache_key=None, track_function=False):
+def memoize(ttl=60, cache_key=None, track_function=False, cache=None):
     '''
     Decorator to wrap a function and cache its result.
     '''
     if cache_key and track_function:
         raise IllegalArgumentError("Can not specify cache_key when track_function is True")
-    cache = get_memoize_cache()
+    cache = cache or get_memoize_cache()

     def memoize_decorator(f):
         @wraps(f)
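The new `cache` argument is what lets the DB backend memoize against a
private, process-local cache instead of the shared one - important, since the
memoized property sits on the hot path of every query. A sketch of the
pattern, mirroring the usage in base.py:

    from django.core.cache.backends.locmem import LocMemCache

    _local = LocMemCache('example', {})  # per-process; never touches memcached

    @memoize(ttl=1, cache=_local)
    def check_flag():
        # re-evaluated at most once per second per process
        ...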
@@ -24,7 +24,7 @@ MANAGERS = ADMINS

 # Database settings to use PostgreSQL for development.
 DATABASES = {
     'default': {
-        'ENGINE': 'django.db.backends.postgresql',
+        'ENGINE': 'awx.main.db.profiled_pg',
         'NAME': 'awx-dev',
         'USER': 'awx-dev',
         'PASSWORD': 'AWXsome1',

@@ -75,7 +75,7 @@ LOGGING['handlers']['management_playbooks'] = {'class': 'logging.NullHandler'}
 DATABASES = {
     'default': {
         'ATOMIC_REQUESTS': True,
-        'ENGINE': 'django.db.backends.postgresql',
+        'ENGINE': 'awx.main.db.profiled_pg',
         'NAME': os.getenv("DATABASE_NAME", None),
         'USER': os.getenv("DATABASE_USER", None),
         'PASSWORD': os.getenv("DATABASE_PASSWORD", None),

@@ -1,7 +1,7 @@
 DATABASES = {
     'default': {
         'ATOMIC_REQUESTS': True,
-        'ENGINE': 'django.db.backends.postgresql',
+        'ENGINE': 'awx.main.db.profiled_pg',
         'NAME': "{{ pg_database }}",
         'USER': "{{ pg_username }}",
         'PASSWORD': "{{ pg_password }}",

@@ -73,7 +73,7 @@ data:
 DATABASES = {
     'default': {
         'ATOMIC_REQUESTS': True,
-        'ENGINE': 'django.db.backends.postgresql',
+        'ENGINE': 'awx.main.db.profiled_pg',
         'NAME': "awx",
         'USER': "awx",
         'PASSWORD': "awx",
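Pointing ENGINE at the new package works because Django resolves a backend by
importing `<ENGINE>.base` and using its `DatabaseWrapper` class; roughly (a
simplified sketch of django.db.utils.load_backend):

    import importlib

    engine = 'awx.main.db.profiled_pg'
    backend = importlib.import_module(engine + '.base')
    wrapper_cls = backend.DatabaseWrapper  # the subclass defined in this commit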