mirror of
https://github.com/ansible/awx.git
synced 2026-03-06 03:01:06 -03:30
Consolidate related field access checks into BaseAccess method
This commit is contained in:
@@ -192,6 +192,69 @@ class BaseAccess(object):
|
|||||||
def can_unattach(self, obj, sub_obj, relationship, data=None):
|
def can_unattach(self, obj, sub_obj, relationship, data=None):
|
||||||
return self.can_change(obj, data)
|
return self.can_change(obj, data)
|
||||||
|
|
||||||
|
def check_related(self, field, Model, data, role_field='admin_role',
|
||||||
|
obj=None, fail_on_missing=True, mandatory=False):
|
||||||
|
'''
|
||||||
|
Check permission for related field, in scenarios:
|
||||||
|
- creating a new resource, user must have permission to the
|
||||||
|
resource specified in `data`
|
||||||
|
- editing an existing resource, user must have permission to resource
|
||||||
|
in `data`, as well as existing related resource on `obj`
|
||||||
|
If `obj` is not provided, we assume this is a new resource
|
||||||
|
If obj.field is null, this does not block the action
|
||||||
|
If field is not given, return the inverse of `mandatory`
|
||||||
|
'''
|
||||||
|
new = None
|
||||||
|
new_pk = None
|
||||||
|
if data and 'reference_obj' in data:
|
||||||
|
# Use reference object's related fields, if given
|
||||||
|
new = getattr(data['reference_obj'], field)
|
||||||
|
new_pk = getattr(new, 'pk', None)
|
||||||
|
elif data and field in data:
|
||||||
|
# Obtain the resource specified in `data`
|
||||||
|
raw_value = data[field]
|
||||||
|
try:
|
||||||
|
new_pk = int(raw_value)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
if type(raw_value) == Model:
|
||||||
|
new = raw_value
|
||||||
|
new_pk = getattr(new, 'pk', None)
|
||||||
|
|
||||||
|
# Obtain existing related resource
|
||||||
|
current = None
|
||||||
|
current_pk = None
|
||||||
|
if obj:
|
||||||
|
current = getattr(obj, field)
|
||||||
|
current_pk = getattr(obj, '%s_id' % field, None)
|
||||||
|
|
||||||
|
# Define special case where resource is not changed
|
||||||
|
changed = True
|
||||||
|
if (new_pk is None or new_pk == current_pk) and (new is None or new == current):
|
||||||
|
changed = False
|
||||||
|
|
||||||
|
# Get the new resource from the database
|
||||||
|
if new is None and new_pk is not None:
|
||||||
|
if fail_on_missing:
|
||||||
|
new = get_object_or_400(Model, pk=new_pk)
|
||||||
|
else:
|
||||||
|
new = Model.objects.filter(pk=new_pk).first()
|
||||||
|
|
||||||
|
if not new and not current and mandatory:
|
||||||
|
return self.user.is_superuser
|
||||||
|
|
||||||
|
if new and changed:
|
||||||
|
new_role = getattr(new, role_field)
|
||||||
|
if self.user not in new_role:
|
||||||
|
return False # User lacks access to provided resource
|
||||||
|
|
||||||
|
if current and (changed or mandatory):
|
||||||
|
current_role = getattr(current, role_field)
|
||||||
|
if self.user not in current_role:
|
||||||
|
return False # User lacks access to existing resource
|
||||||
|
|
||||||
|
# User has access to both, permission check passed
|
||||||
|
return True
|
||||||
|
|
||||||
def check_license(self, add_host=False, feature=None, check_expiration=True):
|
def check_license(self, add_host=False, feature=None, check_expiration=True):
|
||||||
validation_info = TaskEnhancer().validate_enhancements()
|
validation_info = TaskEnhancer().validate_enhancements()
|
||||||
if ('test' in sys.argv or 'py.test' in sys.argv[0] or 'jenkins' in sys.argv) and not os.environ.get('SKIP_LICENSE_FIXUP_FOR_TEST', ''):
|
if ('test' in sys.argv or 'py.test' in sys.argv[0] or 'jenkins' in sys.argv) and not os.environ.get('SKIP_LICENSE_FIXUP_FOR_TEST', ''):
|
||||||
@@ -461,9 +524,7 @@ class InventoryAccess(BaseAccess):
|
|||||||
if not data:
|
if not data:
|
||||||
return Organization.accessible_objects(self.user, 'admin_role').exists()
|
return Organization.accessible_objects(self.user, 'admin_role').exists()
|
||||||
|
|
||||||
org_pk = get_pk_from_dict(data, 'organization')
|
return self.check_related('organization', Organization, data)
|
||||||
org = get_object_or_400(Organization, pk=org_pk)
|
|
||||||
return self.user in org.admin_role
|
|
||||||
|
|
||||||
@check_superuser
|
@check_superuser
|
||||||
def can_change(self, obj, data):
|
def can_change(self, obj, data):
|
||||||
@@ -473,11 +534,8 @@ class InventoryAccess(BaseAccess):
|
|||||||
def can_admin(self, obj, data):
|
def can_admin(self, obj, data):
|
||||||
# Verify that the user has access to the new organization if moving an
|
# Verify that the user has access to the new organization if moving an
|
||||||
# inventory to a new organization.
|
# inventory to a new organization.
|
||||||
org_pk = get_pk_from_dict(data, 'organization')
|
if not self.check_related('organization', Organization, data, obj=obj):
|
||||||
if obj and org_pk and obj.organization.pk != org_pk:
|
return False
|
||||||
org = get_object_or_400(Organization, pk=org_pk)
|
|
||||||
if self.user not in org.admin_role:
|
|
||||||
return False
|
|
||||||
# Otherwise, just check for admin permission.
|
# Otherwise, just check for admin permission.
|
||||||
return self.user in obj.admin_role
|
return self.user in obj.admin_role
|
||||||
|
|
||||||
@@ -523,9 +581,7 @@ class HostAccess(BaseAccess):
|
|||||||
return Inventory.accessible_objects(self.user, 'admin_role').exists()
|
return Inventory.accessible_objects(self.user, 'admin_role').exists()
|
||||||
|
|
||||||
# Checks for admin or change permission on inventory.
|
# Checks for admin or change permission on inventory.
|
||||||
inventory_pk = get_pk_from_dict(data, 'inventory')
|
if not self.check_related('inventory', Inventory, data):
|
||||||
inventory = get_object_or_400(Inventory, pk=inventory_pk)
|
|
||||||
if self.user not in inventory.admin_role:
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# Check to see if we have enough licenses
|
# Check to see if we have enough licenses
|
||||||
@@ -574,9 +630,7 @@ class GroupAccess(BaseAccess):
|
|||||||
if not data or 'inventory' not in data:
|
if not data or 'inventory' not in data:
|
||||||
return False
|
return False
|
||||||
# Checks for admin or change permission on inventory.
|
# Checks for admin or change permission on inventory.
|
||||||
inventory_pk = get_pk_from_dict(data, 'inventory')
|
return self.check_related('inventory', Inventory, data)
|
||||||
inventory = get_object_or_400(Inventory, pk=inventory_pk)
|
|
||||||
return self.user in inventory.admin_role
|
|
||||||
|
|
||||||
def can_change(self, obj, data):
|
def can_change(self, obj, data):
|
||||||
# Prevent moving a group to a different inventory.
|
# Prevent moving a group to a different inventory.
|
||||||
@@ -795,11 +849,7 @@ class TeamAccess(BaseAccess):
|
|||||||
def can_add(self, data):
|
def can_add(self, data):
|
||||||
if not data: # So the browseable API will work
|
if not data: # So the browseable API will work
|
||||||
return Organization.accessible_objects(self.user, 'admin_role').exists()
|
return Organization.accessible_objects(self.user, 'admin_role').exists()
|
||||||
org_pk = get_pk_from_dict(data, 'organization')
|
return self.check_related('organization', Organization, data)
|
||||||
org = get_object_or_400(Organization, pk=org_pk)
|
|
||||||
if self.user in org.admin_role:
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
def can_change(self, obj, data):
|
def can_change(self, obj, data):
|
||||||
# Prevent moving a team to a different organization.
|
# Prevent moving a team to a different organization.
|
||||||
@@ -866,17 +916,12 @@ class ProjectAccess(BaseAccess):
|
|||||||
def can_add(self, data):
|
def can_add(self, data):
|
||||||
if not data: # So the browseable API will work
|
if not data: # So the browseable API will work
|
||||||
return Organization.accessible_objects(self.user, 'admin_role').exists()
|
return Organization.accessible_objects(self.user, 'admin_role').exists()
|
||||||
organization_pk = get_pk_from_dict(data, 'organization')
|
return self.check_related('organization', Organization, data, mandatory=True)
|
||||||
org = get_object_or_400(Organization, pk=organization_pk)
|
|
||||||
return self.user in org.admin_role
|
|
||||||
|
|
||||||
@check_superuser
|
@check_superuser
|
||||||
def can_change(self, obj, data):
|
def can_change(self, obj, data):
|
||||||
org_pk = get_pk_from_dict(data, 'organization')
|
if not self.check_related('organization', Organization, data):
|
||||||
if obj and org_pk and obj.organization and obj.organization.pk != org_pk:
|
return False
|
||||||
org = get_object_or_400(Organization, pk=org_pk)
|
|
||||||
if self.user not in org.admin_role:
|
|
||||||
return False
|
|
||||||
return self.user in obj.admin_role
|
return self.user in obj.admin_role
|
||||||
|
|
||||||
def can_delete(self, obj):
|
def can_delete(self, obj):
|
||||||
@@ -974,7 +1019,7 @@ class JobTemplateAccess(BaseAccess):
|
|||||||
Inventory.accessible_objects(self.user, 'use_role').exists())
|
Inventory.accessible_objects(self.user, 'use_role').exists())
|
||||||
|
|
||||||
# if reference_obj is provided, determine if it can be coppied
|
# if reference_obj is provided, determine if it can be coppied
|
||||||
reference_obj = data.pop('reference_obj', None)
|
reference_obj = data.get('reference_obj', None)
|
||||||
|
|
||||||
if 'job_type' in data and data['job_type'] == PERM_INVENTORY_SCAN:
|
if 'job_type' in data and data['job_type'] == PERM_INVENTORY_SCAN:
|
||||||
self.check_license(feature='system_tracking')
|
self.check_license(feature='system_tracking')
|
||||||
@@ -996,22 +1041,16 @@ class JobTemplateAccess(BaseAccess):
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
# If a credential is provided, the user should have use access to it.
|
# If a credential is provided, the user should have use access to it.
|
||||||
credential = get_value(Credential, 'credential')
|
if not self.check_related('credential', Credential, data, role_field='use_role'):
|
||||||
if credential:
|
return False
|
||||||
if self.user not in credential.use_role:
|
|
||||||
return False
|
|
||||||
|
|
||||||
# If a cloud credential is provided, the user should have use access.
|
# If a cloud credential is provided, the user should have use access.
|
||||||
cloud_credential = get_value(Credential, 'cloud_credential')
|
if not self.check_related('cloud_credential', Credential, data, role_field='use_role'):
|
||||||
if cloud_credential:
|
return False
|
||||||
if self.user not in cloud_credential.use_role:
|
|
||||||
return False
|
|
||||||
|
|
||||||
# If a network credential is provided, the user should have use access.
|
# If a network credential is provided, the user should have use access.
|
||||||
network_credential = get_value(Credential, 'network_credential')
|
if not self.check_related('network_credential', Credential, data, role_field='use_role'):
|
||||||
if network_credential:
|
return False
|
||||||
if self.user not in network_credential.use_role:
|
|
||||||
return False
|
|
||||||
|
|
||||||
# If an inventory is provided, the user should have use access.
|
# If an inventory is provided, the user should have use access.
|
||||||
inventory = get_value(Inventory, 'inventory')
|
inventory = get_value(Inventory, 'inventory')
|
||||||
@@ -1327,28 +1366,17 @@ class WorkflowJobTemplateNodeAccess(BaseAccess):
|
|||||||
return qs
|
return qs
|
||||||
|
|
||||||
def can_use_prompted_resources(self, data):
|
def can_use_prompted_resources(self, data):
|
||||||
cred_pk = data.get('credential', None)
|
if not self.check_related('credential', Credential, data):
|
||||||
inv_pk = data.get('inventory', None)
|
return False
|
||||||
if cred_pk:
|
if not self.check_related('inventory', Inventory, data):
|
||||||
credential = get_object_or_400(Credential, pk=cred_pk)
|
return False
|
||||||
if self.user not in credential.use_role:
|
|
||||||
return False
|
|
||||||
if inv_pk:
|
|
||||||
inventory = get_object_or_400(Inventory, pk=inv_pk)
|
|
||||||
if self.user not in inventory.use_role:
|
|
||||||
return False
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@check_superuser
|
@check_superuser
|
||||||
def can_add(self, data):
|
def can_add(self, data):
|
||||||
if not data: # So the browseable API will work
|
if not data: # So the browseable API will work
|
||||||
return True
|
return True
|
||||||
wfjt_pk = data.get('workflow_job_template', None)
|
if not self.check_related('workflow_job_template', WorkflowJobTemplate, data, fail_on_missing=False):
|
||||||
if wfjt_pk:
|
|
||||||
wfjt = get_object_or_400(WorkflowJobTemplate, pk=wfjt_pk)
|
|
||||||
if self.user not in wfjt.admin_role:
|
|
||||||
return False
|
|
||||||
else:
|
|
||||||
return False
|
return False
|
||||||
if not self.can_use_prompted_resources(data):
|
if not self.can_use_prompted_resources(data):
|
||||||
return False
|
return False
|
||||||
@@ -1451,6 +1479,7 @@ class WorkflowJobTemplateAccess(BaseAccess):
|
|||||||
def can_read(self, obj):
|
def can_read(self, obj):
|
||||||
return self.user in obj.read_role
|
return self.user in obj.read_role
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
def can_add(self, data):
|
def can_add(self, data):
|
||||||
'''
|
'''
|
||||||
a user can create a job template if they are a superuser, an org admin
|
a user can create a job template if they are a superuser, an org admin
|
||||||
@@ -1491,13 +1520,7 @@ class WorkflowJobTemplateAccess(BaseAccess):
|
|||||||
# if 'survey_enabled' in data and data['survey_enabled']:
|
# if 'survey_enabled' in data and data['survey_enabled']:
|
||||||
# self.check_license(feature='surveys')
|
# self.check_license(feature='surveys')
|
||||||
|
|
||||||
org_pk = get_pk_from_dict(data, 'organization')
|
return self.check_related('organization', Organization, data)
|
||||||
if not org_pk:
|
|
||||||
# only superusers can create or manage orphan WFJTs
|
|
||||||
return self.user.is_superuser
|
|
||||||
|
|
||||||
org = get_object_or_400(Organization, pk=org_pk)
|
|
||||||
return self.user in org.admin_role
|
|
||||||
|
|
||||||
def can_start(self, obj, validate_license=True):
|
def can_start(self, obj, validate_license=True):
|
||||||
if validate_license:
|
if validate_license:
|
||||||
@@ -1520,21 +1543,8 @@ class WorkflowJobTemplateAccess(BaseAccess):
|
|||||||
|
|
||||||
if self.user.is_superuser:
|
if self.user.is_superuser:
|
||||||
return True
|
return True
|
||||||
if data is None:
|
|
||||||
return self.user in obj.admin_role
|
|
||||||
|
|
||||||
org_pk = get_pk_from_dict(data, 'organization')
|
return self.check_related('organization', Organization, data, obj=obj) and self.user in obj.admin_role
|
||||||
if ('organization' not in data or
|
|
||||||
(org_pk is None and obj.organization is None) or
|
|
||||||
(obj.organization and obj.organization.pk == org_pk)):
|
|
||||||
# No organization changes
|
|
||||||
return self.user in obj.admin_role
|
|
||||||
|
|
||||||
# If it already has an organization set, must be admin of the org to change it
|
|
||||||
if obj.organization and self.user not in obj.organization.admin_role:
|
|
||||||
return False
|
|
||||||
org = get_object_or_400(Organization, pk=org_pk)
|
|
||||||
return self.user in org.admin_role
|
|
||||||
|
|
||||||
def can_delete(self, obj):
|
def can_delete(self, obj):
|
||||||
is_delete_allowed = self.user.is_superuser or self.user in obj.admin_role
|
is_delete_allowed = self.user.is_superuser or self.user in obj.admin_role
|
||||||
@@ -1616,19 +1626,13 @@ class AdHocCommandAccess(BaseAccess):
|
|||||||
self.check_license()
|
self.check_license()
|
||||||
|
|
||||||
# If a credential is provided, the user should have use access to it.
|
# If a credential is provided, the user should have use access to it.
|
||||||
credential_pk = get_pk_from_dict(data, 'credential')
|
if not self.check_related('credential', Credential, data, role_field='use_role'):
|
||||||
if credential_pk:
|
return False
|
||||||
credential = get_object_or_400(Credential, pk=credential_pk)
|
|
||||||
if self.user not in credential.use_role:
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Check that the user has the run ad hoc command permission on the
|
# Check that the user has the run ad hoc command permission on the
|
||||||
# given inventory.
|
# given inventory.
|
||||||
inventory_pk = get_pk_from_dict(data, 'inventory')
|
if not self.check_related('inventory', Inventory, data, role_field='adhoc_role'):
|
||||||
if inventory_pk:
|
return False
|
||||||
inventory = get_object_or_400(Inventory, pk=inventory_pk)
|
|
||||||
if self.user not in inventory.adhoc_role:
|
|
||||||
return False
|
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@@ -1911,21 +1915,14 @@ class NotificationTemplateAccess(BaseAccess):
|
|||||||
def can_add(self, data):
|
def can_add(self, data):
|
||||||
if not data:
|
if not data:
|
||||||
return Organization.accessible_objects(self.user, 'admin_role').exists()
|
return Organization.accessible_objects(self.user, 'admin_role').exists()
|
||||||
org_pk = get_pk_from_dict(data, 'organization')
|
return self.check_related('organization', Organization, data, mandatory=True)
|
||||||
org = get_object_or_400(Organization, pk=org_pk)
|
|
||||||
return self.user in org.admin_role
|
|
||||||
|
|
||||||
@check_superuser
|
@check_superuser
|
||||||
def can_change(self, obj, data):
|
def can_change(self, obj, data):
|
||||||
if obj.organization is None:
|
if obj.organization is None:
|
||||||
# only superusers are allowed to edit orphan notification templates
|
# only superusers are allowed to edit orphan notification templates
|
||||||
return False
|
return False
|
||||||
org_pk = get_pk_from_dict(data, 'organization')
|
return self.check_related('organization', Organization, data, obj=obj, mandatory=True)
|
||||||
if obj and org_pk and obj.organization.pk != org_pk:
|
|
||||||
org = get_object_or_400(Organization, pk=org_pk)
|
|
||||||
if self.user not in org.admin_role:
|
|
||||||
return False
|
|
||||||
return self.user in obj.organization.admin_role
|
|
||||||
|
|
||||||
def can_admin(self, obj, data):
|
def can_admin(self, obj, data):
|
||||||
return self.can_change(obj, data)
|
return self.can_change(obj, data)
|
||||||
@@ -2087,18 +2084,11 @@ class CustomInventoryScriptAccess(BaseAccess):
|
|||||||
def can_add(self, data):
|
def can_add(self, data):
|
||||||
if not data: # So the browseable API will work
|
if not data: # So the browseable API will work
|
||||||
return Organization.accessible_objects(self.user, 'admin_role').exists()
|
return Organization.accessible_objects(self.user, 'admin_role').exists()
|
||||||
org_pk = get_pk_from_dict(data, 'organization')
|
return self.check_related('organization', Organization, data)
|
||||||
org = get_object_or_400(Organization, pk=org_pk)
|
|
||||||
return self.user in org.admin_role
|
|
||||||
|
|
||||||
@check_superuser
|
@check_superuser
|
||||||
def can_admin(self, obj, data=None):
|
def can_admin(self, obj, data=None):
|
||||||
org_pk = get_pk_from_dict(data, 'organization')
|
return self.check_related('organization', Organization, data, obj=obj)
|
||||||
if obj and org_pk and obj.organization and obj.organization.pk != org_pk:
|
|
||||||
org = get_object_or_400(Organization, pk=org_pk)
|
|
||||||
if self.user not in org.admin_role:
|
|
||||||
return False
|
|
||||||
return self.user in obj.admin_role
|
|
||||||
|
|
||||||
@check_superuser
|
@check_superuser
|
||||||
def can_change(self, obj, data):
|
def can_change(self, obj, data):
|
||||||
|
|||||||
@@ -121,7 +121,7 @@ def test_create_project_null_organization(post, organization, admin):
|
|||||||
|
|
||||||
@pytest.mark.django_db()
|
@pytest.mark.django_db()
|
||||||
def test_create_project_null_organization_xfail(post, organization, org_admin):
|
def test_create_project_null_organization_xfail(post, organization, org_admin):
|
||||||
post(reverse('api:project_list'), { 'name': 't', 'organization': None}, org_admin, expect=400)
|
post(reverse('api:project_list'), { 'name': 't', 'organization': None}, org_admin, expect=403)
|
||||||
|
|
||||||
@pytest.mark.django_db()
|
@pytest.mark.django_db()
|
||||||
def test_patch_project_null_organization(patch, organization, project, admin):
|
def test_patch_project_null_organization(patch, organization, project, admin):
|
||||||
|
|||||||
Reference in New Issue
Block a user