Register_peers support for receptor_addresses

register_peers has inputs:

source: source instance
peers: list of instances the source should peer to

InstanceLink "target" is now expected to be a ReceptorAddress

For each peer, we can just use the first receptor address. If
multiple receptor addresses exist, throw a command error.

Currently this command is only used on VM-deployments, where
there is only a single receptor address per instance, so this
should work fine.

Other changes:
drop listener_port field from Instance. Listener port is now just
"port" on ReceptorAddress

Signed-off-by: Seth Foster <fosterbseth@gmail.com>
This commit is contained in:
Seth Foster 2023-11-02 14:08:13 -04:00 committed by Seth Foster
parent bca68bcdf1
commit 3a17c45b64
7 changed files with 48 additions and 39 deletions

View File

@ -2,11 +2,27 @@
# All Rights Reserved
from django.core.management.base import BaseCommand
from django.db import transaction
from awx.main.models import Instance, ReceptorAddress
def add_address(**kwargs):
try:
instance = Instance.objects.get(hostname=kwargs.pop('hostname'))
kwargs['instance'] = instance
# address and protocol are unique together for ReceptorAddress
# If an address has (address, protocol), it will update the rest of the values suppled in defaults dict
# if no address exists with (address, protocol), then a new address will be created
# these unique together fields need to be consistent with the unique constraint in the ReceptorAddress model
addr, _ = ReceptorAddress.objects.update_or_create(address=kwargs.pop('address'), protocol=kwargs.pop('protocol'), defaults=kwargs)
print(f"Successfully added receptor address {addr.get_full_address()}")
changed = True
except Exception as e:
changed = False
print(f"Error adding receptor address: {e}")
return changed
class Command(BaseCommand):
"""
Internal tower command.
@ -24,24 +40,9 @@ class Command(BaseCommand):
parser.add_argument('--is_internal', action='store_true', help="If true, address only resolvable within the Kubernetes cluster")
parser.add_argument('--peers_from_control_nodes', action='store_true', help="If true, control nodes will peer to this address")
def _add_address(self, **kwargs):
try:
instance = Instance.objects.get(hostname=kwargs.pop('hostname'))
kwargs['instance'] = instance
# address and protocol are unique together for ReceptorAddress
# If an address has (address, protocol), it will update the rest of the values suppled in defaults dict
# if no address exists with (address, protocol), then a new address will be created
# these unique together fields need to be consistent with the unique constraint in the ReceptorAddress model
addr, _ = ReceptorAddress.objects.update_or_create(address=kwargs.pop('address'), protocol=kwargs.pop('protocol'), defaults=kwargs)
print(f"Successfully added receptor address {addr.get_full_address()}")
self.changed = True
except Exception as e:
self.changed = False
print(f"Error adding receptor address: {e}")
def handle(self, **options):
self.changed = False
address_options = {k: options[k] for k in ('hostname', 'address', 'port', 'protocol', 'websocket_path', 'is_internal', 'peers_from_control_nodes')}
self._add_address(**address_options)
self.changed = add_address(**address_options)
if self.changed:
print("(changed: True)")

View File

@ -55,7 +55,7 @@ class Command(BaseCommand):
capacity = f' capacity={x.capacity}' if x.node_type != 'hop' else ''
version = f" version={x.version or '?'}" if x.node_type != 'hop' else ''
heartbeat = f' heartbeat="{x.last_seen:%Y-%m-%d %H:%M:%S}"' if x.capacity or x.node_type == 'hop' else ''
heartbeat = f' heartbeat="{x.last_seen:%Y-%m-%d %H:%M:%S}"' if x.last_seen and x.capacity or x.node_type == 'hop' else ''
print(f'\t{color}{x.hostname}{capacity} node_type={x.node_type}{version}{heartbeat}{end_color}')
print()

View File

@ -28,7 +28,7 @@ class Command(BaseCommand):
def handle(self, **options):
# provides a mapping of hostname to Instance objects
nodes = Instance.objects.in_bulk(field_name='hostname')
nodes = Instance.objects.all().prefetch_related('receptor_addresses').in_bulk(field_name='hostname')
if options['source'] not in nodes:
raise CommandError(f"Host {options['source']} is not a registered instance.")
@ -39,6 +39,12 @@ class Command(BaseCommand):
if options['exact'] is not None and options['disconnect']:
raise CommandError("The option --disconnect may not be used with --exact.")
# make sure peers and source instances only have one receptor address
for hostname, node in nodes.items():
if hostname in options.get('peers', []) or hostname == options['source']:
if node.receptor_addresses.count() > 1:
raise CommandError(f"Instance {hostname} has more than one receptor address.")
# No 1-cycles
for collection in ('peers', 'disconnect', 'exact'):
if options[collection] is not None and options['source'] in options[collection]:
@ -60,7 +66,7 @@ class Command(BaseCommand):
results = 0
for target in options['peers']:
_, created = InstanceLink.objects.update_or_create(
source=nodes[options['source']], target=nodes[target], defaults={'link_state': InstanceLink.States.ESTABLISHED}
source=nodes[options['source']], target=nodes[target].receptor_addresses.get(), defaults={'link_state': InstanceLink.States.ESTABLISHED}
)
if created:
results += 1
@ -72,7 +78,7 @@ class Command(BaseCommand):
for target in options['disconnect']:
if target not in nodes: # Be permissive, the node might have already been de-registered.
continue
n, _ = InstanceLink.objects.filter(source=nodes[options['source']], target=nodes[target]).delete()
n, _ = InstanceLink.objects.filter(source=nodes[options['source']], target=nodes[target].receptor_addresses.get()).delete()
results += n
print(f"{results} peer links removed from the database.")
@ -82,10 +88,10 @@ class Command(BaseCommand):
with transaction.atomic():
peers = set(options['exact'])
links = set(InstanceLink.objects.filter(source=nodes[options['source']]).values_list('target__hostname', flat=True))
removals, _ = InstanceLink.objects.filter(source=nodes[options['source']], target__hostname__in=links - peers).delete()
removals, _ = InstanceLink.objects.filter(source=nodes[options['source']], target__instance__hostname__in=links - peers).delete()
for target in peers - links:
_, created = InstanceLink.objects.update_or_create(
source=nodes[options['source']], target=nodes[target], defaults={'link_state': InstanceLink.States.ESTABLISHED}
source=nodes[options['source']], target=nodes[target].receptor_addresses.get(), defaults={'link_state': InstanceLink.States.ESTABLISHED}
)
if created:
additions += 1

View File

@ -161,9 +161,6 @@ class InstanceManager(models.Manager):
if instance.node_type != node_type:
instance.node_type = node_type
update_fields.append('node_type')
if instance.listener_port != listener_port:
instance.listener_port = listener_port
update_fields.append('listener_port')
if update_fields:
instance.save(update_fields=update_fields)
return (True, instance)
@ -183,10 +180,13 @@ class InstanceManager(models.Manager):
instance = self.create(
hostname=hostname,
ip_address=ip_address,
listener_port=listener_port,
node_type=node_type,
peers_from_control_nodes=peers_from_control_nodes,
**create_defaults,
**uuid_option
)
from awx.main.management.commands.add_receptor_address import add_address
if listener_port:
add_address(address=hostname, hostname=hostname, port=listener_port, protocol='tcp')
return (True, instance)

View File

@ -1,4 +1,4 @@
# Generated by Django 4.2.5 on 2023-10-10 00:26
# Generated by Django 4.2.6 on 2023-11-02 18:07
from django.db import migrations, models
import django.db.models.deletion
@ -15,8 +15,8 @@ class Migration(migrations.Migration):
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('address', models.CharField(max_length=255)),
('port', models.IntegerField()),
('protocol', models.CharField(max_length=10)),
('port', models.IntegerField(default=27199)),
('protocol', models.CharField(default='tcp', max_length=10)),
('websocket_path', models.CharField(blank=True, default='', max_length=255)),
('is_internal', models.BooleanField(default=False)),
('peers_from_control_nodes', models.BooleanField(default=False)),
@ -30,6 +30,10 @@ class Migration(migrations.Migration):
name='instancelink',
unique_together=set(),
),
migrations.RemoveField(
model_name='instance',
name='listener_port',
),
migrations.AlterField(
model_name='instancelink',
name='source',
@ -40,6 +44,11 @@ class Migration(migrations.Migration):
name='instance',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='receptor_addresses', to='main.instance'),
),
migrations.AddField(
model_name='activitystream',
name='receptor_address',
field=models.ManyToManyField(blank=True, to='main.receptoraddress'),
),
migrations.AlterField(
model_name='instance',
name='peers',

View File

@ -185,13 +185,6 @@ class Instance(HasPolicyEditsMixin, BaseModel):
node_state = models.CharField(
choices=States.choices, default=States.READY, max_length=16, help_text=_("Indicates the current life cycle stage of this instance.")
)
listener_port = models.PositiveIntegerField(
blank=True,
null=True,
default=None,
validators=[MinValueValidator(1024), MaxValueValidator(65535)],
help_text=_("Port that Receptor will listen for incoming connections on."),
)
peers = models.ManyToManyField('ReceptorAddress', through=InstanceLink, through_fields=('source', 'target'), related_name='peers_from')
peers_from_control_nodes = models.BooleanField(default=False, help_text=_("If True, control plane cluster nodes should automatically peer to it."))

View File

@ -18,8 +18,8 @@ class ReceptorAddress(models.Model):
]
address = models.CharField(max_length=255)
port = models.IntegerField(blank=False)
protocol = models.CharField(max_length=10)
port = models.IntegerField(default=27199)
protocol = models.CharField(max_length=10, default="tcp")
websocket_path = models.CharField(max_length=255, default="", blank=True)
is_internal = models.BooleanField(default=False)
peers_from_control_nodes = models.BooleanField(default=False)