Merge branch 'release_3.1.3' into devel

* release_3.1.3: (52 commits)
  ack fact scan messages
  making ldap user/group search fields into codemirror instances
  removing UI parsing for LDAP User and Group Search fields
  Allow exception view to accept all valid HTTP methods.
  Restore ability of parsing extra_vars string for provisioning callback.
  Fix up backup/restore role broken in f7a8e45809758322d9ee41c5305850dd70ed5faf
  Stop / start ansible-tower-service during restores
  value_to_python should encode lookup fields as ascii
  fix brace interpolation on standard out pane
  Adjust some hardcoded usages of 'awx' to use 'aw_user' and 'aw_group'.
  Pull Spanish updates from Zanata
  Temporarily grant awx user createdb role
  Stop giving ownership of backups to postgres
  don't display chunked lines'
  Add dropdown li truncation with ellipsis
  CTiT -> adhoc modules should allow the user to add new modules
  Remove task that was replacing the supervisor systemd tmp file
  Fix failing supervisorctl commands on RH-based distros
  Give ownership of the supervisor socket to awx
  Setting for external log emissions cert verification
  ...
This commit is contained in:
Matthew Jones
2017-04-28 13:57:04 -04:00
50 changed files with 3787 additions and 2938 deletions

View File

@@ -61,12 +61,15 @@ class FactBrokerWorker(ConsumerMixin):
host_obj = Host.objects.get(name=hostname, inventory__id=inventory_id)
except Fact.DoesNotExist:
logger.warn('Failed to intake fact. Host does not exist <hostname, inventory_id> <%s, %s>' % (hostname, inventory_id))
message.ack()
return
except Fact.MultipleObjectsReturned:
logger.warn('Database inconsistent. Multiple Hosts found for <hostname, inventory_id> <%s, %s>.' % (hostname, inventory_id))
message.ack()
return None
except Exception as e:
logger.error("Exception communicating with Fact Cache Database: %s" % str(e))
message.ack()
return None
(module_name, facts) = self.process_facts(facts_data)
@@ -84,6 +87,7 @@ class FactBrokerWorker(ConsumerMixin):
logger.info('Created new fact <fact_id, module> <%s, %s>' % (fact_obj.id, module_name))
analytics_logger.info('Received message with fact data', extra=dict(
module_name=module_name, facts_data=facts))
message.ack()
return fact_obj

View File

@@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('main', '0036_v311_insights'),
]
operations = [
migrations.AddField(
model_name='instance',
name='version',
field=models.CharField(max_length=24, blank=True),
),
]

View File

@@ -26,6 +26,7 @@ class Instance(models.Model):
hostname = models.CharField(max_length=250, unique=True)
created = models.DateTimeField(auto_now_add=True)
modified = models.DateTimeField(auto_now=True)
version = models.CharField(max_length=24, blank=True)
capacity = models.PositiveIntegerField(
default=100,
editable=False,

View File

@@ -596,7 +596,7 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin):
playbook=self.playbook,
credential=self.credential.name if self.credential else None,
limit=self.limit,
extra_vars=self.extra_vars,
extra_vars=self.display_extra_vars(),
hosts=all_hosts))
return data

View File

@@ -1,5 +1,6 @@
# Python
import json
from copy import copy
# Django
from django.db import models
@@ -28,9 +29,7 @@ class ResourceMixin(models.Model):
'''
Use instead of `MyModel.objects` when you want to only consider
resources that a user has specific permissions for. For example:
MyModel.accessible_objects(user, 'read_role').filter(name__istartswith='bar');
NOTE: This should only be used for list type things. If you have a
specific resource you want to check permissions on, it is more
performant to resolve the resource in question then call
@@ -159,12 +158,13 @@ class SurveyJobTemplateMixin(models.Model):
errors.append("Value %s for '%s' expected to be a string." % (data[survey_element['variable']],
survey_element['variable']))
return errors
if 'min' in survey_element and survey_element['min'] not in ["", None] and len(data[survey_element['variable']]) < int(survey_element['min']):
errors.append("'%s' value %s is too small (length is %s must be at least %s)." %
(survey_element['variable'], data[survey_element['variable']], len(data[survey_element['variable']]), survey_element['min']))
if 'max' in survey_element and survey_element['max'] not in ["", None] and len(data[survey_element['variable']]) > int(survey_element['max']):
errors.append("'%s' value %s is too large (must be no more than %s)." %
(survey_element['variable'], data[survey_element['variable']], survey_element['max']))
if not data[survey_element['variable']] == '$encrypted$' and not survey_element['type'] == 'password':
if 'min' in survey_element and survey_element['min'] not in ["", None] and len(data[survey_element['variable']]) < int(survey_element['min']):
errors.append("'%s' value %s is too small (length is %s must be at least %s)." %
(survey_element['variable'], data[survey_element['variable']], len(data[survey_element['variable']]), survey_element['min']))
if 'max' in survey_element and survey_element['max'] not in ["", None] and len(data[survey_element['variable']]) > int(survey_element['max']):
errors.append("'%s' value %s is too large (must be no more than %s)." %
(survey_element['variable'], data[survey_element['variable']], survey_element['max']))
elif survey_element['type'] == 'integer':
if survey_element['variable'] in data:
if type(data[survey_element['variable']]) != int:
@@ -196,16 +196,22 @@ class SurveyJobTemplateMixin(models.Model):
if type(data[survey_element['variable']]) != list:
errors.append("'%s' value is expected to be a list." % survey_element['variable'])
else:
choice_list = copy(survey_element['choices'])
if isinstance(choice_list, basestring):
choice_list = choice_list.split('\n')
for val in data[survey_element['variable']]:
if val not in survey_element['choices']:
if val not in choice_list:
errors.append("Value %s for '%s' expected to be one of %s." % (val, survey_element['variable'],
survey_element['choices']))
choice_list))
elif survey_element['type'] == 'multiplechoice':
choice_list = copy(survey_element['choices'])
if isinstance(choice_list, basestring):
choice_list = choice_list.split('\n')
if survey_element['variable'] in data:
if data[survey_element['variable']] not in survey_element['choices']:
if data[survey_element['variable']] not in choice_list:
errors.append("Value %s for '%s' expected to be one of %s." % (data[survey_element['variable']],
survey_element['variable'],
survey_element['choices']))
choice_list))
return errors
def survey_variable_validation(self, data):

View File

@@ -204,6 +204,12 @@ class ProjectOptions(models.Model):
break
return sorted(results, key=lambda x: smart_str(x).lower())
def get_lock_file(self):
proj_path = self.get_project_path()
if not proj_path:
return None
return proj_path + '.lock'
class Project(UnifiedJobTemplate, ProjectOptions, ResourceMixin):
'''

View File

@@ -21,7 +21,9 @@ import traceback
import urlparse
import uuid
from distutils.version import LooseVersion as Version
from datetime import timedelta
import yaml
import fcntl
try:
import psutil
except:
@@ -46,6 +48,7 @@ from django.core.cache import cache
from django.core.exceptions import ObjectDoesNotExist
# AWX
from awx import __version__ as tower_application_version
from awx.main.constants import CLOUD_PROVIDERS
from awx.main.models import * # noqa
from awx.main.models.unified_jobs import ACTIVE_STATES
@@ -54,7 +57,7 @@ from awx.main.task_engine import TaskEnhancer
from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url,
check_proot_installed, build_proot_temp_dir, wrap_args_with_proot,
get_system_task_capacity, OutputEventFilter, parse_yaml_or_json)
from awx.main.utils.reload import restart_local_services
from awx.main.utils.reload import restart_local_services, stop_local_services
from awx.main.utils.handlers import configure_external_logger
from awx.main.consumers import emit_channel_notification
@@ -175,13 +178,27 @@ def purge_old_stdout_files(self):
@task(bind=True)
def cluster_node_heartbeat(self):
logger.debug("Cluster node heartbeat task.")
nowtime = now()
inst = Instance.objects.filter(hostname=settings.CLUSTER_HOST_ID)
if inst.exists():
inst = inst[0]
inst.capacity = get_system_task_capacity()
inst.version = tower_application_version
inst.save()
return
raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID))
else:
raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID))
recent_inst = Instance.objects.filter(modified__gt=nowtime - timedelta(seconds=70)).exclude(hostname=settings.CLUSTER_HOST_ID)
# IFF any node has a greater version than we do, then we'll shutdown services
for other_inst in recent_inst:
if other_inst.version == "":
continue
if Version(other_inst.version) > Version(tower_application_version):
logger.error("Host {} reports Tower version {}, but this node {} is at {}, shutting down".format(other_inst.hostname,
other_inst.version,
inst.hostname,
inst.version))
stop_local_services(['uwsgi', 'celery', 'beat', 'callback', 'fact'])
@task(bind=True, queue='default')
@@ -1365,7 +1382,45 @@ class RunProjectUpdate(BaseTask):
logger.error('Encountered error updating project dependent inventory: {}'.format(e))
continue
def release_lock(self, instance):
try:
fcntl.flock(self.lock_fd, fcntl.LOCK_UN)
except IOError as e:
logger.error("I/O error({0}) while trying to open lock file [{1}]: {2}".format(e.errno, instance.get_lock_file(), e.strerror))
os.close(self.lock_fd)
raise
os.close(self.lock_fd)
self.lock_fd = None
'''
Note: We don't support blocking=False
'''
def acquire_lock(self, instance, blocking=True):
lock_path = instance.get_lock_file()
if lock_path is None:
raise RuntimeError(u'Invalid lock file path')
try:
self.lock_fd = os.open(lock_path, os.O_RDONLY | os.O_CREAT)
except OSError as e:
logger.error("I/O error({0}) while trying to open lock file [{1}]: {2}".format(e.errno, lock_path, e.strerror))
raise
try:
fcntl.flock(self.lock_fd, fcntl.LOCK_EX)
except IOError as e:
os.close(self.lock_fd)
logger.error("I/O error({0}) while trying to aquire lock on file [{1}]: {2}".format(e.errno, lock_path, e.strerror))
raise
def pre_run_hook(self, instance, **kwargs):
if instance.launch_type == 'sync':
self.acquire_lock(instance)
def post_run_hook(self, instance, status, **kwargs):
if instance.launch_type == 'sync':
self.release_lock(instance)
p = instance.project
dependent_inventory_sources = p.scm_inventory_sources.all()
if instance.job_type == 'check' and status not in ('failed', 'canceled',):

View File

@@ -135,13 +135,15 @@ def create_survey_spec(variables=None, default_type='integer', required=True):
argument specifying variable name(s)
'''
if isinstance(variables, list):
name = "%s survey" % variables[0]
description = "A survey that starts with %s." % variables[0]
vars_list = variables
else:
name = "%s survey" % variables
description = "A survey about %s." % variables
vars_list = [variables]
if isinstance(variables[0], basestring):
slogan = variables[0]
else:
slogan = variables[0].get('question_name', 'something')
name = "%s survey" % slogan
description = "A survey that asks about %s." % slogan
spec = []
index = 0

View File

@@ -2,6 +2,8 @@ from awx.main.models import Job, Instance
from django.test.utils import override_settings
import pytest
import json
@pytest.mark.django_db
def test_orphan_unified_job_creation(instance, inventory):
@@ -22,3 +24,15 @@ def test_job_capacity_and_with_inactive_node():
assert Instance.objects.total_capacity() == 100
with override_settings(AWX_ACTIVE_NODE_TIME=0):
assert Instance.objects.total_capacity() < 100
@pytest.mark.django_db
def test_job_notification_data(inventory):
encrypted_str = "$encrypted$"
job = Job.objects.create(
job_template=None, inventory=inventory, name='hi world',
extra_vars=json.dumps({"SSN": "123-45-6789"}),
survey_passwords={"SSN": encrypted_str}
)
notification_data = job.notification_data(block=0)
assert json.loads(notification_data['extra_vars'])['SSN'] == encrypted_str

View File

@@ -91,6 +91,40 @@ def test_update_kwargs_survey_invalid_default(survey_spec_factory):
assert json.loads(defaulted_extra_vars['extra_vars'])['var2'] == 2
@pytest.mark.parametrize("question_type,default,expect_use,expect_value", [
("multiplechoice", "", False, 'N/A'), # historical bug
("multiplechoice", "zeb", False, 'N/A'), # zeb not in choices
("multiplechoice", "coffee", True, 'coffee'),
("multiselect", None, False, 'N/A'), # NOTE: Behavior is arguable, value of [] may be prefered
("multiselect", "", False, 'N/A'),
("multiselect", ["zeb"], False, 'N/A'),
("multiselect", ["milk"], True, ["milk"]),
("multiselect", ["orange\nmilk"], False, 'N/A'), # historical bug
])
def test_optional_survey_question_defaults(
survey_spec_factory, question_type, default, expect_use, expect_value):
spec = survey_spec_factory([
{
"required": False,
"default": default,
"choices": "orange\nmilk\nchocolate\ncoffee",
"variable": "c",
"type": question_type
},
])
jt = JobTemplate(name="test-jt", survey_spec=spec, survey_enabled=True)
defaulted_extra_vars = jt._update_unified_job_kwargs()
element = spec['spec'][0]
if expect_use:
assert jt._survey_element_validation(element, {element['variable']: element['default']}) == []
else:
assert jt._survey_element_validation(element, {element['variable']: element['default']})
if expect_use:
assert json.loads(defaulted_extra_vars['extra_vars'])['c'] == expect_value
else:
assert 'c' not in defaulted_extra_vars['extra_vars']
class TestWorkflowSurveys:
def test_update_kwargs_survey_defaults(self, survey_spec_factory):
"Assure that the survey default over-rides a JT variable"

View File

@@ -5,9 +5,11 @@ import ConfigParser
import json
import tempfile
import os
import fcntl
import pytest
import yaml
import mock
import yaml
from awx.main.models import (
Credential,
@@ -1067,3 +1069,62 @@ class TestInventoryUpdateCredentials(TestJobExecution):
self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect)
self.task.run(self.pk)
def test_os_open_oserror():
with pytest.raises(OSError):
os.open('this_file_does_not_exist', os.O_RDONLY)
def test_fcntl_ioerror():
with pytest.raises(IOError):
fcntl.flock(99999, fcntl.LOCK_EX)
@mock.patch('os.open')
@mock.patch('logging.getLogger')
def test_aquire_lock_open_fail_logged(logging_getLogger, os_open):
err = OSError()
err.errno = 3
err.strerror = 'dummy message'
instance = mock.Mock()
instance.get_lock_file.return_value = 'this_file_does_not_exist'
os_open.side_effect = err
logger = mock.Mock()
logging_getLogger.return_value = logger
ProjectUpdate = tasks.RunProjectUpdate()
with pytest.raises(OSError, errno=3, strerror='dummy message'):
ProjectUpdate.acquire_lock(instance)
assert logger.err.called_with("I/O error({0}) while trying to open lock file [{1}]: {2}".format(3, 'this_file_does_not_exist', 'dummy message'))
@mock.patch('os.open')
@mock.patch('os.close')
@mock.patch('logging.getLogger')
@mock.patch('fcntl.flock')
def test_aquire_lock_acquisition_fail_logged(fcntl_flock, logging_getLogger, os_close, os_open):
err = IOError()
err.errno = 3
err.strerror = 'dummy message'
instance = mock.Mock()
instance.get_lock_file.return_value = 'this_file_does_not_exist'
os_open.return_value = 3
logger = mock.Mock()
logging_getLogger.return_value = logger
fcntl_flock.side_effect = err
ProjectUpdate = tasks.RunProjectUpdate()
with pytest.raises(IOError, errno=3, strerror='dummy message'):
ProjectUpdate.acquire_lock(instance)
os_close.assert_called_with(3)
assert logger.err.called_with("I/O error({0}) while trying to aquire lock on file [{1}]: {2}".format(3, 'this_file_does_not_exist', 'dummy message'))

View File

@@ -0,0 +1,37 @@
import pytest
import mock
# Django REST Framework
from rest_framework import exceptions
# AWX
from awx.main.views import ApiErrorView
HTTP_METHOD_NAMES = [
'get',
'post',
'put',
'patch',
'delete',
'head',
'options',
'trace',
]
@pytest.fixture
def api_view_obj_fixture():
return ApiErrorView()
@pytest.mark.parametrize('method_name', HTTP_METHOD_NAMES)
def test_exception_view_allow_http_methods(method_name):
assert hasattr(ApiErrorView, method_name)
@pytest.mark.parametrize('method_name', HTTP_METHOD_NAMES)
def test_exception_view_raises_exception(api_view_obj_fixture, method_name):
request_mock = mock.MagicMock()
with pytest.raises(exceptions.APIException):
getattr(api_view_obj_fixture, method_name)(request_mock)

View File

@@ -2,6 +2,7 @@
# Copyright (c) 2017 Ansible, Inc.
# All Rights Reserved.
import pytest
from awx.conf.models import Setting
from awx.main.utils import common
@@ -52,3 +53,13 @@ def test_encrypt_field_with_ask():
def test_encrypt_field_with_empty_value():
encrypted = common.encrypt_field(Setting(value=None), 'value')
assert encrypted is None
@pytest.mark.parametrize('input_, output', [
({"foo": "bar"}, {"foo": "bar"}),
('{"foo": "bar"}', {"foo": "bar"}),
('---\nfoo: bar', {"foo": "bar"}),
(4399, {}),
])
def test_parse_yaml_or_json(input_, output):
assert common.parse_yaml_or_json(input_) == output

View File

@@ -3,10 +3,15 @@ from awx.main.utils import reload
def test_produce_supervisor_command(mocker):
with mocker.patch.object(reload.subprocess, 'Popen'):
reload._supervisor_service_restart(['beat', 'callback', 'fact'])
communicate_mock = mocker.MagicMock(return_value=('Everything is fine', ''))
mock_process = mocker.MagicMock()
mock_process.communicate = communicate_mock
Popen_mock = mocker.MagicMock(return_value=mock_process)
with mocker.patch.object(reload.subprocess, 'Popen', Popen_mock):
reload._supervisor_service_command(['beat', 'callback', 'fact'], "restart")
reload.subprocess.Popen.assert_called_once_with(
['supervisorctl', 'restart', 'tower-processes:receiver', 'tower-processes:factcacher'])
['supervisorctl', 'restart', 'tower-processes:receiver', 'tower-processes:factcacher'],
stderr=-1, stdin=-1, stdout=-1)
def test_routing_of_service_restarts_works(mocker):
@@ -14,13 +19,13 @@ def test_routing_of_service_restarts_works(mocker):
This tests that the parent restart method will call the appropriate
service restart methods, depending on which services are given in args
'''
with mocker.patch.object(reload, '_uwsgi_reload'),\
with mocker.patch.object(reload, '_uwsgi_fifo_command'),\
mocker.patch.object(reload, '_reset_celery_thread_pool'),\
mocker.patch.object(reload, '_supervisor_service_restart'):
mocker.patch.object(reload, '_supervisor_service_command'):
reload.restart_local_services(['uwsgi', 'celery', 'flower', 'daphne'])
reload._uwsgi_reload.assert_called_once_with()
reload._uwsgi_fifo_command.assert_called_once_with(uwsgi_command="c")
reload._reset_celery_thread_pool.assert_called_once_with()
reload._supervisor_service_restart.assert_called_once_with(['flower', 'daphne'])
reload._supervisor_service_command.assert_called_once_with(['flower', 'daphne'], command="restart")
@@ -28,11 +33,11 @@ def test_routing_of_service_restarts_diables(mocker):
'''
Test that methods are not called if not in the args
'''
with mocker.patch.object(reload, '_uwsgi_reload'),\
with mocker.patch.object(reload, '_uwsgi_fifo_command'),\
mocker.patch.object(reload, '_reset_celery_thread_pool'),\
mocker.patch.object(reload, '_supervisor_service_restart'):
mocker.patch.object(reload, '_supervisor_service_command'):
reload.restart_local_services(['flower'])
reload._uwsgi_reload.assert_not_called()
reload._uwsgi_fifo_command.assert_not_called()
reload._reset_celery_thread_pool.assert_not_called()
reload._supervisor_service_restart.assert_called_once_with(['flower'])
reload._supervisor_service_command.assert_called_once_with(['flower'], command="restart")

View File

@@ -854,8 +854,8 @@ class OutputEventFilter(object):
def callback_filter_out_ansible_extra_vars(extra_vars):
extra_vars_redacted = {}
extra_vars = parse_yaml_or_json(extra_vars)
for key, value in extra_vars.iteritems():
if not key.startswith('ansible_'):
extra_vars_redacted[key] = value
return extra_vars_redacted

View File

@@ -44,6 +44,7 @@ PARAM_NAMES = {
'indv_facts': 'LOG_AGGREGATOR_INDIVIDUAL_FACTS',
'enabled_flag': 'LOG_AGGREGATOR_ENABLED',
'tcp_timeout': 'LOG_AGGREGATOR_TCP_TIMEOUT',
'verify_cert': 'LOG_AGGREGATOR_VERIFY_CERT'
}
@@ -242,8 +243,11 @@ class BaseHTTPSHandler(BaseHandler):
payload_str = json.dumps(payload_input)
else:
payload_str = payload_input
return dict(data=payload_str, background_callback=unused_callback,
timeout=self.tcp_timeout)
kwargs = dict(data=payload_str, background_callback=unused_callback,
timeout=self.tcp_timeout)
if self.verify_cert is False:
kwargs['verify'] = False
return kwargs
def _send(self, payload):

View File

@@ -14,12 +14,12 @@ from celery import current_app
logger = logging.getLogger('awx.main.utils.reload')
def _uwsgi_reload():
def _uwsgi_fifo_command(uwsgi_command):
# http://uwsgi-docs.readthedocs.io/en/latest/MasterFIFO.html#available-commands
logger.warn('Initiating uWSGI chain reload of server')
TRIGGER_CHAIN_RELOAD = 'c'
TRIGGER_COMMAND = uwsgi_command
with open(settings.UWSGI_FIFO_LOCATION, 'w') as awxfifo:
awxfifo.write(TRIGGER_CHAIN_RELOAD)
awxfifo.write(TRIGGER_COMMAND)
def _reset_celery_thread_pool():
@@ -29,7 +29,7 @@ def _reset_celery_thread_pool():
destination=['celery@{}'.format(settings.CLUSTER_HOST_ID)], reply=False)
def _supervisor_service_restart(service_internal_names):
def _supervisor_service_command(service_internal_names, command):
'''
Service internal name options:
- beat - celery - callback - channels - uwsgi - daphne
@@ -46,23 +46,35 @@ def _supervisor_service_restart(service_internal_names):
for n in service_internal_names:
if n in name_translation_dict:
programs.append('{}:{}'.format(group_name, name_translation_dict[n]))
args.extend(['restart'])
args.extend([command])
args.extend(programs)
logger.debug('Issuing command to restart services, args={}'.format(args))
subprocess.Popen(args)
supervisor_process = subprocess.Popen(args, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
restart_stdout, restart_err = supervisor_process.communicate()
restart_code = supervisor_process.returncode
if restart_code or restart_err:
logger.error('supervisorctl restart errored with exit code `{}`, stdout:\n{}stderr:\n{}'.format(
restart_code, restart_stdout.strip(), restart_err.strip()))
else:
logger.info('supervisorctl restart finished, stdout:\n{}'.format(restart_stdout.strip()))
def restart_local_services(service_internal_names):
logger.warn('Restarting services {} on this node in response to user action'.format(service_internal_names))
if 'uwsgi' in service_internal_names:
_uwsgi_reload()
_uwsgi_fifo_command(uwsgi_command='c')
service_internal_names.remove('uwsgi')
restart_celery = False
if 'celery' in service_internal_names:
restart_celery = True
service_internal_names.remove('celery')
_supervisor_service_restart(service_internal_names)
_supervisor_service_command(service_internal_names, command='restart')
if restart_celery:
# Celery restarted last because this probably includes current process
_reset_celery_thread_pool()
def stop_local_services(service_internal_names):
logger.warn('Stopping services {} on this node in response to user action'.format(service_internal_names))
_supervisor_service_command(service_internal_names, command='stop')

View File

@@ -10,20 +10,32 @@ from django.utils.translation import ugettext_lazy as _
from rest_framework import exceptions, permissions, views
def _force_raising_exception(view_obj, request, format=None):
raise view_obj.exception_class()
class ApiErrorView(views.APIView):
authentication_classes = []
permission_classes = (permissions.AllowAny,)
metadata_class = None
allowed_methods = ('GET', 'HEAD')
exception_class = exceptions.APIException
view_name = _('API Error')
def get_view_name(self):
return self.view_name
def get(self, request, format=None):
raise self.exception_class()
def finalize_response(self, request, response, *args, **kwargs):
response = super(ApiErrorView, self).finalize_response(request, response, *args, **kwargs)
try:
del response['Allow']
except Exception:
pass
return response
for method_name in ApiErrorView.http_method_names:
setattr(ApiErrorView, method_name, _force_raising_exception)
def handle_error(request, status=404, **kwargs):