awx/awx/main/tests/functional/analytics/test_collectors.py

import pytest
import tempfile
import os
import shutil
import csv
from django.utils.timezone import now
from datetime import timedelta
from django.db.backends.sqlite3.base import SQLiteCursorWrapper
from awx.main.analytics import collectors
from awx.main.models import (
    ProjectUpdate,
    InventorySource,
    WorkflowJob,
    WorkflowJobNode,
    JobTemplate,
)


@pytest.fixture
def sqlite_copy_expert(request):
    # copy_expert is postgres-specific, and SQLite doesn't support it; mock its
    # behavior to test that the collectors write the COPY output to a file
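    # For reference, this stands in for the psycopg2 cursor API: on postgres the
    # collector hands a COPY statement and a file object to cursor.copy_expert.
    # Illustrative sketch, not executed in these tests:
    #
    #   cursor.copy_expert("COPY (SELECT ...) TO STDOUT WITH CSV HEADER", fd)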
    path = tempfile.mkdtemp(prefix="copied_tables")

    def write_stdout(self, sql, fd):
        # Would be cool if we instead properly dissected the SQL query and
        # verified it that way. But for now, we just take the naive approach:
        # unwrap the COPY statement and run the inner query against SQLite.
        assert sql.startswith("COPY (")
        assert sql.endswith(") TO STDOUT WITH CSV HEADER")
        sql = sql.replace("COPY (", "")
        sql = sql.replace(") TO STDOUT WITH CSV HEADER", "")
        # SQLite equivalent of postgres ARRAY_AGG
        sql = sql.replace("ARRAY_AGG", "GROUP_CONCAT")

        # Remove JSON-style queries
        # TODO: could replace JSON-style queries with SQLite equivalents
        sql_new = []
        for line in sql.split("\n"):
            if line.find("main_jobevent.event_data::") == -1:
                sql_new.append(line)
            elif not line.endswith(","):
                # The dropped expression was the last item in the SELECT list,
                # so strip the now-trailing comma from the previous line.
                sql_new[-1] = sql_new[-1].rstrip(",")
        sql = "\n".join(sql_new)

        self.execute(sql)
        results = self.fetchall()
        headers = [i[0] for i in self.description]

        csv_handle = csv.writer(
            fd,
            delimiter=",",
            quoting=csv.QUOTE_ALL,
            escapechar="\\",
            lineterminator="\n",
        )
        csv_handle.writerow(headers)
        csv_handle.writerows(results)

    setattr(SQLiteCursorWrapper, "copy_expert", write_stdout)
    request.addfinalizer(lambda: shutil.rmtree(path))
    request.addfinalizer(lambda: delattr(SQLiteCursorWrapper, "copy_expert"))
    return path


@pytest.mark.django_db
def test_copy_tables_unified_job_query(
    sqlite_copy_expert, project, inventory, job_template
):
    """
    Ensure that various unified job types are in the output of the query.
    """
    time_start = now() - timedelta(hours=9)
    inv_src = InventorySource.objects.create(
        name="inventory_update1", inventory=inventory, source="gce"
    )
    project_update_name = ProjectUpdate.objects.create(
        project=project, name="project_update1"
    ).name
    inventory_update_name = inv_src.create_unified_job().name
    job_name = job_template.create_unified_job().name

    with tempfile.TemporaryDirectory() as tmpdir:
        collectors.copy_tables(time_start, tmpdir, subset="unified_jobs")
        with open(os.path.join(tmpdir, "unified_jobs_table.csv")) as f:
            lines = f.read()
            assert project_update_name in lines
            assert inventory_update_name in lines
            assert job_name in lines


@pytest.fixture
def workflow_job(states=["new", "new", "new", "new", "new"]):
    """
    Workflow topology:
            node[0]
             /\
           s/  \f
           /    \
    node[1,5]   node[3]
        /          \
      s/            \f
      /              \
    node[2]       node[4]
    """
    wfj = WorkflowJob.objects.create()
    jt = JobTemplate.objects.create(name="test-jt")
    nodes = [
        WorkflowJobNode.objects.create(workflow_job=wfj, unified_job_template=jt)
        for i in range(0, 6)
    ]
    # zip() stops at the shorter sequence, so with five states the sixth node
    # is left without a spawned job.
    for node, state in zip(nodes, states):
        if state:
            node.job = jt.create_job()
            node.job.status = state
            node.job.save()
            node.save()
    nodes[0].success_nodes.add(nodes[1])
    nodes[0].success_nodes.add(nodes[5])
    nodes[1].success_nodes.add(nodes[2])
    nodes[0].failure_nodes.add(nodes[3])
    nodes[3].failure_nodes.add(nodes[4])
    return wfj


@pytest.mark.django_db
def test_copy_tables_workflow_job_node_query(sqlite_copy_expert, workflow_job):
    time_start = now() - timedelta(hours=9)

    with tempfile.TemporaryDirectory() as tmpdir:
        collectors.copy_tables(time_start, tmpdir, subset="workflow_job_node_query")
        with open(os.path.join(tmpdir, "workflow_job_node_table.csv")) as f:
            reader = csv.reader(f)
            # Pop the header row
            next(reader)
            lines = [line for line in reader]

            ids = [int(line[0]) for line in lines]
            assert ids == list(
                workflow_job.workflow_nodes.all().values_list("id", flat=True)
            )

            # Columns 7-9 hold the aggregated (GROUP_CONCAT'd) ids of related
            # nodes; compare each against the corresponding ORM relationship.
            for index, relationship in zip(
                [7, 8, 9], ["success_nodes", "failure_nodes", "always_nodes"]
            ):
                for i, line in enumerate(lines):
                    related_nodes = (
                        [int(e) for e in line[index].split(",")] if line[index] else []
                    )
                    assert related_nodes == list(
                        getattr(workflow_job.workflow_nodes.all()[i], relationship)
                        .all()
                        .values_list("id", flat=True)
                    ), f"(right side) workflow_nodes.all()[{i}].{relationship}.all()"