diff --git a/awx/main/tests/functional/api/test_adhoc.py b/awx/main/tests/functional/api/test_adhoc.py new file mode 100644 index 0000000000..43326afcb4 --- /dev/null +++ b/awx/main/tests/functional/api/test_adhoc.py @@ -0,0 +1,148 @@ +import mock # noqa +import pytest + +from django.core.urlresolvers import reverse + + + +""" + def run_test_ad_hoc_command(self, **kwargs): + # Post to list to start a new ad hoc command. + expect = kwargs.pop('expect', 201) + url = kwargs.pop('url', reverse('api:ad_hoc_command_list')) + data = { + 'inventory': self.inventory.pk, + 'credential': self.credential.pk, + 'module_name': 'command', + 'module_args': 'uptime', + } + data.update(kwargs) + for k,v in data.items(): + if v is None: + del data[k] + return self.post(url, data, expect=expect) +""" + +@pytest.fixture +def post_adhoc(post, inventory, machine_credential): + def f(url, data, user, expect=201): + if not url: + url = reverse('api:ad_hoc_command_list') + + if 'module_name' not in data: + data['module_name'] = 'command' + if 'module_args' not in data: + data['module_args'] = 'uptime' + if 'inventory' not in data: + data['inventory'] = inventory.id + if 'credential' not in data: + data['credential'] = machine_credential.id + + for k,v in data.items(): + if v is None: + del data[k] + + return post(url, data, user, expect=expect) + return f + + + +@pytest.mark.django_db +def test_admin_post_ad_hoc_command_list(admin, post_adhoc, inventory, machine_credential): + res = post_adhoc(reverse('api:ad_hoc_command_list'), {}, admin, expect=201) + assert res.data['job_type'] == 'run' + assert res.data['inventory'], inventory.id + assert res.data['credential'] == machine_credential.id + assert res.data['module_name'] == 'command' + assert res.data['module_args'] == 'uptime' + assert res.data['limit'] == '' + assert res.data['forks'] == 0 + assert res.data['verbosity'] == 0 + assert res.data['become_enabled'] is False + + +@pytest.mark.django_db +def test_empty_post_403(admin, post): + post(reverse('api:ad_hoc_command_list'), {}, admin, expect=400) + +@pytest.mark.django_db +def test_empty_put_405(admin, put): + put(reverse('api:ad_hoc_command_list'), {}, admin, expect=405) + +@pytest.mark.django_db +def test_empty_patch_405(admin, patch): + patch(reverse('api:ad_hoc_command_list'), {}, admin, expect=405) + +@pytest.mark.django_db +def test_empty_delete_405(admin, delete): + delete(reverse('api:ad_hoc_command_list'), admin, expect=405) + +@pytest.mark.django_db +def test_user_post_ad_hoc_command_list(alice, post_adhoc, inventory, machine_credential): + inventory.adhoc_role.members.add(alice) + machine_credential.use_role.members.add(alice) + post_adhoc(reverse('api:ad_hoc_command_list'), {}, alice, expect=201) + +@pytest.mark.django_db +def test_user_post_ad_hoc_command_list_xfail(alice, post_adhoc, inventory, machine_credential): + inventory.read_role.members.add(alice) # just read access? no dice. + machine_credential.use_role.members.add(alice) + post_adhoc(reverse('api:ad_hoc_command_list'), {}, alice, expect=403) + +@pytest.mark.django_db +def test_user_post_ad_hoc_command_list_without_creds(alice, post_adhoc, inventory, machine_credential): + inventory.adhoc_role.members.add(alice) + post_adhoc(reverse('api:ad_hoc_command_list'), {}, alice, expect=403) + +@pytest.mark.django_db +def test_user_post_ad_hoc_command_list_without_inventory(alice, post_adhoc, inventory, machine_credential): + machine_credential.use_role.members.add(alice) + post_adhoc(reverse('api:ad_hoc_command_list'), {}, alice, expect=403) + + +@pytest.mark.django_db +def test_admin_post_inventory_ad_hoc_command_list(admin, post_adhoc, inventory): + post_adhoc(reverse('api:inventory_ad_hoc_commands_list', args=(inventory.id,)), {'inventory': None}, admin, expect=201) + post_adhoc(reverse('api:inventory_ad_hoc_commands_list', args=(inventory.id,)), {}, admin, expect=201) + + +@pytest.mark.django_db +def test_get_inventory_ad_hoc_command_list(admin, alice, post_adhoc, get, inventory_factory, machine_credential): + inv1 = inventory_factory('inv1') + inv2 = inventory_factory('inv2') + + post_adhoc(reverse('api:ad_hoc_command_list'), {'inventory': inv1.id}, admin, expect=201) + post_adhoc(reverse('api:ad_hoc_command_list'), {'inventory': inv2.id}, admin, expect=201) + res = get(reverse('api:ad_hoc_command_list'), admin, expect=200) + assert res.data['count'] == 2 + res = get(reverse('api:inventory_ad_hoc_commands_list', args=(inv1.id,)), admin, expect=200) + assert res.data['count'] == 1 + res = get(reverse('api:inventory_ad_hoc_commands_list', args=(inv2.id,)), admin, expect=200) + assert res.data['count'] == 1 + + inv1.adhoc_role.members.add(alice) + res = get(reverse('api:inventory_ad_hoc_commands_list', args=(inv1.id,)), alice, expect=200) + assert res.data['count'] == 0 + + machine_credential.use_role.members.add(alice) + res = get(reverse('api:inventory_ad_hoc_commands_list', args=(inv1.id,)), alice, expect=200) + assert res.data['count'] == 1 + res = get(reverse('api:inventory_ad_hoc_commands_list', args=(inv2.id,)), alice, expect=403) + + +@pytest.mark.django_db +def test_bad_data1(admin, post_adhoc): + post_adhoc(reverse('api:ad_hoc_command_list'), {'module_name': 'command', 'module_args': None}, admin, expect=400) + +@pytest.mark.django_db +def test_bad_data2(admin, post_adhoc): + post_adhoc(reverse('api:ad_hoc_command_list'), {'job_type': 'baddata'}, admin, expect=400) + +@pytest.mark.django_db +def test_bad_data3(admin, post_adhoc): + post_adhoc(reverse('api:ad_hoc_command_list'), {'verbosity': -1}, admin, expect=400) + +@pytest.mark.django_db +def test_bad_data4(admin, post_adhoc): + post_adhoc(reverse('api:ad_hoc_command_list'), {'forks': -1}, admin, expect=400) + diff --git a/awx/main/tests/old/ad_hoc.py b/awx/main/tests/old/ad_hoc.py index b05dc6df89..2c81ec71a0 100644 --- a/awx/main/tests/old/ad_hoc.py +++ b/awx/main/tests/old/ad_hoc.py @@ -404,164 +404,6 @@ class AdHocCommandApiTest(BaseAdHocCommandTest): del data[k] return self.post(url, data, expect=expect) - @mock.patch('awx.main.tasks.BaseTask.run_pexpect', side_effect=run_pexpect_mock) - def test_ad_hoc_command_list(self, ignore): - url = reverse('api:ad_hoc_command_list') - - # Retrieve the empty list of ad hoc commands. - qs = AdHocCommand.objects.none() - self.check_get_list(url, 'admin', qs) - self.check_get_list(url, 'normal', qs) - self.check_get_list(url, 'other', qs) - self.check_get_list(url, 'nobody', qs) - self.check_get_list(url, None, qs, expect=401) - - # Start a new ad hoc command. Only admin and normal user (org admin) - # can run commands by default. - with self.current_user('admin'): - response = self.run_test_ad_hoc_command() - self.assertEqual(response['job_type'], 'run') - self.assertEqual(response['inventory'], self.inventory.pk) - self.assertEqual(response['credential'], self.credential.pk) - self.assertEqual(response['module_name'], 'command') - self.assertEqual(response['module_args'], 'uptime') - self.assertEqual(response['limit'], '') - self.assertEqual(response['forks'], 0) - self.assertEqual(response['verbosity'], 0) - self.assertEqual(response['become_enabled'], False) - self.put(url, {}, expect=405) - self.patch(url, {}, expect=405) - self.delete(url, expect=405) - with self.current_user('normal'): - self.run_test_ad_hoc_command() - self.put(url, {}, expect=405) - self.patch(url, {}, expect=405) - self.delete(url, expect=405) - with self.current_user('other'): - self.run_test_ad_hoc_command(expect=403) - self.put(url, {}, expect=405) - self.patch(url, {}, expect=405) - self.delete(url, expect=405) - with self.current_user('nobody'): - self.run_test_ad_hoc_command(expect=403) - self.put(url, {}, expect=405) - self.patch(url, {}, expect=405) - self.delete(url, expect=405) - with self.current_user(None): - self.run_test_ad_hoc_command(expect=401) - self.put(url, {}, expect=401) - self.patch(url, {}, expect=401) - self.delete(url, expect=401) - - # Retrieve the list of ad hoc commands (only admin/normal can see by default). - qs = AdHocCommand.objects.all() - self.assertEqual(qs.count(), 2) - self.check_get_list(url, 'admin', qs) - self.check_get_list(url, 'normal', qs) - qs = AdHocCommand.objects.none() - self.check_get_list(url, 'other', qs) - self.check_get_list(url, 'nobody', qs) - self.check_get_list(url, None, qs, expect=401) - - # Explicitly give other user updater permission on the inventory (still - # not allowed to run ad hoc commands). - user_roles_list_url = reverse('api:user_roles_list', args=(self.other_django_user.pk,)) - with self.current_user('admin'): - response = self.post(user_roles_list_url, {"id": self.inventory.update_role.id}, expect=204) - with self.current_user('other'): - self.run_test_ad_hoc_command(expect=403) - self.check_get_list(url, 'other', qs) - - # Add executor role permissions to other. Fails - # when other user can't read credential. - with self.current_user('admin'): - response = self.post(user_roles_list_url, {"id": self.inventory.execute_role.id}, expect=204) - with self.current_user('other'): - self.run_test_ad_hoc_command(expect=403) - - # Succeeds once other user has a readable credential. Other user can - # only see his own ad hoc command (because of credential permissions). - other_cred = self.create_test_credential(user=self.other_django_user) - with self.current_user('other'): - self.run_test_ad_hoc_command(credential=other_cred.pk) - qs = AdHocCommand.objects.filter(created_by=self.other_django_user) - self.assertEqual(qs.count(), 1) - self.check_get_list(url, 'other', qs) - - # Explicitly give nobody user read permission on the inventory. - nobody_roles_list_url = reverse('api:user_roles_list', args=(self.nobody_django_user.pk,)) - with self.current_user('admin'): - response = self.post(nobody_roles_list_url, {"id": self.inventory.read_role.id}, expect=204) - with self.current_user('nobody'): - self.run_test_ad_hoc_command(credential=other_cred.pk, expect=403) - self.check_get_list(url, 'other', qs) - - # Create a cred for the nobody user, run an ad hoc command as the admin - # user with that cred. Nobody user can still not see the ad hoc command - # without the run_ad_hoc_commands permission flag. - nobody_cred = self.create_test_credential(user=self.nobody_django_user) - with self.current_user('admin'): - self.run_test_ad_hoc_command(credential=nobody_cred.pk) - qs = AdHocCommand.objects.none() - self.check_get_list(url, 'nobody', qs) - - # Give the nobody user the run_ad_hoc_commands flag, and can now see - # the one ad hoc command previously run. - with self.current_user('admin'): - response = self.post(nobody_roles_list_url, {"id": self.inventory.execute_role.id}, expect=204) - qs = AdHocCommand.objects.filter(credential_id=nobody_cred.pk) - self.assertEqual(qs.count(), 1) - self.check_get_list(url, 'nobody', qs) - - # Post without inventory (should fail). - with self.current_user('admin'): - self.run_test_ad_hoc_command(inventory=None, expect=400) - - # Post without credential (should fail). - with self.current_user('admin'): - self.run_test_ad_hoc_command(credential=None, expect=400) - - # Post with empty or unsupported module name (empty defaults to command). - with self.current_user('admin'): - response = self.run_test_ad_hoc_command(module_name=None) - self.assertEqual(response['module_name'], 'command') - with self.current_user('admin'): - response = self.run_test_ad_hoc_command(module_name='') - self.assertEqual(response['module_name'], 'command') - with self.current_user('admin'): - self.run_test_ad_hoc_command(module_name='transcombobulator', expect=400) - - # Post with empty module args for shell/command modules (should fail), - # empty args for other modules ok. - with self.current_user('admin'): - self.run_test_ad_hoc_command(module_args=None, expect=400) - with self.current_user('admin'): - self.run_test_ad_hoc_command(module_name='shell', module_args=None, expect=400) - with self.current_user('admin'): - self.run_test_ad_hoc_command(module_name='shell', module_args='', expect=400) - with self.current_user('admin'): - self.run_test_ad_hoc_command(module_name='ping', module_args=None) - - # Post with invalid values for other parameters. - with self.current_user('admin'): - self.run_test_ad_hoc_command(job_type='something', expect=400) - with self.current_user('admin'): - response = self.run_test_ad_hoc_command(job_type='check') - self.assertEqual(response['job_type'], 'check') - with self.current_user('admin'): - self.run_test_ad_hoc_command(verbosity=-1, expect=400) - with self.current_user('admin'): - self.run_test_ad_hoc_command(forks=-1, expect=400) - with self.current_user('admin'): - response = self.run_test_ad_hoc_command(become_enabled=True) - self.assertEqual(response['become_enabled'], True) - - # Try to run with expired license. - self.create_expired_license_file() - with self.current_user('admin'): - self.run_test_ad_hoc_command(expect=403) - with self.current_user('normal'): - self.run_test_ad_hoc_command(expect=403) @mock.patch('awx.main.tasks.BaseTask.run_pexpect', side_effect=run_pexpect_mock) def test_ad_hoc_command_detail(self, ignore): @@ -953,98 +795,6 @@ class AdHocCommandApiTest(BaseAdHocCommandTest): self.patch(url, {}, expect=401) self.delete(url, expect=401) - @mock.patch('awx.main.tasks.BaseTask.run_pexpect', side_effect=run_pexpect_mock) - def test_inventory_ad_hoc_commands_list(self, ignore): - with self.current_user('admin'): - response = self.run_test_ad_hoc_command() - response = self.run_test_ad_hoc_command(inventory=self.inventory2.pk) - - # Test the ad hoc commands list for an inventory. Should only return - # the ad hoc command(s) run against that inventory. Posting should - # start a new ad hoc command and always set the inventory from the URL. - url = reverse('api:inventory_ad_hoc_commands_list', args=(self.inventory.pk,)) - inventory_url = reverse('api:inventory_detail', args=(self.inventory.pk,)) - with self.current_user('admin'): - response = self.get(url, expect=200) - self.assertEqual(response['count'], 1) - response = self.run_test_ad_hoc_command(url=url, inventory=None, expect=201) - self.assertEqual(response['inventory'], self.inventory.pk) - response = self.run_test_ad_hoc_command(url=url, inventory=self.inventory2.pk, expect=201) - self.assertEqual(response['inventory'], self.inventory.pk) - self.put(url, {}, expect=405) - self.patch(url, {}, expect=405) - self.delete(url, expect=405) - response = self.get(inventory_url, expect=200) - self.assertTrue(response['can_run_ad_hoc_commands']) - with self.current_user('normal'): - response = self.get(url, expect=200) - self.assertEqual(response['count'], 3) - response = self.run_test_ad_hoc_command(url=url, inventory=None, expect=201) - self.assertEqual(response['inventory'], self.inventory.pk) - self.put(url, {}, expect=405) - self.patch(url, {}, expect=405) - self.delete(url, expect=405) - response = self.get(inventory_url, expect=200) - self.assertTrue(response['can_run_ad_hoc_commands']) - with self.current_user('other'): - self.get(url, expect=403) - self.post(url, {}, expect=403) - self.put(url, {}, expect=405) - self.patch(url, {}, expect=405) - self.delete(url, expect=405) - with self.current_user('nobody'): - self.get(url, expect=403) - self.post(url, {}, expect=403) - self.put(url, {}, expect=405) - self.patch(url, {}, expect=405) - self.delete(url, expect=405) - with self.current_user(None): - self.get(url, expect=401) - self.post(url, {}, expect=401) - self.put(url, {}, expect=401) - self.patch(url, {}, expect=401) - self.delete(url, expect=401) - - # Create another unrelated inventory permission with run_ad_hoc_commands - # set; this tests an edge case in the RBAC query where we'll return - # can_run_ad_hoc_commands = True when we shouldn't. - nobody_roles_list_url = reverse('api:user_roles_list', args=(self.nobody_django_user.pk,)) - with self.current_user('admin'): - response = self.post(nobody_roles_list_url, {"id": self.inventory.execute_role.id}, expect=204) - - # Create a credential for the other user and explicitly give other - # user admin permission on the inventory (still not allowed to run ad - # hoc commands; can get the list but can't see any items). - other_cred = self.create_test_credential(user=self.other_django_user) - user_roles_list_url = reverse('api:user_roles_list', args=(self.other_django_user.pk,)) - with self.current_user('admin'): - response = self.post(user_roles_list_url, {"id": self.inventory.update_role.id}, expect=204) - with self.current_user('other'): - response = self.get(url, expect=200) - self.assertEqual(response['count'], 0) - response = self.get(inventory_url, expect=200) - self.assertFalse(response['can_run_ad_hoc_commands']) - self.run_test_ad_hoc_command(url=url, inventory=None, credential=other_cred.pk, expect=403) - - # Update permission to allow other user to run ad hoc commands. Can - # only see his own ad hoc commands (because of credential permission). - with self.current_user('admin'): - response = self.post(user_roles_list_url, {"id": self.inventory.adhoc_role.id}, expect=204) - with self.current_user('other'): - response = self.get(url, expect=200) - self.assertEqual(response['count'], 0) - self.run_test_ad_hoc_command(url=url, inventory=None, credential=other_cred.pk, expect=201) - response = self.get(url, expect=200) - self.assertEqual(response['count'], 1) - response = self.get(inventory_url, expect=200) - self.assertTrue(response['can_run_ad_hoc_commands']) - - # Try to run with expired license. - self.create_expired_license_file() - with self.current_user('admin'): - self.run_test_ad_hoc_command(url=url, expect=403) - with self.current_user('normal'): - self.run_test_ad_hoc_command(url=url, expect=403) def test_host_ad_hoc_commands_list(self): # TODO: Figure out why this test needs pexpect