Files
awx/awx/main/tests/unit/test_db.py
Ryan Petrello eed94b641e add a custom DB backend that provides system-level SQL profiling
run this command on _any_ node in an awx cluster:

$ awx-manage profile_sql --threshold=2.0 --minutes=1

...and for 1 minute, the timing for _every_ SQL query in _every_ awx
Python process that uses the Django ORM will be measured

queries that run longer than (in this example) 2 seconds will be
written to a per-process sqlite database in /var/lib/awx/profile, and
the file will contain an EXPLAIN VERBOSE for the query and the full
Python stack that led to that SQL query's execution (this includes not
just WSGI requests, but background processes like the runworker and
dispatcher)

$ awx-manage profile_sql --threshold=0

...can be used to disable profiling again (if you don't want to wait for
the minute to expire)
2019-02-14 15:04:46 -05:00

155 lines
4.0 KiB
Python

import collections
import os
import sqlite3
import sys
import unittest
import pytest
import awx
from awx.main.db.profiled_pg.base import RecordedQueryLog
QUERY = {
'sql': 'SELECT * FROM main_job',
'time': '.01'
}
EXPLAIN = 'Seq Scan on public.main_job (cost=0.00..1.18 rows=18 width=86)'
class FakeDatabase():
def __init__(self):
self._cursor = unittest.mock.Mock(spec_sec=['execute', 'fetchall'])
self._cursor.fetchall.return_value = [(EXPLAIN,)]
def cursor(self):
return self._cursor
@property
def execute_calls(self):
return self._cursor.execute.call_args_list
# all of these should still be valid operations we should proxy through
# to the underlying deque object
def test_deque_appendleft():
log = RecordedQueryLog(collections.deque(), FakeDatabase)
log.appendleft(QUERY)
assert len(log) == 1
def test_deque_count():
log = RecordedQueryLog(collections.deque(), FakeDatabase)
log.append(QUERY)
assert log.count('x') == 0
assert log.count(QUERY) == 1
def test_deque_clear():
log = RecordedQueryLog(collections.deque(), FakeDatabase)
log.append(QUERY)
log.clear()
assert len(log) == 0
@pytest.mark.parametrize('method', ['extend', 'extendleft'])
def test_deque_extend(method):
log = RecordedQueryLog(collections.deque(), FakeDatabase)
getattr(log, method)([QUERY])
assert len(log) == 1
@pytest.mark.parametrize('method', ['pop', 'popleft'])
def test_deque_pop(method):
log = RecordedQueryLog(collections.deque(), FakeDatabase)
log.append(QUERY)
getattr(log, method)()
assert len(log) == 0
def test_deque_remove():
log = RecordedQueryLog(collections.deque(), FakeDatabase)
log.append(QUERY)
log.remove(QUERY)
assert len(log) == 0
def test_deque_reverse():
log = RecordedQueryLog(collections.deque(), FakeDatabase)
log.append(QUERY)
log.reverse()
assert len(log) == 1
def test_sql_not_recorded_by_default():
db = FakeDatabase()
log = RecordedQueryLog(collections.deque(maxlen=100), db)
assert log.maxlen == 100
assert log.threshold is None
log.append(QUERY)
assert len(log) == 1
assert [x for x in log] == [QUERY]
assert db.execute_calls == []
def test_sql_below_threshold():
db = FakeDatabase()
log = RecordedQueryLog(collections.deque(maxlen=100), db)
log.threshold = 1
log.append(QUERY)
assert len(log) == 1
assert [x for x in log] == [QUERY]
assert db.execute_calls == []
def test_sqlite_failure(tmpdir):
tmpdir = str(tmpdir)
db = FakeDatabase()
db._cursor.execute.side_effect = OSError
log = RecordedQueryLog(collections.deque(maxlen=100), db, dest=tmpdir)
log.threshold = 0.00000001
log.append(QUERY)
assert len(log) == 1
assert os.listdir(tmpdir) == []
def test_sql_above_threshold(tmpdir):
tmpdir = str(tmpdir)
db = FakeDatabase()
log = RecordedQueryLog(collections.deque(maxlen=100), db, dest=tmpdir)
log.threshold = 0.00000001
for _ in range(5):
log.append(QUERY)
assert len(log) == 5
assert len(db.execute_calls) == 5
for _call in db.execute_calls:
args, kw = _call
assert args == ('EXPLAIN VERBOSE {}'.format(QUERY['sql']),)
path = os.path.join(
tmpdir,
'{}.sqlite'.format(os.path.basename(sys.argv[0]))
)
assert os.path.exists(path)
# verify the results
def dict_factory(cursor, row):
d = {}
for idx,col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d
cursor = sqlite3.connect(path)
cursor.row_factory = dict_factory
queries_logged = cursor.execute('SELECT * FROM queries').fetchall()
assert len(queries_logged) == 5
for q in queries_logged:
assert q['pid'] == os.getpid()
assert q['version'] == awx.__version__
assert q['time'] == 0.01
assert q['sql'] == QUERY['sql']
assert EXPLAIN in q['explain']
assert 'test_sql_above_threshold' in q['bt']