mirror of
https://github.com/ansible/awx.git
synced 2026-05-07 01:17:37 -02:30
Merge pull request #6204 from Ladas/send_job_and_template_nodes_to_analytics
Send job and template nodes to analytics Reviewed-by: https://github.com/apps/softwarefactory-project-zuul
This commit is contained in:
@@ -230,7 +230,9 @@ def query_info(since, collection_type):
|
|||||||
@table_version('events_table.csv', '1.1')
|
@table_version('events_table.csv', '1.1')
|
||||||
@table_version('unified_jobs_table.csv', '1.0')
|
@table_version('unified_jobs_table.csv', '1.0')
|
||||||
@table_version('unified_job_template_table.csv', '1.0')
|
@table_version('unified_job_template_table.csv', '1.0')
|
||||||
def copy_tables(since, full_path):
|
@table_version('workflow_job_node_table.csv', '1.0')
|
||||||
|
@table_version('workflow_job_template_node_table.csv', '1.0')
|
||||||
|
def copy_tables(since, full_path, subset=None):
|
||||||
def _copy_table(table, query, path):
|
def _copy_table(table, query, path):
|
||||||
file_path = os.path.join(path, table + '_table.csv')
|
file_path = os.path.join(path, table + '_table.csv')
|
||||||
file = open(file_path, 'w', encoding='utf-8')
|
file = open(file_path, 'w', encoding='utf-8')
|
||||||
@@ -262,7 +264,8 @@ def copy_tables(since, full_path):
|
|||||||
FROM main_jobevent
|
FROM main_jobevent
|
||||||
WHERE main_jobevent.created > {}
|
WHERE main_jobevent.created > {}
|
||||||
ORDER BY main_jobevent.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'"))
|
ORDER BY main_jobevent.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'"))
|
||||||
_copy_table(table='events', query=events_query, path=full_path)
|
if not subset or 'events' in subset:
|
||||||
|
_copy_table(table='events', query=events_query, path=full_path)
|
||||||
|
|
||||||
unified_job_query = '''COPY (SELECT main_unifiedjob.id,
|
unified_job_query = '''COPY (SELECT main_unifiedjob.id,
|
||||||
main_unifiedjob.polymorphic_ctype_id,
|
main_unifiedjob.polymorphic_ctype_id,
|
||||||
@@ -290,7 +293,8 @@ def copy_tables(since, full_path):
|
|||||||
WHERE (main_unifiedjob.created > {0} OR main_unifiedjob.finished > {0})
|
WHERE (main_unifiedjob.created > {0} OR main_unifiedjob.finished > {0})
|
||||||
AND main_unifiedjob.launch_type != 'sync'
|
AND main_unifiedjob.launch_type != 'sync'
|
||||||
ORDER BY main_unifiedjob.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'"))
|
ORDER BY main_unifiedjob.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'"))
|
||||||
_copy_table(table='unified_jobs', query=unified_job_query, path=full_path)
|
if not subset or 'unified_jobs' in subset:
|
||||||
|
_copy_table(table='unified_jobs', query=unified_job_query, path=full_path)
|
||||||
|
|
||||||
unified_job_template_query = '''COPY (SELECT main_unifiedjobtemplate.id,
|
unified_job_template_query = '''COPY (SELECT main_unifiedjobtemplate.id,
|
||||||
main_unifiedjobtemplate.polymorphic_ctype_id,
|
main_unifiedjobtemplate.polymorphic_ctype_id,
|
||||||
@@ -309,6 +313,71 @@ def copy_tables(since, full_path):
|
|||||||
main_unifiedjobtemplate.status
|
main_unifiedjobtemplate.status
|
||||||
FROM main_unifiedjobtemplate, django_content_type
|
FROM main_unifiedjobtemplate, django_content_type
|
||||||
WHERE main_unifiedjobtemplate.polymorphic_ctype_id = django_content_type.id
|
WHERE main_unifiedjobtemplate.polymorphic_ctype_id = django_content_type.id
|
||||||
ORDER BY main_unifiedjobtemplate.id ASC) TO STDOUT WITH CSV HEADER'''
|
ORDER BY main_unifiedjobtemplate.id ASC) TO STDOUT WITH CSV HEADER'''
|
||||||
_copy_table(table='unified_job_template', query=unified_job_template_query, path=full_path)
|
if not subset or 'unified_job_template' in subset:
|
||||||
|
_copy_table(table='unified_job_template', query=unified_job_template_query, path=full_path)
|
||||||
|
|
||||||
|
workflow_job_node_query = '''COPY (SELECT main_workflowjobnode.id,
|
||||||
|
main_workflowjobnode.created,
|
||||||
|
main_workflowjobnode.modified,
|
||||||
|
main_workflowjobnode.job_id,
|
||||||
|
main_workflowjobnode.unified_job_template_id,
|
||||||
|
main_workflowjobnode.workflow_job_id,
|
||||||
|
main_workflowjobnode.inventory_id,
|
||||||
|
success_nodes.nodes AS success_nodes,
|
||||||
|
failure_nodes.nodes AS failure_nodes,
|
||||||
|
always_nodes.nodes AS always_nodes,
|
||||||
|
main_workflowjobnode.do_not_run,
|
||||||
|
main_workflowjobnode.all_parents_must_converge
|
||||||
|
FROM main_workflowjobnode
|
||||||
|
LEFT JOIN (
|
||||||
|
SELECT from_workflowjobnode_id, ARRAY_AGG(to_workflowjobnode_id) AS nodes
|
||||||
|
FROM main_workflowjobnode_success_nodes
|
||||||
|
GROUP BY from_workflowjobnode_id
|
||||||
|
) success_nodes ON main_workflowjobnode.id = success_nodes.from_workflowjobnode_id
|
||||||
|
LEFT JOIN (
|
||||||
|
SELECT from_workflowjobnode_id, ARRAY_AGG(to_workflowjobnode_id) AS nodes
|
||||||
|
FROM main_workflowjobnode_failure_nodes
|
||||||
|
GROUP BY from_workflowjobnode_id
|
||||||
|
) failure_nodes ON main_workflowjobnode.id = failure_nodes.from_workflowjobnode_id
|
||||||
|
LEFT JOIN (
|
||||||
|
SELECT from_workflowjobnode_id, ARRAY_AGG(to_workflowjobnode_id) AS nodes
|
||||||
|
FROM main_workflowjobnode_always_nodes
|
||||||
|
GROUP BY from_workflowjobnode_id
|
||||||
|
) always_nodes ON main_workflowjobnode.id = always_nodes.from_workflowjobnode_id
|
||||||
|
WHERE main_workflowjobnode.modified > {}
|
||||||
|
ORDER BY main_workflowjobnode.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'"))
|
||||||
|
if not subset or 'workflow_job_node' in subset:
|
||||||
|
_copy_table(table='workflow_job_node', query=workflow_job_node_query, path=full_path)
|
||||||
|
|
||||||
|
workflow_job_template_node_query = '''COPY (SELECT main_workflowjobtemplatenode.id,
|
||||||
|
main_workflowjobtemplatenode.created,
|
||||||
|
main_workflowjobtemplatenode.modified,
|
||||||
|
main_workflowjobtemplatenode.unified_job_template_id,
|
||||||
|
main_workflowjobtemplatenode.workflow_job_template_id,
|
||||||
|
main_workflowjobtemplatenode.inventory_id,
|
||||||
|
success_nodes.nodes AS success_nodes,
|
||||||
|
failure_nodes.nodes AS failure_nodes,
|
||||||
|
always_nodes.nodes AS always_nodes,
|
||||||
|
main_workflowjobtemplatenode.all_parents_must_converge
|
||||||
|
FROM main_workflowjobtemplatenode
|
||||||
|
LEFT JOIN (
|
||||||
|
SELECT from_workflowjobtemplatenode_id, ARRAY_AGG(to_workflowjobtemplatenode_id) AS nodes
|
||||||
|
FROM main_workflowjobtemplatenode_success_nodes
|
||||||
|
GROUP BY from_workflowjobtemplatenode_id
|
||||||
|
) success_nodes ON main_workflowjobtemplatenode.id = success_nodes.from_workflowjobtemplatenode_id
|
||||||
|
LEFT JOIN (
|
||||||
|
SELECT from_workflowjobtemplatenode_id, ARRAY_AGG(to_workflowjobtemplatenode_id) AS nodes
|
||||||
|
FROM main_workflowjobtemplatenode_failure_nodes
|
||||||
|
GROUP BY from_workflowjobtemplatenode_id
|
||||||
|
) failure_nodes ON main_workflowjobtemplatenode.id = failure_nodes.from_workflowjobtemplatenode_id
|
||||||
|
LEFT JOIN (
|
||||||
|
SELECT from_workflowjobtemplatenode_id, ARRAY_AGG(to_workflowjobtemplatenode_id) AS nodes
|
||||||
|
FROM main_workflowjobtemplatenode_always_nodes
|
||||||
|
GROUP BY from_workflowjobtemplatenode_id
|
||||||
|
) always_nodes ON main_workflowjobtemplatenode.id = always_nodes.from_workflowjobtemplatenode_id
|
||||||
|
ORDER BY main_workflowjobtemplatenode.id ASC) TO STDOUT WITH CSV HEADER'''
|
||||||
|
if not subset or 'workflow_job_template_node' in subset:
|
||||||
|
_copy_table(table='workflow_job_template_node', query=workflow_job_template_node_query, path=full_path)
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -12,6 +12,9 @@ from awx.main.analytics import collectors
|
|||||||
from awx.main.models import (
|
from awx.main.models import (
|
||||||
ProjectUpdate,
|
ProjectUpdate,
|
||||||
InventorySource,
|
InventorySource,
|
||||||
|
WorkflowJob,
|
||||||
|
WorkflowJobNode,
|
||||||
|
JobTemplate,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -19,60 +22,139 @@ from awx.main.models import (
|
|||||||
def sqlite_copy_expert(request):
|
def sqlite_copy_expert(request):
|
||||||
# copy_expert is postgres-specific, and SQLite doesn't support it; mock its
|
# copy_expert is postgres-specific, and SQLite doesn't support it; mock its
|
||||||
# behavior to test that it writes a file that contains stdout from events
|
# behavior to test that it writes a file that contains stdout from events
|
||||||
path = tempfile.mkdtemp(prefix='copied_tables')
|
path = tempfile.mkdtemp(prefix="copied_tables")
|
||||||
|
|
||||||
def write_stdout(self, sql, fd):
|
def write_stdout(self, sql, fd):
|
||||||
# Would be cool if we instead properly disected the SQL query and verified
|
# Would be cool if we instead properly disected the SQL query and verified
|
||||||
# it that way. But instead, we just take the nieve approach here.
|
# it that way. But instead, we just take the nieve approach here.
|
||||||
assert sql.startswith('COPY (')
|
assert sql.startswith("COPY (")
|
||||||
assert sql.endswith(') TO STDOUT WITH CSV HEADER')
|
assert sql.endswith(") TO STDOUT WITH CSV HEADER")
|
||||||
|
|
||||||
sql = sql.replace('COPY (', '')
|
sql = sql.replace("COPY (", "")
|
||||||
sql = sql.replace(') TO STDOUT WITH CSV HEADER', '')
|
sql = sql.replace(") TO STDOUT WITH CSV HEADER", "")
|
||||||
|
# sqlite equivalent
|
||||||
|
sql = sql.replace("ARRAY_AGG", "GROUP_CONCAT")
|
||||||
|
|
||||||
# Remove JSON style queries
|
# Remove JSON style queries
|
||||||
# TODO: could replace JSON style queries with sqlite kind of equivalents
|
# TODO: could replace JSON style queries with sqlite kind of equivalents
|
||||||
sql_new = []
|
sql_new = []
|
||||||
for line in sql.split('\n'):
|
for line in sql.split("\n"):
|
||||||
if line.find('main_jobevent.event_data::') == -1:
|
if line.find("main_jobevent.event_data::") == -1:
|
||||||
sql_new.append(line)
|
sql_new.append(line)
|
||||||
elif not line.endswith(','):
|
elif not line.endswith(","):
|
||||||
sql_new[-1] = sql_new[-1].rstrip(',')
|
sql_new[-1] = sql_new[-1].rstrip(",")
|
||||||
sql = '\n'.join(sql_new)
|
sql = "\n".join(sql_new)
|
||||||
|
|
||||||
self.execute(sql)
|
self.execute(sql)
|
||||||
results = self.fetchall()
|
results = self.fetchall()
|
||||||
headers = [i[0] for i in self.description]
|
headers = [i[0] for i in self.description]
|
||||||
|
|
||||||
csv_handle = csv.writer(fd, delimiter=',', quoting=csv.QUOTE_ALL, escapechar='\\', lineterminator='\n')
|
csv_handle = csv.writer(
|
||||||
|
fd,
|
||||||
|
delimiter=",",
|
||||||
|
quoting=csv.QUOTE_ALL,
|
||||||
|
escapechar="\\",
|
||||||
|
lineterminator="\n",
|
||||||
|
)
|
||||||
csv_handle.writerow(headers)
|
csv_handle.writerow(headers)
|
||||||
csv_handle.writerows(results)
|
csv_handle.writerows(results)
|
||||||
|
|
||||||
|
setattr(SQLiteCursorWrapper, "copy_expert", write_stdout)
|
||||||
setattr(SQLiteCursorWrapper, 'copy_expert', write_stdout)
|
|
||||||
request.addfinalizer(lambda: shutil.rmtree(path))
|
request.addfinalizer(lambda: shutil.rmtree(path))
|
||||||
request.addfinalizer(lambda: delattr(SQLiteCursorWrapper, 'copy_expert'))
|
request.addfinalizer(lambda: delattr(SQLiteCursorWrapper, "copy_expert"))
|
||||||
return path
|
return path
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_copy_tables_unified_job_query(sqlite_copy_expert, project, inventory, job_template):
|
def test_copy_tables_unified_job_query(
|
||||||
'''
|
sqlite_copy_expert, project, inventory, job_template
|
||||||
|
):
|
||||||
|
"""
|
||||||
Ensure that various unified job types are in the output of the query.
|
Ensure that various unified job types are in the output of the query.
|
||||||
'''
|
"""
|
||||||
|
|
||||||
time_start = now()
|
time_start = now()
|
||||||
inv_src = InventorySource.objects.create(name="inventory_update1", inventory=inventory, source='gce')
|
inv_src = InventorySource.objects.create(
|
||||||
|
name="inventory_update1", inventory=inventory, source="gce"
|
||||||
|
)
|
||||||
|
|
||||||
project_update_name = ProjectUpdate.objects.create(project=project, name="project_update1").name
|
project_update_name = ProjectUpdate.objects.create(
|
||||||
|
project=project, name="project_update1"
|
||||||
|
).name
|
||||||
inventory_update_name = inv_src.create_unified_job().name
|
inventory_update_name = inv_src.create_unified_job().name
|
||||||
job_name = job_template.create_unified_job().name
|
job_name = job_template.create_unified_job().name
|
||||||
|
|
||||||
with tempfile.TemporaryDirectory() as tmpdir:
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
collectors.copy_tables(time_start, tmpdir)
|
collectors.copy_tables(time_start, tmpdir, subset="unified_jobs")
|
||||||
with open(os.path.join(tmpdir, 'unified_jobs_table.csv')) as f:
|
with open(os.path.join(tmpdir, "unified_jobs_table.csv")) as f:
|
||||||
lines = ''.join([l for l in f])
|
lines = "".join([l for l in f])
|
||||||
|
|
||||||
assert project_update_name in lines
|
assert project_update_name in lines
|
||||||
assert inventory_update_name in lines
|
assert inventory_update_name in lines
|
||||||
assert job_name in lines
|
assert job_name in lines
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def workflow_job(states=["new", "new", "new", "new", "new"]):
|
||||||
|
"""
|
||||||
|
Workflow topology:
|
||||||
|
node[0]
|
||||||
|
/\
|
||||||
|
s/ \f
|
||||||
|
/ \
|
||||||
|
node[1,5] node[3]
|
||||||
|
/ \
|
||||||
|
s/ \f
|
||||||
|
/ \
|
||||||
|
node[2] node[4]
|
||||||
|
"""
|
||||||
|
wfj = WorkflowJob.objects.create()
|
||||||
|
jt = JobTemplate.objects.create(name="test-jt")
|
||||||
|
nodes = [
|
||||||
|
WorkflowJobNode.objects.create(workflow_job=wfj, unified_job_template=jt)
|
||||||
|
for i in range(0, 6)
|
||||||
|
]
|
||||||
|
for node, state in zip(nodes, states):
|
||||||
|
if state:
|
||||||
|
node.job = jt.create_job()
|
||||||
|
node.job.status = state
|
||||||
|
node.job.save()
|
||||||
|
node.save()
|
||||||
|
nodes[0].success_nodes.add(nodes[1])
|
||||||
|
nodes[0].success_nodes.add(nodes[5])
|
||||||
|
nodes[1].success_nodes.add(nodes[2])
|
||||||
|
nodes[0].failure_nodes.add(nodes[3])
|
||||||
|
nodes[3].failure_nodes.add(nodes[4])
|
||||||
|
return wfj
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_copy_tables_workflow_job_node_query(sqlite_copy_expert, workflow_job):
|
||||||
|
time_start = now()
|
||||||
|
|
||||||
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
|
collectors.copy_tables(time_start, tmpdir, subset="workflow_job_node_query")
|
||||||
|
with open(os.path.join(tmpdir, "workflow_job_node_table.csv")) as f:
|
||||||
|
reader = csv.reader(f)
|
||||||
|
# Pop the headers
|
||||||
|
next(reader)
|
||||||
|
lines = [l for l in reader]
|
||||||
|
|
||||||
|
ids = [int(l[0]) for l in lines]
|
||||||
|
|
||||||
|
assert ids == list(
|
||||||
|
workflow_job.workflow_nodes.all().values_list("id", flat=True)
|
||||||
|
)
|
||||||
|
|
||||||
|
for index, relationship in zip(
|
||||||
|
[7, 8, 9], ["success_nodes", "failure_nodes", "always_nodes"]
|
||||||
|
):
|
||||||
|
for i, l in enumerate(lines):
|
||||||
|
related_nodes = (
|
||||||
|
[int(e) for e in l[index].split(",")] if l[index] else []
|
||||||
|
)
|
||||||
|
assert related_nodes == list(
|
||||||
|
getattr(workflow_job.workflow_nodes.all()[i], relationship)
|
||||||
|
.all()
|
||||||
|
.values_list("id", flat=True)
|
||||||
|
), f"(right side) workflow_nodes.all()[{i}].{relationship}.all()"
|
||||||
|
|||||||
Reference in New Issue
Block a user