Add support in hop nodes in API

This commit is contained in:
Seth Foster 2023-04-20 01:58:43 -04:00 committed by Seth Foster
parent 4fbdc412ad
commit d8abd4912b
7 changed files with 118 additions and 59 deletions

View File

@ -5368,6 +5368,11 @@ class InstanceNodeSerializer(BaseSerializer):
fields = ('id', 'hostname', 'node_type', 'node_state', 'enabled')
class PeersSerializer(serializers.StringRelatedField):
def to_internal_value(self, value):
return Instance.objects.get(hostname=value)
class InstanceSerializer(BaseSerializer):
show_capabilities = ['edit']
@ -5376,6 +5381,7 @@ class InstanceSerializer(BaseSerializer):
jobs_running = serializers.IntegerField(help_text=_('Count of jobs in the running or waiting state that are targeted for this instance'), read_only=True)
jobs_total = serializers.IntegerField(help_text=_('Count of all jobs that target this instance'), read_only=True)
health_check_pending = serializers.SerializerMethodField()
peers = PeersSerializer(many=True)
class Meta:
model = Instance
@ -5412,6 +5418,8 @@ class InstanceSerializer(BaseSerializer):
'node_state',
'ip_address',
'listener_port',
'peers',
'peers_from_control_nodes',
)
extra_kwargs = {
'node_type': {'initial': Instance.Types.EXECUTION, 'default': Instance.Types.EXECUTION},
@ -5464,22 +5472,22 @@ class InstanceSerializer(BaseSerializer):
def get_health_check_pending(self, obj):
return obj.health_check_pending
def validate(self, data):
if self.instance:
if self.instance.node_type == Instance.Types.HOP:
raise serializers.ValidationError("Hop node instances may not be changed.")
else:
if not settings.IS_K8S:
raise serializers.ValidationError("Can only create instances on Kubernetes or OpenShift.")
return data
def validate(self, attrs):
def get_field_from_model_or_attrs(fd):
return attrs.get(fd, self.instance and getattr(self.instance, fd) or None)
if not self.instance and not settings.IS_K8S:
raise serializers.ValidationError("Can only create instances on Kubernetes or OpenShift.")
node_type = get_field_from_model_or_attrs("node_type")
peers_from_control_nodes = get_field_from_model_or_attrs("peers_from_control_nodes")
if peers_from_control_nodes and node_type not in (Instance.Types.EXECUTION, Instance.Types.HOP):
raise serializers.ValidationError("peers_from_control_nodes can only be enabled for execution or hop nodes.")
return super().validate(attrs)
def validate_node_type(self, value):
if not self.instance:
if value not in (Instance.Types.EXECUTION,):
raise serializers.ValidationError("Can only create execution nodes.")
else:
if self.instance.node_type != value:
raise serializers.ValidationError("Cannot change node type.")
if self.instance and self.instance.node_type != value:
raise serializers.ValidationError("Cannot change node type.")
return value
@ -5490,8 +5498,8 @@ class InstanceSerializer(BaseSerializer):
raise serializers.ValidationError("Can only change the state on Kubernetes or OpenShift.")
if value != Instance.States.DEPROVISIONING:
raise serializers.ValidationError("Can only change instances to the 'deprovisioning' state.")
if self.instance.node_type not in (Instance.Types.EXECUTION,):
raise serializers.ValidationError("Can only deprovision execution nodes.")
if self.instance.node_type not in (Instance.Types.EXECUTION, Instance.Types.HOP):
raise serializers.ValidationError("Can only deprovision execution or hop nodes.")
else:
if value and value != Instance.States.INSTALLED:
raise serializers.ValidationError("Can only create instances in the 'installed' state.")

View File

@ -16,6 +16,14 @@ custom_ca_certfile: receptor/tls/ca/mesh-CA.crt
receptor_protocol: 'tcp'
receptor_listener: true
receptor_port: {{ instance.listener_port }}
{% if peers %}
receptor_peers:
{% for peer in peers %}
- host: {{ peer.host }}
port: {{ peer.port }}
protocol: tcp
{% endfor %}
{% endif %}
receptor_dependencies:
- python39-pip
{% verbatim %}

View File

@ -350,8 +350,9 @@ class InstanceDetail(RetrieveUpdateAPIView):
r = super(InstanceDetail, self).update(request, *args, **kwargs)
if status.is_success(r.status_code):
obj = self.get_object()
obj.set_capacity_value()
obj.save(update_fields=['capacity'])
capacity_changed = obj.set_capacity_value()
if capacity_changed:
obj.save(update_fields=['capacity'])
r.data = serializers.InstanceSerializer(obj, context=self.get_serializer_context()).to_representation(obj)
return r

View File

@ -6,6 +6,7 @@ import io
import ipaddress
import os
import tarfile
import time
import asn1
from awx.api import serializers
@ -49,9 +50,9 @@ class InstanceInstallBundle(GenericAPIView):
def get(self, request, *args, **kwargs):
instance_obj = self.get_object()
if instance_obj.node_type not in ('execution',):
if instance_obj.node_type not in ('execution', 'hop'):
return Response(
data=dict(msg=_('Install bundle can only be generated for execution nodes.')),
data=dict(msg=_('Install bundle can only be generated for execution or hop nodes.')),
status=status.HTTP_400_BAD_REQUEST,
)
@ -66,37 +67,37 @@ class InstanceInstallBundle(GenericAPIView):
# generate and write the receptor key to receptor/tls/receptor.key in the tar file
key, cert = generate_receptor_tls(instance_obj)
def tar_addfile(tarinfo, filecontent):
tarinfo.mtime = time.time()
tarinfo.size = len(filecontent)
tar.addfile(tarinfo, io.BytesIO(filecontent))
key_tarinfo = tarfile.TarInfo(f"{instance_obj.hostname}_install_bundle/receptor/tls/receptor.key")
key_tarinfo.size = len(key)
tar.addfile(key_tarinfo, io.BytesIO(key))
tar_addfile(key_tarinfo, key)
cert_tarinfo = tarfile.TarInfo(f"{instance_obj.hostname}_install_bundle/receptor/tls/receptor.crt")
cert_tarinfo.size = len(cert)
tar.addfile(cert_tarinfo, io.BytesIO(cert))
tar_addfile(cert_tarinfo, cert)
# generate and write install_receptor.yml to the tar file
playbook = generate_playbook().encode('utf-8')
playbook_tarinfo = tarfile.TarInfo(f"{instance_obj.hostname}_install_bundle/install_receptor.yml")
playbook_tarinfo.size = len(playbook)
tar.addfile(playbook_tarinfo, io.BytesIO(playbook))
tar_addfile(playbook_tarinfo, playbook)
# generate and write inventory.yml to the tar file
inventory_yml = generate_inventory_yml(instance_obj).encode('utf-8')
inventory_yml_tarinfo = tarfile.TarInfo(f"{instance_obj.hostname}_install_bundle/inventory.yml")
inventory_yml_tarinfo.size = len(inventory_yml)
tar.addfile(inventory_yml_tarinfo, io.BytesIO(inventory_yml))
tar_addfile(inventory_yml_tarinfo, inventory_yml)
# generate and write group_vars/all.yml to the tar file
group_vars = generate_group_vars_all_yml(instance_obj).encode('utf-8')
group_vars_tarinfo = tarfile.TarInfo(f"{instance_obj.hostname}_install_bundle/group_vars/all.yml")
group_vars_tarinfo.size = len(group_vars)
tar.addfile(group_vars_tarinfo, io.BytesIO(group_vars))
tar_addfile(group_vars_tarinfo, group_vars)
# generate and write requirements.yml to the tar file
requirements_yml = generate_requirements_yml().encode('utf-8')
requirements_yml_tarinfo = tarfile.TarInfo(f"{instance_obj.hostname}_install_bundle/requirements.yml")
requirements_yml_tarinfo.size = len(requirements_yml)
tar.addfile(requirements_yml_tarinfo, io.BytesIO(requirements_yml))
tar_addfile(requirements_yml_tarinfo, requirements_yml)
# respond with the tarfile
f.seek(0)
@ -118,7 +119,10 @@ def generate_inventory_yml(instance_obj):
def generate_group_vars_all_yml(instance_obj):
return render_to_string("instance_install_bundle/group_vars/all.yml", context=dict(instance=instance_obj))
peers = []
for instance in instance_obj.peers.all():
peers.append(dict(host=instance.hostname, port=instance.listener_port))
return render_to_string("instance_install_bundle/group_vars/all.yml", context=dict(instance=instance_obj, peers=peers))
def generate_receptor_tls(instance_obj):

View File

@ -0,0 +1,17 @@
# Generated by Django 3.2.16 on 2023-04-25 19:46
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('main', '0182_constructed_inventory'),
]
operations = [
migrations.AddField(
model_name='instance',
name='peers_from_control_nodes',
field=models.BooleanField(default=False, help_text='If True, control plane cluster nodes should automatically peer to it.'),
),
]

View File

@ -80,6 +80,9 @@ class InstanceLink(BaseModel):
class Instance(HasPolicyEditsMixin, BaseModel):
"""A model representing an AWX instance running against this database."""
def __str__(self):
return self.hostname
objects = InstanceManager()
# Fields set in instance registration
@ -175,6 +178,7 @@ class Instance(HasPolicyEditsMixin, BaseModel):
)
peers = models.ManyToManyField('self', symmetrical=False, through=InstanceLink, through_fields=('source', 'target'))
peers_from_control_nodes = models.BooleanField(default=False, help_text=_("If True, control plane cluster nodes should automatically peer to it."))
class Meta:
app_label = 'main'
@ -279,6 +283,7 @@ class Instance(HasPolicyEditsMixin, BaseModel):
return update_fields
def set_capacity_value(self):
old_val = self.capacity
"""Sets capacity according to capacity adjustment rule (no save)"""
if self.enabled and self.node_type != 'hop':
lower_cap = min(self.mem_capacity, self.cpu_capacity)
@ -286,6 +291,7 @@ class Instance(HasPolicyEditsMixin, BaseModel):
self.capacity = lower_cap + (higher_cap - lower_cap) * self.capacity_adjustment
else:
self.capacity = 0
return int(self.capacity) != int(old_val) # return True if value changed
def refresh_capacity_fields(self):
"""Update derived capacity fields from cpu and memory (no save)"""
@ -466,7 +472,7 @@ def on_instance_group_saved(sender, instance, created=False, raw=False, **kwargs
@receiver(post_save, sender=Instance)
def on_instance_saved(sender, instance, created=False, raw=False, **kwargs):
if settings.IS_K8S and instance.node_type in (Instance.Types.EXECUTION,):
if settings.IS_K8S and instance.node_type in (Instance.Types.EXECUTION, Instance.Types.HOP):
if instance.node_state == Instance.States.DEPROVISIONING:
from awx.main.tasks.receptor import remove_deprovisioned_node # prevents circular import
@ -474,11 +480,10 @@ def on_instance_saved(sender, instance, created=False, raw=False, **kwargs):
# node and kick off write_receptor_config
connection.on_commit(lambda: remove_deprovisioned_node.apply_async([instance.hostname]))
if instance.node_state == Instance.States.INSTALLED:
from awx.main.tasks.receptor import write_receptor_config # prevents circular import
from awx.main.tasks.receptor import write_receptor_config # prevents circular import
# broadcast to all control instances to update their receptor configs
connection.on_commit(lambda: write_receptor_config.apply_async(queue='tower_broadcast_all'))
# broadcast to all control instances to update their receptor configs
connection.on_commit(lambda: write_receptor_config.apply_async(queue='tower_broadcast_all'))
if created or instance.has_policy_changes():
schedule_policy_task()
@ -493,6 +498,11 @@ def on_instance_group_deleted(sender, instance, using, **kwargs):
@receiver(post_delete, sender=Instance)
def on_instance_deleted(sender, instance, using, **kwargs):
schedule_policy_task()
if settings.IS_K8S and instance.node_type in (Instance.Types.EXECUTION, Instance.Types.HOP) and instance.peers_from_control_nodes:
from awx.main.tasks.receptor import write_receptor_config # prevents circular import
# broadcast to all control instances to update their receptor configs
connection.on_commit(lambda: write_receptor_config.apply_async(kwargs=dict(force=True), queue='tower_broadcast_all'))
class UnifiedJobTemplateInstanceGroupMembership(models.Model):

View File

@ -676,39 +676,50 @@ RECEPTOR_CONFIG_STARTER = (
@task()
def write_receptor_config():
def write_receptor_config(force=False):
"""
only control nodes will run this
"""
lock = FileLock(__RECEPTOR_CONF_LOCKFILE)
with lock:
receptor_config = list(RECEPTOR_CONFIG_STARTER)
this_inst = Instance.objects.me()
instances = Instance.objects.filter(node_type=Instance.Types.EXECUTION)
existing_peers = {link.target_id for link in InstanceLink.objects.filter(source=this_inst)}
new_links = []
for instance in instances:
peer = {'tcp-peer': {'address': f'{instance.hostname}:{instance.listener_port}', 'tls': 'tlsclient'}}
receptor_config.append(peer)
if instance.id not in existing_peers:
new_links.append(InstanceLink(source=this_inst, target=instance, link_state=InstanceLink.States.ADDING))
instances = Instance.objects.filter(node_type__in=(Instance.Types.EXECUTION, Instance.Types.HOP))
existing_peers = this_inst.peers.all()
InstanceLink.objects.bulk_create(new_links)
links_added = []
links_removed = False
for instance in instances:
if not instance.peers_from_control_nodes and instance in existing_peers:
this_inst.peers.remove(instance)
links_removed = True
if instance.peers_from_control_nodes:
peer = {'tcp-peer': {'address': f'{instance.hostname}:{instance.listener_port}', 'tls': 'tlsclient'}}
receptor_config.append(peer)
if instance not in existing_peers:
links_added.append(InstanceLink(source=this_inst, target=instance, link_state=InstanceLink.States.ADDING))
InstanceLink.objects.bulk_create(links_added)
with open(__RECEPTOR_CONF, 'w') as file:
yaml.dump(receptor_config, file, default_flow_style=False)
# This needs to be outside of the lock because this function itself will acquire the lock.
receptor_ctl = get_receptor_ctl()
if force or links_removed or links_added:
logger.debug("Receptor config changed, reloading receptor")
# This needs to be outside of the lock because this function itself will acquire the lock.
receptor_ctl = get_receptor_ctl()
attempts = 10
for backoff in range(1, attempts + 1):
try:
receptor_ctl.simple_command("reload")
break
except ValueError:
logger.warning(f"Unable to reload Receptor configuration. {attempts-backoff} attempts left.")
time.sleep(backoff)
else:
raise RuntimeError("Receptor reload failed")
attempts = 10
for backoff in range(1, attempts + 1):
try:
receptor_ctl.simple_command("reload")
break
except ValueError:
logger.warning(f"Unable to reload Receptor configuration. {attempts-backoff} attempts left.")
time.sleep(backoff)
else:
raise RuntimeError("Receptor reload failed")
links = InstanceLink.objects.filter(source=this_inst, target__in=instances, link_state=InstanceLink.States.ADDING)
links.update(link_state=InstanceLink.States.ESTABLISHED)