From d8abd4912b8c8fe0e0d2ed847e7927d319705195 Mon Sep 17 00:00:00 2001 From: Seth Foster Date: Thu, 20 Apr 2023 01:58:43 -0400 Subject: [PATCH] Add support in hop nodes in API --- awx/api/serializers.py | 40 ++++++++------ .../group_vars/all.yml | 8 +++ awx/api/views/__init__.py | 5 +- awx/api/views/instance_install_bundle.py | 32 ++++++----- .../0183_instance_peers_from_control_nodes.py | 17 ++++++ awx/main/models/ha.py | 20 +++++-- awx/main/tasks/receptor.py | 55 +++++++++++-------- 7 files changed, 118 insertions(+), 59 deletions(-) create mode 100644 awx/main/migrations/0183_instance_peers_from_control_nodes.py diff --git a/awx/api/serializers.py b/awx/api/serializers.py index dfa72c802f..06ca3fd3e8 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -5368,6 +5368,11 @@ class InstanceNodeSerializer(BaseSerializer): fields = ('id', 'hostname', 'node_type', 'node_state', 'enabled') +class PeersSerializer(serializers.StringRelatedField): + def to_internal_value(self, value): + return Instance.objects.get(hostname=value) + + class InstanceSerializer(BaseSerializer): show_capabilities = ['edit'] @@ -5376,6 +5381,7 @@ class InstanceSerializer(BaseSerializer): jobs_running = serializers.IntegerField(help_text=_('Count of jobs in the running or waiting state that are targeted for this instance'), read_only=True) jobs_total = serializers.IntegerField(help_text=_('Count of all jobs that target this instance'), read_only=True) health_check_pending = serializers.SerializerMethodField() + peers = PeersSerializer(many=True) class Meta: model = Instance @@ -5412,6 +5418,8 @@ class InstanceSerializer(BaseSerializer): 'node_state', 'ip_address', 'listener_port', + 'peers', + 'peers_from_control_nodes', ) extra_kwargs = { 'node_type': {'initial': Instance.Types.EXECUTION, 'default': Instance.Types.EXECUTION}, @@ -5464,22 +5472,22 @@ class InstanceSerializer(BaseSerializer): def get_health_check_pending(self, obj): return obj.health_check_pending - def validate(self, data): - if self.instance: - if self.instance.node_type == Instance.Types.HOP: - raise serializers.ValidationError("Hop node instances may not be changed.") - else: - if not settings.IS_K8S: - raise serializers.ValidationError("Can only create instances on Kubernetes or OpenShift.") - return data + def validate(self, attrs): + def get_field_from_model_or_attrs(fd): + return attrs.get(fd, self.instance and getattr(self.instance, fd) or None) + + if not self.instance and not settings.IS_K8S: + raise serializers.ValidationError("Can only create instances on Kubernetes or OpenShift.") + node_type = get_field_from_model_or_attrs("node_type") + peers_from_control_nodes = get_field_from_model_or_attrs("peers_from_control_nodes") + + if peers_from_control_nodes and node_type not in (Instance.Types.EXECUTION, Instance.Types.HOP): + raise serializers.ValidationError("peers_from_control_nodes can only be enabled for execution or hop nodes.") + return super().validate(attrs) def validate_node_type(self, value): - if not self.instance: - if value not in (Instance.Types.EXECUTION,): - raise serializers.ValidationError("Can only create execution nodes.") - else: - if self.instance.node_type != value: - raise serializers.ValidationError("Cannot change node type.") + if self.instance and self.instance.node_type != value: + raise serializers.ValidationError("Cannot change node type.") return value @@ -5490,8 +5498,8 @@ class InstanceSerializer(BaseSerializer): raise serializers.ValidationError("Can only change the state on Kubernetes or OpenShift.") if value != Instance.States.DEPROVISIONING: raise serializers.ValidationError("Can only change instances to the 'deprovisioning' state.") - if self.instance.node_type not in (Instance.Types.EXECUTION,): - raise serializers.ValidationError("Can only deprovision execution nodes.") + if self.instance.node_type not in (Instance.Types.EXECUTION, Instance.Types.HOP): + raise serializers.ValidationError("Can only deprovision execution or hop nodes.") else: if value and value != Instance.States.INSTALLED: raise serializers.ValidationError("Can only create instances in the 'installed' state.") diff --git a/awx/api/templates/instance_install_bundle/group_vars/all.yml b/awx/api/templates/instance_install_bundle/group_vars/all.yml index c043547dc6..952de3d6d1 100644 --- a/awx/api/templates/instance_install_bundle/group_vars/all.yml +++ b/awx/api/templates/instance_install_bundle/group_vars/all.yml @@ -16,6 +16,14 @@ custom_ca_certfile: receptor/tls/ca/mesh-CA.crt receptor_protocol: 'tcp' receptor_listener: true receptor_port: {{ instance.listener_port }} +{% if peers %} +receptor_peers: +{% for peer in peers %} + - host: {{ peer.host }} + port: {{ peer.port }} + protocol: tcp +{% endfor %} +{% endif %} receptor_dependencies: - python39-pip {% verbatim %} diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index 893788268a..1d9fe9bfd2 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -350,8 +350,9 @@ class InstanceDetail(RetrieveUpdateAPIView): r = super(InstanceDetail, self).update(request, *args, **kwargs) if status.is_success(r.status_code): obj = self.get_object() - obj.set_capacity_value() - obj.save(update_fields=['capacity']) + capacity_changed = obj.set_capacity_value() + if capacity_changed: + obj.save(update_fields=['capacity']) r.data = serializers.InstanceSerializer(obj, context=self.get_serializer_context()).to_representation(obj) return r diff --git a/awx/api/views/instance_install_bundle.py b/awx/api/views/instance_install_bundle.py index 8cd3c31e3b..060ccf9f20 100644 --- a/awx/api/views/instance_install_bundle.py +++ b/awx/api/views/instance_install_bundle.py @@ -6,6 +6,7 @@ import io import ipaddress import os import tarfile +import time import asn1 from awx.api import serializers @@ -49,9 +50,9 @@ class InstanceInstallBundle(GenericAPIView): def get(self, request, *args, **kwargs): instance_obj = self.get_object() - if instance_obj.node_type not in ('execution',): + if instance_obj.node_type not in ('execution', 'hop'): return Response( - data=dict(msg=_('Install bundle can only be generated for execution nodes.')), + data=dict(msg=_('Install bundle can only be generated for execution or hop nodes.')), status=status.HTTP_400_BAD_REQUEST, ) @@ -66,37 +67,37 @@ class InstanceInstallBundle(GenericAPIView): # generate and write the receptor key to receptor/tls/receptor.key in the tar file key, cert = generate_receptor_tls(instance_obj) + def tar_addfile(tarinfo, filecontent): + tarinfo.mtime = time.time() + tarinfo.size = len(filecontent) + tar.addfile(tarinfo, io.BytesIO(filecontent)) + key_tarinfo = tarfile.TarInfo(f"{instance_obj.hostname}_install_bundle/receptor/tls/receptor.key") - key_tarinfo.size = len(key) - tar.addfile(key_tarinfo, io.BytesIO(key)) + tar_addfile(key_tarinfo, key) cert_tarinfo = tarfile.TarInfo(f"{instance_obj.hostname}_install_bundle/receptor/tls/receptor.crt") cert_tarinfo.size = len(cert) - tar.addfile(cert_tarinfo, io.BytesIO(cert)) + tar_addfile(cert_tarinfo, cert) # generate and write install_receptor.yml to the tar file playbook = generate_playbook().encode('utf-8') playbook_tarinfo = tarfile.TarInfo(f"{instance_obj.hostname}_install_bundle/install_receptor.yml") - playbook_tarinfo.size = len(playbook) - tar.addfile(playbook_tarinfo, io.BytesIO(playbook)) + tar_addfile(playbook_tarinfo, playbook) # generate and write inventory.yml to the tar file inventory_yml = generate_inventory_yml(instance_obj).encode('utf-8') inventory_yml_tarinfo = tarfile.TarInfo(f"{instance_obj.hostname}_install_bundle/inventory.yml") - inventory_yml_tarinfo.size = len(inventory_yml) - tar.addfile(inventory_yml_tarinfo, io.BytesIO(inventory_yml)) + tar_addfile(inventory_yml_tarinfo, inventory_yml) # generate and write group_vars/all.yml to the tar file group_vars = generate_group_vars_all_yml(instance_obj).encode('utf-8') group_vars_tarinfo = tarfile.TarInfo(f"{instance_obj.hostname}_install_bundle/group_vars/all.yml") - group_vars_tarinfo.size = len(group_vars) - tar.addfile(group_vars_tarinfo, io.BytesIO(group_vars)) + tar_addfile(group_vars_tarinfo, group_vars) # generate and write requirements.yml to the tar file requirements_yml = generate_requirements_yml().encode('utf-8') requirements_yml_tarinfo = tarfile.TarInfo(f"{instance_obj.hostname}_install_bundle/requirements.yml") - requirements_yml_tarinfo.size = len(requirements_yml) - tar.addfile(requirements_yml_tarinfo, io.BytesIO(requirements_yml)) + tar_addfile(requirements_yml_tarinfo, requirements_yml) # respond with the tarfile f.seek(0) @@ -118,7 +119,10 @@ def generate_inventory_yml(instance_obj): def generate_group_vars_all_yml(instance_obj): - return render_to_string("instance_install_bundle/group_vars/all.yml", context=dict(instance=instance_obj)) + peers = [] + for instance in instance_obj.peers.all(): + peers.append(dict(host=instance.hostname, port=instance.listener_port)) + return render_to_string("instance_install_bundle/group_vars/all.yml", context=dict(instance=instance_obj, peers=peers)) def generate_receptor_tls(instance_obj): diff --git a/awx/main/migrations/0183_instance_peers_from_control_nodes.py b/awx/main/migrations/0183_instance_peers_from_control_nodes.py new file mode 100644 index 0000000000..e2e0692dba --- /dev/null +++ b/awx/main/migrations/0183_instance_peers_from_control_nodes.py @@ -0,0 +1,17 @@ +# Generated by Django 3.2.16 on 2023-04-25 19:46 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ('main', '0182_constructed_inventory'), + ] + + operations = [ + migrations.AddField( + model_name='instance', + name='peers_from_control_nodes', + field=models.BooleanField(default=False, help_text='If True, control plane cluster nodes should automatically peer to it.'), + ), + ] diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index 888eb8b92e..d2b892b241 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -80,6 +80,9 @@ class InstanceLink(BaseModel): class Instance(HasPolicyEditsMixin, BaseModel): """A model representing an AWX instance running against this database.""" + def __str__(self): + return self.hostname + objects = InstanceManager() # Fields set in instance registration @@ -175,6 +178,7 @@ class Instance(HasPolicyEditsMixin, BaseModel): ) peers = models.ManyToManyField('self', symmetrical=False, through=InstanceLink, through_fields=('source', 'target')) + peers_from_control_nodes = models.BooleanField(default=False, help_text=_("If True, control plane cluster nodes should automatically peer to it.")) class Meta: app_label = 'main' @@ -279,6 +283,7 @@ class Instance(HasPolicyEditsMixin, BaseModel): return update_fields def set_capacity_value(self): + old_val = self.capacity """Sets capacity according to capacity adjustment rule (no save)""" if self.enabled and self.node_type != 'hop': lower_cap = min(self.mem_capacity, self.cpu_capacity) @@ -286,6 +291,7 @@ class Instance(HasPolicyEditsMixin, BaseModel): self.capacity = lower_cap + (higher_cap - lower_cap) * self.capacity_adjustment else: self.capacity = 0 + return int(self.capacity) != int(old_val) # return True if value changed def refresh_capacity_fields(self): """Update derived capacity fields from cpu and memory (no save)""" @@ -466,7 +472,7 @@ def on_instance_group_saved(sender, instance, created=False, raw=False, **kwargs @receiver(post_save, sender=Instance) def on_instance_saved(sender, instance, created=False, raw=False, **kwargs): - if settings.IS_K8S and instance.node_type in (Instance.Types.EXECUTION,): + if settings.IS_K8S and instance.node_type in (Instance.Types.EXECUTION, Instance.Types.HOP): if instance.node_state == Instance.States.DEPROVISIONING: from awx.main.tasks.receptor import remove_deprovisioned_node # prevents circular import @@ -474,11 +480,10 @@ def on_instance_saved(sender, instance, created=False, raw=False, **kwargs): # node and kick off write_receptor_config connection.on_commit(lambda: remove_deprovisioned_node.apply_async([instance.hostname])) - if instance.node_state == Instance.States.INSTALLED: - from awx.main.tasks.receptor import write_receptor_config # prevents circular import + from awx.main.tasks.receptor import write_receptor_config # prevents circular import - # broadcast to all control instances to update their receptor configs - connection.on_commit(lambda: write_receptor_config.apply_async(queue='tower_broadcast_all')) + # broadcast to all control instances to update their receptor configs + connection.on_commit(lambda: write_receptor_config.apply_async(queue='tower_broadcast_all')) if created or instance.has_policy_changes(): schedule_policy_task() @@ -493,6 +498,11 @@ def on_instance_group_deleted(sender, instance, using, **kwargs): @receiver(post_delete, sender=Instance) def on_instance_deleted(sender, instance, using, **kwargs): schedule_policy_task() + if settings.IS_K8S and instance.node_type in (Instance.Types.EXECUTION, Instance.Types.HOP) and instance.peers_from_control_nodes: + from awx.main.tasks.receptor import write_receptor_config # prevents circular import + + # broadcast to all control instances to update their receptor configs + connection.on_commit(lambda: write_receptor_config.apply_async(kwargs=dict(force=True), queue='tower_broadcast_all')) class UnifiedJobTemplateInstanceGroupMembership(models.Model): diff --git a/awx/main/tasks/receptor.py b/awx/main/tasks/receptor.py index d887c82e8b..7516a68132 100644 --- a/awx/main/tasks/receptor.py +++ b/awx/main/tasks/receptor.py @@ -676,39 +676,50 @@ RECEPTOR_CONFIG_STARTER = ( @task() -def write_receptor_config(): +def write_receptor_config(force=False): + """ + only control nodes will run this + """ lock = FileLock(__RECEPTOR_CONF_LOCKFILE) with lock: receptor_config = list(RECEPTOR_CONFIG_STARTER) this_inst = Instance.objects.me() - instances = Instance.objects.filter(node_type=Instance.Types.EXECUTION) - existing_peers = {link.target_id for link in InstanceLink.objects.filter(source=this_inst)} - new_links = [] - for instance in instances: - peer = {'tcp-peer': {'address': f'{instance.hostname}:{instance.listener_port}', 'tls': 'tlsclient'}} - receptor_config.append(peer) - if instance.id not in existing_peers: - new_links.append(InstanceLink(source=this_inst, target=instance, link_state=InstanceLink.States.ADDING)) + instances = Instance.objects.filter(node_type__in=(Instance.Types.EXECUTION, Instance.Types.HOP)) + existing_peers = this_inst.peers.all() - InstanceLink.objects.bulk_create(new_links) + links_added = [] + links_removed = False + for instance in instances: + if not instance.peers_from_control_nodes and instance in existing_peers: + this_inst.peers.remove(instance) + links_removed = True + if instance.peers_from_control_nodes: + peer = {'tcp-peer': {'address': f'{instance.hostname}:{instance.listener_port}', 'tls': 'tlsclient'}} + receptor_config.append(peer) + if instance not in existing_peers: + links_added.append(InstanceLink(source=this_inst, target=instance, link_state=InstanceLink.States.ADDING)) + + InstanceLink.objects.bulk_create(links_added) with open(__RECEPTOR_CONF, 'w') as file: yaml.dump(receptor_config, file, default_flow_style=False) - # This needs to be outside of the lock because this function itself will acquire the lock. - receptor_ctl = get_receptor_ctl() + if force or links_removed or links_added: + logger.debug("Receptor config changed, reloading receptor") + # This needs to be outside of the lock because this function itself will acquire the lock. + receptor_ctl = get_receptor_ctl() - attempts = 10 - for backoff in range(1, attempts + 1): - try: - receptor_ctl.simple_command("reload") - break - except ValueError: - logger.warning(f"Unable to reload Receptor configuration. {attempts-backoff} attempts left.") - time.sleep(backoff) - else: - raise RuntimeError("Receptor reload failed") + attempts = 10 + for backoff in range(1, attempts + 1): + try: + receptor_ctl.simple_command("reload") + break + except ValueError: + logger.warning(f"Unable to reload Receptor configuration. {attempts-backoff} attempts left.") + time.sleep(backoff) + else: + raise RuntimeError("Receptor reload failed") links = InstanceLink.objects.filter(source=this_inst, target__in=instances, link_state=InstanceLink.States.ADDING) links.update(link_state=InstanceLink.States.ESTABLISHED)