awx/awx/main/tests/functional/test_rbac_workflow.py
jessicamack 2bc08b421d
Bring WFJT job access to parity with UnifiedJobAccess (#15344) (#6935)
* Bring WFJT job access to parity with UnifiedJobAccess

* Run linters

Co-authored-by: Alan Rominger <arominge@redhat.com>
2025-04-30 13:43:47 -04:00

319 lines
15 KiB
Python

import pytest
from awx.main.access import (
UnifiedJobAccess,
WorkflowJobTemplateAccess,
WorkflowJobTemplateNodeAccess,
WorkflowJobAccess,
# WorkflowJobNodeAccess
)
from awx.main.models import JobTemplate, WorkflowJobTemplateNode
from rest_framework.exceptions import PermissionDenied
from awx.main.models import InventorySource, JobLaunchConfig
@pytest.mark.django_db
class TestWorkflowJobTemplateAccess:
def test_random_user_no_edit(self, wfjt, rando):
access = WorkflowJobTemplateAccess(rando)
assert not access.can_change(wfjt, {'name': 'new name'})
def test_org_admin_edit(self, wfjt, org_admin):
access = WorkflowJobTemplateAccess(org_admin)
assert access.can_change(wfjt, {'name': 'new name'})
def test_org_admin_role_inheritance(self, wfjt, org_admin):
assert org_admin in wfjt.admin_role
assert org_admin in wfjt.execute_role
assert org_admin in wfjt.read_role
def test_org_workflow_admin_role_inheritance(self, wfjt, org_member):
wfjt.organization.workflow_admin_role.members.add(org_member)
assert org_member in wfjt.admin_role
assert org_member in wfjt.execute_role
assert org_member in wfjt.read_role
def test_non_super_admin_no_add_without_org(self, wfjt, organization, rando):
organization.member_role.members.add(rando)
wfjt.admin_role.members.add(rando)
access = WorkflowJobTemplateAccess(rando, save_messages=True)
assert not access.can_add({'name': 'without org'})
assert 'An organization is required to create a workflow job template for normal user' in access.messages['organization']
@pytest.mark.django_db
class TestWorkflowJobTemplateNodeAccess:
def test_no_jt_access_to_edit(self, wfjt_node, rando):
# without access to the related job template, admin to the WFJT can
# not change the prompted parameters
wfjt_node.workflow_job_template.admin_role.members.add(rando)
access = WorkflowJobTemplateNodeAccess(rando)
assert not access.can_change(wfjt_node, {'job_type': 'check'})
def test_node_edit_allowed(self, wfjt_node, org_admin):
wfjt_node.unified_job_template.admin_role.members.add(org_admin)
access = WorkflowJobTemplateNodeAccess(org_admin)
assert access.can_change(wfjt_node, {'job_type': 'check'})
def test_access_to_edit_non_JT(self, rando, workflow_job_template, organization, project):
workflow_job_template.admin_role.members.add(rando)
node = workflow_job_template.workflow_job_template_nodes.create(unified_job_template=project)
assert not WorkflowJobTemplateNodeAccess(rando).can_change(node, {'limit': ''})
project.update_role.members.add(rando)
assert WorkflowJobTemplateNodeAccess(rando).can_change(node, {'limit': ''})
def test_add_JT_no_start_perm(self, wfjt, job_template, rando):
wfjt.admin_role.members.add(rando)
access = WorkflowJobTemplateNodeAccess(rando)
job_template.read_role.members.add(rando)
assert not access.can_add({'workflow_job_template': wfjt, 'unified_job_template': job_template})
def test_change_JT_no_start_perm(self, wfjt, rando):
wfjt.admin_role.members.add(rando)
access = WorkflowJobTemplateNodeAccess(rando)
jt1 = JobTemplate.objects.create()
jt1.execute_role.members.add(rando)
assert access.can_add({'workflow_job_template': wfjt, 'unified_job_template': jt1})
node = WorkflowJobTemplateNode.objects.create(workflow_job_template=wfjt, unified_job_template=jt1)
jt2 = JobTemplate.objects.create()
assert not access.can_change(node, {'unified_job_template': jt2.id})
def test_add_node_with_minimum_permissions(self, wfjt, job_template, inventory, rando):
wfjt.admin_role.members.add(rando)
access = WorkflowJobTemplateNodeAccess(rando)
job_template.execute_role.members.add(rando)
inventory.use_role.members.add(rando)
assert access.can_add({'workflow_job_template': wfjt, 'inventory': inventory, 'unified_job_template': job_template})
def test_remove_unwanted_foreign_node(self, wfjt_node, job_template, rando):
wfjt = wfjt_node.workflow_job_template
wfjt.admin_role.members.add(rando)
wfjt_node.unified_job_template = job_template
access = WorkflowJobTemplateNodeAccess(rando)
assert access.can_delete(wfjt_node)
@pytest.mark.parametrize(
"add_wfjt_admin, add_jt_admin, permission_type, expected_result, method_type",
[
(True, False, 'credentials', False, 'can_attach'),
(True, True, 'credentials', True, 'can_attach'),
(True, False, 'labels', False, 'can_attach'),
(True, True, 'labels', True, 'can_attach'),
(True, False, 'instance_groups', False, 'can_attach'),
(True, True, 'instance_groups', True, 'can_attach'),
(True, False, 'credentials', False, 'can_unattach'),
(True, True, 'credentials', True, 'can_unattach'),
(True, False, 'labels', False, 'can_unattach'),
(True, True, 'labels', True, 'can_unattach'),
(True, False, 'instance_groups', False, 'can_unattach'),
(True, True, 'instance_groups', True, 'can_unattach'),
],
)
def test_attacher_permissions(self, wfjt_node, job_template, rando, add_wfjt_admin, permission_type, add_jt_admin, expected_result, mocker, method_type):
wfjt = wfjt_node.workflow_job_template
if add_wfjt_admin:
wfjt.admin_role.members.add(rando)
wfjt.unified_job_template = job_template
if add_jt_admin:
job_template.execute_role.members.add(rando)
from awx.main.models import Credential, Label, InstanceGroup, Organization, CredentialType
if permission_type == 'credentials':
sub_obj = Credential.objects.create(credential_type=CredentialType.objects.create())
sub_obj.use_role.members.add(rando)
elif permission_type == 'labels':
sub_obj = Label.objects.create(organization=Organization.objects.create())
sub_obj.organization.member_role.members.add(rando)
elif permission_type == 'instance_groups':
sub_obj = InstanceGroup.objects.create()
org = Organization.objects.create()
sub_obj.use_role.members.add(rando) # only admins can see IGs
org.instance_groups.add(sub_obj)
access = WorkflowJobTemplateNodeAccess(rando)
if method_type == 'can_unattach':
assert getattr(access, method_type)(wfjt_node, sub_obj, permission_type) == expected_result
else:
assert getattr(access, method_type)(wfjt_node, sub_obj, permission_type, {}) == expected_result
# The actual attachment of labels, credentials and instance groups are tested from JobLaunchConfigAccess
@pytest.mark.parametrize(
"attachment_type, expect_exception, method_type",
[
("credentials", False, 'can_attach'),
("labels", False, 'can_attach'),
("instance_groups", False, 'can_attach'),
("success_nodes", False, 'can_attach'),
("failure_nodes", False, 'can_attach'),
("always_nodes", False, 'can_attach'),
("junk", True, 'can_attach'),
("credentials", False, 'can_unattach'),
("labels", False, 'can_unattach'),
("instance_groups", False, 'can_unattach'),
("success_nodes", False, 'can_unattach'),
("failure_nodes", False, 'can_unattach'),
("always_nodes", False, 'can_unattach'),
("junk", True, 'can_unattach'),
],
)
def test_attacher_raise_not_implemented(self, wfjt_node, rando, attachment_type, expect_exception, method_type):
wfjt = wfjt_node.workflow_job_template
wfjt.admin_role.members.add(rando)
access = WorkflowJobTemplateNodeAccess(rando)
if expect_exception:
with pytest.raises(NotImplementedError):
access.can_attach(wfjt_node, None, attachment_type, None)
else:
try:
getattr(access, method_type)(wfjt_node, None, attachment_type, None)
except NotImplementedError:
# We explicitly catch NotImplemented because the _nodes type will raise a different exception
assert False, "Exception was raised when it should not have been"
except Exception:
# File "/awx_devel/awx/main/access.py", line 2074, in check_same_WFJT
# raise Exception('Attaching workflow nodes only allowed for other nodes')
pass
# TODO: Implement additional tests for _nodes attachments here
@pytest.mark.django_db
class TestWorkflowJobAccess:
@pytest.mark.parametrize("role_name", ["admin_role", "workflow_admin_role"])
def test_org_admin_can_delete_workflow_job(self, role_name, workflow_job, org_member):
role = getattr(workflow_job.workflow_job_template.organization, role_name)
role.members.add(org_member)
access = WorkflowJobAccess(org_member)
assert access.can_delete(workflow_job)
def test_wfjt_admin_can_delete_workflow_job(self, workflow_job, rando):
workflow_job.workflow_job_template.admin_role.members.add(rando)
access = WorkflowJobAccess(rando)
assert not access.can_delete(workflow_job)
def test_cancel_your_own_job(self, wfjt, workflow_job, rando):
wfjt.execute_role.members.add(rando)
workflow_job.created_by = rando
workflow_job.save()
access = WorkflowJobAccess(rando)
assert access.can_cancel(workflow_job)
def test_admin_cancel_access(self, wfjt, workflow_job, rando):
wfjt.admin_role.members.add(rando)
access = WorkflowJobAccess(rando)
assert access.can_cancel(workflow_job)
def test_execute_role_relaunch(self, wfjt, workflow_job, rando):
wfjt.execute_role.members.add(rando)
JobLaunchConfig.objects.create(job=workflow_job)
assert WorkflowJobAccess(rando).can_start(workflow_job)
def test_can_start_with_limits(self, workflow_job, inventory, admin_user):
inventory.organization.max_hosts = 1
inventory.organization.save()
inventory.hosts.create(name="Existing host 1")
inventory.hosts.create(name="Existing host 2")
workflow_job.inventory = inventory
workflow_job.save()
assert WorkflowJobAccess(admin_user).can_start(workflow_job)
def test_cannot_relaunch_friends_job(self, wfjt, rando, alice):
workflow_job = wfjt.workflow_jobs.create(name='foo', created_by=alice)
JobLaunchConfig.objects.create(job=workflow_job, extra_data={'foo': 'fooforyou'})
wfjt.execute_role.members.add(alice)
assert not WorkflowJobAccess(rando).can_start(workflow_job)
def test_relaunch_inventory_access(self, workflow_job, inventory, rando):
wfjt = workflow_job.workflow_job_template
wfjt.execute_role.members.add(rando)
assert rando in wfjt.execute_role
workflow_job.created_by = rando
workflow_job.inventory = inventory
workflow_job.save()
wfjt.ask_inventory_on_launch = True
wfjt.save()
JobLaunchConfig.objects.create(job=workflow_job, inventory=inventory)
with pytest.raises(PermissionDenied):
WorkflowJobAccess(rando).can_start(workflow_job)
inventory.use_role.members.add(rando)
assert WorkflowJobAccess(rando).can_start(workflow_job)
@pytest.mark.parametrize('org_role', ['admin_role', 'auditor_role'])
def test_workflow_job_org_audit_access(self, workflow_job_template, rando, org_role):
assert workflow_job_template.organization # sanity
workflow_job = workflow_job_template.create_unified_job()
assert workflow_job.organization # sanity
assert not UnifiedJobAccess(rando).can_read(workflow_job)
assert not WorkflowJobAccess(rando).can_read(workflow_job)
assert workflow_job not in WorkflowJobAccess(rando).filtered_queryset()
org = workflow_job.organization
role = getattr(org, org_role)
role.members.add(rando)
assert UnifiedJobAccess(rando).can_read(workflow_job)
assert WorkflowJobAccess(rando).can_read(workflow_job)
assert workflow_job in WorkflowJobAccess(rando).filtered_queryset()
# Organization-level permissions should persist after deleting the WFJT
workflow_job_template.delete()
assert UnifiedJobAccess(rando).can_read(workflow_job)
assert WorkflowJobAccess(rando).can_read(workflow_job)
assert workflow_job in WorkflowJobAccess(rando).filtered_queryset()
@pytest.mark.django_db
class TestWFJTCopyAccess:
def test_copy_permissions_org_admin(self, wfjt, org_admin, org_member):
admin_access = WorkflowJobTemplateAccess(org_admin)
assert admin_access.can_copy(wfjt)
wfjt.organization.workflow_admin_role.members.add(org_member)
admin_access = WorkflowJobTemplateAccess(org_member)
assert admin_access.can_copy(wfjt)
def test_copy_permissions_user(self, wfjt, org_admin, org_member):
"""
Only org admins and org workflow admins are able to add WFJTs, only org admins
are able to copy them
"""
wfjt.admin_role.members.add(org_member)
member_access = WorkflowJobTemplateAccess(org_member)
assert not member_access.can_copy(wfjt)
def test_workflow_copy_warnings_inv(self, wfjt, rando, inventory):
"""
The user `rando` does not have access to the prompted inventory in a
node inside the workflow - test surfacing this information
"""
wfjt.workflow_job_template_nodes.create(inventory=inventory)
access = WorkflowJobTemplateAccess(rando, save_messages=True)
assert not access.can_copy(wfjt)
warnings = access.messages
assert 'inventories_unable_to_copy' in warnings
def test_workflow_copy_no_start(self, wfjt, inventory, admin_user):
# Test that un-startable resource doesn't block copy
inv_src = InventorySource.objects.create(inventory=inventory, source='file')
assert not inv_src.can_update
wfjt.workflow_job_template_nodes.create(unified_job_template=inv_src)
access = WorkflowJobTemplateAccess(admin_user, save_messages=True)
access.can_copy(wfjt)
assert not access.messages
def test_workflow_copy_warnings_jt(self, wfjt, rando, job_template):
wfjt.workflow_job_template_nodes.create(unified_job_template=job_template)
access = WorkflowJobTemplateAccess(rando, save_messages=True)
assert not access.can_copy(wfjt)
warnings = access.messages
assert 'templates_unable_to_copy' in warnings