diff --git a/awx/api/serializers.py b/awx/api/serializers.py index 6eae0601c1..6cede6f326 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -5490,8 +5490,20 @@ class ReceptorAddressSerializer(BaseSerializer): class Meta: model = ReceptorAddress - fields = ('id', 'url', 'address', 'port', 'protocol', 'websocket_path', 'is_internal', 'instance', 'peers_from_control_nodes', 'full_address') - read_only_fields = ('full_address',) + fields = ( + 'id', + 'url', + 'address', + 'port', + 'websocket_path', + 'k8s_routable', + 'canonical', + 'instance', + 'managed', + 'peers_from_control_nodes', + 'full_address', + ) + read_only_fields = ('full_address', 'managed', 'canonical', 'k8s_routable') def get_full_address(self, obj): return obj.get_full_address() @@ -5500,25 +5512,47 @@ class ReceptorAddressSerializer(BaseSerializer): def get_field_from_model_or_attrs(fd): return attrs.get(fd, self.instance and getattr(self.instance, fd) or None) + managed = get_field_from_model_or_attrs('managed') + canonical = get_field_from_model_or_attrs('canonical') + + if managed: + raise serializers.ValidationError(_("Cannot modify a managed address.")) + + # cannot modify address field if canonical is True + if canonical and attrs.get('address') and self.instance and self.instance.address != attrs.get('address'): + raise serializers.ValidationError(_("Cannot modify address field if it is canonical.")) + peers_from_control_nodes = get_field_from_model_or_attrs('peers_from_control_nodes') instance = get_field_from_model_or_attrs('instance') + address = get_field_from_model_or_attrs('address') - # only allow websocket_path to be set if protocol is ws - if attrs.get('protocol') != 'ws' and attrs.get('websocket_path'): + if not instance.listener_port: + raise serializers.ValidationError(_("Instance must have a listener port set.")) + + # only allow websocket_path to be set if instance protocol is ws + if attrs.get('websocket_path') and instance and instance.protocol != 'ws': raise serializers.ValidationError(_("Can only set websocket path if protocol is ws.")) # an instance can only have one address with peers_from_control_nodes set to True if peers_from_control_nodes: for other_address in ReceptorAddress.objects.filter(instance=instance.id): - if other_address.peers_from_control_nodes: + if other_address.address != address and other_address.peers_from_control_nodes: raise serializers.ValidationError(_("Only one address can set peers_from_control_nodes to True.")) - # is_internal should be False - if attrs.get('is_internal') == True: + # k8s_routable should be False + if attrs.get('k8s_routable') == True: raise serializers.ValidationError(_("Only external addresses can be created.")) return super().validate(attrs) + def update(self, obj, validated_data): + addr = super(ReceptorAddressSerializer, self).update(obj, validated_data) + if addr.port != addr.instance.listener_port: + addr.instance.listener_port = addr.port + addr.instance.save(update_fields=['listener_port']) + + return addr + class InstanceSerializer(BaseSerializer): show_capabilities = ['edit'] @@ -5531,10 +5565,11 @@ class InstanceSerializer(BaseSerializer): peers = serializers.PrimaryKeyRelatedField( help_text=_('Primary keys of receptor addresses to peer to.'), many=True, required=False, queryset=ReceptorAddress.objects.all() ) + reverse_peers = serializers.SerializerMethodField() class Meta: model = Instance - read_only_fields = ('ip_address', 'uuid', 'version') + read_only_fields = ('ip_address', 'uuid', 'version', 'managed', 'reverse_peers') fields = ( 'id', 'hostname', @@ -5565,8 +5600,12 @@ class InstanceSerializer(BaseSerializer): 'managed_by_policy', 'node_type', 'node_state', + 'managed', 'ip_address', 'peers', + 'reverse_peers', + 'listener_port', + 'protocol', ) extra_kwargs = { 'node_type': {'initial': Instance.Types.EXECUTION, 'default': Instance.Types.EXECUTION}, @@ -5599,6 +5638,37 @@ class InstanceSerializer(BaseSerializer): res['health_check'] = self.reverse('api:instance_health_check', kwargs={'pk': obj.pk}) return res + def create(self, validated_data): + # create a managed receptor address if listener port is defined + kwargs = { + 'port': validated_data.get('listener_port', None), + 'canonical': True, + } + kwargs = {k: v for k, v in kwargs.items() if v is not None} + instance = super(InstanceSerializer, self).create(validated_data) + if kwargs.get('port'): + instance.receptor_addresses.update_or_create(address=instance.hostname, defaults=kwargs) + else: + # delete the receptor address if the listener port is not defined + instance.receptor_addresses.filter(address=instance.hostname).delete() + return instance + + def update(self, obj, validated_data): + # update the managed receptor address if listener port is defined + kwargs = { + 'port': validated_data.get('listener_port', None), + 'canonical': True, + } + kwargs = {k: v for k, v in kwargs.items() if v is not None} + instance = super(InstanceSerializer, self).update(obj, validated_data) + if kwargs.get('port'): + instance.receptor_addresses.update_or_create(address=instance.hostname, defaults=kwargs) + else: + # delete the receptor address if the listener port is not defined + instance.receptor_addresses.filter(address=instance.hostname).delete() + + return instance + def get_summary_fields(self, obj): summary = super().get_summary_fields(obj) @@ -5608,6 +5678,9 @@ class InstanceSerializer(BaseSerializer): return summary + def get_reverse_peers(self, obj): + return Instance.objects.prefetch_related('peers').filter(peers__in=obj.receptor_addresses.all()).values_list('id', flat=True) + def get_consumed_capacity(self, obj): return obj.consumed_capacity @@ -5683,8 +5756,8 @@ class InstanceSerializer(BaseSerializer): raise serializers.ValidationError(_("Can only change the state on Kubernetes or OpenShift.")) if value != Instance.States.DEPROVISIONING: raise serializers.ValidationError(_("Can only change instances to the 'deprovisioning' state.")) - if self.instance.node_type not in (Instance.Types.EXECUTION, Instance.Types.HOP): - raise serializers.ValidationError(_("Can only deprovision execution or hop nodes.")) + if self.instance.managed: + raise serializers.ValidationError(_("Cannot deprovision managed nodes.")) else: if value and value != Instance.States.INSTALLED: raise serializers.ValidationError(_("Can only create instances in the 'installed' state.")) diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index ae9baaa4dd..0e429d0190 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -405,6 +405,12 @@ class ReceptorAddressDetail(RetrieveUpdateDestroyAPIView): model = models.ReceptorAddress serializer_class = serializers.ReceptorAddressSerializer + def delete(self, request, *args, **kwargs): + obj = self.get_object() + if obj.canonical or obj.managed: + return Response({'detail': _('Cannot delete canonical or managed address.')}, status=status.HTTP_400_BAD_REQUEST) + return super(ReceptorAddressDetail, self).delete(request, *args, **kwargs) + class InstanceInstanceGroupsList(InstanceGroupMembershipMixin, SubListCreateAttachDetachAPIView): name = _("Instance's Instance Groups") diff --git a/awx/api/views/instance_install_bundle.py b/awx/api/views/instance_install_bundle.py index bdc1998a32..4d0fa437d7 100644 --- a/awx/api/views/instance_install_bundle.py +++ b/awx/api/views/instance_install_bundle.py @@ -126,15 +126,12 @@ def generate_inventory_yml(instance_obj): def generate_group_vars_all_yml(instance_obj): # get peers peers = [] - for addr in instance_obj.peers.all(): - peers.append(dict(address=addr.get_full_address(), protocol=addr.protocol)) + for addr in instance_obj.peers.all().prefetch_related('instance'): + peers.append(dict(address=addr.get_full_address(), protocol=addr.instance.protocol)) context = dict(instance=instance_obj, peers=peers) - # we infer the listener port information from the first tcp receptor address - # currently for external remote nodes, we only support a single tcp backend listeners - listener_addr = instance_obj.receptor_addresses.filter(protocol="tcp").first() - if listener_addr: - context['listener_port'] = listener_addr.port + if instance_obj.listener_port: + context['listener_port'] = instance_obj.listener_port all_yaml = render_to_string("instance_install_bundle/group_vars/all.yml", context=context) # convert consecutive newlines with a single newline diff --git a/awx/main/management/commands/add_receptor_address.py b/awx/main/management/commands/add_receptor_address.py index 890098f13f..032feb4fc5 100644 --- a/awx/main/management/commands/add_receptor_address.py +++ b/awx/main/management/commands/add_receptor_address.py @@ -8,13 +8,16 @@ from awx.main.models import Instance, ReceptorAddress def add_address(**kwargs): try: - instance = Instance.objects.get(hostname=kwargs.pop('hostname')) + instance = Instance.objects.get(hostname=kwargs.pop('instance')) kwargs['instance'] = instance - # address and protocol are unique together for ReceptorAddress - # If an address has (address, protocol), it will update the rest of the values suppled in defaults dict - # if no address exists with (address, protocol), then a new address will be created - # these unique together fields need to be consistent with the unique constraint in the ReceptorAddress model - addr, _ = ReceptorAddress.objects.update_or_create(address=kwargs.pop('address'), protocol=kwargs.pop('protocol'), defaults=kwargs) + # if ReceptorAddress already exists with address, just update + # otherwise, create new ReceptorAddress + addr, _ = ReceptorAddress.objects.update_or_create(address=kwargs.pop('address'), defaults=kwargs) + + # update listener_port on instance if address is canonical + if addr.canonical: + addr.instance.listener_port = addr.port + addr.instance.save(update_fields=['listener_port']) print(f"Successfully added receptor address {addr.get_full_address()}") changed = True except Exception as e: @@ -32,17 +35,22 @@ class Command(BaseCommand): help = "Add receptor address to an instance." def add_arguments(self, parser): - parser.add_argument('--hostname', dest='hostname', type=str, help="Hostname this address is added to") + parser.add_argument('--instance', dest='instance', type=str, help="Instance hostname this address is added to") parser.add_argument('--address', dest='address', type=str, help="Receptor address") parser.add_argument('--port', dest='port', type=int, help="Receptor listener port") - parser.add_argument('--protocol', dest='protocol', type=str, default='tcp', choices=['tcp', 'ws'], help="Protocol of the backend connection") parser.add_argument('--websocket_path', dest='websocket_path', type=str, default="", help="Path for websockets") - parser.add_argument('--is_internal', action='store_true', help="If true, address only resolvable within the Kubernetes cluster") + parser.add_argument('--k8s_routable', action='store_true', help="If true, address only resolvable within the Kubernetes cluster") + parser.add_argument('--canonical', action='store_true', help="If true, address is the canonical address for the instance") parser.add_argument('--peers_from_control_nodes', action='store_true', help="If true, control nodes will peer to this address") + parser.add_argument('--managed', action='store_true', help="If True, this address should be managed by the control plane.") def handle(self, **options): self.changed = False - address_options = {k: options[k] for k in ('hostname', 'address', 'port', 'protocol', 'websocket_path', 'is_internal', 'peers_from_control_nodes')} + address_options = { + k: options[k] + for k in ('instance', 'address', 'port', 'websocket_path', 'k8s_routable', 'peers_from_control_nodes', 'canonical', 'managed') + if options[k] + } self.changed = add_address(**address_options) if self.changed: print("(changed: True)") diff --git a/awx/main/management/commands/provision_instance.py b/awx/main/management/commands/provision_instance.py index e7f8063d61..5d17e59ad1 100644 --- a/awx/main/management/commands/provision_instance.py +++ b/awx/main/management/commands/provision_instance.py @@ -26,10 +26,13 @@ class Command(BaseCommand): def add_arguments(self, parser): parser.add_argument('--hostname', dest='hostname', type=str, help="Hostname used during provisioning") parser.add_argument('--listener_port', dest='listener_port', type=int, help="Receptor listener port") + parser.add_argument( + '--protocol', dest='protocol', type=str, default='tcp', choices=['tcp', 'ws', 'wss'], help="Protocol to use for the Receptor listener" + ) parser.add_argument('--node_type', type=str, default='hybrid', choices=['control', 'execution', 'hop', 'hybrid'], help="Instance Node type") parser.add_argument('--uuid', type=str, help="Instance UUID") - def _register_hostname(self, hostname, node_type, uuid, listener_port): + def _register_hostname(self, hostname, node_type, uuid, listener_port, protocol): if not hostname: if not settings.AWX_AUTO_DEPROVISION_INSTANCES: raise CommandError('Registering with values from settings only intended for use in K8s installs') @@ -37,7 +40,7 @@ class Command(BaseCommand): from awx.main.management.commands.register_queue import RegisterQueue (changed, instance) = Instance.objects.register( - ip_address=os.environ.get('MY_POD_IP'), listener_port=listener_port, node_type='control', node_uuid=settings.SYSTEM_UUID + ip_address=os.environ.get('MY_POD_IP'), listener_port=listener_port, node_type='control', node_uuid=settings.SYSTEM_UUID, protocol=protocol ) RegisterQueue(settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME, 100, 0, [], is_container_group=False).register() RegisterQueue( @@ -51,16 +54,19 @@ class Command(BaseCommand): max_concurrent_jobs=settings.DEFAULT_EXECUTION_QUEUE_MAX_CONCURRENT_JOBS, ).register() else: - (changed, instance) = Instance.objects.register(hostname=hostname, node_type=node_type, node_uuid=uuid, listener_port=listener_port) + (changed, instance) = Instance.objects.register( + hostname=hostname, node_type=node_type, node_uuid=uuid, listener_port=listener_port, protocol=protocol + ) if changed: print("Successfully registered instance {}".format(hostname)) else: print("Instance already registered {}".format(instance.hostname)) + self.changed = changed @transaction.atomic def handle(self, **options): self.changed = False - self._register_hostname(options.get('hostname'), options.get('node_type'), options.get('uuid'), options.get('listener_port')) + self._register_hostname(options.get('hostname'), options.get('node_type'), options.get('uuid'), options.get('listener_port'), options.get('protocol')) if self.changed: print("(changed: True)") diff --git a/awx/main/managers.py b/awx/main/managers.py index 2936653d2d..eaccdb8b24 100644 --- a/awx/main/managers.py +++ b/awx/main/managers.py @@ -115,7 +115,17 @@ class InstanceManager(models.Manager): return node[0] raise RuntimeError("No instance found with the current cluster host id") - def register(self, node_uuid=None, hostname=None, ip_address="", listener_port=None, node_type='hybrid', peers_from_control_nodes=False, defaults=None): + def register( + self, + node_uuid=None, + hostname=None, + ip_address="", + listener_port=None, + protocol='tcp', + node_type='hybrid', + peers_from_control_nodes=False, + defaults=None, + ): if not hostname: hostname = settings.CLUSTER_HOST_ID @@ -161,6 +171,12 @@ class InstanceManager(models.Manager): if instance.node_type != node_type: instance.node_type = node_type update_fields.append('node_type') + if instance.protocol != protocol: + instance.protocol = protocol + update_fields.append('protocol') + if instance.listener_port != listener_port: + instance.listener_port = listener_port + update_fields.append('listener_port') if update_fields: instance.save(update_fields=update_fields) return (True, instance) @@ -171,6 +187,7 @@ class InstanceManager(models.Manager): create_defaults = { 'node_state': Instance.States.INSTALLED, 'capacity': 0, + 'managed': True, } if defaults is not None: create_defaults.update(defaults) @@ -185,8 +202,5 @@ class InstanceManager(models.Manager): **create_defaults, **uuid_option ) - from awx.main.management.commands.add_receptor_address import add_address - if listener_port: - add_address(address=hostname, hostname=hostname, port=listener_port, protocol='tcp') return (True, instance) diff --git a/awx/main/migrations/0188_inbound_hop_nodes.py b/awx/main/migrations/0188_inbound_hop_nodes.py index 3493ac19e8..d1cf76c3b0 100644 --- a/awx/main/migrations/0188_inbound_hop_nodes.py +++ b/awx/main/migrations/0188_inbound_hop_nodes.py @@ -1,4 +1,4 @@ -# Generated by Django 4.2.6 on 2023-11-16 05:50 +# Generated by Django 4.2.6 on 2023-12-14 19:14 import django.core.validators from django.db import migrations, models @@ -24,17 +24,10 @@ class Migration(migrations.Migration): validators=[django.core.validators.MinValueValidator(0), django.core.validators.MaxValueValidator(65535)], ), ), - ( - 'protocol', - models.CharField( - choices=[('tcp', 'TCP'), ('ws', 'WS'), ('wss', 'WSS')], - default='tcp', - help_text="Protocol to use when connecting, 'tcp', 'wss', or 'ws'.", - max_length=10, - ), - ), ('websocket_path', models.CharField(blank=True, default='', help_text='Websocket path.', max_length=255)), - ('is_internal', models.BooleanField(default=False, help_text='If True, only routable inside of the Kubernetes cluster.')), + ('k8s_routable', models.BooleanField(default=False, help_text='If True, only routable inside of the Kubernetes cluster.')), + ('canonical', models.BooleanField(default=False, help_text='If True, this address is the canonical address for the instance.')), + ('managed', models.BooleanField(default=False, editable=False, help_text='If True, this address is managed by the control plane.')), ( 'peers_from_control_nodes', models.BooleanField(default=False, help_text='If True, control plane cluster nodes should automatically peer to it.'), @@ -49,9 +42,31 @@ class Migration(migrations.Migration): name='instancelink', unique_together=set(), ), - migrations.RemoveField( + migrations.AddField( + model_name='instance', + name='managed', + field=models.BooleanField(default=False, editable=False, help_text='If True, this instance is managed by the control plane.'), + ), + migrations.AddField( + model_name='instance', + name='protocol', + field=models.CharField( + choices=[('tcp', 'TCP'), ('ws', 'WS'), ('wss', 'WSS')], + default='tcp', + help_text="Protocol to use for the Receptor listener, 'tcp', 'wss', or 'ws'.", + max_length=10, + ), + ), + migrations.AlterField( model_name='instance', name='listener_port', + field=models.PositiveIntegerField( + blank=True, + default=None, + help_text='Port that Receptor will listen for incoming connections on.', + null=True, + validators=[django.core.validators.MinValueValidator(0), django.core.validators.MaxValueValidator(65535)], + ), ), migrations.AlterField( model_name='instancelink', @@ -86,8 +101,6 @@ class Migration(migrations.Migration): ), migrations.AddConstraint( model_name='receptoraddress', - constraint=models.UniqueConstraint( - fields=('address', 'protocol'), name='unique_receptor_address', violation_error_message='Receptor address + protocol must be unique.' - ), + constraint=models.UniqueConstraint(fields=('address',), name='unique_receptor_address', violation_error_message='Receptor address must be unique.'), ), ] diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index 95d5a69b82..6ced0bb1a9 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -5,7 +5,7 @@ from decimal import Decimal import logging import os -from django.core.validators import MinValueValidator +from django.core.validators import MinValueValidator, MaxValueValidator from django.db import models, connection from django.db.models.signals import post_save, post_delete from django.dispatch import receiver @@ -64,6 +64,12 @@ class HasPolicyEditsMixin(HasEditsMixin): return self._values_have_edits(new_values) +class Protocols(models.TextChoices): + TCP = 'tcp', 'TCP' + WS = 'ws', 'WS' + WSS = 'wss', 'WSS' + + class InstanceLink(BaseModel): class Meta: ordering = ("id",) @@ -165,6 +171,16 @@ class Instance(HasPolicyEditsMixin, BaseModel): default=0, editable=False, ) + listener_port = models.PositiveIntegerField( + blank=True, + null=True, + default=None, + validators=[MinValueValidator(0), MaxValueValidator(65535)], + help_text=_("Port that Receptor will listen for incoming connections on."), + ) + protocol = models.CharField( + help_text=_("Protocol to use for the Receptor listener, 'tcp', 'wss', or 'ws'."), max_length=10, default=Protocols.TCP, choices=Protocols.choices + ) class Types(models.TextChoices): CONTROL = 'control', _("Control plane node") @@ -187,6 +203,7 @@ class Instance(HasPolicyEditsMixin, BaseModel): choices=States.choices, default=States.READY, max_length=16, help_text=_("Indicates the current life cycle stage of this instance.") ) + managed = models.BooleanField(help_text=_("If True, this instance is managed by the control plane."), default=False, editable=False) peers = models.ManyToManyField('ReceptorAddress', through=InstanceLink, through_fields=('source', 'target'), related_name='peers_from') peers_from_control_nodes = models.BooleanField(default=False, help_text=_("If True, control plane cluster nodes should automatically peer to it.")) diff --git a/awx/main/models/receptor_address.py b/awx/main/models/receptor_address.py index 25ff08db7e..68140c3647 100644 --- a/awx/main/models/receptor_address.py +++ b/awx/main/models/receptor_address.py @@ -9,24 +9,18 @@ class ReceptorAddress(models.Model): app_label = 'main' constraints = [ models.UniqueConstraint( - fields=["address", "protocol"], + fields=["address"], name="unique_receptor_address", - violation_error_message=_("Receptor address + protocol must be unique."), + violation_error_message=_("Receptor address must be unique."), ) ] - class Protocols(models.TextChoices): - TCP = 'tcp', 'TCP' - WS = 'ws', 'WS' - WSS = 'wss', 'WSS' - address = models.CharField(help_text=_("Routable address for this instance."), max_length=255) port = models.IntegerField(help_text=_("Port for the address."), default=27199, validators=[MinValueValidator(0), MaxValueValidator(65535)]) - protocol = models.CharField( - help_text=_("Protocol to use when connecting, 'tcp', 'wss', or 'ws'."), max_length=10, default=Protocols.TCP, choices=Protocols.choices - ) websocket_path = models.CharField(help_text=_("Websocket path."), max_length=255, default="", blank=True) - is_internal = models.BooleanField(help_text=_("If True, only routable inside of the Kubernetes cluster."), default=False) + k8s_routable = models.BooleanField(help_text=_("If True, only routable inside of the Kubernetes cluster."), default=False) + canonical = models.BooleanField(help_text=_("If True, this address is the canonical address for the instance."), default=False) + managed = models.BooleanField(help_text=_("If True, this address is managed by the control plane."), default=False, editable=False) peers_from_control_nodes = models.BooleanField(help_text=_("If True, control plane cluster nodes should automatically peer to it."), default=False) instance = models.ForeignKey( 'Instance', @@ -42,10 +36,10 @@ class ReceptorAddress(models.Model): scheme = "" path = "" port = "" - if self.protocol == "ws": + if self.instance.protocol == "ws": scheme = "wss://" - if self.protocol == "ws" and self.websocket_path: + if self.instance.protocol == "ws" and self.websocket_path: path = f"/{self.websocket_path}" if self.port: @@ -54,9 +48,9 @@ class ReceptorAddress(models.Model): return f"{scheme}{self.address}{port}{path}" def get_peer_type(self): - if self.protocol == 'tcp': + if self.instance.protocol == 'tcp': return 'tcp-peer' - elif self.protocol in ['ws', 'wss']: + elif self.instance.protocol in ['ws', 'wss']: return 'ws-peer' else: return None diff --git a/awx/main/tests/functional/api/test_instance_peers.py b/awx/main/tests/functional/api/test_instance_peers.py index 9ca7617607..a40189d483 100644 --- a/awx/main/tests/functional/api/test_instance_peers.py +++ b/awx/main/tests/functional/api/test_instance_peers.py @@ -63,14 +63,14 @@ class TestPeers: ) assert 'Cannot change node type.' in str(resp.data) - def test_is_internal(self, admin_user, post): + def test_k8s_routable(self, admin_user, post): """ - cannot set is_internal to True + cannot set k8s_routable to True """ hop = Instance.objects.create(hostname='abc', node_type="hop") resp = post( url=reverse('api:instance_receptor_addresses_list', kwargs={'pk': hop.pk}), - data={"address": "hopaddr", "is_internal": True}, + data={"address": "hopaddr", "k8s_routable": True}, user=admin_user, expect=400, )