From 783b744bdbd2924e8e00dcd778e73d32f063a8ec Mon Sep 17 00:00:00 2001
From: Alan Rominger
Date: Thu, 23 Jun 2022 15:54:53 -0400
Subject: [PATCH] Pass combined artifacts from nested workflows into downstream nodes (#12223)

* Track combined artifacts on workflow jobs

* Avoid schema change for passing nested workflow artifacts

* Basic support for nested workflow artifacts, add test

* Forgot that only does not work with polymorphic

* Remove incorrect field

* Consolidate logic and prevent recursion with UJ artifacts method

* Stop trying to do precedence by status, filter for obvious ones

* Review comments about sets

* Fix up bug with convergence node paths and artifacts
---
 awx/main/models/jobs.py                            |  6 ++++
 awx/main/models/unified_jobs.py                    |  4 +++
 awx/main/models/workflow.py                        | 25 +++++++++++--
 awx/main/scheduler/task_manager.py                 |  4 +--
 .../tests/functional/models/test_workflow.py       | 36 +++++++++++++++++++
 5 files changed, 71 insertions(+), 4 deletions(-)

diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py
index c879f08e93..b1926435b1 100644
--- a/awx/main/models/jobs.py
+++ b/awx/main/models/jobs.py
@@ -743,6 +743,12 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskMana
             return "$hidden due to Ansible no_log flag$"
         return artifacts
 
+    def get_effective_artifacts(self, **kwargs):
+        """Return unified job artifacts (from set_stats) to pass downstream in workflows"""
+        if isinstance(self.artifacts, dict):
+            return self.artifacts
+        return {}
+
     @property
     def is_container_group_task(self):
         return bool(self.instance_group and self.instance_group.is_container_group)
diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py
index c47c42969a..47794a7ef8 100644
--- a/awx/main/models/unified_jobs.py
+++ b/awx/main/models/unified_jobs.py
@@ -1204,6 +1204,10 @@ class UnifiedJob(
                 pass
         return None
 
+    def get_effective_artifacts(self, **kwargs):
+        """Return unified job artifacts (from set_stats) to pass downstream in workflows"""
+        return {}
+
     def get_passwords_needed_to_start(self):
         return []
 
diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py
index 30a2574748..b5479a5be9 100644
--- a/awx/main/models/workflow.py
+++ b/awx/main/models/workflow.py
@@ -318,8 +318,8 @@ class WorkflowJobNode(WorkflowNodeBase):
         for parent_node in self.get_parent_nodes():
             is_root_node = False
             aa_dict.update(parent_node.ancestor_artifacts)
-            if parent_node.job and hasattr(parent_node.job, 'artifacts'):
-                aa_dict.update(parent_node.job.artifacts)
+            if parent_node.job:
+                aa_dict.update(parent_node.job.get_effective_artifacts(parents_set=set([self.workflow_job_id])))
         if aa_dict and not is_root_node:
             self.ancestor_artifacts = aa_dict
             self.save(update_fields=['ancestor_artifacts'])
@@ -682,6 +682,27 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio
             wj = wj.get_workflow_job()
         return ancestors
 
+    def get_effective_artifacts(self, **kwargs):
+        """
+        For downstream jobs of a workflow nested inside of a workflow,
+        we send aggregated artifacts from the nodes inside of the nested workflow
+        """
+        artifacts = {}
+        job_queryset = (
+            UnifiedJob.objects.filter(unified_job_node__workflow_job=self)
+            .defer('job_args', 'job_cwd', 'start_args', 'result_traceback')
+            .order_by('finished', 'id')
+            .filter(status__in=['successful', 'failed'])
+            .iterator()
+        )
+        parents_set = kwargs.get('parents_set', set())
+        new_parents_set = parents_set | {self.id}
+        for job in job_queryset:
+            if job.id in parents_set:
+                continue
+            artifacts.update(job.get_effective_artifacts(parents_set=new_parents_set))
+        return artifacts
+
     def get_notification_templates(self):
         return self.workflow_job_template.notification_templates
 
diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py
index 5ca72763d0..8c2f193a1c 100644
--- a/awx/main/scheduler/task_manager.py
+++ b/awx/main/scheduler/task_manager.py
@@ -248,11 +248,11 @@ class TaskManager:
                 workflow_job.save(update_fields=update_fields)
                 status_changed = True
             if status_changed:
+                if workflow_job.spawned_by_workflow:
+                    schedule_task_manager()
                 workflow_job.websocket_emit_status(workflow_job.status)
                 # Operations whose queries rely on modifications made during the atomic scheduling session
                 workflow_job.send_notification_templates('succeeded' if workflow_job.status == 'successful' else 'failed')
-                if workflow_job.spawned_by_workflow:
-                    schedule_task_manager()
         return result
 
     @timeit
diff --git a/awx/main/tests/functional/models/test_workflow.py b/awx/main/tests/functional/models/test_workflow.py
index 9544e43561..d8fa495c6c 100644
--- a/awx/main/tests/functional/models/test_workflow.py
+++ b/awx/main/tests/functional/models/test_workflow.py
@@ -19,6 +19,7 @@ from awx.api.views import WorkflowJobTemplateNodeSuccessNodesList
 # Django
 from django.test import TransactionTestCase
 from django.core.exceptions import ValidationError
+from django.utils.timezone import now
 
 
 class TestWorkflowDAGFunctional(TransactionTestCase):
@@ -381,3 +382,38 @@ def test_workflow_ancestors_recursion_prevention(organization):
     WorkflowJobNode.objects.create(workflow_job=wfj, unified_job_template=wfjt, job=wfj)  # well, this is a problem
     # mostly, we just care that this assertion finishes in finite time
     assert wfj.get_ancestor_workflows() == []
+
+
+@pytest.mark.django_db
+class TestCombinedArtifacts:
+    @pytest.fixture
+    def wfj_artifacts(self, job_template, organization):
+        wfjt = WorkflowJobTemplate.objects.create(organization=organization, name='has_artifacts')
+        wfj = WorkflowJob.objects.create(workflow_job_template=wfjt, launch_type='workflow')
+        job = job_template.create_unified_job(_eager_fields=dict(artifacts={'foooo': 'bar'}, status='successful', finished=now()))
+        WorkflowJobNode.objects.create(workflow_job=wfj, unified_job_template=job_template, job=job)
+        return wfj
+
+    def test_multiple_types(self, project, wfj_artifacts):
+        project_update = project.create_unified_job()
+        WorkflowJobNode.objects.create(workflow_job=wfj_artifacts, unified_job_template=project, job=project_update)
+
+        assert wfj_artifacts.get_effective_artifacts() == {'foooo': 'bar'}
+
+    def test_precedence_based_on_time(self, wfj_artifacts, job_template):
+        later_job = job_template.create_unified_job(
+            _eager_fields=dict(artifacts={'foooo': 'zoo'}, status='successful', finished=now())  # finished later, should win
+        )
+        WorkflowJobNode.objects.create(workflow_job=wfj_artifacts, unified_job_template=job_template, job=later_job)
+
+        assert wfj_artifacts.get_effective_artifacts() == {'foooo': 'zoo'}
+
+    def test_bad_data_with_artifacts(self, organization):
+        # This is toxic database data, this tests that it doesn't create an infinite loop
+        wfjt = WorkflowJobTemplate.objects.create(organization=organization, name='child')
+        wfj = WorkflowJob.objects.create(workflow_job_template=wfjt, launch_type='workflow')
+        WorkflowJobNode.objects.create(workflow_job=wfj, unified_job_template=wfjt, job=wfj)
+        job = Job.objects.create(artifacts={'foo': 'bar'}, status='successful')
+        WorkflowJobNode.objects.create(workflow_job=wfj, job=job)
+        # mostly, we just care that this assertion finishes in finite time
+        assert wfj.get_effective_artifacts() == {'foo': 'bar'}
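
A minimal standalone sketch of the aggregation behavior this patch adds, for reviewers who want to see the algorithm outside of Django. The FakeJob and FakeWorkflowJob classes below are illustrative stand-ins, not the AWX models; only the merge-and-skip logic mirrors the new get_effective_artifacts methods.

# sketch_combined_artifacts.py -- illustrative only; simplified stand-ins
# for awx Job / WorkflowJob, not the real models.


class FakeJob:
    """A plain job that exposes its own set_stats artifacts."""

    def __init__(self, id, artifacts=None):
        self.id = id
        self.artifacts = artifacts or {}

    def get_effective_artifacts(self, parents_set=frozenset()):
        # Mirrors Job.get_effective_artifacts: return own artifacts if they are a dict.
        return self.artifacts if isinstance(self.artifacts, dict) else {}


class FakeWorkflowJob:
    """A nested workflow whose effective artifacts are merged from its node jobs."""

    def __init__(self, id, node_jobs=()):
        self.id = id
        self.node_jobs = list(node_jobs)  # stands in for the unified_job_node queryset, ordered by finished time

    def get_effective_artifacts(self, parents_set=frozenset()):
        # Mirrors WorkflowJob.get_effective_artifacts: later-finished jobs win on key
        # collisions, and any job already in parents_set is skipped so cyclic node
        # data cannot cause infinite recursion.
        artifacts = {}
        new_parents_set = set(parents_set) | {self.id}
        for job in self.node_jobs:
            if job.id in parents_set:
                continue
            artifacts.update(job.get_effective_artifacts(parents_set=new_parents_set))
        return artifacts


if __name__ == '__main__':
    nested = FakeWorkflowJob(2, [FakeJob(3, {'foooo': 'bar'}), FakeJob(4, {'foooo': 'zoo'})])
    outer = FakeWorkflowJob(1, [nested])
    # Downstream nodes of the outer workflow receive the merged artifacts of the nested workflow.
    assert outer.get_effective_artifacts() == {'foooo': 'zoo'}
    print(outer.get_effective_artifacts())

The real implementation gets the same two properties from its queryset and kwargs: ordering node jobs by finished time lets artifacts from later jobs take precedence, and threading parents_set through the recursion gives the cycle protection exercised by test_bad_data_with_artifacts above.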