group -> deprecated_group, added migrations, added/removed tests

This commit is contained in:
Wayne Witzel III 2017-04-11 14:03:47 -04:00
parent 7458788562
commit 91f3e665cb
8 changed files with 111 additions and 503 deletions

View File

@ -2153,7 +2153,6 @@ class InventoryTreeView(RetrieveAPIView):
group_children_map = inventory.get_group_children_map()
root_group_pks = inventory.root_groups.order_by('name').values_list('pk', flat=True)
groups_qs = inventory.groups
groups_qs = groups_qs.select_related('inventory')
groups_qs = groups_qs.prefetch_related('inventory_sources')
all_group_data = GroupSerializer(groups_qs, many=True).data
all_group_data_map = dict((x['id'], x) for x in all_group_data)

View File

@ -10,6 +10,8 @@ from psycopg2.extensions import AsIs
# AWX
import awx.main.fields
from awx.main.models import FactLatest
from awx.main.migrations import _inventory_source as invsrc
from awx.main.migrations import _migration_utils as migration_utils
class Migration(migrations.Migration):
@ -20,15 +22,24 @@ class Migration(migrations.Migration):
operations = [
# Inventory Refresh
migrations.RemoveField(
migrations.RenameField(
'InventorySource',
'group',
'deprecated_group'
),
migrations.AlterField(
model_name='inventorysource',
name='group',
name='deprecated_group',
field=models.ForeignKey(related_name='deprecated_inventory_source', default=None, null=True, to='main.Group'),
),
migrations.AlterField(
model_name='inventorysource',
name='inventory',
field=models.ForeignKey(related_name='inventory_sources', default=None, to='main.Inventory', null=True),
),
migrations.RunPython(migration_utils.set_current_apps_for_migrations),
migrations.RunPython(invsrc.remove_manual_inventory_sources),
migrations.RunPython(invsrc.rename_inventory_sources),
# Facts Latest
migrations.CreateModel(

View File

@ -0,0 +1,41 @@
import logging
from django.db.models import Q
logger = logging.getLogger('invsrc_migrations')
def remove_manual_inventory_sources(apps, schema_editor):
'''Previously we would automatically create inventory sources after
Group creation and we would use the parent Group as our interface for the user.
During that process we would create InventorySource that had a source of "manual".
'''
InventorySource = apps.get_model('main', 'InventorySource')
# see models/inventory.py SOURCE_CHOICES - ('', _('Manual'))
InventorySource.objects.filter(source='').delete()
def rename_inventory_sources(apps, schema_editor):
'''Rename existing InventorySource entries using the following format.
{{ inventory_source.name }} - {{ inventory.module }} - {{ number }}
The number will be incremented for each InventorySource for the organization.
'''
Organization = apps.get_model('main', 'Organization')
InventorySource = apps.get_model('main', 'InventorySource')
for org in Organization.objects.iterator():
for i, invsrc in enumerate(InventorySource.objects.filter(Q(inventory__organization=org) |
Q(deprecated_group__inventory__organization=org)).distinct().all()):
inventory = invsrc.deprecated_group.inventory if invsrc.deprecated_group else invsrc.inventory
name = '{0} - {1} - {2}'.format(invsrc.name, inventory.name, i)
invsrc.name = name
invsrc.save()
def remove_inventory_source_with_no_inventory_link(apps, schema_editor):
'''If we cannot determine the Inventory for which an InventorySource exists
we can safely remove it.
'''
InventorySource = apps.get_model('main', 'InventorySource')
InventorySource.objects.filter(Q(inventory__organization=None) & Q(deprecated_group__inventory=None)).delete()

View File

@ -1062,6 +1062,15 @@ class InventorySource(UnifiedJobTemplate, InventorySourceOptions):
default=None,
on_delete=models.CASCADE,
)
deprecated_group = models.ForeignKey(
'Group',
related_name='deprecated_inventory_source',
null=True,
default=None,
on_delete=models.CASCADE,
)
update_on_launch = models.BooleanField(
default=False,
)
@ -1105,7 +1114,8 @@ class InventorySource(UnifiedJobTemplate, InventorySourceOptions):
self.name = self.name.replace(replace_text, str(self.pk))
super(InventorySource, self).save(update_fields=['name'])
if not getattr(_inventory_updates, 'is_updating', False):
self.inventory.update_computed_fields(update_groups=False, update_hosts=False)
if self.inventory is not None:
self.inventory.update_computed_fields(update_groups=False, update_hosts=False)
def _get_current_status(self):
if self.source:

View File

@ -417,9 +417,6 @@ def activity_stream_update(sender, instance, **kwargs):
def activity_stream_delete(sender, instance, **kwargs):
if not activity_stream_enabled:
return
# Skip recording any inventory source directly associated with a group.
if isinstance(instance, InventorySource) and instance.group:
return
changes = model_to_dict(instance)
object1 = camelcase_to_underscore(instance.__class__.__name__)
activity_entry = ActivityStream(

View File

@ -4,10 +4,7 @@ from awx.api.versioning import reverse
from django.test.client import RequestFactory
from awx.main.models import Role, Group, UnifiedJobTemplate, JobTemplate
from awx.main.access import (
access_registry,
get_user_capabilities
)
from awx.main.access import access_registry
from awx.main.utils import cache_list_capabilities
from awx.api.serializers import JobTemplateSerializer

View File

@ -0,0 +1,45 @@
import pytest
from awx.main.migrations import _inventory_source as invsrc
from awx.main.models import InventorySource
from django.apps import apps
@pytest.mark.django_db
def test_inv_src_manual_removal(inventory_source):
inventory_source.source = ''
inventory_source.save()
assert InventorySource.objects.filter(pk=inventory_source.pk).exists()
invsrc.remove_manual_inventory_sources(apps, None)
assert not InventorySource.objects.filter(pk=inventory_source.pk).exists()
@pytest.mark.django_db
def test_inv_src_rename(inventory_source_factory):
inv_src01 = inventory_source_factory('t1')
invsrc.rename_inventory_sources(apps, None)
inv_src01.refresh_from_db()
# inv-is-t1 is generated in the inventory_source_factory
assert inv_src01.name == 't1 - inv-is-t1 - 0'
@pytest.mark.django_db
def test_inv_src_nolink_removal(inventory_source_factory):
inventory_source_factory('t1')
inv_src02 = inventory_source_factory('t2')
inv_src02.inventory = None
inv_src02.deprecated_group = None
inv_src02.save()
assert InventorySource.objects.count() == 2
invsrc.remove_inventory_source_with_no_inventory_link(apps, None)
objs = InventorySource.objects.all()
assert len(objs) == 1
assert 't1' in objs[0].name

View File

@ -293,498 +293,6 @@ class InventoryTest(BaseTest):
organization=self.organizations[0].id)
self.post(inventory_scripts, data=failed_no_shebang, expect=400, auth=self.get_super_credentials())
def test_main_line(self):
# some basic URLs...
reverse('api:inventory_list')
reverse('api:inventory_detail', args=(self.inventory_a.pk,))
reverse('api:inventory_detail', args=(self.inventory_b.pk,))
hosts = reverse('api:host_list')
groups = reverse('api:group_list')
self.create_test_license_file()
# a super user can add hosts (but inventory ID is required)
inv = Inventory.objects.create(
name = 'test inventory',
organization = self.organizations[0]
)
invalid = dict(name='asdf0.example.com')
new_host_a = dict(name=u'asdf\u0162.example.com:1022', inventory=inv.pk)
new_host_b = dict(name='asdf1.example.com', inventory=inv.pk)
new_host_c = dict(name='127.1.2.3:2022', inventory=inv.pk,
variables=json.dumps({'who': 'what?'}))
new_host_d = dict(name='asdf3.example.com', inventory=inv.pk)
new_host_e = dict(name=u'asdf4.example.com:\u0162', inventory=inv.pk)
host_data0 = self.post(hosts, data=invalid, expect=400, auth=self.get_super_credentials())
host_data0 = self.post(hosts, data=new_host_a, expect=201, auth=self.get_super_credentials())
# Port should be split out into host variables.
host_a = Host.objects.get(pk=host_data0['id'])
self.assertEqual(host_a.name, u'asdf\u0162.example.com')
self.assertEqual(host_a.variables_dict, {'ansible_ssh_port': 1022})
# an org admin can add hosts (try first with invalid port #).
self.post(hosts, data=new_host_e, expect=400, auth=self.get_normal_credentials())
new_host_e['name'] = u'asdf4.example.com'
self.post(hosts, data=new_host_e, expect=201, auth=self.get_normal_credentials())
# a normal user cannot add hosts
self.post(hosts, data=new_host_b, expect=403, auth=self.get_nobody_credentials())
# a normal user with inventory edit permissions (on any inventory) can create hosts
inv.admin_role.members.add(self.other_django_user)
host_data3 = self.post(hosts, data=new_host_c, expect=201, auth=self.get_other_credentials())
# Port should be split out into host variables, other variables kept intact.
host_c = Host.objects.get(pk=host_data3['id'])
self.assertEqual(host_c.name, '127.1.2.3')
self.assertEqual(host_c.variables_dict, {'ansible_ssh_port': 2022, 'who': 'what?'})
# hostnames must be unique inside an organization
self.post(hosts, data=new_host_c, expect=400, auth=self.get_other_credentials())
# Verify we can update host via PUT.
host_url3 = host_data3['url']
host_data3['variables'] = ''
host_data3 = self.put(host_url3, data=host_data3, expect=200, auth=self.get_other_credentials())
self.assertEqual(Host.objects.get(id=host_data3['id']).variables, '')
self.assertEqual(Host.objects.get(id=host_data3['id']).variables_dict, {})
# Should reject invalid data.
host_data3['variables'] = 'foo: [bar'
self.put(host_url3, data=host_data3, expect=400, auth=self.get_other_credentials())
# Should accept valid JSON or YAML.
host_data3['variables'] = 'bad: monkey'
self.put(host_url3, data=host_data3, expect=200, auth=self.get_other_credentials())
self.assertEqual(Host.objects.get(id=host_data3['id']).variables, host_data3['variables'])
self.assertEqual(Host.objects.get(id=host_data3['id']).variables_dict, {'bad': 'monkey'})
host_data3['variables'] = '{"angry": "penguin"}'
self.put(host_url3, data=host_data3, expect=200, auth=self.get_other_credentials())
self.assertEqual(Host.objects.get(id=host_data3['id']).variables, host_data3['variables'])
self.assertEqual(Host.objects.get(id=host_data3['id']).variables_dict, {'angry': 'penguin'})
###########################################
# GROUPS
invalid = dict(name='web1')
new_group_a = dict(name='web2', inventory=inv.pk)
new_group_b = dict(name='web3', inventory=inv.pk)
new_group_c = dict(name='web4', inventory=inv.pk)
new_group_d = dict(name='web5', inventory=inv.pk)
new_group_e = dict(name='web6', inventory=inv.pk)
groups = reverse('api:group_list')
self.post(groups, data=invalid, expect=400, auth=self.get_super_credentials())
self.post(groups, data=new_group_a, expect=201, auth=self.get_super_credentials())
# an org admin can add groups
self.post(groups, data=new_group_e, expect=201, auth=self.get_normal_credentials())
# a normal user cannot add groups
self.post(groups, data=new_group_b, expect=403, auth=self.get_nobody_credentials())
# a normal user with inventory edit permissions (on any inventory) can create groups
# already done!
self.post(groups, data=new_group_c, expect=201, auth=self.get_other_credentials())
# hostnames must be unique inside an organization
self.post(groups, data=new_group_c, expect=400, auth=self.get_other_credentials())
# Check that we don't allow creating reserved group names.
data = dict(name='all', inventory=inv.pk)
with self.current_user(self.super_django_user):
self.post(groups, data=data, expect=400)
data = dict(name='_meta', inventory=inv.pk)
with self.current_user(self.super_django_user):
self.post(groups, data=data, expect=400)
# A new group should not be able to be added a removed group
del_group = inv.groups.create(name='del')
inv.groups.create(name='nondel')
del_children_url = reverse('api:group_children_list', args=(del_group.pk,))
nondel_url = reverse('api:group_detail',
args=(Group.objects.get(name='nondel').pk,))
assert self.normal_django_user in inv.read_role
del_group.delete()
nondel_detail = self.get(nondel_url, expect=200, auth=self.get_normal_credentials())
self.post(del_children_url, data=nondel_detail, expect=400, auth=self.get_normal_credentials())
#################################################
# HOSTS->inventories POST via subcollection
url = reverse('api:inventory_hosts_list', args=(self.inventory_a.pk,))
new_host_a = dict(name='web100.example.com')
new_host_b = dict(name='web101.example.com')
new_host_c = dict(name='web102.example.com')
new_host_d = dict(name='web103.example.com')
new_host_e = dict(name='web104.example.com')
# a super user can associate hosts with inventories
added_by_collection_a = self.post(url, data=new_host_a, expect=201, auth=self.get_super_credentials())
# an org admin can associate hosts with inventories
self.post(url, data=new_host_b, expect=201, auth=self.get_normal_credentials())
# a normal user cannot associate hosts with inventories
self.post(url, data=new_host_c, expect=403, auth=self.get_nobody_credentials())
# a normal user with edit permission on the inventory can associate hosts with inventories
url5 = reverse('api:inventory_hosts_list', args=(inv.pk,))
added_by_collection_d = self.post(url5, data=new_host_d, expect=201, auth=self.get_other_credentials())
got = self.get(url5, expect=200, auth=self.get_other_credentials())
self.assertEquals(got['count'], 4)
# now remove the host from inventory (still keeps the record)
added_by_collection_d['disassociate'] = 1
self.post(url5, data=added_by_collection_d, expect=204, auth=self.get_other_credentials())
got = self.get(url5, expect=200, auth=self.get_other_credentials())
self.assertEquals(got['count'], 3)
##################################################
# GROUPS->inventories POST via subcollection
root_groups = reverse('api:inventory_root_groups_list', args=(self.inventory_a.pk,))
url = reverse('api:inventory_groups_list', args=(self.inventory_a.pk,))
new_group_a = dict(name='web100')
new_group_b = dict(name='web101')
new_group_c = dict(name='web102')
new_group_d = dict(name='web103')
new_group_e = dict(name='web104')
# a super user can associate groups with inventories
added_by_collection = self.post(url, data=new_group_a, expect=201, auth=self.get_super_credentials())
# an org admin can associate groups with inventories
added_by_collection = self.post(url, data=new_group_b, expect=201, auth=self.get_normal_credentials())
# a normal user cannot associate groups with inventories
added_by_collection = self.post(url, data=new_group_c, expect=403, auth=self.get_nobody_credentials())
# a normal user with edit permissions on the inventory can associate groups with inventories
url5 = reverse('api:inventory_groups_list', args=(inv.pk,))
added_by_collection = self.post(url5, data=new_group_d, expect=201, auth=self.get_other_credentials())
# make sure duplicates give 400s
self.post(url5, data=new_group_d, expect=400, auth=self.get_other_credentials())
got = self.get(url5, expect=200, auth=self.get_other_credentials())
self.assertEquals(got['count'], 5)
# side check: see if root groups URL is operational. These are groups without parents.
root_groups = self.get(root_groups, expect=200, auth=self.get_super_credentials())
self.assertEquals(root_groups['count'], 2)
remove_me = added_by_collection
remove_me['disassociate'] = 1
self.post(url5, data=remove_me, expect=204, auth=self.get_other_credentials())
got = self.get(url5, expect=200, auth=self.get_other_credentials())
self.assertEquals(got['count'], 4)
###################################################
# VARIABLES
vars_a = dict(asdf=1234, dog='fido', cat='fluffy', unstructured=dict(a=[1,2,3],b=dict(x=2,y=3)))
vars_b = dict(asdf=4321, dog='barky', cat='snarf', unstructured=dict(a=[1,2,3],b=dict(x=2,y=3)))
vars_c = dict(asdf=5555, dog='mouse', cat='mogwai', unstructured=dict(a=[3,0,3],b=dict(z=2600)))
# attempting to get a variable object creates it, even though it does not already exist
vdata_url = reverse('api:host_variable_data', args=(added_by_collection_a['id'],))
got = self.get(vdata_url, expect=200, auth=self.get_super_credentials())
self.assertEquals(got, {})
# super user can create variable objects
# an org admin can create variable objects (defers to inventory permissions)
got = self.put(vdata_url, data=vars_a, expect=200, auth=self.get_super_credentials())
self.assertEquals(got, vars_a)
# verify that we can update things and get them back
got = self.put(vdata_url, data=vars_c, expect=200, auth=self.get_super_credentials())
self.assertEquals(got, vars_c)
got = self.get(vdata_url, expect=200, auth=self.get_super_credentials())
self.assertEquals(got, vars_c)
# a normal user cannot edit variable objects
self.put(vdata_url, data=vars_a, expect=403, auth=self.get_nobody_credentials())
# a normal user with inventory write permissions can edit variable objects...
got = self.put(vdata_url, data=vars_b, expect=200, auth=self.get_normal_credentials())
self.assertEquals(got, vars_b)
###################################################
# VARIABLES -> GROUPS
vars_a = dict(asdf=7777, dog='droopy', cat='battlecat', unstructured=dict(a=[1,1,1],b=dict(x=1,y=2)))
vars_b = dict(asdf=8888, dog='snoopy', cat='cheshire', unstructured=dict(a=[2,2,2],b=dict(x=3,y=4)))
vars_c = dict(asdf=9999, dog='pluto', cat='five', unstructured=dict(a=[3,3,3],b=dict(z=5)))
group = Group.objects.order_by('pk')[0]
vdata1_url = reverse('api:group_variable_data', args=(group.pk,))
# a super user can associate variable objects with groups
got = self.get(vdata1_url, expect=200, auth=self.get_super_credentials())
self.assertEquals(got, {})
put = self.put(vdata1_url, data=vars_a, expect=200, auth=self.get_super_credentials())
self.assertEquals(put, vars_a)
# an org admin can associate variable objects with groups
put = self.put(vdata1_url, data=vars_b, expect=200, auth=self.get_normal_credentials())
# a normal user cannot associate variable objects with groups
put = self.put(vdata1_url, data=vars_b, expect=403, auth=self.get_nobody_credentials())
# a normal user with inventory edit permissions can associate variable objects with groups
put = self.put(vdata1_url, data=vars_c, expect=200, auth=self.get_normal_credentials())
self.assertEquals(put, vars_c)
###################################################
# VARIABLES -> INVENTORY
vars_a = dict(asdf=9873, dog='lassie', cat='heathcliff', unstructured=dict(a=[1,1,1],b=dict(x=1,y=2)))
vars_b = dict(asdf=2736, dog='benji', cat='garfield', unstructured=dict(a=[2,2,2],b=dict(x=3,y=4)))
vars_c = dict(asdf=7692, dog='buck', cat='sylvester', unstructured=dict(a=[3,3,3],b=dict(z=5)))
vdata_url = reverse('api:inventory_variable_data', args=(self.inventory_a.pk,))
# a super user can associate variable objects with inventory
got = self.get(vdata_url, expect=200, auth=self.get_super_credentials())
self.assertEquals(got, {})
put = self.put(vdata_url, data=vars_a, expect=200, auth=self.get_super_credentials())
self.assertEquals(put, vars_a)
# an org admin can associate variable objects with inventory
put = self.put(vdata_url, data=vars_b, expect=200, auth=self.get_normal_credentials())
# a normal user cannot associate variable objects with inventory
put = self.put(vdata_url, data=vars_b, expect=403, auth=self.get_nobody_credentials())
# a normal user with inventory edit permissions can associate variable objects with inventory
put = self.put(vdata_url, data=vars_c, expect=200, auth=self.get_normal_credentials())
self.assertEquals(put, vars_c)
# repeat but request variables in yaml
got = self.get(vdata_url, expect=200,
auth=self.get_normal_credentials(),
accept='application/yaml')
self.assertEquals(got, vars_c)
# repeat but updates variables in yaml
put = self.put(vdata_url, data=vars_c, expect=200,
auth=self.get_normal_credentials(), data_type='yaml',
accept='application/yaml')
self.assertEquals(put, vars_c)
####################################################
# ADDING HOSTS TO GROUPS
groups = Group.objects.order_by('pk')
hosts = Host.objects.order_by('pk')
host1 = hosts[0]
host2 = hosts[1]
host3 = hosts[2]
groups[0].hosts.add(host1)
groups[0].hosts.add(host3)
groups[0].save()
# access
url1 = reverse('api:group_hosts_list', args=(groups[0].pk,))
alt_group_hosts = reverse('api:group_hosts_list', args=(groups[1].pk,))
other_alt_group_hosts = reverse('api:group_hosts_list', args=(groups[2].pk,))
data = self.get(url1, expect=200, auth=self.get_normal_credentials())
self.assertEquals(data['count'], 2)
self.assertTrue(host1.pk in [x['id'] for x in data['results']])
self.assertTrue(host3.pk in [x['id'] for x in data['results']])
# addition
url = reverse('api:host_detail', args=(host2.pk,))
got = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEquals(got['id'], host2.pk)
self.post(url1, data=got, expect=204, auth=self.get_normal_credentials())
data = self.get(url1, expect=200, auth=self.get_normal_credentials())
self.assertEquals(data['count'], 3)
self.assertTrue(host2.pk in [x['id'] for x in data['results']])
# now add one new completely new host, to test creation+association in one go
new_host = dict(inventory=got['inventory'], name='completelynewhost.example.com', description='...')
self.post(url1, data=new_host, expect=201, auth=self.get_normal_credentials())
data = self.get(url1, expect=200, auth=self.get_normal_credentials())
self.assertEquals(data['count'], 4)
# You should be able to add an existing host to a group as a new host and have it be copied
existing_host = new_host
self.post(alt_group_hosts, data=existing_host, expect=204, auth=self.get_normal_credentials())
# Not if the variables are different though
existing_host['variables'] = '{"booh": "bah"}'
self.post(other_alt_group_hosts, data=existing_host, expect=400, auth=self.get_normal_credentials())
# removal
got['disassociate'] = 1
self.post(url1, data=got, expect=204, auth=self.get_normal_credentials())
data = self.get(url1, expect=200, auth=self.get_normal_credentials())
self.assertEquals(data['count'], 3)
self.assertFalse(host2.pk in [x['id'] for x in data['results']])
####################################################
# SUBGROUPS
groups = Group.objects.all()
# just some more groups for kicks
inva = Inventory.objects.get(pk=self.inventory_a.pk)
gx1 = Group.objects.create(name='group-X1', inventory=inva)
gx2 = Group.objects.create(name='group-X2', inventory=inva)
gx2.parents.add(gx1)
gx3 = Group.objects.create(name='group-X3', inventory=inva)
gx3.parents.add(gx2)
gx4 = Group.objects.create(name='group-X4', inventory=inva)
gx4.parents.add(gx3)
gx5 = Group.objects.create(name='group-X5', inventory=inva)
gx5.parents.add(gx4)
inva.admin_role.members.add(self.other_django_user)
# data used for testing listing all hosts that are transitive members of a group
g2 = Group.objects.get(name='web4')
nh = Host.objects.create(name='newhost.example.com', inventory=g2.inventory,
created_by=self.super_django_user)
g2.hosts.add(nh)
g2.save()
# a super user can set subgroups
subgroups_url = reverse('api:group_children_list',
args=(Group.objects.get(name='web2').pk,))
child_url = reverse('api:group_detail',
args=(Group.objects.get(name='web4').pk,))
subgroups_url2 = reverse('api:group_children_list',
args=(Group.objects.get(name='web6').pk,))
subgroups_url3 = reverse('api:group_children_list',
args=(Group.objects.get(name='web100').pk,))
reverse('api:group_children_list',
args=(Group.objects.get(name='web101').pk,))
got = self.get(child_url, expect=200, auth=self.get_super_credentials())
self.post(subgroups_url, data=got, expect=204, auth=self.get_super_credentials())
kids = Group.objects.get(name='web2').children.all()
self.assertEqual(len(kids), 1)
checked = self.get(subgroups_url, expect=200, auth=self.get_super_credentials())
self.assertEquals(checked['count'], 1)
# an org admin can set subgroups
self.post(subgroups_url2, data=got, expect=204, auth=self.get_normal_credentials())
# see if we can post a completely new subgroup
new_data = dict(inventory=inv.pk, name='completely new', description='blarg?')
kids = self.get(subgroups_url2, expect=200, auth=self.get_normal_credentials())
self.assertEqual(kids['count'], 1)
posted2 = self.post(subgroups_url2, data=new_data, expect=201, auth=self.get_normal_credentials())
# a group can't be it's own grandparent
subsub = posted2['related']['children']
# this is the grandparent
original_url = reverse('api:group_detail', args=(Group.objects.get(name='web6').pk,))
parent_data = self.get(original_url, expect=200, auth=self.get_super_credentials())
# now posting to kid's children collection...
self.post(subsub, data=parent_data, expect=403, auth=self.get_super_credentials())
with_one_more_kid = self.get(subgroups_url2, expect=200, auth=self.get_normal_credentials())
self.assertEqual(with_one_more_kid['count'], 2)
# double post causes conflict error (actually, should it? -- just got a 204, already associated)
# self.post(subgroups_url2, data=got, expect=409, auth=self.get_normal_credentials())
checked = self.get(subgroups_url2, expect=200, auth=self.get_normal_credentials())
# a normal user cannot set subgroups
self.post(subgroups_url3, data=got, expect=403, auth=self.get_nobody_credentials())
# a normal user with inventory edit permissions can associate subgroups (but not when they belong to different inventories!)
#self.post(subgroups_url3, data=got, expect=204, auth=self.get_other_credentials())
#checked = self.get(subgroups_url3, expect=200, auth=self.get_normal_credentials())
#self.assertEqual(checked['count'], 1)
# slight detour
# can see all hosts under a group, even if it has subgroups
# this URL is NOT postable
all_hosts = reverse('api:group_all_hosts_list',
args=(Group.objects.get(name='web2').pk,))
self.assertEqual(Group.objects.get(name='web2').hosts.count(), 3)
data = self.get(all_hosts, expect=200, auth=self.get_normal_credentials())
self.post(all_hosts, data=dict(id=123456, msg='spam'), expect=405, auth=self.get_normal_credentials())
self.assertEquals(data['count'], 4)
# now post it back to remove it, by adding the disassociate bit
result = checked['results'][0]
result['disassociate'] = 1
self.post(subgroups_url3, data=result, expect=204, auth=self.get_other_credentials())
checked = self.get(subgroups_url3, expect=200, auth=self.get_normal_credentials())
self.assertEqual(checked['count'], 0)
# try to double disassociate to see what happens (should no-op)
self.post(subgroups_url3, data=result, expect=204, auth=self.get_other_credentials())
# removed group should be automatically marked inactive once it no longer has any parents.
removed_group = Group.objects.get(pk=result['id'])
self.assertTrue(removed_group.parents.count())
for parent in removed_group.parents.all():
parent_children_url = reverse('api:group_children_list', args=(parent.pk,))
data = {'id': removed_group.pk, 'disassociate': 1}
self.post(parent_children_url, data, expect=204, auth=self.get_super_credentials())
removed_group = Group.objects.get(pk=result['id'])
# Removing a group from a hierarchy should migrate its children to the
# parent. The group itself will be deleted (marked inactive), and all
# relationships removed.
url = reverse('api:group_children_list', args=(gx2.pk,))
data = {
'id': gx3.pk,
'disassociate': 1,
}
with self.current_user(self.super_django_user):
self.post(url, data, expect=204)
gx3 = Group.objects.get(pk=gx3.pk)
self.assertFalse(gx3 in gx2.children.all())
#self.assertTrue(gx4 in gx2.children.all())
# Try with invalid hostnames and invalid IPs.
hosts = reverse('api:host_list')
invalid_expect = 400 # hostname validation is disabled for now.
data = dict(name='', inventory=inv.pk)
with self.current_user(self.super_django_user):
self.post(hosts, data=data, expect=400)
#data = dict(name='not a valid host name', inventory=inv.pk)
#with self.current_user(self.super_django_user):
# response = self.post(hosts, data=data, expect=invalid_expect)
data = dict(name='validhost:99999', inventory=inv.pk)
with self.current_user(self.super_django_user):
self.post(hosts, data=data, expect=invalid_expect)
#data = dict(name='123.234.345.456', inventory=inv.pk)
#with self.current_user(self.super_django_user):
# response = self.post(hosts, data=data, expect=invalid_expect)
#data = dict(name='2001::1::3F', inventory=inv.pk)
#with self.current_user(self.super_django_user):
# response = self.post(hosts, data=data, expect=invalid_expect)
#########################################################
# FIXME: TAGS
# the following objects can be tagged and the tags can be read
# inventory
# host records
# group records
# variable records
# this may just be in a seperate test file called 'tags'
#########################################################
# FIXME: RELATED FIELDS
# on an inventory resource, I can see related resources for hosts and groups and permissions
# and these work
# on a host resource, I can see related resources variables and inventories
# and these work
# on a group resource, I can see related resources for variables, inventories, and children
# and these work
def test_get_inventory_script_view(self):
i_a = self.inventory_a
i_a.variables = json.dumps({'i-vars': 123})