mirror of
https://github.com/ansible/awx.git
synced 2026-02-26 23:46:05 -03:30
Pass combined artifacts from nested workflows into downstream nodes (#12223)
* Track combined artifacts on workflow jobs
* Avoid schema change for passing nested workflow artifacts
* Basic support for nested workflow artifacts, add test
* Forgot that `only` does not work with polymorphic
* Remove incorrect field
* Consolidate logic and prevent recursion with UJ artifacts method
* Stop trying to do precedence by status, filter for obvious ones
* Review comments about sets
* Fix up bug with convergence node paths and artifacts
This commit is contained in:
@@ -743,6 +743,12 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskMana
|
|||||||
return "$hidden due to Ansible no_log flag$"
|
return "$hidden due to Ansible no_log flag$"
|
||||||
return artifacts
|
return artifacts
|
||||||
|
|
||||||
|
def get_effective_artifacts(self, **kwargs):
    """Return unified job artifacts (from set_stats) to pass downstream in workflows.

    Returns the artifacts dict when one is present; any non-dict value
    (e.g. None from a job that set no stats) yields an empty dict.
    """
    artifacts = self.artifacts
    return artifacts if isinstance(artifacts, dict) else {}
|
||||||
|
|
||||||
@property
def is_container_group_task(self):
    """Whether this task is assigned to a container-group instance group."""
    group = self.instance_group
    return bool(group and group.is_container_group)
|
||||||
|
|||||||
@@ -1204,6 +1204,10 @@ class UnifiedJob(
|
|||||||
pass
|
pass
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def get_effective_artifacts(self, **kwargs):
    """Return unified job artifacts (from set_stats) to pass downstream in workflows.

    Base implementation contributes nothing; job types that produce
    artifacts (e.g. Job, WorkflowJob) override this method.
    """
    return dict()
|
||||||
|
|
||||||
def get_passwords_needed_to_start(self):
    """Base implementation: no interactive passwords are required to start."""
    return list()
|
||||||
|
|
||||||
|
|||||||
@@ -318,8 +318,8 @@ class WorkflowJobNode(WorkflowNodeBase):
|
|||||||
for parent_node in self.get_parent_nodes():
|
for parent_node in self.get_parent_nodes():
|
||||||
is_root_node = False
|
is_root_node = False
|
||||||
aa_dict.update(parent_node.ancestor_artifacts)
|
aa_dict.update(parent_node.ancestor_artifacts)
|
||||||
if parent_node.job and hasattr(parent_node.job, 'artifacts'):
|
if parent_node.job:
|
||||||
aa_dict.update(parent_node.job.artifacts)
|
aa_dict.update(parent_node.job.get_effective_artifacts(parents_set=set([self.workflow_job_id])))
|
||||||
if aa_dict and not is_root_node:
|
if aa_dict and not is_root_node:
|
||||||
self.ancestor_artifacts = aa_dict
|
self.ancestor_artifacts = aa_dict
|
||||||
self.save(update_fields=['ancestor_artifacts'])
|
self.save(update_fields=['ancestor_artifacts'])
|
||||||
@@ -682,6 +682,27 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio
|
|||||||
wj = wj.get_workflow_job()
|
wj = wj.get_workflow_job()
|
||||||
return ancestors
|
return ancestors
|
||||||
|
|
||||||
|
def get_effective_artifacts(self, **kwargs):
    """
    For downstream jobs of a workflow nested inside of a workflow,
    we send aggregated artifacts from the nodes inside of the nested workflow
    """
    seen_parents = kwargs.get('parents_set', set())
    # include self so that child workflows cannot recurse back into us
    descend_parents = seen_parents | {self.id}
    combined = {}
    node_jobs = (
        UnifiedJob.objects.filter(unified_job_node__workflow_job=self)
        .defer('job_args', 'job_cwd', 'start_args', 'result_traceback')
        .order_by('finished', 'id')  # later-finished jobs win on key conflicts
        .filter(status__in=['successful', 'failed'])
        .iterator()
    )
    for node_job in node_jobs:
        # Skip toxic self-referential data so recursion stays finite
        if node_job.id in seen_parents:
            continue
        combined.update(node_job.get_effective_artifacts(parents_set=descend_parents))
    return combined
|
||||||
|
|
||||||
def get_notification_templates(self):
    """Delegate notification templates to the associated workflow job template."""
    template = self.workflow_job_template
    return template.notification_templates
|
||||||
|
|
||||||
|
|||||||
@@ -248,11 +248,11 @@ class TaskManager:
|
|||||||
workflow_job.save(update_fields=update_fields)
|
workflow_job.save(update_fields=update_fields)
|
||||||
status_changed = True
|
status_changed = True
|
||||||
if status_changed:
|
if status_changed:
|
||||||
|
if workflow_job.spawned_by_workflow:
|
||||||
|
schedule_task_manager()
|
||||||
workflow_job.websocket_emit_status(workflow_job.status)
|
workflow_job.websocket_emit_status(workflow_job.status)
|
||||||
# Operations whose queries rely on modifications made during the atomic scheduling session
|
# Operations whose queries rely on modifications made during the atomic scheduling session
|
||||||
workflow_job.send_notification_templates('succeeded' if workflow_job.status == 'successful' else 'failed')
|
workflow_job.send_notification_templates('succeeded' if workflow_job.status == 'successful' else 'failed')
|
||||||
if workflow_job.spawned_by_workflow:
|
|
||||||
schedule_task_manager()
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
@timeit
|
@timeit
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ from awx.api.views import WorkflowJobTemplateNodeSuccessNodesList
|
|||||||
# Django
|
# Django
|
||||||
from django.test import TransactionTestCase
|
from django.test import TransactionTestCase
|
||||||
from django.core.exceptions import ValidationError
|
from django.core.exceptions import ValidationError
|
||||||
|
from django.utils.timezone import now
|
||||||
|
|
||||||
|
|
||||||
class TestWorkflowDAGFunctional(TransactionTestCase):
|
class TestWorkflowDAGFunctional(TransactionTestCase):
|
||||||
@@ -381,3 +382,38 @@ def test_workflow_ancestors_recursion_prevention(organization):
|
|||||||
WorkflowJobNode.objects.create(workflow_job=wfj, unified_job_template=wfjt, job=wfj) # well, this is a problem
|
WorkflowJobNode.objects.create(workflow_job=wfj, unified_job_template=wfjt, job=wfj) # well, this is a problem
|
||||||
# mostly, we just care that this assertion finishes in finite time
|
# mostly, we just care that this assertion finishes in finite time
|
||||||
assert wfj.get_ancestor_workflows() == []
|
assert wfj.get_ancestor_workflows() == []
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
class TestCombinedArtifacts:
    """Aggregation of set_stats artifacts across nodes of a nested workflow."""

    @pytest.fixture
    def wfj_artifacts(self, job_template, organization):
        # A workflow job containing one finished, successful job node that
        # produced artifacts via set_stats.
        template = WorkflowJobTemplate.objects.create(organization=organization, name='has_artifacts')
        workflow = WorkflowJob.objects.create(workflow_job_template=template, launch_type='workflow')
        node_job = job_template.create_unified_job(
            _eager_fields=dict(artifacts={'foooo': 'bar'}, status='successful', finished=now())
        )
        WorkflowJobNode.objects.create(workflow_job=workflow, unified_job_template=job_template, job=node_job)
        return workflow

    def test_multiple_types(self, project, wfj_artifacts):
        # Non-job node types (project updates) must not break aggregation
        update = project.create_unified_job()
        WorkflowJobNode.objects.create(workflow_job=wfj_artifacts, unified_job_template=project, job=update)

        assert wfj_artifacts.get_effective_artifacts() == {'foooo': 'bar'}

    def test_precedence_based_on_time(self, wfj_artifacts, job_template):
        later_job = job_template.create_unified_job(
            _eager_fields=dict(artifacts={'foooo': 'zoo'}, status='successful', finished=now())  # finished later, should win
        )
        WorkflowJobNode.objects.create(workflow_job=wfj_artifacts, unified_job_template=job_template, job=later_job)

        assert wfj_artifacts.get_effective_artifacts() == {'foooo': 'zoo'}

    def test_bad_data_with_artifacts(self, organization):
        # This is toxic database data, this tests that it doesn't create an infinite loop
        template = WorkflowJobTemplate.objects.create(organization=organization, name='child')
        workflow = WorkflowJob.objects.create(workflow_job_template=template, launch_type='workflow')
        WorkflowJobNode.objects.create(workflow_job=workflow, unified_job_template=template, job=workflow)
        job = Job.objects.create(artifacts={'foo': 'bar'}, status='successful')
        WorkflowJobNode.objects.create(workflow_job=workflow, job=job)
        # mostly, we just care that this assertion finishes in finite time
        assert workflow.get_effective_artifacts() == {'foo': 'bar'}
|
||||||
|
|||||||
Reference in New Issue
Block a user