diff --git a/awx/main/fields.py b/awx/main/fields.py index fe7e86b96f..51b0efd2ed 100644 --- a/awx/main/fields.py +++ b/awx/main/fields.py @@ -96,6 +96,37 @@ def resolve_role_field(obj, field): return ret +def is_implicit_parent(parent_role, child_role): + ''' + Determine if the parent_role is an implicit parent as defined by + the model definition. This does not include any role parents that + might have been set by the user. + ''' + # Get the list of implicit parents that were defined at the class level. + implicit_parents = getattr( + child_role.content_object.__class__, child_role.role_field + ).field.parent_role + if type(implicit_parents) != list: + implicit_parents = [implicit_parents] + + # Check to see if the role matches any in the implicit parents list + for implicit_parent_path in implicit_parents: + if implicit_parent_path.startswith('singleton:'): + # Singleton role isn't an object role, `singleton_name` uniquely identifies it + if parent_role.is_singleton() and parent_role.singleton_name == implicit_parent_path[10:]: + return True + else: + # Walk over multiple related objects to obtain the implicit parent + related_obj = child_role.content_object + for next_field in implicit_parent_path.split('.'): + related_obj = getattr(related_obj, next_field) + if related_obj is None: + break + if related_obj and parent_role == related_obj: + return True + return False + + class ImplicitRoleDescriptor(ReverseSingleRelatedObjectDescriptor): pass diff --git a/awx/main/signals.py b/awx/main/signals.py index 20dcd2dcd6..9e5fa95692 100644 --- a/awx/main/signals.py +++ b/awx/main/signals.py @@ -22,6 +22,7 @@ from awx.api.serializers import * # noqa from awx.main.utils import model_instance_diff, model_to_dict, camelcase_to_underscore from awx.main.utils import ignore_inventory_computed_fields, ignore_inventory_group_removal, _inventory_updates from awx.main.tasks import update_inventory_computed_fields +from awx.main.fields import is_implicit_parent from awx.main.consumers import emit_channel_notification @@ -179,30 +180,9 @@ def rbac_activity_stream(instance, sender, **kwargs): return elif sender.__name__ == 'Role_parents': role = kwargs['model'].objects.filter(pk__in=kwargs['pk_set']).first() - # don't record implicit creation / parents - if role is not None: - if role.content_type is None: - if role.is_singleton(): - parent = 'singleton:' + role.singleton_name - else: - # Ill-defined role, may need additional logic in the - # case of future expansions of the RBAC system - parent = str(role.role_field) - else: - parent = role.content_type.name + "." + role.role_field - # Get the list of implicit parents that were defined at the class level. - # We have to take this list from the class property to avoid including parents - # that may have been added since the creation of the ImplicitRoleField - implicit_parents = getattr(instance.content_object.__class__, instance.role_field).field.parent_role - if type(implicit_parents) != list: - implicit_parents = [implicit_parents] - # Ignore any singleton parents we find. If the parent for the role - # matches any of the implicit parents we find, skip recording the activity stream. - for ip in implicit_parents: - if '.' not in ip and 'singleton:' not in ip: - ip = instance.content_type.name + "." + ip - if parent == ip: - return + # don't record implicit creation / parents in activity stream + if role is not None and is_implicit_parent(parent_role=role, child_role=instance): + return else: role = instance instance = instance.content_object diff --git a/awx/main/tests/functional/models/test_activity_stream.py b/awx/main/tests/functional/models/test_activity_stream.py index b6e63a4377..b9c1611f05 100644 --- a/awx/main/tests/functional/models/test_activity_stream.py +++ b/awx/main/tests/functional/models/test_activity_stream.py @@ -1,13 +1,82 @@ import pytest # AWX models -from awx.main.models.organization import Organization -from awx.main.models import ActivityStream +from awx.main.models import ActivityStream, Organization, JobTemplate -@pytest.mark.django_db -def test_activity_stream_create_entries(): - Organization.objects.create(name='test-organization2') - assert ActivityStream.objects.filter(organization__isnull=False).count() == 1 +class TestImplicitRolesOmitted: + ''' + Test that there is exactly 1 "create" entry in the activity stream for + common items in the system. + These tests will fail if `rbac_activity_stream` creates + false-positive entries. + ''' + + @pytest.mark.django_db + def test_activity_stream_create_organization(self): + Organization.objects.create(name='test-organization2') + qs = ActivityStream.objects.filter(organization__isnull=False) + assert qs.count() == 1 + assert qs[0].operation == 'create' + + @pytest.mark.django_db + def test_activity_stream_delete_organization(self): + org = Organization.objects.create(name='gYSlNSOFEW') + org.delete() + qs = ActivityStream.objects.filter(changes__icontains='gYSlNSOFEW') + assert qs.count() == 2 + assert qs[1].operation == 'delete' + + @pytest.mark.django_db + def test_activity_stream_create_JT(self, project, inventory, credential): + JobTemplate.objects.create( + name='test-jt', + project=project, + inventory=inventory, + credential=credential + ) + qs = ActivityStream.objects.filter(job_template__isnull=False) + assert qs.count() == 1 + assert qs[0].operation == 'create' + + @pytest.mark.django_db + def test_activity_stream_create_inventory(self, organization): + organization.inventories.create(name='test-inv') + qs = ActivityStream.objects.filter(inventory__isnull=False) + assert qs.count() == 1 + assert qs[0].operation == 'create' + + @pytest.mark.django_db + def test_activity_stream_create_credential(self, organization): + organization.inventories.create(name='test-inv') + qs = ActivityStream.objects.filter(inventory__isnull=False) + assert qs.count() == 1 + assert qs[0].operation == 'create' + + +class TestRolesAssociationEntries: + ''' + Test that non-implicit role associations have a corresponding + activity stream entry. + These tests will fail if `rbac_activity_stream` skipping logic + finds a false-negative. + ''' + + @pytest.mark.django_db + def test_non_implicit_associations_are_recorded(self, project): + org2 = Organization.objects.create(name='test-organization2') + project.admin_role.parents.add(org2.admin_role) + assert ActivityStream.objects.filter( + role=org2.admin_role, + organization=org2, + project=project + ).count() == 1 + + @pytest.mark.django_db + def test_model_associations_are_recorded(self, organization): + proj1 = organization.projects.create(name='proj1') + proj2 = organization.projects.create(name='proj2') + proj2.use_role.parents.add(proj1.admin_role) + assert ActivityStream.objects.filter(role=proj1.admin_role, project=proj2).count() == 1 diff --git a/awx/main/tests/unit/models/test_rbac_unit.py b/awx/main/tests/unit/models/test_rbac_unit.py new file mode 100644 index 0000000000..24d9a657ba --- /dev/null +++ b/awx/main/tests/unit/models/test_rbac_unit.py @@ -0,0 +1,103 @@ +import pytest +import mock + +from django.contrib.contenttypes.models import ContentType + +from awx.main.models.rbac import ( + Role, + ROLE_SINGLETON_SYSTEM_ADMINISTRATOR, + ROLE_SINGLETON_SYSTEM_AUDITOR +) +from awx.main.models import Organization, JobTemplate, Project + +from awx.main.fields import ( + ImplicitRoleField, + is_implicit_parent +) + + +def apply_fake_roles(obj): + ''' + Creates an un-saved role for all the implicit role fields on an object + ''' + for fd in obj._meta.fields: + if not isinstance(fd, ImplicitRoleField): + continue + r = Role(role_field=fd.name) + setattr(obj, fd.name, r) + with mock.patch('django.contrib.contenttypes.fields.GenericForeignKey.get_content_type') as mck_ct: + mck_ct.return_value = ContentType(model=obj._meta.model_name) + r.content_object = obj + + +@pytest.fixture +def system_administrator(): + return Role( + role_field=ROLE_SINGLETON_SYSTEM_ADMINISTRATOR, + singleton_name=ROLE_SINGLETON_SYSTEM_ADMINISTRATOR + ) + + +@pytest.fixture +def system_auditor(): + return Role( + role_field=ROLE_SINGLETON_SYSTEM_AUDITOR, + singleton_name=ROLE_SINGLETON_SYSTEM_AUDITOR + ) + + +@pytest.fixture +def organization(): + o = Organization(name='unit-test-org') + apply_fake_roles(o) + return o + + +@pytest.fixture +def project(organization): + p = Project(name='unit-test-proj', organization=organization) + apply_fake_roles(p) + return p + + +@pytest.fixture +def job_template(project): + jt = JobTemplate(name='unit-test-jt', project=project) + apply_fake_roles(jt) + return jt + + +class TestIsImplicitParent: + ''' + Tests to confirm that `is_implicit_parent` gives the right answers + ''' + def test_sys_admin_implicit_parent(self, organization, system_administrator): + assert is_implicit_parent( + parent_role=system_administrator, + child_role=organization.admin_role + ) + + + def test_admin_is_parent_of_member_role(self, organization): + assert is_implicit_parent( + parent_role=organization.admin_role, + child_role=organization.member_role + ) + + def test_member_is_not_parent_of_admin_role(self, organization): + assert not is_implicit_parent( + parent_role=organization.member_role, + child_role=organization.admin_role + ) + + def test_second_level_implicit_parent_role(self, job_template, organization): + assert is_implicit_parent( + parent_role=organization.admin_role, + child_role=job_template.admin_role + ) + + def test_second_level_is_not_an_implicit_parent_role(self, job_template, organization): + assert not is_implicit_parent( + parent_role=organization.member_role, + child_role=job_template.admin_role + )