From 3a17c45b6493017678c9d4db9a4f1866fa33fe05 Mon Sep 17 00:00:00 2001 From: Seth Foster Date: Thu, 2 Nov 2023 14:08:13 -0400 Subject: [PATCH] Register_peers support for receptor_addresses register_peers has inputs: source: source instance peers: list of instances the source should peer to InstanceLink "target" is now expected to be a ReceptorAddress For each peer, we can just use the first receptor address. If multiple receptor addresses exist, throw a command error. Currently this command is only used on VM-deployments, where there is only a single receptor address per instance, so this should work fine. Other changes: drop listener_port field from Instance. Listener port is now just "port" on ReceptorAddress Signed-off-by: Seth Foster --- .../commands/add_receptor_address.py | 35 ++++++++++--------- .../management/commands/list_instances.py | 2 +- .../management/commands/register_peers.py | 16 ++++++--- awx/main/managers.py | 8 ++--- awx/main/migrations/0188_inbound_hop_nodes.py | 15 ++++++-- awx/main/models/ha.py | 7 ---- awx/main/models/receptor_address.py | 4 +-- 7 files changed, 48 insertions(+), 39 deletions(-) diff --git a/awx/main/management/commands/add_receptor_address.py b/awx/main/management/commands/add_receptor_address.py index c61930c50c..890098f13f 100644 --- a/awx/main/management/commands/add_receptor_address.py +++ b/awx/main/management/commands/add_receptor_address.py @@ -2,11 +2,27 @@ # All Rights Reserved from django.core.management.base import BaseCommand -from django.db import transaction from awx.main.models import Instance, ReceptorAddress +def add_address(**kwargs): + try: + instance = Instance.objects.get(hostname=kwargs.pop('hostname')) + kwargs['instance'] = instance + # address and protocol are unique together for ReceptorAddress + # If an address has (address, protocol), it will update the rest of the values suppled in defaults dict + # if no address exists with (address, protocol), then a new address will be created + # these unique together fields need to be consistent with the unique constraint in the ReceptorAddress model + addr, _ = ReceptorAddress.objects.update_or_create(address=kwargs.pop('address'), protocol=kwargs.pop('protocol'), defaults=kwargs) + print(f"Successfully added receptor address {addr.get_full_address()}") + changed = True + except Exception as e: + changed = False + print(f"Error adding receptor address: {e}") + return changed + + class Command(BaseCommand): """ Internal tower command. @@ -24,24 +40,9 @@ class Command(BaseCommand): parser.add_argument('--is_internal', action='store_true', help="If true, address only resolvable within the Kubernetes cluster") parser.add_argument('--peers_from_control_nodes', action='store_true', help="If true, control nodes will peer to this address") - def _add_address(self, **kwargs): - try: - instance = Instance.objects.get(hostname=kwargs.pop('hostname')) - kwargs['instance'] = instance - # address and protocol are unique together for ReceptorAddress - # If an address has (address, protocol), it will update the rest of the values suppled in defaults dict - # if no address exists with (address, protocol), then a new address will be created - # these unique together fields need to be consistent with the unique constraint in the ReceptorAddress model - addr, _ = ReceptorAddress.objects.update_or_create(address=kwargs.pop('address'), protocol=kwargs.pop('protocol'), defaults=kwargs) - print(f"Successfully added receptor address {addr.get_full_address()}") - self.changed = True - except Exception as e: - self.changed = False - print(f"Error adding receptor address: {e}") - def handle(self, **options): self.changed = False address_options = {k: options[k] for k in ('hostname', 'address', 'port', 'protocol', 'websocket_path', 'is_internal', 'peers_from_control_nodes')} - self._add_address(**address_options) + self.changed = add_address(**address_options) if self.changed: print("(changed: True)") diff --git a/awx/main/management/commands/list_instances.py b/awx/main/management/commands/list_instances.py index a434e2299e..4ebb6505e9 100644 --- a/awx/main/management/commands/list_instances.py +++ b/awx/main/management/commands/list_instances.py @@ -55,7 +55,7 @@ class Command(BaseCommand): capacity = f' capacity={x.capacity}' if x.node_type != 'hop' else '' version = f" version={x.version or '?'}" if x.node_type != 'hop' else '' - heartbeat = f' heartbeat="{x.last_seen:%Y-%m-%d %H:%M:%S}"' if x.capacity or x.node_type == 'hop' else '' + heartbeat = f' heartbeat="{x.last_seen:%Y-%m-%d %H:%M:%S}"' if x.last_seen and x.capacity or x.node_type == 'hop' else '' print(f'\t{color}{x.hostname}{capacity} node_type={x.node_type}{version}{heartbeat}{end_color}') print() diff --git a/awx/main/management/commands/register_peers.py b/awx/main/management/commands/register_peers.py index 078edb08c7..8948e6ef6d 100644 --- a/awx/main/management/commands/register_peers.py +++ b/awx/main/management/commands/register_peers.py @@ -28,7 +28,7 @@ class Command(BaseCommand): def handle(self, **options): # provides a mapping of hostname to Instance objects - nodes = Instance.objects.in_bulk(field_name='hostname') + nodes = Instance.objects.all().prefetch_related('receptor_addresses').in_bulk(field_name='hostname') if options['source'] not in nodes: raise CommandError(f"Host {options['source']} is not a registered instance.") @@ -39,6 +39,12 @@ class Command(BaseCommand): if options['exact'] is not None and options['disconnect']: raise CommandError("The option --disconnect may not be used with --exact.") + # make sure peers and source instances only have one receptor address + for hostname, node in nodes.items(): + if hostname in options.get('peers', []) or hostname == options['source']: + if node.receptor_addresses.count() > 1: + raise CommandError(f"Instance {hostname} has more than one receptor address.") + # No 1-cycles for collection in ('peers', 'disconnect', 'exact'): if options[collection] is not None and options['source'] in options[collection]: @@ -60,7 +66,7 @@ class Command(BaseCommand): results = 0 for target in options['peers']: _, created = InstanceLink.objects.update_or_create( - source=nodes[options['source']], target=nodes[target], defaults={'link_state': InstanceLink.States.ESTABLISHED} + source=nodes[options['source']], target=nodes[target].receptor_addresses.get(), defaults={'link_state': InstanceLink.States.ESTABLISHED} ) if created: results += 1 @@ -72,7 +78,7 @@ class Command(BaseCommand): for target in options['disconnect']: if target not in nodes: # Be permissive, the node might have already been de-registered. continue - n, _ = InstanceLink.objects.filter(source=nodes[options['source']], target=nodes[target]).delete() + n, _ = InstanceLink.objects.filter(source=nodes[options['source']], target=nodes[target].receptor_addresses.get()).delete() results += n print(f"{results} peer links removed from the database.") @@ -82,10 +88,10 @@ class Command(BaseCommand): with transaction.atomic(): peers = set(options['exact']) links = set(InstanceLink.objects.filter(source=nodes[options['source']]).values_list('target__hostname', flat=True)) - removals, _ = InstanceLink.objects.filter(source=nodes[options['source']], target__hostname__in=links - peers).delete() + removals, _ = InstanceLink.objects.filter(source=nodes[options['source']], target__instance__hostname__in=links - peers).delete() for target in peers - links: _, created = InstanceLink.objects.update_or_create( - source=nodes[options['source']], target=nodes[target], defaults={'link_state': InstanceLink.States.ESTABLISHED} + source=nodes[options['source']], target=nodes[target].receptor_addresses.get(), defaults={'link_state': InstanceLink.States.ESTABLISHED} ) if created: additions += 1 diff --git a/awx/main/managers.py b/awx/main/managers.py index ce5bef7e60..2936653d2d 100644 --- a/awx/main/managers.py +++ b/awx/main/managers.py @@ -161,9 +161,6 @@ class InstanceManager(models.Manager): if instance.node_type != node_type: instance.node_type = node_type update_fields.append('node_type') - if instance.listener_port != listener_port: - instance.listener_port = listener_port - update_fields.append('listener_port') if update_fields: instance.save(update_fields=update_fields) return (True, instance) @@ -183,10 +180,13 @@ class InstanceManager(models.Manager): instance = self.create( hostname=hostname, ip_address=ip_address, - listener_port=listener_port, node_type=node_type, peers_from_control_nodes=peers_from_control_nodes, **create_defaults, **uuid_option ) + from awx.main.management.commands.add_receptor_address import add_address + + if listener_port: + add_address(address=hostname, hostname=hostname, port=listener_port, protocol='tcp') return (True, instance) diff --git a/awx/main/migrations/0188_inbound_hop_nodes.py b/awx/main/migrations/0188_inbound_hop_nodes.py index 77570c2ef9..117012c4d7 100644 --- a/awx/main/migrations/0188_inbound_hop_nodes.py +++ b/awx/main/migrations/0188_inbound_hop_nodes.py @@ -1,4 +1,4 @@ -# Generated by Django 4.2.5 on 2023-10-10 00:26 +# Generated by Django 4.2.6 on 2023-11-02 18:07 from django.db import migrations, models import django.db.models.deletion @@ -15,8 +15,8 @@ class Migration(migrations.Migration): fields=[ ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), ('address', models.CharField(max_length=255)), - ('port', models.IntegerField()), - ('protocol', models.CharField(max_length=10)), + ('port', models.IntegerField(default=27199)), + ('protocol', models.CharField(default='tcp', max_length=10)), ('websocket_path', models.CharField(blank=True, default='', max_length=255)), ('is_internal', models.BooleanField(default=False)), ('peers_from_control_nodes', models.BooleanField(default=False)), @@ -30,6 +30,10 @@ class Migration(migrations.Migration): name='instancelink', unique_together=set(), ), + migrations.RemoveField( + model_name='instance', + name='listener_port', + ), migrations.AlterField( model_name='instancelink', name='source', @@ -40,6 +44,11 @@ class Migration(migrations.Migration): name='instance', field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='receptor_addresses', to='main.instance'), ), + migrations.AddField( + model_name='activitystream', + name='receptor_address', + field=models.ManyToManyField(blank=True, to='main.receptoraddress'), + ), migrations.AlterField( model_name='instance', name='peers', diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index e38f792320..3073466c7e 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -185,13 +185,6 @@ class Instance(HasPolicyEditsMixin, BaseModel): node_state = models.CharField( choices=States.choices, default=States.READY, max_length=16, help_text=_("Indicates the current life cycle stage of this instance.") ) - listener_port = models.PositiveIntegerField( - blank=True, - null=True, - default=None, - validators=[MinValueValidator(1024), MaxValueValidator(65535)], - help_text=_("Port that Receptor will listen for incoming connections on."), - ) peers = models.ManyToManyField('ReceptorAddress', through=InstanceLink, through_fields=('source', 'target'), related_name='peers_from') peers_from_control_nodes = models.BooleanField(default=False, help_text=_("If True, control plane cluster nodes should automatically peer to it.")) diff --git a/awx/main/models/receptor_address.py b/awx/main/models/receptor_address.py index c47d229912..0de057f4c9 100644 --- a/awx/main/models/receptor_address.py +++ b/awx/main/models/receptor_address.py @@ -18,8 +18,8 @@ class ReceptorAddress(models.Model): ] address = models.CharField(max_length=255) - port = models.IntegerField(blank=False) - protocol = models.CharField(max_length=10) + port = models.IntegerField(default=27199) + protocol = models.CharField(max_length=10, default="tcp") websocket_path = models.CharField(max_length=255, default="", blank=True) is_internal = models.BooleanField(default=False) peers_from_control_nodes = models.BooleanField(default=False)