De-flake the collector test.

This commit is contained in:
Bill Nottingham
2020-04-21 16:25:49 -04:00
parent 921feb561d
commit 58c821f3e1

View File

@@ -22,61 +22,72 @@ from awx.main.models import (
def sqlite_copy_expert(request):
# copy_expert is postgres-specific, and SQLite doesn't support it; mock its
# behavior to test that it writes a file that contains stdout from events
path = tempfile.mkdtemp(prefix='copied_tables')
path = tempfile.mkdtemp(prefix="copied_tables")
def write_stdout(self, sql, fd):
# Would be cool if we instead properly disected the SQL query and verified
# it that way. But instead, we just take the nieve approach here.
assert sql.startswith('COPY (')
assert sql.endswith(') TO STDOUT WITH CSV HEADER')
assert sql.startswith("COPY (")
assert sql.endswith(") TO STDOUT WITH CSV HEADER")
sql = sql.replace('COPY (', '')
sql = sql.replace(') TO STDOUT WITH CSV HEADER', '')
sql = sql.replace("COPY (", "")
sql = sql.replace(") TO STDOUT WITH CSV HEADER", "")
# sqlite equivalent
sql = sql.replace('ARRAY_AGG', 'GROUP_CONCAT')
sql = sql.replace("ARRAY_AGG", "GROUP_CONCAT")
# Remove JSON style queries
# TODO: could replace JSON style queries with sqlite kind of equivalents
sql_new = []
for line in sql.split('\n'):
if line.find('main_jobevent.event_data::') == -1:
for line in sql.split("\n"):
if line.find("main_jobevent.event_data::") == -1:
sql_new.append(line)
elif not line.endswith(','):
sql_new[-1] = sql_new[-1].rstrip(',')
sql = '\n'.join(sql_new)
elif not line.endswith(","):
sql_new[-1] = sql_new[-1].rstrip(",")
sql = "\n".join(sql_new)
self.execute(sql)
results = self.fetchall()
headers = [i[0] for i in self.description]
csv_handle = csv.writer(fd, delimiter=',', quoting=csv.QUOTE_ALL, escapechar='\\', lineterminator='\n')
csv_handle = csv.writer(
fd,
delimiter=",",
quoting=csv.QUOTE_ALL,
escapechar="\\",
lineterminator="\n",
)
csv_handle.writerow(headers)
csv_handle.writerows(results)
setattr(SQLiteCursorWrapper, 'copy_expert', write_stdout)
setattr(SQLiteCursorWrapper, "copy_expert", write_stdout)
request.addfinalizer(lambda: shutil.rmtree(path))
request.addfinalizer(lambda: delattr(SQLiteCursorWrapper, 'copy_expert'))
request.addfinalizer(lambda: delattr(SQLiteCursorWrapper, "copy_expert"))
return path
@pytest.mark.django_db
def test_copy_tables_unified_job_query(sqlite_copy_expert, project, inventory, job_template):
'''
def test_copy_tables_unified_job_query(
sqlite_copy_expert, project, inventory, job_template
):
"""
Ensure that various unified job types are in the output of the query.
'''
"""
time_start = now()
inv_src = InventorySource.objects.create(name="inventory_update1", inventory=inventory, source='gce')
inv_src = InventorySource.objects.create(
name="inventory_update1", inventory=inventory, source="gce"
)
project_update_name = ProjectUpdate.objects.create(project=project, name="project_update1").name
project_update_name = ProjectUpdate.objects.create(
project=project, name="project_update1"
).name
inventory_update_name = inv_src.create_unified_job().name
job_name = job_template.create_unified_job().name
with tempfile.TemporaryDirectory() as tmpdir:
collectors.copy_tables(time_start, tmpdir, subset='unified_jobs')
with open(os.path.join(tmpdir, 'unified_jobs_table.csv')) as f:
lines = ''.join([l for l in f])
collectors.copy_tables(time_start, tmpdir, subset="unified_jobs")
with open(os.path.join(tmpdir, "unified_jobs_table.csv")) as f:
lines = "".join([l for l in f])
assert project_update_name in lines
assert inventory_update_name in lines
@@ -84,7 +95,7 @@ def test_copy_tables_unified_job_query(sqlite_copy_expert, project, inventory, j
@pytest.fixture
def workflow_job(states=['new', 'new', 'new', 'new', 'new']):
def workflow_job(states=["new", "new", "new", "new", "new"]):
"""
Workflow topology:
node[0]
@@ -98,8 +109,11 @@ def workflow_job(states=['new', 'new', 'new', 'new', 'new']):
node[2] node[4]
"""
wfj = WorkflowJob.objects.create()
jt = JobTemplate.objects.create(name='test-jt')
nodes = [WorkflowJobNode.objects.create(workflow_job=wfj, unified_job_template=jt) for i in range(0, 6)]
jt = JobTemplate.objects.create(name="test-jt")
nodes = [
WorkflowJobNode.objects.create(workflow_job=wfj, unified_job_template=jt)
for i in range(0, 6)
]
for node, state in zip(nodes, states):
if state:
node.job = jt.create_job()
@@ -119,8 +133,8 @@ def test_copy_tables_workflow_job_node_query(sqlite_copy_expert, workflow_job):
time_start = now()
with tempfile.TemporaryDirectory() as tmpdir:
collectors.copy_tables(time_start, tmpdir, subset='workflow_job_node_query')
with open(os.path.join(tmpdir, 'workflow_job_node_table.csv')) as f:
collectors.copy_tables(time_start, tmpdir, subset="workflow_job_node_query")
with open(os.path.join(tmpdir, "workflow_job_node_table.csv")) as f:
reader = csv.reader(f)
# Pop the headers
next(reader)
@@ -128,10 +142,19 @@ def test_copy_tables_workflow_job_node_query(sqlite_copy_expert, workflow_job):
ids = [int(l[0]) for l in lines]
assert ids == list(workflow_job.workflow_nodes.all().values_list('id', flat=True))
assert ids == list(
workflow_job.workflow_nodes.all().values_list("id", flat=True)
)
for index, relationship in zip([7, 8, 9], ['success_nodes', 'failure_nodes', 'always_nodes']):
for index, relationship in zip(
[7, 8, 9], ["success_nodes", "failure_nodes", "always_nodes"]
):
for i, l in enumerate(lines):
related_nodes = [int(e) for e in l[index].split(',')] if l[index] else []
assert related_nodes == list(getattr(workflow_job.workflow_nodes.all()[i], relationship).all().values_list('id', flat=True)), \
f"(right side) workflow_nodes.all()[{i}].{relationship}.all()"
related_nodes = (
[int(e) for e in l[index].split(",")] if l[index] else []
)
assert related_nodes == list(
getattr(workflow_job.workflow_nodes.all()[i], relationship)
.all()
.values_list("id", flat=True)
), f"(right side) workflow_nodes.all()[{i}].{relationship}.all()"