diff --git a/awx/api/serializers.py b/awx/api/serializers.py index e38efff349..31d1103786 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -4833,7 +4833,8 @@ class InstanceSerializer(BaseSerializer): res['jobs'] = self.reverse('api:instance_unified_jobs_list', kwargs={'pk': obj.pk}) res['instance_groups'] = self.reverse('api:instance_instance_groups_list', kwargs={'pk': obj.pk}) if self.context['request'].user.is_superuser or self.context['request'].user.is_system_auditor: - res['health_check'] = self.reverse('api:instance_health_check', kwargs={'pk': obj.pk}) + if obj.node_type != 'hop': + res['health_check'] = self.reverse('api:instance_health_check', kwargs={'pk': obj.pk}) return res def get_consumed_capacity(self, obj): diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index 30eee980e2..7993a25de6 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -365,14 +365,11 @@ class InstanceList(ListAPIView): serializer_class = serializers.InstanceSerializer search_fields = ('hostname',) - def get_queryset(self): - return super().get_queryset().exclude(node_type='hop') - class InstanceDetail(RetrieveUpdateAPIView): name = _("Instance Detail") - queryset = models.Instance.objects.exclude(node_type='hop') + model = models.Instance serializer_class = serializers.InstanceSerializer def update(self, request, *args, **kwargs): @@ -418,10 +415,14 @@ class InstanceInstanceGroupsList(InstanceGroupMembershipMixin, SubListCreateAtta class InstanceHealthCheck(GenericAPIView): name = _('Instance Health Check') - queryset = models.Instance.objects.exclude(node_type='hop') + model = models.Instance serializer_class = serializers.InstanceHealthCheckSerializer permission_classes = (IsSystemAdminOrAuditor,) + def get_queryset(self): + # FIXME: For now, we don't have a good way of checking the health of a hop node. + return super().get_queryset().exclude(node_type='hop') + def get(self, request, *args, **kwargs): obj = self.get_object() data = self.get_serializer(data=request.data).to_representation(obj) diff --git a/awx/main/management/commands/list_instances.py b/awx/main/management/commands/list_instances.py index 15a8df4cb0..8a22c55b5a 100644 --- a/awx/main/management/commands/list_instances.py +++ b/awx/main/management/commands/list_instances.py @@ -11,13 +11,16 @@ class Ungrouped(object): policy_instance_percentage = None policy_instance_minimum = None + def __init__(self): + self.qs = Instance.objects.filter(rampart_groups__isnull=True) + @property def instances(self): - return Instance.objects.filter(rampart_groups__isnull=True).exclude(node_type='hop') + return self.qs @property def capacity(self): - return sum(x.capacity for x in self.instances) + return sum(x.capacity for x in self.instances.all()) class Command(BaseCommand): @@ -29,26 +32,29 @@ class Command(BaseCommand): groups = list(InstanceGroup.objects.all()) ungrouped = Ungrouped() - if len(ungrouped.instances): + if len(ungrouped.instances.all()): groups.append(ungrouped) - for instance_group in groups: - fmt = '[{0.name} capacity={0.capacity}' - if instance_group.policy_instance_percentage: - fmt += ' policy={0.policy_instance_percentage}%' - if instance_group.policy_instance_minimum: - fmt += ' policy>={0.policy_instance_minimum}' - print((fmt + ']').format(instance_group)) - for x in instance_group.instances.all(): + for ig in groups: + policy = '' + if ig.policy_instance_percentage: + policy = f' policy={ig.policy_instance_percentage}%' + if ig.policy_instance_minimum: + policy = f' policy>={ig.policy_instance_minimum}' + print(f'[{ig.name} capacity={ig.capacity}{policy}]') + + for x in ig.instances.all(): color = '\033[92m' - if x.capacity == 0: + if x.capacity == 0 and x.node_type != 'hop': color = '\033[91m' - if x.enabled is False: + if not x.enabled: color = '\033[90m[DISABLED] ' if no_color: color = '' - fmt = '\t' + color + '{0.hostname} capacity={0.capacity} node_type={0.node_type} version={1}' - if x.capacity: - fmt += ' heartbeat="{0.modified:%Y-%m-%d %H:%M:%S}"' - print((fmt + '\033[0m').format(x, x.version or '?')) - print('') + + capacity = f' capacity={x.capacity}' if x.node_type != 'hop' else '' + version = f" version={x.version or '?'}" if x.node_type != 'hop' else '' + heartbeat = f' heartbeat="{x.modified:%Y-%m-%d %H:%M:%S}"' if x.capacity or x.node_type == 'hop' else '' + print(f'\t{color}{x.hostname}{capacity} node_type={x.node_type}{version}{heartbeat}\033[0m') + + print() diff --git a/awx/main/management/commands/register_queue.py b/awx/main/management/commands/register_queue.py index 8f4c564279..3268dc0ebc 100644 --- a/awx/main/management/commands/register_queue.py +++ b/awx/main/management/commands/register_queue.py @@ -53,14 +53,14 @@ class RegisterQueue: def add_instances_to_group(self, ig): changed = False - instance_list_unique = set([x.strip() for x in self.hostname_list if x]) + instance_list_unique = {x for x in (x.strip() for x in self.hostname_list) if x} instances = [] for inst_name in instance_list_unique: - instance = Instance.objects.filter(hostname=inst_name) + instance = Instance.objects.filter(hostname=inst_name).exclude(node_type='hop') if instance.exists(): instances.append(instance[0]) else: - raise InstanceNotFound("Instance does not exist: {}".format(inst_name), changed) + raise InstanceNotFound("Instance does not exist or cannot run jobs: {}".format(inst_name), changed) ig.instances.add(*instances) diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index 653b4f1814..03b6dbfe79 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -195,7 +195,7 @@ class Instance(HasPolicyEditsMixin, BaseModel): if ref_time is None: ref_time = now() grace_period = settings.CLUSTER_NODE_HEARTBEAT_PERIOD * 2 - if self.node_type == 'execution': + if self.node_type in ('execution', 'hop'): grace_period += settings.RECEPTOR_SERVICE_ADVERTISEMENT_PERIOD return self.last_seen < ref_time - timedelta(seconds=grace_period) diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py index e596668f89..43ac6c2b26 100644 --- a/awx/main/tasks/system.py +++ b/awx/main/tasks/system.py @@ -436,14 +436,16 @@ def inspect_execution_nodes(instance_list): workers = mesh_status['Advertisements'] for ad in workers: hostname = ad['NodeID'] - if not any(cmd['WorkType'] == 'ansible-runner' for cmd in ad['WorkCommands'] or []): - continue - changed = False + if hostname in node_lookup: instance = node_lookup[hostname] else: - logger.warn(f"Unrecognized node on mesh advertising ansible-runner work type: {hostname}") + logger.warn(f"Unrecognized node advertising on mesh: {hostname}") + continue + + # Control-plane nodes are dealt with via local_health_check instead. + if instance.node_type in ('control', 'hybrid'): continue was_lost = instance.is_lost(ref_time=nowtime) @@ -454,6 +456,10 @@ def inspect_execution_nodes(instance_list): instance.last_seen = last_seen instance.save(update_fields=['last_seen']) + # Only execution nodes should be dealt with by execution_node_health_check + if instance.node_type == 'hop': + continue + if changed: execution_node_health_check.apply_async([hostname]) elif was_lost: @@ -482,7 +488,6 @@ def cluster_node_heartbeat(): for inst in instance_list: if inst.hostname == settings.CLUSTER_HOST_ID: this_inst = inst - instance_list.remove(inst) break else: (changed, this_inst) = Instance.objects.get_or_register() @@ -492,6 +497,8 @@ def cluster_node_heartbeat(): inspect_execution_nodes(instance_list) for inst in list(instance_list): + if inst == this_inst: + continue if inst.is_lost(ref_time=nowtime): lost_instances.append(inst) instance_list.remove(inst) @@ -506,7 +513,9 @@ def cluster_node_heartbeat(): raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID)) # IFF any node has a greater version than we do, then we'll shutdown services for other_inst in instance_list: - if other_inst.version == "" or other_inst.version.startswith('ansible-runner') or other_inst.node_type == 'execution': + if other_inst.node_type in ('execution', 'hop'): + continue + if other_inst.version == "" or other_inst.version.startswith('ansible-runner'): continue if Version(other_inst.version.split('-', 1)[0]) > Version(awx_application_version.split('-', 1)[0]) and not settings.DEBUG: logger.error( @@ -530,7 +539,7 @@ def cluster_node_heartbeat(): # * It was set to 0 by another tower node running this method # * It was set to 0 by this node, but auto deprovisioning is off # - # If auto deprovisining is on, don't bother setting the capacity to 0 + # If auto deprovisioning is on, don't bother setting the capacity to 0 # since we will delete the node anyway. if other_inst.capacity != 0 and not settings.AWX_AUTO_DEPROVISION_INSTANCES: other_inst.mark_offline(errors=_('Another cluster node has determined this instance to be unresponsive'))