mirror of
https://github.com/ansible/awx.git
synced 2026-01-11 01:57:35 -03:30
hostname validation in InstanceSerializer (#12979)
* initial commit of hostname validation to InstanceSerializer Co-authored-by: Cesar Francisco San Nicolas Martinez <cesarfsannicolasmartinez@gmail.com>
This commit is contained in:
parent
03eaeac459
commit
385a2eabce
@ -29,6 +29,7 @@ from django.utils.translation import gettext_lazy as _
|
||||
from django.utils.encoding import force_str
|
||||
from django.utils.text import capfirst
|
||||
from django.utils.timezone import now
|
||||
from django.core.validators import RegexValidator, MaxLengthValidator
|
||||
|
||||
# Django REST Framework
|
||||
from rest_framework.exceptions import ValidationError, PermissionDenied
|
||||
@ -120,6 +121,9 @@ from awx.main.validators import vars_validate_or_raise
|
||||
from awx.api.versioning import reverse
|
||||
from awx.api.fields import BooleanNullField, CharNullField, ChoiceNullField, VerbatimField, DeprecatedCredentialField
|
||||
|
||||
# AWX Utils
|
||||
from awx.api.validators import HostnameRegexValidator
|
||||
|
||||
logger = logging.getLogger('awx.api.serializers')
|
||||
|
||||
# Fields that should be summarized regardless of object type.
|
||||
@ -4921,6 +4925,18 @@ class InstanceSerializer(BaseSerializer):
|
||||
extra_kwargs = {
|
||||
'node_type': {'initial': Instance.Types.EXECUTION, 'default': Instance.Types.EXECUTION},
|
||||
'node_state': {'initial': Instance.States.INSTALLED, 'default': Instance.States.INSTALLED},
|
||||
'hostname': {
|
||||
'validators': [
|
||||
MaxLengthValidator(limit_value=255),
|
||||
RegexValidator(
|
||||
regex='^localhost$|^127(?:\.[0-9]+){0,2}\.[0-9]+$|^(?:0*\:)*?:?0*1$',
|
||||
flags=re.IGNORECASE,
|
||||
inverse_match=True,
|
||||
message="hostname cannot be localhost or 127.0.0.1",
|
||||
),
|
||||
HostnameRegexValidator(),
|
||||
],
|
||||
},
|
||||
}
|
||||
|
||||
def get_related(self, obj):
|
||||
@ -4991,6 +5007,10 @@ class InstanceSerializer(BaseSerializer):
|
||||
return value
|
||||
|
||||
def validate_hostname(self, value):
|
||||
"""
|
||||
- Hostname cannot be "localhost" - but can be something like localhost.domain
|
||||
- Cannot change the hostname of an-already instantiated & initialized Instance object
|
||||
"""
|
||||
if self.instance and self.instance.hostname != value:
|
||||
raise serializers.ValidationError("Cannot change hostname.")
|
||||
|
||||
|
||||
55
awx/api/validators.py
Normal file
55
awx/api/validators.py
Normal file
@ -0,0 +1,55 @@
|
||||
import re
|
||||
|
||||
from django.core.validators import RegexValidator, validate_ipv46_address
|
||||
from django.core.exceptions import ValidationError
|
||||
|
||||
|
||||
class HostnameRegexValidator(RegexValidator):
|
||||
"""
|
||||
Fully validates a domain name that is compliant with norms in Linux/RHEL
|
||||
- Cannot start with a hyphen
|
||||
- Cannot begin with, or end with a "."
|
||||
- Cannot contain any whitespaces
|
||||
- Entire hostname is max 255 chars (including dots)
|
||||
- Each domain/label is between 1 and 63 characters, except top level domain, which must be at least 2 characters
|
||||
- Supports ipv4, ipv6, simple hostnames and FQDNs
|
||||
- Follows RFC 9210 (modern RFC 1123, 1178) requirements
|
||||
|
||||
Accepts an IP Address or Hostname as the argument
|
||||
"""
|
||||
|
||||
regex = '^[a-z0-9][-a-z0-9]*$|^([a-z0-9][-a-z0-9]{0,62}[.])*[a-z0-9][-a-z0-9]{1,62}$'
|
||||
flags = re.IGNORECASE
|
||||
|
||||
def __call__(self, value):
|
||||
regex_matches, err = self.__validate(value)
|
||||
invalid_input = regex_matches if self.inverse_match else not regex_matches
|
||||
if invalid_input:
|
||||
if err is None:
|
||||
err = ValidationError(self.message, code=self.code, params={"value": value})
|
||||
raise err
|
||||
|
||||
def __str__(self):
|
||||
return f"regex={self.regex}, message={self.message}, code={self.code}, inverse_match={self.inverse_match}, flags={self.flags}"
|
||||
|
||||
def __validate(self, value):
|
||||
|
||||
if ' ' in value:
|
||||
return False, ValidationError("whitespaces in hostnames are illegal")
|
||||
|
||||
"""
|
||||
If we have an IP address, try and validate it.
|
||||
"""
|
||||
try:
|
||||
validate_ipv46_address(value)
|
||||
return True, None
|
||||
except ValidationError:
|
||||
pass
|
||||
|
||||
"""
|
||||
By this point in the code, we probably have a simple hostname, FQDN or a strange hostname like "192.localhost.domain.101"
|
||||
"""
|
||||
if not self.regex.match(value):
|
||||
return False, ValidationError(f"illegal characters detected in hostname={value}. Please verify.")
|
||||
|
||||
return True, None
|
||||
@ -4,6 +4,8 @@ from awx.api.versioning import reverse
|
||||
from awx.main.models.activity_stream import ActivityStream
|
||||
from awx.main.models.ha import Instance
|
||||
|
||||
from django.test.utils import override_settings
|
||||
|
||||
|
||||
INSTANCE_KWARGS = dict(hostname='example-host', cpu=6, memory=36000000000, cpu_capacity=6, mem_capacity=42)
|
||||
|
||||
@ -54,3 +56,33 @@ def test_health_check_usage(get, post, admin_user):
|
||||
get(url=url, user=admin_user, expect=200)
|
||||
r = post(url=url, user=admin_user, expect=200)
|
||||
assert r.data['msg'] == f"Health check is running for {instance.hostname}."
|
||||
|
||||
|
||||
def test_custom_hostname_regex(post, admin_user):
|
||||
url = reverse('api:instance_list')
|
||||
with override_settings(IS_K8S=True):
|
||||
for value in [
|
||||
("foo.bar.baz", 201),
|
||||
("f.bar.bz", 201),
|
||||
("foo.bar.b", 400),
|
||||
("a.b.c", 400),
|
||||
("localhost", 400),
|
||||
("127.0.0.1", 400),
|
||||
("192.168.56.101", 201),
|
||||
("2001:0db8:85a3:0000:0000:8a2e:0370:7334", 201),
|
||||
("foobar", 201),
|
||||
("--yoooo", 400),
|
||||
("$3$@foobar@#($!@#*$", 400),
|
||||
("999.999.999.999", 201),
|
||||
("0000:0000:0000:0000:0000:0000:0000:0001", 400),
|
||||
("whitespaces are bad for hostnames", 400),
|
||||
("0:0:0:0:0:0:0:1", 400),
|
||||
("192.localhost.domain.101", 201),
|
||||
("F@$%(@#$H%^(I@#^HCTQEWRFG", 400),
|
||||
]:
|
||||
data = {
|
||||
"hostname": value[0],
|
||||
"node_type": "execution",
|
||||
"node_state": "installed",
|
||||
}
|
||||
post(url=url, user=admin_user, data=data, expect=value[1])
|
||||
|
||||
@ -3,6 +3,7 @@
|
||||
# Copyright (c) 2017 Ansible, Inc.
|
||||
# All Rights Reserved.
|
||||
import os
|
||||
import re
|
||||
import pytest
|
||||
from uuid import uuid4
|
||||
import json
|
||||
@ -12,9 +13,13 @@ from unittest import mock
|
||||
from rest_framework.exceptions import ParseError
|
||||
|
||||
from awx.main.utils import common
|
||||
from awx.api.validators import HostnameRegexValidator
|
||||
|
||||
from awx.main.models import Job, AdHocCommand, InventoryUpdate, ProjectUpdate, SystemJob, WorkflowJob, Inventory, JobTemplate, UnifiedJobTemplate, UnifiedJob
|
||||
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.utils.regex_helper import _lazy_re_compile
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'input_, output',
|
||||
@ -275,3 +280,55 @@ def test_update_scm_url(scm_type, url, username, password, check_special_cases,
|
||||
assert str(excinfo.value) == str(expected)
|
||||
else:
|
||||
assert common.update_scm_url(scm_type, url, username, password, check_special_cases, scp_format) == expected
|
||||
|
||||
|
||||
class TestHostnameRegexValidator:
|
||||
@pytest.fixture
|
||||
def regex_expr(self):
|
||||
return '^[a-z0-9][-a-z0-9]*$|^([a-z0-9][-a-z0-9]{0,62}[.])*[a-z0-9][-a-z0-9]{1,62}$'
|
||||
|
||||
@pytest.fixture
|
||||
def re_flags(self):
|
||||
return re.IGNORECASE
|
||||
|
||||
@pytest.fixture
|
||||
def custom_err_message(self):
|
||||
return "foobar"
|
||||
|
||||
def test_hostame_regex_validator_constructor_with_args(self, regex_expr, re_flags, custom_err_message):
|
||||
h = HostnameRegexValidator(regex=regex_expr, flags=re_flags, message=custom_err_message)
|
||||
assert h.regex == _lazy_re_compile(regex_expr, re_flags)
|
||||
assert h.message == 'foobar'
|
||||
assert h.code == 'invalid'
|
||||
assert h.inverse_match == False
|
||||
assert h.flags == re_flags
|
||||
|
||||
def test_hostame_regex_validator_default_constructor(self, regex_expr, re_flags):
|
||||
h = HostnameRegexValidator()
|
||||
assert h.regex == _lazy_re_compile(regex_expr, re_flags)
|
||||
assert h.message == 'Enter a valid value.'
|
||||
assert h.code == 'invalid'
|
||||
assert h.inverse_match == False
|
||||
assert h.flags == re_flags
|
||||
|
||||
def test_good_call(self, regex_expr, re_flags):
|
||||
h = HostnameRegexValidator(regex=regex_expr, flags=re_flags)
|
||||
assert (h("192.168.56.101"), None)
|
||||
|
||||
def test_bad_call(self, regex_expr, re_flags):
|
||||
h = HostnameRegexValidator(regex=regex_expr, flags=re_flags)
|
||||
try:
|
||||
h("@#$%)$#(TUFAS_DG")
|
||||
except ValidationError as e:
|
||||
assert e.message is not None
|
||||
|
||||
def test_good_call_with_inverse(self, regex_expr, re_flags, inverse_match=True):
|
||||
h = HostnameRegexValidator(regex=regex_expr, flags=re_flags, inverse_match=inverse_match)
|
||||
try:
|
||||
h("1.2.3.4")
|
||||
except ValidationError as e:
|
||||
assert e.message is not None
|
||||
|
||||
def test_bad_call_with_inverse(self, regex_expr, re_flags, inverse_match=True):
|
||||
h = HostnameRegexValidator(regex=regex_expr, flags=re_flags, inverse_match=inverse_match)
|
||||
assert (h("@#$%)$#(TUFAS_DG"), None)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user