mirror of
https://github.com/ansible/awx.git
synced 2026-01-19 21:51:26 -03:30
Merge pull request #5434 from AlanCoding/activity_hygiene
avoid multi-field implicit role activity stream entries
This commit is contained in:
commit
d2795c4784
@ -96,6 +96,37 @@ def resolve_role_field(obj, field):
|
||||
return ret
|
||||
|
||||
|
||||
def is_implicit_parent(parent_role, child_role):
|
||||
'''
|
||||
Determine if the parent_role is an implicit parent as defined by
|
||||
the model definition. This does not include any role parents that
|
||||
might have been set by the user.
|
||||
'''
|
||||
# Get the list of implicit parents that were defined at the class level.
|
||||
implicit_parents = getattr(
|
||||
child_role.content_object.__class__, child_role.role_field
|
||||
).field.parent_role
|
||||
if type(implicit_parents) != list:
|
||||
implicit_parents = [implicit_parents]
|
||||
|
||||
# Check to see if the role matches any in the implicit parents list
|
||||
for implicit_parent_path in implicit_parents:
|
||||
if implicit_parent_path.startswith('singleton:'):
|
||||
# Singleton role isn't an object role, `singleton_name` uniquely identifies it
|
||||
if parent_role.is_singleton() and parent_role.singleton_name == implicit_parent_path[10:]:
|
||||
return True
|
||||
else:
|
||||
# Walk over multiple related objects to obtain the implicit parent
|
||||
related_obj = child_role.content_object
|
||||
for next_field in implicit_parent_path.split('.'):
|
||||
related_obj = getattr(related_obj, next_field)
|
||||
if related_obj is None:
|
||||
break
|
||||
if related_obj and parent_role == related_obj:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
class ImplicitRoleDescriptor(ReverseSingleRelatedObjectDescriptor):
|
||||
pass
|
||||
|
||||
|
||||
@ -22,6 +22,7 @@ from awx.api.serializers import * # noqa
|
||||
from awx.main.utils import model_instance_diff, model_to_dict, camelcase_to_underscore
|
||||
from awx.main.utils import ignore_inventory_computed_fields, ignore_inventory_group_removal, _inventory_updates
|
||||
from awx.main.tasks import update_inventory_computed_fields
|
||||
from awx.main.fields import is_implicit_parent
|
||||
|
||||
from awx.main.consumers import emit_channel_notification
|
||||
|
||||
@ -179,30 +180,9 @@ def rbac_activity_stream(instance, sender, **kwargs):
|
||||
return
|
||||
elif sender.__name__ == 'Role_parents':
|
||||
role = kwargs['model'].objects.filter(pk__in=kwargs['pk_set']).first()
|
||||
# don't record implicit creation / parents
|
||||
if role is not None:
|
||||
if role.content_type is None:
|
||||
if role.is_singleton():
|
||||
parent = 'singleton:' + role.singleton_name
|
||||
else:
|
||||
# Ill-defined role, may need additional logic in the
|
||||
# case of future expansions of the RBAC system
|
||||
parent = str(role.role_field)
|
||||
else:
|
||||
parent = role.content_type.name + "." + role.role_field
|
||||
# Get the list of implicit parents that were defined at the class level.
|
||||
# We have to take this list from the class property to avoid including parents
|
||||
# that may have been added since the creation of the ImplicitRoleField
|
||||
implicit_parents = getattr(instance.content_object.__class__, instance.role_field).field.parent_role
|
||||
if type(implicit_parents) != list:
|
||||
implicit_parents = [implicit_parents]
|
||||
# Ignore any singleton parents we find. If the parent for the role
|
||||
# matches any of the implicit parents we find, skip recording the activity stream.
|
||||
for ip in implicit_parents:
|
||||
if '.' not in ip and 'singleton:' not in ip:
|
||||
ip = instance.content_type.name + "." + ip
|
||||
if parent == ip:
|
||||
return
|
||||
# don't record implicit creation / parents in activity stream
|
||||
if role is not None and is_implicit_parent(parent_role=role, child_role=instance):
|
||||
return
|
||||
else:
|
||||
role = instance
|
||||
instance = instance.content_object
|
||||
|
||||
@ -1,13 +1,82 @@
|
||||
import pytest
|
||||
|
||||
# AWX models
|
||||
from awx.main.models.organization import Organization
|
||||
from awx.main.models import ActivityStream
|
||||
from awx.main.models import ActivityStream, Organization, JobTemplate
|
||||
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_activity_stream_create_entries():
|
||||
Organization.objects.create(name='test-organization2')
|
||||
assert ActivityStream.objects.filter(organization__isnull=False).count() == 1
|
||||
class TestImplicitRolesOmitted:
|
||||
'''
|
||||
Test that there is exactly 1 "create" entry in the activity stream for
|
||||
common items in the system.
|
||||
These tests will fail if `rbac_activity_stream` creates
|
||||
false-positive entries.
|
||||
'''
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_activity_stream_create_organization(self):
|
||||
Organization.objects.create(name='test-organization2')
|
||||
qs = ActivityStream.objects.filter(organization__isnull=False)
|
||||
assert qs.count() == 1
|
||||
assert qs[0].operation == 'create'
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_activity_stream_delete_organization(self):
|
||||
org = Organization.objects.create(name='gYSlNSOFEW')
|
||||
org.delete()
|
||||
qs = ActivityStream.objects.filter(changes__icontains='gYSlNSOFEW')
|
||||
assert qs.count() == 2
|
||||
assert qs[1].operation == 'delete'
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_activity_stream_create_JT(self, project, inventory, credential):
|
||||
JobTemplate.objects.create(
|
||||
name='test-jt',
|
||||
project=project,
|
||||
inventory=inventory,
|
||||
credential=credential
|
||||
)
|
||||
qs = ActivityStream.objects.filter(job_template__isnull=False)
|
||||
assert qs.count() == 1
|
||||
assert qs[0].operation == 'create'
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_activity_stream_create_inventory(self, organization):
|
||||
organization.inventories.create(name='test-inv')
|
||||
qs = ActivityStream.objects.filter(inventory__isnull=False)
|
||||
assert qs.count() == 1
|
||||
assert qs[0].operation == 'create'
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_activity_stream_create_credential(self, organization):
|
||||
organization.inventories.create(name='test-inv')
|
||||
qs = ActivityStream.objects.filter(inventory__isnull=False)
|
||||
assert qs.count() == 1
|
||||
assert qs[0].operation == 'create'
|
||||
|
||||
|
||||
class TestRolesAssociationEntries:
|
||||
'''
|
||||
Test that non-implicit role associations have a corresponding
|
||||
activity stream entry.
|
||||
These tests will fail if `rbac_activity_stream` skipping logic
|
||||
finds a false-negative.
|
||||
'''
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_non_implicit_associations_are_recorded(self, project):
|
||||
org2 = Organization.objects.create(name='test-organization2')
|
||||
project.admin_role.parents.add(org2.admin_role)
|
||||
assert ActivityStream.objects.filter(
|
||||
role=org2.admin_role,
|
||||
organization=org2,
|
||||
project=project
|
||||
).count() == 1
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_model_associations_are_recorded(self, organization):
|
||||
proj1 = organization.projects.create(name='proj1')
|
||||
proj2 = organization.projects.create(name='proj2')
|
||||
proj2.use_role.parents.add(proj1.admin_role)
|
||||
assert ActivityStream.objects.filter(role=proj1.admin_role, project=proj2).count() == 1
|
||||
|
||||
|
||||
103
awx/main/tests/unit/models/test_rbac_unit.py
Normal file
103
awx/main/tests/unit/models/test_rbac_unit.py
Normal file
@ -0,0 +1,103 @@
|
||||
import pytest
|
||||
import mock
|
||||
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
|
||||
from awx.main.models.rbac import (
|
||||
Role,
|
||||
ROLE_SINGLETON_SYSTEM_ADMINISTRATOR,
|
||||
ROLE_SINGLETON_SYSTEM_AUDITOR
|
||||
)
|
||||
from awx.main.models import Organization, JobTemplate, Project
|
||||
|
||||
from awx.main.fields import (
|
||||
ImplicitRoleField,
|
||||
is_implicit_parent
|
||||
)
|
||||
|
||||
|
||||
def apply_fake_roles(obj):
|
||||
'''
|
||||
Creates an un-saved role for all the implicit role fields on an object
|
||||
'''
|
||||
for fd in obj._meta.fields:
|
||||
if not isinstance(fd, ImplicitRoleField):
|
||||
continue
|
||||
r = Role(role_field=fd.name)
|
||||
setattr(obj, fd.name, r)
|
||||
with mock.patch('django.contrib.contenttypes.fields.GenericForeignKey.get_content_type') as mck_ct:
|
||||
mck_ct.return_value = ContentType(model=obj._meta.model_name)
|
||||
r.content_object = obj
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def system_administrator():
|
||||
return Role(
|
||||
role_field=ROLE_SINGLETON_SYSTEM_ADMINISTRATOR,
|
||||
singleton_name=ROLE_SINGLETON_SYSTEM_ADMINISTRATOR
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def system_auditor():
|
||||
return Role(
|
||||
role_field=ROLE_SINGLETON_SYSTEM_AUDITOR,
|
||||
singleton_name=ROLE_SINGLETON_SYSTEM_AUDITOR
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def organization():
|
||||
o = Organization(name='unit-test-org')
|
||||
apply_fake_roles(o)
|
||||
return o
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def project(organization):
|
||||
p = Project(name='unit-test-proj', organization=organization)
|
||||
apply_fake_roles(p)
|
||||
return p
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def job_template(project):
|
||||
jt = JobTemplate(name='unit-test-jt', project=project)
|
||||
apply_fake_roles(jt)
|
||||
return jt
|
||||
|
||||
|
||||
class TestIsImplicitParent:
|
||||
'''
|
||||
Tests to confirm that `is_implicit_parent` gives the right answers
|
||||
'''
|
||||
def test_sys_admin_implicit_parent(self, organization, system_administrator):
|
||||
assert is_implicit_parent(
|
||||
parent_role=system_administrator,
|
||||
child_role=organization.admin_role
|
||||
)
|
||||
|
||||
|
||||
def test_admin_is_parent_of_member_role(self, organization):
|
||||
assert is_implicit_parent(
|
||||
parent_role=organization.admin_role,
|
||||
child_role=organization.member_role
|
||||
)
|
||||
|
||||
def test_member_is_not_parent_of_admin_role(self, organization):
|
||||
assert not is_implicit_parent(
|
||||
parent_role=organization.member_role,
|
||||
child_role=organization.admin_role
|
||||
)
|
||||
|
||||
def test_second_level_implicit_parent_role(self, job_template, organization):
|
||||
assert is_implicit_parent(
|
||||
parent_role=organization.admin_role,
|
||||
child_role=job_template.admin_role
|
||||
)
|
||||
|
||||
def test_second_level_is_not_an_implicit_parent_role(self, job_template, organization):
|
||||
assert not is_implicit_parent(
|
||||
parent_role=organization.member_role,
|
||||
child_role=job_template.admin_role
|
||||
)
|
||||
Loading…
x
Reference in New Issue
Block a user