Merge pull request #2860 from AlanCoding/auditor_association

Show entry for system auditor associations

Reviewed-by: https://github.com/softwarefactory-project-zuul[bot]
This commit is contained in:
softwarefactory-project-zuul[bot] 2019-04-17 10:48:37 +00:00 committed by GitHub
commit 4fd04e095f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 92 additions and 40 deletions

View File

@ -893,7 +893,7 @@ class UserSerializer(BaseSerializer):
def get_validation_exclusions(self, obj=None):
ret = super(UserSerializer, self).get_validation_exclusions(obj)
ret.append('password')
ret.extend(['password', 'is_system_auditor'])
return ret
def validate_password(self, value):
@ -937,14 +937,20 @@ class UserSerializer(BaseSerializer):
def create(self, validated_data):
new_password = validated_data.pop('password', None)
is_system_auditor = validated_data.pop('is_system_auditor', None)
obj = super(UserSerializer, self).create(validated_data)
self._update_password(obj, new_password)
if is_system_auditor is not None:
obj.is_system_auditor = is_system_auditor
return obj
def update(self, obj, validated_data):
new_password = validated_data.pop('password', None)
is_system_auditor = validated_data.pop('is_system_auditor', None)
obj = super(UserSerializer, self).update(obj, validated_data)
self._update_password(obj, new_password)
if is_system_auditor is not None:
obj.is_system_auditor = is_system_auditor
return obj
def get_related(self, obj):
@ -996,6 +1002,16 @@ class UserSerializer(BaseSerializer):
return self._validate_ldap_managed_field(value, 'is_superuser')
class UserActivityStreamSerializer(UserSerializer):
"""Changes to system auditor status are shown as separate entries,
so by excluding it from fields here we avoid duplication, which
would carry some unintended consequences.
"""
class Meta:
model = User
fields = ('*', '-is_system_auditor')
class BaseOAuth2TokenSerializer(BaseSerializer):
refresh_token = serializers.SerializerMethodField()

View File

@ -922,21 +922,6 @@ class UserList(ListCreateAPIView):
serializer_class = serializers.UserSerializer
permission_classes = (UserPermission,)
def post(self, request, *args, **kwargs):
ret = super(UserList, self).post( request, *args, **kwargs)
try:
if request.data.get('is_system_auditor', False):
# This is a faux-field that just maps to checking the system
# auditor role member list.. unfortunately this means we can't
# set it on creation, and thus needs to be set here.
user = models.User.objects.get(id=ret.data['id'])
user.is_system_auditor = request.data['is_system_auditor']
ret.data['is_system_auditor'] = request.data['is_system_auditor']
except AttributeError as exc:
print(exc)
pass
return ret
class UserMeList(ListAPIView):

View File

@ -43,7 +43,10 @@ from rest_framework import serializers
from awx.main.utils.filters import SmartFilter
from awx.main.utils.encryption import encrypt_value, decrypt_value, get_encryption_key
from awx.main.validators import validate_ssh_private_key
from awx.main.models.rbac import batch_role_ancestor_rebuilding, Role
from awx.main.models.rbac import (
batch_role_ancestor_rebuilding, Role,
ROLE_SINGLETON_SYSTEM_ADMINISTRATOR, ROLE_SINGLETON_SYSTEM_AUDITOR
)
from awx.main.constants import ENV_BLACKLIST
from awx.main import utils
@ -159,6 +162,13 @@ def is_implicit_parent(parent_role, child_role):
the model definition. This does not include any role parents that
might have been set by the user.
'''
if child_role.content_object is None:
# The only singleton implicit parent is the system admin being
# a parent of the system auditor role
return bool(
child_role.singleton_name == ROLE_SINGLETON_SYSTEM_AUDITOR and
parent_role.singleton_name == ROLE_SINGLETON_SYSTEM_ADMINISTRATOR
)
# Get the list of implicit parents that were defined at the class level.
implicit_parents = getattr(
child_role.content_object.__class__, child_role.role_field

View File

@ -142,10 +142,15 @@ def user_is_system_auditor(user):
def user_is_system_auditor(user, tf):
if user.id:
if tf:
Role.singleton('system_auditor').members.add(user)
role = Role.singleton('system_auditor')
# must check if member to not duplicate activity stream
if user not in role.members.all():
role.members.add(user)
user._is_system_auditor = True
else:
Role.singleton('system_auditor').members.remove(user)
role = Role.singleton('system_auditor')
if user in role.members.all():
role.members.remove(user)
user._is_system_auditor = False

View File

@ -76,6 +76,11 @@ class ActivityStream(models.Model):
setting = JSONField(blank=True)
def __str__(self):
operation = self.operation if 'operation' in self.__dict__ else '_delayed_'
timestamp = self.timestamp.isoformat() if 'timestamp' in self.__dict__ else '_delayed_'
return u'%s-%s-pk=%s' % (operation, timestamp, self.pk)
def get_absolute_url(self, request=None):
return reverse('api:activity_stream_detail', kwargs={'pk': self.pk}, request=request)

View File

@ -20,7 +20,6 @@ from django.db.models.signals import (
)
from django.dispatch import receiver
from django.contrib.auth import SESSION_KEY
from django.contrib.contenttypes.models import ContentType
from django.contrib.sessions.models import Session
from django.utils import timezone
@ -192,22 +191,28 @@ def sync_superuser_status_to_rbac(instance, **kwargs):
def rbac_activity_stream(instance, sender, **kwargs):
user_type = ContentType.objects.get_for_model(User)
# Only if we are associating/disassociating
if kwargs['action'] in ['pre_add', 'pre_remove']:
# Only if this isn't for the User.admin_role
if hasattr(instance, 'content_type'):
if instance.content_type in [None, user_type]:
if hasattr(instance, 'content_type'): # Duck typing, migration-independent isinstance(instance, Role)
if instance.content_type_id is None and instance.singleton_name == ROLE_SINGLETON_SYSTEM_ADMINISTRATOR:
# Skip entries for the system admin role because user serializer covers it
# System auditor role is shown in the serializer, but its relationship is
# managed separately, its value is incorrect, and a correction entry is needed
return
elif sender.__name__ == 'Role_parents':
# This juggles which role to use, because could be A->B or B->A association
if sender.__name__ == 'Role_parents':
role = kwargs['model'].objects.filter(pk__in=kwargs['pk_set']).first()
# don't record implicit creation / parents in activity stream
if role is not None and is_implicit_parent(parent_role=role, child_role=instance):
return
else:
role = instance
instance = instance.content_object
# If a singleton role is the instance, the singleton role is acted on
# otherwise the related object is considered to be acted on
if instance.content_object:
instance = instance.content_object
else:
# Association with actor, like role->user
role = kwargs['model'].objects.filter(pk__in=kwargs['pk_set']).first()
activity_stream_associate(sender, instance, role=role, **kwargs)
@ -403,6 +408,7 @@ def model_serializer_mapping():
from awx.conf.serializers import SettingSerializer
return {
Setting: SettingSerializer,
models.User: serializers.UserActivityStreamSerializer,
models.Organization: serializers.OrganizationSerializer,
models.Inventory: serializers.InventorySerializer,
models.Host: serializers.HostSerializer,

View File

@ -11,7 +11,8 @@ from awx.main.models import (
Credential,
CredentialType,
Inventory,
InventorySource
InventorySource,
User
)
# other AWX
@ -75,31 +76,57 @@ class TestImplicitRolesOmitted:
assert qs[0].operation == 'create'
@pytest.mark.django_db
class TestRolesAssociationEntries:
'''
Test that non-implicit role associations have a corresponding
activity stream entry.
These tests will fail if `rbac_activity_stream` skipping logic
finds a false-negative.
in signals is wrong.
'''
@pytest.mark.django_db
def test_non_implicit_associations_are_recorded(self, project):
org2 = Organization.objects.create(name='test-organization2')
project.admin_role.parents.add(org2.admin_role)
assert ActivityStream.objects.filter(
role=org2.admin_role,
organization=org2,
project=project
).count() == 1
# check that duplicate adds do not get recorded in 2nd loop
for i in range(2):
# Not supported, should not be possible via API
# org2.admin_role.children.add(project.admin_role)
project.admin_role.parents.add(org2.admin_role)
assert ActivityStream.objects.filter(
role=org2.admin_role,
organization=org2,
project=project
).count() == 1, 'In loop %s' % i
@pytest.mark.django_db
def test_model_associations_are_recorded(self, organization):
proj1 = organization.projects.create(name='proj1')
proj2 = organization.projects.create(name='proj2')
proj2.use_role.parents.add(proj1.admin_role)
assert ActivityStream.objects.filter(role=proj1.admin_role, project=proj2).count() == 1
@pytest.mark.parametrize('value', [True, False])
def test_auditor_is_recorded(self, post, value):
u = User.objects.create(username='foouser')
assert not u.is_system_auditor
u.is_system_auditor = value
u = User.objects.get(pk=u.pk) # refresh from db
assert u.is_system_auditor == value
entry_qs = ActivityStream.objects.filter(user=u)
if value:
assert len(entry_qs) == 2
else:
assert len(entry_qs) == 1
# unfortunate, the original creation does _not_ set a real is_auditor field
assert 'is_system_auditor' not in json.loads(entry_qs[0].changes)
if value:
auditor_changes = json.loads(entry_qs[1].changes)
assert auditor_changes['object2'] == 'user'
assert auditor_changes['object2_pk'] == u.pk
def test_user_no_op_api(self, system_auditor):
as_ct = ActivityStream.objects.count()
system_auditor.is_system_auditor = True # already auditor
assert ActivityStream.objects.count() == as_ct
@pytest.fixture

View File

@ -423,10 +423,8 @@ def model_to_dict(obj, serializer_mapping=None):
allowed_fields = get_allowed_fields(obj, serializer_mapping)
for field in obj._meta.fields:
if field.name not in allowed_fields:
continue
attr_d[field.name] = _convert_model_field_for_display(obj, field.name, password_fields=password_fields)
for field_name in allowed_fields:
attr_d[field_name] = _convert_model_field_for_display(obj, field_name, password_fields=password_fields)
return attr_d