Compare commits

...

7 Commits

Author SHA1 Message Date
Jeff Bradberry
2ae9156a4a Merge pull request #12587 from ansible/mesh-scaling-backend
Allow for adding external execution nodes via API
2022-08-03 11:09:37 -04:00
Jeff Bradberry
4890c15eeb Update task management to only do things with ready instances 2022-08-02 15:58:43 -04:00
Jeff Bradberry
bc6b8fc4ae Check state when processing receptorctl advertisements
Nodes that show up and were in one of the unready states need to be
transitioned to ready, even if the logic in Instance.is_lost was not
met.
2022-08-02 15:58:20 -04:00
Jeff Bradberry
03c70077f9 Make sure that the health checks handle the state transitions properly
- nodes with states Provisioning, Provisioning Fail, Deprovisioning,
  and Deprovisioning Fail should bypass health checks and should never
  transition due to the existing machinery
- nodes with states Unavailable and Installed can transition to Ready
  if they check out as healthy
- nodes in the Ready state should transition to Unavailable if they
  fail a check
2022-08-02 13:55:35 -04:00
Jeff Bradberry
dab8c3ef55 Update node and link registration to put them in the right state
'Installed' for the nodes, 'Established' for the links.
2022-08-02 13:55:35 -04:00
Jeff Bradberry
d2a6be7ca9 Add the state fields and the peer relationships to the serializers 2022-08-02 13:55:35 -04:00
Jeff Bradberry
170795ab76 Add state fields to Instance and InstanceLink
Also, listener_port to Instance.
2022-08-02 13:55:03 -04:00
8 changed files with 174 additions and 38 deletions

View File

@@ -4753,7 +4753,7 @@ class ScheduleSerializer(LaunchConfigurationBaseSerializer, SchedulePreviewSeria
class InstanceLinkSerializer(BaseSerializer):
class Meta:
model = InstanceLink
fields = ('source', 'target')
fields = ('source', 'target', 'link_state')
source = serializers.SlugRelatedField(slug_field="hostname", read_only=True)
target = serializers.SlugRelatedField(slug_field="hostname", read_only=True)
@@ -4762,31 +4762,25 @@ class InstanceLinkSerializer(BaseSerializer):
class InstanceNodeSerializer(BaseSerializer):
class Meta:
model = Instance
fields = ('id', 'hostname', 'node_type', 'node_state')
node_state = serializers.SerializerMethodField()
def get_node_state(self, obj):
if not obj.enabled:
return "disabled"
return "error" if obj.errors else "healthy"
fields = ('id', 'hostname', 'node_type', 'node_state', 'enabled')
class InstanceSerializer(BaseSerializer):
consumed_capacity = serializers.SerializerMethodField()
percent_capacity_remaining = serializers.SerializerMethodField()
jobs_running = serializers.IntegerField(help_text=_('Count of jobs in the running or waiting state that ' 'are targeted for this instance'), read_only=True)
jobs_running = serializers.IntegerField(help_text=_('Count of jobs in the running or waiting state that are targeted for this instance'), read_only=True)
jobs_total = serializers.IntegerField(help_text=_('Count of all jobs that target this instance'), read_only=True)
class Meta:
model = Instance
read_only_fields = ('uuid', 'hostname', 'version', 'node_type')
read_only_fields = ('uuid', 'hostname', 'version', 'node_type', 'node_state')
fields = (
"id",
"type",
"url",
"related",
"summary_fields",
"uuid",
"hostname",
"created",
@@ -4808,6 +4802,7 @@ class InstanceSerializer(BaseSerializer):
"enabled",
"managed_by_policy",
"node_type",
"node_state",
)
def get_related(self, obj):
@@ -4819,6 +4814,14 @@ class InstanceSerializer(BaseSerializer):
res['health_check'] = self.reverse('api:instance_health_check', kwargs={'pk': obj.pk})
return res
def get_summary_fields(self, obj):
summary = super().get_summary_fields(obj)
if self.is_detail_view:
summary['links'] = InstanceLinkSerializer(InstanceLink.objects.select_related('target', 'source').filter(source=obj), many=True).data
return summary
def get_consumed_capacity(self, obj):
return obj.consumed_capacity

View File

@@ -440,6 +440,7 @@ class InstanceHealthCheck(GenericAPIView):
def post(self, request, *args, **kwargs):
obj = self.get_object()
# Note: hop nodes are already excluded by the get_queryset method
if obj.node_type == 'execution':
from awx.main.tasks.system import execution_node_health_check

View File

@@ -27,7 +27,9 @@ class Command(BaseCommand):
)
def handle(self, **options):
# provides a mapping of hostname to Instance objects
nodes = Instance.objects.in_bulk(field_name='hostname')
if options['source'] not in nodes:
raise CommandError(f"Host {options['source']} is not a registered instance.")
if not (options['peers'] or options['disconnect'] or options['exact'] is not None):
@@ -57,7 +59,9 @@ class Command(BaseCommand):
results = 0
for target in options['peers']:
_, created = InstanceLink.objects.get_or_create(source=nodes[options['source']], target=nodes[target])
_, created = InstanceLink.objects.update_or_create(
source=nodes[options['source']], target=nodes[target], defaults={'link_state': InstanceLink.States.ESTABLISHED}
)
if created:
results += 1
@@ -80,7 +84,9 @@ class Command(BaseCommand):
links = set(InstanceLink.objects.filter(source=nodes[options['source']]).values_list('target__hostname', flat=True))
removals, _ = InstanceLink.objects.filter(source=nodes[options['source']], target__hostname__in=links - peers).delete()
for target in peers - links:
_, created = InstanceLink.objects.get_or_create(source=nodes[options['source']], target=nodes[target])
_, created = InstanceLink.objects.update_or_create(
source=nodes[options['source']], target=nodes[target], defaults={'link_state': InstanceLink.States.ESTABLISHED}
)
if created:
additions += 1

View File

@@ -129,10 +129,13 @@ class InstanceManager(models.Manager):
# if instance was not retrieved by uuid and hostname was, use the hostname
instance = self.filter(hostname=hostname)
from awx.main.models import Instance
# Return existing instance
if instance.exists():
instance = instance.first() # in the unusual occasion that there is more than one, only get one
update_fields = []
instance.node_state = Instance.States.INSTALLED # Wait for it to show up on the mesh
update_fields = ['node_state']
# if instance was retrieved by uuid and hostname has changed, update hostname
if instance.hostname != hostname:
logger.warning("passed in hostname {0} is different from the original hostname {1}, updating to {0}".format(hostname, instance.hostname))
@@ -141,6 +144,7 @@ class InstanceManager(models.Manager):
# if any other fields are to be updated
if instance.ip_address != ip_address:
instance.ip_address = ip_address
update_fields.append('ip_address')
if instance.node_type != node_type:
instance.node_type = node_type
update_fields.append('node_type')
@@ -151,12 +155,12 @@ class InstanceManager(models.Manager):
return (False, instance)
# Create new instance, and fill in default values
create_defaults = dict(capacity=0)
create_defaults = {'node_state': Instance.States.INSTALLED, 'capacity': 0}
if defaults is not None:
create_defaults.update(defaults)
uuid_option = {}
if uuid is not None:
uuid_option = dict(uuid=uuid)
uuid_option = {'uuid': uuid}
if node_type == 'execution' and 'version' not in create_defaults:
create_defaults['version'] = RECEPTOR_PENDING
instance = self.create(hostname=hostname, ip_address=ip_address, node_type=node_type, **create_defaults, **uuid_option)

View File

@@ -0,0 +1,79 @@
# Generated by Django 3.2.13 on 2022-08-02 17:53
import django.core.validators
from django.db import migrations, models
def forwards(apps, schema_editor):
# All existing InstanceLink objects need to be in the state
# 'Established', which is the default, so nothing needs to be done
# for that.
Instance = apps.get_model('main', 'Instance')
for instance in Instance.objects.all():
instance.node_state = 'ready' if not instance.errors else 'unavailable'
instance.save(update_fields=['node_state'])
class Migration(migrations.Migration):
dependencies = [
('main', '0164_remove_inventorysource_update_on_project_update'),
]
operations = [
migrations.AddField(
model_name='instance',
name='listener_port',
field=models.PositiveIntegerField(
blank=True,
default=27199,
help_text='Port that Receptor will listen for incoming connections on.',
validators=[django.core.validators.MinValueValidator(1), django.core.validators.MaxValueValidator(65535)],
),
),
migrations.AddField(
model_name='instance',
name='node_state',
field=models.CharField(
choices=[
('provisioning', 'Provisioning'),
('provision-fail', 'Provisioning Failure'),
('installed', 'Installed'),
('ready', 'Ready'),
('unavailable', 'Unavailable'),
('deprovisioning', 'De-provisioning'),
('deprovision-fail', 'De-provisioning Failure'),
],
default='ready',
help_text='Indicates the current life cycle stage of this instance.',
max_length=16,
),
),
migrations.AddField(
model_name='instancelink',
name='link_state',
field=models.CharField(
choices=[('adding', 'Adding'), ('established', 'Established'), ('removing', 'Removing')],
default='established',
help_text='Indicates the current life cycle stage of this peer link.',
max_length=16,
),
),
migrations.AlterField(
model_name='instance',
name='node_type',
field=models.CharField(
choices=[
('control', 'Control plane node'),
('execution', 'Execution plane node'),
('hybrid', 'Controller and execution'),
('hop', 'Message-passing node, no execution capability'),
],
default='hybrid',
help_text='Role that this node plays in the mesh.',
max_length=16,
),
),
migrations.RunPython(forwards, reverse_code=migrations.RunPython.noop),
]

View File

@@ -5,7 +5,7 @@ from decimal import Decimal
import logging
import os
from django.core.validators import MinValueValidator
from django.core.validators import MinValueValidator, MaxValueValidator
from django.db import models, connection
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
@@ -58,6 +58,15 @@ class InstanceLink(BaseModel):
source = models.ForeignKey('Instance', on_delete=models.CASCADE, related_name='+')
target = models.ForeignKey('Instance', on_delete=models.CASCADE, related_name='reverse_peers')
class States(models.TextChoices):
ADDING = 'adding', _('Adding')
ESTABLISHED = 'established', _('Established')
REMOVING = 'removing', _('Removing')
link_state = models.CharField(
choices=States.choices, default=States.ESTABLISHED, max_length=16, help_text=_("Indicates the current life cycle stage of this peer link.")
)
class Meta:
unique_together = ('source', 'target')
@@ -126,13 +135,33 @@ class Instance(HasPolicyEditsMixin, BaseModel):
default=0,
editable=False,
)
NODE_TYPE_CHOICES = [
("control", "Control plane node"),
("execution", "Execution plane node"),
("hybrid", "Controller and execution"),
("hop", "Message-passing node, no execution capability"),
]
node_type = models.CharField(default='hybrid', choices=NODE_TYPE_CHOICES, max_length=16)
class Types(models.TextChoices):
CONTROL = 'control', _("Control plane node")
EXECUTION = 'execution', _("Execution plane node")
HYBRID = 'hybrid', _("Controller and execution")
HOP = 'hop', _("Message-passing node, no execution capability")
node_type = models.CharField(default=Types.HYBRID, choices=Types.choices, max_length=16, help_text=_("Role that this node plays in the mesh."))
class States(models.TextChoices):
PROVISIONING = 'provisioning', _('Provisioning')
PROVISION_FAIL = 'provision-fail', _('Provisioning Failure')
INSTALLED = 'installed', _('Installed')
READY = 'ready', _('Ready')
UNAVAILABLE = 'unavailable', _('Unavailable')
DEPROVISIONING = 'deprovisioning', _('De-provisioning')
DEPROVISION_FAIL = 'deprovision-fail', _('De-provisioning Failure')
node_state = models.CharField(
choices=States.choices, default=States.READY, max_length=16, help_text=_("Indicates the current life cycle stage of this instance.")
)
listener_port = models.PositiveIntegerField(
blank=True,
default=27199,
validators=[MinValueValidator(1), MaxValueValidator(65535)],
help_text=_("Port that Receptor will listen for incoming connections on."),
)
peers = models.ManyToManyField('self', symmetrical=False, through=InstanceLink, through_fields=('source', 'target'))
@@ -209,15 +238,18 @@ class Instance(HasPolicyEditsMixin, BaseModel):
return self.last_seen < ref_time - timedelta(seconds=grace_period)
def mark_offline(self, update_last_seen=False, perform_save=True, errors=''):
if self.cpu_capacity == 0 and self.mem_capacity == 0 and self.capacity == 0 and self.errors == errors and (not update_last_seen):
if self.node_state not in (Instance.States.READY, Instance.States.UNAVAILABLE, Instance.States.INSTALLED):
return
if self.node_state == Instance.States.UNAVAILABLE and self.errors == errors and (not update_last_seen):
return
self.node_state = Instance.States.UNAVAILABLE
self.cpu_capacity = self.mem_capacity = self.capacity = 0
self.errors = errors
if update_last_seen:
self.last_seen = now()
if perform_save:
update_fields = ['capacity', 'cpu_capacity', 'mem_capacity', 'errors']
update_fields = ['node_state', 'capacity', 'cpu_capacity', 'mem_capacity', 'errors']
if update_last_seen:
update_fields += ['last_seen']
self.save(update_fields=update_fields)
@@ -274,6 +306,9 @@ class Instance(HasPolicyEditsMixin, BaseModel):
if not errors:
self.refresh_capacity_fields()
self.errors = ''
if self.node_state in (Instance.States.UNAVAILABLE, Instance.States.INSTALLED):
self.node_state = Instance.States.READY
update_fields.append('node_state')
else:
self.mark_offline(perform_save=False, errors=errors)
update_fields.extend(['cpu_capacity', 'mem_capacity', 'capacity'])
@@ -292,7 +327,7 @@ class Instance(HasPolicyEditsMixin, BaseModel):
# playbook event data; we should consider this a zero capacity event
redis.Redis.from_url(settings.BROKER_URL).ping()
except redis.ConnectionError:
errors = _('Failed to connect ot Redis')
errors = _('Failed to connect to Redis')
self.save_health_data(awx_application_version, get_cpu_count(), get_mem_in_bytes(), update_last_seen=True, errors=errors)

View File

@@ -38,7 +38,9 @@ class TaskManagerInstances:
self.instances_by_hostname = dict()
if instances is None:
instances = (
Instance.objects.filter(hostname__isnull=False, enabled=True).exclude(node_type='hop').only('node_type', 'capacity', 'hostname', 'enabled')
Instance.objects.filter(hostname__isnull=False, node_state=Instance.States.READY, enabled=True)
.exclude(node_type='hop')
.only('node_type', 'node_state', 'capacity', 'hostname', 'enabled')
)
for instance in instances:
self.instances_by_hostname[instance.hostname] = TaskManagerInstance(instance)

View File

@@ -114,7 +114,7 @@ def inform_cluster_of_shutdown():
try:
this_inst = Instance.objects.get(hostname=settings.CLUSTER_HOST_ID)
this_inst.mark_offline(update_last_seen=True, errors=_('Instance received normal shutdown signal'))
logger.warning('Normal shutdown signal for instance {}, ' 'removed self from capacity pool.'.format(this_inst.hostname))
logger.warning('Normal shutdown signal for instance {}, removed self from capacity pool.'.format(this_inst.hostname))
except Exception:
logger.exception('Encountered problem with normal shutdown signal.')
@@ -341,9 +341,13 @@ def _cleanup_images_and_files(**kwargs):
logger.info(f'Performed local cleanup with kwargs {kwargs}, output:\n{stdout}')
# if we are the first instance alphabetically, then run cleanup on execution nodes
checker_instance = Instance.objects.filter(node_type__in=['hybrid', 'control'], enabled=True, capacity__gt=0).order_by('-hostname').first()
checker_instance = (
Instance.objects.filter(node_type__in=['hybrid', 'control'], node_state=Instance.States.READY, enabled=True, capacity__gt=0)
.order_by('-hostname')
.first()
)
if checker_instance and this_inst.hostname == checker_instance.hostname:
for inst in Instance.objects.filter(node_type='execution', enabled=True, capacity__gt=0):
for inst in Instance.objects.filter(node_type='execution', node_state=Instance.States.READY, enabled=True, capacity__gt=0):
runner_cleanup_kwargs = inst.get_cleanup_task_kwargs(**kwargs)
if not runner_cleanup_kwargs:
continue
@@ -399,6 +403,9 @@ def execution_node_health_check(node):
if instance.node_type != 'execution':
raise RuntimeError(f'Execution node health check ran against {instance.node_type} node {instance.hostname}')
if instance.node_state not in (Instance.States.READY, Instance.States.UNAVAILABLE, Instance.States.INSTALLED):
raise RuntimeError(f"Execution node health check ran against node {instance.hostname} in state {instance.node_state}")
data = worker_info(node)
prior_capacity = instance.capacity
@@ -432,6 +439,7 @@ def inspect_execution_nodes(instance_list):
nowtime = now()
workers = mesh_status['Advertisements']
for ad in workers:
hostname = ad['NodeID']
@@ -445,9 +453,7 @@ def inspect_execution_nodes(instance_list):
if instance.node_type in ('control', 'hybrid'):
continue
was_lost = instance.is_lost(ref_time=nowtime)
last_seen = parse_date(ad['Time'])
if instance.last_seen and instance.last_seen >= last_seen:
continue
instance.last_seen = last_seen
@@ -455,12 +461,12 @@ def inspect_execution_nodes(instance_list):
# Only execution nodes should be dealt with by execution_node_health_check
if instance.node_type == 'hop':
if was_lost and (not instance.is_lost(ref_time=nowtime)):
if instance.node_state in (Instance.States.UNAVAILABLE, Instance.States.INSTALLED):
logger.warning(f'Hop node {hostname}, has rejoined the receptor mesh')
instance.save_health_data(errors='')
continue
if was_lost:
if instance.node_state in (Instance.States.UNAVAILABLE, Instance.States.INSTALLED):
# if the instance *was* lost, but has appeared again,
# attempt to re-establish the initial capacity and version
# check
@@ -479,7 +485,7 @@ def inspect_execution_nodes(instance_list):
def cluster_node_heartbeat():
logger.debug("Cluster node heartbeat task.")
nowtime = now()
instance_list = list(Instance.objects.all())
instance_list = list(Instance.objects.filter(node_state__in=(Instance.States.READY, Instance.States.UNAVAILABLE, Instance.States.INSTALLED)))
this_inst = None
lost_instances = []
@@ -530,9 +536,9 @@ def cluster_node_heartbeat():
try:
if settings.AWX_AUTO_DEPROVISION_INSTANCES:
deprovision_hostname = other_inst.hostname
other_inst.delete()
other_inst.delete() # FIXME: what about associated inbound links?
logger.info("Host {} Automatically Deprovisioned.".format(deprovision_hostname))
elif other_inst.capacity != 0 or (not other_inst.errors):
elif other_inst.node_state == Instance.States.READY:
other_inst.mark_offline(errors=_('Another cluster node has determined this instance to be unresponsive'))
logger.error("Host {} last checked in at {}, marked as lost.".format(other_inst.hostname, other_inst.last_seen))