From 385a2eabcecb4b618c6ce28a338d2fe502c93892 Mon Sep 17 00:00:00 2001 From: Sarabraj Singh Date: Wed, 5 Oct 2022 13:50:06 -0400 Subject: [PATCH] hostname validation in InstanceSerializer (#12979) * initial commit of hostname validation to InstanceSerializer Co-authored-by: Cesar Francisco San Nicolas Martinez --- awx/api/serializers.py | 20 +++++++ awx/api/validators.py | 55 ++++++++++++++++++ .../tests/functional/api/test_instance.py | 32 +++++++++++ awx/main/tests/unit/utils/test_common.py | 57 +++++++++++++++++++ 4 files changed, 164 insertions(+) create mode 100644 awx/api/validators.py diff --git a/awx/api/serializers.py b/awx/api/serializers.py index 91474643db..bd1f486dd8 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -29,6 +29,7 @@ from django.utils.translation import gettext_lazy as _ from django.utils.encoding import force_str from django.utils.text import capfirst from django.utils.timezone import now +from django.core.validators import RegexValidator, MaxLengthValidator # Django REST Framework from rest_framework.exceptions import ValidationError, PermissionDenied @@ -120,6 +121,9 @@ from awx.main.validators import vars_validate_or_raise from awx.api.versioning import reverse from awx.api.fields import BooleanNullField, CharNullField, ChoiceNullField, VerbatimField, DeprecatedCredentialField +# AWX Utils +from awx.api.validators import HostnameRegexValidator + logger = logging.getLogger('awx.api.serializers') # Fields that should be summarized regardless of object type. @@ -4921,6 +4925,18 @@ class InstanceSerializer(BaseSerializer): extra_kwargs = { 'node_type': {'initial': Instance.Types.EXECUTION, 'default': Instance.Types.EXECUTION}, 'node_state': {'initial': Instance.States.INSTALLED, 'default': Instance.States.INSTALLED}, + 'hostname': { + 'validators': [ + MaxLengthValidator(limit_value=255), + RegexValidator( + regex='^localhost$|^127(?:\.[0-9]+){0,2}\.[0-9]+$|^(?:0*\:)*?:?0*1$', + flags=re.IGNORECASE, + inverse_match=True, + message="hostname cannot be localhost or 127.0.0.1", + ), + HostnameRegexValidator(), + ], + }, } def get_related(self, obj): @@ -4991,6 +5007,10 @@ class InstanceSerializer(BaseSerializer): return value def validate_hostname(self, value): + """ + - Hostname cannot be "localhost" - but can be something like localhost.domain + - Cannot change the hostname of an-already instantiated & initialized Instance object + """ if self.instance and self.instance.hostname != value: raise serializers.ValidationError("Cannot change hostname.") diff --git a/awx/api/validators.py b/awx/api/validators.py new file mode 100644 index 0000000000..7f7f6cd25f --- /dev/null +++ b/awx/api/validators.py @@ -0,0 +1,55 @@ +import re + +from django.core.validators import RegexValidator, validate_ipv46_address +from django.core.exceptions import ValidationError + + +class HostnameRegexValidator(RegexValidator): + """ + Fully validates a domain name that is compliant with norms in Linux/RHEL + - Cannot start with a hyphen + - Cannot begin with, or end with a "." + - Cannot contain any whitespaces + - Entire hostname is max 255 chars (including dots) + - Each domain/label is between 1 and 63 characters, except top level domain, which must be at least 2 characters + - Supports ipv4, ipv6, simple hostnames and FQDNs + - Follows RFC 9210 (modern RFC 1123, 1178) requirements + + Accepts an IP Address or Hostname as the argument + """ + + regex = '^[a-z0-9][-a-z0-9]*$|^([a-z0-9][-a-z0-9]{0,62}[.])*[a-z0-9][-a-z0-9]{1,62}$' + flags = re.IGNORECASE + + def __call__(self, value): + regex_matches, err = self.__validate(value) + invalid_input = regex_matches if self.inverse_match else not regex_matches + if invalid_input: + if err is None: + err = ValidationError(self.message, code=self.code, params={"value": value}) + raise err + + def __str__(self): + return f"regex={self.regex}, message={self.message}, code={self.code}, inverse_match={self.inverse_match}, flags={self.flags}" + + def __validate(self, value): + + if ' ' in value: + return False, ValidationError("whitespaces in hostnames are illegal") + + """ + If we have an IP address, try and validate it. + """ + try: + validate_ipv46_address(value) + return True, None + except ValidationError: + pass + + """ + By this point in the code, we probably have a simple hostname, FQDN or a strange hostname like "192.localhost.domain.101" + """ + if not self.regex.match(value): + return False, ValidationError(f"illegal characters detected in hostname={value}. Please verify.") + + return True, None diff --git a/awx/main/tests/functional/api/test_instance.py b/awx/main/tests/functional/api/test_instance.py index ec569d945f..25ecb78ded 100644 --- a/awx/main/tests/functional/api/test_instance.py +++ b/awx/main/tests/functional/api/test_instance.py @@ -4,6 +4,8 @@ from awx.api.versioning import reverse from awx.main.models.activity_stream import ActivityStream from awx.main.models.ha import Instance +from django.test.utils import override_settings + INSTANCE_KWARGS = dict(hostname='example-host', cpu=6, memory=36000000000, cpu_capacity=6, mem_capacity=42) @@ -54,3 +56,33 @@ def test_health_check_usage(get, post, admin_user): get(url=url, user=admin_user, expect=200) r = post(url=url, user=admin_user, expect=200) assert r.data['msg'] == f"Health check is running for {instance.hostname}." + + +def test_custom_hostname_regex(post, admin_user): + url = reverse('api:instance_list') + with override_settings(IS_K8S=True): + for value in [ + ("foo.bar.baz", 201), + ("f.bar.bz", 201), + ("foo.bar.b", 400), + ("a.b.c", 400), + ("localhost", 400), + ("127.0.0.1", 400), + ("192.168.56.101", 201), + ("2001:0db8:85a3:0000:0000:8a2e:0370:7334", 201), + ("foobar", 201), + ("--yoooo", 400), + ("$3$@foobar@#($!@#*$", 400), + ("999.999.999.999", 201), + ("0000:0000:0000:0000:0000:0000:0000:0001", 400), + ("whitespaces are bad for hostnames", 400), + ("0:0:0:0:0:0:0:1", 400), + ("192.localhost.domain.101", 201), + ("F@$%(@#$H%^(I@#^HCTQEWRFG", 400), + ]: + data = { + "hostname": value[0], + "node_type": "execution", + "node_state": "installed", + } + post(url=url, user=admin_user, data=data, expect=value[1]) diff --git a/awx/main/tests/unit/utils/test_common.py b/awx/main/tests/unit/utils/test_common.py index 8d14d7cf64..6e1d677363 100644 --- a/awx/main/tests/unit/utils/test_common.py +++ b/awx/main/tests/unit/utils/test_common.py @@ -3,6 +3,7 @@ # Copyright (c) 2017 Ansible, Inc. # All Rights Reserved. import os +import re import pytest from uuid import uuid4 import json @@ -12,9 +13,13 @@ from unittest import mock from rest_framework.exceptions import ParseError from awx.main.utils import common +from awx.api.validators import HostnameRegexValidator from awx.main.models import Job, AdHocCommand, InventoryUpdate, ProjectUpdate, SystemJob, WorkflowJob, Inventory, JobTemplate, UnifiedJobTemplate, UnifiedJob +from django.core.exceptions import ValidationError +from django.utils.regex_helper import _lazy_re_compile + @pytest.mark.parametrize( 'input_, output', @@ -275,3 +280,55 @@ def test_update_scm_url(scm_type, url, username, password, check_special_cases, assert str(excinfo.value) == str(expected) else: assert common.update_scm_url(scm_type, url, username, password, check_special_cases, scp_format) == expected + + +class TestHostnameRegexValidator: + @pytest.fixture + def regex_expr(self): + return '^[a-z0-9][-a-z0-9]*$|^([a-z0-9][-a-z0-9]{0,62}[.])*[a-z0-9][-a-z0-9]{1,62}$' + + @pytest.fixture + def re_flags(self): + return re.IGNORECASE + + @pytest.fixture + def custom_err_message(self): + return "foobar" + + def test_hostame_regex_validator_constructor_with_args(self, regex_expr, re_flags, custom_err_message): + h = HostnameRegexValidator(regex=regex_expr, flags=re_flags, message=custom_err_message) + assert h.regex == _lazy_re_compile(regex_expr, re_flags) + assert h.message == 'foobar' + assert h.code == 'invalid' + assert h.inverse_match == False + assert h.flags == re_flags + + def test_hostame_regex_validator_default_constructor(self, regex_expr, re_flags): + h = HostnameRegexValidator() + assert h.regex == _lazy_re_compile(regex_expr, re_flags) + assert h.message == 'Enter a valid value.' + assert h.code == 'invalid' + assert h.inverse_match == False + assert h.flags == re_flags + + def test_good_call(self, regex_expr, re_flags): + h = HostnameRegexValidator(regex=regex_expr, flags=re_flags) + assert (h("192.168.56.101"), None) + + def test_bad_call(self, regex_expr, re_flags): + h = HostnameRegexValidator(regex=regex_expr, flags=re_flags) + try: + h("@#$%)$#(TUFAS_DG") + except ValidationError as e: + assert e.message is not None + + def test_good_call_with_inverse(self, regex_expr, re_flags, inverse_match=True): + h = HostnameRegexValidator(regex=regex_expr, flags=re_flags, inverse_match=inverse_match) + try: + h("1.2.3.4") + except ValidationError as e: + assert e.message is not None + + def test_bad_call_with_inverse(self, regex_expr, re_flags, inverse_match=True): + h = HostnameRegexValidator(regex=regex_expr, flags=re_flags, inverse_match=inverse_match) + assert (h("@#$%)$#(TUFAS_DG"), None)