diff --git a/awx/api/metadata.py b/awx/api/metadata.py index c5421ff86a..920bddd1ff 100644 --- a/awx/api/metadata.py +++ b/awx/api/metadata.py @@ -234,17 +234,17 @@ class RoleMetadata(Metadata): # TODO: Tower 3.3 remove class and all uses in views.py when API v1 is removed class JobTypeMetadata(Metadata): - def get_field_info(self, field): - res = super(JobTypeMetadata, self).get_field_info(field) + def get_field_info(self, field): + res = super(JobTypeMetadata, self).get_field_info(field) - if field.field_name == 'job_type': - index = 0 - for choice in res['choices']: - if choice[0] == 'scan': - res['choices'].pop(index) - break - index += 1 - return res + if field.field_name == 'job_type': + index = 0 + for choice in res['choices']: + if choice[0] == 'scan': + res['choices'].pop(index) + break + index += 1 + return res class SublistAttachDetatchMetadata(Metadata): diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index 262eea1ff9..fb90808e40 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -523,7 +523,7 @@ class AuthView(APIView): not feature_enabled('ldap')) or \ (not feature_enabled('enterprise_auth') and name in ['saml', 'radius']): - continue + continue login_url = reverse('social:begin', args=(name,)) complete_url = request.build_absolute_uri(reverse('social:complete', args=(name,))) diff --git a/awx/main/managers.py b/awx/main/managers.py index f31a532572..341a7e806a 100644 --- a/awx/main/managers.py +++ b/awx/main/managers.py @@ -38,20 +38,20 @@ class HostManager(models.Manager): hasattr(self.instance, 'host_filter') and hasattr(self.instance, 'kind')): if self.instance.kind == 'smart' and self.instance.host_filter is not None: - q = SmartFilter.query_from_string(self.instance.host_filter) - if self.instance.organization_id: - q = q.filter(inventory__organization=self.instance.organization_id) - # If we are using host_filters, disable the core_filters, this allows - # us to access all of the available Host entries, not just the ones associated - # with a specific FK/relation. - # - # If we don't disable this, a filter of {'inventory': self.instance} gets automatically - # injected by the related object mapper. - self.core_filters = {} + q = SmartFilter.query_from_string(self.instance.host_filter) + if self.instance.organization_id: + q = q.filter(inventory__organization=self.instance.organization_id) + # If we are using host_filters, disable the core_filters, this allows + # us to access all of the available Host entries, not just the ones associated + # with a specific FK/relation. + # + # If we don't disable this, a filter of {'inventory': self.instance} gets automatically + # injected by the related object mapper. + self.core_filters = {} - qs = qs & q - unique_by_name = qs.order_by('name', 'pk').distinct('name') - return qs.filter(pk__in=unique_by_name) + qs = qs & q + unique_by_name = qs.order_by('name', 'pk').distinct('name') + return qs.filter(pk__in=unique_by_name) return qs diff --git a/awx/main/models/rbac.py b/awx/main/models/rbac.py index b8f2e6ce2d..ca3ee97ce8 100644 --- a/awx/main/models/rbac.py +++ b/awx/main/models/rbac.py @@ -204,7 +204,7 @@ class Role(models.Model): value = description.get('default') if '%s' in value and content_type: - value = value % model_name + value = value % model_name return value diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index db90fa8aa9..a71e4004a9 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -357,10 +357,10 @@ class TaskManager(): return False def get_latest_project_update(self, job): - latest_project_update = ProjectUpdate.objects.filter(project=job.project, job_type='check').order_by("-created") - if not latest_project_update.exists(): - return None - return latest_project_update.first() + latest_project_update = ProjectUpdate.objects.filter(project=job.project, job_type='check').order_by("-created") + if not latest_project_update.exists(): + return None + return latest_project_update.first() def should_update_related_project(self, job, latest_project_update): now = tz_now() diff --git a/awx/main/tests/functional/task_management/test_rampart_groups.py b/awx/main/tests/functional/task_management/test_rampart_groups.py index 22ac1ff111..b459241f55 100644 --- a/awx/main/tests/functional/task_management/test_rampart_groups.py +++ b/awx/main/tests/functional/task_management/test_rampart_groups.py @@ -82,14 +82,14 @@ def test_multi_group_with_shared_dependency(instance_factory, default_instance_g @pytest.mark.django_db def test_workflow_job_no_instancegroup(workflow_job_template_factory, default_instance_group, mocker): - wfjt = workflow_job_template_factory('anicedayforawalk').workflow_job_template - wfj = WorkflowJob.objects.create(workflow_job_template=wfjt) - wfj.status = "pending" - wfj.save() - with mocker.patch("awx.main.scheduler.TaskManager.start_task"): - TaskManager().schedule() - TaskManager.start_task.assert_called_once_with(wfj, None, [], None) - assert wfj.instance_group is None + wfjt = workflow_job_template_factory('anicedayforawalk').workflow_job_template + wfj = WorkflowJob.objects.create(workflow_job_template=wfjt) + wfj.status = "pending" + wfj.save() + with mocker.patch("awx.main.scheduler.TaskManager.start_task"): + TaskManager().schedule() + TaskManager.start_task.assert_called_once_with(wfj, None, [], None) + assert wfj.instance_group is None @pytest.mark.django_db diff --git a/awx/main/tests/functional/test_rbac_oauth.py b/awx/main/tests/functional/test_rbac_oauth.py index eb6ba6b63d..e45c3ddedc 100644 --- a/awx/main/tests/functional/test_rbac_oauth.py +++ b/awx/main/tests/functional/test_rbac_oauth.py @@ -16,117 +16,117 @@ from awx.api.versioning import reverse @pytest.mark.django_db class TestOAuth2Application: - @pytest.mark.parametrize("user_for_access, can_access_list", [ - (0, [True, True]), - (1, [True, True]), - (2, [True, True]), - (3, [False, False]), - ]) - def test_can_read( - self, admin, org_admin, org_member, alice, user_for_access, can_access_list, organization - ): - user_list = [admin, org_admin, org_member, alice] - access = OAuth2ApplicationAccess(user_list[user_for_access]) - app_creation_user_list = [admin, org_admin] - for user, can_access in zip(app_creation_user_list, can_access_list): - app = Application.objects.create( - name='test app for {}'.format(user.username), user=user, - client_type='confidential', authorization_grant_type='password', organization=organization - ) - assert access.can_read(app) is can_access - - def test_admin_only_can_read(self, user, organization): - user = user('org-admin', False) - organization.admin_role.members.add(user) - access = OAuth2ApplicationAccess(user) + @pytest.mark.parametrize("user_for_access, can_access_list", [ + (0, [True, True]), + (1, [True, True]), + (2, [True, True]), + (3, [False, False]), + ]) + def test_can_read( + self, admin, org_admin, org_member, alice, user_for_access, can_access_list, organization + ): + user_list = [admin, org_admin, org_member, alice] + access = OAuth2ApplicationAccess(user_list[user_for_access]) + app_creation_user_list = [admin, org_admin] + for user, can_access in zip(app_creation_user_list, can_access_list): app = Application.objects.create( name='test app for {}'.format(user.username), user=user, client_type='confidential', authorization_grant_type='password', organization=organization ) - assert access.can_read(app) is True + assert access.can_read(app) is can_access - def test_app_activity_stream(self, org_admin, alice, organization): + def test_admin_only_can_read(self, user, organization): + user = user('org-admin', False) + organization.admin_role.members.add(user) + access = OAuth2ApplicationAccess(user) + app = Application.objects.create( + name='test app for {}'.format(user.username), user=user, + client_type='confidential', authorization_grant_type='password', organization=organization + ) + assert access.can_read(app) is True + + def test_app_activity_stream(self, org_admin, alice, organization): + app = Application.objects.create( + name='test app for {}'.format(org_admin.username), user=org_admin, + client_type='confidential', authorization_grant_type='password', organization=organization + ) + access = OAuth2ApplicationAccess(org_admin) + assert access.can_read(app) is True + access = ActivityStreamAccess(org_admin) + activity_stream = ActivityStream.objects.filter(o_auth2_application=app).latest('pk') + assert access.can_read(activity_stream) is True + access = ActivityStreamAccess(alice) + assert access.can_read(app) is False + assert access.can_read(activity_stream) is False + + + def test_token_activity_stream(self, org_admin, alice, organization, post): + app = Application.objects.create( + name='test app for {}'.format(org_admin.username), user=org_admin, + client_type='confidential', authorization_grant_type='password', organization=organization + ) + response = post( + reverse('api:o_auth2_application_token_list', kwargs={'pk': app.pk}), + {'scope': 'read'}, org_admin, expect=201 + ) + token = AccessToken.objects.get(token=response.data['token']) + access = OAuth2ApplicationAccess(org_admin) + assert access.can_read(app) is True + access = ActivityStreamAccess(org_admin) + activity_stream = ActivityStream.objects.filter(o_auth2_access_token=token).latest('pk') + assert access.can_read(activity_stream) is True + access = ActivityStreamAccess(alice) + assert access.can_read(token) is False + assert access.can_read(activity_stream) is False + + + + def test_can_edit_delete_app_org_admin( + self, admin, org_admin, org_member, alice, organization + ): + user_list = [admin, org_admin, org_member, alice] + can_access_list = [True, True, False, False] + for user, can_access in zip(user_list, can_access_list): app = Application.objects.create( - name='test app for {}'.format(org_admin.username), user=org_admin, + name='test app for {}'.format(user.username), user=org_admin, client_type='confidential', authorization_grant_type='password', organization=organization ) - access = OAuth2ApplicationAccess(org_admin) - assert access.can_read(app) is True - access = ActivityStreamAccess(org_admin) - activity_stream = ActivityStream.objects.filter(o_auth2_application=app).latest('pk') - assert access.can_read(activity_stream) is True - access = ActivityStreamAccess(alice) - assert access.can_read(app) is False - assert access.can_read(activity_stream) is False + access = OAuth2ApplicationAccess(user) + assert access.can_change(app, {}) is can_access + assert access.can_delete(app) is can_access - - def test_token_activity_stream(self, org_admin, alice, organization, post): + + def test_can_edit_delete_app_admin( + self, admin, org_admin, org_member, alice, organization + ): + user_list = [admin, org_admin, org_member, alice] + can_access_list = [True, True, False, False] + for user, can_access in zip(user_list, can_access_list): app = Application.objects.create( - name='test app for {}'.format(org_admin.username), user=org_admin, + name='test app for {}'.format(user.username), user=admin, client_type='confidential', authorization_grant_type='password', organization=organization ) - response = post( - reverse('api:o_auth2_application_token_list', kwargs={'pk': app.pk}), - {'scope': 'read'}, org_admin, expect=201 - ) - token = AccessToken.objects.get(token=response.data['token']) - access = OAuth2ApplicationAccess(org_admin) - assert access.can_read(app) is True - access = ActivityStreamAccess(org_admin) - activity_stream = ActivityStream.objects.filter(o_auth2_access_token=token).latest('pk') - assert access.can_read(activity_stream) is True - access = ActivityStreamAccess(alice) - assert access.can_read(token) is False - assert access.can_read(activity_stream) is False - - - - def test_can_edit_delete_app_org_admin( - self, admin, org_admin, org_member, alice, organization - ): - user_list = [admin, org_admin, org_member, alice] - can_access_list = [True, True, False, False] - for user, can_access in zip(user_list, can_access_list): - app = Application.objects.create( - name='test app for {}'.format(user.username), user=org_admin, - client_type='confidential', authorization_grant_type='password', organization=organization - ) - access = OAuth2ApplicationAccess(user) - assert access.can_change(app, {}) is can_access - assert access.can_delete(app) is can_access - - - def test_can_edit_delete_app_admin( - self, admin, org_admin, org_member, alice, organization - ): - user_list = [admin, org_admin, org_member, alice] - can_access_list = [True, True, False, False] - for user, can_access in zip(user_list, can_access_list): - app = Application.objects.create( - name='test app for {}'.format(user.username), user=admin, - client_type='confidential', authorization_grant_type='password', organization=organization - ) - access = OAuth2ApplicationAccess(user) - assert access.can_change(app, {}) is can_access - assert access.can_delete(app) is can_access - + access = OAuth2ApplicationAccess(user) + assert access.can_change(app, {}) is can_access + assert access.can_delete(app) is can_access - def test_superuser_can_always_create(self, admin, org_admin, org_member, alice, organization): - access = OAuth2ApplicationAccess(admin) + + def test_superuser_can_always_create(self, admin, org_admin, org_member, alice, organization): + access = OAuth2ApplicationAccess(admin) + for user in [admin, org_admin, org_member, alice]: + assert access.can_add({ + 'name': 'test app', 'user': user.pk, 'client_type': 'confidential', + 'authorization_grant_type': 'password', 'organization': organization.id + }) + + def test_normal_user_cannot_create(self, admin, org_admin, org_member, alice, organization): + for access_user in [org_member, alice]: + access = OAuth2ApplicationAccess(access_user) for user in [admin, org_admin, org_member, alice]: - assert access.can_add({ + assert not access.can_add({ 'name': 'test app', 'user': user.pk, 'client_type': 'confidential', 'authorization_grant_type': 'password', 'organization': organization.id }) - - def test_normal_user_cannot_create(self, admin, org_admin, org_member, alice, organization): - for access_user in [org_member, alice]: - access = OAuth2ApplicationAccess(access_user) - for user in [admin, org_admin, org_member, alice]: - assert not access.can_add({ - 'name': 'test app', 'user': user.pk, 'client_type': 'confidential', - 'authorization_grant_type': 'password', 'organization': organization.id - }) @pytest.mark.django_db