Add canonical receptor address

Creates a non-deletable address that acts as
the "main" address for this instance.

All other addresses for that instance must
be non-canonical.

When listener_port on an instance is set, automatically
create a canonical receptor address where:
  - address is hostname of instance
  - port is listener_port
  - canonical is True

Additionally, protocol field is added to instance to
denote the receptor listener protocol to use (ws, tcp).

The receptor config listener information is derived from
the listener_port and protocol information. Having a
canonical address that mirrors the listener_port ensures that
an address exists that matches the receptor config information.

Other changes:
- Add managed field to receptor address.
If managed is True, no fields on on this address can be edited
via the API.
If canonical is True, only the address cannot be edited.

- Add managed field to instance. If managed is True, users
cannot set node_state to deprovisioning (i.e. cannot delete node)

This change to our mechanism to prevent users from deleting
the mesh ingress hop node.

- Field is_internal is now renamed to k8s_routable

- Add reverse_peers on instance which is a list of instance IDs
that peer to this instance (via an address)

Signed-off-by: Seth Foster <fosterbseth@gmail.com>
This commit is contained in:
Seth Foster
2023-12-18 16:43:43 -05:00
committed by Seth Foster
parent 46dc61253f
commit 9ba70c151d
10 changed files with 197 additions and 69 deletions

View File

@@ -8,13 +8,16 @@ from awx.main.models import Instance, ReceptorAddress
def add_address(**kwargs):
try:
instance = Instance.objects.get(hostname=kwargs.pop('hostname'))
instance = Instance.objects.get(hostname=kwargs.pop('instance'))
kwargs['instance'] = instance
# address and protocol are unique together for ReceptorAddress
# If an address has (address, protocol), it will update the rest of the values suppled in defaults dict
# if no address exists with (address, protocol), then a new address will be created
# these unique together fields need to be consistent with the unique constraint in the ReceptorAddress model
addr, _ = ReceptorAddress.objects.update_or_create(address=kwargs.pop('address'), protocol=kwargs.pop('protocol'), defaults=kwargs)
# if ReceptorAddress already exists with address, just update
# otherwise, create new ReceptorAddress
addr, _ = ReceptorAddress.objects.update_or_create(address=kwargs.pop('address'), defaults=kwargs)
# update listener_port on instance if address is canonical
if addr.canonical:
addr.instance.listener_port = addr.port
addr.instance.save(update_fields=['listener_port'])
print(f"Successfully added receptor address {addr.get_full_address()}")
changed = True
except Exception as e:
@@ -32,17 +35,22 @@ class Command(BaseCommand):
help = "Add receptor address to an instance."
def add_arguments(self, parser):
parser.add_argument('--hostname', dest='hostname', type=str, help="Hostname this address is added to")
parser.add_argument('--instance', dest='instance', type=str, help="Instance hostname this address is added to")
parser.add_argument('--address', dest='address', type=str, help="Receptor address")
parser.add_argument('--port', dest='port', type=int, help="Receptor listener port")
parser.add_argument('--protocol', dest='protocol', type=str, default='tcp', choices=['tcp', 'ws'], help="Protocol of the backend connection")
parser.add_argument('--websocket_path', dest='websocket_path', type=str, default="", help="Path for websockets")
parser.add_argument('--is_internal', action='store_true', help="If true, address only resolvable within the Kubernetes cluster")
parser.add_argument('--k8s_routable', action='store_true', help="If true, address only resolvable within the Kubernetes cluster")
parser.add_argument('--canonical', action='store_true', help="If true, address is the canonical address for the instance")
parser.add_argument('--peers_from_control_nodes', action='store_true', help="If true, control nodes will peer to this address")
parser.add_argument('--managed', action='store_true', help="If True, this address should be managed by the control plane.")
def handle(self, **options):
self.changed = False
address_options = {k: options[k] for k in ('hostname', 'address', 'port', 'protocol', 'websocket_path', 'is_internal', 'peers_from_control_nodes')}
address_options = {
k: options[k]
for k in ('instance', 'address', 'port', 'websocket_path', 'k8s_routable', 'peers_from_control_nodes', 'canonical', 'managed')
if options[k]
}
self.changed = add_address(**address_options)
if self.changed:
print("(changed: True)")

View File

@@ -26,10 +26,13 @@ class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument('--hostname', dest='hostname', type=str, help="Hostname used during provisioning")
parser.add_argument('--listener_port', dest='listener_port', type=int, help="Receptor listener port")
parser.add_argument(
'--protocol', dest='protocol', type=str, default='tcp', choices=['tcp', 'ws', 'wss'], help="Protocol to use for the Receptor listener"
)
parser.add_argument('--node_type', type=str, default='hybrid', choices=['control', 'execution', 'hop', 'hybrid'], help="Instance Node type")
parser.add_argument('--uuid', type=str, help="Instance UUID")
def _register_hostname(self, hostname, node_type, uuid, listener_port):
def _register_hostname(self, hostname, node_type, uuid, listener_port, protocol):
if not hostname:
if not settings.AWX_AUTO_DEPROVISION_INSTANCES:
raise CommandError('Registering with values from settings only intended for use in K8s installs')
@@ -37,7 +40,7 @@ class Command(BaseCommand):
from awx.main.management.commands.register_queue import RegisterQueue
(changed, instance) = Instance.objects.register(
ip_address=os.environ.get('MY_POD_IP'), listener_port=listener_port, node_type='control', node_uuid=settings.SYSTEM_UUID
ip_address=os.environ.get('MY_POD_IP'), listener_port=listener_port, node_type='control', node_uuid=settings.SYSTEM_UUID, protocol=protocol
)
RegisterQueue(settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME, 100, 0, [], is_container_group=False).register()
RegisterQueue(
@@ -51,16 +54,19 @@ class Command(BaseCommand):
max_concurrent_jobs=settings.DEFAULT_EXECUTION_QUEUE_MAX_CONCURRENT_JOBS,
).register()
else:
(changed, instance) = Instance.objects.register(hostname=hostname, node_type=node_type, node_uuid=uuid, listener_port=listener_port)
(changed, instance) = Instance.objects.register(
hostname=hostname, node_type=node_type, node_uuid=uuid, listener_port=listener_port, protocol=protocol
)
if changed:
print("Successfully registered instance {}".format(hostname))
else:
print("Instance already registered {}".format(instance.hostname))
self.changed = changed
@transaction.atomic
def handle(self, **options):
self.changed = False
self._register_hostname(options.get('hostname'), options.get('node_type'), options.get('uuid'), options.get('listener_port'))
self._register_hostname(options.get('hostname'), options.get('node_type'), options.get('uuid'), options.get('listener_port'), options.get('protocol'))
if self.changed:
print("(changed: True)")