mirror of
https://github.com/ansible/awx.git
synced 2026-04-04 09:45:06 -02:30
AAP-12516 [option 2] Handle nested workflow artifacts via root node ancestor_artifacts (#16381)
* Add new test for artfact precedence upstream node vs outer workflow * Fix bugs, upstream artifacts come first for precedence * Track nested artifacts path through ancestor_artifacts on root nodes * Fix case where first root node did not get the vars * touchup comment * Prevent conflict with sliced jobs hack
This commit is contained in:
@@ -345,7 +345,11 @@ class WorkflowJobNode(WorkflowNodeBase):
|
|||||||
)
|
)
|
||||||
data.update(accepted_fields) # missing fields are handled in the scheduler
|
data.update(accepted_fields) # missing fields are handled in the scheduler
|
||||||
# build ancestor artifacts, save them to node model for later
|
# build ancestor artifacts, save them to node model for later
|
||||||
aa_dict = {}
|
# initialize from pre-seeded ancestor_artifacts (set on root nodes of
|
||||||
|
# child workflows via seed_root_ancestor_artifacts to carry artifacts
|
||||||
|
# from the parent workflow); exclude job_slice which is internal
|
||||||
|
# metadata handled separately below
|
||||||
|
aa_dict = {k: v for k, v in self.ancestor_artifacts.items() if k != 'job_slice'} if self.ancestor_artifacts else {}
|
||||||
is_root_node = True
|
is_root_node = True
|
||||||
for parent_node in self.get_parent_nodes():
|
for parent_node in self.get_parent_nodes():
|
||||||
is_root_node = False
|
is_root_node = False
|
||||||
@@ -366,11 +370,13 @@ class WorkflowJobNode(WorkflowNodeBase):
|
|||||||
data['survey_passwords'] = password_dict
|
data['survey_passwords'] = password_dict
|
||||||
# process extra_vars
|
# process extra_vars
|
||||||
extra_vars = data.get('extra_vars', {})
|
extra_vars = data.get('extra_vars', {})
|
||||||
if ujt_obj and isinstance(ujt_obj, (JobTemplate, WorkflowJobTemplate)):
|
if ujt_obj and isinstance(ujt_obj, JobTemplate):
|
||||||
if aa_dict:
|
if aa_dict:
|
||||||
functional_aa_dict = copy(aa_dict)
|
functional_aa_dict = copy(aa_dict)
|
||||||
functional_aa_dict.pop('_ansible_no_log', None)
|
functional_aa_dict.pop('_ansible_no_log', None)
|
||||||
extra_vars.update(functional_aa_dict)
|
extra_vars.update(functional_aa_dict)
|
||||||
|
elif ujt_obj and isinstance(ujt_obj, WorkflowJobTemplate):
|
||||||
|
pass # artifacts are applied via seed_root_ancestor_artifacts in the task manager
|
||||||
|
|
||||||
# Workflow Job extra_vars higher precedence than ancestor artifacts
|
# Workflow Job extra_vars higher precedence than ancestor artifacts
|
||||||
extra_vars.update(wj_special_vars)
|
extra_vars.update(wj_special_vars)
|
||||||
@@ -734,6 +740,18 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio
|
|||||||
wj = wj.get_workflow_job()
|
wj = wj.get_workflow_job()
|
||||||
return ancestors
|
return ancestors
|
||||||
|
|
||||||
|
def seed_root_ancestor_artifacts(self, artifacts):
|
||||||
|
"""Apply parent workflow artifacts to root nodes so they propagate
|
||||||
|
through the normal ancestor_artifacts channel instead of being
|
||||||
|
baked into this workflow's extra_vars."""
|
||||||
|
self.workflow_job_nodes.exclude(
|
||||||
|
workflowjobnodes_success__isnull=False,
|
||||||
|
).exclude(
|
||||||
|
workflowjobnodes_failure__isnull=False,
|
||||||
|
).exclude(
|
||||||
|
workflowjobnodes_always__isnull=False,
|
||||||
|
).update(ancestor_artifacts=artifacts)
|
||||||
|
|
||||||
def get_effective_artifacts(self, **kwargs):
|
def get_effective_artifacts(self, **kwargs):
|
||||||
"""
|
"""
|
||||||
For downstream jobs of a workflow nested inside of a workflow,
|
For downstream jobs of a workflow nested inside of a workflow,
|
||||||
|
|||||||
@@ -241,6 +241,8 @@ class WorkflowManager(TaskBase):
|
|||||||
job = spawn_node.unified_job_template.create_unified_job(**kv)
|
job = spawn_node.unified_job_template.create_unified_job(**kv)
|
||||||
spawn_node.job = job
|
spawn_node.job = job
|
||||||
spawn_node.save()
|
spawn_node.save()
|
||||||
|
if spawn_node.ancestor_artifacts and isinstance(spawn_node.unified_job_template, WorkflowJobTemplate):
|
||||||
|
job.seed_root_ancestor_artifacts(spawn_node.ancestor_artifacts)
|
||||||
logger.debug('Spawned %s in %s for node %s', job.log_format, workflow_job.log_format, spawn_node.pk)
|
logger.debug('Spawned %s in %s for node %s', job.log_format, workflow_job.log_format, spawn_node.pk)
|
||||||
can_start = True
|
can_start = True
|
||||||
if isinstance(spawn_node.unified_job_template, WorkflowJobTemplate):
|
if isinstance(spawn_node.unified_job_template, WorkflowJobTemplate):
|
||||||
|
|||||||
11
awx/main/tests/data/projects/debug/set_stats.yml
Normal file
11
awx/main/tests/data/projects/debug/set_stats.yml
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
---
|
||||||
|
- hosts: all
|
||||||
|
gather_facts: false
|
||||||
|
connection: local
|
||||||
|
tasks:
|
||||||
|
- name: Set artifacts via set_stats
|
||||||
|
ansible.builtin.set_stats:
|
||||||
|
data: "{{ stats_data }}"
|
||||||
|
per_host: false
|
||||||
|
aggregate: false
|
||||||
|
when: stats_data is defined
|
||||||
206
awx/main/tests/live/tests/test_nested_workflow_artifacts.py
Normal file
206
awx/main/tests/live/tests/test_nested_workflow_artifacts.py
Normal file
@@ -0,0 +1,206 @@
|
|||||||
|
import json
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from awx.main.tests.live.tests.conftest import wait_for_job
|
||||||
|
|
||||||
|
from awx.main.models import JobTemplate, WorkflowJobTemplate, WorkflowJobTemplateNode
|
||||||
|
|
||||||
|
JT_NAMES = ('artifact-test-first', 'artifact-test-second', 'artifact-test-reader')
|
||||||
|
WFT_NAMES = ('artifact-test-outer-wf', 'artifact-test-inner-wf')
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db(transaction=True)
|
||||||
|
def test_nested_workflow_set_stats_precedence(live_tmp_folder, demo_inv, project_factory, default_org):
|
||||||
|
"""Reproducer for set_stats artifacts from an outer workflow leaking into
|
||||||
|
an inner (child) workflow and overriding the inner workflow's own artifacts.
|
||||||
|
|
||||||
|
Outer WF: [job_first] --success--> [inner_wf]
|
||||||
|
Inner WF: [job_second] --success--> [job_reader]
|
||||||
|
|
||||||
|
job_first sets via set_stats:
|
||||||
|
var1: "outer-only" (only source, should propagate through)
|
||||||
|
var2: "should-be-overridden" (will be overridden by job_second)
|
||||||
|
|
||||||
|
job_second sets via set_stats:
|
||||||
|
var2: "from-inner" (should override outer's value)
|
||||||
|
var3: "inner-only" (only source, should be available)
|
||||||
|
|
||||||
|
job_reader runs debug.yml (no set_stats), we inspect its extra_vars:
|
||||||
|
var1 should be "outer-only" - outer artifacts propagate when uncontested
|
||||||
|
var2 should be "from-inner" - inner artifacts override outer (THE BUG)
|
||||||
|
var3 should be "inner-only" - inner-only artifacts propagate normally
|
||||||
|
"""
|
||||||
|
# Clean up resources from prior runs (delete individually for signals)
|
||||||
|
for name in WFT_NAMES:
|
||||||
|
for wft in WorkflowJobTemplate.objects.filter(name=name):
|
||||||
|
wft.delete()
|
||||||
|
for name in JT_NAMES:
|
||||||
|
for jt in JobTemplate.objects.filter(name=name):
|
||||||
|
jt.delete()
|
||||||
|
|
||||||
|
proj = project_factory(scm_url=f'file://{live_tmp_folder}/debug')
|
||||||
|
if proj.current_job:
|
||||||
|
wait_for_job(proj.current_job)
|
||||||
|
|
||||||
|
# job_first: sets var1 (outer-only) and var2 (to be overridden by inner)
|
||||||
|
jt_first = JobTemplate.objects.create(
|
||||||
|
name='artifact-test-first',
|
||||||
|
project=proj,
|
||||||
|
playbook='set_stats.yml',
|
||||||
|
inventory=demo_inv,
|
||||||
|
extra_vars=json.dumps({'stats_data': {'var1': 'outer-only', 'var2': 'should-be-overridden'}}),
|
||||||
|
)
|
||||||
|
# job_second: overrides var2, introduces var3
|
||||||
|
jt_second = JobTemplate.objects.create(
|
||||||
|
name='artifact-test-second',
|
||||||
|
project=proj,
|
||||||
|
playbook='set_stats.yml',
|
||||||
|
inventory=demo_inv,
|
||||||
|
extra_vars=json.dumps({'stats_data': {'var2': 'from-inner', 'var3': 'inner-only'}}),
|
||||||
|
)
|
||||||
|
# job_reader: just runs, we check what extra_vars it receives
|
||||||
|
jt_reader = JobTemplate.objects.create(
|
||||||
|
name='artifact-test-reader',
|
||||||
|
project=proj,
|
||||||
|
playbook='debug.yml',
|
||||||
|
inventory=demo_inv,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Inner WFT: job_second -> job_reader
|
||||||
|
inner_wft = WorkflowJobTemplate.objects.create(name='artifact-test-inner-wf', organization=default_org)
|
||||||
|
inner_node_1 = WorkflowJobTemplateNode.objects.create(
|
||||||
|
workflow_job_template=inner_wft,
|
||||||
|
unified_job_template=jt_second,
|
||||||
|
identifier='second',
|
||||||
|
)
|
||||||
|
inner_node_2 = WorkflowJobTemplateNode.objects.create(
|
||||||
|
workflow_job_template=inner_wft,
|
||||||
|
unified_job_template=jt_reader,
|
||||||
|
identifier='reader',
|
||||||
|
)
|
||||||
|
inner_node_1.success_nodes.add(inner_node_2)
|
||||||
|
|
||||||
|
# Outer WFT: job_first -> inner_wf
|
||||||
|
outer_wft = WorkflowJobTemplate.objects.create(name='artifact-test-outer-wf', organization=default_org)
|
||||||
|
outer_node_1 = WorkflowJobTemplateNode.objects.create(
|
||||||
|
workflow_job_template=outer_wft,
|
||||||
|
unified_job_template=jt_first,
|
||||||
|
identifier='first',
|
||||||
|
)
|
||||||
|
outer_node_2 = WorkflowJobTemplateNode.objects.create(
|
||||||
|
workflow_job_template=outer_wft,
|
||||||
|
unified_job_template=inner_wft,
|
||||||
|
identifier='inner',
|
||||||
|
)
|
||||||
|
outer_node_1.success_nodes.add(outer_node_2)
|
||||||
|
|
||||||
|
# Launch and wait
|
||||||
|
outer_wfj = outer_wft.create_unified_job()
|
||||||
|
outer_wfj.signal_start()
|
||||||
|
wait_for_job(outer_wfj, running_timeout=120)
|
||||||
|
|
||||||
|
# Find the reader job inside the inner workflow
|
||||||
|
inner_wf_node = outer_wfj.workflow_job_nodes.get(identifier='inner')
|
||||||
|
inner_wfj = inner_wf_node.job
|
||||||
|
assert inner_wfj is not None, 'Inner workflow job was never created'
|
||||||
|
|
||||||
|
# Check that root node of inner WF (job_second) received outer artifacts
|
||||||
|
second_node = inner_wfj.workflow_job_nodes.get(identifier='second')
|
||||||
|
assert second_node.job is not None, 'Second job was never created'
|
||||||
|
second_extra_vars = json.loads(second_node.job.extra_vars)
|
||||||
|
assert second_extra_vars.get('var1') == 'outer-only', (
|
||||||
|
f'Root node var1: expected "outer-only" (outer artifact should be available to root node), '
|
||||||
|
f'got "{second_extra_vars.get("var1")}". '
|
||||||
|
f'Outer artifacts are not reaching root nodes of child workflows.'
|
||||||
|
)
|
||||||
|
|
||||||
|
reader_node = inner_wfj.workflow_job_nodes.get(identifier='reader')
|
||||||
|
assert reader_node.job is not None, 'Reader job was never created'
|
||||||
|
|
||||||
|
reader_extra_vars = json.loads(reader_node.job.extra_vars)
|
||||||
|
|
||||||
|
# var1: only set by outer job_first, no conflict — should propagate through
|
||||||
|
assert reader_extra_vars.get('var1') == 'outer-only', f'var1: expected "outer-only" (uncontested outer artifact), ' f'got "{reader_extra_vars.get("var1")}"'
|
||||||
|
|
||||||
|
# var2: set by outer as "should-be-overridden", then by inner as "from-inner"
|
||||||
|
# Inner workflow's own ancestor artifacts should take precedence
|
||||||
|
assert reader_extra_vars.get('var2') == 'from-inner', (
|
||||||
|
f'var2: expected "from-inner" (inner workflow artifact should override outer), '
|
||||||
|
f'got "{reader_extra_vars.get("var2")}". '
|
||||||
|
f'Outer workflow artifacts are leaking via wj_special_vars. '
|
||||||
|
f'reader node ancestor_artifacts={reader_node.ancestor_artifacts}'
|
||||||
|
)
|
||||||
|
|
||||||
|
# var3: only set by inner job_second — should propagate normally
|
||||||
|
assert reader_extra_vars.get('var3') == 'inner-only', f'var3: expected "inner-only" (inner-only artifact), ' f'got "{reader_extra_vars.get("var3")}"'
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db(transaction=True)
|
||||||
|
def test_workflow_extra_vars_override_artifacts(live_tmp_folder, demo_inv, project_factory, default_org):
|
||||||
|
"""Workflow extra_vars should take precedence over set_stats artifacts
|
||||||
|
within a single (non-nested) workflow.
|
||||||
|
|
||||||
|
WF (extra_vars: my_var="from-wf-extra-vars"):
|
||||||
|
[job_setter] --success--> [job_reader]
|
||||||
|
|
||||||
|
job_setter sets my_var="from-set-stats" via set_stats
|
||||||
|
job_reader should see my_var="from-wf-extra-vars" because workflow
|
||||||
|
extra_vars are higher precedence than ancestor artifacts.
|
||||||
|
"""
|
||||||
|
wft_name = 'artifact-test-wf-extra-vars-precedence'
|
||||||
|
jt_names = ('artifact-test-setter', 'artifact-test-checker')
|
||||||
|
|
||||||
|
for wft in WorkflowJobTemplate.objects.filter(name=wft_name):
|
||||||
|
wft.delete()
|
||||||
|
for name in jt_names:
|
||||||
|
for jt in JobTemplate.objects.filter(name=name):
|
||||||
|
jt.delete()
|
||||||
|
|
||||||
|
proj = project_factory(scm_url=f'file://{live_tmp_folder}/debug')
|
||||||
|
if proj.current_job:
|
||||||
|
wait_for_job(proj.current_job)
|
||||||
|
|
||||||
|
jt_setter = JobTemplate.objects.create(
|
||||||
|
name='artifact-test-setter',
|
||||||
|
project=proj,
|
||||||
|
playbook='set_stats.yml',
|
||||||
|
inventory=demo_inv,
|
||||||
|
extra_vars=json.dumps({'stats_data': {'my_var': 'from-set-stats'}}),
|
||||||
|
)
|
||||||
|
jt_checker = JobTemplate.objects.create(
|
||||||
|
name='artifact-test-checker',
|
||||||
|
project=proj,
|
||||||
|
playbook='debug.yml',
|
||||||
|
inventory=demo_inv,
|
||||||
|
)
|
||||||
|
|
||||||
|
wft = WorkflowJobTemplate.objects.create(
|
||||||
|
name=wft_name,
|
||||||
|
organization=default_org,
|
||||||
|
extra_vars=json.dumps({'my_var': 'from-wf-extra-vars'}),
|
||||||
|
)
|
||||||
|
node_1 = WorkflowJobTemplateNode.objects.create(
|
||||||
|
workflow_job_template=wft,
|
||||||
|
unified_job_template=jt_setter,
|
||||||
|
identifier='setter',
|
||||||
|
)
|
||||||
|
node_2 = WorkflowJobTemplateNode.objects.create(
|
||||||
|
workflow_job_template=wft,
|
||||||
|
unified_job_template=jt_checker,
|
||||||
|
identifier='checker',
|
||||||
|
)
|
||||||
|
node_1.success_nodes.add(node_2)
|
||||||
|
|
||||||
|
wfj = wft.create_unified_job()
|
||||||
|
wfj.signal_start()
|
||||||
|
wait_for_job(wfj, running_timeout=120)
|
||||||
|
|
||||||
|
checker_node = wfj.workflow_job_nodes.get(identifier='checker')
|
||||||
|
assert checker_node.job is not None, 'Checker job was never created'
|
||||||
|
|
||||||
|
checker_extra_vars = json.loads(checker_node.job.extra_vars)
|
||||||
|
assert checker_extra_vars.get('my_var') == 'from-wf-extra-vars', (
|
||||||
|
f'Expected my_var="from-wf-extra-vars" (workflow extra_vars should override artifacts), '
|
||||||
|
f'got my_var="{checker_extra_vars.get("my_var")}". '
|
||||||
|
f'checker node ancestor_artifacts={checker_node.ancestor_artifacts}'
|
||||||
|
)
|
||||||
Reference in New Issue
Block a user