mirror of
https://github.com/ansible/awx.git
synced 2026-01-09 23:12:08 -03:30
Merge pull request #12587 from ansible/mesh-scaling-backend
Allow for adding external execution nodes via API
This commit is contained in:
commit
2ae9156a4a
@ -4753,7 +4753,7 @@ class ScheduleSerializer(LaunchConfigurationBaseSerializer, SchedulePreviewSeria
|
||||
class InstanceLinkSerializer(BaseSerializer):
|
||||
class Meta:
|
||||
model = InstanceLink
|
||||
fields = ('source', 'target')
|
||||
fields = ('source', 'target', 'link_state')
|
||||
|
||||
source = serializers.SlugRelatedField(slug_field="hostname", read_only=True)
|
||||
target = serializers.SlugRelatedField(slug_field="hostname", read_only=True)
|
||||
@ -4762,31 +4762,25 @@ class InstanceLinkSerializer(BaseSerializer):
|
||||
class InstanceNodeSerializer(BaseSerializer):
|
||||
class Meta:
|
||||
model = Instance
|
||||
fields = ('id', 'hostname', 'node_type', 'node_state')
|
||||
|
||||
node_state = serializers.SerializerMethodField()
|
||||
|
||||
def get_node_state(self, obj):
|
||||
if not obj.enabled:
|
||||
return "disabled"
|
||||
return "error" if obj.errors else "healthy"
|
||||
fields = ('id', 'hostname', 'node_type', 'node_state', 'enabled')
|
||||
|
||||
|
||||
class InstanceSerializer(BaseSerializer):
|
||||
|
||||
consumed_capacity = serializers.SerializerMethodField()
|
||||
percent_capacity_remaining = serializers.SerializerMethodField()
|
||||
jobs_running = serializers.IntegerField(help_text=_('Count of jobs in the running or waiting state that ' 'are targeted for this instance'), read_only=True)
|
||||
jobs_running = serializers.IntegerField(help_text=_('Count of jobs in the running or waiting state that are targeted for this instance'), read_only=True)
|
||||
jobs_total = serializers.IntegerField(help_text=_('Count of all jobs that target this instance'), read_only=True)
|
||||
|
||||
class Meta:
|
||||
model = Instance
|
||||
read_only_fields = ('uuid', 'hostname', 'version', 'node_type')
|
||||
read_only_fields = ('uuid', 'hostname', 'version', 'node_type', 'node_state')
|
||||
fields = (
|
||||
"id",
|
||||
"type",
|
||||
"url",
|
||||
"related",
|
||||
"summary_fields",
|
||||
"uuid",
|
||||
"hostname",
|
||||
"created",
|
||||
@ -4808,6 +4802,7 @@ class InstanceSerializer(BaseSerializer):
|
||||
"enabled",
|
||||
"managed_by_policy",
|
||||
"node_type",
|
||||
"node_state",
|
||||
)
|
||||
|
||||
def get_related(self, obj):
|
||||
@ -4819,6 +4814,14 @@ class InstanceSerializer(BaseSerializer):
|
||||
res['health_check'] = self.reverse('api:instance_health_check', kwargs={'pk': obj.pk})
|
||||
return res
|
||||
|
||||
def get_summary_fields(self, obj):
|
||||
summary = super().get_summary_fields(obj)
|
||||
|
||||
if self.is_detail_view:
|
||||
summary['links'] = InstanceLinkSerializer(InstanceLink.objects.select_related('target', 'source').filter(source=obj), many=True).data
|
||||
|
||||
return summary
|
||||
|
||||
def get_consumed_capacity(self, obj):
|
||||
return obj.consumed_capacity
|
||||
|
||||
|
||||
@ -440,6 +440,7 @@ class InstanceHealthCheck(GenericAPIView):
|
||||
def post(self, request, *args, **kwargs):
|
||||
obj = self.get_object()
|
||||
|
||||
# Note: hop nodes are already excluded by the get_queryset method
|
||||
if obj.node_type == 'execution':
|
||||
from awx.main.tasks.system import execution_node_health_check
|
||||
|
||||
|
||||
@ -27,7 +27,9 @@ class Command(BaseCommand):
|
||||
)
|
||||
|
||||
def handle(self, **options):
|
||||
# provides a mapping of hostname to Instance objects
|
||||
nodes = Instance.objects.in_bulk(field_name='hostname')
|
||||
|
||||
if options['source'] not in nodes:
|
||||
raise CommandError(f"Host {options['source']} is not a registered instance.")
|
||||
if not (options['peers'] or options['disconnect'] or options['exact'] is not None):
|
||||
@ -57,7 +59,9 @@ class Command(BaseCommand):
|
||||
|
||||
results = 0
|
||||
for target in options['peers']:
|
||||
_, created = InstanceLink.objects.get_or_create(source=nodes[options['source']], target=nodes[target])
|
||||
_, created = InstanceLink.objects.update_or_create(
|
||||
source=nodes[options['source']], target=nodes[target], defaults={'link_state': InstanceLink.States.ESTABLISHED}
|
||||
)
|
||||
if created:
|
||||
results += 1
|
||||
|
||||
@ -80,7 +84,9 @@ class Command(BaseCommand):
|
||||
links = set(InstanceLink.objects.filter(source=nodes[options['source']]).values_list('target__hostname', flat=True))
|
||||
removals, _ = InstanceLink.objects.filter(source=nodes[options['source']], target__hostname__in=links - peers).delete()
|
||||
for target in peers - links:
|
||||
_, created = InstanceLink.objects.get_or_create(source=nodes[options['source']], target=nodes[target])
|
||||
_, created = InstanceLink.objects.update_or_create(
|
||||
source=nodes[options['source']], target=nodes[target], defaults={'link_state': InstanceLink.States.ESTABLISHED}
|
||||
)
|
||||
if created:
|
||||
additions += 1
|
||||
|
||||
|
||||
@ -129,10 +129,13 @@ class InstanceManager(models.Manager):
|
||||
# if instance was not retrieved by uuid and hostname was, use the hostname
|
||||
instance = self.filter(hostname=hostname)
|
||||
|
||||
from awx.main.models import Instance
|
||||
|
||||
# Return existing instance
|
||||
if instance.exists():
|
||||
instance = instance.first() # in the unusual occasion that there is more than one, only get one
|
||||
update_fields = []
|
||||
instance.node_state = Instance.States.INSTALLED # Wait for it to show up on the mesh
|
||||
update_fields = ['node_state']
|
||||
# if instance was retrieved by uuid and hostname has changed, update hostname
|
||||
if instance.hostname != hostname:
|
||||
logger.warning("passed in hostname {0} is different from the original hostname {1}, updating to {0}".format(hostname, instance.hostname))
|
||||
@ -141,6 +144,7 @@ class InstanceManager(models.Manager):
|
||||
# if any other fields are to be updated
|
||||
if instance.ip_address != ip_address:
|
||||
instance.ip_address = ip_address
|
||||
update_fields.append('ip_address')
|
||||
if instance.node_type != node_type:
|
||||
instance.node_type = node_type
|
||||
update_fields.append('node_type')
|
||||
@ -151,12 +155,12 @@ class InstanceManager(models.Manager):
|
||||
return (False, instance)
|
||||
|
||||
# Create new instance, and fill in default values
|
||||
create_defaults = dict(capacity=0)
|
||||
create_defaults = {'node_state': Instance.States.INSTALLED, 'capacity': 0}
|
||||
if defaults is not None:
|
||||
create_defaults.update(defaults)
|
||||
uuid_option = {}
|
||||
if uuid is not None:
|
||||
uuid_option = dict(uuid=uuid)
|
||||
uuid_option = {'uuid': uuid}
|
||||
if node_type == 'execution' and 'version' not in create_defaults:
|
||||
create_defaults['version'] = RECEPTOR_PENDING
|
||||
instance = self.create(hostname=hostname, ip_address=ip_address, node_type=node_type, **create_defaults, **uuid_option)
|
||||
|
||||
79
awx/main/migrations/0165_node_and_link_state.py
Normal file
79
awx/main/migrations/0165_node_and_link_state.py
Normal file
@ -0,0 +1,79 @@
|
||||
# Generated by Django 3.2.13 on 2022-08-02 17:53
|
||||
|
||||
import django.core.validators
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
def forwards(apps, schema_editor):
|
||||
# All existing InstanceLink objects need to be in the state
|
||||
# 'Established', which is the default, so nothing needs to be done
|
||||
# for that.
|
||||
|
||||
Instance = apps.get_model('main', 'Instance')
|
||||
for instance in Instance.objects.all():
|
||||
instance.node_state = 'ready' if not instance.errors else 'unavailable'
|
||||
instance.save(update_fields=['node_state'])
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('main', '0164_remove_inventorysource_update_on_project_update'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AddField(
|
||||
model_name='instance',
|
||||
name='listener_port',
|
||||
field=models.PositiveIntegerField(
|
||||
blank=True,
|
||||
default=27199,
|
||||
help_text='Port that Receptor will listen for incoming connections on.',
|
||||
validators=[django.core.validators.MinValueValidator(1), django.core.validators.MaxValueValidator(65535)],
|
||||
),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='instance',
|
||||
name='node_state',
|
||||
field=models.CharField(
|
||||
choices=[
|
||||
('provisioning', 'Provisioning'),
|
||||
('provision-fail', 'Provisioning Failure'),
|
||||
('installed', 'Installed'),
|
||||
('ready', 'Ready'),
|
||||
('unavailable', 'Unavailable'),
|
||||
('deprovisioning', 'De-provisioning'),
|
||||
('deprovision-fail', 'De-provisioning Failure'),
|
||||
],
|
||||
default='ready',
|
||||
help_text='Indicates the current life cycle stage of this instance.',
|
||||
max_length=16,
|
||||
),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='instancelink',
|
||||
name='link_state',
|
||||
field=models.CharField(
|
||||
choices=[('adding', 'Adding'), ('established', 'Established'), ('removing', 'Removing')],
|
||||
default='established',
|
||||
help_text='Indicates the current life cycle stage of this peer link.',
|
||||
max_length=16,
|
||||
),
|
||||
),
|
||||
migrations.AlterField(
|
||||
model_name='instance',
|
||||
name='node_type',
|
||||
field=models.CharField(
|
||||
choices=[
|
||||
('control', 'Control plane node'),
|
||||
('execution', 'Execution plane node'),
|
||||
('hybrid', 'Controller and execution'),
|
||||
('hop', 'Message-passing node, no execution capability'),
|
||||
],
|
||||
default='hybrid',
|
||||
help_text='Role that this node plays in the mesh.',
|
||||
max_length=16,
|
||||
),
|
||||
),
|
||||
migrations.RunPython(forwards, reverse_code=migrations.RunPython.noop),
|
||||
]
|
||||
@ -5,7 +5,7 @@ from decimal import Decimal
|
||||
import logging
|
||||
import os
|
||||
|
||||
from django.core.validators import MinValueValidator
|
||||
from django.core.validators import MinValueValidator, MaxValueValidator
|
||||
from django.db import models, connection
|
||||
from django.db.models.signals import post_save, post_delete
|
||||
from django.dispatch import receiver
|
||||
@ -58,6 +58,15 @@ class InstanceLink(BaseModel):
|
||||
source = models.ForeignKey('Instance', on_delete=models.CASCADE, related_name='+')
|
||||
target = models.ForeignKey('Instance', on_delete=models.CASCADE, related_name='reverse_peers')
|
||||
|
||||
class States(models.TextChoices):
|
||||
ADDING = 'adding', _('Adding')
|
||||
ESTABLISHED = 'established', _('Established')
|
||||
REMOVING = 'removing', _('Removing')
|
||||
|
||||
link_state = models.CharField(
|
||||
choices=States.choices, default=States.ESTABLISHED, max_length=16, help_text=_("Indicates the current life cycle stage of this peer link.")
|
||||
)
|
||||
|
||||
class Meta:
|
||||
unique_together = ('source', 'target')
|
||||
|
||||
@ -126,13 +135,33 @@ class Instance(HasPolicyEditsMixin, BaseModel):
|
||||
default=0,
|
||||
editable=False,
|
||||
)
|
||||
NODE_TYPE_CHOICES = [
|
||||
("control", "Control plane node"),
|
||||
("execution", "Execution plane node"),
|
||||
("hybrid", "Controller and execution"),
|
||||
("hop", "Message-passing node, no execution capability"),
|
||||
]
|
||||
node_type = models.CharField(default='hybrid', choices=NODE_TYPE_CHOICES, max_length=16)
|
||||
|
||||
class Types(models.TextChoices):
|
||||
CONTROL = 'control', _("Control plane node")
|
||||
EXECUTION = 'execution', _("Execution plane node")
|
||||
HYBRID = 'hybrid', _("Controller and execution")
|
||||
HOP = 'hop', _("Message-passing node, no execution capability")
|
||||
|
||||
node_type = models.CharField(default=Types.HYBRID, choices=Types.choices, max_length=16, help_text=_("Role that this node plays in the mesh."))
|
||||
|
||||
class States(models.TextChoices):
|
||||
PROVISIONING = 'provisioning', _('Provisioning')
|
||||
PROVISION_FAIL = 'provision-fail', _('Provisioning Failure')
|
||||
INSTALLED = 'installed', _('Installed')
|
||||
READY = 'ready', _('Ready')
|
||||
UNAVAILABLE = 'unavailable', _('Unavailable')
|
||||
DEPROVISIONING = 'deprovisioning', _('De-provisioning')
|
||||
DEPROVISION_FAIL = 'deprovision-fail', _('De-provisioning Failure')
|
||||
|
||||
node_state = models.CharField(
|
||||
choices=States.choices, default=States.READY, max_length=16, help_text=_("Indicates the current life cycle stage of this instance.")
|
||||
)
|
||||
listener_port = models.PositiveIntegerField(
|
||||
blank=True,
|
||||
default=27199,
|
||||
validators=[MinValueValidator(1), MaxValueValidator(65535)],
|
||||
help_text=_("Port that Receptor will listen for incoming connections on."),
|
||||
)
|
||||
|
||||
peers = models.ManyToManyField('self', symmetrical=False, through=InstanceLink, through_fields=('source', 'target'))
|
||||
|
||||
@ -209,15 +238,18 @@ class Instance(HasPolicyEditsMixin, BaseModel):
|
||||
return self.last_seen < ref_time - timedelta(seconds=grace_period)
|
||||
|
||||
def mark_offline(self, update_last_seen=False, perform_save=True, errors=''):
|
||||
if self.cpu_capacity == 0 and self.mem_capacity == 0 and self.capacity == 0 and self.errors == errors and (not update_last_seen):
|
||||
if self.node_state not in (Instance.States.READY, Instance.States.UNAVAILABLE, Instance.States.INSTALLED):
|
||||
return
|
||||
if self.node_state == Instance.States.UNAVAILABLE and self.errors == errors and (not update_last_seen):
|
||||
return
|
||||
self.node_state = Instance.States.UNAVAILABLE
|
||||
self.cpu_capacity = self.mem_capacity = self.capacity = 0
|
||||
self.errors = errors
|
||||
if update_last_seen:
|
||||
self.last_seen = now()
|
||||
|
||||
if perform_save:
|
||||
update_fields = ['capacity', 'cpu_capacity', 'mem_capacity', 'errors']
|
||||
update_fields = ['node_state', 'capacity', 'cpu_capacity', 'mem_capacity', 'errors']
|
||||
if update_last_seen:
|
||||
update_fields += ['last_seen']
|
||||
self.save(update_fields=update_fields)
|
||||
@ -274,6 +306,9 @@ class Instance(HasPolicyEditsMixin, BaseModel):
|
||||
if not errors:
|
||||
self.refresh_capacity_fields()
|
||||
self.errors = ''
|
||||
if self.node_state in (Instance.States.UNAVAILABLE, Instance.States.INSTALLED):
|
||||
self.node_state = Instance.States.READY
|
||||
update_fields.append('node_state')
|
||||
else:
|
||||
self.mark_offline(perform_save=False, errors=errors)
|
||||
update_fields.extend(['cpu_capacity', 'mem_capacity', 'capacity'])
|
||||
@ -292,7 +327,7 @@ class Instance(HasPolicyEditsMixin, BaseModel):
|
||||
# playbook event data; we should consider this a zero capacity event
|
||||
redis.Redis.from_url(settings.BROKER_URL).ping()
|
||||
except redis.ConnectionError:
|
||||
errors = _('Failed to connect ot Redis')
|
||||
errors = _('Failed to connect to Redis')
|
||||
|
||||
self.save_health_data(awx_application_version, get_cpu_count(), get_mem_in_bytes(), update_last_seen=True, errors=errors)
|
||||
|
||||
|
||||
@ -38,7 +38,9 @@ class TaskManagerInstances:
|
||||
self.instances_by_hostname = dict()
|
||||
if instances is None:
|
||||
instances = (
|
||||
Instance.objects.filter(hostname__isnull=False, enabled=True).exclude(node_type='hop').only('node_type', 'capacity', 'hostname', 'enabled')
|
||||
Instance.objects.filter(hostname__isnull=False, node_state=Instance.States.READY, enabled=True)
|
||||
.exclude(node_type='hop')
|
||||
.only('node_type', 'node_state', 'capacity', 'hostname', 'enabled')
|
||||
)
|
||||
for instance in instances:
|
||||
self.instances_by_hostname[instance.hostname] = TaskManagerInstance(instance)
|
||||
|
||||
@ -114,7 +114,7 @@ def inform_cluster_of_shutdown():
|
||||
try:
|
||||
this_inst = Instance.objects.get(hostname=settings.CLUSTER_HOST_ID)
|
||||
this_inst.mark_offline(update_last_seen=True, errors=_('Instance received normal shutdown signal'))
|
||||
logger.warning('Normal shutdown signal for instance {}, ' 'removed self from capacity pool.'.format(this_inst.hostname))
|
||||
logger.warning('Normal shutdown signal for instance {}, removed self from capacity pool.'.format(this_inst.hostname))
|
||||
except Exception:
|
||||
logger.exception('Encountered problem with normal shutdown signal.')
|
||||
|
||||
@ -341,9 +341,13 @@ def _cleanup_images_and_files(**kwargs):
|
||||
logger.info(f'Performed local cleanup with kwargs {kwargs}, output:\n{stdout}')
|
||||
|
||||
# if we are the first instance alphabetically, then run cleanup on execution nodes
|
||||
checker_instance = Instance.objects.filter(node_type__in=['hybrid', 'control'], enabled=True, capacity__gt=0).order_by('-hostname').first()
|
||||
checker_instance = (
|
||||
Instance.objects.filter(node_type__in=['hybrid', 'control'], node_state=Instance.States.READY, enabled=True, capacity__gt=0)
|
||||
.order_by('-hostname')
|
||||
.first()
|
||||
)
|
||||
if checker_instance and this_inst.hostname == checker_instance.hostname:
|
||||
for inst in Instance.objects.filter(node_type='execution', enabled=True, capacity__gt=0):
|
||||
for inst in Instance.objects.filter(node_type='execution', node_state=Instance.States.READY, enabled=True, capacity__gt=0):
|
||||
runner_cleanup_kwargs = inst.get_cleanup_task_kwargs(**kwargs)
|
||||
if not runner_cleanup_kwargs:
|
||||
continue
|
||||
@ -399,6 +403,9 @@ def execution_node_health_check(node):
|
||||
if instance.node_type != 'execution':
|
||||
raise RuntimeError(f'Execution node health check ran against {instance.node_type} node {instance.hostname}')
|
||||
|
||||
if instance.node_state not in (Instance.States.READY, Instance.States.UNAVAILABLE, Instance.States.INSTALLED):
|
||||
raise RuntimeError(f"Execution node health check ran against node {instance.hostname} in state {instance.node_state}")
|
||||
|
||||
data = worker_info(node)
|
||||
|
||||
prior_capacity = instance.capacity
|
||||
@ -432,6 +439,7 @@ def inspect_execution_nodes(instance_list):
|
||||
|
||||
nowtime = now()
|
||||
workers = mesh_status['Advertisements']
|
||||
|
||||
for ad in workers:
|
||||
hostname = ad['NodeID']
|
||||
|
||||
@ -445,9 +453,7 @@ def inspect_execution_nodes(instance_list):
|
||||
if instance.node_type in ('control', 'hybrid'):
|
||||
continue
|
||||
|
||||
was_lost = instance.is_lost(ref_time=nowtime)
|
||||
last_seen = parse_date(ad['Time'])
|
||||
|
||||
if instance.last_seen and instance.last_seen >= last_seen:
|
||||
continue
|
||||
instance.last_seen = last_seen
|
||||
@ -455,12 +461,12 @@ def inspect_execution_nodes(instance_list):
|
||||
|
||||
# Only execution nodes should be dealt with by execution_node_health_check
|
||||
if instance.node_type == 'hop':
|
||||
if was_lost and (not instance.is_lost(ref_time=nowtime)):
|
||||
if instance.node_state in (Instance.States.UNAVAILABLE, Instance.States.INSTALLED):
|
||||
logger.warning(f'Hop node {hostname}, has rejoined the receptor mesh')
|
||||
instance.save_health_data(errors='')
|
||||
continue
|
||||
|
||||
if was_lost:
|
||||
if instance.node_state in (Instance.States.UNAVAILABLE, Instance.States.INSTALLED):
|
||||
# if the instance *was* lost, but has appeared again,
|
||||
# attempt to re-establish the initial capacity and version
|
||||
# check
|
||||
@ -479,7 +485,7 @@ def inspect_execution_nodes(instance_list):
|
||||
def cluster_node_heartbeat():
|
||||
logger.debug("Cluster node heartbeat task.")
|
||||
nowtime = now()
|
||||
instance_list = list(Instance.objects.all())
|
||||
instance_list = list(Instance.objects.filter(node_state__in=(Instance.States.READY, Instance.States.UNAVAILABLE, Instance.States.INSTALLED)))
|
||||
this_inst = None
|
||||
lost_instances = []
|
||||
|
||||
@ -530,9 +536,9 @@ def cluster_node_heartbeat():
|
||||
try:
|
||||
if settings.AWX_AUTO_DEPROVISION_INSTANCES:
|
||||
deprovision_hostname = other_inst.hostname
|
||||
other_inst.delete()
|
||||
other_inst.delete() # FIXME: what about associated inbound links?
|
||||
logger.info("Host {} Automatically Deprovisioned.".format(deprovision_hostname))
|
||||
elif other_inst.capacity != 0 or (not other_inst.errors):
|
||||
elif other_inst.node_state == Instance.States.READY:
|
||||
other_inst.mark_offline(errors=_('Another cluster node has determined this instance to be unresponsive'))
|
||||
logger.error("Host {} last checked in at {}, marked as lost.".format(other_inst.hostname, other_inst.last_seen))
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user