mirror of
https://github.com/ansible/awx.git
synced 2026-07-04 04:48:02 -02:30
AC-505. Initial working task/tests for cloud inventory import.
This commit is contained in:
@@ -1,13 +1,22 @@
|
||||
# Copyright (c) 2013 AnsibleWorks, Inc.
|
||||
# All Rights Reserved.
|
||||
|
||||
# Python
|
||||
import datetime
|
||||
import json
|
||||
import os
|
||||
|
||||
# Django
|
||||
from django.conf import settings
|
||||
from django.contrib.auth.models import User
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.test.utils import override_settings
|
||||
|
||||
# AWX
|
||||
from awx.main.models import *
|
||||
from awx.main.tests.base import BaseTest
|
||||
from awx.main.tests.base import BaseTest, BaseTransactionTest
|
||||
|
||||
__all__ = ['InventoryTest', 'InventoryUpdatesTest']
|
||||
|
||||
class InventoryTest(BaseTest):
|
||||
|
||||
@@ -931,3 +940,85 @@ class InventoryTest(BaseTest):
|
||||
set([h_e.pk]))
|
||||
self.assertEqual(set(h_e.all_groups.values_list('pk', flat=True)),
|
||||
set([g_e.pk]))
|
||||
|
||||
@override_settings(CELERY_ALWAYS_EAGER=True,
|
||||
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True)
|
||||
class InventoryUpdatesTest(BaseTransactionTest):
|
||||
|
||||
def setUp(self):
|
||||
super(InventoryUpdatesTest, self).setUp()
|
||||
self.setup_users()
|
||||
self.organization = self.make_organizations(self.super_django_user, 1)[0]
|
||||
self.organization.admins.add(self.normal_django_user)
|
||||
self.organization.users.add(self.other_django_user)
|
||||
self.organization.users.add(self.normal_django_user)
|
||||
self.inventory = self.organization.inventories.create(name='Cloud Inventory')
|
||||
self.group = self.inventory.groups.create(name='Cloud Group')
|
||||
# Pass test database name in environment for use by the inventory_import
|
||||
# management command.
|
||||
os.environ['AWX_TEST_DATABASE_NAME'] = settings.DATABASES['default']['NAME']
|
||||
|
||||
def update_inventory_source(self, group, **kwargs):
|
||||
inventory_source = group.inventory_source
|
||||
update_fields = []
|
||||
for field, value in kwargs.items():
|
||||
if getattr(inventory_source, field) != value:
|
||||
setattr(inventory_source, field, value)
|
||||
update_fields.append(field)
|
||||
if update_fields:
|
||||
inventory_source.save(update_fields=update_fields)
|
||||
return inventory_source
|
||||
|
||||
def check_inventory_update(self, inventory_source, should_fail=False,
|
||||
**kwargs):
|
||||
inventory_update = kwargs.pop('inventory_update', None)
|
||||
should_error = kwargs.pop('should_error', False)
|
||||
if not inventory_update:
|
||||
inventory_update = inventory_source.update(**kwargs)
|
||||
self.assertTrue(inventory_update)
|
||||
inventory_update = InventoryUpdate.objects.get(pk=inventory_update.pk)
|
||||
#print inventory_update.result_stdout
|
||||
if should_error:
|
||||
self.assertEqual(inventory_update.status, 'error',
|
||||
inventory_update.result_stdout + \
|
||||
inventory_update.result_traceback)
|
||||
elif should_fail:
|
||||
self.assertEqual(inventory_update.status, 'failed',
|
||||
inventory_update.result_stdout + \
|
||||
inventory_update.result_traceback)
|
||||
elif should_fail is False:
|
||||
self.assertEqual(inventory_update.status, 'successful',
|
||||
inventory_update.result_stdout + \
|
||||
inventory_update.result_traceback)
|
||||
else:
|
||||
pass # If should_fail is None, we don't care.
|
||||
|
||||
def check_inventory_source(self, inventory_source):
|
||||
inventory_source = InventorySource.objects.get(pk=inventory_source.pk)
|
||||
inventory = inventory_source.group.inventory
|
||||
self.assertTrue(inventory_source.can_update)
|
||||
self.assertEqual(inventory.groups.count(), 1)
|
||||
self.assertEqual(inventory.hosts.count(), 0)
|
||||
self.check_inventory_update(inventory_source)
|
||||
self.assertNotEqual(inventory.groups.count(), 1)
|
||||
self.assertNotEqual(inventory.hosts.count(), 0)
|
||||
|
||||
def test_update_from_ec2(self):
|
||||
source_username = getattr(settings, 'TEST_AWS_ACCESS_KEY_ID', '')
|
||||
source_password = getattr(settings, 'TEST_AWS_SECRET_ACCESS_KEY', '')
|
||||
if not all([source_username, source_password]):
|
||||
self.skipTest('no test ec2 credentials defined!')
|
||||
inventory_source = self.update_inventory_source(self.group,
|
||||
source='ec2', source_username=source_username,
|
||||
source_password=source_password)
|
||||
self.check_inventory_source(inventory_source)
|
||||
|
||||
def test_update_from_rackspace(self):
|
||||
source_username = getattr(settings, 'TEST_RACKSPACE_USERNAME', '')
|
||||
source_password = getattr(settings, 'TEST_RACKSPACE_API_KEY', '')
|
||||
if not all([source_username, source_password]):
|
||||
self.skipTest('no test rackspace credentials defined!')
|
||||
inventory_source = self.update_inventory_source(self.group,
|
||||
source='rackspace', source_username=source_username,
|
||||
source_password=source_password)
|
||||
self.check_inventory_source(inventory_source)
|
||||
|
||||
Reference in New Issue
Block a user