Add install bundle support

group_vars all.yaml changes:
- peer entry has two fields, address and port
- receptor_port is inferred from the first
receptor_address entry that uses protocol tcp

other changes:
ActivityStream now records when receptor_addresses
are peered to

Signed-off-by: Seth Foster <fosterbseth@gmail.com>
This commit is contained in:
Seth Foster
2023-10-12 17:08:29 -04:00
committed by Seth Foster
parent c32f234ebb
commit bca68bcdf1
8 changed files with 46 additions and 48 deletions

View File

@@ -5505,6 +5505,7 @@ class InstanceSerializer(BaseSerializer):
jobs_running = serializers.IntegerField(help_text=_('Count of jobs in the running or waiting state that are targeted for this instance'), read_only=True)
jobs_total = serializers.IntegerField(help_text=_('Count of all jobs that target this instance'), read_only=True)
health_check_pending = serializers.SerializerMethodField()
peers = serializers.PrimaryKeyRelatedField(many=True, required=False, queryset=ReceptorAddress.objects.all())
class Meta:
model = Instance
@@ -5540,8 +5541,7 @@ class InstanceSerializer(BaseSerializer):
'node_type',
'node_state',
'ip_address',
'listener_port',
'peers_from_control_nodes',
'peers',
)
extra_kwargs = {
'node_type': {'initial': Instance.Types.EXECUTION, 'default': Instance.Types.EXECUTION},
@@ -5565,6 +5565,7 @@ class InstanceSerializer(BaseSerializer):
res = super(InstanceSerializer, self).get_related(obj)
res['receptor_addresses'] = self.reverse('api:instance_receptor_addresses_list', kwargs={'pk': obj.pk})
res['jobs'] = self.reverse('api:instance_unified_jobs_list', kwargs={'pk': obj.pk})
res['peers'] = self.reverse('api:instance_peers_list', kwargs={"pk": obj.pk})
res['instance_groups'] = self.reverse('api:instance_instance_groups_list', kwargs={'pk': obj.pk})
if obj.node_type in [Instance.Types.EXECUTION, Instance.Types.HOP]:
res['install_bundle'] = self.reverse('api:instance_install_bundle', kwargs={'pk': obj.pk})
@@ -5610,13 +5611,8 @@ class InstanceSerializer(BaseSerializer):
raise serializers.ValidationError(_("Can only create instances on Kubernetes or OpenShift."))
node_type = get_field_from_model_or_attrs("node_type")
peers_from_control_nodes = get_field_from_model_or_attrs("peers_from_control_nodes")
listener_port = get_field_from_model_or_attrs("listener_port")
# peers = attrs.get('peers', [])
if peers_from_control_nodes and node_type not in (Instance.Types.EXECUTION, Instance.Types.HOP):
raise serializers.ValidationError(_("peers_from_control_nodes can only be enabled for execution or hop nodes."))
# if node_type in [Instance.Types.CONTROL, Instance.Types.HYBRID]:
# if check_peers_changed():
# raise serializers.ValidationError(
@@ -5672,24 +5668,6 @@ class InstanceSerializer(BaseSerializer):
return value
def validate_listener_port(self, value):
"""
Cannot change listener port, unless going from none to integer, and vice versa
"""
if value and self.instance and self.instance.listener_port and self.instance.listener_port != value:
raise serializers.ValidationError(_("Cannot change listener port."))
return value
def validate_peers_from_control_nodes(self, value):
"""
Can only enable for K8S based deployments
"""
if value and not settings.IS_K8S:
raise serializers.ValidationError(_("Can only be enabled on Kubernetes or Openshift."))
return value
class InstanceHealthCheckSerializer(BaseSerializer):
class Meta:

View File

@@ -17,19 +17,18 @@ custom_worksign_public_keyfile: receptor/work_public_key.pem
custom_tls_certfile: receptor/tls/receptor.crt
custom_tls_keyfile: receptor/tls/receptor.key
custom_ca_certfile: receptor/tls/ca/mesh-CA.crt
receptor_protocol: 'tcp'
{% if instance.listener_port %}
{% if listener_port %}
receptor_protocol: tcp
receptor_listener: true
receptor_port: {{ instance.listener_port }}
receptor_port: {{ listener_port }}
{% else %}
receptor_listener: false
{% endif %}
{% if peers %}
receptor_peers:
{% for peer in peers %}
- host: {{ peer.host }}
port: {{ peer.port }}
protocol: tcp
- address: {{ peer.address }}
protocol: {{ peer.protocol}}
{% endfor %}
{% endif %}
{% verbatim %}

View File

@@ -375,13 +375,13 @@ class InstanceUnifiedJobsList(SubListAPIView):
class InstancePeersList(SubListAPIView):
name = _("Instance Peers")
name = _("Peers")
parent_model = models.Instance
model = models.Instance
serializer_class = serializers.InstanceSerializer
model = models.ReceptorAddress
serializer_class = serializers.ReceptorAddressSerializer
parent_access = 'read'
search_fields = {'hostname'}
relationship = 'peers'
search_fields = 'address'
class InstanceReceptorAddressesList(ListCreateAPIView):

View File

@@ -124,10 +124,19 @@ def generate_inventory_yml(instance_obj):
def generate_group_vars_all_yml(instance_obj):
# get peers
peers = []
for instance in instance_obj.peers.all():
peers.append(dict(host=instance.hostname, port=instance.listener_port))
all_yaml = render_to_string("instance_install_bundle/group_vars/all.yml", context=dict(instance=instance_obj, peers=peers))
for addr in instance_obj.peers.all():
peers.append(dict(address=addr.get_full_address(), protocol=addr.protocol))
context = dict(instance=instance_obj, peers=peers)
# we infer the listener port information from the first tcp receptor address
# currently for external remote nodes, we only support a single tcp backend listeners
listener_addr = instance_obj.receptor_addresses.filter(protocol="tcp").first()
if listener_addr:
context['listener_port'] = listener_addr.port
all_yaml = render_to_string("instance_install_bundle/group_vars/all.yml", context=context)
# convert consecutive newlines with a single newline
return re.sub(r'\n+', '\n', all_yaml)

View File

@@ -28,7 +28,11 @@ class Command(BaseCommand):
try:
instance = Instance.objects.get(hostname=kwargs.pop('hostname'))
kwargs['instance'] = instance
addr = ReceptorAddress.objects.create(**kwargs)
# address and protocol are unique together for ReceptorAddress
# If an address has (address, protocol), it will update the rest of the values suppled in defaults dict
# if no address exists with (address, protocol), then a new address will be created
# these unique together fields need to be consistent with the unique constraint in the ReceptorAddress model
addr, _ = ReceptorAddress.objects.update_or_create(address=kwargs.pop('address'), protocol=kwargs.pop('protocol'), defaults=kwargs)
print(f"Successfully added receptor address {addr.get_full_address()}")
self.changed = True
except Exception as e:

View File

@@ -115,7 +115,7 @@ class InstanceManager(models.Manager):
return node[0]
raise RuntimeError("No instance found with the current cluster host id")
def register(self, node_uuid=None, hostname=None, ip_address="", listener_port=None, node_type='hybrid', defaults=None):
def register(self, node_uuid=None, hostname=None, ip_address="", listener_port=None, node_type='hybrid', peers_from_control_nodes=False, defaults=None):
if not hostname:
hostname = settings.CLUSTER_HOST_ID
@@ -180,5 +180,13 @@ class InstanceManager(models.Manager):
uuid_option = {'uuid': node_uuid if node_uuid is not None else uuid.uuid4()}
if node_type == 'execution' and 'version' not in create_defaults:
create_defaults['version'] = RECEPTOR_PENDING
instance = self.create(hostname=hostname, ip_address=ip_address, listener_port=listener_port, node_type=node_type, **create_defaults, **uuid_option)
instance = self.create(
hostname=hostname,
ip_address=ip_address,
listener_port=listener_port,
node_type=node_type,
peers_from_control_nodes=peers_from_control_nodes,
**create_defaults,
**uuid_option
)
return (True, instance)

View File

@@ -77,6 +77,7 @@ class ActivityStream(models.Model):
notification_template = models.ManyToManyField("NotificationTemplate", blank=True)
notification = models.ManyToManyField("Notification", blank=True)
label = models.ManyToManyField("Label", blank=True)
receptor_address = models.ManyToManyField("ReceptorAddress", blank=True)
role = models.ManyToManyField("Role", blank=True)
instance = models.ManyToManyField("Instance", blank=True)
instance_group = models.ManyToManyField("InstanceGroup", blank=True)

View File

@@ -509,14 +509,13 @@ def receptor_address_saved(sender, instance, **kwargs):
address = instance
with disable_activity_stream():
control_instances = set(Instance.objects.filter(node_type__in=[Instance.Types.CONTROL, Instance.Types.HYBRID]))
if address.peers_from_control_nodes:
address.peers_from.add(*control_instances)
else:
address.peers_from.remove(*control_instances)
control_instances = set(Instance.objects.filter(node_type__in=[Instance.Types.CONTROL, Instance.Types.HYBRID]))
if address.peers_from_control_nodes:
address.peers_from.add(*control_instances)
else:
address.peers_from.remove(*control_instances)
schedule_write_receptor_config()
schedule_write_receptor_config()
@receiver(post_delete, sender=ReceptorAddress)