Files
awx/awx/main/tests/functional/analytics/test_collectors.py
2020-04-21 20:21:38 +02:00

138 lines
4.8 KiB
Python

import pytest
import tempfile
import os
import shutil
import csv
from django.utils.timezone import now
from django.db.backends.sqlite3.base import SQLiteCursorWrapper
from awx.main.analytics import collectors
from awx.main.models import (
ProjectUpdate,
InventorySource,
WorkflowJob,
WorkflowJobNode,
JobTemplate,
)
@pytest.fixture
def sqlite_copy_expert(request):
    """Patch a ``copy_expert`` stand-in onto ``SQLiteCursorWrapper``.

    ``copy_expert`` is postgres-specific and SQLite doesn't support it, so
    this fixture installs a replacement that unwraps the
    ``COPY (...) TO STDOUT WITH CSV HEADER`` statement, runs the inner query
    and writes the result set (header row first) to the given file object
    as CSV.

    Finalizers remove the patched method and delete the temporary directory
    created here.

    Returns:
        Path of the temporary directory created for the test.
    """
    copy_prefix = 'COPY ('
    copy_suffix = ') TO STDOUT WITH CSV HEADER'
    path = tempfile.mkdtemp(prefix='copied_tables')

    def write_stdout(self, sql, fd):
        # Would be cool if we instead properly dissected the SQL query and
        # verified it that way. But instead, we just take the naive approach
        # of checking the COPY wrapper.
        assert sql.startswith(copy_prefix)
        assert sql.endswith(copy_suffix)
        # Strip only the outer wrapper that was just asserted; str.replace()
        # would also delete any identical text inside the query itself.
        sql = sql[len(copy_prefix):-len(copy_suffix)]
        # sqlite equivalent
        sql = sql.replace('ARRAY_AGG', 'GROUP_CONCAT')
        # Remove JSON style queries
        # TODO: could replace JSON style queries with sqlite kind of equivalents
        kept_lines = []
        for line in sql.split('\n'):
            if 'main_jobevent.event_data::' not in line:
                kept_lines.append(line)
            elif kept_lines and not line.endswith(','):
                # The dropped line had no trailing comma, so the previously
                # kept line must lose its trailing comma to keep the SQL valid.
                kept_lines[-1] = kept_lines[-1].rstrip(',')
        self.execute('\n'.join(kept_lines))
        results = self.fetchall()
        headers = [column[0] for column in self.description]
        csv_handle = csv.writer(fd, delimiter=',', quoting=csv.QUOTE_ALL, escapechar='\\', lineterminator='\n')
        csv_handle.writerow(headers)
        csv_handle.writerows(results)

    SQLiteCursorWrapper.copy_expert = write_stdout
    request.addfinalizer(lambda: shutil.rmtree(path))
    request.addfinalizer(lambda: delattr(SQLiteCursorWrapper, 'copy_expert'))
    return path
@pytest.mark.django_db
def test_copy_tables_unified_job_query(sqlite_copy_expert, project, inventory, job_template):
    """
    Verify that a project update, an inventory update and a job all appear
    by name in the CSV written for the 'unified_jobs' subset.
    """
    time_start = now()

    # Create one unified job of each kind after time_start was captured.
    inv_src = InventorySource.objects.create(name="inventory_update1", inventory=inventory, source='gce')
    project_update = ProjectUpdate.objects.create(project=project, name="project_update1")
    inventory_update = inv_src.create_unified_job()
    job = job_template.create_unified_job()
    expected_names = [project_update.name, inventory_update.name, job.name]

    with tempfile.TemporaryDirectory() as tmpdir:
        collectors.copy_tables(time_start, tmpdir, subset='unified_jobs')
        csv_path = os.path.join(tmpdir, 'unified_jobs_table.csv')
        with open(csv_path) as f:
            contents = f.read()

    for expected_name in expected_names:
        assert expected_name in contents
@pytest.fixture
def workflow_job(states=['new', 'new', 'new', 'new', 'new']):
    r"""
    Build a workflow job with six nodes wired into this topology
    (s = success edge, f = failure edge):

                node[0]
                  /\
                s/  \f
                /    \
        node[1,5]    node[3]
              /        \
            s/          \f
            /            \
        node[2]        node[4]

    Nodes are paired with ``states`` in order; each node paired with a truthy
    state gets a job created from the job template with that status. With the
    default five states the sixth node (node[5]) is left without a job.
    """
    workflow = WorkflowJob.objects.create()
    template = JobTemplate.objects.create(name='test-jt')

    nodes = []
    for _ in range(6):
        nodes.append(WorkflowJobNode.objects.create(workflow_job=workflow, unified_job_template=template))

    for node, state in zip(nodes, states):
        if not state:
            continue
        job = template.create_job()
        node.job = job
        job.status = state
        job.save()
        node.save()

    # (parent index, relation name, child index), added in this exact order.
    edges = (
        (0, 'success_nodes', 1),
        (0, 'success_nodes', 5),
        (1, 'success_nodes', 2),
        (0, 'failure_nodes', 3),
        (3, 'failure_nodes', 4),
    )
    for parent, relation, child in edges:
        getattr(nodes[parent], relation).add(nodes[child])

    return workflow
@pytest.mark.django_db
def test_copy_tables_workflow_job_node_query(sqlite_copy_expert, workflow_job):
    """
    Verify the workflow job node CSV: the first column must list the node ids
    of the workflow job, and columns 7, 8 and 9 must hold the comma-separated
    ids of each node's success, failure and always nodes respectively.
    """
    time_start = now()

    with tempfile.TemporaryDirectory() as tmpdir:
        collectors.copy_tables(time_start, tmpdir, subset='workflow_job_node_query')
        csv_path = os.path.join(tmpdir, 'workflow_job_node_table.csv')
        with open(csv_path) as f:
            reader = csv.reader(f)
            # Pop the headers
            next(reader)
            rows = list(reader)

    exported_ids = [int(row[0]) for row in rows]
    assert exported_ids == list(workflow_job.workflow_nodes.all().values_list('id', flat=True))

    relationship_columns = (
        (7, 'success_nodes'),
        (8, 'failure_nodes'),
        (9, 'always_nodes'),
    )
    for index, relationship in relationship_columns:
        for i, row in enumerate(rows):
            cell = row[index]
            # An empty cell means the node has no related nodes of this kind.
            related_nodes = [int(e) for e in cell.split(',')] if cell else []
            node = workflow_job.workflow_nodes.all()[i]
            expected_nodes = list(getattr(node, relationship).all().values_list('id', flat=True))
            assert related_nodes == expected_nodes, \
                f"(right side) workflow_nodes.all()[{i}].{relationship}.all()"