Add tags and skip_tags option to awx.awx.workflow_launch (#15011)

Signed-off-by: Tom Page <tpage@redhat.com>
This commit is contained in:
Tom Page 2024-04-03 20:29:43 +01:00 committed by GitHub
parent 3edaaebba2
commit c061f59f1c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 54 additions and 1 deletions

View File

@ -39,6 +39,16 @@ options:
description:
- Limit to use for the I(job_template).
type: str
tags:
description:
- Specific tags to apply from the I(job_template).
type: list
elements: str
skip_tags:
description:
- Specific tags to skip from the I(job_template).
type: list
elements: str
scm_branch:
description:
- A specific branch of the SCM project to run the template on.
@ -100,6 +110,8 @@ def main():
organization=dict(),
inventory=dict(),
limit=dict(),
tags=dict(type='list', elements='str'),
skip_tags=dict(type='list', elements='str'),
scm_branch=dict(),
extra_vars=dict(type='dict'),
wait=dict(required=False, default=True, type='bool'),
@ -128,6 +140,14 @@ def main():
if field_val is not None:
optional_args[field_name] = field_val
# Special treatment of tags parameters
job_tags = module.params.get('tags')
if job_tags is not None:
optional_args['job_tags'] = ",".join(job_tags)
skip_tags = module.params.get('skip_tags')
if skip_tags is not None:
optional_args['skip_tags'] = ",".join(skip_tags)
# Create a datastructure to pass into our job launch
post_data = {}
for arg_name, arg_value in optional_args.items():
@ -152,6 +172,8 @@ def main():
check_vars_to_prompts = {
'inventory': 'ask_inventory_on_launch',
'limit': 'ask_limit_on_launch',
'job_tags': 'ask_tags_on_launch',
'skip_tags': 'ask_skip_tags_on_launch',
'scm_branch': 'ask_scm_branch_on_launch',
}

View File

@ -4,7 +4,7 @@ __metaclass__ = type
import pytest
from awx.main.models import WorkflowJobTemplate, NotificationTemplate
from awx.main.models import WorkflowJobTemplate, WorkflowJob, NotificationTemplate
@pytest.mark.django_db
@ -135,6 +135,37 @@ def test_associate_only_on_success(run_module, admin_user, organization, project
assert list(wfjt.notification_templates_error.values_list('id', flat=True)) == [nt1.id]
@pytest.mark.django_db
def test_workflow_launch_with_prompting(run_module, admin_user, organization, inventory):
WorkflowJobTemplate.objects.create(
name='foo-workflow-launch-test',
organization=organization,
ask_variables_on_launch=True,
ask_inventory_on_launch=True,
ask_tags_on_launch=True,
ask_skip_tags_on_launch=True,
)
result = run_module(
'workflow_launch',
dict(
name='foo-workflow-launch-test',
inventory=inventory.name,
wait=False,
extra_vars={"var1": "My First Variable", "var2": "My Second Variable", "var3": "My Third Variable"},
tags=["my_tag"],
skip_tags=["your_tag", "their_tag"],
),
admin_user,
)
assert result.get('changed', True), result
job = WorkflowJob.objects.get(id=result['id'])
assert job.extra_vars == '{"var1": "My First Variable", "var2": "My Second Variable", "var3": "My Third Variable"}'
assert job.inventory == inventory
assert job.job_tags == "my_tag"
assert job.skip_tags == "your_tag,their_tag"
@pytest.mark.django_db
def test_delete_with_spec(run_module, admin_user, organization, survey_spec):
WorkflowJobTemplate.objects.create(organization=organization, name='foo-workflow', survey_enabled=True, survey_spec=survey_spec)