diff --git a/awx/api/permissions.py b/awx/api/permissions.py index 1458ca1c45..cb128bd0dc 100644 --- a/awx/api/permissions.py +++ b/awx/api/permissions.py @@ -8,7 +8,7 @@ import logging from django.http import Http404 # Django REST Framework -from rest_framework.exceptions import PermissionDenied +from rest_framework.exceptions import MethodNotAllowed, PermissionDenied from rest_framework import permissions # AWX @@ -45,7 +45,10 @@ class ModelAccessPermission(permissions.BasePermission): def check_post_permissions(self, request, view, obj=None): if hasattr(view, 'parent_model'): - get_object_or_400(view.parent_model, pk=view.kwargs['pk']) + parent_obj = get_object_or_400(view.parent_model, pk=view.kwargs['pk']) + if not check_user_access(request.user, view.parent_model, 'read', + parent_obj): + return False return True elif getattr(view, 'is_job_start', False): if not obj: @@ -108,6 +111,11 @@ class ModelAccessPermission(permissions.BasePermission): if request.user.is_superuser: return True + # Check if view supports the request method before checking permission + # based on request method. + if request.method.upper() not in view.allowed_methods: + raise MethodNotAllowed(request.method) + # Check permissions for the given view and object, based on the request # method used. check_method = getattr(self, 'check_%s_permissions' % request.method.lower(), None) diff --git a/awx/api/serializers.py b/awx/api/serializers.py index b34a38617f..5a826eeeae 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -1525,15 +1525,13 @@ class AdHocCommandSerializer(UnifiedJobSerializer): if not (obj and obj.pk) and view and hasattr(view, '_raw_data_form_marker'): if not obj: obj = self.opts.model() - if parent_model in (Host, Group): - parent_obj = parent_model.objects.get(pk=view.kwargs['pk']) - obj.limit = parent_obj.name ret = super(AdHocCommandSerializer, self).to_native(obj) - # Hide inventory field from raw data, since it will be set automatically - # by sub list create view. + # Hide inventory and limit fields from raw data, since they will be set + # automatically by sub list create view. if not (obj and obj.pk) and view and hasattr(view, '_raw_data_form_marker'): if parent_model in (Host, Group): ret.pop('inventory', None) + ret.pop('limit', None) return ret diff --git a/awx/api/views.py b/awx/api/views.py index 7bbb6d8885..b842a77d93 100644 --- a/awx/api/views.py +++ b/awx/api/views.py @@ -2189,7 +2189,7 @@ class AdHocCommandList(ListCreateAPIView): new_in_220 = True def create(self, request, *args, **kwargs): - # Inject inventory ID if parent objects is a host/group. + # Inject inventory ID and limit if parent objects is a host/group. if hasattr(self, 'get_parent_object') and not getattr(self, 'parent_key', None): data = request.DATA # HACK: Make request data mutable. @@ -2198,6 +2198,7 @@ class AdHocCommandList(ListCreateAPIView): parent_obj = self.get_parent_object() if isinstance(parent_obj, (Host, Group)): data['inventory'] = parent_obj.inventory_id + data['limit'] = parent_obj.name # Check for passwords needed before creating ad hoc command. credential_pk = get_pk_from_dict(request.DATA, 'credential') @@ -2351,6 +2352,8 @@ class AdHocCommandAdHocCommandEventsList(BaseAdHocCommandEventsList): # Post allowed for ad hoc event callback only. def post(self, request, *args, **kwargs): + if request.user: + raise PermissionDenied() parent_obj = get_object_or_404(self.parent_model, pk=self.kwargs['pk']) data = request.DATA.copy() data['ad_hoc_command'] = parent_obj.pk diff --git a/awx/main/access.py b/awx/main/access.py index 738506b442..5245750574 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -1525,6 +1525,10 @@ class ActivityStreamAccess(BaseAccess): job_qs = self.user.get_queryset(Job) qs.filter(job__in=job_qs) + # Ad Hoc Command Filter + ad_hoc_command_qs = self.user.get_queryset(AdHocCommand) + qs.filter(ad_hoc_command__in=ad_hoc_command_qs) + # organization_qs = self.user.get_queryset(Organization) # user_qs = self.user.get_queryset(User) # inventory_qs = self.user.get_queryset(Inventory) diff --git a/awx/main/models/ad_hoc_commands.py b/awx/main/models/ad_hoc_commands.py index e67b898bcd..2ca5316e25 100644 --- a/awx/main/models/ad_hoc_commands.py +++ b/awx/main/models/ad_hoc_commands.py @@ -207,7 +207,7 @@ class AdHocCommand(UnifiedJob): def save(self, *args, **kwargs): update_fields = kwargs.get('update_fields', []) if not self.name: - self.name = Truncator(u'%s: %s' % (self.module_name, self.module_args)).chars(512) + self.name = Truncator(u': '.join(filter(None, (self.module_name, self.module_args)))).chars(512) if 'name' not in update_fields: update_fields.append('name') super(AdHocCommand, self).save(*args, **kwargs) diff --git a/awx/main/signals.py b/awx/main/signals.py index b3e99a647d..9265d0024e 100644 --- a/awx/main/signals.py +++ b/awx/main/signals.py @@ -293,17 +293,20 @@ def disable_activity_stream(): activity_stream_enabled.enabled = previous_value -model_serializer_mapping = {Organization: OrganizationSerializer, - Inventory: InventorySerializer, - Host: HostSerializer, - Group: GroupSerializer, - InventorySource: InventorySourceSerializer, - Credential: CredentialSerializer, - Team: TeamSerializer, - Project: ProjectSerializer, - Permission: PermissionSerializer, - JobTemplate: JobTemplateSerializer, - Job: JobSerializer} +model_serializer_mapping = { + Organization: OrganizationSerializer, + Inventory: InventorySerializer, + Host: HostSerializer, + Group: GroupSerializer, + InventorySource: InventorySourceSerializer, + Credential: CredentialSerializer, + Team: TeamSerializer, + Project: ProjectSerializer, + Permission: PermissionSerializer, + JobTemplate: JobTemplateSerializer, + Job: JobSerializer, + AdHocCommand: AdHocCommandSerializer, +} def activity_stream_create(sender, instance, created, **kwargs): if created and activity_stream_enabled: diff --git a/awx/main/tests/ad_hoc.py b/awx/main/tests/ad_hoc.py index 2061627df1..723b833656 100644 --- a/awx/main/tests/ad_hoc.py +++ b/awx/main/tests/ad_hoc.py @@ -40,6 +40,8 @@ class BaseAdHocCommandTest(BaseJobExecutionTest): self.group2 = self.inventory.groups.create(name='test-group2') self.group.hosts.add(self.host) self.group2.hosts.add(self.host, self.host2) + self.inventory2 = self.organization.inventories.create(name='test-inventory2') + self.host3 = self.inventory2.hosts.create(name='host3.example.com') self.credential = None settings.INTERNAL_API_URL = self.live_server_url settings.CALLBACK_CONSUMER_PORT = '' @@ -410,6 +412,22 @@ class AdHocCommandApiTest(BaseAdHocCommandTest): super(AdHocCommandApiTest, self).setUp() self.create_test_credential(user=self.normal_django_user) + def run_test_ad_hoc_command(self, **kwargs): + # Post to list to start a new ad hoc command. + expect = kwargs.pop('expect', 201) + url = kwargs.pop('url', reverse('api:ad_hoc_command_list')) + data = { + 'inventory': self.inventory.pk, + 'credential': self.credential.pk, + 'module_name': 'command', + 'module_args': 'uptime', + } + data.update(kwargs) + for k,v in data.items(): + if v is None: + del data[k] + return self.post(url, data, expect=expect) + def test_ad_hoc_command_list(self): url = reverse('api:ad_hoc_command_list') @@ -421,16 +439,10 @@ class AdHocCommandApiTest(BaseAdHocCommandTest): self.check_get_list(url, 'nobody', qs) self.check_get_list(url, None, qs, expect=401) - # Post to list to start a new ad hoc command. Only admin and normal - # user (org admin) can run commands by default. - data = { - 'inventory': self.inventory.pk, - 'credential': self.credential.pk, - 'module_name': 'command', - 'module_args': 'uptime', - } + # Start a new ad hoc command. Only admin and normal user (org admin) + # can run commands by default. with self.current_user('admin'): - response = self.post(url, data, expect=201) + response = self.run_test_ad_hoc_command() self.assertEqual(response['job_type'], 'run') self.assertEqual(response['inventory'], self.inventory.pk) self.assertEqual(response['credential'], self.credential.pk) @@ -440,28 +452,28 @@ class AdHocCommandApiTest(BaseAdHocCommandTest): self.assertEqual(response['forks'], 0) self.assertEqual(response['verbosity'], 0) self.assertEqual(response['privilege_escalation'], '') - self.put(url, data, expect=405) - self.patch(url, data, expect=405) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) self.delete(url, expect=405) with self.current_user('normal'): - response = self.post(url, data, expect=201) - self.put(url, data, expect=405) - self.patch(url, data, expect=405) + self.run_test_ad_hoc_command() + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) self.delete(url, expect=405) with self.current_user('other'): - response = self.post(url, data, expect=403) - self.put(url, data, expect=405) - self.patch(url, data, expect=405) + self.run_test_ad_hoc_command(expect=403) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) self.delete(url, expect=405) with self.current_user('nobody'): - response = self.post(url, data, expect=403) - self.put(url, data, expect=405) - self.patch(url, data, expect=405) + self.run_test_ad_hoc_command(expect=403) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) self.delete(url, expect=405) with self.current_user(None): - response = self.post(url, data, expect=401) - self.put(url, data, expect=401) - self.patch(url, data, expect=401) + self.run_test_ad_hoc_command(expect=401) + self.put(url, {}, expect=401) + self.patch(url, {}, expect=401) self.delete(url, expect=401) # Retrieve the list of ad hoc commands (only admin/normal can see by default). @@ -486,7 +498,7 @@ class AdHocCommandApiTest(BaseAdHocCommandTest): response = self.post(user_perm_url, user_perm_data, expect=201) user_perm_id = response['id'] with self.current_user('other'): - response = self.post(url, data, expect=403) + self.run_test_ad_hoc_command(expect=403) self.check_get_list(url, 'other', qs) # Update permission to allow other user to run ad hoc commands. Fails @@ -499,15 +511,13 @@ class AdHocCommandApiTest(BaseAdHocCommandTest): with self.current_user('admin'): response = self.patch(user_perm_url, user_perm_data, expect=200) with self.current_user('other'): - response = self.post(url, data, expect=403) + self.run_test_ad_hoc_command(expect=403) # Succeeds once other user has a readable credential. Other user can # only see his own ad hoc command (because of credential permissions). other_cred = self.create_test_credential(user=self.other_django_user) - credential_id = data.pop('credential') - data['credential'] = other_cred.pk with self.current_user('other'): - response = self.post(url, data, expect=201) + self.run_test_ad_hoc_command(credential=other_cred.pk) qs = AdHocCommand.objects.filter(created_by=self.other_django_user) self.assertEqual(qs.count(), 1) self.check_get_list(url, 'other', qs) @@ -523,17 +533,15 @@ class AdHocCommandApiTest(BaseAdHocCommandTest): response = self.post(user_perm_url, user_perm_data, expect=201) user_perm_id = response['id'] with self.current_user('nobody'): - response = self.post(url, data, expect=403) + self.run_test_ad_hoc_command(credential=other_cred.pk, expect=403) self.check_get_list(url, 'other', qs) # Create a cred for the nobody user, run an ad hoc command as the admin # user with that cred. Nobody user can still not see the ad hoc command # without the run_ad_hoc_commands permission flag. nobody_cred = self.create_test_credential(user=self.nobody_django_user) - credential_id = data.pop('credential') - data['credential'] = nobody_cred.pk with self.current_user('admin'): - response = self.post(url, data, expect=201) + self.run_test_ad_hoc_command(credential=nobody_cred.pk) qs = AdHocCommand.objects.none() self.check_get_list(url, 'nobody', qs) @@ -546,94 +554,61 @@ class AdHocCommandApiTest(BaseAdHocCommandTest): }) with self.current_user('admin'): response = self.patch(user_perm_url, user_perm_data, expect=200) - qs = AdHocCommand.objects.filter(credential_id=data['credential']) + qs = AdHocCommand.objects.filter(credential_id=nobody_cred.pk) self.assertEqual(qs.count(), 1) self.check_get_list(url, 'nobody', qs) - data['credential'] = credential_id # Post without inventory (should fail). - inventory_id = data.pop('inventory') with self.current_user('admin'): - response = self.post(url, data, expect=400) - data['inventory'] = inventory_id + self.run_test_ad_hoc_command(inventory=None, expect=400) # Post without credential (should fail). - credential_id = data.pop('credential') with self.current_user('admin'): - response = self.post(url, data, expect=400) - data['credential'] = credential_id + self.run_test_ad_hoc_command(credential=None, expect=400) # Post with empty or unsupported module name (empty defaults to command). - module_name = data.pop('module_name') with self.current_user('admin'): - response = self.post(url, data, expect=201) + response = self.run_test_ad_hoc_command(module_name=None) self.assertEqual(response['module_name'], 'command') - data['module_name'] = '' with self.current_user('admin'): - response = self.post(url, data, expect=201) + response = self.run_test_ad_hoc_command(module_name='') self.assertEqual(response['module_name'], 'command') - data['module_name'] = 'transcombobulator' with self.current_user('admin'): - response = self.post(url, data, expect=400) - data['module_name'] = module_name + self.run_test_ad_hoc_command(module_name='transcombobulator', expect=400) # Post with empty module args for shell/command modules (should fail), # empty args for other modules ok. - module_args = data.pop('module_args') with self.current_user('admin'): - response = self.post(url, data, expect=400) - data['module_name'] = 'shell' + self.run_test_ad_hoc_command(module_args=None, expect=400) with self.current_user('admin'): - response = self.post(url, data, expect=400) - data['module_args'] = '' + self.run_test_ad_hoc_command(module_name='shell', module_args=None, expect=400) with self.current_user('admin'): - response = self.post(url, data, expect=400) - data['module_name'] = 'ping' + self.run_test_ad_hoc_command(module_name='shell', module_args='', expect=400) with self.current_user('admin'): - response = self.post(url, data, expect=201) - data['module_name'] = module_name - data['module_args'] = module_args + self.run_test_ad_hoc_command(module_name='ping', module_args=None) # Post with invalid values for other parameters. - data['job_type'] = 'something' with self.current_user('admin'): - response = self.post(url, data, expect=400) - data['job_type'] = 'check' + self.run_test_ad_hoc_command(job_type='something', expect=400) with self.current_user('admin'): - response = self.post(url, data, expect=201) + response = self.run_test_ad_hoc_command(job_type='check') self.assertEqual(response['job_type'], 'check') - data.pop('job_type') - data['verbosity'] = -1 with self.current_user('admin'): - response = self.post(url, data, expect=400) - data.pop('verbosity') - data['forks'] = -1 + self.run_test_ad_hoc_command(verbosity=-1, expect=400) with self.current_user('admin'): - response = self.post(url, data, expect=400) - data.pop('forks') - data['privilege_escalation'] = 'telekinesis' + self.run_test_ad_hoc_command(forks=-1, expect=400) with self.current_user('admin'): - response = self.post(url, data, expect=400) - data['privilege_escalation'] = 'su' + self.run_test_ad_hoc_command(privilege_escalation='telekinesis', expect=400) with self.current_user('admin'): - response = self.post(url, data, expect=201) + response = self.run_test_ad_hoc_command(privilege_escalation='su') self.assertEqual(response['privilege_escalation'], 'su') - data['privilege_escalation'] = 'sudo' with self.current_user('admin'): - response = self.post(url, data, expect=201) + response = self.run_test_ad_hoc_command(privilege_escalation='sudo') self.assertEqual(response['privilege_escalation'], 'sudo') def test_ad_hoc_command_detail(self): - # Post to list to start a new ad hoc command. - url = reverse('api:ad_hoc_command_list') - data = { - 'inventory': self.inventory.pk, - 'credential': self.credential.pk, - 'module_name': 'command', - 'module_args': 'uptime', - } with self.current_user('admin'): - response = self.post(url, data, expect=201) + response = self.run_test_ad_hoc_command() # Retrieve detail for ad hoc command. Only GET is supported. url = reverse('api:ad_hoc_command_detail', args=(response['id'],)) @@ -649,67 +624,505 @@ class AdHocCommandApiTest(BaseAdHocCommandTest): self.assertTrue(response['related']['relaunch']) self.assertTrue(response['related']['events']) self.assertTrue(response['related']['activity_stream']) - self.put(url, data, expect=405) - self.patch(url, data, expect=405) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) self.delete(url, expect=405) with self.current_user('normal'): response = self.get(url, expect=200) - self.put(url, data, expect=405) - self.patch(url, data, expect=405) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) self.delete(url, expect=405) with self.current_user('other'): response = self.get(url, expect=403) - self.put(url, data, expect=405) - self.patch(url, data, expect=405) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) self.delete(url, expect=405) with self.current_user('nobody'): response = self.get(url, expect=403) - self.put(url, data, expect=405) - self.patch(url, data, expect=405) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) self.delete(url, expect=405) with self.current_user(None): response = self.get(url, expect=401) - self.put(url, data, expect=401) - self.patch(url, data, expect=401) + self.put(url, {}, expect=401) + self.patch(url, {}, expect=401) self.delete(url, expect=401) def test_ad_hoc_command_cancel(self): - # Post to list to start a new ad hoc command. - url = reverse('api:ad_hoc_command_list') - data = { - 'inventory': self.inventory.pk, - 'credential': self.credential.pk, - 'module_name': 'command', - 'module_args': 'uptime', - } - with self.current_user('admin'): - response = self.post(url, data, expect=201) + # Override setting so that ad hoc command isn't actually started. + with self.settings(CELERY_UNIT_TEST=False): + with self.current_user('admin'): + response = self.run_test_ad_hoc_command() + # Retrieve the cancel URL, should indicate it can be canceled. url = reverse('api:ad_hoc_command_cancel', args=(response['id'],)) self.assertEqual(url, response['related']['cancel']) + with self.current_user('admin'): + response = self.get(url, expect=200) + self.assertEqual(response['can_cancel'], True) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user('normal'): + response = self.get(url, expect=200) + self.assertEqual(response['can_cancel'], True) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user('other'): + self.get(url, expect=403) + self.post(url, {}, expect=403) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user('nobody'): + self.get(url, expect=403) + self.post(url, {}, expect=403) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user(None): + self.get(url, expect=401) + self.post(url, {}, expect=401) + self.put(url, {}, expect=401) + self.patch(url, {}, expect=401) + self.delete(url, expect=401) - # FIXME: Finish test. + # Cancel ad hoc command (before it starts) and verify the can_cancel + # flag is False and attempts to cancel again fail. + with self.current_user('normal'): + self.post(url, {}, expect=202) + response = self.get(url, expect=200) + self.assertEqual(response['can_cancel'], False) + self.post(url, {}, expect=403) + with self.current_user('admin'): + response = self.get(url, expect=200) + self.assertEqual(response['can_cancel'], False) + self.post(url, {}, expect=405) def test_ad_hoc_command_relaunch(self): - self.skipTest('Not yet implemented') + with self.current_user('admin'): + response = self.run_test_ad_hoc_command() + + # Retrieve the relaunch URL, should indicate no passwords are needed + # and it can be relaunched. Relaunch and fetch the new command. + url = reverse('api:ad_hoc_command_relaunch', args=(response['id'],)) + self.assertEqual(url, response['related']['relaunch']) + with self.current_user('admin'): + response = self.get(url, expect=200) + self.assertEqual(response['passwords_needed_to_start'], []) + response = self.post(url, {}, expect=201) + self.assertTrue(response['ad_hoc_command']) + self.get(response['url'], expect=200) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user('normal'): + response = self.get(url, expect=200) + self.assertEqual(response['passwords_needed_to_start'], []) + response = self.post(url, {}, expect=201) + self.assertTrue(response['ad_hoc_command']) + self.get(response['url'], expect=200) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user('other'): + self.get(url, expect=403) + self.post(url, {}, expect=403) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user('nobody'): + self.get(url, expect=403) + self.post(url, {}, expect=403) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user(None): + self.get(url, expect=401) + self.post(url, {}, expect=401) + self.put(url, {}, expect=401) + self.patch(url, {}, expect=401) + self.delete(url, expect=401) def test_ad_hoc_command_events_list(self): - self.skipTest('Not yet implemented') + with self.current_user('admin'): + response = self.run_test_ad_hoc_command() + response = self.run_test_ad_hoc_command() + + # Check list of ad hoc command events for a specific ad hoc command. + ad_hoc_command_id = response['id'] + url = reverse('api:ad_hoc_command_ad_hoc_command_events_list', args=(ad_hoc_command_id,)) + self.assertEqual(url, response['related']['events']) + with self.current_user('admin'): + response = self.get(url, expect=200) + self.assertEqual(response['count'], self.inventory.hosts.count()) + for result in response['results']: + self.assertEqual(result['ad_hoc_command'], ad_hoc_command_id) + self.assertTrue(result['id']) + self.assertTrue(result['url']) + self.assertEqual(result['event'], 'runner_on_ok') + self.assertFalse(result['failed']) + self.assertTrue(result['changed']) + self.assertTrue(result['host'] in set(self.inventory.hosts.values_list('pk', flat=True))) + self.assertTrue(result['host_name'] in set(self.inventory.hosts.values_list('name', flat=True))) + self.post(url, {}, expect=403) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user('normal'): + response = self.get(url, expect=200) + self.assertEqual(response['count'], self.inventory.hosts.count()) + self.post(url, {}, expect=403) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user('other'): + self.get(url, expect=403) + self.post(url, {}, expect=403) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user('nobody'): + self.get(url, expect=403) + self.post(url, {}, expect=403) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user(None): + self.get(url, expect=401) + self.post(url, {}, expect=401) + self.put(url, {}, expect=401) + self.patch(url, {}, expect=401) + self.delete(url, expect=401) + + # Test top level ad hoc command events list. + url = reverse('api:ad_hoc_command_event_list') + with self.current_user('admin'): + response = self.get(url, expect=200) + self.assertEqual(response['count'], 2 * self.inventory.hosts.count()) + self.post(url, {}, expect=405) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user('normal'): + response = self.get(url, expect=200) + self.assertEqual(response['count'], 2 * self.inventory.hosts.count()) + self.post(url, {}, expect=405) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user('other'): + response = self.get(url, expect=200) + self.assertEqual(response['count'], 0) + self.post(url, {}, expect=405) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user('nobody'): + response = self.get(url, expect=200) + self.assertEqual(response['count'], 0) + self.post(url, {}, expect=405) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user(None): + self.get(url, expect=401) + self.post(url, {}, expect=401) + self.put(url, {}, expect=401) + self.patch(url, {}, expect=401) + self.delete(url, expect=401) def test_ad_hoc_command_event_detail(self): - self.skipTest('Not yet implemented') + with self.current_user('admin'): + response = self.run_test_ad_hoc_command() + + # Check ad hoc command event detail view. + ad_hoc_command_event_ids = AdHocCommandEvent.objects.values_list('pk', flat=True) + with self.current_user('admin'): + for ahce_id in ad_hoc_command_event_ids: + url = reverse('api:ad_hoc_command_event_detail', args=(ahce_id,)) + response = self.get(url, expect=200) + self.assertTrue(response['ad_hoc_command']) + self.assertEqual(response['id'], ahce_id) + self.assertEqual(response['url'], url) + self.assertEqual(response['event'], 'runner_on_ok') + self.assertFalse(response['failed']) + self.assertTrue(response['changed']) + self.assertTrue(response['host'] in set(self.inventory.hosts.values_list('pk', flat=True))) + self.assertTrue(response['host_name'] in set(self.inventory.hosts.values_list('name', flat=True))) + self.post(url, {}, expect=405) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user('normal'): + for ahce_id in ad_hoc_command_event_ids: + url = reverse('api:ad_hoc_command_event_detail', args=(ahce_id,)) + self.get(url, expect=200) + self.post(url, {}, expect=405) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user('other'): + for ahce_id in ad_hoc_command_event_ids: + url = reverse('api:ad_hoc_command_event_detail', args=(ahce_id,)) + self.get(url, expect=403) + self.post(url, {}, expect=405) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user('nobody'): + for ahce_id in ad_hoc_command_event_ids: + url = reverse('api:ad_hoc_command_event_detail', args=(ahce_id,)) + self.get(url, expect=403) + self.post(url, {}, expect=405) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user(None): + for ahce_id in ad_hoc_command_event_ids: + url = reverse('api:ad_hoc_command_event_detail', args=(ahce_id,)) + self.get(url, expect=401) + self.post(url, {}, expect=401) + self.put(url, {}, expect=401) + self.patch(url, {}, expect=401) + self.delete(url, expect=401) def test_ad_hoc_command_activity_stream(self): - self.skipTest('Not yet implemented') + with self.current_user('admin'): + response = self.run_test_ad_hoc_command() + + # Check activity stream for ad hoc command. There should only be one + # entry when it was created; other changes made while running should + # not show up. + url = reverse('api:ad_hoc_command_activity_stream_list', args=(response['id'],)) + self.assertEqual(url, response['related']['activity_stream']) + with self.current_user('admin'): + response = self.get(url, expect=200) + self.assertEqual(response['count'], 1) + result = response['results'][0] + self.assertTrue(result['id']) + self.assertTrue(result['url']) + self.assertEqual(result['operation'], 'create') + self.assertTrue(result['changes']) + self.assertTrue(result['timestamp']) + self.assertEqual(result['object1'], 'ad_hoc_command') + self.assertEqual(result['object2'], '') + self.post(url, {}, expect=405) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user('normal'): + response = self.get(url, expect=200) + #self.assertEqual(response['count'], 1) # FIXME: Enable once activity stream RBAC is fixed. + self.post(url, {}, expect=405) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user('other'): + self.get(url, expect=403) + self.post(url, {}, expect=405) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user('nobody'): + self.get(url, expect=403) + self.post(url, {}, expect=405) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user(None): + self.get(url, expect=401) + self.post(url, {}, expect=401) + self.put(url, {}, expect=401) + self.patch(url, {}, expect=401) + self.delete(url, expect=401) def test_inventory_ad_hoc_commands_list(self): - self.skipTest('Not yet implemented') + with self.current_user('admin'): + response = self.run_test_ad_hoc_command() + response = self.run_test_ad_hoc_command(inventory=self.inventory2.pk) + + # Test the ad hoc commands list for an inventory. Should only return + # the ad hoc command(s) run against that inventory. Posting should + # start a new ad hoc command and always set the inventory from the URL. + url = reverse('api:inventory_ad_hoc_commands_list', args=(self.inventory.pk,)) + with self.current_user('admin'): + response = self.get(url, expect=200) + self.assertEqual(response['count'], 1) + response = self.run_test_ad_hoc_command(url=url, inventory=None, expect=201) + self.assertEqual(response['inventory'], self.inventory.pk) + response = self.run_test_ad_hoc_command(url=url, inventory=self.inventory2.pk, expect=201) + self.assertEqual(response['inventory'], self.inventory.pk) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user('normal'): + response = self.get(url, expect=200) + self.assertEqual(response['count'], 3) + response = self.run_test_ad_hoc_command(url=url, inventory=None, expect=201) + self.assertEqual(response['inventory'], self.inventory.pk) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user('other'): + self.get(url, expect=403) + self.post(url, {}, expect=403) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user('nobody'): + self.get(url, expect=403) + self.post(url, {}, expect=403) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user(None): + self.get(url, expect=401) + self.post(url, {}, expect=401) + self.put(url, {}, expect=401) + self.patch(url, {}, expect=401) + self.delete(url, expect=401) def test_host_ad_hoc_commands_list(self): - self.skipTest('Not yet implemented') + with self.current_user('admin'): + response = self.run_test_ad_hoc_command() + response = self.run_test_ad_hoc_command(limit=self.host2.name) + + # Test the ad hoc commands list for a host. Should only return the ad + # hoc command(s) run against that host. Posting should start a new ad + # hoc command and always set the inventory and limit based on URL. + url = reverse('api:host_ad_hoc_commands_list', args=(self.host.pk,)) + with self.current_user('admin'): + response = self.get(url, expect=200) + self.assertEqual(response['count'], 1) + response = self.run_test_ad_hoc_command(url=url, inventory=None, expect=201) + self.assertEqual(response['inventory'], self.inventory.pk) + self.assertEqual(response['limit'], self.host.name) + response = self.run_test_ad_hoc_command(url=url, inventory=self.inventory2.pk, limit=self.host2.name, expect=201) + self.assertEqual(response['inventory'], self.inventory.pk) + self.assertEqual(response['limit'], self.host.name) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user('normal'): + response = self.get(url, expect=200) + self.assertEqual(response['count'], 3) + response = self.run_test_ad_hoc_command(url=url, inventory=None, expect=201) + self.assertEqual(response['inventory'], self.inventory.pk) + self.assertEqual(response['limit'], self.host.name) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user('other'): + self.get(url, expect=403) + self.post(url, {}, expect=403) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user('nobody'): + self.get(url, expect=403) + self.post(url, {}, expect=403) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user(None): + self.get(url, expect=401) + self.post(url, {}, expect=401) + self.put(url, {}, expect=401) + self.patch(url, {}, expect=401) + self.delete(url, expect=401) def test_group_ad_hoc_commands_list(self): - self.skipTest('Not yet implemented') + with self.current_user('admin'): + response = self.run_test_ad_hoc_command() # self.host + self.host2 + response = self.run_test_ad_hoc_command(limit=self.group.name) # self.host + response = self.run_test_ad_hoc_command(limit=self.host2.name) # self.host2 + + # Test the ad hoc commands list for a group. Should return the ad + # hoc command(s) run against any hosts in that group. Posting should + # start a new ad hoc command and always set the inventory and limit + # based on URL. + url = reverse('api:group_ad_hoc_commands_list', args=(self.group.pk,)) # only self.host + url2 = reverse('api:group_ad_hoc_commands_list', args=(self.group2.pk,)) # self.host + self.host2 + with self.current_user('admin'): + response = self.get(url, expect=200) + self.assertEqual(response['count'], 2) + response = self.get(url2, expect=200) + self.assertEqual(response['count'], 3) + response = self.run_test_ad_hoc_command(url=url, inventory=None, expect=201) + self.assertEqual(response['inventory'], self.inventory.pk) + self.assertEqual(response['limit'], self.group.name) + response = self.run_test_ad_hoc_command(url=url, inventory=self.inventory2.pk, limit=self.group2.name, expect=201) + self.assertEqual(response['inventory'], self.inventory.pk) + self.assertEqual(response['limit'], self.group.name) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user('normal'): + response = self.get(url, expect=200) + self.assertEqual(response['count'], 4) + response = self.run_test_ad_hoc_command(url=url, inventory=None, expect=201) + self.assertEqual(response['inventory'], self.inventory.pk) + self.assertEqual(response['limit'], self.group.name) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user('other'): + self.get(url, expect=403) + self.post(url, {}, expect=403) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user('nobody'): + self.get(url, expect=403) + self.post(url, {}, expect=403) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user(None): + self.get(url, expect=401) + self.post(url, {}, expect=401) + self.put(url, {}, expect=401) + self.patch(url, {}, expect=401) + self.delete(url, expect=401) def test_host_ad_hoc_command_events_list(self): - self.skipTest('Not yet implemented') + with self.current_user('admin'): + response = self.run_test_ad_hoc_command() + + # Test the ad hoc command events list for a host. Should return the + # events only for that particular host. + url = reverse('api:host_ad_hoc_command_events_list', args=(self.host.pk,)) + with self.current_user('admin'): + response = self.get(url, expect=200) + self.assertEqual(response['count'], 1) + self.post(url, {}, expect=405) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user('normal'): + response = self.get(url, expect=200) + self.assertEqual(response['count'], 1) + self.post(url, {}, expect=405) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user('other'): + self.get(url, expect=403) + self.post(url, {}, expect=405) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user('nobody'): + self.get(url, expect=403) + self.post(url, {}, expect=405) + self.put(url, {}, expect=405) + self.patch(url, {}, expect=405) + self.delete(url, expect=405) + with self.current_user(None): + self.get(url, expect=401) + self.post(url, {}, expect=401) + self.put(url, {}, expect=401) + self.patch(url, {}, expect=401) + self.delete(url, expect=401) diff --git a/awx/main/tests/users.py b/awx/main/tests/users.py index ac980c525f..cf565115de 100644 --- a/awx/main/tests/users.py +++ b/awx/main/tests/users.py @@ -95,13 +95,13 @@ class AuthTokenProxyTest(BaseTest): self._get_me(expect=401, auth=auth_token, remote_addr=remote_addr, client_kwargs=client_kwargs) self._get_me(expect=401, auth=auth_token, remote_addr=remote_addr_diff) - # should use ip address from other headers when HTTP_X_FORARDED_FOR is blank + # should use ip address from other headers when HTTP_X_FORWARDED_FOR is blank def test_blank_header_fallback(self): remote_addr = '192.168.75.1' auth_token = self._request_auth_token(remote_addr) - client_kwargs = {'HTTP_X_FORARDED_FOR': ''} + client_kwargs = {'HTTP_X_FORWARDED_FOR': ''} response = self._get_me(expect=200, auth=auth_token, remote_addr=remote_addr, client_kwargs=client_kwargs) self.check_me_is_admin(response)