awx/awx/conf/views.py
Matthew Jones 8505783350
Merge remote-tracking branch 'tower/release_3.2.3' into devel
* tower/release_3.2.3:
  fix unicode bugs with log statements
  use --export option for ansible-inventory
  add support for new "BECOME" prompt in Ansible 2.5+ for adhoc commands
  enforce strings for secret password inputs on Credentials
  fix a bug for "users should be able to change type of unused credential"
  fix xss vulnerabilities - on host recent jobs popover - on schedule name tooltip
  fix a bug when testing UDP-based logging configuration
  bump templates form credential_types page limit
  Wait for Slack RTM API websocket connection to be established
  don't process artifacts from custom `set_stat` calls asynchronously
  don't overwrite env['ANSIBLE_LIBRARY'] when fact caching is enabled
  only allow facts to cache in the proper file system location
  replace our memcached-based fact cache implementation with local files
  add support for new "BECOME" prompt in Ansible 2.5+
  fix a bug in inventory generation for isolated nodes
  properly handle unicode for isolated job buffers
2018-02-20 12:22:25 -05:00

221 lines
9.3 KiB
Python

# Copyright (c) 2016 Ansible, Inc.
# All Rights Reserved.
# Python
import collections
import sys
# Django
from django.conf import settings
from django.http import Http404
from django.utils.translation import ugettext_lazy as _
# Django REST Framework
from rest_framework.exceptions import PermissionDenied, ValidationError
from rest_framework.response import Response
from rest_framework import serializers
from rest_framework import status
# Tower
from awx.api.generics import * # noqa
from awx.api.permissions import IsSuperUser
from awx.api.versioning import reverse, get_request_version
from awx.main.utils import * # noqa
from awx.main.utils.handlers import BaseHTTPSHandler, UDPHandler, LoggingConnectivityException
from awx.main.tasks import handle_setting_changes
from awx.conf.license import get_licensed_features
from awx.conf.models import Setting
from awx.conf.serializers import SettingCategorySerializer, SettingSingletonSerializer
from awx.conf import settings_registry
SettingCategory = collections.namedtuple('SettingCategory', ('url', 'slug', 'name'))
VERSION_SPECIFIC_CATEGORIES_TO_EXCLUDE = {
1: set([
'named-url',
]),
2: set([]),
}
class SettingCategoryList(ListAPIView):
model = Setting # Not exactly, but needed for the view.
serializer_class = SettingCategorySerializer
filter_backends = []
view_name = _('Setting Categories')
def get_queryset(self):
setting_categories = []
categories = settings_registry.get_registered_categories(features_enabled=get_licensed_features())
if self.request.user.is_superuser or self.request.user.is_system_auditor:
pass # categories = categories
elif 'user' in categories:
categories = {'user', _('User')}
else:
categories = {}
for category_slug in sorted(categories.keys()):
if category_slug in VERSION_SPECIFIC_CATEGORIES_TO_EXCLUDE[get_request_version(self.request)]:
continue
url = reverse('api:setting_singleton_detail', kwargs={'category_slug': category_slug}, request=self.request)
setting_categories.append(SettingCategory(url, category_slug, categories[category_slug]))
return setting_categories
class SettingSingletonDetail(RetrieveUpdateDestroyAPIView):
model = Setting # Not exactly, but needed for the view.
serializer_class = SettingSingletonSerializer
filter_backends = []
view_name = _('Setting Detail')
def get_queryset(self):
self.category_slug = self.kwargs.get('category_slug', 'all')
all_category_slugs = settings_registry.get_registered_categories(features_enabled=get_licensed_features()).keys()
for slug_to_delete in VERSION_SPECIFIC_CATEGORIES_TO_EXCLUDE[get_request_version(self.request)]:
all_category_slugs.remove(slug_to_delete)
if self.request.user.is_superuser or getattr(self.request.user, 'is_system_auditor', False):
category_slugs = all_category_slugs
else:
category_slugs = {'user'}
if self.category_slug not in all_category_slugs:
raise Http404
if self.category_slug not in category_slugs:
raise PermissionDenied()
registered_settings = settings_registry.get_registered_settings(
category_slug=self.category_slug, read_only=False, features_enabled=get_licensed_features(),
slugs_to_ignore=VERSION_SPECIFIC_CATEGORIES_TO_EXCLUDE[get_request_version(self.request)]
)
if self.category_slug == 'user':
return Setting.objects.filter(key__in=registered_settings, user=self.request.user)
else:
return Setting.objects.filter(key__in=registered_settings, user__isnull=True)
def get_object(self):
settings_qs = self.get_queryset()
registered_settings = settings_registry.get_registered_settings(
category_slug=self.category_slug, features_enabled=get_licensed_features(),
slugs_to_ignore=VERSION_SPECIFIC_CATEGORIES_TO_EXCLUDE[get_request_version(self.request)]
)
all_settings = {}
for setting in settings_qs:
all_settings[setting.key] = setting.value
for key in registered_settings:
if key in all_settings or self.category_slug == 'changed':
continue
try:
field = settings_registry.get_setting_field(key, for_user=bool(self.category_slug == 'user'))
all_settings[key] = field.get_default()
except serializers.SkipField:
all_settings[key] = None
all_settings['user'] = self.request.user if self.category_slug == 'user' else None
obj = type('Settings', (object,), all_settings)()
self.check_object_permissions(self.request, obj)
return obj
def perform_update(self, serializer):
settings_qs = self.get_queryset()
user = self.request.user if self.category_slug == 'user' else None
settings_change_list = []
for key, value in serializer.validated_data.items():
if key == 'LICENSE' or settings_registry.is_setting_read_only(key):
continue
if settings_registry.is_setting_encrypted(key) and \
isinstance(value, basestring) and \
value.startswith('$encrypted$'):
continue
setattr(serializer.instance, key, value)
setting = settings_qs.filter(key=key).order_by('pk').first()
if not setting:
setting = Setting.objects.create(key=key, user=user, value=value)
settings_change_list.append(key)
elif setting.value != value:
setting.value = value
setting.save(update_fields=['value'])
settings_change_list.append(key)
if settings_change_list and 'migrate_to_database_settings' not in sys.argv:
handle_setting_changes.delay(settings_change_list)
def destroy(self, request, *args, **kwargs):
instance = self.get_object()
self.perform_destroy(instance)
return Response(status=status.HTTP_204_NO_CONTENT)
def perform_destroy(self, instance):
settings_change_list = []
for setting in self.get_queryset().exclude(key='LICENSE'):
if settings_registry.get_setting_field(setting.key).read_only:
continue
setting.delete()
settings_change_list.append(setting.key)
if settings_change_list and 'migrate_to_database_settings' not in sys.argv:
handle_setting_changes.delay(settings_change_list)
# When TOWER_URL_BASE is deleted from the API, reset it to the hostname
# used to make the request as a default.
if hasattr(instance, 'TOWER_URL_BASE'):
url = '{}://{}'.format(self.request.scheme, self.request.get_host())
if settings.TOWER_URL_BASE != url:
settings.TOWER_URL_BASE = url
class SettingLoggingTest(GenericAPIView):
view_name = _('Logging Connectivity Test')
model = Setting
serializer_class = SettingSingletonSerializer
permission_classes = (IsSuperUser,)
filter_backends = []
def post(self, request, *args, **kwargs):
defaults = dict()
for key in settings_registry.get_registered_settings(category_slug='logging'):
try:
defaults[key] = settings_registry.get_setting_field(key).get_default()
except serializers.SkipField:
defaults[key] = None
obj = type('Settings', (object,), defaults)()
serializer = self.get_serializer(obj, data=request.data)
serializer.is_valid(raise_exception=True)
# Special validation specific to logging test.
errors = {}
for key in ['LOG_AGGREGATOR_TYPE', 'LOG_AGGREGATOR_HOST']:
if not request.data.get(key, ''):
errors[key] = 'This field is required.'
if errors:
raise ValidationError(errors)
if request.data.get('LOG_AGGREGATOR_PASSWORD', '').startswith('$encrypted$'):
serializer.validated_data['LOG_AGGREGATOR_PASSWORD'] = getattr(
settings, 'LOG_AGGREGATOR_PASSWORD', ''
)
try:
class MockSettings:
pass
mock_settings = MockSettings()
for k, v in serializer.validated_data.items():
setattr(mock_settings, k, v)
mock_settings.LOG_AGGREGATOR_LEVEL = 'DEBUG'
if mock_settings.LOG_AGGREGATOR_PROTOCOL.upper() == 'UDP':
UDPHandler.perform_test(mock_settings)
return Response(status=status.HTTP_201_CREATED)
else:
BaseHTTPSHandler.perform_test(mock_settings)
except LoggingConnectivityException as e:
return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
return Response(status=status.HTTP_200_OK)
# Create view functions for all of the class-based views to simplify inclusion
# in URL patterns and reverse URL lookups, converting CamelCase names to
# lowercase_with_underscore (e.g. MyView.as_view() becomes my_view).
this_module = sys.modules[__name__]
for attr, value in locals().items():
if isinstance(value, type) and issubclass(value, APIView):
name = camelcase_to_underscore(attr)
view = value.as_view()
setattr(this_module, name, view)