Finish ad hoc command unit tests.

This commit is contained in:
Chris Church 2015-04-05 18:31:51 -04:00
parent 0fdb9703ea
commit e2a6f100e1
8 changed files with 563 additions and 134 deletions

View File

@ -8,7 +8,7 @@ import logging
from django.http import Http404
# Django REST Framework
from rest_framework.exceptions import PermissionDenied
from rest_framework.exceptions import MethodNotAllowed, PermissionDenied
from rest_framework import permissions
# AWX
@ -45,7 +45,10 @@ class ModelAccessPermission(permissions.BasePermission):
def check_post_permissions(self, request, view, obj=None):
if hasattr(view, 'parent_model'):
get_object_or_400(view.parent_model, pk=view.kwargs['pk'])
parent_obj = get_object_or_400(view.parent_model, pk=view.kwargs['pk'])
if not check_user_access(request.user, view.parent_model, 'read',
parent_obj):
return False
return True
elif getattr(view, 'is_job_start', False):
if not obj:
@ -108,6 +111,11 @@ class ModelAccessPermission(permissions.BasePermission):
if request.user.is_superuser:
return True
# Check if view supports the request method before checking permission
# based on request method.
if request.method.upper() not in view.allowed_methods:
raise MethodNotAllowed(request.method)
# Check permissions for the given view and object, based on the request
# method used.
check_method = getattr(self, 'check_%s_permissions' % request.method.lower(), None)

View File

@ -1525,15 +1525,13 @@ class AdHocCommandSerializer(UnifiedJobSerializer):
if not (obj and obj.pk) and view and hasattr(view, '_raw_data_form_marker'):
if not obj:
obj = self.opts.model()
if parent_model in (Host, Group):
parent_obj = parent_model.objects.get(pk=view.kwargs['pk'])
obj.limit = parent_obj.name
ret = super(AdHocCommandSerializer, self).to_native(obj)
# Hide inventory field from raw data, since it will be set automatically
# by sub list create view.
# Hide inventory and limit fields from raw data, since they will be set
# automatically by sub list create view.
if not (obj and obj.pk) and view and hasattr(view, '_raw_data_form_marker'):
if parent_model in (Host, Group):
ret.pop('inventory', None)
ret.pop('limit', None)
return ret

View File

@ -2189,7 +2189,7 @@ class AdHocCommandList(ListCreateAPIView):
new_in_220 = True
def create(self, request, *args, **kwargs):
# Inject inventory ID if parent objects is a host/group.
# Inject inventory ID and limit if parent objects is a host/group.
if hasattr(self, 'get_parent_object') and not getattr(self, 'parent_key', None):
data = request.DATA
# HACK: Make request data mutable.
@ -2198,6 +2198,7 @@ class AdHocCommandList(ListCreateAPIView):
parent_obj = self.get_parent_object()
if isinstance(parent_obj, (Host, Group)):
data['inventory'] = parent_obj.inventory_id
data['limit'] = parent_obj.name
# Check for passwords needed before creating ad hoc command.
credential_pk = get_pk_from_dict(request.DATA, 'credential')
@ -2351,6 +2352,8 @@ class AdHocCommandAdHocCommandEventsList(BaseAdHocCommandEventsList):
# Post allowed for ad hoc event callback only.
def post(self, request, *args, **kwargs):
if request.user:
raise PermissionDenied()
parent_obj = get_object_or_404(self.parent_model, pk=self.kwargs['pk'])
data = request.DATA.copy()
data['ad_hoc_command'] = parent_obj.pk

View File

@ -1525,6 +1525,10 @@ class ActivityStreamAccess(BaseAccess):
job_qs = self.user.get_queryset(Job)
qs.filter(job__in=job_qs)
# Ad Hoc Command Filter
ad_hoc_command_qs = self.user.get_queryset(AdHocCommand)
qs.filter(ad_hoc_command__in=ad_hoc_command_qs)
# organization_qs = self.user.get_queryset(Organization)
# user_qs = self.user.get_queryset(User)
# inventory_qs = self.user.get_queryset(Inventory)

View File

@ -207,7 +207,7 @@ class AdHocCommand(UnifiedJob):
def save(self, *args, **kwargs):
update_fields = kwargs.get('update_fields', [])
if not self.name:
self.name = Truncator(u'%s: %s' % (self.module_name, self.module_args)).chars(512)
self.name = Truncator(u': '.join(filter(None, (self.module_name, self.module_args)))).chars(512)
if 'name' not in update_fields:
update_fields.append('name')
super(AdHocCommand, self).save(*args, **kwargs)

View File

@ -293,17 +293,20 @@ def disable_activity_stream():
activity_stream_enabled.enabled = previous_value
model_serializer_mapping = {Organization: OrganizationSerializer,
Inventory: InventorySerializer,
Host: HostSerializer,
Group: GroupSerializer,
InventorySource: InventorySourceSerializer,
Credential: CredentialSerializer,
Team: TeamSerializer,
Project: ProjectSerializer,
Permission: PermissionSerializer,
JobTemplate: JobTemplateSerializer,
Job: JobSerializer}
model_serializer_mapping = {
Organization: OrganizationSerializer,
Inventory: InventorySerializer,
Host: HostSerializer,
Group: GroupSerializer,
InventorySource: InventorySourceSerializer,
Credential: CredentialSerializer,
Team: TeamSerializer,
Project: ProjectSerializer,
Permission: PermissionSerializer,
JobTemplate: JobTemplateSerializer,
Job: JobSerializer,
AdHocCommand: AdHocCommandSerializer,
}
def activity_stream_create(sender, instance, created, **kwargs):
if created and activity_stream_enabled:

View File

@ -40,6 +40,8 @@ class BaseAdHocCommandTest(BaseJobExecutionTest):
self.group2 = self.inventory.groups.create(name='test-group2')
self.group.hosts.add(self.host)
self.group2.hosts.add(self.host, self.host2)
self.inventory2 = self.organization.inventories.create(name='test-inventory2')
self.host3 = self.inventory2.hosts.create(name='host3.example.com')
self.credential = None
settings.INTERNAL_API_URL = self.live_server_url
settings.CALLBACK_CONSUMER_PORT = ''
@ -410,6 +412,22 @@ class AdHocCommandApiTest(BaseAdHocCommandTest):
super(AdHocCommandApiTest, self).setUp()
self.create_test_credential(user=self.normal_django_user)
def run_test_ad_hoc_command(self, **kwargs):
# Post to list to start a new ad hoc command.
expect = kwargs.pop('expect', 201)
url = kwargs.pop('url', reverse('api:ad_hoc_command_list'))
data = {
'inventory': self.inventory.pk,
'credential': self.credential.pk,
'module_name': 'command',
'module_args': 'uptime',
}
data.update(kwargs)
for k,v in data.items():
if v is None:
del data[k]
return self.post(url, data, expect=expect)
def test_ad_hoc_command_list(self):
url = reverse('api:ad_hoc_command_list')
@ -421,16 +439,10 @@ class AdHocCommandApiTest(BaseAdHocCommandTest):
self.check_get_list(url, 'nobody', qs)
self.check_get_list(url, None, qs, expect=401)
# Post to list to start a new ad hoc command. Only admin and normal
# user (org admin) can run commands by default.
data = {
'inventory': self.inventory.pk,
'credential': self.credential.pk,
'module_name': 'command',
'module_args': 'uptime',
}
# Start a new ad hoc command. Only admin and normal user (org admin)
# can run commands by default.
with self.current_user('admin'):
response = self.post(url, data, expect=201)
response = self.run_test_ad_hoc_command()
self.assertEqual(response['job_type'], 'run')
self.assertEqual(response['inventory'], self.inventory.pk)
self.assertEqual(response['credential'], self.credential.pk)
@ -440,28 +452,28 @@ class AdHocCommandApiTest(BaseAdHocCommandTest):
self.assertEqual(response['forks'], 0)
self.assertEqual(response['verbosity'], 0)
self.assertEqual(response['privilege_escalation'], '')
self.put(url, data, expect=405)
self.patch(url, data, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('normal'):
response = self.post(url, data, expect=201)
self.put(url, data, expect=405)
self.patch(url, data, expect=405)
self.run_test_ad_hoc_command()
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('other'):
response = self.post(url, data, expect=403)
self.put(url, data, expect=405)
self.patch(url, data, expect=405)
self.run_test_ad_hoc_command(expect=403)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('nobody'):
response = self.post(url, data, expect=403)
self.put(url, data, expect=405)
self.patch(url, data, expect=405)
self.run_test_ad_hoc_command(expect=403)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user(None):
response = self.post(url, data, expect=401)
self.put(url, data, expect=401)
self.patch(url, data, expect=401)
self.run_test_ad_hoc_command(expect=401)
self.put(url, {}, expect=401)
self.patch(url, {}, expect=401)
self.delete(url, expect=401)
# Retrieve the list of ad hoc commands (only admin/normal can see by default).
@ -486,7 +498,7 @@ class AdHocCommandApiTest(BaseAdHocCommandTest):
response = self.post(user_perm_url, user_perm_data, expect=201)
user_perm_id = response['id']
with self.current_user('other'):
response = self.post(url, data, expect=403)
self.run_test_ad_hoc_command(expect=403)
self.check_get_list(url, 'other', qs)
# Update permission to allow other user to run ad hoc commands. Fails
@ -499,15 +511,13 @@ class AdHocCommandApiTest(BaseAdHocCommandTest):
with self.current_user('admin'):
response = self.patch(user_perm_url, user_perm_data, expect=200)
with self.current_user('other'):
response = self.post(url, data, expect=403)
self.run_test_ad_hoc_command(expect=403)
# Succeeds once other user has a readable credential. Other user can
# only see his own ad hoc command (because of credential permissions).
other_cred = self.create_test_credential(user=self.other_django_user)
credential_id = data.pop('credential')
data['credential'] = other_cred.pk
with self.current_user('other'):
response = self.post(url, data, expect=201)
self.run_test_ad_hoc_command(credential=other_cred.pk)
qs = AdHocCommand.objects.filter(created_by=self.other_django_user)
self.assertEqual(qs.count(), 1)
self.check_get_list(url, 'other', qs)
@ -523,17 +533,15 @@ class AdHocCommandApiTest(BaseAdHocCommandTest):
response = self.post(user_perm_url, user_perm_data, expect=201)
user_perm_id = response['id']
with self.current_user('nobody'):
response = self.post(url, data, expect=403)
self.run_test_ad_hoc_command(credential=other_cred.pk, expect=403)
self.check_get_list(url, 'other', qs)
# Create a cred for the nobody user, run an ad hoc command as the admin
# user with that cred. Nobody user can still not see the ad hoc command
# without the run_ad_hoc_commands permission flag.
nobody_cred = self.create_test_credential(user=self.nobody_django_user)
credential_id = data.pop('credential')
data['credential'] = nobody_cred.pk
with self.current_user('admin'):
response = self.post(url, data, expect=201)
self.run_test_ad_hoc_command(credential=nobody_cred.pk)
qs = AdHocCommand.objects.none()
self.check_get_list(url, 'nobody', qs)
@ -546,94 +554,61 @@ class AdHocCommandApiTest(BaseAdHocCommandTest):
})
with self.current_user('admin'):
response = self.patch(user_perm_url, user_perm_data, expect=200)
qs = AdHocCommand.objects.filter(credential_id=data['credential'])
qs = AdHocCommand.objects.filter(credential_id=nobody_cred.pk)
self.assertEqual(qs.count(), 1)
self.check_get_list(url, 'nobody', qs)
data['credential'] = credential_id
# Post without inventory (should fail).
inventory_id = data.pop('inventory')
with self.current_user('admin'):
response = self.post(url, data, expect=400)
data['inventory'] = inventory_id
self.run_test_ad_hoc_command(inventory=None, expect=400)
# Post without credential (should fail).
credential_id = data.pop('credential')
with self.current_user('admin'):
response = self.post(url, data, expect=400)
data['credential'] = credential_id
self.run_test_ad_hoc_command(credential=None, expect=400)
# Post with empty or unsupported module name (empty defaults to command).
module_name = data.pop('module_name')
with self.current_user('admin'):
response = self.post(url, data, expect=201)
response = self.run_test_ad_hoc_command(module_name=None)
self.assertEqual(response['module_name'], 'command')
data['module_name'] = ''
with self.current_user('admin'):
response = self.post(url, data, expect=201)
response = self.run_test_ad_hoc_command(module_name='')
self.assertEqual(response['module_name'], 'command')
data['module_name'] = 'transcombobulator'
with self.current_user('admin'):
response = self.post(url, data, expect=400)
data['module_name'] = module_name
self.run_test_ad_hoc_command(module_name='transcombobulator', expect=400)
# Post with empty module args for shell/command modules (should fail),
# empty args for other modules ok.
module_args = data.pop('module_args')
with self.current_user('admin'):
response = self.post(url, data, expect=400)
data['module_name'] = 'shell'
self.run_test_ad_hoc_command(module_args=None, expect=400)
with self.current_user('admin'):
response = self.post(url, data, expect=400)
data['module_args'] = ''
self.run_test_ad_hoc_command(module_name='shell', module_args=None, expect=400)
with self.current_user('admin'):
response = self.post(url, data, expect=400)
data['module_name'] = 'ping'
self.run_test_ad_hoc_command(module_name='shell', module_args='', expect=400)
with self.current_user('admin'):
response = self.post(url, data, expect=201)
data['module_name'] = module_name
data['module_args'] = module_args
self.run_test_ad_hoc_command(module_name='ping', module_args=None)
# Post with invalid values for other parameters.
data['job_type'] = 'something'
with self.current_user('admin'):
response = self.post(url, data, expect=400)
data['job_type'] = 'check'
self.run_test_ad_hoc_command(job_type='something', expect=400)
with self.current_user('admin'):
response = self.post(url, data, expect=201)
response = self.run_test_ad_hoc_command(job_type='check')
self.assertEqual(response['job_type'], 'check')
data.pop('job_type')
data['verbosity'] = -1
with self.current_user('admin'):
response = self.post(url, data, expect=400)
data.pop('verbosity')
data['forks'] = -1
self.run_test_ad_hoc_command(verbosity=-1, expect=400)
with self.current_user('admin'):
response = self.post(url, data, expect=400)
data.pop('forks')
data['privilege_escalation'] = 'telekinesis'
self.run_test_ad_hoc_command(forks=-1, expect=400)
with self.current_user('admin'):
response = self.post(url, data, expect=400)
data['privilege_escalation'] = 'su'
self.run_test_ad_hoc_command(privilege_escalation='telekinesis', expect=400)
with self.current_user('admin'):
response = self.post(url, data, expect=201)
response = self.run_test_ad_hoc_command(privilege_escalation='su')
self.assertEqual(response['privilege_escalation'], 'su')
data['privilege_escalation'] = 'sudo'
with self.current_user('admin'):
response = self.post(url, data, expect=201)
response = self.run_test_ad_hoc_command(privilege_escalation='sudo')
self.assertEqual(response['privilege_escalation'], 'sudo')
def test_ad_hoc_command_detail(self):
# Post to list to start a new ad hoc command.
url = reverse('api:ad_hoc_command_list')
data = {
'inventory': self.inventory.pk,
'credential': self.credential.pk,
'module_name': 'command',
'module_args': 'uptime',
}
with self.current_user('admin'):
response = self.post(url, data, expect=201)
response = self.run_test_ad_hoc_command()
# Retrieve detail for ad hoc command. Only GET is supported.
url = reverse('api:ad_hoc_command_detail', args=(response['id'],))
@ -649,67 +624,505 @@ class AdHocCommandApiTest(BaseAdHocCommandTest):
self.assertTrue(response['related']['relaunch'])
self.assertTrue(response['related']['events'])
self.assertTrue(response['related']['activity_stream'])
self.put(url, data, expect=405)
self.patch(url, data, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('normal'):
response = self.get(url, expect=200)
self.put(url, data, expect=405)
self.patch(url, data, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('other'):
response = self.get(url, expect=403)
self.put(url, data, expect=405)
self.patch(url, data, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('nobody'):
response = self.get(url, expect=403)
self.put(url, data, expect=405)
self.patch(url, data, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user(None):
response = self.get(url, expect=401)
self.put(url, data, expect=401)
self.patch(url, data, expect=401)
self.put(url, {}, expect=401)
self.patch(url, {}, expect=401)
self.delete(url, expect=401)
def test_ad_hoc_command_cancel(self):
# Post to list to start a new ad hoc command.
url = reverse('api:ad_hoc_command_list')
data = {
'inventory': self.inventory.pk,
'credential': self.credential.pk,
'module_name': 'command',
'module_args': 'uptime',
}
with self.current_user('admin'):
response = self.post(url, data, expect=201)
# Override setting so that ad hoc command isn't actually started.
with self.settings(CELERY_UNIT_TEST=False):
with self.current_user('admin'):
response = self.run_test_ad_hoc_command()
# Retrieve the cancel URL, should indicate it can be canceled.
url = reverse('api:ad_hoc_command_cancel', args=(response['id'],))
self.assertEqual(url, response['related']['cancel'])
with self.current_user('admin'):
response = self.get(url, expect=200)
self.assertEqual(response['can_cancel'], True)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('normal'):
response = self.get(url, expect=200)
self.assertEqual(response['can_cancel'], True)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('other'):
self.get(url, expect=403)
self.post(url, {}, expect=403)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('nobody'):
self.get(url, expect=403)
self.post(url, {}, expect=403)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user(None):
self.get(url, expect=401)
self.post(url, {}, expect=401)
self.put(url, {}, expect=401)
self.patch(url, {}, expect=401)
self.delete(url, expect=401)
# FIXME: Finish test.
# Cancel ad hoc command (before it starts) and verify the can_cancel
# flag is False and attempts to cancel again fail.
with self.current_user('normal'):
self.post(url, {}, expect=202)
response = self.get(url, expect=200)
self.assertEqual(response['can_cancel'], False)
self.post(url, {}, expect=403)
with self.current_user('admin'):
response = self.get(url, expect=200)
self.assertEqual(response['can_cancel'], False)
self.post(url, {}, expect=405)
def test_ad_hoc_command_relaunch(self):
self.skipTest('Not yet implemented')
with self.current_user('admin'):
response = self.run_test_ad_hoc_command()
# Retrieve the relaunch URL, should indicate no passwords are needed
# and it can be relaunched. Relaunch and fetch the new command.
url = reverse('api:ad_hoc_command_relaunch', args=(response['id'],))
self.assertEqual(url, response['related']['relaunch'])
with self.current_user('admin'):
response = self.get(url, expect=200)
self.assertEqual(response['passwords_needed_to_start'], [])
response = self.post(url, {}, expect=201)
self.assertTrue(response['ad_hoc_command'])
self.get(response['url'], expect=200)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('normal'):
response = self.get(url, expect=200)
self.assertEqual(response['passwords_needed_to_start'], [])
response = self.post(url, {}, expect=201)
self.assertTrue(response['ad_hoc_command'])
self.get(response['url'], expect=200)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('other'):
self.get(url, expect=403)
self.post(url, {}, expect=403)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('nobody'):
self.get(url, expect=403)
self.post(url, {}, expect=403)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user(None):
self.get(url, expect=401)
self.post(url, {}, expect=401)
self.put(url, {}, expect=401)
self.patch(url, {}, expect=401)
self.delete(url, expect=401)
def test_ad_hoc_command_events_list(self):
self.skipTest('Not yet implemented')
with self.current_user('admin'):
response = self.run_test_ad_hoc_command()
response = self.run_test_ad_hoc_command()
# Check list of ad hoc command events for a specific ad hoc command.
ad_hoc_command_id = response['id']
url = reverse('api:ad_hoc_command_ad_hoc_command_events_list', args=(ad_hoc_command_id,))
self.assertEqual(url, response['related']['events'])
with self.current_user('admin'):
response = self.get(url, expect=200)
self.assertEqual(response['count'], self.inventory.hosts.count())
for result in response['results']:
self.assertEqual(result['ad_hoc_command'], ad_hoc_command_id)
self.assertTrue(result['id'])
self.assertTrue(result['url'])
self.assertEqual(result['event'], 'runner_on_ok')
self.assertFalse(result['failed'])
self.assertTrue(result['changed'])
self.assertTrue(result['host'] in set(self.inventory.hosts.values_list('pk', flat=True)))
self.assertTrue(result['host_name'] in set(self.inventory.hosts.values_list('name', flat=True)))
self.post(url, {}, expect=403)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('normal'):
response = self.get(url, expect=200)
self.assertEqual(response['count'], self.inventory.hosts.count())
self.post(url, {}, expect=403)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('other'):
self.get(url, expect=403)
self.post(url, {}, expect=403)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('nobody'):
self.get(url, expect=403)
self.post(url, {}, expect=403)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user(None):
self.get(url, expect=401)
self.post(url, {}, expect=401)
self.put(url, {}, expect=401)
self.patch(url, {}, expect=401)
self.delete(url, expect=401)
# Test top level ad hoc command events list.
url = reverse('api:ad_hoc_command_event_list')
with self.current_user('admin'):
response = self.get(url, expect=200)
self.assertEqual(response['count'], 2 * self.inventory.hosts.count())
self.post(url, {}, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('normal'):
response = self.get(url, expect=200)
self.assertEqual(response['count'], 2 * self.inventory.hosts.count())
self.post(url, {}, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('other'):
response = self.get(url, expect=200)
self.assertEqual(response['count'], 0)
self.post(url, {}, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('nobody'):
response = self.get(url, expect=200)
self.assertEqual(response['count'], 0)
self.post(url, {}, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user(None):
self.get(url, expect=401)
self.post(url, {}, expect=401)
self.put(url, {}, expect=401)
self.patch(url, {}, expect=401)
self.delete(url, expect=401)
def test_ad_hoc_command_event_detail(self):
self.skipTest('Not yet implemented')
with self.current_user('admin'):
response = self.run_test_ad_hoc_command()
# Check ad hoc command event detail view.
ad_hoc_command_event_ids = AdHocCommandEvent.objects.values_list('pk', flat=True)
with self.current_user('admin'):
for ahce_id in ad_hoc_command_event_ids:
url = reverse('api:ad_hoc_command_event_detail', args=(ahce_id,))
response = self.get(url, expect=200)
self.assertTrue(response['ad_hoc_command'])
self.assertEqual(response['id'], ahce_id)
self.assertEqual(response['url'], url)
self.assertEqual(response['event'], 'runner_on_ok')
self.assertFalse(response['failed'])
self.assertTrue(response['changed'])
self.assertTrue(response['host'] in set(self.inventory.hosts.values_list('pk', flat=True)))
self.assertTrue(response['host_name'] in set(self.inventory.hosts.values_list('name', flat=True)))
self.post(url, {}, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('normal'):
for ahce_id in ad_hoc_command_event_ids:
url = reverse('api:ad_hoc_command_event_detail', args=(ahce_id,))
self.get(url, expect=200)
self.post(url, {}, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('other'):
for ahce_id in ad_hoc_command_event_ids:
url = reverse('api:ad_hoc_command_event_detail', args=(ahce_id,))
self.get(url, expect=403)
self.post(url, {}, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('nobody'):
for ahce_id in ad_hoc_command_event_ids:
url = reverse('api:ad_hoc_command_event_detail', args=(ahce_id,))
self.get(url, expect=403)
self.post(url, {}, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user(None):
for ahce_id in ad_hoc_command_event_ids:
url = reverse('api:ad_hoc_command_event_detail', args=(ahce_id,))
self.get(url, expect=401)
self.post(url, {}, expect=401)
self.put(url, {}, expect=401)
self.patch(url, {}, expect=401)
self.delete(url, expect=401)
def test_ad_hoc_command_activity_stream(self):
self.skipTest('Not yet implemented')
with self.current_user('admin'):
response = self.run_test_ad_hoc_command()
# Check activity stream for ad hoc command. There should only be one
# entry when it was created; other changes made while running should
# not show up.
url = reverse('api:ad_hoc_command_activity_stream_list', args=(response['id'],))
self.assertEqual(url, response['related']['activity_stream'])
with self.current_user('admin'):
response = self.get(url, expect=200)
self.assertEqual(response['count'], 1)
result = response['results'][0]
self.assertTrue(result['id'])
self.assertTrue(result['url'])
self.assertEqual(result['operation'], 'create')
self.assertTrue(result['changes'])
self.assertTrue(result['timestamp'])
self.assertEqual(result['object1'], 'ad_hoc_command')
self.assertEqual(result['object2'], '')
self.post(url, {}, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('normal'):
response = self.get(url, expect=200)
#self.assertEqual(response['count'], 1) # FIXME: Enable once activity stream RBAC is fixed.
self.post(url, {}, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('other'):
self.get(url, expect=403)
self.post(url, {}, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('nobody'):
self.get(url, expect=403)
self.post(url, {}, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user(None):
self.get(url, expect=401)
self.post(url, {}, expect=401)
self.put(url, {}, expect=401)
self.patch(url, {}, expect=401)
self.delete(url, expect=401)
def test_inventory_ad_hoc_commands_list(self):
self.skipTest('Not yet implemented')
with self.current_user('admin'):
response = self.run_test_ad_hoc_command()
response = self.run_test_ad_hoc_command(inventory=self.inventory2.pk)
# Test the ad hoc commands list for an inventory. Should only return
# the ad hoc command(s) run against that inventory. Posting should
# start a new ad hoc command and always set the inventory from the URL.
url = reverse('api:inventory_ad_hoc_commands_list', args=(self.inventory.pk,))
with self.current_user('admin'):
response = self.get(url, expect=200)
self.assertEqual(response['count'], 1)
response = self.run_test_ad_hoc_command(url=url, inventory=None, expect=201)
self.assertEqual(response['inventory'], self.inventory.pk)
response = self.run_test_ad_hoc_command(url=url, inventory=self.inventory2.pk, expect=201)
self.assertEqual(response['inventory'], self.inventory.pk)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('normal'):
response = self.get(url, expect=200)
self.assertEqual(response['count'], 3)
response = self.run_test_ad_hoc_command(url=url, inventory=None, expect=201)
self.assertEqual(response['inventory'], self.inventory.pk)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('other'):
self.get(url, expect=403)
self.post(url, {}, expect=403)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('nobody'):
self.get(url, expect=403)
self.post(url, {}, expect=403)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user(None):
self.get(url, expect=401)
self.post(url, {}, expect=401)
self.put(url, {}, expect=401)
self.patch(url, {}, expect=401)
self.delete(url, expect=401)
def test_host_ad_hoc_commands_list(self):
self.skipTest('Not yet implemented')
with self.current_user('admin'):
response = self.run_test_ad_hoc_command()
response = self.run_test_ad_hoc_command(limit=self.host2.name)
# Test the ad hoc commands list for a host. Should only return the ad
# hoc command(s) run against that host. Posting should start a new ad
# hoc command and always set the inventory and limit based on URL.
url = reverse('api:host_ad_hoc_commands_list', args=(self.host.pk,))
with self.current_user('admin'):
response = self.get(url, expect=200)
self.assertEqual(response['count'], 1)
response = self.run_test_ad_hoc_command(url=url, inventory=None, expect=201)
self.assertEqual(response['inventory'], self.inventory.pk)
self.assertEqual(response['limit'], self.host.name)
response = self.run_test_ad_hoc_command(url=url, inventory=self.inventory2.pk, limit=self.host2.name, expect=201)
self.assertEqual(response['inventory'], self.inventory.pk)
self.assertEqual(response['limit'], self.host.name)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('normal'):
response = self.get(url, expect=200)
self.assertEqual(response['count'], 3)
response = self.run_test_ad_hoc_command(url=url, inventory=None, expect=201)
self.assertEqual(response['inventory'], self.inventory.pk)
self.assertEqual(response['limit'], self.host.name)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('other'):
self.get(url, expect=403)
self.post(url, {}, expect=403)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('nobody'):
self.get(url, expect=403)
self.post(url, {}, expect=403)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user(None):
self.get(url, expect=401)
self.post(url, {}, expect=401)
self.put(url, {}, expect=401)
self.patch(url, {}, expect=401)
self.delete(url, expect=401)
def test_group_ad_hoc_commands_list(self):
self.skipTest('Not yet implemented')
with self.current_user('admin'):
response = self.run_test_ad_hoc_command() # self.host + self.host2
response = self.run_test_ad_hoc_command(limit=self.group.name) # self.host
response = self.run_test_ad_hoc_command(limit=self.host2.name) # self.host2
# Test the ad hoc commands list for a group. Should return the ad
# hoc command(s) run against any hosts in that group. Posting should
# start a new ad hoc command and always set the inventory and limit
# based on URL.
url = reverse('api:group_ad_hoc_commands_list', args=(self.group.pk,)) # only self.host
url2 = reverse('api:group_ad_hoc_commands_list', args=(self.group2.pk,)) # self.host + self.host2
with self.current_user('admin'):
response = self.get(url, expect=200)
self.assertEqual(response['count'], 2)
response = self.get(url2, expect=200)
self.assertEqual(response['count'], 3)
response = self.run_test_ad_hoc_command(url=url, inventory=None, expect=201)
self.assertEqual(response['inventory'], self.inventory.pk)
self.assertEqual(response['limit'], self.group.name)
response = self.run_test_ad_hoc_command(url=url, inventory=self.inventory2.pk, limit=self.group2.name, expect=201)
self.assertEqual(response['inventory'], self.inventory.pk)
self.assertEqual(response['limit'], self.group.name)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('normal'):
response = self.get(url, expect=200)
self.assertEqual(response['count'], 4)
response = self.run_test_ad_hoc_command(url=url, inventory=None, expect=201)
self.assertEqual(response['inventory'], self.inventory.pk)
self.assertEqual(response['limit'], self.group.name)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('other'):
self.get(url, expect=403)
self.post(url, {}, expect=403)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('nobody'):
self.get(url, expect=403)
self.post(url, {}, expect=403)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user(None):
self.get(url, expect=401)
self.post(url, {}, expect=401)
self.put(url, {}, expect=401)
self.patch(url, {}, expect=401)
self.delete(url, expect=401)
def test_host_ad_hoc_command_events_list(self):
self.skipTest('Not yet implemented')
with self.current_user('admin'):
response = self.run_test_ad_hoc_command()
# Test the ad hoc command events list for a host. Should return the
# events only for that particular host.
url = reverse('api:host_ad_hoc_command_events_list', args=(self.host.pk,))
with self.current_user('admin'):
response = self.get(url, expect=200)
self.assertEqual(response['count'], 1)
self.post(url, {}, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('normal'):
response = self.get(url, expect=200)
self.assertEqual(response['count'], 1)
self.post(url, {}, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('other'):
self.get(url, expect=403)
self.post(url, {}, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user('nobody'):
self.get(url, expect=403)
self.post(url, {}, expect=405)
self.put(url, {}, expect=405)
self.patch(url, {}, expect=405)
self.delete(url, expect=405)
with self.current_user(None):
self.get(url, expect=401)
self.post(url, {}, expect=401)
self.put(url, {}, expect=401)
self.patch(url, {}, expect=401)
self.delete(url, expect=401)

View File

@ -95,13 +95,13 @@ class AuthTokenProxyTest(BaseTest):
self._get_me(expect=401, auth=auth_token, remote_addr=remote_addr, client_kwargs=client_kwargs)
self._get_me(expect=401, auth=auth_token, remote_addr=remote_addr_diff)
# should use ip address from other headers when HTTP_X_FORARDED_FOR is blank
# should use ip address from other headers when HTTP_X_FORWARDED_FOR is blank
def test_blank_header_fallback(self):
remote_addr = '192.168.75.1'
auth_token = self._request_auth_token(remote_addr)
client_kwargs = {'HTTP_X_FORARDED_FOR': ''}
client_kwargs = {'HTTP_X_FORWARDED_FOR': ''}
response = self._get_me(expect=200, auth=auth_token, remote_addr=remote_addr, client_kwargs=client_kwargs)
self.check_me_is_admin(response)