awx/awx/main/fields.py
Alan Rominger a192077be4 Merge pull request #4025 from AlanCoding/text_type
Change JSONField to be a DB text type
2016-11-18 11:50:13 -05:00

264 lines
10 KiB
Python

# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
import json
# Django
from django.db.models.signals import (
post_save,
post_delete,
)
from django.db.models.signals import m2m_changed
from django.db import models
from django.db.models.fields.related import (
add_lazy_relation,
SingleRelatedObjectDescriptor,
ReverseSingleRelatedObjectDescriptor,
ManyRelatedObjectsDescriptor,
ReverseManyRelatedObjectsDescriptor,
)
from django.utils.encoding import smart_text
# Django-JSONField
from jsonfield import JSONField as upstream_JSONField
# AWX
from awx.main.models.rbac import batch_role_ancestor_rebuilding, Role
from awx.main.utils import get_current_apps
__all__ = ['AutoOneToOneField', 'ImplicitRoleField', 'JSONField']
class JSONField(upstream_JSONField):
def db_type(self, connection):
return 'text'
def from_db_value(self, value, expression, connection, context):
if value in {'', None} and not self.null:
return {}
return super(JSONField, self).from_db_value(value, expression, connection, context)
# Based on AutoOneToOneField from django-annoying:
# https://bitbucket.org/offline/django-annoying/src/a0de8b294db3/annoying/fields.py
class AutoSingleRelatedObjectDescriptor(SingleRelatedObjectDescriptor):
"""Descriptor for access to the object from its related class."""
def __get__(self, instance, instance_type=None):
try:
return super(AutoSingleRelatedObjectDescriptor,
self).__get__(instance, instance_type)
except self.related.related_model.DoesNotExist:
obj = self.related.related_model(**{self.related.field.name: instance})
if self.related.field.rel.parent_link:
raise NotImplementedError('not supported with polymorphic!')
for f in instance._meta.local_fields:
setattr(obj, f.name, getattr(instance, f.name))
obj.save()
return obj
class AutoOneToOneField(models.OneToOneField):
"""OneToOneField that creates related object if it doesn't exist."""
def contribute_to_related_class(self, cls, related):
setattr(cls, related.get_accessor_name(),
AutoSingleRelatedObjectDescriptor(related))
def resolve_role_field(obj, field):
ret = []
field_components = field.split('.', 1)
if hasattr(obj, field_components[0]):
obj = getattr(obj, field_components[0])
else:
return []
if obj is None:
return []
if len(field_components) == 1:
role_cls = str(get_current_apps().get_model('main', 'Role'))
if not str(type(obj)) == role_cls:
raise Exception(smart_text('{} refers to a {}, not a Role'.format(field, type(obj))))
ret.append(obj.id)
else:
if type(obj) is ManyRelatedObjectsDescriptor:
for o in obj.all():
ret += resolve_role_field(o, field_components[1])
else:
ret += resolve_role_field(obj, field_components[1])
return ret
class ImplicitRoleDescriptor(ReverseSingleRelatedObjectDescriptor):
pass
class ImplicitRoleField(models.ForeignKey):
"""Implicitly creates a role entry for a resource"""
def __init__(self, parent_role=None, *args, **kwargs):
self.parent_role = parent_role
kwargs.setdefault('to', 'Role')
kwargs.setdefault('related_name', '+')
kwargs.setdefault('null', 'True')
super(ImplicitRoleField, self).__init__(*args, **kwargs)
def deconstruct(self):
name, path, args, kwargs = super(ImplicitRoleField, self).deconstruct()
kwargs['parent_role'] = self.parent_role
return name, path, args, kwargs
def contribute_to_class(self, cls, name):
super(ImplicitRoleField, self).contribute_to_class(cls, name)
setattr(cls, self.name, ImplicitRoleDescriptor(self))
if not hasattr(cls, '__implicit_role_fields'):
setattr(cls, '__implicit_role_fields', [])
getattr(cls, '__implicit_role_fields').append(self)
post_save.connect(self._post_save, cls, True, dispatch_uid='implicit-role-post-save')
post_delete.connect(self._post_delete, cls, True, dispatch_uid='implicit-role-post-delete')
add_lazy_relation(cls, self, "self", self.bind_m2m_changed)
def bind_m2m_changed(self, _self, _role_class, cls):
if not self.parent_role:
return
field_names = self.parent_role
if type(field_names) is not list:
field_names = [field_names]
for field_name in field_names:
# Handle the OR syntax for role parents
if type(field_name) == tuple:
continue
if field_name.startswith('singleton:'):
continue
field_name, sep, field_attr = field_name.partition('.')
field = getattr(cls, field_name)
if type(field) is ReverseManyRelatedObjectsDescriptor or \
type(field) is ManyRelatedObjectsDescriptor:
if '.' in field_attr:
raise Exception('Referencing deep roles through ManyToMany fields is unsupported.')
if type(field) is ReverseManyRelatedObjectsDescriptor:
sender = field.through
else:
sender = field.related.through
reverse = type(field) is ManyRelatedObjectsDescriptor
m2m_changed.connect(self.m2m_update(field_attr, reverse), sender, weak=False)
def m2m_update(self, field_attr, _reverse):
def _m2m_update(instance, action, model, pk_set, reverse, **kwargs):
if action == 'post_add' or action == 'pre_remove':
if _reverse:
reverse = not reverse
if reverse:
for pk in pk_set:
obj = model.objects.get(pk=pk)
if action == 'post_add':
getattr(instance, field_attr).children.add(getattr(obj, self.name))
if action == 'pre_remove':
getattr(instance, field_attr).children.remove(getattr(obj, self.name))
else:
for pk in pk_set:
obj = model.objects.get(pk=pk)
if action == 'post_add':
getattr(instance, self.name).parents.add(getattr(obj, field_attr))
if action == 'pre_remove':
getattr(instance, self.name).parents.remove(getattr(obj, field_attr))
return _m2m_update
def _post_save(self, instance, created, *args, **kwargs):
Role_ = get_current_apps().get_model('main', 'Role')
ContentType_ = get_current_apps().get_model('contenttypes', 'ContentType')
ct_id = ContentType_.objects.get_for_model(instance).id
with batch_role_ancestor_rebuilding():
# Create any missing role objects
missing_roles = []
for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'):
cur_role = getattr(instance, implicit_role_field.name, None)
if cur_role is None:
missing_roles.append(
Role_(
role_field=implicit_role_field.name,
content_type_id=ct_id,
object_id=instance.id
)
)
if len(missing_roles) > 0:
Role_.objects.bulk_create(missing_roles)
updates = {}
role_ids = []
for role in Role_.objects.filter(content_type_id=ct_id, object_id=instance.id):
setattr(instance, role.role_field, role)
updates[role.role_field] = role.id
role_ids.append(role.id)
type(instance).objects.filter(pk=instance.pk).update(**updates)
Role.rebuild_role_ancestor_list(role_ids, [])
# Update parentage if necessary
for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'):
cur_role = getattr(instance, implicit_role_field.name)
original_parents = set(json.loads(cur_role.implicit_parents))
new_parents = implicit_role_field._resolve_parent_roles(instance)
cur_role.parents.remove(*list(original_parents - new_parents))
cur_role.parents.add(*list(new_parents - original_parents))
new_parents_list = list(new_parents)
new_parents_list.sort()
new_parents_json = json.dumps(new_parents_list)
if cur_role.implicit_parents != new_parents_json:
cur_role.implicit_parents = new_parents_json
cur_role.save()
def _resolve_parent_roles(self, instance):
if not self.parent_role:
return set()
paths = self.parent_role if type(self.parent_role) is list else [self.parent_role]
parent_roles = set()
for path in paths:
if path.startswith("singleton:"):
singleton_name = path[10:]
Role_ = get_current_apps().get_model('main', 'Role')
qs = Role_.objects.filter(singleton_name=singleton_name)
if qs.count() >= 1:
role = qs[0]
else:
role = Role_.objects.create(singleton_name=singleton_name, role_field=singleton_name)
parents = [role.id]
else:
parents = resolve_role_field(instance, path)
for parent in parents:
parent_roles.add(parent)
return parent_roles
def _post_delete(self, instance, *args, **kwargs):
role_ids = []
for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'):
role_ids.append(getattr(instance, implicit_role_field.name + '_id'))
Role_ = get_current_apps().get_model('main', 'Role')
child_ids = [x for x in Role_.parents.through.objects.filter(to_role_id__in=role_ids).distinct().values_list('from_role_id', flat=True)]
Role_.objects.filter(id__in=role_ids).delete()
Role.rebuild_role_ancestor_list([], child_ids)