mirror of
https://github.com/ansible/awx.git
synced 2026-01-14 03:10:42 -03:30
Ported some old adhoc tests over to the new system
This commit is contained in:
parent
5ea372ae43
commit
aca858f5b2
148
awx/main/tests/functional/api/test_adhoc.py
Normal file
148
awx/main/tests/functional/api/test_adhoc.py
Normal file
@ -0,0 +1,148 @@
|
||||
import mock # noqa
|
||||
import pytest
|
||||
|
||||
from django.core.urlresolvers import reverse
|
||||
|
||||
|
||||
|
||||
"""
|
||||
def run_test_ad_hoc_command(self, **kwargs):
|
||||
# Post to list to start a new ad hoc command.
|
||||
expect = kwargs.pop('expect', 201)
|
||||
url = kwargs.pop('url', reverse('api:ad_hoc_command_list'))
|
||||
data = {
|
||||
'inventory': self.inventory.pk,
|
||||
'credential': self.credential.pk,
|
||||
'module_name': 'command',
|
||||
'module_args': 'uptime',
|
||||
}
|
||||
data.update(kwargs)
|
||||
for k,v in data.items():
|
||||
if v is None:
|
||||
del data[k]
|
||||
return self.post(url, data, expect=expect)
|
||||
"""
|
||||
|
||||
@pytest.fixture
|
||||
def post_adhoc(post, inventory, machine_credential):
|
||||
def f(url, data, user, expect=201):
|
||||
if not url:
|
||||
url = reverse('api:ad_hoc_command_list')
|
||||
|
||||
if 'module_name' not in data:
|
||||
data['module_name'] = 'command'
|
||||
if 'module_args' not in data:
|
||||
data['module_args'] = 'uptime'
|
||||
if 'inventory' not in data:
|
||||
data['inventory'] = inventory.id
|
||||
if 'credential' not in data:
|
||||
data['credential'] = machine_credential.id
|
||||
|
||||
for k,v in data.items():
|
||||
if v is None:
|
||||
del data[k]
|
||||
|
||||
return post(url, data, user, expect=expect)
|
||||
return f
|
||||
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_admin_post_ad_hoc_command_list(admin, post_adhoc, inventory, machine_credential):
|
||||
res = post_adhoc(reverse('api:ad_hoc_command_list'), {}, admin, expect=201)
|
||||
assert res.data['job_type'] == 'run'
|
||||
assert res.data['inventory'], inventory.id
|
||||
assert res.data['credential'] == machine_credential.id
|
||||
assert res.data['module_name'] == 'command'
|
||||
assert res.data['module_args'] == 'uptime'
|
||||
assert res.data['limit'] == ''
|
||||
assert res.data['forks'] == 0
|
||||
assert res.data['verbosity'] == 0
|
||||
assert res.data['become_enabled'] is False
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_empty_post_403(admin, post):
|
||||
post(reverse('api:ad_hoc_command_list'), {}, admin, expect=400)
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_empty_put_405(admin, put):
|
||||
put(reverse('api:ad_hoc_command_list'), {}, admin, expect=405)
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_empty_patch_405(admin, patch):
|
||||
patch(reverse('api:ad_hoc_command_list'), {}, admin, expect=405)
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_empty_delete_405(admin, delete):
|
||||
delete(reverse('api:ad_hoc_command_list'), admin, expect=405)
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_user_post_ad_hoc_command_list(alice, post_adhoc, inventory, machine_credential):
|
||||
inventory.adhoc_role.members.add(alice)
|
||||
machine_credential.use_role.members.add(alice)
|
||||
post_adhoc(reverse('api:ad_hoc_command_list'), {}, alice, expect=201)
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_user_post_ad_hoc_command_list_xfail(alice, post_adhoc, inventory, machine_credential):
|
||||
inventory.read_role.members.add(alice) # just read access? no dice.
|
||||
machine_credential.use_role.members.add(alice)
|
||||
post_adhoc(reverse('api:ad_hoc_command_list'), {}, alice, expect=403)
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_user_post_ad_hoc_command_list_without_creds(alice, post_adhoc, inventory, machine_credential):
|
||||
inventory.adhoc_role.members.add(alice)
|
||||
post_adhoc(reverse('api:ad_hoc_command_list'), {}, alice, expect=403)
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_user_post_ad_hoc_command_list_without_inventory(alice, post_adhoc, inventory, machine_credential):
|
||||
machine_credential.use_role.members.add(alice)
|
||||
post_adhoc(reverse('api:ad_hoc_command_list'), {}, alice, expect=403)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_admin_post_inventory_ad_hoc_command_list(admin, post_adhoc, inventory):
|
||||
post_adhoc(reverse('api:inventory_ad_hoc_commands_list', args=(inventory.id,)), {'inventory': None}, admin, expect=201)
|
||||
post_adhoc(reverse('api:inventory_ad_hoc_commands_list', args=(inventory.id,)), {}, admin, expect=201)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_get_inventory_ad_hoc_command_list(admin, alice, post_adhoc, get, inventory_factory, machine_credential):
|
||||
inv1 = inventory_factory('inv1')
|
||||
inv2 = inventory_factory('inv2')
|
||||
|
||||
post_adhoc(reverse('api:ad_hoc_command_list'), {'inventory': inv1.id}, admin, expect=201)
|
||||
post_adhoc(reverse('api:ad_hoc_command_list'), {'inventory': inv2.id}, admin, expect=201)
|
||||
res = get(reverse('api:ad_hoc_command_list'), admin, expect=200)
|
||||
assert res.data['count'] == 2
|
||||
res = get(reverse('api:inventory_ad_hoc_commands_list', args=(inv1.id,)), admin, expect=200)
|
||||
assert res.data['count'] == 1
|
||||
res = get(reverse('api:inventory_ad_hoc_commands_list', args=(inv2.id,)), admin, expect=200)
|
||||
assert res.data['count'] == 1
|
||||
|
||||
inv1.adhoc_role.members.add(alice)
|
||||
res = get(reverse('api:inventory_ad_hoc_commands_list', args=(inv1.id,)), alice, expect=200)
|
||||
assert res.data['count'] == 0
|
||||
|
||||
machine_credential.use_role.members.add(alice)
|
||||
res = get(reverse('api:inventory_ad_hoc_commands_list', args=(inv1.id,)), alice, expect=200)
|
||||
assert res.data['count'] == 1
|
||||
res = get(reverse('api:inventory_ad_hoc_commands_list', args=(inv2.id,)), alice, expect=403)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_bad_data1(admin, post_adhoc):
|
||||
post_adhoc(reverse('api:ad_hoc_command_list'), {'module_name': 'command', 'module_args': None}, admin, expect=400)
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_bad_data2(admin, post_adhoc):
|
||||
post_adhoc(reverse('api:ad_hoc_command_list'), {'job_type': 'baddata'}, admin, expect=400)
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_bad_data3(admin, post_adhoc):
|
||||
post_adhoc(reverse('api:ad_hoc_command_list'), {'verbosity': -1}, admin, expect=400)
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_bad_data4(admin, post_adhoc):
|
||||
post_adhoc(reverse('api:ad_hoc_command_list'), {'forks': -1}, admin, expect=400)
|
||||
|
||||
@ -404,164 +404,6 @@ class AdHocCommandApiTest(BaseAdHocCommandTest):
|
||||
del data[k]
|
||||
return self.post(url, data, expect=expect)
|
||||
|
||||
@mock.patch('awx.main.tasks.BaseTask.run_pexpect', side_effect=run_pexpect_mock)
|
||||
def test_ad_hoc_command_list(self, ignore):
|
||||
url = reverse('api:ad_hoc_command_list')
|
||||
|
||||
# Retrieve the empty list of ad hoc commands.
|
||||
qs = AdHocCommand.objects.none()
|
||||
self.check_get_list(url, 'admin', qs)
|
||||
self.check_get_list(url, 'normal', qs)
|
||||
self.check_get_list(url, 'other', qs)
|
||||
self.check_get_list(url, 'nobody', qs)
|
||||
self.check_get_list(url, None, qs, expect=401)
|
||||
|
||||
# Start a new ad hoc command. Only admin and normal user (org admin)
|
||||
# can run commands by default.
|
||||
with self.current_user('admin'):
|
||||
response = self.run_test_ad_hoc_command()
|
||||
self.assertEqual(response['job_type'], 'run')
|
||||
self.assertEqual(response['inventory'], self.inventory.pk)
|
||||
self.assertEqual(response['credential'], self.credential.pk)
|
||||
self.assertEqual(response['module_name'], 'command')
|
||||
self.assertEqual(response['module_args'], 'uptime')
|
||||
self.assertEqual(response['limit'], '')
|
||||
self.assertEqual(response['forks'], 0)
|
||||
self.assertEqual(response['verbosity'], 0)
|
||||
self.assertEqual(response['become_enabled'], False)
|
||||
self.put(url, {}, expect=405)
|
||||
self.patch(url, {}, expect=405)
|
||||
self.delete(url, expect=405)
|
||||
with self.current_user('normal'):
|
||||
self.run_test_ad_hoc_command()
|
||||
self.put(url, {}, expect=405)
|
||||
self.patch(url, {}, expect=405)
|
||||
self.delete(url, expect=405)
|
||||
with self.current_user('other'):
|
||||
self.run_test_ad_hoc_command(expect=403)
|
||||
self.put(url, {}, expect=405)
|
||||
self.patch(url, {}, expect=405)
|
||||
self.delete(url, expect=405)
|
||||
with self.current_user('nobody'):
|
||||
self.run_test_ad_hoc_command(expect=403)
|
||||
self.put(url, {}, expect=405)
|
||||
self.patch(url, {}, expect=405)
|
||||
self.delete(url, expect=405)
|
||||
with self.current_user(None):
|
||||
self.run_test_ad_hoc_command(expect=401)
|
||||
self.put(url, {}, expect=401)
|
||||
self.patch(url, {}, expect=401)
|
||||
self.delete(url, expect=401)
|
||||
|
||||
# Retrieve the list of ad hoc commands (only admin/normal can see by default).
|
||||
qs = AdHocCommand.objects.all()
|
||||
self.assertEqual(qs.count(), 2)
|
||||
self.check_get_list(url, 'admin', qs)
|
||||
self.check_get_list(url, 'normal', qs)
|
||||
qs = AdHocCommand.objects.none()
|
||||
self.check_get_list(url, 'other', qs)
|
||||
self.check_get_list(url, 'nobody', qs)
|
||||
self.check_get_list(url, None, qs, expect=401)
|
||||
|
||||
# Explicitly give other user updater permission on the inventory (still
|
||||
# not allowed to run ad hoc commands).
|
||||
user_roles_list_url = reverse('api:user_roles_list', args=(self.other_django_user.pk,))
|
||||
with self.current_user('admin'):
|
||||
response = self.post(user_roles_list_url, {"id": self.inventory.update_role.id}, expect=204)
|
||||
with self.current_user('other'):
|
||||
self.run_test_ad_hoc_command(expect=403)
|
||||
self.check_get_list(url, 'other', qs)
|
||||
|
||||
# Add executor role permissions to other. Fails
|
||||
# when other user can't read credential.
|
||||
with self.current_user('admin'):
|
||||
response = self.post(user_roles_list_url, {"id": self.inventory.execute_role.id}, expect=204)
|
||||
with self.current_user('other'):
|
||||
self.run_test_ad_hoc_command(expect=403)
|
||||
|
||||
# Succeeds once other user has a readable credential. Other user can
|
||||
# only see his own ad hoc command (because of credential permissions).
|
||||
other_cred = self.create_test_credential(user=self.other_django_user)
|
||||
with self.current_user('other'):
|
||||
self.run_test_ad_hoc_command(credential=other_cred.pk)
|
||||
qs = AdHocCommand.objects.filter(created_by=self.other_django_user)
|
||||
self.assertEqual(qs.count(), 1)
|
||||
self.check_get_list(url, 'other', qs)
|
||||
|
||||
# Explicitly give nobody user read permission on the inventory.
|
||||
nobody_roles_list_url = reverse('api:user_roles_list', args=(self.nobody_django_user.pk,))
|
||||
with self.current_user('admin'):
|
||||
response = self.post(nobody_roles_list_url, {"id": self.inventory.read_role.id}, expect=204)
|
||||
with self.current_user('nobody'):
|
||||
self.run_test_ad_hoc_command(credential=other_cred.pk, expect=403)
|
||||
self.check_get_list(url, 'other', qs)
|
||||
|
||||
# Create a cred for the nobody user, run an ad hoc command as the admin
|
||||
# user with that cred. Nobody user can still not see the ad hoc command
|
||||
# without the run_ad_hoc_commands permission flag.
|
||||
nobody_cred = self.create_test_credential(user=self.nobody_django_user)
|
||||
with self.current_user('admin'):
|
||||
self.run_test_ad_hoc_command(credential=nobody_cred.pk)
|
||||
qs = AdHocCommand.objects.none()
|
||||
self.check_get_list(url, 'nobody', qs)
|
||||
|
||||
# Give the nobody user the run_ad_hoc_commands flag, and can now see
|
||||
# the one ad hoc command previously run.
|
||||
with self.current_user('admin'):
|
||||
response = self.post(nobody_roles_list_url, {"id": self.inventory.execute_role.id}, expect=204)
|
||||
qs = AdHocCommand.objects.filter(credential_id=nobody_cred.pk)
|
||||
self.assertEqual(qs.count(), 1)
|
||||
self.check_get_list(url, 'nobody', qs)
|
||||
|
||||
# Post without inventory (should fail).
|
||||
with self.current_user('admin'):
|
||||
self.run_test_ad_hoc_command(inventory=None, expect=400)
|
||||
|
||||
# Post without credential (should fail).
|
||||
with self.current_user('admin'):
|
||||
self.run_test_ad_hoc_command(credential=None, expect=400)
|
||||
|
||||
# Post with empty or unsupported module name (empty defaults to command).
|
||||
with self.current_user('admin'):
|
||||
response = self.run_test_ad_hoc_command(module_name=None)
|
||||
self.assertEqual(response['module_name'], 'command')
|
||||
with self.current_user('admin'):
|
||||
response = self.run_test_ad_hoc_command(module_name='')
|
||||
self.assertEqual(response['module_name'], 'command')
|
||||
with self.current_user('admin'):
|
||||
self.run_test_ad_hoc_command(module_name='transcombobulator', expect=400)
|
||||
|
||||
# Post with empty module args for shell/command modules (should fail),
|
||||
# empty args for other modules ok.
|
||||
with self.current_user('admin'):
|
||||
self.run_test_ad_hoc_command(module_args=None, expect=400)
|
||||
with self.current_user('admin'):
|
||||
self.run_test_ad_hoc_command(module_name='shell', module_args=None, expect=400)
|
||||
with self.current_user('admin'):
|
||||
self.run_test_ad_hoc_command(module_name='shell', module_args='', expect=400)
|
||||
with self.current_user('admin'):
|
||||
self.run_test_ad_hoc_command(module_name='ping', module_args=None)
|
||||
|
||||
# Post with invalid values for other parameters.
|
||||
with self.current_user('admin'):
|
||||
self.run_test_ad_hoc_command(job_type='something', expect=400)
|
||||
with self.current_user('admin'):
|
||||
response = self.run_test_ad_hoc_command(job_type='check')
|
||||
self.assertEqual(response['job_type'], 'check')
|
||||
with self.current_user('admin'):
|
||||
self.run_test_ad_hoc_command(verbosity=-1, expect=400)
|
||||
with self.current_user('admin'):
|
||||
self.run_test_ad_hoc_command(forks=-1, expect=400)
|
||||
with self.current_user('admin'):
|
||||
response = self.run_test_ad_hoc_command(become_enabled=True)
|
||||
self.assertEqual(response['become_enabled'], True)
|
||||
|
||||
# Try to run with expired license.
|
||||
self.create_expired_license_file()
|
||||
with self.current_user('admin'):
|
||||
self.run_test_ad_hoc_command(expect=403)
|
||||
with self.current_user('normal'):
|
||||
self.run_test_ad_hoc_command(expect=403)
|
||||
|
||||
@mock.patch('awx.main.tasks.BaseTask.run_pexpect', side_effect=run_pexpect_mock)
|
||||
def test_ad_hoc_command_detail(self, ignore):
|
||||
@ -953,98 +795,6 @@ class AdHocCommandApiTest(BaseAdHocCommandTest):
|
||||
self.patch(url, {}, expect=401)
|
||||
self.delete(url, expect=401)
|
||||
|
||||
@mock.patch('awx.main.tasks.BaseTask.run_pexpect', side_effect=run_pexpect_mock)
|
||||
def test_inventory_ad_hoc_commands_list(self, ignore):
|
||||
with self.current_user('admin'):
|
||||
response = self.run_test_ad_hoc_command()
|
||||
response = self.run_test_ad_hoc_command(inventory=self.inventory2.pk)
|
||||
|
||||
# Test the ad hoc commands list for an inventory. Should only return
|
||||
# the ad hoc command(s) run against that inventory. Posting should
|
||||
# start a new ad hoc command and always set the inventory from the URL.
|
||||
url = reverse('api:inventory_ad_hoc_commands_list', args=(self.inventory.pk,))
|
||||
inventory_url = reverse('api:inventory_detail', args=(self.inventory.pk,))
|
||||
with self.current_user('admin'):
|
||||
response = self.get(url, expect=200)
|
||||
self.assertEqual(response['count'], 1)
|
||||
response = self.run_test_ad_hoc_command(url=url, inventory=None, expect=201)
|
||||
self.assertEqual(response['inventory'], self.inventory.pk)
|
||||
response = self.run_test_ad_hoc_command(url=url, inventory=self.inventory2.pk, expect=201)
|
||||
self.assertEqual(response['inventory'], self.inventory.pk)
|
||||
self.put(url, {}, expect=405)
|
||||
self.patch(url, {}, expect=405)
|
||||
self.delete(url, expect=405)
|
||||
response = self.get(inventory_url, expect=200)
|
||||
self.assertTrue(response['can_run_ad_hoc_commands'])
|
||||
with self.current_user('normal'):
|
||||
response = self.get(url, expect=200)
|
||||
self.assertEqual(response['count'], 3)
|
||||
response = self.run_test_ad_hoc_command(url=url, inventory=None, expect=201)
|
||||
self.assertEqual(response['inventory'], self.inventory.pk)
|
||||
self.put(url, {}, expect=405)
|
||||
self.patch(url, {}, expect=405)
|
||||
self.delete(url, expect=405)
|
||||
response = self.get(inventory_url, expect=200)
|
||||
self.assertTrue(response['can_run_ad_hoc_commands'])
|
||||
with self.current_user('other'):
|
||||
self.get(url, expect=403)
|
||||
self.post(url, {}, expect=403)
|
||||
self.put(url, {}, expect=405)
|
||||
self.patch(url, {}, expect=405)
|
||||
self.delete(url, expect=405)
|
||||
with self.current_user('nobody'):
|
||||
self.get(url, expect=403)
|
||||
self.post(url, {}, expect=403)
|
||||
self.put(url, {}, expect=405)
|
||||
self.patch(url, {}, expect=405)
|
||||
self.delete(url, expect=405)
|
||||
with self.current_user(None):
|
||||
self.get(url, expect=401)
|
||||
self.post(url, {}, expect=401)
|
||||
self.put(url, {}, expect=401)
|
||||
self.patch(url, {}, expect=401)
|
||||
self.delete(url, expect=401)
|
||||
|
||||
# Create another unrelated inventory permission with run_ad_hoc_commands
|
||||
# set; this tests an edge case in the RBAC query where we'll return
|
||||
# can_run_ad_hoc_commands = True when we shouldn't.
|
||||
nobody_roles_list_url = reverse('api:user_roles_list', args=(self.nobody_django_user.pk,))
|
||||
with self.current_user('admin'):
|
||||
response = self.post(nobody_roles_list_url, {"id": self.inventory.execute_role.id}, expect=204)
|
||||
|
||||
# Create a credential for the other user and explicitly give other
|
||||
# user admin permission on the inventory (still not allowed to run ad
|
||||
# hoc commands; can get the list but can't see any items).
|
||||
other_cred = self.create_test_credential(user=self.other_django_user)
|
||||
user_roles_list_url = reverse('api:user_roles_list', args=(self.other_django_user.pk,))
|
||||
with self.current_user('admin'):
|
||||
response = self.post(user_roles_list_url, {"id": self.inventory.update_role.id}, expect=204)
|
||||
with self.current_user('other'):
|
||||
response = self.get(url, expect=200)
|
||||
self.assertEqual(response['count'], 0)
|
||||
response = self.get(inventory_url, expect=200)
|
||||
self.assertFalse(response['can_run_ad_hoc_commands'])
|
||||
self.run_test_ad_hoc_command(url=url, inventory=None, credential=other_cred.pk, expect=403)
|
||||
|
||||
# Update permission to allow other user to run ad hoc commands. Can
|
||||
# only see his own ad hoc commands (because of credential permission).
|
||||
with self.current_user('admin'):
|
||||
response = self.post(user_roles_list_url, {"id": self.inventory.adhoc_role.id}, expect=204)
|
||||
with self.current_user('other'):
|
||||
response = self.get(url, expect=200)
|
||||
self.assertEqual(response['count'], 0)
|
||||
self.run_test_ad_hoc_command(url=url, inventory=None, credential=other_cred.pk, expect=201)
|
||||
response = self.get(url, expect=200)
|
||||
self.assertEqual(response['count'], 1)
|
||||
response = self.get(inventory_url, expect=200)
|
||||
self.assertTrue(response['can_run_ad_hoc_commands'])
|
||||
|
||||
# Try to run with expired license.
|
||||
self.create_expired_license_file()
|
||||
with self.current_user('admin'):
|
||||
self.run_test_ad_hoc_command(url=url, expect=403)
|
||||
with self.current_user('normal'):
|
||||
self.run_test_ad_hoc_command(url=url, expect=403)
|
||||
|
||||
def test_host_ad_hoc_commands_list(self):
|
||||
# TODO: Figure out why this test needs pexpect
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user