mirror of
https://github.com/ansible/awx.git
synced 2026-05-09 02:17:37 -02:30
Merge remote-tracking branch 'ansible/devel' into merge-devel
This commit is contained in:
144
awx/main/tests/functional/api/test_organization_counts.py
Normal file
144
awx/main/tests/functional/api/test_organization_counts.py
Normal file
@@ -0,0 +1,144 @@
|
||||
import pytest
|
||||
|
||||
from django.core.urlresolvers import reverse
|
||||
|
||||
@pytest.fixture
|
||||
def resourced_organization(organization, project, team, inventory, user):
|
||||
admin_user = user('test-admin', True)
|
||||
member_user = user('org-member')
|
||||
|
||||
# Associate one resource of every type with the organization
|
||||
organization.users.add(member_user)
|
||||
organization.admins.add(admin_user)
|
||||
organization.projects.add(project)
|
||||
# organization.teams.create(name='org-team')
|
||||
# inventory = organization.inventories.create(name="associated-inv")
|
||||
project.jobtemplates.create(name="test-jt",
|
||||
description="test-job-template-desc",
|
||||
inventory=inventory,
|
||||
playbook="test_playbook.yml")
|
||||
|
||||
return organization
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_org_counts_admin(resourced_organization, user, get):
|
||||
# Check that all types of resources are counted by a superuser
|
||||
external_admin = user('admin', True)
|
||||
response = get(reverse('api:organization_list', args=[]), external_admin)
|
||||
assert response.status_code == 200
|
||||
|
||||
counts = response.data['results'][0]['summary_fields']['related_field_counts']
|
||||
assert counts == {
|
||||
'users': 1,
|
||||
'admins': 1,
|
||||
'job_templates': 1,
|
||||
'projects': 1,
|
||||
'inventories': 1,
|
||||
'teams': 1
|
||||
}
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_org_counts_member(resourced_organization, get):
|
||||
# Check that a non-admin user can only see the full project and
|
||||
# user count, consistent with the RBAC rules
|
||||
member_user = resourced_organization.users.get(username='org-member')
|
||||
response = get(reverse('api:organization_list', args=[]), member_user)
|
||||
assert response.status_code == 200
|
||||
|
||||
counts = response.data['results'][0]['summary_fields']['related_field_counts']
|
||||
|
||||
assert counts == {
|
||||
'users': 1, # User can see themselves
|
||||
'admins': 0,
|
||||
'job_templates': 0,
|
||||
'projects': 1, # Projects are shared with all the organization
|
||||
'inventories': 0,
|
||||
'teams': 0
|
||||
}
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_new_org_zero_counts(user, post):
|
||||
# Check that a POST to the organization list endpoint returns
|
||||
# correct counts, including the new record
|
||||
org_list_url = reverse('api:organization_list', args=[])
|
||||
post_response = post(url=org_list_url, data={'name': 'test organization',
|
||||
'description': ''}, user=user('admin', True))
|
||||
assert post_response.status_code == 201
|
||||
|
||||
new_org_list = post_response.render().data
|
||||
counts_dict = new_org_list['summary_fields']['related_field_counts']
|
||||
assert counts_dict == {
|
||||
'users': 0,
|
||||
'admins': 0,
|
||||
'job_templates': 0,
|
||||
'projects': 0,
|
||||
'inventories': 0,
|
||||
'teams': 0
|
||||
}
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_two_organizations(resourced_organization, organizations, user, get):
|
||||
# Check correct results for two organizations are returned
|
||||
external_admin = user('admin', True)
|
||||
organization_zero = organizations(1)[0]
|
||||
response = get(reverse('api:organization_list', args=[]), external_admin)
|
||||
assert response.status_code == 200
|
||||
|
||||
org_id_full = resourced_organization.id
|
||||
org_id_zero = organization_zero.id
|
||||
counts = {}
|
||||
for i in range(2):
|
||||
org_id = response.data['results'][i]['id']
|
||||
counts[org_id] = response.data['results'][i]['summary_fields']['related_field_counts']
|
||||
|
||||
assert counts[org_id_full] == {
|
||||
'users': 1,
|
||||
'admins': 1,
|
||||
'job_templates': 1,
|
||||
'projects': 1,
|
||||
'inventories': 1,
|
||||
'teams': 1
|
||||
}
|
||||
assert counts[org_id_zero] == {
|
||||
'users': 0,
|
||||
'admins': 0,
|
||||
'job_templates': 0,
|
||||
'projects': 0,
|
||||
'inventories': 0,
|
||||
'teams': 0
|
||||
}
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_JT_associated_with_project(organizations, project, user, get):
|
||||
# Check that adding a project to an organization gets the project's JT
|
||||
# included in the organization's JT count
|
||||
external_admin = user('admin', True)
|
||||
two_orgs = organizations(2)
|
||||
organization = two_orgs[0]
|
||||
other_org = two_orgs[1]
|
||||
|
||||
unrelated_inv = other_org.inventories.create(name='not-in-organization')
|
||||
project.jobtemplates.create(name="test-jt",
|
||||
description="test-job-template-desc",
|
||||
inventory=unrelated_inv,
|
||||
playbook="test_playbook.yml")
|
||||
organization.projects.add(project)
|
||||
|
||||
response = get(reverse('api:organization_list', args=[]), external_admin)
|
||||
assert response.status_code == 200
|
||||
|
||||
org_id = organization.id
|
||||
counts = {}
|
||||
for i in range(2):
|
||||
working_id = response.data['results'][i]['id']
|
||||
counts[working_id] = response.data['results'][i]['summary_fields']['related_field_counts']
|
||||
|
||||
assert counts[org_id] == {
|
||||
'users': 0,
|
||||
'admins': 0,
|
||||
'job_templates': 1,
|
||||
'projects': 1,
|
||||
'inventories': 0,
|
||||
'teams': 0
|
||||
}
|
||||
|
||||
0
awx/main/tests/functional/migrations/__init__.py
Normal file
0
awx/main/tests/functional/migrations/__init__.py
Normal file
84
awx/main/tests/functional/migrations/conftest.py
Normal file
84
awx/main/tests/functional/migrations/conftest.py
Normal file
@@ -0,0 +1,84 @@
|
||||
# Python
|
||||
import pytest
|
||||
from datetime import timedelta
|
||||
|
||||
# Django
|
||||
from django.utils import timezone
|
||||
from django.conf import settings
|
||||
|
||||
# AWX
|
||||
from awx.fact.models.fact import Fact, FactHost
|
||||
|
||||
# MongoEngine
|
||||
from mongoengine.connection import ConnectionError
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def mongo_db(request):
|
||||
marker = request.keywords.get('mongo_db', None)
|
||||
if marker:
|
||||
# Drop mongo database
|
||||
try:
|
||||
db = Fact._get_db()
|
||||
db.connection.drop_database(settings.MONGO_DB)
|
||||
except ConnectionError:
|
||||
raise
|
||||
|
||||
@pytest.fixture
|
||||
def inventories(organization):
|
||||
def rf(inventory_count=1):
|
||||
invs = []
|
||||
for i in xrange(0, inventory_count):
|
||||
inv = organization.inventories.create(name="test-inv-%d" % i, description="test-inv-desc")
|
||||
invs.append(inv)
|
||||
return invs
|
||||
return rf
|
||||
|
||||
'''
|
||||
hosts naming convension should align with hosts_mongo
|
||||
'''
|
||||
@pytest.fixture
|
||||
def hosts(organization):
|
||||
def rf(host_count=1, inventories=[]):
|
||||
hosts = []
|
||||
for inv in inventories:
|
||||
for i in xrange(0, host_count):
|
||||
name = '%s-host-%s' % (inv.name, i)
|
||||
host = inv.hosts.create(name=name)
|
||||
hosts.append(host)
|
||||
return hosts
|
||||
return rf
|
||||
|
||||
@pytest.fixture
|
||||
def hosts_mongo(organization):
|
||||
def rf(host_count=1, inventories=[]):
|
||||
hosts = []
|
||||
for inv in inventories:
|
||||
for i in xrange(0, host_count):
|
||||
name = '%s-host-%s' % (inv.name, i)
|
||||
(host, created) = FactHost.objects.get_or_create(hostname=name, inventory_id=inv.id)
|
||||
hosts.append(host)
|
||||
return hosts
|
||||
return rf
|
||||
|
||||
@pytest.fixture
|
||||
def fact_scans(organization, fact_ansible_json, fact_packages_json, fact_services_json):
|
||||
def rf(fact_scans=1, inventories=[], timestamp_epoch=timezone.now()):
|
||||
facts_json = {}
|
||||
facts = []
|
||||
module_names = ['ansible', 'services', 'packages']
|
||||
|
||||
facts_json['ansible'] = fact_ansible_json
|
||||
facts_json['packages'] = fact_packages_json
|
||||
facts_json['services'] = fact_services_json
|
||||
|
||||
for inv in inventories:
|
||||
for host_obj in FactHost.objects.filter(inventory_id=inv.id):
|
||||
timestamp_current = timestamp_epoch
|
||||
for i in xrange(0, fact_scans):
|
||||
for module_name in module_names:
|
||||
facts.append(Fact.add_fact(timestamp_current, facts_json[module_name], host_obj, module_name))
|
||||
timestamp_current += timedelta(days=1)
|
||||
return facts
|
||||
return rf
|
||||
|
||||
|
||||
79
awx/main/tests/functional/migrations/test_fact.py
Normal file
79
awx/main/tests/functional/migrations/test_fact.py
Normal file
@@ -0,0 +1,79 @@
|
||||
import pytest
|
||||
import datetime
|
||||
|
||||
from django.apps import apps
|
||||
|
||||
from awx.main.models.inventory import Host
|
||||
from awx.main.models.fact import Fact
|
||||
|
||||
from awx.main.migrations import _system_tracking as system_tracking
|
||||
|
||||
from awx.fact.models.fact import Fact as FactMongo
|
||||
from awx.fact.models.fact import FactVersion, FactHost
|
||||
|
||||
def micro_to_milli(micro):
|
||||
return micro - (((int)(micro / 1000)) * 1000)
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.mongo_db
|
||||
def test_migrate_facts(inventories, hosts, hosts_mongo, fact_scans):
|
||||
inventory_objs = inventories(2)
|
||||
hosts(2, inventory_objs)
|
||||
hosts_mongo(2, inventory_objs)
|
||||
facts_known = fact_scans(2, inventory_objs)
|
||||
|
||||
(migrated_count, not_migrated_count) = system_tracking.migrate_facts(apps, None)
|
||||
# 4 hosts w/ 2 fact scans each, 3 modules each scan
|
||||
assert migrated_count == 24
|
||||
assert not_migrated_count == 0
|
||||
|
||||
|
||||
for fact_mongo, fact_version in facts_known:
|
||||
host = Host.objects.get(inventory_id=fact_mongo.host.inventory_id, name=fact_mongo.host.hostname)
|
||||
t = fact_mongo.timestamp - datetime.timedelta(microseconds=micro_to_milli(fact_mongo.timestamp.microsecond))
|
||||
fact = Fact.objects.filter(host_id=host.id, timestamp=t, module=fact_mongo.module)
|
||||
|
||||
assert len(fact) == 1
|
||||
assert fact[0] is not None
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.mongo_db
|
||||
def test_migrate_facts_hostname_does_not_exist(inventories, hosts, hosts_mongo, fact_scans):
|
||||
inventory_objs = inventories(2)
|
||||
host_objs = hosts(1, inventory_objs)
|
||||
hosts_mongo(2, inventory_objs)
|
||||
facts_known = fact_scans(2, inventory_objs)
|
||||
|
||||
(migrated_count, not_migrated_count) = system_tracking.migrate_facts(apps, None)
|
||||
assert migrated_count == 12
|
||||
assert not_migrated_count == 12
|
||||
|
||||
|
||||
for fact_mongo, fact_version in facts_known:
|
||||
# Facts that don't match the only host will not be migrated
|
||||
if fact_mongo.host.hostname != host_objs[0].name:
|
||||
continue
|
||||
|
||||
host = Host.objects.get(inventory_id=fact_mongo.host.inventory_id, name=fact_mongo.host.hostname)
|
||||
t = fact_mongo.timestamp - datetime.timedelta(microseconds=micro_to_milli(fact_mongo.timestamp.microsecond))
|
||||
fact = Fact.objects.filter(host_id=host.id, timestamp=t, module=fact_mongo.module)
|
||||
|
||||
assert len(fact) == 1
|
||||
assert fact[0] is not None
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.mongo_db
|
||||
def test_drop_system_tracking_db(inventories, hosts, hosts_mongo, fact_scans):
|
||||
inventory_objs = inventories(1)
|
||||
hosts_mongo(1, inventory_objs)
|
||||
fact_scans(1, inventory_objs)
|
||||
|
||||
assert FactMongo.objects.all().count() > 0
|
||||
assert FactVersion.objects.all().count() > 0
|
||||
assert FactHost.objects.all().count() > 0
|
||||
|
||||
system_tracking.drop_system_tracking_db()
|
||||
|
||||
assert FactMongo.objects.all().count() == 0
|
||||
assert FactVersion.objects.all().count() == 0
|
||||
assert FactHost.objects.all().count() == 0
|
||||
Reference in New Issue
Block a user