Add peers_from_control_nodes to ReceptorAddress

- write_receptor_config now writes peers for the ReceptorAddress entries
that have peers_from_control_nodes enabled

- peers_from_control_nodes and listener_port removed from Instance model

- peers_from_control_nodes added to ReceptorAddress model

- ReceptorAddress is now unique by address and protocol combination

- The write receptor config task is dispatched when a ReceptorAddress is
created or deleted, and when a control node is first created

- InstanceLinkSerializer adds a target_full_address field and a target_address
related link, and has logic to grab the instance hostname associated with the
peered ReceptorAddress

Signed-off-by: Seth Foster <fosterbseth@gmail.com>
This commit is contained in:
Seth Foster 2023-10-11 13:21:35 -04:00 committed by Seth Foster
parent 5cb3d3b078
commit c32f234ebb
10 changed files with 162 additions and 164 deletions

View File

@ -5459,17 +5459,25 @@ class ScheduleSerializer(LaunchConfigurationBaseSerializer, SchedulePreviewSeria
class InstanceLinkSerializer(BaseSerializer):
    """Serialize a peer link from a source Instance to a target ReceptorAddress.

    ``source`` is exposed as the source instance's hostname.  ``target`` and
    ``target_full_address`` are computed from the peered ReceptorAddress.
    """

    class Meta:
        model = InstanceLink
        fields = ('id', 'related', 'source', 'target', 'target_full_address', 'link_state')

    source = serializers.SlugRelatedField(slug_field="hostname", queryset=Instance.objects.all())

    # The link target is a ReceptorAddress rather than an Instance, so both
    # target values are derived by the get_* methods below.
    target = serializers.SerializerMethodField()
    target_full_address = serializers.SerializerMethodField()

    def get_related(self, obj):
        """Return related URLs for the source instance and the target address."""
        res = super(InstanceLinkSerializer, self).get_related(obj)
        res['source_instance'] = self.reverse('api:instance_detail', kwargs={'pk': obj.source.id})
        # obj.target.id is a ReceptorAddress pk, so it must only be reversed
        # against the receptor address detail view, never api:instance_detail.
        res['target_address'] = self.reverse('api:receptor_address_detail', kwargs={'pk': obj.target.id})
        return res

    def get_target(self, obj):
        """Return the hostname of the instance that owns the target address."""
        return obj.target.instance.hostname

    def get_target_full_address(self, obj):
        """Return the target address as produced by its get_full_address()."""
        return obj.target.get_full_address()
class InstanceNodeSerializer(BaseSerializer):
class Meta:
@ -5482,7 +5490,7 @@ class ReceptorAddressSerializer(BaseSerializer):
class Meta:
    model = ReceptorAddress
    fields = ('id', 'url', 'address', 'port', 'protocol', 'websocket_path', 'is_internal', 'instance', 'peers_from_control_nodes', 'full_address')
    # The Meta option recognised by DRF is `read_only_fields`, and it must be
    # a list or tuple; a bare `read_only = '...'` attribute is ignored.
    read_only_fields = ('full_address',)
def get_full_address(self, obj):
@ -5497,7 +5505,6 @@ class InstanceSerializer(BaseSerializer):
jobs_running = serializers.IntegerField(help_text=_('Count of jobs in the running or waiting state that are targeted for this instance'), read_only=True)
jobs_total = serializers.IntegerField(help_text=_('Count of all jobs that target this instance'), read_only=True)
health_check_pending = serializers.SerializerMethodField()
peers = serializers.SlugRelatedField(many=True, required=False, slug_field="hostname", queryset=Instance.objects.all())
class Meta:
model = Instance
@ -5534,7 +5541,6 @@ class InstanceSerializer(BaseSerializer):
'node_state',
'ip_address',
'listener_port',
'peers',
'peers_from_control_nodes',
)
extra_kwargs = {
@ -5562,7 +5568,6 @@ class InstanceSerializer(BaseSerializer):
res['instance_groups'] = self.reverse('api:instance_instance_groups_list', kwargs={'pk': obj.pk})
if obj.node_type in [Instance.Types.EXECUTION, Instance.Types.HOP]:
res['install_bundle'] = self.reverse('api:instance_install_bundle', kwargs={'pk': obj.pk})
res['peers'] = self.reverse('api:instance_peers_list', kwargs={"pk": obj.pk})
if self.context['request'].user.is_superuser or self.context['request'].user.is_system_auditor:
if obj.node_type == 'execution':
res['health_check'] = self.reverse('api:instance_health_check', kwargs={'pk': obj.pk})
@ -5607,30 +5612,30 @@ class InstanceSerializer(BaseSerializer):
node_type = get_field_from_model_or_attrs("node_type")
peers_from_control_nodes = get_field_from_model_or_attrs("peers_from_control_nodes")
listener_port = get_field_from_model_or_attrs("listener_port")
peers = attrs.get('peers', [])
# peers = attrs.get('peers', [])
if peers_from_control_nodes and node_type not in (Instance.Types.EXECUTION, Instance.Types.HOP):
raise serializers.ValidationError(_("peers_from_control_nodes can only be enabled for execution or hop nodes."))
if node_type in [Instance.Types.CONTROL, Instance.Types.HYBRID]:
if check_peers_changed():
raise serializers.ValidationError(
_("Setting peers manually for control nodes is not allowed. Enable peers_from_control_nodes on the hop and execution nodes instead.")
)
# if node_type in [Instance.Types.CONTROL, Instance.Types.HYBRID]:
# if check_peers_changed():
# raise serializers.ValidationError(
# _("Setting peers manually for control nodes is not allowed. Enable peers_from_control_nodes on the hop and execution nodes instead.")
# )
if not listener_port and peers_from_control_nodes:
raise serializers.ValidationError(_("Field listener_port must be a valid integer when peers_from_control_nodes is enabled."))
# if not listener_port and peers_from_control_nodes:
# raise serializers.ValidationError(_("Field listener_port must be a valid integer when peers_from_control_nodes is enabled."))
if not listener_port and self.instance and self.instance.peers_from.exists():
raise serializers.ValidationError(_("Field listener_port must be a valid integer when other nodes peer to it."))
# if not listener_port and self.instance and self.instance.peers_from.exists():
# raise serializers.ValidationError(_("Field listener_port must be a valid integer when other nodes peer to it."))
for peer in peers:
if peer.listener_port is None:
raise serializers.ValidationError(_("Field listener_port must be set on peer ") + peer.hostname + ".")
# for peer in peers:
# if peer.listener_port is None:
# raise serializers.ValidationError(_("Field listener_port must be set on peer ") + peer.hostname + ".")
if not settings.IS_K8S:
if check_peers_changed():
raise serializers.ValidationError(_("Cannot change peers."))
# if not settings.IS_K8S:
# if check_peers_changed():
# raise serializers.ValidationError(_("Cannot change peers."))
return super().validate(attrs)

View File

@ -22,6 +22,7 @@ class Command(BaseCommand):
parser.add_argument('--protocol', dest='protocol', type=str, default='tcp', choices=['tcp', 'ws'], help="Protocol of the backend connection")
parser.add_argument('--websocket_path', dest='websocket_path', type=str, default="", help="Path for websockets")
parser.add_argument('--is_internal', action='store_true', help="If true, address only resolvable within the Kubernetes cluster")
parser.add_argument('--peers_from_control_nodes', action='store_true', help="If true, control nodes will peer to this address")
def _add_address(self, **kwargs):
try:
@ -34,10 +35,9 @@ class Command(BaseCommand):
self.changed = False
print(f"Error adding receptor address: {e}")
@transaction.atomic
def handle(self, **options):
    """Add a receptor address from the parsed command-line options.

    Only the address-related options are forwarded to ``_add_address``;
    "(changed: True)" is printed when ``self.changed`` is set afterwards.
    """
    self.changed = False
    # Single source of truth for the forwarded options; includes
    # peers_from_control_nodes, which is declared as a flag in add_arguments.
    address_keys = ('hostname', 'address', 'port', 'protocol', 'websocket_path', 'is_internal', 'peers_from_control_nodes')
    address_options = {key: options[key] for key in address_keys}
    self._add_address(**address_options)
    if self.changed:
        print("(changed: True)")

View File

@ -28,9 +28,8 @@ class Command(BaseCommand):
parser.add_argument('--listener_port', dest='listener_port', type=int, help="Receptor listener port")
parser.add_argument('--node_type', type=str, default='hybrid', choices=['control', 'execution', 'hop', 'hybrid'], help="Instance Node type")
parser.add_argument('--uuid', type=str, help="Instance UUID")
parser.add_argument('--peers_from_control_nodes', action='store_true', help="If set, will automatically peer from control nodes")
def _register_hostname(self, hostname, node_type, uuid, listener_port, peers_from_control_nodes):
def _register_hostname(self, hostname, node_type, uuid, listener_port):
if not hostname:
if not settings.AWX_AUTO_DEPROVISION_INSTANCES:
raise CommandError('Registering with values from settings only intended for use in K8s installs')
@ -52,9 +51,7 @@ class Command(BaseCommand):
max_concurrent_jobs=settings.DEFAULT_EXECUTION_QUEUE_MAX_CONCURRENT_JOBS,
).register()
else:
(changed, instance) = Instance.objects.register(
hostname=hostname, node_type=node_type, node_uuid=uuid, listener_port=listener_port, peers_from_control_nodes=peers_from_control_nodes
)
(changed, instance) = Instance.objects.register(hostname=hostname, node_type=node_type, node_uuid=uuid, listener_port=listener_port)
if changed:
print("Successfully registered instance {}".format(hostname))
else:
@ -64,8 +61,6 @@ class Command(BaseCommand):
@transaction.atomic
def handle(self, **options):
    """Register an instance from the parsed command-line options.

    Prints "(changed: True)" when ``self.changed`` is set after the
    registration call.
    """
    self.changed = False
    # _register_hostname takes (hostname, node_type, uuid, listener_port);
    # peers_from_control_nodes is no longer an option of this command.
    self._register_hostname(
        options.get('hostname'),
        options.get('node_type'),
        options.get('uuid'),
        options.get('listener_port'),
    )
    if self.changed:
        print("(changed: True)")

View File

@ -115,7 +115,7 @@ class InstanceManager(models.Manager):
return node[0]
raise RuntimeError("No instance found with the current cluster host id")
def register(self, node_uuid=None, hostname=None, ip_address="", listener_port=None, node_type='hybrid', peers_from_control_nodes=False, defaults=None):
def register(self, node_uuid=None, hostname=None, ip_address="", listener_port=None, node_type='hybrid', defaults=None):
if not hostname:
hostname = settings.CLUSTER_HOST_ID
@ -180,13 +180,5 @@ class InstanceManager(models.Manager):
uuid_option = {'uuid': node_uuid if node_uuid is not None else uuid.uuid4()}
if node_type == 'execution' and 'version' not in create_defaults:
create_defaults['version'] = RECEPTOR_PENDING
instance = self.create(
hostname=hostname,
ip_address=ip_address,
listener_port=listener_port,
node_type=node_type,
peers_from_control_nodes=peers_from_control_nodes,
**create_defaults,
**uuid_option
)
instance = self.create(hostname=hostname, ip_address=ip_address, listener_port=listener_port, node_type=node_type, **create_defaults, **uuid_option)
return (True, instance)

View File

@ -0,0 +1,59 @@
# Generated by Django 4.2.5 on 2023-10-10 00:26
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
    # Creates the ReceptorAddress model and re-targets instance peer links
    # (InstanceLink.target and Instance.peers) at ReceptorAddress.

    dependencies = [('main', '0187_hop_nodes')]

    operations = [
        migrations.CreateModel(
            name='ReceptorAddress',
            fields=[
                ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
                ('address', models.CharField(max_length=255)),
                ('port', models.IntegerField()),
                ('protocol', models.CharField(max_length=10)),
                ('websocket_path', models.CharField(blank=True, default='', max_length=255)),
                ('is_internal', models.BooleanField(default=False)),
                ('peers_from_control_nodes', models.BooleanField(default=False)),
            ],
        ),
        # Drop the InstanceLink constraints that compared source with target.
        migrations.RemoveConstraint(model_name='instancelink', name='source_and_target_can_not_be_equal'),
        migrations.AlterUniqueTogether(name='instancelink', unique_together=set()),
        migrations.AlterField(
            model_name='instancelink',
            name='source',
            field=models.ForeignKey(on_delete=models.CASCADE, to='main.instance'),
        ),
        # Each receptor address belongs to one instance.
        migrations.AddField(
            model_name='receptoraddress',
            name='instance',
            field=models.ForeignKey(on_delete=models.CASCADE, related_name='receptor_addresses', to='main.instance'),
        ),
        # Peers and link targets now point at receptor addresses.
        migrations.AlterField(
            model_name='instance',
            name='peers',
            field=models.ManyToManyField(related_name='peers_from', through='main.InstanceLink', to='main.receptoraddress'),
        ),
        migrations.AlterField(
            model_name='instancelink',
            name='target',
            field=models.ForeignKey(on_delete=models.CASCADE, to='main.receptoraddress'),
        ),
        migrations.AddConstraint(
            model_name='receptoraddress',
            constraint=models.UniqueConstraint(
                fields=('address', 'protocol'),
                name='unique_receptor_address',
                violation_error_message='Receptor address + protocol must be unique.',
            ),
        ),
    ]

View File

@ -1,43 +0,0 @@
# Generated by Django 4.2.5 on 2023-10-04 06:51
from django.db import migrations, models
import django.db.models.deletion
# Earlier form of the ReceptorAddress migration. The enclosing hunk header
# (-1,43 +0,0) indicates this file is deleted by the commit.
class Migration(migrations.Migration):
dependencies = [
('main', '0187_hop_nodes'),
]
operations = [
# Create ReceptorAddress with a nullable port and an inline FK to Instance.
migrations.CreateModel(
name='ReceptorAddress',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('address', models.CharField(max_length=255)),
('port', models.IntegerField(null=True)),
('protocol', models.CharField(max_length=10)),
('websocket_path', models.CharField(blank=True, default='', max_length=255)),
('is_internal', models.BooleanField(default=False)),
('instance', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='receptor_addresses', to='main.instance')),
],
),
# Uniqueness for rows where port is NULL: address + protocol + websocket_path.
migrations.AddConstraint(
model_name='receptoraddress',
constraint=models.UniqueConstraint(
condition=models.Q(('port', None)),
fields=('address', 'protocol', 'websocket_path'),
name='unique_receptor_address_no_port',
violation_error_message='Receptor address must be unique.',
),
),
# Uniqueness for rows where port is set: the port is part of the key.
migrations.AddConstraint(
model_name='receptoraddress',
constraint=models.UniqueConstraint(
condition=models.Q(('port', None), _negated=True),
fields=('address', 'port', 'protocol', 'websocket_path'),
name='unique_receptor_address_with_port',
violation_error_message='Receptor address must be unique.',
),
),
]

View File

@ -34,6 +34,7 @@ from awx.main.models.rbac import (
from awx.main.models.unified_jobs import UnifiedJob
from awx.main.utils.common import get_corrected_cpu, get_cpu_effective_capacity, get_corrected_memory, get_mem_effective_capacity
from awx.main.models.mixins import RelatedJobsMixin, ResourceMixin
from awx.main.models.receptor_address import ReceptorAddress
# ansible-runner
from ansible_runner.utils.capacity import get_cpu_count, get_mem_in_bytes
@ -64,8 +65,11 @@ class HasPolicyEditsMixin(HasEditsMixin):
class InstanceLink(BaseModel):
source = models.ForeignKey('Instance', on_delete=models.CASCADE, related_name='+')
target = models.ForeignKey('Instance', on_delete=models.CASCADE, related_name='reverse_peers')
class Meta:
ordering = ("id",)
source = models.ForeignKey('Instance', on_delete=models.CASCADE)
target = models.ForeignKey('ReceptorAddress', on_delete=models.CASCADE)
class States(models.TextChoices):
ADDING = 'adding', _('Adding')
@ -76,11 +80,6 @@ class InstanceLink(BaseModel):
choices=States.choices, default=States.ADDING, max_length=16, help_text=_("Indicates the current life cycle stage of this peer link.")
)
class Meta:
unique_together = ('source', 'target')
ordering = ("id",)
constraints = [models.CheckConstraint(check=~models.Q(source=models.F('target')), name='source_and_target_can_not_be_equal')]
class Instance(HasPolicyEditsMixin, BaseModel):
"""A model representing an AWX instance running against this database."""
@ -194,7 +193,7 @@ class Instance(HasPolicyEditsMixin, BaseModel):
help_text=_("Port that Receptor will listen for incoming connections on."),
)
peers = models.ManyToManyField('self', symmetrical=False, through=InstanceLink, through_fields=('source', 'target'), related_name='peers_from')
peers = models.ManyToManyField('ReceptorAddress', through=InstanceLink, through_fields=('source', 'target'), related_name='peers_from')
peers_from_control_nodes = models.BooleanField(default=False, help_text=_("If True, control plane cluster nodes should automatically peer to it."))
POLICY_FIELDS = frozenset(('managed_by_policy', 'hostname', 'capacity_adjustment'))
@ -502,6 +501,29 @@ def schedule_write_receptor_config(broadcast=True):
write_receptor_config() # just run locally
# TODO: don't use the receiver post save, just call this at the moment when we need it
# that way we don't call this multiple times unnecessarily
@receiver(post_save, sender=ReceptorAddress)
def receptor_address_saved(sender, instance, **kwargs):
    """Sync control-node peer links for a saved ReceptorAddress.

    Control and hybrid instances are added to the address's ``peers_from``
    when ``peers_from_control_nodes`` is set and removed from it otherwise.
    A receptor config write is then scheduled.
    """
    from awx.main.signals import disable_activity_stream

    with disable_activity_stream():
        control_nodes = set(Instance.objects.filter(node_type__in=[Instance.Types.CONTROL, Instance.Types.HYBRID]))
        # Pick the relation operation once, then apply it to every control node.
        sync_links = instance.peers_from.add if instance.peers_from_control_nodes else instance.peers_from.remove
        sync_links(*control_nodes)

    schedule_write_receptor_config()
@receiver(post_delete, sender=ReceptorAddress)
def receptor_address_deleted(sender, instance, **kwargs):
    """Schedule a receptor config write after a ReceptorAddress is deleted."""
    # broadcast=True is the declared default of schedule_write_receptor_config.
    schedule_write_receptor_config(broadcast=True)
@receiver(post_save, sender=Instance)
def on_instance_saved(sender, instance, created=False, raw=False, **kwargs):
'''
@ -512,10 +534,12 @@ def on_instance_saved(sender, instance, created=False, raw=False, **kwargs):
2. a node changes its value of peers_from_control_nodes
3. a new control node comes online and has instances to peer to
'''
from awx.main.signals import disable_activity_stream
if created and settings.IS_K8S and instance.node_type in [Instance.Types.CONTROL, Instance.Types.HYBRID]:
inst = Instance.objects.filter(peers_from_control_nodes=True)
if set(instance.peers.all()) != set(inst):
instance.peers.set(inst)
peers_address = ReceptorAddress.objects.filter(peers_from_control_nodes=True)
with disable_activity_stream():
instance.peers.add(*peers_address)
schedule_write_receptor_config(broadcast=False)
if settings.IS_K8S and instance.node_type in [Instance.Types.HOP, Instance.Types.EXECUTION]:
@ -525,16 +549,6 @@ def on_instance_saved(sender, instance, created=False, raw=False, **kwargs):
# wait for jobs on the node to complete, then delete the
# node and kick off write_receptor_config
connection.on_commit(lambda: remove_deprovisioned_node.apply_async([instance.hostname]))
else:
control_instances = set(Instance.objects.filter(node_type__in=[Instance.Types.CONTROL, Instance.Types.HYBRID]))
if instance.peers_from_control_nodes:
if (control_instances & set(instance.peers_from.all())) != set(control_instances):
instance.peers_from.add(*control_instances)
schedule_write_receptor_config() # keep method separate to make pytest mocking easier
else:
if set(control_instances) & set(instance.peers_from.all()):
instance.peers_from.remove(*control_instances)
schedule_write_receptor_config()
if created or instance.has_policy_changes():
schedule_policy_task()

View File

@ -5,32 +5,24 @@ from django.dispatch import receiver
from awx.api.versioning import reverse
from django.db.models import Sum, Q
from awx.main.models.ha import schedule_write_receptor_config
class ReceptorAddress(models.Model):
class Meta:
app_label = 'main'
constraints = [
models.UniqueConstraint(
fields=["address", "protocol", "websocket_path"],
condition=Q(port=None),
name="unique_receptor_address_no_port",
violation_error_message=_("Receptor address must be unique."),
),
models.UniqueConstraint(
fields=["address", "port", "protocol", "websocket_path"],
condition=~Q(port=None),
name="unique_receptor_address_with_port",
violation_error_message=_("Receptor address must be unique."),
),
fields=["address", "protocol"],
name="unique_receptor_address",
violation_error_message=_("Receptor address + protocol must be unique."),
)
]
address = models.CharField(max_length=255)
port = models.IntegerField(null=True)
port = models.IntegerField(blank=False)
protocol = models.CharField(max_length=10)
websocket_path = models.CharField(max_length=255, default="", blank=True)
is_internal = models.BooleanField(default=False)
peers_from_control_nodes = models.BooleanField(default=False)
instance = models.ForeignKey(
'Instance',
related_name='receptor_addresses',
@ -62,13 +54,3 @@ class ReceptorAddress(models.Model):
def get_absolute_url(self, request=None):
    """Return the API detail URL for this receptor address."""
    url_kwargs = {'pk': self.pk}
    return reverse('api:receptor_address_detail', kwargs=url_kwargs, request=request)
# Save hook as it existed in this module; the enclosing hunk (-62,13 +54,3)
# shows these lines being dropped from the file. True is passed positionally
# as the `broadcast` argument of schedule_write_receptor_config.
@receiver(post_save, sender=ReceptorAddress)
def receptor_address_saved(sender, instance, **kwargs):
schedule_write_receptor_config(True)
# Delete hook as it existed in this module; the enclosing hunk (-62,13 +54,3)
# shows these lines being dropped from the file. Schedules a receptor config
# write with broadcast=True.
@receiver(post_delete, sender=ReceptorAddress)
def receptor_address_deleted(sender, instance, **kwargs):
schedule_write_receptor_config(True)

View File

@ -27,7 +27,7 @@ from awx.main.utils.common import (
)
from awx.main.constants import MAX_ISOLATED_PATH_COLON_DELIMITER
from awx.main.tasks.signals import signal_state, signal_callback, SignalExit
from awx.main.models import Instance, InstanceLink, UnifiedJob
from awx.main.models import Instance, InstanceLink, UnifiedJob, ReceptorAddress
from awx.main.dispatch import get_task_queuename
from awx.main.dispatch.publish import task
from awx.main.utils.pglock import advisory_lock
@ -676,49 +676,44 @@ RECEPTOR_CONFIG_STARTER = (
)
def should_update_config(new_config):
    '''
    Report whether the receptor config on disk differs from new_config.

    new_config is the list of config entries that should be written. The
    current config is read with read_receptor_config() and compared entry by
    entry in both directions; the first mismatch is logged as a warning.

    Returns True when an entry is present on one side only, False when both
    configs hold the same entries.
    '''
    current_config = read_receptor_config()  # this gets receptor conf lock
    # Entries on disk that are no longer wanted.
    for config_entry in current_config:
        if config_entry not in new_config:
            logger.warning(f"{config_entry} should not be in receptor config. Updating.")
            return True
    # Wanted entries that are not yet on disk.
    for config_entry in new_config:
        if config_entry not in current_config:
            logger.warning(f"{config_entry} missing from receptor config. Updating.")
            return True
    return False
def generate_config_data():
    """Build the receptor config from the ReceptorAddress rows in the database.

    Starts from RECEPTOR_CONFIG_STARTER and appends one peer entry for every
    ReceptorAddress with peers_from_control_nodes enabled that is internal
    and reports a peer type. Other addresses are skipped with a warning.

    Returns a tuple (receptor_config, should_update):
      receptor_config - the config entries based on current database peers
      should_update   - True if receptor_config differs from the receptor
                        conf file on disk
    """
    addresses = ReceptorAddress.objects.filter(peers_from_control_nodes=True)

    receptor_config = list(RECEPTOR_CONFIG_STARTER)
    for address in addresses:
        # Looked up once per address and reused as the entry key below
        # (assumes get_peer_type() is a side-effect-free getter).
        peer_type = address.get_peer_type()
        if peer_type and address.is_internal:
            peer = {
                f'{peer_type}': {
                    'address': f'{address.get_full_address()}',
                    'tls': 'tlsclient',
                }
            }
            receptor_config.append(peer)
        else:
            logger.warning(f"Receptor address {address} has unsupported peer type, skipping.")

    should_update = should_update_config(receptor_config)
    return receptor_config, should_update
@ -760,14 +755,13 @@ def write_receptor_config():
with lock:
with open(__RECEPTOR_CONF, 'w') as file:
yaml.dump(receptor_config, file, default_flow_style=False)
reload_receptor()
@task(queue=get_task_queuename)
def remove_deprovisioned_node(hostname):
InstanceLink.objects.filter(source__hostname=hostname).update(link_state=InstanceLink.States.REMOVING)
InstanceLink.objects.filter(target__hostname=hostname).update(link_state=InstanceLink.States.REMOVING)
InstanceLink.objects.filter(target__instance__hostname=hostname).update(link_state=InstanceLink.States.REMOVING)
node_jobs = UnifiedJob.objects.filter(
execution_node=hostname,

View File

@ -495,7 +495,7 @@ def inspect_established_receptor_connections(mesh_status):
update_links = []
for link in all_links:
if link.link_state != InstanceLink.States.REMOVING:
if link.target.hostname in active_receptor_conns.get(link.source.hostname, {}):
if link.target.instance.hostname in active_receptor_conns.get(link.source.hostname, {}):
if link.link_state is not InstanceLink.States.ESTABLISHED:
link.link_state = InstanceLink.States.ESTABLISHED
update_links.append(link)