Merge pull request #3235 from ryanpetrello/sql-profiling

add a custom DB backend that provides system-level SQL profiling

Reviewed-by: https://github.com/softwarefactory-project-zuul[bot]
This commit is contained in:
softwarefactory-project-zuul[bot]
2019-02-15 21:56:28 +00:00
committed by GitHub
15 changed files with 391 additions and 10 deletions

0
awx/main/db/__init__.py Normal file
View File

View File

View File

@@ -0,0 +1,155 @@
import os
import pkg_resources
import sqlite3
import sys
import traceback
import uuid
from django.core.cache import cache
from django.core.cache.backends.locmem import LocMemCache
from django.db.backends.postgresql.base import DatabaseWrapper as BaseDatabaseWrapper
from awx.main.utils import memoize
# an in-memory (local memory) cache with a randomly generated name; it is
# passed as the `cache` argument to @memoize on
# DatabaseWrapper.force_debug_cursor below
__loc__ = LocMemCache(str(uuid.uuid4()), {})
# the database backend class is the only name exported by this module
__all__ = ['DatabaseWrapper']
class RecordedQueryLog(object):
    """
    A proxy around a query log (a deque-like object) that profiles slow
    queries.

    Every appended query is forwarded to the wrapped log.  When
    ``threshold`` is set and a query took at least that many seconds, the
    query is re-run with ``EXPLAIN VERBOSE`` and a row containing the plan,
    the Python stack, and process metadata is written to a sqlite file in
    ``dest``.  All other attribute access is delegated to the wrapped log.
    """

    def __init__(self, log, db, dest='/var/log/tower/profile'):
        """
        :param log: the underlying log object that queries are appended to
        :param db: an object with a ``cursor()`` method, used to run EXPLAIN
        :param dest: directory that the sqlite profile files are written to
        """
        self.log = log
        self.db = db
        self.dest = dest
        try:
            self.threshold = cache.get('awx-profile-sql-threshold')
        except Exception:
            # if we can't reach memcached, just assume profiling's off
            self.threshold = None

    def append(self, query):
        """
        Append ``query`` to the wrapped log and, on a best-effort basis,
        record profiling data for it.  Returns whatever the wrapped log's
        ``append`` returns; failures while profiling never propagate.
        """
        ret = self.log.append(query)
        try:
            self.write(query)
        except Exception:
            # not sure what else to do here - we can't really safely
            # *use* our loggers because it'll just generate more DB queries
            # and potentially recurse into this state again
            #
            # print the full exception (type and message, not just the
            # stack) so the failure can actually be diagnosed from stderr
            traceback.print_exc()
        return ret

    def write(self, query):
        """
        Record profiling data for ``query`` if profiling is enabled and the
        query was slow enough.

        :param query: a dict with a ``'time'`` entry (seconds, convertible
                      to float) and a ``'sql'`` entry (the query text)
        """
        if self.threshold is None:
            return
        seconds = float(query['time'])
        # if the query is slow enough...
        if seconds >= self.threshold:
            self._profile(query['sql'], seconds)

    def _profile(self, sql, seconds):
        """Collect the stack and query plan for a slow query and store them."""
        if sql.startswith('EXPLAIN'):
            # never profile a query that is itself an EXPLAIN (such as the
            # one issued below)
            return
        # build a printable Python stack
        bt = ' '.join(traceback.format_stack())
        # re-run the same query w/ EXPLAIN; this happens *before* anything
        # is created on disk so a failure here leaves no files behind
        explain = self._explain(sql)
        filepath = self._destination()
        version = pkg_resources.get_distribution('awx').version
        row = (os.getpid(), version, ' '.join(sys.argv), seconds, sql, explain, bt)
        self._record(filepath, row)

    def _explain(self, sql):
        """Run ``EXPLAIN VERBOSE`` for ``sql`` and return the plan as text."""
        cursor = self.db.cursor()
        cursor.execute('EXPLAIN VERBOSE {}'.format(sql))
        return ''.join(line[0] + '\n' for line in cursor.fetchall())

    def _destination(self):
        """Ensure ``dest`` exists and return the sqlite path for this program."""
        try:
            os.makedirs(self.dest)
        except OSError:
            # another process may have created the directory first; only
            # fail if it really is not there
            if not os.path.isdir(self.dest):
                raise
        progname = ' '.join(sys.argv)
        for match in ('uwsgi', 'dispatcher', 'callback_receiver', 'runworker'):
            if match in progname:
                progname = match
                break
        else:
            progname = os.path.basename(sys.argv[0])
        return os.path.join(self.dest, '{}.sqlite'.format(progname))

    def _record(self, filepath, row):
        """Insert one row of profile data into the sqlite file at ``filepath``."""
        log = sqlite3.connect(filepath, timeout=3)
        try:
            log.execute(
                'CREATE TABLE IF NOT EXISTS queries ('
                '  id INTEGER PRIMARY KEY,'
                '  version TEXT,'
                '  pid INTEGER,'
                '  stamp DATETIME DEFAULT CURRENT_TIMESTAMP,'
                '  argv TEXT,'
                '  time REAL,'
                '  sql TEXT,'
                '  explain TEXT,'
                '  bt TEXT'
                ');'
            )
            log.commit()
            log.execute(
                'INSERT INTO queries (pid, version, argv, time, sql, explain, bt) '
                'VALUES (?, ?, ?, ?, ?, ?, ?);',
                row
            )
            log.commit()
        finally:
            # always release the sqlite handle, even if a statement failed
            log.close()

    def __len__(self):
        return len(self.log)

    def __iter__(self):
        return iter(self.log)

    def __getattr__(self, attr):
        # proxy everything else through to the wrapped log; if `log` itself
        # is missing (e.g. on an instance created without __init__) raise
        # AttributeError rather than recursing forever
        if attr == 'log':
            raise AttributeError(attr)
        return getattr(self.log, attr)
class DatabaseWrapper(BaseDatabaseWrapper):
    """
    A subclass of Django's postgres DB backend that - depending on a flag
    stored in memcached - captures slow queries and writes profile and
    Python stack metadata to the disk.
    """

    def __init__(self, *args, **kwargs):
        super(DatabaseWrapper, self).__init__(*args, **kwargs)
        # Django's default base wrapper implementation has `queries_log`,
        # a `collections.deque` that every query is appended to.
        #
        # Swap it for a proxy that sees each appended query and - when the
        # query is slow enough - records profiling metadata to the file
        # system for debugging purposes.
        django_log = self.queries_log
        self.queries_log = RecordedQueryLog(django_log, self)

    @property
    @memoize(ttl=1, cache=__loc__)
    def force_debug_cursor(self):
        # In Django's base DB implementation `self.force_debug_cursor` is a
        # plain boolean that tells Django to record queries into
        # `self.queries_log` as they're executed (the same mechanism used by
        # libraries like the django-debug-toolbar).
        #
        # Here it is a property backed by a special flag in memcache: when
        # the flag is set, somebody called `awx-manage profile_sql` and
        # queries should be recorded.
        #
        # The property is wrapped w/ @memoize because Django references this
        # attribute _constantly_ (in particular, once per executed query);
        # a memcached.get() _at most_ once per second is a good enough
        # window to notice a system administrator turning profiling on/off.
        try:
            limit = cache.get('awx-profile-sql-threshold')
        except Exception:
            # if we can't reach memcached, just assume profiling's off
            limit = None
        profiling = limit is not None
        # keep the query log's threshold in step with the flag
        self.queries_log.threshold = limit
        return profiling

    @force_debug_cursor.setter
    def force_debug_cursor(self, v):
        # assignments are accepted but ignored; the value always comes from
        # the getter above
        pass

View File

@@ -0,0 +1,21 @@
from django.core.management.base import BaseCommand
from awx.main.tasks import profile_sql
class Command(BaseCommand):
    """
    Enable or disable SQL Profiling across all Python processes.
    SQL profile data will be recorded at /var/log/tower/profile
    """

    def add_arguments(self, parser):
        # minimum duration a query must take before it is recorded
        parser.add_argument(
            '--threshold',
            dest='threshold',
            type=float,
            default=2.0,
            help='The minimum query duration in seconds (default=2). Use 0 to disable.'
        )
        # how long profiling stays switched on
        parser.add_argument(
            '--minutes',
            dest='minutes',
            type=float,
            default=5,
            help='How long to record for in minutes (default=5)'
        )

    def handle(self, **options):
        # hand the parsed options to the `profile_sql` task via .delay()
        threshold = options['threshold']
        minutes = options['minutes']
        profile_sql.delay(threshold=threshold, minutes=minutes)

View File

@@ -34,7 +34,7 @@ perf_logger = logging.getLogger('awx.analytics.performance')
class TimingMiddleware(threading.local):
dest = '/var/lib/awx/profile'
dest = '/var/log/tower/profile'
def process_request(self, request):
self.start_time = time.time()
@@ -57,7 +57,7 @@ class TimingMiddleware(threading.local):
def save_profile_file(self, request):
if not os.path.isdir(self.dest):
os.makedirs(self.dest)
filename = '%.3fs-%s' % (pstats.Stats(self.prof).total_tt, uuid.uuid4())
filename = '%.3fs-%s.pstats' % (pstats.Stats(self.prof).total_tt, uuid.uuid4())
filepath = os.path.join(self.dest, filename)
with open(filepath, 'w') as f:
f.write('%s %s\n' % (request.method, request.get_full_path()))

View File

@@ -277,6 +277,20 @@ def delete_project_files(project_path):
logger.exception('Could not remove lock file {}'.format(lock_file))
@task(queue='tower_broadcast_all', exchange_type='fanout')
def profile_sql(threshold=1, minutes=1):
    """
    Turn SQL profiling on or off by setting or deleting a cache flag.

    :param threshold: minimum query duration in seconds to record; a value
                      of 0 removes the flag (profiling disabled)
    :param minutes: how long the flag is kept in the cache before it expires
    """
    flag = 'awx-profile-sql-threshold'
    if threshold != 0:
        # the cache timeout is expressed in seconds
        cache.set(flag, threshold, timeout=minutes * 60)
        logger.error('SQL QUERIES >={}s ENABLED FOR {} MINUTE(S)'.format(threshold, minutes))
        return
    cache.delete(flag)
    logger.error('SQL PROFILING DISABLED')
@task()
def send_notifications(notification_list, job_id=None):
if not isinstance(notification_list, list):

View File

@@ -0,0 +1,154 @@
import collections
import os
import sqlite3
import sys
import unittest
import pytest
import awx
from awx.main.db.profiled_pg.base import RecordedQueryLog
# a sample query-log entry as consumed by RecordedQueryLog: the SQL text and
# its duration in seconds (as a string)
QUERY = {
'sql': 'SELECT * FROM main_job',
'time': '.01'
}
# the single row of EXPLAIN output returned by FakeDatabase's cursor
EXPLAIN = 'Seq Scan on public.main_job (cost=0.00..1.18 rows=18 width=86)'
class FakeDatabase():
    """
    A stand-in for a database connection whose cursor records `execute`
    calls and returns a canned EXPLAIN row from `fetchall`.
    """

    def __init__(self):
        # import the submodule explicitly; a bare `import unittest` does not
        # guarantee that `unittest.mock` has been loaded
        from unittest import mock
        # `spec_set` restricts the mock to the two cursor methods in use, so
        # any other attribute access fails loudly
        self._cursor = mock.Mock(spec_set=['execute', 'fetchall'])
        self._cursor.fetchall.return_value = [(EXPLAIN,)]

    def cursor(self):
        """Return the shared mock cursor."""
        return self._cursor

    @property
    def execute_calls(self):
        """The list of recorded calls made to the cursor's `execute`."""
        return self._cursor.execute.call_args_list
# all of these should still be valid operations we should proxy through
# to the underlying deque object
def test_deque_appendleft():
    """`appendleft` is proxied through to the underlying deque."""
    # pass a FakeDatabase *instance* (not the class), as the other tests do
    log = RecordedQueryLog(collections.deque(), FakeDatabase())
    log.appendleft(QUERY)
    assert len(log) == 1
def test_deque_count():
    """`count` is proxied through to the underlying deque."""
    # pass a FakeDatabase *instance* (not the class), as the other tests do
    log = RecordedQueryLog(collections.deque(), FakeDatabase())
    log.append(QUERY)
    assert log.count('x') == 0
    assert log.count(QUERY) == 1
def test_deque_clear():
    """`clear` is proxied through to the underlying deque."""
    # pass a FakeDatabase *instance* (not the class), as the other tests do
    log = RecordedQueryLog(collections.deque(), FakeDatabase())
    log.append(QUERY)
    log.clear()
    assert len(log) == 0
@pytest.mark.parametrize('method', ['extend', 'extendleft'])
def test_deque_extend(method):
    """`extend` and `extendleft` are proxied through to the underlying deque."""
    # pass a FakeDatabase *instance* (not the class), as the other tests do
    log = RecordedQueryLog(collections.deque(), FakeDatabase())
    getattr(log, method)([QUERY])
    assert len(log) == 1
@pytest.mark.parametrize('method', ['pop', 'popleft'])
def test_deque_pop(method):
    """`pop` and `popleft` are proxied through to the underlying deque."""
    # pass a FakeDatabase *instance* (not the class), as the other tests do
    log = RecordedQueryLog(collections.deque(), FakeDatabase())
    log.append(QUERY)
    getattr(log, method)()
    assert len(log) == 0
def test_deque_remove():
    """`remove` is proxied through to the underlying deque."""
    # pass a FakeDatabase *instance* (not the class), as the other tests do
    log = RecordedQueryLog(collections.deque(), FakeDatabase())
    log.append(QUERY)
    log.remove(QUERY)
    assert len(log) == 0
def test_deque_reverse():
    """`reverse` is proxied through to the underlying deque."""
    # pass a FakeDatabase *instance* (not the class), as the other tests do
    log = RecordedQueryLog(collections.deque(), FakeDatabase())
    log.append(QUERY)
    log.reverse()
    assert len(log) == 1
def test_sql_not_recorded_by_default():
    """With no threshold set, queries are logged but never EXPLAINed."""
    fake_db = FakeDatabase()
    backing = collections.deque(maxlen=100)
    log = RecordedQueryLog(backing, fake_db)

    # attributes of the wrapped deque are visible through the proxy
    assert log.maxlen == 100
    assert log.threshold is None

    log.append(QUERY)

    assert len(log) == 1
    assert list(log) == [QUERY]
    assert fake_db.execute_calls == []
def test_sql_below_threshold():
    """A query faster than the threshold is logged but never EXPLAINed."""
    fake_db = FakeDatabase()
    backing = collections.deque(maxlen=100)
    log = RecordedQueryLog(backing, fake_db)
    # QUERY takes .01 seconds, well under this threshold
    log.threshold = 1

    log.append(QUERY)

    assert len(log) == 1
    assert list(log) == [QUERY]
    assert fake_db.execute_calls == []
def test_sqlite_failure(tmpdir):
    """If running the EXPLAIN raises, the query is still logged and nothing is written."""
    dest = str(tmpdir)
    fake_db = FakeDatabase()
    # make every cursor.execute() call blow up
    fake_db._cursor.execute.side_effect = OSError
    log = RecordedQueryLog(collections.deque(maxlen=100), fake_db, dest=dest)
    log.threshold = 0.00000001

    log.append(QUERY)

    assert len(log) == 1
    assert os.listdir(dest) == []
def test_sql_above_threshold(tmpdir):
    """Slow queries are EXPLAINed and written to a sqlite file under `dest`."""
    tmpdir = str(tmpdir)
    db = FakeDatabase()
    log = RecordedQueryLog(collections.deque(maxlen=100), db, dest=tmpdir)
    log.threshold = 0.00000001

    for _ in range(5):
        log.append(QUERY)

    assert len(log) == 5
    assert len(db.execute_calls) == 5
    expected_args = ('EXPLAIN VERBOSE {}'.format(QUERY['sql']),)
    for args, _kwargs in db.execute_calls:
        assert args == expected_args

    path = os.path.join(
        tmpdir,
        '{}.sqlite'.format(os.path.basename(sys.argv[0]))
    )
    assert os.path.exists(path)

    # verify the results
    def dict_factory(cursor, row):
        # map each column name to its value so rows can be read by key
        return {col[0]: row[idx] for idx, col in enumerate(cursor.description)}

    conn = sqlite3.connect(path)
    try:
        conn.row_factory = dict_factory
        queries_logged = conn.execute('SELECT * FROM queries').fetchall()
    finally:
        # don't leave the sqlite file open after the test
        conn.close()

    assert len(queries_logged) == 5
    for q in queries_logged:
        assert q['pid'] == os.getpid()
        assert q['version'] == awx.__version__
        assert q['time'] == 0.01
        assert q['sql'] == QUERY['sql']
        assert EXPLAIN in q['explain']
        assert 'test_sql_above_threshold' in q['bt']

View File

@@ -113,13 +113,13 @@ def get_memoize_cache():
return cache
def memoize(ttl=60, cache_key=None, track_function=False):
def memoize(ttl=60, cache_key=None, track_function=False, cache=None):
'''
Decorator to wrap a function and cache its result.
'''
if cache_key and track_function:
raise IllegalArgumentError("Can not specify cache_key when track_function is True")
cache = get_memoize_cache()
cache = cache or get_memoize_cache()
def memoize_decorator(f):
@wraps(f)