Files
awx/awx/main/fields.py
John Westcott IV cd4d83acb7 Compensating for NUL unicode characters
NUL characters are not allowed in text fields in the database

We used to strip them out of stdout but the exception changed

And we want to be sure to strip them out of JSONBlob fields
2023-06-14 17:40:15 -04:00

1053 lines
43 KiB
Python

# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
# Python
import copy
import json
import re
import urllib.parse
from jinja2 import sandbox, StrictUndefined
from jinja2.exceptions import UndefinedError, TemplateSyntaxError, SecurityError
# Django
from django.core import exceptions as django_exceptions
from django.core.serializers.json import DjangoJSONEncoder
from django.db.models.signals import (
post_save,
post_delete,
)
from django.db.models.signals import m2m_changed
from django.db import models
from django.db.models.fields.related import lazy_related_operation
from django.db.models.fields.related_descriptors import (
ReverseOneToOneDescriptor,
ForwardManyToOneDescriptor,
ManyToManyDescriptor,
ReverseManyToOneDescriptor,
create_forward_many_to_many_manager,
)
from django.utils.encoding import smart_str
from django.db.models import JSONField
from django.utils.functional import cached_property
from django.utils.translation import gettext_lazy as _
# jsonschema
from jsonschema import Draft4Validator, FormatChecker
import jsonschema.exceptions
# DRF
from rest_framework import serializers
# AWX
from awx.main.utils.filters import SmartFilter
from awx.main.utils.encryption import encrypt_value, decrypt_value, get_encryption_key
from awx.main.validators import validate_ssh_private_key
from awx.main.constants import ENV_BLOCKLIST
from awx.main import utils
__all__ = [
'JSONBlob',
'AutoOneToOneField',
'ImplicitRoleField',
'SmartFilterField',
'OrderedManyToManyField',
'update_role_parentage_for_instance',
'is_implicit_parent',
]
# Provide a (better) custom error message for enum jsonschema validation
def __enum_validate__(validator, enums, instance, schema):
if instance not in enums:
yield jsonschema.exceptions.ValidationError(_("'{value}' is not one of ['{allowed_values}']").format(value=instance, allowed_values="', '".join(enums)))
Draft4Validator.VALIDATORS['enum'] = __enum_validate__
import logging
logger = logging.getLogger('awx.main.fields')
class JSONBlob(JSONField):
# Cringe... a JSONField that is back ended with a TextField.
# This field was a legacy custom field type that tl;dr; was a TextField
# Over the years, with Django upgrades, we were able to go to a JSONField instead of the custom field
# However, we didn't want to have large customers with millions of events to update from text to json during an upgrade
# So we keep this field type as backended with TextField.
def get_internal_type(self):
return "TextField"
# postgres uses a Jsonb field as the default backend
# with psycopg2 it was using a psycopg2._json.Json class internally
# with psycopg3 it uses a psycopg.types.json.Jsonb class internally
# The binary class was not compatible with a text field, so we are going to override these next two methods and ensure we are using a string
def from_db_value(self, value, expression, connection):
if value is None:
return value
if isinstance(value, str):
try:
return json.loads(value)
except Exception as e:
logger.error(f"Failed to load JSONField {self.name}: {e}")
return value
def get_db_prep_value(self, value, connection, prepared=False):
if not prepared:
value = self.get_prep_value(value)
try:
# Null characters are not allowed in text fields and JSONBlobs are JSON data but saved as text
# So we want to make sure we strip out any null characters also note, these "should" be escaped by the dumps process:
# >>> my_obj = { 'test': '\x00' }
# >>> import json
# >>> json.dumps(my_obj)
# '{"test": "\\u0000"}'
# But just to be safe, lets remove them if they are there. \x00 and \u0000 are the same:
# >>> string = "\x00"
# >>> "\u0000" in string
# True
dumped_value = json.dumps(value)
if "\x00" in dumped_value:
dumped_value = dumped_value.replace("\x00", '')
return dumped_value
except Exception as e:
logger.error(f"Failed to dump JSONField {self.name}: {e} value: {value}")
return value
# Based on AutoOneToOneField from django-annoying:
# https://bitbucket.org/offline/django-annoying/src/a0de8b294db3/annoying/fields.py
class AutoSingleRelatedObjectDescriptor(ReverseOneToOneDescriptor):
"""Descriptor for access to the object from its related class."""
def __get__(self, instance, instance_type=None):
try:
return super(AutoSingleRelatedObjectDescriptor, self).__get__(instance, instance_type)
except self.related.related_model.DoesNotExist:
obj = self.related.related_model(**{self.related.field.name: instance})
obj.save()
return obj
class AutoOneToOneField(models.OneToOneField):
"""OneToOneField that creates related object if it doesn't exist."""
def contribute_to_related_class(self, cls, related):
setattr(cls, related.get_accessor_name(), AutoSingleRelatedObjectDescriptor(related))
def resolve_role_field(obj, field):
ret = []
field_components = field.split('.', 1)
if hasattr(obj, field_components[0]):
obj = getattr(obj, field_components[0])
else:
return []
if obj is None:
return []
if len(field_components) == 1:
# use extremely generous duck typing to accomidate all possible forms
# of the model that may be used during various migrations
if obj._meta.model_name != 'role' or obj._meta.app_label != 'main':
raise Exception(smart_str('{} refers to a {}, not a Role'.format(field, type(obj))))
ret.append(obj.id)
else:
if type(obj) is ManyToManyDescriptor:
for o in obj.all():
ret += resolve_role_field(o, field_components[1])
else:
ret += resolve_role_field(obj, field_components[1])
return ret
def is_implicit_parent(parent_role, child_role):
"""
Determine if the parent_role is an implicit parent as defined by
the model definition. This does not include any role parents that
might have been set by the user.
"""
# Avoid circular import
from awx.main.models.rbac import ROLE_SINGLETON_SYSTEM_ADMINISTRATOR, ROLE_SINGLETON_SYSTEM_AUDITOR
if child_role.content_object is None:
# The only singleton implicit parent is the system admin being
# a parent of the system auditor role
return bool(child_role.singleton_name == ROLE_SINGLETON_SYSTEM_AUDITOR and parent_role.singleton_name == ROLE_SINGLETON_SYSTEM_ADMINISTRATOR)
# Get the list of implicit parents that were defined at the class level.
implicit_parents = getattr(child_role.content_object.__class__, child_role.role_field).field.parent_role
if type(implicit_parents) != list:
implicit_parents = [implicit_parents]
# Check to see if the role matches any in the implicit parents list
for implicit_parent_path in implicit_parents:
if implicit_parent_path.startswith('singleton:'):
# Singleton role isn't an object role, `singleton_name` uniquely identifies it
if parent_role.is_singleton() and parent_role.singleton_name == implicit_parent_path[10:]:
return True
else:
# Walk over multiple related objects to obtain the implicit parent
related_obj = child_role.content_object
for next_field in implicit_parent_path.split('.'):
related_obj = getattr(related_obj, next_field)
if related_obj is None:
break
if related_obj and parent_role == related_obj:
return True
return False
def update_role_parentage_for_instance(instance):
"""update_role_parentage_for_instance
updates the parents listing for all the roles
of a given instance if they have changed
"""
parents_removed = set()
parents_added = set()
for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'):
cur_role = getattr(instance, implicit_role_field.name)
original_parents = set(json.loads(cur_role.implicit_parents))
new_parents = implicit_role_field._resolve_parent_roles(instance)
removals = original_parents - new_parents
if removals:
cur_role.parents.remove(*list(removals))
parents_removed.add(cur_role.pk)
additions = new_parents - original_parents
if additions:
cur_role.parents.add(*list(additions))
parents_added.add(cur_role.pk)
new_parents_list = list(new_parents)
new_parents_list.sort()
new_parents_json = json.dumps(new_parents_list)
if cur_role.implicit_parents != new_parents_json:
cur_role.implicit_parents = new_parents_json
cur_role.save(update_fields=['implicit_parents'])
return (parents_added, parents_removed)
class ImplicitRoleDescriptor(ForwardManyToOneDescriptor):
pass
class ImplicitRoleField(models.ForeignKey):
"""Implicitly creates a role entry for a resource"""
def __init__(self, parent_role=None, *args, **kwargs):
self.parent_role = parent_role
kwargs.setdefault('to', 'Role')
kwargs.setdefault('related_name', '+')
kwargs.setdefault('null', 'True')
kwargs.setdefault('editable', False)
kwargs.setdefault('on_delete', models.CASCADE)
super(ImplicitRoleField, self).__init__(*args, **kwargs)
def deconstruct(self):
name, path, args, kwargs = super(ImplicitRoleField, self).deconstruct()
kwargs['parent_role'] = self.parent_role
return name, path, args, kwargs
def contribute_to_class(self, cls, name):
super(ImplicitRoleField, self).contribute_to_class(cls, name)
setattr(cls, self.name, ImplicitRoleDescriptor(self))
if not hasattr(cls, '__implicit_role_fields'):
setattr(cls, '__implicit_role_fields', [])
getattr(cls, '__implicit_role_fields').append(self)
post_save.connect(self._post_save, cls, True, dispatch_uid='implicit-role-post-save')
post_delete.connect(self._post_delete, cls, True, dispatch_uid='implicit-role-post-delete')
function = lambda local, related, field: self.bind_m2m_changed(field, related, local)
lazy_related_operation(function, cls, "self", field=self)
def bind_m2m_changed(self, _self, _role_class, cls):
if not self.parent_role:
return
field_names = self.parent_role
if type(field_names) is not list:
field_names = [field_names]
for field_name in field_names:
if field_name.startswith('singleton:'):
continue
field_name, sep, field_attr = field_name.partition('.')
# Non existent fields will occur if ever a parent model is
# moved inside a migration, needed for job_template_organization_field
# migration in particular
# consistency is assured by unit test awx.main.tests.functional
field = getattr(cls, field_name, None)
if field and type(field) is ReverseManyToOneDescriptor or type(field) is ManyToManyDescriptor:
if '.' in field_attr:
raise Exception('Referencing deep roles through ManyToMany fields is unsupported.')
if type(field) is ReverseManyToOneDescriptor:
sender = field.through
else:
sender = field.related.through
reverse = type(field) is ManyToManyDescriptor
m2m_changed.connect(self.m2m_update(field_attr, reverse), sender, weak=False)
def m2m_update(self, field_attr, _reverse):
def _m2m_update(instance, action, model, pk_set, reverse, **kwargs):
if action == 'post_add' or action == 'pre_remove':
if _reverse:
reverse = not reverse
if reverse:
for pk in pk_set:
obj = model.objects.get(pk=pk)
if action == 'post_add':
getattr(instance, field_attr).children.add(getattr(obj, self.name))
if action == 'pre_remove':
getattr(instance, field_attr).children.remove(getattr(obj, self.name))
else:
for pk in pk_set:
obj = model.objects.get(pk=pk)
if action == 'post_add':
getattr(instance, self.name).parents.add(getattr(obj, field_attr))
if action == 'pre_remove':
getattr(instance, self.name).parents.remove(getattr(obj, field_attr))
return _m2m_update
def _post_save(self, instance, created, *args, **kwargs):
Role_ = utils.get_current_apps().get_model('main', 'Role')
ContentType_ = utils.get_current_apps().get_model('contenttypes', 'ContentType')
ct_id = ContentType_.objects.get_for_model(instance).id
Model = utils.get_current_apps().get_model('main', instance.__class__.__name__)
latest_instance = Model.objects.get(pk=instance.pk)
# Avoid circular import
from awx.main.models.rbac import batch_role_ancestor_rebuilding, Role
with batch_role_ancestor_rebuilding():
# Create any missing role objects
missing_roles = []
for implicit_role_field in getattr(latest_instance.__class__, '__implicit_role_fields'):
cur_role = getattr(latest_instance, implicit_role_field.name, None)
if cur_role is None:
missing_roles.append(Role_(role_field=implicit_role_field.name, content_type_id=ct_id, object_id=latest_instance.id))
if len(missing_roles) > 0:
Role_.objects.bulk_create(missing_roles)
updates = {}
role_ids = []
for role in Role_.objects.filter(content_type_id=ct_id, object_id=latest_instance.id):
setattr(latest_instance, role.role_field, role)
updates[role.role_field] = role.id
role_ids.append(role.id)
type(latest_instance).objects.filter(pk=latest_instance.pk).update(**updates)
Role.rebuild_role_ancestor_list(role_ids, [])
update_role_parentage_for_instance(latest_instance)
instance.refresh_from_db()
def _resolve_parent_roles(self, instance):
if not self.parent_role:
return set()
paths = self.parent_role if type(self.parent_role) is list else [self.parent_role]
parent_roles = set()
for path in paths:
if path.startswith("singleton:"):
singleton_name = path[10:]
Role_ = utils.get_current_apps().get_model('main', 'Role')
qs = Role_.objects.filter(singleton_name=singleton_name)
if qs.count() >= 1:
role = qs[0]
else:
role = Role_.objects.create(singleton_name=singleton_name, role_field=singleton_name)
parents = [role.id]
else:
parents = resolve_role_field(instance, path)
for parent in parents:
parent_roles.add(parent)
return parent_roles
def _post_delete(self, instance, *args, **kwargs):
role_ids = []
for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'):
role_ids.append(getattr(instance, implicit_role_field.name + '_id'))
Role_ = utils.get_current_apps().get_model('main', 'Role')
child_ids = [x for x in Role_.parents.through.objects.filter(to_role_id__in=role_ids).distinct().values_list('from_role_id', flat=True)]
Role_.objects.filter(id__in=role_ids).delete()
# Avoid circular import
from awx.main.models.rbac import Role
Role.rebuild_role_ancestor_list([], child_ids)
class SmartFilterField(models.TextField):
def get_prep_value(self, value):
# Change any false value to none.
# https://docs.python.org/2/library/stdtypes.html#truth-value-testing
if not value:
return None
value = urllib.parse.unquote(value)
try:
SmartFilter().query_from_string(value)
except RuntimeError as e:
raise models.base.ValidationError(e)
return super(SmartFilterField, self).get_prep_value(value)
class JSONSchemaField(models.JSONField):
"""
A JSONB field that self-validates against a defined JSON schema
(http://json-schema.org). This base class is intended to be overwritten by
defining `self.schema`.
"""
format_checker = FormatChecker()
# If an empty {} is provided, we still want to perform this schema
# validation
empty_values = (None, '')
def __init__(self, encoder=None, decoder=None, **options):
if encoder is None:
encoder = DjangoJSONEncoder
super().__init__(encoder=encoder, decoder=decoder, **options)
def get_default(self):
return copy.deepcopy(super(models.JSONField, self).get_default())
def schema(self, model_instance):
raise NotImplementedError()
def validate(self, value, model_instance):
super(JSONSchemaField, self).validate(value, model_instance)
errors = []
for error in Draft4Validator(self.schema(model_instance), format_checker=self.format_checker).iter_errors(value):
if error.validator == 'pattern' and 'error' in error.schema:
error.message = error.schema['error'].format(instance=error.instance)
elif error.validator == 'type':
expected_type = error.validator_value
if expected_type == 'object':
expected_type = 'dict'
if error.path:
error.message = _('{type} provided in relative path {path}, expected {expected_type}').format(
path=list(error.path), type=type(error.instance).__name__, expected_type=expected_type
)
else:
error.message = _('{type} provided, expected {expected_type}').format(
path=list(error.path), type=type(error.instance).__name__, expected_type=expected_type
)
elif error.validator == 'additionalProperties' and hasattr(error, 'path'):
error.message = _('Schema validation error in relative path {path} ({error})').format(path=list(error.path), error=error.message)
errors.append(error)
if errors:
raise django_exceptions.ValidationError(
[e.message for e in errors],
code='invalid',
params={'value': value},
)
@JSONSchemaField.format_checker.checks('vault_id')
def format_vault_id(value):
if '@' in value:
raise jsonschema.exceptions.FormatError('@ is not an allowed character')
return True
@JSONSchemaField.format_checker.checks('ssh_private_key')
def format_ssh_private_key(value):
# Sanity check: GCE, in particular, provides JSON-encoded private
# keys, which developers will be tempted to copy and paste rather
# than JSON decode.
#
# These end in a unicode-encoded final character that gets double
# escaped due to being in a Python 2 bytestring, and that causes
# Python's key parsing to barf. Detect this issue and correct it.
if not value or value == '$encrypted$':
return True
if r'\u003d' in value:
value = value.replace(r'\u003d', '=')
try:
validate_ssh_private_key(value)
except django_exceptions.ValidationError as e:
raise jsonschema.exceptions.FormatError(e.message)
return True
@JSONSchemaField.format_checker.checks('url')
def format_url(value):
try:
parsed = urllib.parse.urlparse(value)
except Exception as e:
raise jsonschema.exceptions.FormatError(str(e))
if parsed.scheme == '':
raise jsonschema.exceptions.FormatError('Invalid URL: Missing url scheme (http, https, etc.)')
if parsed.netloc == '':
raise jsonschema.exceptions.FormatError('Invalid URL: {}'.format(value))
return True
class DynamicCredentialInputField(JSONSchemaField):
"""
Used to validate JSON for
`awx.main.models.credential:CredentialInputSource().metadata`.
Metadata for input sources is represented as a dictionary e.g.,
{'secret_path': '/kv/somebody', 'secret_key': 'password'}
For the data to be valid, the keys of this dictionary should correspond
with the metadata field (and datatypes) defined in the associated
target CredentialType e.g.,
"""
def schema(self, credential_type):
# determine the defined fields for the associated credential type
properties = {}
for field in credential_type.inputs.get('metadata', []):
field = field.copy()
properties[field['id']] = field
if field.get('choices', []):
field['enum'] = list(field['choices'])[:]
return {
'type': 'object',
'properties': properties,
'additionalProperties': False,
}
def validate(self, value, model_instance):
if not isinstance(value, dict):
return super(DynamicCredentialInputField, self).validate(value, model_instance)
super(JSONSchemaField, self).validate(value, model_instance)
credential_type = model_instance.source_credential.credential_type
errors = {}
for error in Draft4Validator(self.schema(credential_type), format_checker=self.format_checker).iter_errors(value):
if error.validator == 'pattern' and 'error' in error.schema:
error.message = error.schema['error'].format(instance=error.instance)
if 'id' not in error.schema:
# If the error is not for a specific field, it's specific to
# `inputs` in general
raise django_exceptions.ValidationError(
error.message,
code='invalid',
params={'value': value},
)
errors[error.schema['id']] = [error.message]
defined_metadata = [field.get('id') for field in credential_type.inputs.get('metadata', [])]
for field in credential_type.inputs.get('required', []):
if field in defined_metadata and not value.get(field, None):
errors[field] = [_('required for %s') % (credential_type.name)]
if errors:
raise serializers.ValidationError({'metadata': errors})
class CredentialInputField(JSONSchemaField):
"""
Used to validate JSON for
`awx.main.models.credential:Credential().inputs`.
Input data for credentials is represented as a dictionary e.g.,
{'api_token': 'abc123', 'api_secret': 'SECRET'}
For the data to be valid, the keys of this dictionary should correspond
with the field names (and datatypes) defined in the associated
CredentialType e.g.,
{
'fields': [{
'id': 'api_token',
'label': 'API Token',
'type': 'string'
}, {
'id': 'api_secret',
'label': 'API Secret',
'type': 'string'
}]
}
"""
def schema(self, model_instance):
# determine the defined fields for the associated credential type
properties = {}
for field in model_instance.credential_type.inputs.get('fields', []):
field = field.copy()
properties[field['id']] = field
if field.get('choices', []):
field['enum'] = list(field['choices'])[:]
return {
'type': 'object',
'properties': properties,
'dependencies': model_instance.credential_type.inputs.get('dependencies', {}),
'additionalProperties': False,
}
def validate(self, value, model_instance):
# decrypt secret values so we can validate their contents (i.e.,
# ssh_key_data format)
if not isinstance(value, dict):
return super(CredentialInputField, self).validate(value, model_instance)
# Backwards compatability: in prior versions, if you submit `null` for
# a credential field value, it just considers the value an empty string
for unset in [key for key, v in model_instance.inputs.items() if not v]:
default_value = model_instance.credential_type.default_for_field(unset)
if default_value is not None:
model_instance.inputs[unset] = default_value
decrypted_values = {}
for k, v in value.items():
if all([k in model_instance.credential_type.secret_fields, v != '$encrypted$', model_instance.pk]):
if not isinstance(model_instance.inputs.get(k), str):
raise django_exceptions.ValidationError(
_('secret values must be of type string, not {}').format(type(v).__name__),
code='invalid',
params={'value': v},
)
decrypted_values[k] = utils.decrypt_field(model_instance, k)
else:
decrypted_values[k] = v
# don't allow secrets with $encrypted$ on new object creation
if not model_instance.pk:
for field in model_instance.credential_type.secret_fields:
if value.get(field) == '$encrypted$':
raise serializers.ValidationError({self.name: [f'$encrypted$ is a reserved keyword, and cannot be used for {field}.']})
super(JSONSchemaField, self).validate(decrypted_values, model_instance)
errors = {}
for error in Draft4Validator(self.schema(model_instance), format_checker=self.format_checker).iter_errors(decrypted_values):
if error.validator == 'pattern' and 'error' in error.schema:
error.message = error.schema['error'].format(instance=error.instance)
if error.validator == 'dependencies':
# replace the default error messaging w/ a better i18n string
# I wish there was a better way to determine the parameters of
# this validation failure, but the exception jsonschema raises
# doesn't include them as attributes (just a hard-coded error
# string)
match = re.search(
# 'foo' is a dependency of 'bar'
r"'" # apostrophe
r"([^']+)" # one or more non-apostrophes (first group)
r"'[\w ]+'" # one or more words/spaces
r"([^']+)", # second group
error.message,
)
if match:
label, extraneous = match.groups()
if error.schema['properties'].get(label):
label = error.schema['properties'][label]['label']
errors[extraneous] = [_('cannot be set unless "%s" is set') % label]
continue
if 'id' not in error.schema:
# If the error is not for a specific field, it's specific to
# `inputs` in general
raise django_exceptions.ValidationError(
error.message,
code='invalid',
params={'value': value},
)
errors[error.schema['id']] = [error.message]
defined_fields = model_instance.credential_type.defined_fields
# `ssh_key_unlock` requirements are very specific and can't be
# represented without complicated JSON schema
if model_instance.credential_type.managed is True and 'ssh_key_unlock' in defined_fields:
# in order to properly test the necessity of `ssh_key_unlock`, we
# need to know the real value of `ssh_key_data`; for a payload like:
# {
# 'ssh_key_data': '$encrypted$',
# 'ssh_key_unlock': 'do-you-need-me?',
# }
# ...we have to fetch the actual key value from the database
if model_instance.pk and model_instance.inputs.get('ssh_key_data') == '$encrypted$':
model_instance.inputs['ssh_key_data'] = model_instance.__class__.objects.get(pk=model_instance.pk).inputs.get('ssh_key_data')
if model_instance.has_encrypted_ssh_key_data and not value.get('ssh_key_unlock'):
errors['ssh_key_unlock'] = [_('must be set when SSH key is encrypted.')]
if all(
[
model_instance.inputs.get('ssh_key_data'),
value.get('ssh_key_unlock'),
not model_instance.has_encrypted_ssh_key_data,
'ssh_key_data' not in errors,
]
):
errors['ssh_key_unlock'] = [_('should not be set when SSH key is not encrypted.')]
if errors:
raise serializers.ValidationError({'inputs': errors})
class CredentialTypeInputField(JSONSchemaField):
"""
Used to validate JSON for
`awx.main.models.credential:CredentialType().inputs`.
"""
def schema(self, model_instance):
return {
'type': 'object',
'additionalProperties': False,
'properties': {
'required': {'type': 'array', 'items': {'type': 'string'}},
'fields': {
'type': 'array',
'items': {
'type': 'object',
'properties': {
'type': {'enum': ['string', 'boolean']},
'format': {'enum': ['ssh_private_key', 'url']},
'choices': {'type': 'array', 'minItems': 1, 'items': {'type': 'string'}, 'uniqueItems': True},
'id': {
'type': 'string',
'pattern': '^[a-zA-Z_]+[a-zA-Z0-9_]*$',
'error': '{instance} is an invalid variable name',
},
'label': {'type': 'string'},
'help_text': {'type': 'string'},
'multiline': {'type': 'boolean'},
'secret': {'type': 'boolean'},
'ask_at_runtime': {'type': 'boolean'},
'default': {},
},
'additionalProperties': False,
'required': ['id', 'label'],
},
},
},
}
def validate(self, value, model_instance):
if isinstance(value, dict) and 'dependencies' in value and not model_instance.managed:
raise django_exceptions.ValidationError(
_("'dependencies' is not supported for custom credentials."),
code='invalid',
params={'value': value},
)
super(CredentialTypeInputField, self).validate(value, model_instance)
ids = {}
for field in value.get('fields', []):
id_ = field.get('id')
if id_ == 'tower':
raise django_exceptions.ValidationError(
_('"tower" is a reserved field name'),
code='invalid',
params={'value': value},
)
if id_ in ids:
raise django_exceptions.ValidationError(
_('field IDs must be unique (%s)' % id_),
code='invalid',
params={'value': value},
)
ids[id_] = True
if 'type' not in field:
# If no type is specified, default to string
field['type'] = 'string'
if 'default' in field:
default = field['default']
_type = {'string': str, 'boolean': bool}[field['type']]
if type(default) != _type:
raise django_exceptions.ValidationError(_('{} is not a {}').format(default, field['type']))
for key in (
'choices',
'multiline',
'format',
'secret',
):
if key in field and field['type'] != 'string':
raise django_exceptions.ValidationError(
_(
'{sub_key} not allowed for {element_type} type ({element_id})'.format(
sub_key=key, element_type=field['type'], element_id=field['id']
)
),
code='invalid',
params={'value': value},
)
class CredentialTypeInjectorField(JSONSchemaField):
"""
Used to validate JSON for
`awx.main.models.credential:CredentialType().injectors`.
"""
def schema(self, model_instance):
return {
'type': 'object',
'additionalProperties': False,
'properties': {
'file': {
'type': 'object',
'patternProperties': {
r'^template(\.[a-zA-Z_]+[a-zA-Z0-9_]*)?$': {'type': 'string'},
},
'additionalProperties': False,
},
'env': {
'type': 'object',
'patternProperties': {
# http://pubs.opengroup.org/onlinepubs/9699919799/basedefs/V1_chap08.html
# In the shell command language, a word consisting solely
# of underscores, digits, and alphabetics from the portable
# character set. The first character of a name is not
# a digit.
'^[a-zA-Z_]+[a-zA-Z0-9_]*$': {
'type': 'string',
# The environment variable _value_ can be any ascii,
# but pexpect will choke on any unicode
'pattern': '^[\x00-\x7F]*$',
},
},
'additionalProperties': False,
},
'extra_vars': {
'type': 'object',
'patternProperties': {
# http://docs.ansible.com/ansible/playbooks_variables.html#what-makes-a-valid-variable-name
# plus, add ability to template
r'^[a-zA-Z_\{\}]+[a-zA-Z0-9_\{\}]*$': {"anyOf": [{'type': 'string'}, {'type': 'array'}, {'$ref': '#/properties/extra_vars'}]}
},
'additionalProperties': False,
},
},
'additionalProperties': False,
}
def validate_env_var_allowed(self, env_var):
if env_var.startswith('ANSIBLE_'):
raise django_exceptions.ValidationError(
_('Environment variable {} may affect Ansible configuration so its use is not allowed in credentials.').format(env_var),
code='invalid',
params={'value': env_var},
)
if env_var in ENV_BLOCKLIST:
raise django_exceptions.ValidationError(
_('Environment variable {} is not allowed to be used in credentials.').format(env_var),
code='invalid',
params={'value': env_var},
)
def validate(self, value, model_instance):
super(CredentialTypeInjectorField, self).validate(value, model_instance)
# make sure the inputs are valid first
try:
CredentialTypeInputField().validate(model_instance.inputs, model_instance)
except django_exceptions.ValidationError:
# If `model_instance.inputs` itself is invalid, we can't make an
# estimation as to whether our Jinja templates contain valid field
# names; don't continue
return
# In addition to basic schema validation, search the injector fields
# for template variables and make sure they match the fields defined in
# the inputs
valid_namespace = dict((field, 'EXAMPLE') for field in model_instance.defined_fields)
class ExplodingNamespace:
def __str__(self):
raise UndefinedError(_('Must define unnamed file injector in order to reference `tower.filename`.'))
class TowerNamespace:
def __init__(self):
self.filename = ExplodingNamespace()
def __str__(self):
raise UndefinedError(_('Cannot directly reference reserved `tower` namespace container.'))
valid_namespace['tower'] = TowerNamespace()
# ensure either single file or multi-file syntax is used (but not both)
template_names = [x for x in value.get('file', {}).keys() if x.startswith('template')]
if 'template' in template_names:
valid_namespace['tower'].filename = 'EXAMPLE_FILENAME'
if len(template_names) > 1:
raise django_exceptions.ValidationError(
_('Must use multi-file syntax when injecting multiple files'),
code='invalid',
params={'value': value},
)
elif template_names:
for template_name in template_names:
template_name = template_name.split('.')[1]
setattr(valid_namespace['tower'].filename, template_name, 'EXAMPLE_FILENAME')
def validate_template_string(type_, key, tmpl):
try:
sandbox.ImmutableSandboxedEnvironment(undefined=StrictUndefined).from_string(tmpl).render(valid_namespace)
except UndefinedError as e:
raise django_exceptions.ValidationError(
_('{sub_key} uses an undefined field ({error_msg})').format(sub_key=key, error_msg=e),
code='invalid',
params={'value': value},
)
except SecurityError as e:
raise django_exceptions.ValidationError(_('Encountered unsafe code execution: {}').format(e))
except TemplateSyntaxError as e:
raise django_exceptions.ValidationError(
_('Syntax error rendering template for {sub_key} inside of {type} ({error_msg})').format(sub_key=key, type=type_, error_msg=e),
code='invalid',
params={'value': value},
)
def validate_extra_vars(key, node):
if isinstance(node, dict):
for k, v in node.items():
validate_template_string("extra_vars", 'a key' if key is None else key, k)
validate_extra_vars(k if key is None else "{key}.{k}".format(key=key, k=k), v)
elif isinstance(node, list):
for i, x in enumerate(node):
validate_extra_vars("{key}[{i}]".format(key=key, i=i), x)
else:
validate_template_string("extra_vars", key, node)
for type_, injector in value.items():
if type_ == 'env':
for key in injector.keys():
self.validate_env_var_allowed(key)
if type_ == 'extra_vars':
validate_extra_vars(None, injector)
else:
for key, tmpl in injector.items():
validate_template_string(type_, key, tmpl)
class AskForField(models.BooleanField):
"""
Denotes whether to prompt on launch for another field on the same template
"""
def __init__(self, allows_field=None, **kwargs):
super(AskForField, self).__init__(**kwargs)
self._allows_field = allows_field
@property
def allows_field(self):
if self._allows_field is None:
try:
return self.name[len('ask_') : -len('_on_launch')]
except AttributeError:
# self.name will be set by the model metaclass, not this field
raise Exception('Corresponding allows_field cannot be accessed until model is initialized.')
return self._allows_field
class OAuth2ClientSecretField(models.CharField):
def get_db_prep_value(self, value, connection, prepared=False):
return super(OAuth2ClientSecretField, self).get_db_prep_value(encrypt_value(value), connection, prepared)
def from_db_value(self, value, expression, connection):
if value and value.startswith('$encrypted$'):
return decrypt_value(get_encryption_key('value', pk=None), value)
return value
class OrderedManyToManyDescriptor(ManyToManyDescriptor):
"""
Django doesn't seem to support:
class Meta:
ordering = [...]
...on custom through= relations for ManyToMany fields.
Meaning, queries made _through_ the intermediary table will _not_ apply an
ORDER_BY clause based on the `Meta.ordering` of the intermediary M2M class
(which is the behavior we want for "ordered" many to many relations):
https://github.com/django/django/blob/stable/1.11.x/django/db/models/fields/related_descriptors.py#L593
This descriptor automatically sorts all queries through this relation
using the `position` column on the M2M table.
"""
@cached_property
def related_manager_cls(self):
model = self.rel.related_model if self.reverse else self.rel.model
def add_custom_queryset_to_many_related_manager(many_related_manage_cls):
class OrderedManyRelatedManager(many_related_manage_cls):
def get_queryset(self):
return super(OrderedManyRelatedManager, self).get_queryset().order_by('%s__position' % self.through._meta.model_name)
def add(self, *objects):
if len(objects) > 1:
raise RuntimeError('Ordered many-to-many fields do not support multiple objects')
return super().add(*objects)
def remove(self, *objects):
if len(objects) > 1:
raise RuntimeError('Ordered many-to-many fields do not support multiple objects')
return super().remove(*objects)
return OrderedManyRelatedManager
return add_custom_queryset_to_many_related_manager(
create_forward_many_to_many_manager(
model._default_manager.__class__,
self.rel,
reverse=self.reverse,
)
)
class OrderedManyToManyField(models.ManyToManyField):
"""
A ManyToManyField that automatically sorts all querysets
by a special `position` column on the M2M table
"""
def _update_m2m_position(self, sender, instance, action, **kwargs):
if action in ('post_add', 'post_remove'):
descriptor = getattr(instance, self.name)
order_with_respect_to = descriptor.source_field_name
for i, ig in enumerate(sender.objects.filter(**{order_with_respect_to: instance.pk})):
if ig.position != i:
ig.position = i
ig.save()
def contribute_to_class(self, cls, name, **kwargs):
super(OrderedManyToManyField, self).contribute_to_class(cls, name, **kwargs)
setattr(cls, name, OrderedManyToManyDescriptor(self.remote_field, reverse=False))
through = getattr(cls, name).through
if isinstance(through, str) and "." not in through:
# support lazy loading of string model names
through = '.'.join([cls._meta.app_label, through])
m2m_changed.connect(self._update_m2m_position, sender=through)