diff --git a/awx/main/access.py b/awx/main/access.py index 2be67f0849..a0dd395bda 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -275,8 +275,10 @@ class BaseAccess(object): access_method = getattr(self, "can_%s" % method) if method in ['change']: # 3 args user_capabilities[display_method] = access_method(obj, data) - elif method in ['delete', 'start', 'run_ad_hoc_commands']: # 2 args + elif method in ['delete', 'run_ad_hoc_commands']: user_capabilities[display_method] = access_method(obj) + elif method in ['start']: + user_capabilities[display_method] = access_method(obj, validate_license=False) elif method in ['add']: # 2 args with data user_capabilities[display_method] = access_method(data) elif method in ['attach', 'unattach']: # parent/sub-object call @@ -609,10 +611,10 @@ class GroupAccess(BaseAccess): "active_jobs": active_jobs}) return True - def can_start(self, obj): + def can_start(self, obj, validate_license=True): # Used as another alias to inventory_source start access for user_capabilities if obj and obj.inventory_source: - return self.user.can_access(InventorySource, 'start', obj.inventory_source) + return self.user.can_access(InventorySource, 'start', obj.inventory_source, validate_license=validate_license) return False class InventorySourceAccess(BaseAccess): @@ -651,7 +653,7 @@ class InventorySourceAccess(BaseAccess): else: return False - def can_start(self, obj): + def can_start(self, obj, validate_license=True): if obj and obj.group: return obj.can_update and self.user in obj.group.inventory.update_role elif obj and obj.inventory: @@ -683,11 +685,11 @@ class InventoryUpdateAccess(BaseAccess): # Inventory cascade deletes to inventory update, descends from org admin return self.user in obj.inventory_source.inventory.admin_role - def can_start(self, obj): + def can_start(self, obj, validate_license=True): # For relaunching if obj and obj.inventory_source: access = InventorySourceAccess(self.user) - return access.can_start(obj.inventory_source) + return access.can_start(obj.inventory_source, validate_license=validate_license) return False @check_superuser @@ -882,7 +884,7 @@ class ProjectAccess(BaseAccess): return True @check_superuser - def can_start(self, obj): + def can_start(self, obj, validate_license=True): return obj and self.user in obj.update_role class ProjectUpdateAccess(BaseAccess): @@ -911,7 +913,7 @@ class ProjectUpdateAccess(BaseAccess): # Project updates cascade delete with project, admin role descends from org admin return self.user in obj.project.admin_role - def can_start(self, obj): + def can_start(self, obj, validate_license=True): # for relaunching if obj and obj.project: return self.user in obj.project.update_role @@ -1192,8 +1194,9 @@ class JobAccess(BaseAccess): return True return False - def can_start(self, obj): - self.check_license() + def can_start(self, obj, validate_license=True): + if validate_license: + self.check_license() # A super user can relaunch a job if self.user.is_superuser: @@ -1227,7 +1230,7 @@ class SystemJobTemplateAccess(BaseAccess): model = SystemJobTemplate - def can_start(self, obj): + def can_start(self, obj, validate_license=True): return self.can_read(obj) class SystemJobAccess(BaseAccess): @@ -1236,7 +1239,7 @@ class SystemJobAccess(BaseAccess): ''' model = SystemJob - def can_start(self, obj): + def can_start(self, obj, validate_license=True): return False # no relaunching of system jobs # TODO: @@ -1424,11 +1427,12 @@ class AdHocCommandAccess(BaseAccess): inventory_qs = Inventory.accessible_objects(self.user, 'read_role') return qs.filter(inventory__in=inventory_qs) - def can_add(self, data): + def can_add(self, data, validate_license=True): if not data: # So the browseable API will work return True - self.check_license() + if validate_license: + self.check_license() # If a credential is provided, the user should have use access to it. credential_pk = get_pk_from_dict(data, 'credential') @@ -1454,11 +1458,11 @@ class AdHocCommandAccess(BaseAccess): def can_delete(self, obj): return obj.inventory is not None and self.user in obj.inventory.organization.admin_role - def can_start(self, obj): + def can_start(self, obj, validate_license=True): return self.can_add({ 'credential': obj.credential_id, 'inventory': obj.inventory_id, - }) + }, validate_license=validate_license) def can_cancel(self, obj): if not obj.can_cancel: @@ -1749,7 +1753,7 @@ class NotificationTemplateAccess(BaseAccess): return self.can_change(obj, None) @check_superuser - def can_start(self, obj): + def can_start(self, obj, validate_license=True): if obj.organization is None: return False return self.user in obj.organization.admin_role diff --git a/awx/main/tests/functional/api/test_rbac_displays.py b/awx/main/tests/functional/api/test_rbac_displays.py index 45b4a8f832..c92a7190ff 100644 --- a/awx/main/tests/functional/api/test_rbac_displays.py +++ b/awx/main/tests/functional/api/test_rbac_displays.py @@ -14,7 +14,7 @@ from awx.api.serializers import JobTemplateSerializer # This file covers special-cases of displays of user_capabilities # general functionality should be covered fully by unit tests, see: -# awx/main/tests/unit/api/test_serializers.py :: +# awx/main/tests/unit/api/serializers/test_job_template_serializers.py :: # TestJobTemplateSerializerGetSummaryFields.test_copy_edit_standard # awx/main/tests/unit/test_access.py :: # test_user_capabilities_method @@ -327,3 +327,12 @@ def test_group_update_capabilities_impossible(group, inventory_source, admin_use capabilities = get_user_capabilities(admin_user, group, method_list=['start']) assert not capabilities['start'] +@pytest.mark.django_db +def test_license_check_not_called(mocker, job_template, project, org_admin, get): + job_template.project = project + job_template.save() # need this to make the JT visible + mock_license_check = mocker.MagicMock() + with mocker.patch('awx.main.access.BaseAccess.check_license', mock_license_check): + get(reverse('api:job_template_detail', args=[job_template.pk]), org_admin, expect=200) + assert not mock_license_check.called +