Merge pull request #1607 from anoek/performance

RBAC insert performance improvements and various unit test performance improvements
This commit is contained in:
Wayne Witzel III 2016-04-20 08:56:47 -04:00
commit 8b1cd00c42
13 changed files with 127 additions and 203 deletions

View File

@ -5,7 +5,6 @@ import json
# Django
from django.db.models.signals import (
pre_save,
post_save,
post_delete,
)
@ -118,9 +117,8 @@ class ImplicitRoleField(models.ForeignKey):
setattr(cls, '__implicit_role_fields', [])
getattr(cls, '__implicit_role_fields').append(self)
pre_save.connect(self._pre_save, cls, True, dispatch_uid='implicit-role-pre-save')
post_save.connect(self._post_save, cls, True, dispatch_uid='implicit-role-post-save')
post_delete.connect(self._post_delete, cls, True)
post_delete.connect(self._post_delete, cls, True, dispatch_uid='implicit-role-post-delete')
add_lazy_relation(cls, self, "self", self.bind_m2m_changed)
def bind_m2m_changed(self, _self, _role_class, cls):
@ -175,39 +173,44 @@ class ImplicitRoleField(models.ForeignKey):
getattr(instance, self.name).parents.remove(getattr(obj, field_attr))
return _m2m_update
def _create_role_instance_if_not_exists(self, instance):
role = getattr(instance, self.name, None)
if role:
return
Role_ = get_current_apps().get_model('main', 'Role')
role = Role_.objects.create(
created=now(),
modified=now(),
role_field=self.name,
name=self.role_name,
description=self.role_description
)
setattr(instance, self.name, role)
def _patch_role_content_object(self, instance):
role = getattr(instance, self.name)
role.content_object = instance
role.save()
def _pre_save(self, instance, *args, **kwargs):
for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'):
implicit_role_field._create_role_instance_if_not_exists(instance)
def _post_save(self, instance, created, *args, **kwargs):
if created:
for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'):
implicit_role_field._patch_role_content_object(instance)
Role_ = get_current_apps().get_model('main', 'Role')
ContentType_ = get_current_apps().get_model('contenttypes', 'ContentType')
ct_id = ContentType_.objects.get_for_model(instance).id
with batch_role_ancestor_rebuilding():
# Create any missing role objects
missing_roles = []
for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'):
cur_role = getattr(instance, implicit_role_field.name)
cur_role = getattr(instance, implicit_role_field.name, None)
if cur_role is None:
missing_roles.append(
Role_(
created=now(),
modified=now(),
role_field=implicit_role_field.name,
name=implicit_role_field.role_name,
description=implicit_role_field.role_description,
content_type_id=ct_id,
object_id=instance.id
)
)
if len(missing_roles) > 0:
Role_.objects.bulk_create(missing_roles)
updates = {}
role_ids = []
for role in Role_.objects.filter(content_type_id=ct_id, object_id=instance.id):
setattr(instance, role.role_field, role)
updates[role.role_field] = role.id
role_ids.append(role.id)
type(instance).objects.filter(pk=instance.pk).update(**updates)
Role_._simultaneous_ancestry_rebuild(role_ids)
# Update parentage if necessary
for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'):
cur_role = getattr(instance, implicit_role_field.name)
original_parents = set(json.loads(cur_role.implicit_parents))
new_parents = implicit_role_field._resolve_parent_roles(instance)
new_parents = implicit_role_field._resolve_parent_roles(instance)
cur_role.parents.remove(*list(original_parents - new_parents))
cur_role.parents.add(*list(new_parents - original_parents))
new_parents_list = list(new_parents)
@ -246,9 +249,11 @@ class ImplicitRoleField(models.ForeignKey):
return parent_roles
def _post_delete(self, instance, *args, **kwargs):
this_role = getattr(instance, self.name)
children = [c for c in this_role.children.all()]
this_role.delete()
with batch_role_ancestor_rebuilding():
for child in children:
child.rebuild_role_ancestor_list()
role_ids = []
for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'):
role_ids.append(getattr(instance, implicit_role_field.name + '_id'))
Role_ = get_current_apps().get_model('main', 'Role')
child_ids = [x for x in Role_.parents.through.objects.filter(to_role_id__in=role_ids).distinct().values_list('from_role_id', flat=True)]
Role_.objects.filter(id__in=role_ids).delete()
Role_._simultaneous_ancestry_rebuild(child_ids)

View File

@ -121,14 +121,6 @@ class Role(CommonModelNameNotUnique):
Note that this method relies on any parents' ancestor list being correct.
'''
global tls
batch_role_rebuilding = getattr(tls, 'batch_role_rebuilding', False)
if batch_role_rebuilding:
roles_needing_rebuilding = getattr(tls, 'roles_needing_rebuilding')
roles_needing_rebuilding.add(self.id)
return
Role._simultaneous_ancestry_rebuild([self.id])
@ -216,6 +208,15 @@ class Role(CommonModelNameNotUnique):
if len(role_ids_to_rebuild) == 0:
return
global tls
batch_role_rebuilding = getattr(tls, 'batch_role_rebuilding', False)
if batch_role_rebuilding:
roles_needing_rebuilding = getattr(tls, 'roles_needing_rebuilding')
roles_needing_rebuilding.update(set(role_ids_to_rebuild))
return
cursor = connection.cursor()
loop_ct = 0
@ -225,19 +226,10 @@ class Role(CommonModelNameNotUnique):
'roles_table': Role._meta.db_table,
}
def split_ids_for_sqlite(role_ids):
for i in xrange(0, len(role_ids), 999):
yield role_ids[i:i + 999]
for ids in split_ids_for_sqlite(role_ids_to_rebuild):
sql_params['ids'] = ','.join(str(x) for x in ids)
cursor.execute('''
DELETE FROM %(ancestors_table)s
WHERE ancestor_id IN (%(ids)s)
''' % sql_params)
while role_ids_to_rebuild:
if loop_ct > 1000:
raise Exception('Ancestry role rebuilding error: infinite loop detected')
@ -371,4 +363,3 @@ def get_roles_on_resource(resource, accessor):
object_id=resource.id
).values_list('role_field', flat=True)
}

View File

@ -4,10 +4,12 @@ import pytest
from awx.main.middleware import ActivityStreamMiddleware
from awx.main.models.activity_stream import ActivityStream
from django.core.urlresolvers import reverse
from django.conf import settings
def mock_feature_enabled(feature, bypass_database=None):
return True
@pytest.mark.skipif(not getattr(settings, 'ACTIVITY_STREAM_ENABLED', True), reason="Activity stream not enabled")
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_get_activity_stream_list(monkeypatch, organization, get, user):
@ -16,6 +18,7 @@ def test_get_activity_stream_list(monkeypatch, organization, get, user):
assert response.status_code == 200
@pytest.mark.skipif(not getattr(settings, 'ACTIVITY_STREAM_ENABLED', True), reason="Activity stream not enabled")
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_basic_fields(monkeypatch, organization, get, user):
@ -35,6 +38,7 @@ def test_basic_fields(monkeypatch, organization, get, user):
assert 'organization' in response.data['summary_fields']
assert response.data['summary_fields']['organization'][0]['name'] == 'test-org'
@pytest.mark.skipif(not getattr(settings, 'ACTIVITY_STREAM_ENABLED', True), reason="Activity stream not enabled")
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_middleware_actor_added(monkeypatch, post, get, user):

View File

@ -94,7 +94,7 @@ def test_content(hosts, fact_scans, get, user, fact_ansible_json):
(fact_known, response) = setup_common(hosts, fact_scans, get, user)
assert fact_known.host_id == response.data['host']
assert fact_ansible_json == json.loads(response.data['facts'])
assert fact_ansible_json == (json.loads(response.data['facts']) if isinstance(response.data['facts'], unicode) else response.data['facts']) # TODO: Just make response.data['facts'] when we're only dealing with postgres, or if jsonfields ever fixes this bug
assert timestamp_apiformat(fact_known.timestamp) == response.data['timestamp']
assert fact_known.module == response.data['module']
@ -104,7 +104,7 @@ def _test_search_by_module(hosts, fact_scans, get, user, fact_json, module_name)
}
(fact_known, response) = setup_common(hosts, fact_scans, get, user, module_name=module_name, get_params=params)
assert fact_json == json.loads(response.data['facts'])
assert fact_json == (json.loads(response.data['facts']) if isinstance(response.data['facts'], unicode) else response.data['facts']) # TODO: Just make response.data['facts'] when we're only dealing with postgres, or if jsonfields ever fixes this bug
assert timestamp_apiformat(fact_known.timestamp) == response.data['timestamp']
assert module_name == response.data['module']

View File

@ -19,9 +19,7 @@ def organization_resource_creator(organization, user):
inventory = organization.inventories.create(name="associated-inv %s" % i)
for i in range(projects):
organization.projects.create(name="test-proj %s" % i,
description="test-proj-desc",
scm_type="git",
scm_url="https://github.com/jlaska/ansible-playbooks")
description="test-proj-desc")
# Mix up the inventories and projects used by the job templates
i_proj = 0
i_inv = 0

View File

@ -80,12 +80,11 @@ def test_process_facts_message_ansible_overwrite(fact_scans, fact_msg_ansible):
fact_obj = Fact.objects.get(id=fact_returned.id)
assert key in fact_obj.facts
assert json.loads(fact_obj.facts) == fact_msg_ansible['facts']
assert value == json.loads(fact_obj.facts)[key]
assert fact_msg_ansible['facts'] == (json.loads(fact_obj.facts) if isinstance(fact_obj.facts, unicode) else fact_obj.facts) # TODO: Just make response.data['facts'] when we're only dealing with postgres, or if jsonfields ever fixes this bug
# Ensure that the message flows from the socket through to process_fact_message()
@pytest.mark.django_db
def test_run_receiver(mocker, fact_msg_ansible):
def test_run_receiver(mocker, fact_msg_ansible):
mocker.patch("awx.main.socket.Socket.listen", return_value=[fact_msg_ansible])
receiver = FactCacheReceiver()

View File

@ -106,8 +106,6 @@ def team_member(user, team):
def project(instance, organization):
prj = Project.objects.create(name="test-proj",
description="test-proj-desc",
scm_type="git",
scm_url="https://github.com/jlaska/ansible-playbooks",
organization=organization
)
return prj
@ -120,8 +118,6 @@ def project_factory(organization):
except Project.DoesNotExist:
prj = Project.objects.create(name=name,
description="description for " + name,
scm_type="git",
scm_url="https://github.com/jlaska/ansible-playbooks",
organization=organization
)
return prj

View File

@ -2,6 +2,7 @@ import pytest
import datetime
from django.apps import apps
from django.conf import settings
from awx.main.models.inventory import Host
from awx.main.models.fact import Fact
@ -14,6 +15,7 @@ from awx.fact.models.fact import FactVersion, FactHost
def micro_to_milli(micro):
return micro - (((int)(micro / 1000)) * 1000)
@pytest.mark.skipif(not getattr(settings, 'MONGO_DB', None), reason="MongoDB not configured")
@pytest.mark.django_db
@pytest.mark.mongo_db
def test_migrate_facts(inventories, hosts, hosts_mongo, fact_scans):
@ -27,7 +29,7 @@ def test_migrate_facts(inventories, hosts, hosts_mongo, fact_scans):
assert migrated_count == 24
assert not_migrated_count == 0
for fact_mongo, fact_version in facts_known:
host = Host.objects.get(inventory_id=fact_mongo.host.inventory_id, name=fact_mongo.host.hostname)
t = fact_mongo.timestamp - datetime.timedelta(microseconds=micro_to_milli(fact_mongo.timestamp.microsecond))
@ -36,6 +38,7 @@ def test_migrate_facts(inventories, hosts, hosts_mongo, fact_scans):
assert len(fact) == 1
assert fact[0] is not None
@pytest.mark.skipif(not getattr(settings, 'MONGO_DB', None), reason="MongoDB not configured")
@pytest.mark.django_db
@pytest.mark.mongo_db
def test_migrate_facts_hostname_does_not_exist(inventories, hosts, hosts_mongo, fact_scans):
@ -60,7 +63,8 @@ def test_migrate_facts_hostname_does_not_exist(inventories, hosts, hosts_mongo,
assert len(fact) == 1
assert fact[0] is not None
@pytest.mark.skipif(not getattr(settings, 'MONGO_DB', None), reason="MongoDB not configured")
@pytest.mark.django_db
@pytest.mark.mongo_db
def test_drop_system_tracking_db(inventories, hosts, hosts_mongo, fact_scans):
@ -73,7 +77,7 @@ def test_drop_system_tracking_db(inventories, hosts, hosts_mongo, fact_scans):
assert FactHost.objects.all().count() > 0
system_tracking.drop_system_tracking_db()
assert FactMongo.objects.all().count() == 0
assert FactVersion.objects.all().count() == 0
assert FactHost.objects.all().count() == 0

View File

@ -21,7 +21,7 @@ def test_basic_parameterization(get, post, user, organization):
response = post(url,
dict(name="test-webhook",
description="test webhook",
organization=1,
organization=organization.id,
notification_type="webhook",
notification_configuration=dict(url="http://localhost",
headers={"Test": "Header"})),
@ -72,7 +72,7 @@ def test_inherited_notifiers(get, post, user, organization, project):
response = post(url,
dict(name="test-webhook-{}".format(nfiers),
description="test webhook {}".format(nfiers),
organization=1,
organization=organization.id,
notification_type="webhook",
notification_configuration=dict(url="http://localhost",
headers={"Test": "Header"})),

View File

@ -1,16 +1,16 @@
import mock # noqa
import pytest
from django.db import transaction
from django.core.urlresolvers import reverse
from awx.main.models import Project
#
# Project listing and visibility tests
#
@pytest.mark.django_db(transaction=True)
@pytest.mark.django_db
def test_user_project_list(get, project_factory, organization, admin, alice, bob):
'List of projects a user has access to, filtered by projects you can also see'
@ -43,9 +43,7 @@ def test_user_project_list(get, project_factory, organization, admin, alice, bob
assert get(reverse('api:user_projects_list', args=(admin.pk,)), alice).data['count'] == 2
@pytest.mark.django_db(transaction=True)
def test_team_project_list(get, project_factory, team_factory, admin, alice, bob):
'List of projects a team has access to, filtered by projects you can also see'
def setup_test_team_project_list(project_factory, team_factory, admin, alice, bob):
team1 = team_factory('team1')
team2 = team_factory('team2')
@ -61,6 +59,12 @@ def test_team_project_list(get, project_factory, team_factory, admin, alice, bob
team1.member_role.members.add(alice)
team2.member_role.members.add(bob)
return team1, team2
@pytest.mark.django_db
def test_team_project_list(get, project_factory, team_factory, admin, alice, bob):
'List of projects a team has access to, filtered by projects you can also see'
team1, team2 = setup_test_team_project_list(project_factory, team_factory, admin, alice, bob)
# admins can see all projects on a team
assert get(reverse('api:team_projects_list', args=(team1.pk,)), admin).data['count'] == 2
@ -69,17 +73,16 @@ def test_team_project_list(get, project_factory, team_factory, admin, alice, bob
# users can see all projects on teams they are a member of
assert get(reverse('api:team_projects_list', args=(team1.pk,)), alice).data['count'] == 2
# alice should not be able to see team2 projects because she doesn't have access to team2
res = get(reverse('api:team_projects_list', args=(team2.pk,)), alice)
assert res.status_code == 403
# but if she does, then she should only see the shared project
team2.auditor_role.members.add(alice)
assert get(reverse('api:team_projects_list', args=(team2.pk,)), alice).data['count'] == 1
team2.auditor_role.members.remove(alice)
# Test user endpoints first, very similar tests to test_user_project_list
# but permissions are being derived from team membership instead.
with transaction.atomic():
res = get(reverse('api:user_projects_list', args=(bob.pk,)), alice)
assert res.status_code == 403
# admins can see all projects
assert get(reverse('api:user_projects_list', args=(admin.pk,)), admin).data['count'] == 3
@ -91,28 +94,43 @@ def test_team_project_list(get, project_factory, team_factory, admin, alice, bob
# users can see their own projects
assert get(reverse('api:user_projects_list', args=(alice.pk,)), alice).data['count'] == 2
# alice should not be able to see bob
res = get(reverse('api:user_projects_list', args=(bob.pk,)), alice)
assert res.status_code == 403
# alice should see all projects they can see when viewing an admin
assert get(reverse('api:user_projects_list', args=(admin.pk,)), alice).data['count'] == 2
@pytest.mark.django_db
def test_team_project_list_fail1(get, project_factory, team_factory, admin, alice, bob):
# alice should not be able to see team2 projects because she doesn't have access to team2
team1, team2 = setup_test_team_project_list(project_factory, team_factory, admin, alice, bob)
res = get(reverse('api:team_projects_list', args=(team2.pk,)), alice)
assert res.status_code == 403
@pytest.mark.django_db
def test_team_project_list_fail2(get, project_factory, team_factory, admin, alice, bob):
team1, team2 = setup_test_team_project_list(project_factory, team_factory, admin, alice, bob)
# alice should not be able to see bob
@pytest.mark.django_db(transaction=True)
def test_create_project(post, organization, org_admin, org_member, admin, rando):
test_list = [rando, org_member, org_admin, admin]
expected_status_codes = [403, 403, 201, 201]
@pytest.mark.parametrize("u,expected_status_code", [
('rando', 403),
('org_member', 403),
('org_admin', 201),
('admin', 201)
])
@pytest.mark.django_db()
def test_create_project(post, organization, org_admin, org_member, admin, rando, u, expected_status_code):
if u == 'rando':
u = rando
elif u == 'org_member':
u = org_member
elif u == 'org_admin':
u = org_admin
elif u == 'admin':
u = admin
for i, u in enumerate(test_list):
result = post(reverse('api:project_list'), {
'name': 'Project %d' % i,
'organization': organization.id,
}, u)
print(result.data)
assert result.status_code == expected_status_codes[i]
if expected_status_codes[i] == 201:
assert Project.objects.filter(name='Project %d' % i, organization=organization).exists()
else:
assert not Project.objects.filter(name='Project %d' % i, organization=organization).exists()
result = post(reverse('api:project_list'), {
'name': 'Project',
'organization': organization.id,
}, u)
print(result.data)
assert result.status_code == expected_status_code
if expected_status_code == 201:
assert Project.objects.filter(name='Project', organization=organization).exists()

View File

@ -1,6 +1,7 @@
import mock # noqa
import pytest
from django.db import transaction
from django.core.urlresolvers import reverse
from awx.main.models.rbac import Role, ROLE_SINGLETON_SYSTEM_ADMINISTRATOR
@ -266,7 +267,7 @@ def test_remove_user_to_role(post, admin, role):
post(url, {'disassociate': True, 'id': admin.id}, admin)
assert role.members.filter(id=admin.id).count() == 0
@pytest.mark.django_db(transaction=True)
@pytest.mark.django_db
def test_org_admin_add_user_to_job_template(post, organization, check_jobtemplate, user):
'Tests that a user with permissions to assign/revoke membership to a particular role can do so'
org_admin = user('org-admin')
@ -280,8 +281,8 @@ def test_org_admin_add_user_to_job_template(post, organization, check_jobtemplat
assert joe in check_jobtemplate.execute_role
@pytest.mark.django_db(transaction=True)
def test_org_admin_remove_user_to_job_template(post, organization, check_jobtemplate, user):
@pytest.mark.django_db
def test_org_admin_remove_user_from_job_template(post, organization, check_jobtemplate, user):
'Tests that a user with permissions to assign/revoke membership to a particular role can do so'
org_admin = user('org-admin')
joe = user('joe')
@ -295,7 +296,7 @@ def test_org_admin_remove_user_to_job_template(post, organization, check_jobtemp
assert joe not in check_jobtemplate.execute_role
@pytest.mark.django_db(transaction=True)
@pytest.mark.django_db
def test_user_fail_to_add_user_to_job_template(post, organization, check_jobtemplate, user):
'Tests that a user without permissions to assign/revoke membership to a particular role cannot do so'
rando = user('rando')
@ -304,13 +305,14 @@ def test_user_fail_to_add_user_to_job_template(post, organization, check_jobtemp
assert rando not in check_jobtemplate.admin_role
assert joe not in check_jobtemplate.execute_role
res = post(reverse('api:role_users_list', args=(check_jobtemplate.execute_role.id,)), {'id': joe.id}, rando)
with transaction.atomic():
res = post(reverse('api:role_users_list', args=(check_jobtemplate.execute_role.id,)), {'id': joe.id}, rando)
assert res.status_code == 403
assert joe not in check_jobtemplate.execute_role
@pytest.mark.django_db(transaction=True)
@pytest.mark.django_db
def test_user_fail_to_remove_user_to_job_template(post, organization, check_jobtemplate, user):
'Tests that a user without permissions to assign/revoke membership to a particular role cannot do so'
rando = user('rando')
@ -320,7 +322,8 @@ def test_user_fail_to_remove_user_to_job_template(post, organization, check_jobt
assert rando not in check_jobtemplate.admin_role
assert joe in check_jobtemplate.execute_role
res = post(reverse('api:role_users_list', args=(check_jobtemplate.execute_role.id,)), {'disassociate': True, 'id': joe.id}, rando)
with transaction.atomic():
res = post(reverse('api:role_users_list', args=(check_jobtemplate.execute_role.id,)), {'disassociate': True, 'id': joe.id}, rando)
assert res.status_code == 403
assert joe in check_jobtemplate.execute_role

View File

@ -175,100 +175,6 @@ def test_hierarchy_rebuilding_multi_path():
assert X.is_ancestor_of(D) is False
@pytest.mark.django_db
def test_hierarchy_rebuilding_loops1(organization, team):
'Tests ancestry rebuilding loops are involved'
assert team.admin_role.is_ancestor_of(organization.admin_role) is False
assert organization.admin_role.is_ancestor_of(team.admin_role)
team.admin_role.children.add(organization.admin_role)
assert team.admin_role.is_ancestor_of(organization.admin_role)
assert organization.admin_role.is_ancestor_of(team.admin_role)
team.admin_role.children.remove(organization.admin_role)
assert team.admin_role.is_ancestor_of(organization.admin_role) is False
assert organization.admin_role.is_ancestor_of(team.admin_role)
team.admin_role.children.add(organization.admin_role)
X = Role.objects.create(name='X')
X.children.add(organization.admin_role)
assert X.is_ancestor_of(team.admin_role)
assert X.is_ancestor_of(organization.admin_role)
assert organization.admin_role.is_ancestor_of(X) is False
assert team.admin_role.is_ancestor_of(X) is False
#print(X.descendents.filter(id=organization.admin_role.id).count())
#print(X.children.filter(id=organization.admin_role.id).count())
X.children.remove(organization.admin_role)
X.rebuild_role_ancestor_list()
#print(X.descendents.filter(id=organization.admin_role.id).count())
#print(X.children.filter(id=organization.admin_role.id).count())
assert X.is_ancestor_of(team.admin_role) is False
assert X.is_ancestor_of(organization.admin_role) is False
@pytest.mark.django_db
def test_hierarchy_rebuilding_loops():
'Tests ancestry rebuilding loops are involved'
X = Role.objects.create(name='X')
A = Role.objects.create(name='A')
B = Role.objects.create(name='B')
C = Role.objects.create(name='C')
A.children.add(B)
B.children.add(C)
C.children.add(A)
X.children.add(A)
assert X.is_ancestor_of(A)
assert X.is_ancestor_of(B)
assert X.is_ancestor_of(C)
assert A.is_ancestor_of(B)
assert A.is_ancestor_of(C)
assert B.is_ancestor_of(C)
assert B.is_ancestor_of(A)
assert C.is_ancestor_of(A)
assert C.is_ancestor_of(B)
X.children.remove(A)
X.rebuild_role_ancestor_list()
assert X.is_ancestor_of(A) is False
assert X.is_ancestor_of(B) is False
assert X.is_ancestor_of(C) is False
X.children.add(A)
assert X.is_ancestor_of(A)
assert X.is_ancestor_of(B)
assert X.is_ancestor_of(C)
C.children.remove(A)
assert A.is_ancestor_of(B)
assert A.is_ancestor_of(C)
assert B.is_ancestor_of(C)
assert B.is_ancestor_of(A) is False
assert C.is_ancestor_of(A) is False
assert C.is_ancestor_of(B) is False
assert X.is_ancestor_of(A)
assert X.is_ancestor_of(B)
assert X.is_ancestor_of(C)
@pytest.mark.django_db
def test_auto_parenting():
org1 = Organization.objects.create(name='org1')

View File

@ -952,7 +952,7 @@ class InventoryImportTest(BaseCommandMixin, BaseLiveServerTest):
self.assertNotEqual(new_inv.groups.count(), 0)
self.assertNotEqual(new_inv.total_hosts, 0)
self.assertNotEqual(new_inv.total_groups, 0)
self.assertElapsedLessThan(1800) # TODO: We need to revisit this again to see if we can optimize this back to the sub-600 second range - anoek 2016-04-18
self.assertElapsedLessThan(600)
def _get_ngroups_for_nhosts(self, n):
@ -978,7 +978,7 @@ class InventoryImportTest(BaseCommandMixin, BaseLiveServerTest):
self.assertEqual(new_inv.groups.count(), ngroups)
self.assertEqual(new_inv.total_hosts, nhosts)
self.assertEqual(new_inv.total_groups, ngroups)
self.assertElapsedLessThan(180) # TODO: We need to revisit this again to see if we can optimize this back to the sub-120 second range - anoek 2016-04-18
self.assertElapsedLessThan(120)
@unittest.skipIf(getattr(settings, 'LOCAL_DEVELOPMENT', False),
'Skip this test in local development environments, '