mirror of
https://github.com/ansible/awx.git
synced 2026-03-28 22:35:08 -02:30
Merge pull request #11591 from ansible/enable-hop-nodes-endpoints
Turn off the filtering of hop nodes from the Instance endpoints
This commit is contained in:
@@ -4833,7 +4833,8 @@ class InstanceSerializer(BaseSerializer):
|
|||||||
res['jobs'] = self.reverse('api:instance_unified_jobs_list', kwargs={'pk': obj.pk})
|
res['jobs'] = self.reverse('api:instance_unified_jobs_list', kwargs={'pk': obj.pk})
|
||||||
res['instance_groups'] = self.reverse('api:instance_instance_groups_list', kwargs={'pk': obj.pk})
|
res['instance_groups'] = self.reverse('api:instance_instance_groups_list', kwargs={'pk': obj.pk})
|
||||||
if self.context['request'].user.is_superuser or self.context['request'].user.is_system_auditor:
|
if self.context['request'].user.is_superuser or self.context['request'].user.is_system_auditor:
|
||||||
res['health_check'] = self.reverse('api:instance_health_check', kwargs={'pk': obj.pk})
|
if obj.node_type != 'hop':
|
||||||
|
res['health_check'] = self.reverse('api:instance_health_check', kwargs={'pk': obj.pk})
|
||||||
return res
|
return res
|
||||||
|
|
||||||
def get_consumed_capacity(self, obj):
|
def get_consumed_capacity(self, obj):
|
||||||
|
|||||||
@@ -365,14 +365,11 @@ class InstanceList(ListAPIView):
|
|||||||
serializer_class = serializers.InstanceSerializer
|
serializer_class = serializers.InstanceSerializer
|
||||||
search_fields = ('hostname',)
|
search_fields = ('hostname',)
|
||||||
|
|
||||||
def get_queryset(self):
|
|
||||||
return super().get_queryset().exclude(node_type='hop')
|
|
||||||
|
|
||||||
|
|
||||||
class InstanceDetail(RetrieveUpdateAPIView):
|
class InstanceDetail(RetrieveUpdateAPIView):
|
||||||
|
|
||||||
name = _("Instance Detail")
|
name = _("Instance Detail")
|
||||||
queryset = models.Instance.objects.exclude(node_type='hop')
|
model = models.Instance
|
||||||
serializer_class = serializers.InstanceSerializer
|
serializer_class = serializers.InstanceSerializer
|
||||||
|
|
||||||
def update(self, request, *args, **kwargs):
|
def update(self, request, *args, **kwargs):
|
||||||
@@ -418,10 +415,14 @@ class InstanceInstanceGroupsList(InstanceGroupMembershipMixin, SubListCreateAtta
|
|||||||
class InstanceHealthCheck(GenericAPIView):
|
class InstanceHealthCheck(GenericAPIView):
|
||||||
|
|
||||||
name = _('Instance Health Check')
|
name = _('Instance Health Check')
|
||||||
queryset = models.Instance.objects.exclude(node_type='hop')
|
model = models.Instance
|
||||||
serializer_class = serializers.InstanceHealthCheckSerializer
|
serializer_class = serializers.InstanceHealthCheckSerializer
|
||||||
permission_classes = (IsSystemAdminOrAuditor,)
|
permission_classes = (IsSystemAdminOrAuditor,)
|
||||||
|
|
||||||
|
def get_queryset(self):
|
||||||
|
# FIXME: For now, we don't have a good way of checking the health of a hop node.
|
||||||
|
return super().get_queryset().exclude(node_type='hop')
|
||||||
|
|
||||||
def get(self, request, *args, **kwargs):
|
def get(self, request, *args, **kwargs):
|
||||||
obj = self.get_object()
|
obj = self.get_object()
|
||||||
data = self.get_serializer(data=request.data).to_representation(obj)
|
data = self.get_serializer(data=request.data).to_representation(obj)
|
||||||
|
|||||||
@@ -11,13 +11,16 @@ class Ungrouped(object):
|
|||||||
policy_instance_percentage = None
|
policy_instance_percentage = None
|
||||||
policy_instance_minimum = None
|
policy_instance_minimum = None
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.qs = Instance.objects.filter(rampart_groups__isnull=True)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def instances(self):
|
def instances(self):
|
||||||
return Instance.objects.filter(rampart_groups__isnull=True).exclude(node_type='hop')
|
return self.qs
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def capacity(self):
|
def capacity(self):
|
||||||
return sum(x.capacity for x in self.instances)
|
return sum(x.capacity for x in self.instances.all())
|
||||||
|
|
||||||
|
|
||||||
class Command(BaseCommand):
|
class Command(BaseCommand):
|
||||||
@@ -29,26 +32,29 @@ class Command(BaseCommand):
|
|||||||
|
|
||||||
groups = list(InstanceGroup.objects.all())
|
groups = list(InstanceGroup.objects.all())
|
||||||
ungrouped = Ungrouped()
|
ungrouped = Ungrouped()
|
||||||
if len(ungrouped.instances):
|
if len(ungrouped.instances.all()):
|
||||||
groups.append(ungrouped)
|
groups.append(ungrouped)
|
||||||
|
|
||||||
for instance_group in groups:
|
for ig in groups:
|
||||||
fmt = '[{0.name} capacity={0.capacity}'
|
policy = ''
|
||||||
if instance_group.policy_instance_percentage:
|
if ig.policy_instance_percentage:
|
||||||
fmt += ' policy={0.policy_instance_percentage}%'
|
policy = f' policy={ig.policy_instance_percentage}%'
|
||||||
if instance_group.policy_instance_minimum:
|
if ig.policy_instance_minimum:
|
||||||
fmt += ' policy>={0.policy_instance_minimum}'
|
policy = f' policy>={ig.policy_instance_minimum}'
|
||||||
print((fmt + ']').format(instance_group))
|
print(f'[{ig.name} capacity={ig.capacity}{policy}]')
|
||||||
for x in instance_group.instances.all():
|
|
||||||
|
for x in ig.instances.all():
|
||||||
color = '\033[92m'
|
color = '\033[92m'
|
||||||
if x.capacity == 0:
|
if x.capacity == 0 and x.node_type != 'hop':
|
||||||
color = '\033[91m'
|
color = '\033[91m'
|
||||||
if x.enabled is False:
|
if not x.enabled:
|
||||||
color = '\033[90m[DISABLED] '
|
color = '\033[90m[DISABLED] '
|
||||||
if no_color:
|
if no_color:
|
||||||
color = ''
|
color = ''
|
||||||
fmt = '\t' + color + '{0.hostname} capacity={0.capacity} node_type={0.node_type} version={1}'
|
|
||||||
if x.capacity:
|
capacity = f' capacity={x.capacity}' if x.node_type != 'hop' else ''
|
||||||
fmt += ' heartbeat="{0.modified:%Y-%m-%d %H:%M:%S}"'
|
version = f" version={x.version or '?'}" if x.node_type != 'hop' else ''
|
||||||
print((fmt + '\033[0m').format(x, x.version or '?'))
|
heartbeat = f' heartbeat="{x.modified:%Y-%m-%d %H:%M:%S}"' if x.capacity or x.node_type == 'hop' else ''
|
||||||
print('')
|
print(f'\t{color}{x.hostname}{capacity} node_type={x.node_type}{version}{heartbeat}\033[0m')
|
||||||
|
|
||||||
|
print()
|
||||||
|
|||||||
@@ -53,14 +53,14 @@ class RegisterQueue:
|
|||||||
def add_instances_to_group(self, ig):
|
def add_instances_to_group(self, ig):
|
||||||
changed = False
|
changed = False
|
||||||
|
|
||||||
instance_list_unique = set([x.strip() for x in self.hostname_list if x])
|
instance_list_unique = {x for x in (x.strip() for x in self.hostname_list) if x}
|
||||||
instances = []
|
instances = []
|
||||||
for inst_name in instance_list_unique:
|
for inst_name in instance_list_unique:
|
||||||
instance = Instance.objects.filter(hostname=inst_name)
|
instance = Instance.objects.filter(hostname=inst_name).exclude(node_type='hop')
|
||||||
if instance.exists():
|
if instance.exists():
|
||||||
instances.append(instance[0])
|
instances.append(instance[0])
|
||||||
else:
|
else:
|
||||||
raise InstanceNotFound("Instance does not exist: {}".format(inst_name), changed)
|
raise InstanceNotFound("Instance does not exist or cannot run jobs: {}".format(inst_name), changed)
|
||||||
|
|
||||||
ig.instances.add(*instances)
|
ig.instances.add(*instances)
|
||||||
|
|
||||||
|
|||||||
@@ -195,7 +195,7 @@ class Instance(HasPolicyEditsMixin, BaseModel):
|
|||||||
if ref_time is None:
|
if ref_time is None:
|
||||||
ref_time = now()
|
ref_time = now()
|
||||||
grace_period = settings.CLUSTER_NODE_HEARTBEAT_PERIOD * 2
|
grace_period = settings.CLUSTER_NODE_HEARTBEAT_PERIOD * 2
|
||||||
if self.node_type == 'execution':
|
if self.node_type in ('execution', 'hop'):
|
||||||
grace_period += settings.RECEPTOR_SERVICE_ADVERTISEMENT_PERIOD
|
grace_period += settings.RECEPTOR_SERVICE_ADVERTISEMENT_PERIOD
|
||||||
return self.last_seen < ref_time - timedelta(seconds=grace_period)
|
return self.last_seen < ref_time - timedelta(seconds=grace_period)
|
||||||
|
|
||||||
|
|||||||
@@ -436,14 +436,16 @@ def inspect_execution_nodes(instance_list):
|
|||||||
workers = mesh_status['Advertisements']
|
workers = mesh_status['Advertisements']
|
||||||
for ad in workers:
|
for ad in workers:
|
||||||
hostname = ad['NodeID']
|
hostname = ad['NodeID']
|
||||||
if not any(cmd['WorkType'] == 'ansible-runner' for cmd in ad['WorkCommands'] or []):
|
|
||||||
continue
|
|
||||||
|
|
||||||
changed = False
|
changed = False
|
||||||
|
|
||||||
if hostname in node_lookup:
|
if hostname in node_lookup:
|
||||||
instance = node_lookup[hostname]
|
instance = node_lookup[hostname]
|
||||||
else:
|
else:
|
||||||
logger.warn(f"Unrecognized node on mesh advertising ansible-runner work type: {hostname}")
|
logger.warn(f"Unrecognized node advertising on mesh: {hostname}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Control-plane nodes are dealt with via local_health_check instead.
|
||||||
|
if instance.node_type in ('control', 'hybrid'):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
was_lost = instance.is_lost(ref_time=nowtime)
|
was_lost = instance.is_lost(ref_time=nowtime)
|
||||||
@@ -454,6 +456,10 @@ def inspect_execution_nodes(instance_list):
|
|||||||
instance.last_seen = last_seen
|
instance.last_seen = last_seen
|
||||||
instance.save(update_fields=['last_seen'])
|
instance.save(update_fields=['last_seen'])
|
||||||
|
|
||||||
|
# Only execution nodes should be dealt with by execution_node_health_check
|
||||||
|
if instance.node_type == 'hop':
|
||||||
|
continue
|
||||||
|
|
||||||
if changed:
|
if changed:
|
||||||
execution_node_health_check.apply_async([hostname])
|
execution_node_health_check.apply_async([hostname])
|
||||||
elif was_lost:
|
elif was_lost:
|
||||||
@@ -482,7 +488,6 @@ def cluster_node_heartbeat():
|
|||||||
for inst in instance_list:
|
for inst in instance_list:
|
||||||
if inst.hostname == settings.CLUSTER_HOST_ID:
|
if inst.hostname == settings.CLUSTER_HOST_ID:
|
||||||
this_inst = inst
|
this_inst = inst
|
||||||
instance_list.remove(inst)
|
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
(changed, this_inst) = Instance.objects.get_or_register()
|
(changed, this_inst) = Instance.objects.get_or_register()
|
||||||
@@ -492,6 +497,8 @@ def cluster_node_heartbeat():
|
|||||||
inspect_execution_nodes(instance_list)
|
inspect_execution_nodes(instance_list)
|
||||||
|
|
||||||
for inst in list(instance_list):
|
for inst in list(instance_list):
|
||||||
|
if inst == this_inst:
|
||||||
|
continue
|
||||||
if inst.is_lost(ref_time=nowtime):
|
if inst.is_lost(ref_time=nowtime):
|
||||||
lost_instances.append(inst)
|
lost_instances.append(inst)
|
||||||
instance_list.remove(inst)
|
instance_list.remove(inst)
|
||||||
@@ -506,7 +513,9 @@ def cluster_node_heartbeat():
|
|||||||
raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID))
|
raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID))
|
||||||
# IFF any node has a greater version than we do, then we'll shutdown services
|
# IFF any node has a greater version than we do, then we'll shutdown services
|
||||||
for other_inst in instance_list:
|
for other_inst in instance_list:
|
||||||
if other_inst.version == "" or other_inst.version.startswith('ansible-runner') or other_inst.node_type == 'execution':
|
if other_inst.node_type in ('execution', 'hop'):
|
||||||
|
continue
|
||||||
|
if other_inst.version == "" or other_inst.version.startswith('ansible-runner'):
|
||||||
continue
|
continue
|
||||||
if Version(other_inst.version.split('-', 1)[0]) > Version(awx_application_version.split('-', 1)[0]) and not settings.DEBUG:
|
if Version(other_inst.version.split('-', 1)[0]) > Version(awx_application_version.split('-', 1)[0]) and not settings.DEBUG:
|
||||||
logger.error(
|
logger.error(
|
||||||
@@ -530,7 +539,7 @@ def cluster_node_heartbeat():
|
|||||||
# * It was set to 0 by another tower node running this method
|
# * It was set to 0 by another tower node running this method
|
||||||
# * It was set to 0 by this node, but auto deprovisioning is off
|
# * It was set to 0 by this node, but auto deprovisioning is off
|
||||||
#
|
#
|
||||||
# If auto deprovisining is on, don't bother setting the capacity to 0
|
# If auto deprovisioning is on, don't bother setting the capacity to 0
|
||||||
# since we will delete the node anyway.
|
# since we will delete the node anyway.
|
||||||
if other_inst.capacity != 0 and not settings.AWX_AUTO_DEPROVISION_INSTANCES:
|
if other_inst.capacity != 0 and not settings.AWX_AUTO_DEPROVISION_INSTANCES:
|
||||||
other_inst.mark_offline(errors=_('Another cluster node has determined this instance to be unresponsive'))
|
other_inst.mark_offline(errors=_('Another cluster node has determined this instance to be unresponsive'))
|
||||||
|
|||||||
Reference in New Issue
Block a user