diff --git a/awx/main/tests/functional/analytics/test_collectors.py b/awx/main/tests/functional/analytics/test_collectors.py index 91aaf7ffbc..d1c426b8ec 100644 --- a/awx/main/tests/functional/analytics/test_collectors.py +++ b/awx/main/tests/functional/analytics/test_collectors.py @@ -22,61 +22,72 @@ from awx.main.models import ( def sqlite_copy_expert(request): # copy_expert is postgres-specific, and SQLite doesn't support it; mock its # behavior to test that it writes a file that contains stdout from events - path = tempfile.mkdtemp(prefix='copied_tables') + path = tempfile.mkdtemp(prefix="copied_tables") def write_stdout(self, sql, fd): # Would be cool if we instead properly disected the SQL query and verified # it that way. But instead, we just take the nieve approach here. - assert sql.startswith('COPY (') - assert sql.endswith(') TO STDOUT WITH CSV HEADER') + assert sql.startswith("COPY (") + assert sql.endswith(") TO STDOUT WITH CSV HEADER") - sql = sql.replace('COPY (', '') - sql = sql.replace(') TO STDOUT WITH CSV HEADER', '') + sql = sql.replace("COPY (", "") + sql = sql.replace(") TO STDOUT WITH CSV HEADER", "") # sqlite equivalent - sql = sql.replace('ARRAY_AGG', 'GROUP_CONCAT') + sql = sql.replace("ARRAY_AGG", "GROUP_CONCAT") # Remove JSON style queries # TODO: could replace JSON style queries with sqlite kind of equivalents sql_new = [] - for line in sql.split('\n'): - if line.find('main_jobevent.event_data::') == -1: + for line in sql.split("\n"): + if line.find("main_jobevent.event_data::") == -1: sql_new.append(line) - elif not line.endswith(','): - sql_new[-1] = sql_new[-1].rstrip(',') - sql = '\n'.join(sql_new) + elif not line.endswith(","): + sql_new[-1] = sql_new[-1].rstrip(",") + sql = "\n".join(sql_new) self.execute(sql) results = self.fetchall() headers = [i[0] for i in self.description] - csv_handle = csv.writer(fd, delimiter=',', quoting=csv.QUOTE_ALL, escapechar='\\', lineterminator='\n') + csv_handle = csv.writer( + fd, + delimiter=",", + quoting=csv.QUOTE_ALL, + escapechar="\\", + lineterminator="\n", + ) csv_handle.writerow(headers) csv_handle.writerows(results) - - setattr(SQLiteCursorWrapper, 'copy_expert', write_stdout) + setattr(SQLiteCursorWrapper, "copy_expert", write_stdout) request.addfinalizer(lambda: shutil.rmtree(path)) - request.addfinalizer(lambda: delattr(SQLiteCursorWrapper, 'copy_expert')) + request.addfinalizer(lambda: delattr(SQLiteCursorWrapper, "copy_expert")) return path @pytest.mark.django_db -def test_copy_tables_unified_job_query(sqlite_copy_expert, project, inventory, job_template): - ''' +def test_copy_tables_unified_job_query( + sqlite_copy_expert, project, inventory, job_template +): + """ Ensure that various unified job types are in the output of the query. - ''' + """ time_start = now() - inv_src = InventorySource.objects.create(name="inventory_update1", inventory=inventory, source='gce') + inv_src = InventorySource.objects.create( + name="inventory_update1", inventory=inventory, source="gce" + ) - project_update_name = ProjectUpdate.objects.create(project=project, name="project_update1").name + project_update_name = ProjectUpdate.objects.create( + project=project, name="project_update1" + ).name inventory_update_name = inv_src.create_unified_job().name job_name = job_template.create_unified_job().name with tempfile.TemporaryDirectory() as tmpdir: - collectors.copy_tables(time_start, tmpdir, subset='unified_jobs') - with open(os.path.join(tmpdir, 'unified_jobs_table.csv')) as f: - lines = ''.join([l for l in f]) + collectors.copy_tables(time_start, tmpdir, subset="unified_jobs") + with open(os.path.join(tmpdir, "unified_jobs_table.csv")) as f: + lines = "".join([l for l in f]) assert project_update_name in lines assert inventory_update_name in lines @@ -84,7 +95,7 @@ def test_copy_tables_unified_job_query(sqlite_copy_expert, project, inventory, j @pytest.fixture -def workflow_job(states=['new', 'new', 'new', 'new', 'new']): +def workflow_job(states=["new", "new", "new", "new", "new"]): """ Workflow topology: node[0] @@ -98,8 +109,11 @@ def workflow_job(states=['new', 'new', 'new', 'new', 'new']): node[2] node[4] """ wfj = WorkflowJob.objects.create() - jt = JobTemplate.objects.create(name='test-jt') - nodes = [WorkflowJobNode.objects.create(workflow_job=wfj, unified_job_template=jt) for i in range(0, 6)] + jt = JobTemplate.objects.create(name="test-jt") + nodes = [ + WorkflowJobNode.objects.create(workflow_job=wfj, unified_job_template=jt) + for i in range(0, 6) + ] for node, state in zip(nodes, states): if state: node.job = jt.create_job() @@ -119,8 +133,8 @@ def test_copy_tables_workflow_job_node_query(sqlite_copy_expert, workflow_job): time_start = now() with tempfile.TemporaryDirectory() as tmpdir: - collectors.copy_tables(time_start, tmpdir, subset='workflow_job_node_query') - with open(os.path.join(tmpdir, 'workflow_job_node_table.csv')) as f: + collectors.copy_tables(time_start, tmpdir, subset="workflow_job_node_query") + with open(os.path.join(tmpdir, "workflow_job_node_table.csv")) as f: reader = csv.reader(f) # Pop the headers next(reader) @@ -128,10 +142,19 @@ def test_copy_tables_workflow_job_node_query(sqlite_copy_expert, workflow_job): ids = [int(l[0]) for l in lines] - assert ids == list(workflow_job.workflow_nodes.all().values_list('id', flat=True)) + assert ids == list( + workflow_job.workflow_nodes.all().values_list("id", flat=True) + ) - for index, relationship in zip([7, 8, 9], ['success_nodes', 'failure_nodes', 'always_nodes']): + for index, relationship in zip( + [7, 8, 9], ["success_nodes", "failure_nodes", "always_nodes"] + ): for i, l in enumerate(lines): - related_nodes = [int(e) for e in l[index].split(',')] if l[index] else [] - assert related_nodes == list(getattr(workflow_job.workflow_nodes.all()[i], relationship).all().values_list('id', flat=True)), \ - f"(right side) workflow_nodes.all()[{i}].{relationship}.all()" + related_nodes = ( + [int(e) for e in l[index].split(",")] if l[index] else [] + ) + assert related_nodes == list( + getattr(workflow_job.workflow_nodes.all()[i], relationship) + .all() + .values_list("id", flat=True) + ), f"(right side) workflow_nodes.all()[{i}].{relationship}.all()"