AWX Collections for DAB RBAC

Adds new modules for CRUD operations on the
following endpoints:

- api/v2/role_definitions
- api/v2/role_user_assignments
- api/v2/role_team_assignments

Note: assignment is Create or Delete only

Additional changes:
- Currently DAB endpoints do not have "type"
field on the resource list items. So this modifies
the create_or_update_if_needed to allow manually
specifying item type.

Signed-off-by: Seth Foster <fosterbseth@gmail.com>
This commit is contained in:
Seth Foster 2024-04-02 15:26:07 -04:00 committed by Alan Rominger
parent 389a729b75
commit 3bb559dd09
13 changed files with 807 additions and 6 deletions

View File

@ -68,6 +68,7 @@ Notable releases of the `awx.awx` collection:
- 7.0.0 is intended to be identical to the content prior to the migration, aside from changes necessary to function as a collection.
- 11.0.0 has no non-deprecated modules that depend on the deprecated `tower-cli` [PyPI](https://pypi.org/project/ansible-tower-cli/).
- 19.2.1 large renaming purged "tower" names (like options and module names), adding redirects for old names
- 21.11.0 "tower" modules deprecated and symlinks removed.
- X.X.X added support of named URLs to all modules. Anywhere that previously accepted name or id can also support named URLs
- 0.0.1-devel is the version you should see if installing from source, which is intended for development and expected to be unstable.
@ -112,7 +113,7 @@ Ansible source, set up a dedicated virtual environment:
```
mkvirtualenv my_new_venv
# may need to replace psycopg2 with psycopg2-binary in requirements/requirements.txt
# may need to replace psycopg3 with psycopg3-binary in requirements/requirements.txt
pip install -r requirements/requirements.txt -r requirements/requirements_dev.txt -r requirements/requirements_git.txt
make clean-api
pip install -e <path to your Ansible>

View File

@ -35,6 +35,9 @@ action_groups:
- project
- project_update
- role
- role_definition
- role_team_assignment
- role_user_assignment
- schedule
- settings
- subscriptions

View File

@ -652,7 +652,7 @@ class ControllerAPIModule(ControllerModule):
# If we have neither of these, then we can try un-authenticated access
self.authenticated = True
def delete_if_needed(self, existing_item, on_delete=None, auto_exit=True):
def delete_if_needed(self, existing_item, item_type=None, on_delete=None, auto_exit=True):
# This will exit from the module on its own.
# If the method successfully deletes an item and on_delete param is defined,
# the on_delete parameter will be called as a method pasing in this object and the json from the response
@ -664,8 +664,9 @@ class ControllerAPIModule(ControllerModule):
# If we have an item, we can try to delete it
try:
item_url = existing_item['url']
item_type = existing_item['type']
item_id = existing_item['id']
if not item_type:
item_type = existing_item['type']
item_name = self.get_item_name(existing_item, allow_unknown=True)
except KeyError as ke:
self.fail_json(msg="Unable to process delete of item due to missing data {0}".format(ke))
@ -907,7 +908,7 @@ class ControllerAPIModule(ControllerModule):
return True
return False
def update_if_needed(self, existing_item, new_item, on_update=None, auto_exit=True, associations=None):
def update_if_needed(self, existing_item, new_item, item_type=None, on_update=None, auto_exit=True, associations=None):
# This will exit from the module on its own
# If the method successfully updates an item and on_update param is defined,
# the on_update parameter will be called as a method pasing in this object and the json from the response
@ -921,7 +922,8 @@ class ControllerAPIModule(ControllerModule):
# If we have an item, we can see if it needs an update
try:
item_url = existing_item['url']
item_type = existing_item['type']
if not item_type:
item_type = existing_item['type']
if item_type == 'user':
item_name = existing_item['username']
elif item_type == 'workflow_job_template_node':
@ -990,7 +992,7 @@ class ControllerAPIModule(ControllerModule):
new_item.pop(key)
if existing_item:
return self.update_if_needed(existing_item, new_item, on_update=on_update, auto_exit=auto_exit, associations=associations)
return self.update_if_needed(existing_item, new_item, item_type=item_type, on_update=on_update, auto_exit=auto_exit, associations=associations)
else:
return self.create_if_needed(
existing_item, new_item, endpoint, on_create=on_create, item_type=item_type, auto_exit=auto_exit, associations=associations

View File

@ -0,0 +1,114 @@
#!/usr/bin/python
# coding: utf-8 -*-
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
ANSIBLE_METADATA = {'metadata_version': '1.1', 'status': ['preview'], 'supported_by': 'community'}
DOCUMENTATION = '''
---
module: role_definition
author: "Seth Foster (@fosterseth)"
short_description: Add role definition to Automation Platform Controller
description:
- Contains a list of permissions and a resource type that can then be assigned to users or teams.
options:
name:
description:
- Name of this role definition.
required: True
type: str
permissions:
description:
- List of permissions to include in the role definition.
required: True
type: list
elements: str
content_type:
description:
- The type of resource this applies to.
required: True
type: str
description:
description:
- Optional description of this role definition.
type: str
state:
description:
- The desired state of the role definition.
default: present
choices:
- present
- absent
type: str
extends_documentation_fragment: awx.awx.auth
'''
EXAMPLES = '''
- name: Create Role Definition
role_definition:
name: test_view_jt
permissions:
- awx.view_jobtemplate
- awx.execute_jobtemplate
content_type: awx.jobtemplate
description: role definition to view and execute jt
state: present
'''
from ..module_utils.controller_api import ControllerAPIModule
def main():
# Any additional arguments that are not fields of the item can be added here
argument_spec = dict(
name=dict(required=True, type='str'),
permissions=dict(required=True, type='list', elements='str'),
content_type=dict(required=True, type='str'),
description=dict(required=False, type='str'),
state=dict(default='present', choices=['present', 'absent']),
)
module = ControllerAPIModule(argument_spec=argument_spec)
name = module.params.get('name')
permissions = module.params.get('permissions')
content_type = module.params.get('content_type')
description = module.params.get('description')
state = module.params.get('state')
if description is None:
description = ''
role_definition = module.get_one('role_definitions', name_or_id=name)
if state == 'absent':
module.delete_if_needed(
role_definition,
item_type='role_definition',
)
post_kwargs = {
'name': name,
'permissions': permissions,
'content_type': content_type,
'description': description
}
if state == 'present':
module.create_or_update_if_needed(
role_definition,
post_kwargs,
endpoint='role_definitions',
item_type='role_definition',
)
if __name__ == '__main__':
main()

View File

@ -0,0 +1,123 @@
#!/usr/bin/python
# coding: utf-8 -*-
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
ANSIBLE_METADATA = {'metadata_version': '1.1', 'status': ['preview'], 'supported_by': 'community'}
DOCUMENTATION = '''
---
module: role_team_assignment
author: "Seth Foster (@fosterseth)"
short_description: Gives a team permission to a resource or an organization.
description:
- Use this endpoint to give a team permission to a resource or an organization.
- After creation, the assignment cannot be edited, but can be deleted to remove those permissions.
options:
role_definition:
description:
- The name or id of the role definition to assign to the team.
required: True
type: str
object_id:
description:
- Primary key of the object this assignment applies to.
required: True
type: int
team:
description:
- The name or id of the team to assign to the object.
required: False
type: str
object_ansible_id:
description:
- Resource id of the object this role applies to. Alternative to the object_id field.
required: False
type: int
team_ansible_id:
description:
- Resource id of the team who will receive permissions from this assignment. Alternative to team field.
required: False
type: int
state:
description:
- The desired state of the role definition.
default: present
choices:
- present
- absent
type: str
extends_documentation_fragment: awx.awx.auth
'''
EXAMPLES = '''
- name: Give Team A JT permissions
role_team_assignment:
role_definition: launch JT
object_id: 1
team: Team A
state: present
'''
from ..module_utils.controller_api import ControllerAPIModule
def main():
# Any additional arguments that are not fields of the item can be added here
argument_spec = dict(
team=dict(required=False, type='str'),
object_id=dict(required=True, type='int'),
role_definition=dict(required=True, type='str'),
object_ansible_id=dict(required=False, type='int'),
team_ansible_id=dict(required=False, type='int'),
state=dict(default='present', choices=['present', 'absent']),
)
module = ControllerAPIModule(argument_spec=argument_spec)
team = module.params.get('team')
object_id = module.params.get('object_id')
role_definition_str = module.params.get('role_definition')
object_ansible_id = module.params.get('object_ansible_id')
team_ansible_id = module.params.get('team_ansible_id')
state = module.params.get('state')
role_definition = module.get_one('role_definitions', allow_none=False, name_or_id=role_definition_str)
team = module.get_one('teams', allow_none=False, name_or_id=team)
kwargs = {
'role_definition': role_definition['id'],
'object_id': object_id,
'team': team['id'],
'object_ansible_id': object_ansible_id,
'team_ansible_id': team_ansible_id,
}
# get rid of None type values
kwargs = {k: v for k, v in kwargs.items() if v is not None}
role_team_assignment = module.get_one('role_team_assignments', **{'data': kwargs})
if state == 'absent':
module.delete_if_needed(
role_team_assignment,
item_type='role_team_assignment',
)
if state == 'present':
module.create_if_needed(
role_team_assignment,
kwargs,
endpoint='role_team_assignments',
item_type='role_team_assignment',
)
if __name__ == '__main__':
main()

View File

@ -0,0 +1,124 @@
#!/usr/bin/python
# coding: utf-8 -*-
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
ANSIBLE_METADATA = {'metadata_version': '1.1', 'status': ['preview'], 'supported_by': 'community'}
DOCUMENTATION = '''
---
module: role_user_assignment
author: "Seth Foster (@fosterseth)"
short_description: Gives a user permission to a resource or an organization.
description:
- Use this endpoint to give a user permission to a resource or an organization.
- After creation, the assignment cannot be edited, but can be deleted to remove those permissions.
options:
role_definition:
description:
- The name or id of the role definition to assign to the user.
required: True
type: str
object_id:
description:
- Primary key of the object this assignment applies to.
required: True
type: int
user:
description:
- The name or id of the user to assign to the object.
required: False
type: str
object_ansible_id:
description:
- Resource id of the object this role applies to. Alternative to the object_id field.
required: False
type: int
user_ansible_id:
description:
- Resource id of the user who will receive permissions from this assignment. Alternative to user field.
required: False
type: int
state:
description:
- The desired state of the role definition.
default: present
choices:
- present
- absent
type: str
extends_documentation_fragment: awx.awx.auth
'''
EXAMPLES = '''
- name: Give Bob JT permissions
role_user_assignment:
role_definition: launch JT
object_id: 1
user: bob
state: present
'''
from ..module_utils.controller_api import ControllerAPIModule
def main():
# Any additional arguments that are not fields of the item can be added here
argument_spec = dict(
user=dict(required=False, type='str'),
object_id=dict(required=True, type='int'),
role_definition=dict(required=True, type='str'),
object_ansible_id=dict(required=False, type='int'),
user_ansible_id=dict(required=False, type='int'),
state=dict(default='present', choices=['present', 'absent']),
)
module = ControllerAPIModule(argument_spec=argument_spec)
user = module.params.get('user')
object_id = module.params.get('object_id')
role_definition_str = module.params.get('role_definition')
object_ansible_id = module.params.get('object_ansible_id')
user_ansible_id = module.params.get('user_ansible_id')
state = module.params.get('state')
role_definition = module.get_one('role_definitions', allow_none=False, name_or_id=role_definition_str)
user = module.get_one('users', allow_none=False, name_or_id=user)
kwargs = {
'role_definition': role_definition['id'],
'object_id': object_id,
'user': user['id'],
'object_ansible_id': object_ansible_id,
'user_ansible_id': user_ansible_id,
}
# get rid of None type values
kwargs = {k: v for k, v in kwargs.items() if v is not None}
role_user_assignment = module.get_one('role_user_assignments', **{'data': kwargs})
if state == 'absent':
module.delete_if_needed(
role_user_assignment,
item_type='role_user_assignment',
)
if state == 'present':
module.create_if_needed(
role_user_assignment,
kwargs,
endpoint='role_user_assignments',
item_type='role_user_assignment',
)
if __name__ == '__main__':
main()

View File

@ -17,6 +17,7 @@ import pytest
from ansible.module_utils.six import raise_from
from ansible_base.rbac.models import RoleDefinition, DABPermission
from awx.main.tests.functional.conftest import _request
from awx.main.tests.functional.conftest import credentialtype_scm, credentialtype_ssh # noqa: F401; pylint: disable=unused-variable
from awx.main.models import (
@ -31,9 +32,11 @@ from awx.main.models import (
WorkflowJobTemplate,
NotificationTemplate,
Schedule,
Team,
)
from django.db import transaction
from django.contrib.contenttypes.models import ContentType
HAS_TOWER_CLI = False
@ -258,6 +261,11 @@ def job_template(project, inventory):
return JobTemplate.objects.create(name='test-jt', project=project, inventory=inventory, playbook='helloworld.yml')
@pytest.fixture
def team(organization):
return Team.objects.create(name='test-team', organization=organization)
@pytest.fixture
def machine_credential(credentialtype_ssh, organization): # noqa: F811
return Credential.objects.create(credential_type=credentialtype_ssh, name='machine-cred', inputs={'username': 'test_user', 'password': 'pas4word'})
@ -331,6 +339,15 @@ def notification_template(organization):
)
@pytest.fixture
def job_template_role_definition():
rd = RoleDefinition.objects.create(name='test_view_jt', content_type=ContentType.objects.get_for_model(JobTemplate))
permission_codenames = ['view_jobtemplate', 'execute_jobtemplate']
permissions = DABPermission.objects.filter(codename__in=permission_codenames)
rd.permissions.add(*permissions)
return rd
@pytest.fixture
def scm_credential(credentialtype_scm, organization): # noqa: F811
return Credential.objects.create(

View File

@ -0,0 +1,122 @@
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import pytest
from ansible_base.rbac.models import RoleDefinition
@pytest.mark.django_db
def test_create_new(run_module, admin_user):
result = run_module(
'role_definition',
{
'name': 'test_view_jt',
'permissions': ['awx.view_jobtemplate', 'awx.execute_jobtemplate'],
'content_type': 'awx.jobtemplate',
},
admin_user)
assert result['changed']
role_definition = RoleDefinition.objects.get(name='test_view_jt')
assert role_definition
permission_codenames = [p.codename for p in role_definition.permissions.all()]
assert set(permission_codenames) == set(['view_jobtemplate', 'execute_jobtemplate'])
assert role_definition.content_type.model == 'jobtemplate'
@pytest.mark.django_db
def test_update_existing(run_module, admin_user):
result = run_module(
'role_definition',
{
'name': 'test_view_jt',
'permissions': ['awx.view_jobtemplate'],
'content_type': 'awx.jobtemplate',
},
admin_user)
assert result['changed']
role_definition = RoleDefinition.objects.get(name='test_view_jt')
permission_codenames = [p.codename for p in role_definition.permissions.all()]
assert set(permission_codenames) == set(['view_jobtemplate'])
assert role_definition.content_type.model == 'jobtemplate'
result = run_module(
'role_definition',
{
'name': 'test_view_jt',
'permissions': ['awx.view_jobtemplate', 'awx.execute_jobtemplate'],
'content_type': 'awx.jobtemplate',
},
admin_user)
assert result['changed']
role_definition.refresh_from_db()
permission_codenames = [p.codename for p in role_definition.permissions.all()]
assert set(permission_codenames) == set(['view_jobtemplate', 'execute_jobtemplate'])
assert role_definition.content_type.model == 'jobtemplate'
@pytest.mark.django_db
def test_delete_existing(run_module, admin_user):
result = run_module(
'role_definition',
{
'name': 'test_view_jt',
'permissions': ['awx.view_jobtemplate', 'awx.execute_jobtemplate'],
'content_type': 'awx.jobtemplate',
},
admin_user)
assert result['changed']
role_definition = RoleDefinition.objects.get(name='test_view_jt')
assert role_definition
result = run_module(
'role_definition',
{
'name': 'test_view_jt',
'permissions': ['awx.view_jobtemplate', 'awx.execute_jobtemplate'],
'content_type': 'awx.jobtemplate',
'state': 'absent',
},
admin_user)
assert result['changed']
with pytest.raises(RoleDefinition.DoesNotExist):
role_definition.refresh_from_db()
@pytest.mark.django_db
def test_idempotence(run_module, admin_user):
result = run_module(
'role_definition',
{
'name': 'test_view_jt',
'permissions': ['awx.view_jobtemplate', 'awx.execute_jobtemplate'],
'content_type': 'awx.jobtemplate',
},
admin_user)
assert result['changed']
result = run_module(
'role_definition',
{
'name': 'test_view_jt',
'permissions': ['awx.view_jobtemplate', 'awx.execute_jobtemplate'],
'content_type': 'awx.jobtemplate',
},
admin_user)
assert not result['changed']
role_definition = RoleDefinition.objects.get(name='test_view_jt')
permission_codenames = [p.codename for p in role_definition.permissions.all()]
assert set(permission_codenames) == set(['view_jobtemplate', 'execute_jobtemplate'])

View File

@ -0,0 +1,70 @@
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import pytest
from ansible_base.rbac.models import RoleTeamAssignment
@pytest.mark.django_db
def test_create_new(run_module, admin_user, team, job_template, job_template_role_definition):
result = run_module(
'role_team_assignment',
{
'team': team.name,
'object_id': job_template.id,
'role_definition': job_template_role_definition.name,
},
admin_user)
assert result['changed']
assert RoleTeamAssignment.objects.filter(team=team, object_id=job_template.id, role_definition=job_template_role_definition).exists()
@pytest.mark.django_db
def test_idempotence(run_module, admin_user, team, job_template, job_template_role_definition):
result = run_module(
'role_team_assignment',
{
'team': team.name,
'object_id': job_template.id,
'role_definition': job_template_role_definition.name,
},
admin_user)
assert result['changed']
result = run_module(
'role_team_assignment',
{
'team': team.name,
'object_id': job_template.id,
'role_definition': job_template_role_definition.name,
},
admin_user)
assert not result['changed']
@pytest.mark.django_db
def test_delete_existing(run_module, admin_user, team, job_template, job_template_role_definition):
result = run_module(
'role_team_assignment',
{
'team': team.name,
'object_id': job_template.id,
'role_definition': job_template_role_definition.name,
},
admin_user)
assert result['changed']
assert RoleTeamAssignment.objects.filter(team=team, object_id=job_template.id, role_definition=job_template_role_definition).exists()
result = run_module(
'role_team_assignment',
{
'team': team.name,
'object_id': job_template.id,
'role_definition': job_template_role_definition.name,
'state': 'absent'
},
admin_user)
assert result['changed']
assert not RoleTeamAssignment.objects.filter(team=team, object_id=job_template.id, role_definition=job_template_role_definition).exists()

View File

@ -0,0 +1,70 @@
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import pytest
from ansible_base.rbac.models import RoleUserAssignment
@pytest.mark.django_db
def test_create_new(run_module, admin_user, job_template, job_template_role_definition):
result = run_module(
'role_user_assignment',
{
'user': admin_user.username,
'object_id': job_template.id,
'role_definition': job_template_role_definition.name,
},
admin_user)
assert result['changed']
assert RoleUserAssignment.objects.filter(user=admin_user, object_id=job_template.id, role_definition=job_template_role_definition).exists()
@pytest.mark.django_db
def test_idempotence(run_module, admin_user, job_template, job_template_role_definition):
result = run_module(
'role_user_assignment',
{
'user': admin_user.username,
'object_id': job_template.id,
'role_definition': job_template_role_definition.name,
},
admin_user)
assert result['changed']
result = run_module(
'role_user_assignment',
{
'user': admin_user.username,
'object_id': job_template.id,
'role_definition': job_template_role_definition.name,
},
admin_user)
assert not result['changed']
@pytest.mark.django_db
def test_delete_existing(run_module, admin_user, job_template, job_template_role_definition):
result = run_module(
'role_user_assignment',
{
'user': admin_user.username,
'object_id': job_template.id,
'role_definition': job_template_role_definition.name,
},
admin_user)
assert result['changed']
assert RoleUserAssignment.objects.filter(user=admin_user, object_id=job_template.id, role_definition=job_template_role_definition).exists()
result = run_module(
'role_user_assignment',
{
'user': admin_user.username,
'object_id': job_template.id,
'role_definition': job_template_role_definition.name,
'state': 'absent'
},
admin_user)
assert result['changed']
assert not RoleUserAssignment.objects.filter(user=admin_user, object_id=job_template.id, role_definition=job_template_role_definition).exists()

View File

@ -0,0 +1,30 @@
---
- name: Create Role Definition
role_definition:
name: test_view_jt
permissions:
- awx.view_jobtemplate
- awx.execute_jobtemplate
content_type: awx.jobtemplate
description: role definition to launch job
state: present
register: result
- assert:
that:
- result is changed
- name: Delete Role Definition
role_definition:
name: test_view_jt
permissions:
- awx.view_jobtemplate
- awx.execute_jobtemplate
content_type: awx.jobtemplate
description: role definition to launch job
state: absent
register: result
- assert:
that:
- result is changed

View File

@ -0,0 +1,62 @@
---
- name: Create Team
team:
name: All Stars
organization: Default
- name: Create Job Template
job_template:
name: Demo Job Template
job_type: run
inventory: Demo Inventory
project: Demo Project
playbook: hello_world.yml
register: job_template
- name: Create Role Definition
role_definition:
name: test_view_jt
permissions:
- awx.view_jobtemplate
- awx.execute_jobtemplate
content_type: awx.jobtemplate
description: role definition to launch job
- name: Create Role Team Assignment
role_team_assignment:
role_definition: test_view_jt
team: All Stars
object_id: "{{ job_template.id }}"
register: result
- assert:
that:
- result is changed
- name: Delete Role Team Assigment
role_team_assignment:
role_definition: test_view_jt
team: All Stars
object_id: "{{ job_template.id }}"
state: absent
register: result
- assert:
that:
- result is changed
- name: Create Role Definition
role_definition:
name: test_view_jt
permissions:
- awx.view_jobtemplate
- awx.execute_jobtemplate
content_type: awx.jobtemplate
description: role definition to launch job
state: absent
- name: Delete Team
team:
name: All Stars
organization: Default
state: absent

View File

@ -0,0 +1,63 @@
---
- name: Create User
user:
username: testing_user
first_name: testing
last_name: user
password: password
- name: Create Job Template
job_template:
name: Demo Job Template
job_type: run
inventory: Demo Inventory
project: Demo Project
playbook: hello_world.yml
register: job_template
- name: Create Role Definition
role_definition:
name: test_view_jt
permissions:
- awx.view_jobtemplate
- awx.execute_jobtemplate
content_type: awx.jobtemplate
description: role definition to launch job
- name: Create Role User Assignment
role_user_assignment:
role_definition: test_view_jt
user: testing_user
object_id: "{{ job_template.id }}"
register: result
- assert:
that:
- result is changed
- name: Delete Role User Assigment
role_user_assignment:
role_definition: test_view_jt
user: testing_user
object_id: "{{ job_template.id }}"
state: absent
register: result
- assert:
that:
- result is changed
- name: Create Role Definition
role_definition:
name: test_view_jt
permissions:
- awx.view_jobtemplate
- awx.execute_jobtemplate
content_type: awx.jobtemplate
description: role definition to launch job
state: absent
- name: Delete User
user:
username: testing_user
state: absent