diff --git a/awx/main/analytics/collectors.py b/awx/main/analytics/collectors.py index 852d6a71ec..331983e24a 100644 --- a/awx/main/analytics/collectors.py +++ b/awx/main/analytics/collectors.py @@ -230,7 +230,9 @@ def query_info(since, collection_type): @table_version('events_table.csv', '1.1') @table_version('unified_jobs_table.csv', '1.0') @table_version('unified_job_template_table.csv', '1.0') -def copy_tables(since, full_path): +@table_version('workflow_job_node_table.csv', '1.0') +@table_version('workflow_job_template_node_table.csv', '1.0') +def copy_tables(since, full_path, subset=None): def _copy_table(table, query, path): file_path = os.path.join(path, table + '_table.csv') file = open(file_path, 'w', encoding='utf-8') @@ -262,7 +264,8 @@ def copy_tables(since, full_path): FROM main_jobevent WHERE main_jobevent.created > {} ORDER BY main_jobevent.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'")) - _copy_table(table='events', query=events_query, path=full_path) + if not subset or 'events' in subset: + _copy_table(table='events', query=events_query, path=full_path) unified_job_query = '''COPY (SELECT main_unifiedjob.id, main_unifiedjob.polymorphic_ctype_id, @@ -290,7 +293,8 @@ def copy_tables(since, full_path): WHERE (main_unifiedjob.created > {0} OR main_unifiedjob.finished > {0}) AND main_unifiedjob.launch_type != 'sync' ORDER BY main_unifiedjob.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'")) - _copy_table(table='unified_jobs', query=unified_job_query, path=full_path) + if not subset or 'unified_jobs' in subset: + _copy_table(table='unified_jobs', query=unified_job_query, path=full_path) unified_job_template_query = '''COPY (SELECT main_unifiedjobtemplate.id, main_unifiedjobtemplate.polymorphic_ctype_id, @@ -309,6 +313,71 @@ def copy_tables(since, full_path): main_unifiedjobtemplate.status FROM main_unifiedjobtemplate, django_content_type WHERE main_unifiedjobtemplate.polymorphic_ctype_id = django_content_type.id - ORDER BY main_unifiedjobtemplate.id ASC) TO STDOUT WITH CSV HEADER''' - _copy_table(table='unified_job_template', query=unified_job_template_query, path=full_path) + ORDER BY main_unifiedjobtemplate.id ASC) TO STDOUT WITH CSV HEADER''' + if not subset or 'unified_job_template' in subset: + _copy_table(table='unified_job_template', query=unified_job_template_query, path=full_path) + + workflow_job_node_query = '''COPY (SELECT main_workflowjobnode.id, + main_workflowjobnode.created, + main_workflowjobnode.modified, + main_workflowjobnode.job_id, + main_workflowjobnode.unified_job_template_id, + main_workflowjobnode.workflow_job_id, + main_workflowjobnode.inventory_id, + success_nodes.nodes AS success_nodes, + failure_nodes.nodes AS failure_nodes, + always_nodes.nodes AS always_nodes, + main_workflowjobnode.do_not_run, + main_workflowjobnode.all_parents_must_converge + FROM main_workflowjobnode + LEFT JOIN ( + SELECT from_workflowjobnode_id, ARRAY_AGG(to_workflowjobnode_id) AS nodes + FROM main_workflowjobnode_success_nodes + GROUP BY from_workflowjobnode_id + ) success_nodes ON main_workflowjobnode.id = success_nodes.from_workflowjobnode_id + LEFT JOIN ( + SELECT from_workflowjobnode_id, ARRAY_AGG(to_workflowjobnode_id) AS nodes + FROM main_workflowjobnode_failure_nodes + GROUP BY from_workflowjobnode_id + ) failure_nodes ON main_workflowjobnode.id = failure_nodes.from_workflowjobnode_id + LEFT JOIN ( + SELECT from_workflowjobnode_id, ARRAY_AGG(to_workflowjobnode_id) AS nodes + FROM 
main_workflowjobnode_always_nodes + GROUP BY from_workflowjobnode_id + ) always_nodes ON main_workflowjobnode.id = always_nodes.from_workflowjobnode_id + WHERE main_workflowjobnode.modified > {} + ORDER BY main_workflowjobnode.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'")) + if not subset or 'workflow_job_node' in subset: + _copy_table(table='workflow_job_node', query=workflow_job_node_query, path=full_path) + + workflow_job_template_node_query = '''COPY (SELECT main_workflowjobtemplatenode.id, + main_workflowjobtemplatenode.created, + main_workflowjobtemplatenode.modified, + main_workflowjobtemplatenode.unified_job_template_id, + main_workflowjobtemplatenode.workflow_job_template_id, + main_workflowjobtemplatenode.inventory_id, + success_nodes.nodes AS success_nodes, + failure_nodes.nodes AS failure_nodes, + always_nodes.nodes AS always_nodes, + main_workflowjobtemplatenode.all_parents_must_converge + FROM main_workflowjobtemplatenode + LEFT JOIN ( + SELECT from_workflowjobtemplatenode_id, ARRAY_AGG(to_workflowjobtemplatenode_id) AS nodes + FROM main_workflowjobtemplatenode_success_nodes + GROUP BY from_workflowjobtemplatenode_id + ) success_nodes ON main_workflowjobtemplatenode.id = success_nodes.from_workflowjobtemplatenode_id + LEFT JOIN ( + SELECT from_workflowjobtemplatenode_id, ARRAY_AGG(to_workflowjobtemplatenode_id) AS nodes + FROM main_workflowjobtemplatenode_failure_nodes + GROUP BY from_workflowjobtemplatenode_id + ) failure_nodes ON main_workflowjobtemplatenode.id = failure_nodes.from_workflowjobtemplatenode_id + LEFT JOIN ( + SELECT from_workflowjobtemplatenode_id, ARRAY_AGG(to_workflowjobtemplatenode_id) AS nodes + FROM main_workflowjobtemplatenode_always_nodes + GROUP BY from_workflowjobtemplatenode_id + ) always_nodes ON main_workflowjobtemplatenode.id = always_nodes.from_workflowjobtemplatenode_id + ORDER BY main_workflowjobtemplatenode.id ASC) TO STDOUT WITH CSV HEADER''' + if not subset or 'workflow_job_template_node' in subset: + _copy_table(table='workflow_job_template_node', query=workflow_job_template_node_query, path=full_path) + return diff --git a/awx/main/tests/functional/analytics/test_collectors.py b/awx/main/tests/functional/analytics/test_collectors.py index 21a243c907..d1c426b8ec 100644 --- a/awx/main/tests/functional/analytics/test_collectors.py +++ b/awx/main/tests/functional/analytics/test_collectors.py @@ -12,6 +12,9 @@ from awx.main.analytics import collectors from awx.main.models import ( ProjectUpdate, InventorySource, + WorkflowJob, + WorkflowJobNode, + JobTemplate, ) @@ -19,60 +22,139 @@ from awx.main.models import ( def sqlite_copy_expert(request): # copy_expert is postgres-specific, and SQLite doesn't support it; mock its # behavior to test that it writes a file that contains stdout from events - path = tempfile.mkdtemp(prefix='copied_tables') + path = tempfile.mkdtemp(prefix="copied_tables") def write_stdout(self, sql, fd): # Would be cool if we instead properly disected the SQL query and verified # it that way. But instead, we just take the nieve approach here. 
- assert sql.startswith('COPY (') - assert sql.endswith(') TO STDOUT WITH CSV HEADER') + assert sql.startswith("COPY (") + assert sql.endswith(") TO STDOUT WITH CSV HEADER") - sql = sql.replace('COPY (', '') - sql = sql.replace(') TO STDOUT WITH CSV HEADER', '') + sql = sql.replace("COPY (", "") + sql = sql.replace(") TO STDOUT WITH CSV HEADER", "") + # sqlite equivalent + sql = sql.replace("ARRAY_AGG", "GROUP_CONCAT") # Remove JSON style queries # TODO: could replace JSON style queries with sqlite kind of equivalents sql_new = [] - for line in sql.split('\n'): - if line.find('main_jobevent.event_data::') == -1: + for line in sql.split("\n"): + if line.find("main_jobevent.event_data::") == -1: sql_new.append(line) - elif not line.endswith(','): - sql_new[-1] = sql_new[-1].rstrip(',') - sql = '\n'.join(sql_new) + elif not line.endswith(","): + sql_new[-1] = sql_new[-1].rstrip(",") + sql = "\n".join(sql_new) self.execute(sql) results = self.fetchall() headers = [i[0] for i in self.description] - csv_handle = csv.writer(fd, delimiter=',', quoting=csv.QUOTE_ALL, escapechar='\\', lineterminator='\n') + csv_handle = csv.writer( + fd, + delimiter=",", + quoting=csv.QUOTE_ALL, + escapechar="\\", + lineterminator="\n", + ) csv_handle.writerow(headers) csv_handle.writerows(results) - - setattr(SQLiteCursorWrapper, 'copy_expert', write_stdout) + setattr(SQLiteCursorWrapper, "copy_expert", write_stdout) request.addfinalizer(lambda: shutil.rmtree(path)) - request.addfinalizer(lambda: delattr(SQLiteCursorWrapper, 'copy_expert')) + request.addfinalizer(lambda: delattr(SQLiteCursorWrapper, "copy_expert")) return path @pytest.mark.django_db -def test_copy_tables_unified_job_query(sqlite_copy_expert, project, inventory, job_template): - ''' +def test_copy_tables_unified_job_query( + sqlite_copy_expert, project, inventory, job_template +): + """ Ensure that various unified job types are in the output of the query. 
- ''' + """ time_start = now() - inv_src = InventorySource.objects.create(name="inventory_update1", inventory=inventory, source='gce') + inv_src = InventorySource.objects.create( + name="inventory_update1", inventory=inventory, source="gce" + ) - project_update_name = ProjectUpdate.objects.create(project=project, name="project_update1").name + project_update_name = ProjectUpdate.objects.create( + project=project, name="project_update1" + ).name inventory_update_name = inv_src.create_unified_job().name job_name = job_template.create_unified_job().name with tempfile.TemporaryDirectory() as tmpdir: - collectors.copy_tables(time_start, tmpdir) - with open(os.path.join(tmpdir, 'unified_jobs_table.csv')) as f: - lines = ''.join([l for l in f]) + collectors.copy_tables(time_start, tmpdir, subset="unified_jobs") + with open(os.path.join(tmpdir, "unified_jobs_table.csv")) as f: + lines = "".join([l for l in f]) assert project_update_name in lines assert inventory_update_name in lines assert job_name in lines + + +@pytest.fixture +def workflow_job(states=["new", "new", "new", "new", "new"]): + """ + Workflow topology: + node[0] + /\ + s/ \f + / \ + node[1,5] node[3] + / \ + s/ \f + / \ + node[2] node[4] + """ + wfj = WorkflowJob.objects.create() + jt = JobTemplate.objects.create(name="test-jt") + nodes = [ + WorkflowJobNode.objects.create(workflow_job=wfj, unified_job_template=jt) + for i in range(0, 6) + ] + for node, state in zip(nodes, states): + if state: + node.job = jt.create_job() + node.job.status = state + node.job.save() + node.save() + nodes[0].success_nodes.add(nodes[1]) + nodes[0].success_nodes.add(nodes[5]) + nodes[1].success_nodes.add(nodes[2]) + nodes[0].failure_nodes.add(nodes[3]) + nodes[3].failure_nodes.add(nodes[4]) + return wfj + + +@pytest.mark.django_db +def test_copy_tables_workflow_job_node_query(sqlite_copy_expert, workflow_job): + time_start = now() + + with tempfile.TemporaryDirectory() as tmpdir: + collectors.copy_tables(time_start, tmpdir, subset="workflow_job_node_query") + with open(os.path.join(tmpdir, "workflow_job_node_table.csv")) as f: + reader = csv.reader(f) + # Pop the headers + next(reader) + lines = [l for l in reader] + + ids = [int(l[0]) for l in lines] + + assert ids == list( + workflow_job.workflow_nodes.all().values_list("id", flat=True) + ) + + for index, relationship in zip( + [7, 8, 9], ["success_nodes", "failure_nodes", "always_nodes"] + ): + for i, l in enumerate(lines): + related_nodes = ( + [int(e) for e in l[index].split(",")] if l[index] else [] + ) + assert related_nodes == list( + getattr(workflow_job.workflow_nodes.all()[i], relationship) + .all() + .values_list("id", flat=True) + ), f"(right side) workflow_nodes.all()[{i}].{relationship}.all()"
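
Usage note (not part of the patch): the new `subset` argument is only checked with membership tests (`if not subset or 'events' in subset`), so a caller can pass a collection of table names, or, as the tests do, a string that contains the name, to limit which CSVs are written; the default of None keeps the old copy-everything behaviour. A minimal sketch under those assumptions, with `since` and `full_path` stood up locally:

    import tempfile
    from django.utils.timezone import now

    from awx.main.analytics import collectors

    since = now()                   # cutoff applied to the created/modified filters
    full_path = tempfile.mkdtemp()  # directory the *_table.csv files are written into

    # Copy only the two workflow tables added by this patch.
    collectors.copy_tables(since, full_path, subset=['workflow_job_node', 'workflow_job_template_node'])

    # Copy every table, as before (subset defaults to None).
    collectors.copy_tables(since, full_path)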
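
A caveat for anything that consumes the new CSVs: success_nodes, failure_nodes and always_nodes come from ARRAY_AGG, so the real Postgres COPY emits them as array literals such as {2,3}, while the sqlite shim in the tests substitutes GROUP_CONCAT and yields plain comma-separated values (hence the split(",") in the assertions). A hedged parsing sketch for the Postgres-shaped file, assuming it was produced by copy_tables() into full_path as above:

    import csv
    import os

    def related_ids(value):
        # '{2,3}' -> [2, 3]; an empty cell (NULL from the LEFT JOIN) -> []
        return [int(v) for v in value.strip('{}').split(',') if v] if value else []

    with open(os.path.join(full_path, 'workflow_job_node_table.csv')) as f:
        for row in csv.DictReader(f):
            print(row['id'], related_ids(row['success_nodes']))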