Merge branch 'rbac' of github.com:ansible/ansible-tower into rbac

This commit is contained in:
Wayne Witzel III
2016-03-22 09:24:56 -04:00
92 changed files with 2947 additions and 157966 deletions

View File

@@ -5,6 +5,7 @@
from django.db import connection
from django.db.models.signals import (
post_init,
pre_save,
post_save,
post_delete,
)
@@ -83,69 +84,8 @@ def resolve_role_field(obj, field):
return ret
class ImplicitRoleDescriptor(ReverseSingleRelatedObjectDescriptor):
"""Descriptor Implict Role Fields. Auto-creates the appropriate role entry on first access"""
def __init__(self, role_name, role_description, permissions, parent_role, *args, **kwargs):
self.role_name = role_name
self.role_description = role_description if role_description else ""
self.permissions = permissions
self.parent_role = parent_role
super(ImplicitRoleDescriptor, self).__init__(*args, **kwargs)
def __get__(self, instance, instance_type=None):
role = super(ImplicitRoleDescriptor, self).__get__(instance, instance_type)
if role:
return role
if not self.role_name:
raise FieldError('Implicit role missing `role_name`')
if connection.needs_rollback:
raise TransactionManagementError('Current transaction has failed, cannot create implicit role')
role = Role.objects.create(name=self.role_name, description=self.role_description, content_object=instance)
setattr(instance, self.field.name, role)
if instance.pk:
instance.save(update_fields=[self.field.name,])
if self.parent_role:
# Add all non-null parent roles as parents
paths = self.parent_role if type(self.parent_role) is list else [self.parent_role]
for path in paths:
if path.startswith("singleton:"):
parents = [Role.singleton(path[10:])]
else:
parents = resolve_role_field(instance, path)
for parent in parents:
role.parents.add(parent)
if self.permissions is not None:
permissions = RolePermission(
role=role,
resource=instance,
auto_generated=True
)
if 'all' in self.permissions and self.permissions['all']:
del self.permissions['all']
self.permissions['create'] = True
self.permissions['read'] = True
self.permissions['write'] = True
self.permissions['update'] = True
self.permissions['delete'] = True
self.permissions['scm_update'] = True
self.permissions['use'] = True
self.permissions['execute'] = True
for k,v in self.permissions.items():
setattr(permissions, k, v)
permissions.save()
return role
pass
class ImplicitRoleField(models.ForeignKey):
@@ -153,7 +93,7 @@ class ImplicitRoleField(models.ForeignKey):
def __init__(self, role_name=None, role_description=None, permissions=None, parent_role=None, *args, **kwargs):
self.role_name = role_name
self.role_description = role_description
self.role_description = role_description if role_description else ""
self.permissions = permissions
self.parent_role = parent_role
@@ -164,18 +104,15 @@ class ImplicitRoleField(models.ForeignKey):
def contribute_to_class(self, cls, name):
super(ImplicitRoleField, self).contribute_to_class(cls, name)
setattr(cls,
self.name,
ImplicitRoleDescriptor(
self.role_name,
self.role_description,
self.permissions,
self.parent_role,
self
)
)
post_init.connect(self._post_init, cls, True)
post_save.connect(self._post_save, cls, True)
setattr(cls, self.name, ImplicitRoleDescriptor(self))
if not hasattr(cls, '__implicit_role_fields'):
setattr(cls, '__implicit_role_fields', [])
getattr(cls, '__implicit_role_fields').append(self)
post_init.connect(self._post_init, cls, True, dispatch_uid='implicit-role-post-init')
pre_save.connect(self._pre_save, cls, True, dispatch_uid='implicit-role-pre-save')
post_save.connect(self._post_save, cls, True, dispatch_uid='implicit-role-post-save')
post_delete.connect(self._post_delete, cls, True)
add_lazy_relation(cls, self, "self", self.bind_m2m_changed)
@@ -233,24 +170,82 @@ class ImplicitRoleField(models.ForeignKey):
def _post_init(self, instance, *args, **kwargs):
if not self.parent_role:
return
original_parent_roles = dict()
if instance.pk:
for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'):
original_parent_roles[implicit_role_field.name] = implicit_role_field._resolve_parent_roles(instance)
if not instance.pk:
return
setattr(instance, '__original_parent_roles', original_parent_roles)
self._calc_original_parents(instance)
def _create_role_instance_if_not_exists(self, instance):
role = getattr(instance, self.name, None)
if role:
return role
role = Role.objects.create(
name=self.role_name,
description=self.role_description)
setattr(instance, self.name, role)
def _patch_role_content_object_and_grant_permissions(self, instance):
role = getattr(instance, self.name)
role.content_object = instance
role.save()
if self.permissions is not None:
permissions = RolePermission(
role=role,
resource=instance,
auto_generated=True
)
if 'all' in self.permissions and self.permissions['all']:
del self.permissions['all']
self.permissions['create'] = True
self.permissions['read'] = True
self.permissions['write'] = True
self.permissions['update'] = True
self.permissions['delete'] = True
self.permissions['scm_update'] = True
self.permissions['use'] = True
self.permissions['execute'] = True
for k,v in self.permissions.items():
setattr(permissions, k, v)
permissions.save()
def _pre_save(self, instance, *args, **kwargs):
for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'):
implicit_role_field._create_role_instance_if_not_exists(instance)
def _post_save(self, instance, created, *args, **kwargs):
if created:
for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'):
implicit_role_field._patch_role_content_object_and_grant_permissions(instance)
original_parent_roles = getattr(instance, '__original_parent_roles')
if created:
for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'):
original_parent_roles[implicit_role_field.name] = set()
new_parent_roles = dict()
for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'):
new_parent_roles[implicit_role_field.name] = implicit_role_field._resolve_parent_roles(instance)
setattr(instance, '__original_parent_roles', new_parent_roles)
with batch_role_ancestor_rebuilding():
for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'):
cur_role = getattr(instance, implicit_role_field.name)
original_parents = original_parent_roles[implicit_role_field.name]
new_parents = new_parent_roles[implicit_role_field.name]
cur_role.parents.remove(*list(original_parents - new_parents))
cur_role.parents.add(*list(new_parents - original_parents))
def _calc_original_parents(self, instance):
if not hasattr(self, '__original_parent_roles'):
setattr(self, '__original_parent_roles', set()) # do not just self.__original_parent_roles=[], it's not the same here, apparently.
# NOTE: The above setattr is required to be called bofore
# _resolve_parent_roles because we can end up recursing, so the enclosing
# if not hasattr protects against this.
original_parent_roles = self._resolve_parent_roles(instance)
setattr(self, '__original_parent_roles', original_parent_roles)
def _resolve_parent_roles(self, instance):
if not self.parent_role:
return set()
paths = self.parent_role if type(self.parent_role) is list else [self.parent_role]
parent_roles = set()
for path in paths:
@@ -262,35 +257,10 @@ class ImplicitRoleField(models.ForeignKey):
parent_roles.add(parent)
return parent_roles
def _post_save(self, instance, created, *args, **kwargs):
# Ensure that our field gets initialized after our first save
this_role = getattr(instance, self.name)
# As object relations change, the role hierarchy might also change if the relations
# that changed were referenced in our magic parent_role field. This code synchronizes
# these changes.
if not self.parent_role:
return
if created:
self._calc_original_parents(instance)
return
original_parents = getattr(self, '__original_parent_roles')
new_parents = self._resolve_parent_roles(instance)
with batch_role_ancestor_rebuilding():
for role in original_parents - new_parents:
this_role.parents.remove(role)
for role in new_parents - original_parents:
this_role.parents.add(role)
setattr(self, '__original_parent_roles', new_parents)
def _post_delete(self, instance, *args, **kwargs):
this_role = getattr(instance, self.name)
children = [c for c in this_role.children.all()]
this_role.delete()
for child in children:
child.rebuild_role_ancestor_list()
with batch_role_ancestor_rebuilding():
for child in children:
child.rebuild_role_ancestor_list()

View File

@@ -67,12 +67,12 @@ class FactCacheReceiver(object):
self.timestamp = datetime.fromtimestamp(date_key, None)
# Update existing Fact entry
fact_obj = Fact.objects.filter(host__id=host_obj.id, module=module_name, timestamp=self.timestamp)
if fact_obj:
try:
fact_obj = Fact.objects.get(host__id=host_obj.id, module=module_name, timestamp=self.timestamp)
fact_obj.facts = facts
fact_obj.save()
logger.info('Updated existing fact <%s>' % (fact_obj.id))
else:
except Fact.DoesNotExist:
# Create new Fact entry
fact_obj = Fact.add_fact(host_obj.id, module_name, self.timestamp, facts)
logger.info('Created new fact <fact_id, module> <%s, %s>' % (fact_obj.id, module_name))

View File

@@ -108,6 +108,8 @@ class SimpleDAG(object):
return "inventory_update"
elif type(obj) == ProjectUpdate:
return "project_update"
elif type(obj) == SystemJob:
return "system_job"
return "unknown"
def get_dependencies(self, obj):

View File

@@ -13,7 +13,7 @@ class Migration(migrations.Migration):
dependencies = [
('taggit', '0002_auto_20150616_2121'),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('main', '0002_v300_changes'),
('main', '0002_v300_tower_settings_changes'),
]
operations = [

View File

@@ -8,7 +8,7 @@ import jsonbfield.fields
class Migration(migrations.Migration):
dependencies = [
('main', '0003_v300_changes'),
('main', '0003_v300_notification_changes'),
]
operations = [

View File

@@ -0,0 +1,15 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from awx.main.migrations import _system_tracking as system_tracking
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('main', '0004_v300_fact_changes'),
]
operations = [
migrations.RunPython(system_tracking.migrate_facts),
]

View File

@@ -8,7 +8,7 @@ from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('main', '0004_v300_changes'),
('main', '0005_v300_migrate_facts'),
]
operations = [

View File

@@ -14,7 +14,7 @@ class Migration(migrations.Migration):
('taggit', '0002_auto_20150616_2121'),
('contenttypes', '0002_remove_content_type_name'),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('main', '0005_v300_active_flag_removal'),
('main', '0006_v300_active_flag_removal'),
]
operations = [

View File

@@ -8,7 +8,7 @@ from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('main', '0006_v300_rbac_changes'),
('main', '0007_v300_rbac_changes'),
]
operations = [

View File

@@ -107,7 +107,7 @@ def create_system_job_templates(apps, schema_editor):
class Migration(migrations.Migration):
dependencies = [
('main', '0007_v300_rbac_migrations'),
('main', '0008_v300_rbac_migrations'),
]
operations = [

View File

@@ -0,0 +1,47 @@
from awx.fact.models import FactVersion
from mongoengine.connection import ConnectionError
from pymongo.errors import OperationFailure
from django.conf import settings
def drop_system_tracking_db():
try:
db = FactVersion._get_db()
db.connection.drop_database(settings.MONGO_DB)
except ConnectionError:
# TODO: Log this. Not a deal-breaker. Just let the user know they
# may need to manually drop/delete the database.
pass
except OperationFailure:
# TODO: This means the database was up but something happened when we tried to query it
pass
def migrate_facts(apps, schema_editor):
Fact = apps.get_model('main', "Fact")
Host = apps.get_model('main', "Host")
try:
n = FactVersion.objects.all().count()
except ConnectionError:
# TODO: Let the user know about the error. Likely this is
# a new install and we just don't need to do this
return (0, 0)
except OperationFailure:
# TODO: This means the database was up but something happened when we tried to query it
return (0, 0)
migrated_count = 0
not_migrated_count = 0
for factver in FactVersion.objects.all():
fact_obj = factver.fact
try:
host = Host.objects.only('id').get(inventory__id=factver.host.inventory_id, name=factver.host.hostname)
Fact.objects.create(host_id=host.id, timestamp=fact_obj.timestamp, module=fact_obj.module, facts=fact_obj.fact).save()
migrated_count += 1
except Host.DoesNotExist:
# TODO: Log this. No host was found to migrate the facts to.
# This isn't a hard error. Just something the user would want to know.
not_migrated_count += 1
drop_system_tracking_db()
return (migrated_count, not_migrated_count)

View File

@@ -1085,6 +1085,13 @@ class SystemJobTemplate(UnifiedJobTemplate, SystemJobOptions):
def cache_timeout_blocked(self):
return False
@property
def notifiers(self):
base_notifiers = Notifier.objects.filter(active=True)
error_notifiers = list(base_notifiers.filter(unifiedjobtemplate_notifiers_for_errors__in=[self]))
success_notifiers = list(base_notifiers.filter(unifiedjobtemplate_notifiers_for_success__in=[self]))
any_notifiers = list(base_notifiers.filter(unifiedjobtemplate_notifiers_for_any__in=[self]))
return dict(error=list(error_notifiers), success=list(success_notifiers), any=list(any_notifiers))
class SystemJob(UnifiedJob, SystemJobOptions):

View File

@@ -462,9 +462,11 @@ def activity_stream_associate(sender, instance, **kwargs):
obj2_id = entity_acted
obj2_actual = obj2.objects.get(id=obj2_id)
object2 = camelcase_to_underscore(obj2.__name__)
# Skip recording any inventory source changes here.
# Skip recording any inventory source, or system job template changes here.
if isinstance(obj1, InventorySource) or isinstance(obj2_actual, InventorySource):
continue
if isinstance(obj1, SystemJobTemplate) or isinstance(obj2_actual, SystemJobTemplate):
continue
activity_entry = ActivityStream(
operation=action,
object1=object1,

View File

@@ -14,7 +14,6 @@ import pipes
import re
import shutil
import stat
import subprocess
import tempfile
import thread
import time
@@ -169,30 +168,6 @@ def notify_task_runner(metadata_dict):
queue = FifoQueue('tower_task_manager')
queue.push(metadata_dict)
@task()
def mongodb_control(cmd):
# Sanity check: Do not send arbitrary commands.
if cmd not in ('start', 'stop'):
raise ValueError('Only "start" and "stop" are allowed.')
# Either start or stop mongo, as requested.
p = subprocess.Popen('sudo service mongod %s' % cmd, shell=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate()
p.wait()
# Check to make sure the stop actually succeeded
p = subprocess.Popen('pidof mongod', shell=True)
shutdown_failed = p.wait() == 0
# If there was an error, log it.
if err:
logger.error(err)
if cmd == 'stop' and shutdown_failed:
p = subprocess.Popen('sudo mongod --shutdown -f /etc/mongod.conf', shell=True)
p.wait()
@task(bind=True)
def handle_work_success(self, result, task_actual):
if task_actual['type'] == 'project_update':
@@ -215,6 +190,11 @@ def handle_work_success(self, result, task_actual):
instance_name = instance.module_name
notifiers = [] # TODO: Ad-hoc commands need to notify someone
friendly_name = "AdHoc Command"
elif task_actual['type'] == 'system_job':
instance = SystemJob.objects.get(id=task_actual['id'])
instance_name = instance.system_job_template.name
notifiers = instance.system_job_template.notifiers
friendly_name = "System Job"
else:
return
notification_body = instance.notification_data()
@@ -258,6 +238,11 @@ def handle_work_error(self, task_id, subtasks=None):
instance_name = instance.module_name
notifiers = []
friendly_name = "AdHoc Command"
elif task_actual['type'] == 'system_job':
instance = SystemJob.objects.get(id=task_actual['id'])
instance_name = instance.system_job_template.name
notifiers = instance.system_job_template.notifiers
friendly_name = "System Job"
else:
# Unknown task type
break

View File

@@ -0,0 +1,144 @@
import pytest
from django.core.urlresolvers import reverse
@pytest.fixture
def resourced_organization(organization, project, team, inventory, user):
admin_user = user('test-admin', True)
member_user = user('org-member')
# Associate one resource of every type with the organization
organization.users.add(member_user)
organization.admins.add(admin_user)
organization.projects.add(project)
# organization.teams.create(name='org-team')
# inventory = organization.inventories.create(name="associated-inv")
project.jobtemplates.create(name="test-jt",
description="test-job-template-desc",
inventory=inventory,
playbook="test_playbook.yml")
return organization
@pytest.mark.django_db
def test_org_counts_admin(resourced_organization, user, get):
# Check that all types of resources are counted by a superuser
external_admin = user('admin', True)
response = get(reverse('api:organization_list', args=[]), external_admin)
assert response.status_code == 200
counts = response.data['results'][0]['summary_fields']['related_field_counts']
assert counts == {
'users': 1,
'admins': 1,
'job_templates': 1,
'projects': 1,
'inventories': 1,
'teams': 1
}
@pytest.mark.django_db
def test_org_counts_member(resourced_organization, get):
# Check that a non-admin user can only see the full project and
# user count, consistent with the RBAC rules
member_user = resourced_organization.users.get(username='org-member')
response = get(reverse('api:organization_list', args=[]), member_user)
assert response.status_code == 200
counts = response.data['results'][0]['summary_fields']['related_field_counts']
assert counts == {
'users': 1, # User can see themselves
'admins': 0,
'job_templates': 0,
'projects': 1, # Projects are shared with all the organization
'inventories': 0,
'teams': 0
}
@pytest.mark.django_db
def test_new_org_zero_counts(user, post):
# Check that a POST to the organization list endpoint returns
# correct counts, including the new record
org_list_url = reverse('api:organization_list', args=[])
post_response = post(url=org_list_url, data={'name': 'test organization',
'description': ''}, user=user('admin', True))
assert post_response.status_code == 201
new_org_list = post_response.render().data
counts_dict = new_org_list['summary_fields']['related_field_counts']
assert counts_dict == {
'users': 0,
'admins': 0,
'job_templates': 0,
'projects': 0,
'inventories': 0,
'teams': 0
}
@pytest.mark.django_db
def test_two_organizations(resourced_organization, organizations, user, get):
# Check correct results for two organizations are returned
external_admin = user('admin', True)
organization_zero = organizations(1)[0]
response = get(reverse('api:organization_list', args=[]), external_admin)
assert response.status_code == 200
org_id_full = resourced_organization.id
org_id_zero = organization_zero.id
counts = {}
for i in range(2):
org_id = response.data['results'][i]['id']
counts[org_id] = response.data['results'][i]['summary_fields']['related_field_counts']
assert counts[org_id_full] == {
'users': 1,
'admins': 1,
'job_templates': 1,
'projects': 1,
'inventories': 1,
'teams': 1
}
assert counts[org_id_zero] == {
'users': 0,
'admins': 0,
'job_templates': 0,
'projects': 0,
'inventories': 0,
'teams': 0
}
@pytest.mark.django_db
def test_JT_associated_with_project(organizations, project, user, get):
# Check that adding a project to an organization gets the project's JT
# included in the organization's JT count
external_admin = user('admin', True)
two_orgs = organizations(2)
organization = two_orgs[0]
other_org = two_orgs[1]
unrelated_inv = other_org.inventories.create(name='not-in-organization')
project.jobtemplates.create(name="test-jt",
description="test-job-template-desc",
inventory=unrelated_inv,
playbook="test_playbook.yml")
organization.projects.add(project)
response = get(reverse('api:organization_list', args=[]), external_admin)
assert response.status_code == 200
org_id = organization.id
counts = {}
for i in range(2):
working_id = response.data['results'][i]['id']
counts[working_id] = response.data['results'][i]['summary_fields']['related_field_counts']
assert counts[org_id] == {
'users': 0,
'admins': 0,
'job_templates': 1,
'projects': 1,
'inventories': 0,
'teams': 0
}

View File

@@ -0,0 +1,84 @@
# Python
import pytest
from datetime import timedelta
# Django
from django.utils import timezone
from django.conf import settings
# AWX
from awx.fact.models.fact import Fact, FactHost
# MongoEngine
from mongoengine.connection import ConnectionError
@pytest.fixture(autouse=True)
def mongo_db(request):
marker = request.keywords.get('mongo_db', None)
if marker:
# Drop mongo database
try:
db = Fact._get_db()
db.connection.drop_database(settings.MONGO_DB)
except ConnectionError:
raise
@pytest.fixture
def inventories(organization):
def rf(inventory_count=1):
invs = []
for i in xrange(0, inventory_count):
inv = organization.inventories.create(name="test-inv-%d" % i, description="test-inv-desc")
invs.append(inv)
return invs
return rf
'''
hosts naming convension should align with hosts_mongo
'''
@pytest.fixture
def hosts(organization):
def rf(host_count=1, inventories=[]):
hosts = []
for inv in inventories:
for i in xrange(0, host_count):
name = '%s-host-%s' % (inv.name, i)
host = inv.hosts.create(name=name)
hosts.append(host)
return hosts
return rf
@pytest.fixture
def hosts_mongo(organization):
def rf(host_count=1, inventories=[]):
hosts = []
for inv in inventories:
for i in xrange(0, host_count):
name = '%s-host-%s' % (inv.name, i)
(host, created) = FactHost.objects.get_or_create(hostname=name, inventory_id=inv.id)
hosts.append(host)
return hosts
return rf
@pytest.fixture
def fact_scans(organization, fact_ansible_json, fact_packages_json, fact_services_json):
def rf(fact_scans=1, inventories=[], timestamp_epoch=timezone.now()):
facts_json = {}
facts = []
module_names = ['ansible', 'services', 'packages']
facts_json['ansible'] = fact_ansible_json
facts_json['packages'] = fact_packages_json
facts_json['services'] = fact_services_json
for inv in inventories:
for host_obj in FactHost.objects.filter(inventory_id=inv.id):
timestamp_current = timestamp_epoch
for i in xrange(0, fact_scans):
for module_name in module_names:
facts.append(Fact.add_fact(timestamp_current, facts_json[module_name], host_obj, module_name))
timestamp_current += timedelta(days=1)
return facts
return rf

View File

@@ -0,0 +1,79 @@
import pytest
import datetime
from django.apps import apps
from awx.main.models.inventory import Host
from awx.main.models.fact import Fact
from awx.main.migrations import _system_tracking as system_tracking
from awx.fact.models.fact import Fact as FactMongo
from awx.fact.models.fact import FactVersion, FactHost
def micro_to_milli(micro):
return micro - (((int)(micro / 1000)) * 1000)
@pytest.mark.django_db
@pytest.mark.mongo_db
def test_migrate_facts(inventories, hosts, hosts_mongo, fact_scans):
inventory_objs = inventories(2)
hosts(2, inventory_objs)
hosts_mongo(2, inventory_objs)
facts_known = fact_scans(2, inventory_objs)
(migrated_count, not_migrated_count) = system_tracking.migrate_facts(apps, None)
# 4 hosts w/ 2 fact scans each, 3 modules each scan
assert migrated_count == 24
assert not_migrated_count == 0
for fact_mongo, fact_version in facts_known:
host = Host.objects.get(inventory_id=fact_mongo.host.inventory_id, name=fact_mongo.host.hostname)
t = fact_mongo.timestamp - datetime.timedelta(microseconds=micro_to_milli(fact_mongo.timestamp.microsecond))
fact = Fact.objects.filter(host_id=host.id, timestamp=t, module=fact_mongo.module)
assert len(fact) == 1
assert fact[0] is not None
@pytest.mark.django_db
@pytest.mark.mongo_db
def test_migrate_facts_hostname_does_not_exist(inventories, hosts, hosts_mongo, fact_scans):
inventory_objs = inventories(2)
host_objs = hosts(1, inventory_objs)
hosts_mongo(2, inventory_objs)
facts_known = fact_scans(2, inventory_objs)
(migrated_count, not_migrated_count) = system_tracking.migrate_facts(apps, None)
assert migrated_count == 12
assert not_migrated_count == 12
for fact_mongo, fact_version in facts_known:
# Facts that don't match the only host will not be migrated
if fact_mongo.host.hostname != host_objs[0].name:
continue
host = Host.objects.get(inventory_id=fact_mongo.host.inventory_id, name=fact_mongo.host.hostname)
t = fact_mongo.timestamp - datetime.timedelta(microseconds=micro_to_milli(fact_mongo.timestamp.microsecond))
fact = Fact.objects.filter(host_id=host.id, timestamp=t, module=fact_mongo.module)
assert len(fact) == 1
assert fact[0] is not None
@pytest.mark.django_db
@pytest.mark.mongo_db
def test_drop_system_tracking_db(inventories, hosts, hosts_mongo, fact_scans):
inventory_objs = inventories(1)
hosts_mongo(1, inventory_objs)
fact_scans(1, inventory_objs)
assert FactMongo.objects.all().count() > 0
assert FactVersion.objects.all().count() > 0
assert FactHost.objects.all().count() > 0
system_tracking.drop_system_tracking_db()
assert FactMongo.objects.all().count() == 0
assert FactVersion.objects.all().count() == 0
assert FactHost.objects.all().count() == 0

View File

@@ -4,6 +4,7 @@ from awx.main.models import (
Role,
RolePermission,
Organization,
Project,
)
@@ -195,3 +196,48 @@ def test_hierarchy_rebuilding():
assert X.is_ancestor_of(D) is False
@pytest.mark.django_db
def test_auto_parenting():
org1 = Organization.objects.create(name='org1')
org2 = Organization.objects.create(name='org2')
prj1 = Project.objects.create(name='prj1')
prj2 = Project.objects.create(name='prj2')
assert org1.admin_role.is_ancestor_of(prj1.admin_role) is False
assert org1.admin_role.is_ancestor_of(prj2.admin_role) is False
assert org2.admin_role.is_ancestor_of(prj1.admin_role) is False
assert org2.admin_role.is_ancestor_of(prj2.admin_role) is False
prj1.organization = org1
prj1.save()
assert org1.admin_role.is_ancestor_of(prj1.admin_role)
assert org1.admin_role.is_ancestor_of(prj2.admin_role) is False
assert org2.admin_role.is_ancestor_of(prj1.admin_role) is False
assert org2.admin_role.is_ancestor_of(prj2.admin_role) is False
prj2.organization = org1
prj2.save()
assert org1.admin_role.is_ancestor_of(prj1.admin_role)
assert org1.admin_role.is_ancestor_of(prj2.admin_role)
assert org2.admin_role.is_ancestor_of(prj1.admin_role) is False
assert org2.admin_role.is_ancestor_of(prj2.admin_role) is False
prj1.organization = org2
prj1.save()
assert org1.admin_role.is_ancestor_of(prj1.admin_role) is False
assert org1.admin_role.is_ancestor_of(prj2.admin_role)
assert org2.admin_role.is_ancestor_of(prj1.admin_role)
assert org2.admin_role.is_ancestor_of(prj2.admin_role) is False
prj2.organization = org2
prj2.save()
assert org1.admin_role.is_ancestor_of(prj1.admin_role) is False
assert org1.admin_role.is_ancestor_of(prj2.admin_role) is False
assert org2.admin_role.is_ancestor_of(prj1.admin_role)
assert org2.admin_role.is_ancestor_of(prj2.admin_role)