From 170795ab76a50478e5aac89d478bcfcf49e20f5e Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Mon, 18 Jul 2022 11:10:59 -0400 Subject: [PATCH 1/6] Add state fields to Instance and InstanceLink Also, listener_port to Instance. --- .../migrations/0165_node_and_link_state.py | 79 +++++++++++++++++++ awx/main/models/ha.py | 45 +++++++++-- 2 files changed, 116 insertions(+), 8 deletions(-) create mode 100644 awx/main/migrations/0165_node_and_link_state.py diff --git a/awx/main/migrations/0165_node_and_link_state.py b/awx/main/migrations/0165_node_and_link_state.py new file mode 100644 index 0000000000..1e088db4a1 --- /dev/null +++ b/awx/main/migrations/0165_node_and_link_state.py @@ -0,0 +1,79 @@ +# Generated by Django 3.2.13 on 2022-08-02 17:53 + +import django.core.validators +from django.db import migrations, models + + +def forwards(apps, schema_editor): + # All existing InstanceLink objects need to be in the state + # 'Established', which is the default, so nothing needs to be done + # for that. + + Instance = apps.get_model('main', 'Instance') + for instance in Instance.objects.all(): + instance.node_state = 'ready' if not instance.errors else 'unavailable' + instance.save(update_fields=['node_state']) + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0164_remove_inventorysource_update_on_project_update'), + ] + + operations = [ + migrations.AddField( + model_name='instance', + name='listener_port', + field=models.PositiveIntegerField( + blank=True, + default=27199, + help_text='Port that Receptor will listen for incoming connections on.', + validators=[django.core.validators.MinValueValidator(1), django.core.validators.MaxValueValidator(65535)], + ), + ), + migrations.AddField( + model_name='instance', + name='node_state', + field=models.CharField( + choices=[ + ('provisioning', 'Provisioning'), + ('provision-fail', 'Provisioning Failure'), + ('installed', 'Installed'), + ('ready', 'Ready'), + ('unavailable', 'Unavailable'), + ('deprovisioning', 'De-provisioning'), + ('deprovision-fail', 'De-provisioning Failure'), + ], + default='ready', + help_text='Indicates the current life cycle stage of this instance.', + max_length=16, + ), + ), + migrations.AddField( + model_name='instancelink', + name='link_state', + field=models.CharField( + choices=[('adding', 'Adding'), ('established', 'Established'), ('removing', 'Removing')], + default='established', + help_text='Indicates the current life cycle stage of this peer link.', + max_length=16, + ), + ), + migrations.AlterField( + model_name='instance', + name='node_type', + field=models.CharField( + choices=[ + ('control', 'Control plane node'), + ('execution', 'Execution plane node'), + ('hybrid', 'Controller and execution'), + ('hop', 'Message-passing node, no execution capability'), + ], + default='hybrid', + help_text='Role that this node plays in the mesh.', + max_length=16, + ), + ), + migrations.RunPython(forwards, reverse_code=migrations.RunPython.noop), + ] diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index 782ca59344..09b0e4aabb 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -5,7 +5,7 @@ from decimal import Decimal import logging import os -from django.core.validators import MinValueValidator +from django.core.validators import MinValueValidator, MaxValueValidator from django.db import models, connection from django.db.models.signals import post_save, post_delete from django.dispatch import receiver @@ -58,6 +58,15 @@ class InstanceLink(BaseModel): source = models.ForeignKey('Instance', on_delete=models.CASCADE, related_name='+') target = models.ForeignKey('Instance', on_delete=models.CASCADE, related_name='reverse_peers') + class States(models.TextChoices): + ADDING = 'adding', _('Adding') + ESTABLISHED = 'established', _('Established') + REMOVING = 'removing', _('Removing') + + link_state = models.CharField( + choices=States.choices, default=States.ESTABLISHED, max_length=16, help_text=_("Indicates the current life cycle stage of this peer link.") + ) + class Meta: unique_together = ('source', 'target') @@ -126,13 +135,33 @@ class Instance(HasPolicyEditsMixin, BaseModel): default=0, editable=False, ) - NODE_TYPE_CHOICES = [ - ("control", "Control plane node"), - ("execution", "Execution plane node"), - ("hybrid", "Controller and execution"), - ("hop", "Message-passing node, no execution capability"), - ] - node_type = models.CharField(default='hybrid', choices=NODE_TYPE_CHOICES, max_length=16) + + class Types(models.TextChoices): + CONTROL = 'control', _("Control plane node") + EXECUTION = 'execution', _("Execution plane node") + HYBRID = 'hybrid', _("Controller and execution") + HOP = 'hop', _("Message-passing node, no execution capability") + + node_type = models.CharField(default=Types.HYBRID, choices=Types.choices, max_length=16, help_text=_("Role that this node plays in the mesh.")) + + class States(models.TextChoices): + PROVISIONING = 'provisioning', _('Provisioning') + PROVISION_FAIL = 'provision-fail', _('Provisioning Failure') + INSTALLED = 'installed', _('Installed') + READY = 'ready', _('Ready') + UNAVAILABLE = 'unavailable', _('Unavailable') + DEPROVISIONING = 'deprovisioning', _('De-provisioning') + DEPROVISION_FAIL = 'deprovision-fail', _('De-provisioning Failure') + + node_state = models.CharField( + choices=States.choices, default=States.READY, max_length=16, help_text=_("Indicates the current life cycle stage of this instance.") + ) + listener_port = models.PositiveIntegerField( + blank=True, + default=27199, + validators=[MinValueValidator(1), MaxValueValidator(65535)], + help_text=_("Port that Receptor will listen for incoming connections on."), + ) peers = models.ManyToManyField('self', symmetrical=False, through=InstanceLink, through_fields=('source', 'target')) From d2a6be7ca98cf9588b3fb107c4c0044d08f72ae3 Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Mon, 18 Jul 2022 13:38:05 -0400 Subject: [PATCH 2/6] Add the state fields and the peer relationships to the serializers --- awx/api/serializers.py | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/awx/api/serializers.py b/awx/api/serializers.py index 018301f63f..f8cec32e6f 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -4753,7 +4753,7 @@ class ScheduleSerializer(LaunchConfigurationBaseSerializer, SchedulePreviewSeria class InstanceLinkSerializer(BaseSerializer): class Meta: model = InstanceLink - fields = ('source', 'target') + fields = ('source', 'target', 'link_state') source = serializers.SlugRelatedField(slug_field="hostname", read_only=True) target = serializers.SlugRelatedField(slug_field="hostname", read_only=True) @@ -4762,31 +4762,25 @@ class InstanceLinkSerializer(BaseSerializer): class InstanceNodeSerializer(BaseSerializer): class Meta: model = Instance - fields = ('id', 'hostname', 'node_type', 'node_state') - - node_state = serializers.SerializerMethodField() - - def get_node_state(self, obj): - if not obj.enabled: - return "disabled" - return "error" if obj.errors else "healthy" + fields = ('id', 'hostname', 'node_type', 'node_state', 'enabled') class InstanceSerializer(BaseSerializer): consumed_capacity = serializers.SerializerMethodField() percent_capacity_remaining = serializers.SerializerMethodField() - jobs_running = serializers.IntegerField(help_text=_('Count of jobs in the running or waiting state that ' 'are targeted for this instance'), read_only=True) + jobs_running = serializers.IntegerField(help_text=_('Count of jobs in the running or waiting state that are targeted for this instance'), read_only=True) jobs_total = serializers.IntegerField(help_text=_('Count of all jobs that target this instance'), read_only=True) class Meta: model = Instance - read_only_fields = ('uuid', 'hostname', 'version', 'node_type') + read_only_fields = ('uuid', 'hostname', 'version', 'node_type', 'node_state') fields = ( "id", "type", "url", "related", + "summary_fields", "uuid", "hostname", "created", @@ -4808,6 +4802,7 @@ class InstanceSerializer(BaseSerializer): "enabled", "managed_by_policy", "node_type", + "node_state", ) def get_related(self, obj): @@ -4819,6 +4814,14 @@ class InstanceSerializer(BaseSerializer): res['health_check'] = self.reverse('api:instance_health_check', kwargs={'pk': obj.pk}) return res + def get_summary_fields(self, obj): + summary = super().get_summary_fields(obj) + + if self.is_detail_view: + summary['links'] = InstanceLinkSerializer(InstanceLink.objects.select_related('target', 'source').filter(source=obj), many=True).data + + return summary + def get_consumed_capacity(self, obj): return obj.consumed_capacity From dab8c3ef55314429087bc5f94557d36e860485df Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Mon, 25 Jul 2022 15:50:11 -0400 Subject: [PATCH 3/6] Update node and link registration to put them in the right state 'Installed' for the nodes, 'Established' for the links. --- awx/main/management/commands/register_peers.py | 10 ++++++++-- awx/main/managers.py | 10 +++++++--- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/awx/main/management/commands/register_peers.py b/awx/main/management/commands/register_peers.py index 6d26ebfbb2..078edb08c7 100644 --- a/awx/main/management/commands/register_peers.py +++ b/awx/main/management/commands/register_peers.py @@ -27,7 +27,9 @@ class Command(BaseCommand): ) def handle(self, **options): + # provides a mapping of hostname to Instance objects nodes = Instance.objects.in_bulk(field_name='hostname') + if options['source'] not in nodes: raise CommandError(f"Host {options['source']} is not a registered instance.") if not (options['peers'] or options['disconnect'] or options['exact'] is not None): @@ -57,7 +59,9 @@ class Command(BaseCommand): results = 0 for target in options['peers']: - _, created = InstanceLink.objects.get_or_create(source=nodes[options['source']], target=nodes[target]) + _, created = InstanceLink.objects.update_or_create( + source=nodes[options['source']], target=nodes[target], defaults={'link_state': InstanceLink.States.ESTABLISHED} + ) if created: results += 1 @@ -80,7 +84,9 @@ class Command(BaseCommand): links = set(InstanceLink.objects.filter(source=nodes[options['source']]).values_list('target__hostname', flat=True)) removals, _ = InstanceLink.objects.filter(source=nodes[options['source']], target__hostname__in=links - peers).delete() for target in peers - links: - _, created = InstanceLink.objects.get_or_create(source=nodes[options['source']], target=nodes[target]) + _, created = InstanceLink.objects.update_or_create( + source=nodes[options['source']], target=nodes[target], defaults={'link_state': InstanceLink.States.ESTABLISHED} + ) if created: additions += 1 diff --git a/awx/main/managers.py b/awx/main/managers.py index 23acd15139..88e8384c43 100644 --- a/awx/main/managers.py +++ b/awx/main/managers.py @@ -129,10 +129,13 @@ class InstanceManager(models.Manager): # if instance was not retrieved by uuid and hostname was, use the hostname instance = self.filter(hostname=hostname) + from awx.main.models import Instance + # Return existing instance if instance.exists(): instance = instance.first() # in the unusual occasion that there is more than one, only get one - update_fields = [] + instance.node_state = Instance.States.INSTALLED # Wait for it to show up on the mesh + update_fields = ['node_state'] # if instance was retrieved by uuid and hostname has changed, update hostname if instance.hostname != hostname: logger.warning("passed in hostname {0} is different from the original hostname {1}, updating to {0}".format(hostname, instance.hostname)) @@ -141,6 +144,7 @@ class InstanceManager(models.Manager): # if any other fields are to be updated if instance.ip_address != ip_address: instance.ip_address = ip_address + update_fields.append('ip_address') if instance.node_type != node_type: instance.node_type = node_type update_fields.append('node_type') @@ -151,12 +155,12 @@ class InstanceManager(models.Manager): return (False, instance) # Create new instance, and fill in default values - create_defaults = dict(capacity=0) + create_defaults = {'node_state': Instance.States.INSTALLED, 'capacity': 0} if defaults is not None: create_defaults.update(defaults) uuid_option = {} if uuid is not None: - uuid_option = dict(uuid=uuid) + uuid_option = {'uuid': uuid} if node_type == 'execution' and 'version' not in create_defaults: create_defaults['version'] = RECEPTOR_PENDING instance = self.create(hostname=hostname, ip_address=ip_address, node_type=node_type, **create_defaults, **uuid_option) From 03c70077f958cc17275060fabdc58beb3a4d0857 Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Wed, 27 Jul 2022 17:17:13 -0400 Subject: [PATCH 4/6] Make sure that the health checks handle the state transitions properly - nodes with states Provisioning, Provisioning Fail, Deprovisioning, and Deprovisioning Fail should bypass health checks and should never transition due to the existing machinery - nodes with states Unavailable and Installed can transition to Ready if they check out as healthy - nodes in the Ready state should transition to Unavailable if they fail a check --- awx/api/views/__init__.py | 1 + awx/main/models/ha.py | 12 +++++++++--- awx/main/tasks/system.py | 13 ++++++++----- 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index 0d46c05834..307327522a 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -440,6 +440,7 @@ class InstanceHealthCheck(GenericAPIView): def post(self, request, *args, **kwargs): obj = self.get_object() + # Note: hop nodes are already excluded by the get_queryset method if obj.node_type == 'execution': from awx.main.tasks.system import execution_node_health_check diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index 09b0e4aabb..6bc711503f 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -238,15 +238,18 @@ class Instance(HasPolicyEditsMixin, BaseModel): return self.last_seen < ref_time - timedelta(seconds=grace_period) def mark_offline(self, update_last_seen=False, perform_save=True, errors=''): - if self.cpu_capacity == 0 and self.mem_capacity == 0 and self.capacity == 0 and self.errors == errors and (not update_last_seen): + if self.node_state not in (Instance.States.READY, Instance.States.UNAVAILABLE, Instance.States.INSTALLED): return + if self.node_state == Instance.States.UNAVAILABLE and self.errors == errors and (not update_last_seen): + return + self.node_state = Instance.States.UNAVAILABLE self.cpu_capacity = self.mem_capacity = self.capacity = 0 self.errors = errors if update_last_seen: self.last_seen = now() if perform_save: - update_fields = ['capacity', 'cpu_capacity', 'mem_capacity', 'errors'] + update_fields = ['node_state', 'capacity', 'cpu_capacity', 'mem_capacity', 'errors'] if update_last_seen: update_fields += ['last_seen'] self.save(update_fields=update_fields) @@ -303,6 +306,9 @@ class Instance(HasPolicyEditsMixin, BaseModel): if not errors: self.refresh_capacity_fields() self.errors = '' + if self.node_state in (Instance.States.UNAVAILABLE, Instance.States.INSTALLED): + self.node_state = Instance.States.READY + update_fields.append('node_state') else: self.mark_offline(perform_save=False, errors=errors) update_fields.extend(['cpu_capacity', 'mem_capacity', 'capacity']) @@ -321,7 +327,7 @@ class Instance(HasPolicyEditsMixin, BaseModel): # playbook event data; we should consider this a zero capacity event redis.Redis.from_url(settings.BROKER_URL).ping() except redis.ConnectionError: - errors = _('Failed to connect ot Redis') + errors = _('Failed to connect to Redis') self.save_health_data(awx_application_version, get_cpu_count(), get_mem_in_bytes(), update_last_seen=True, errors=errors) diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py index b828326339..90ee318106 100644 --- a/awx/main/tasks/system.py +++ b/awx/main/tasks/system.py @@ -114,7 +114,7 @@ def inform_cluster_of_shutdown(): try: this_inst = Instance.objects.get(hostname=settings.CLUSTER_HOST_ID) this_inst.mark_offline(update_last_seen=True, errors=_('Instance received normal shutdown signal')) - logger.warning('Normal shutdown signal for instance {}, ' 'removed self from capacity pool.'.format(this_inst.hostname)) + logger.warning('Normal shutdown signal for instance {}, removed self from capacity pool.'.format(this_inst.hostname)) except Exception: logger.exception('Encountered problem with normal shutdown signal.') @@ -399,6 +399,9 @@ def execution_node_health_check(node): if instance.node_type != 'execution': raise RuntimeError(f'Execution node health check ran against {instance.node_type} node {instance.hostname}') + if instance.node_state not in (Instance.States.READY, Instance.States.UNAVAILABLE, Instance.States.INSTALLED): + raise RuntimeError(f"Execution node health check ran against node {instance.hostname} in state {instance.node_state}") + data = worker_info(node) prior_capacity = instance.capacity @@ -455,7 +458,7 @@ def inspect_execution_nodes(instance_list): # Only execution nodes should be dealt with by execution_node_health_check if instance.node_type == 'hop': - if was_lost and (not instance.is_lost(ref_time=nowtime)): + if was_lost: logger.warning(f'Hop node {hostname}, has rejoined the receptor mesh') instance.save_health_data(errors='') continue @@ -479,7 +482,7 @@ def inspect_execution_nodes(instance_list): def cluster_node_heartbeat(): logger.debug("Cluster node heartbeat task.") nowtime = now() - instance_list = list(Instance.objects.all()) + instance_list = list(Instance.objects.filter(node_state__in=(Instance.States.READY, Instance.States.UNAVAILABLE, Instance.States.INSTALLED))) this_inst = None lost_instances = [] @@ -530,9 +533,9 @@ def cluster_node_heartbeat(): try: if settings.AWX_AUTO_DEPROVISION_INSTANCES: deprovision_hostname = other_inst.hostname - other_inst.delete() + other_inst.delete() # FIXME: what about associated inbound links? logger.info("Host {} Automatically Deprovisioned.".format(deprovision_hostname)) - elif other_inst.capacity != 0 or (not other_inst.errors): + elif other_inst.node_state == Instance.States.READY: other_inst.mark_offline(errors=_('Another cluster node has determined this instance to be unresponsive')) logger.error("Host {} last checked in at {}, marked as lost.".format(other_inst.hostname, other_inst.last_seen)) From bc6b8fc4ae13cefcc583b0be57bae5bff996bd62 Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Fri, 29 Jul 2022 10:54:30 -0400 Subject: [PATCH 5/6] Check state when processing receptorctl advertisements Nodes that show up and were in one of the unready states need to be transitioned to ready, even if the logic in Instance.is_lost was not met. --- awx/main/tasks/system.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py index 90ee318106..0d1d049784 100644 --- a/awx/main/tasks/system.py +++ b/awx/main/tasks/system.py @@ -435,6 +435,7 @@ def inspect_execution_nodes(instance_list): nowtime = now() workers = mesh_status['Advertisements'] + for ad in workers: hostname = ad['NodeID'] @@ -448,9 +449,7 @@ def inspect_execution_nodes(instance_list): if instance.node_type in ('control', 'hybrid'): continue - was_lost = instance.is_lost(ref_time=nowtime) last_seen = parse_date(ad['Time']) - if instance.last_seen and instance.last_seen >= last_seen: continue instance.last_seen = last_seen @@ -458,12 +457,12 @@ def inspect_execution_nodes(instance_list): # Only execution nodes should be dealt with by execution_node_health_check if instance.node_type == 'hop': - if was_lost: + if instance.node_state in (Instance.States.UNAVAILABLE, Instance.States.INSTALLED): logger.warning(f'Hop node {hostname}, has rejoined the receptor mesh') instance.save_health_data(errors='') continue - if was_lost: + if instance.node_state in (Instance.States.UNAVAILABLE, Instance.States.INSTALLED): # if the instance *was* lost, but has appeared again, # attempt to re-establish the initial capacity and version # check From 4890c15eeb8c68a302efa8c67c8383c627295312 Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Fri, 29 Jul 2022 16:11:34 -0400 Subject: [PATCH 6/6] Update task management to only do things with ready instances --- awx/main/scheduler/task_manager_models.py | 4 +++- awx/main/tasks/system.py | 8 ++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/awx/main/scheduler/task_manager_models.py b/awx/main/scheduler/task_manager_models.py index 678e545152..c0580ca646 100644 --- a/awx/main/scheduler/task_manager_models.py +++ b/awx/main/scheduler/task_manager_models.py @@ -38,7 +38,9 @@ class TaskManagerInstances: self.instances_by_hostname = dict() if instances is None: instances = ( - Instance.objects.filter(hostname__isnull=False, enabled=True).exclude(node_type='hop').only('node_type', 'capacity', 'hostname', 'enabled') + Instance.objects.filter(hostname__isnull=False, node_state=Instance.States.READY, enabled=True) + .exclude(node_type='hop') + .only('node_type', 'node_state', 'capacity', 'hostname', 'enabled') ) for instance in instances: self.instances_by_hostname[instance.hostname] = TaskManagerInstance(instance) diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py index 0d1d049784..2f35109213 100644 --- a/awx/main/tasks/system.py +++ b/awx/main/tasks/system.py @@ -341,9 +341,13 @@ def _cleanup_images_and_files(**kwargs): logger.info(f'Performed local cleanup with kwargs {kwargs}, output:\n{stdout}') # if we are the first instance alphabetically, then run cleanup on execution nodes - checker_instance = Instance.objects.filter(node_type__in=['hybrid', 'control'], enabled=True, capacity__gt=0).order_by('-hostname').first() + checker_instance = ( + Instance.objects.filter(node_type__in=['hybrid', 'control'], node_state=Instance.States.READY, enabled=True, capacity__gt=0) + .order_by('-hostname') + .first() + ) if checker_instance and this_inst.hostname == checker_instance.hostname: - for inst in Instance.objects.filter(node_type='execution', enabled=True, capacity__gt=0): + for inst in Instance.objects.filter(node_type='execution', node_state=Instance.States.READY, enabled=True, capacity__gt=0): runner_cleanup_kwargs = inst.get_cleanup_task_kwargs(**kwargs) if not runner_cleanup_kwargs: continue