Control the visibility and use of hop node Instances

- the list, detail, and health check API views should not include them
- the Instance-InstanceGroup association views should not allow them
  to be changed
- the ping view excludes them
- list_instances management command excludes them
- Instance.set_capacity_value sets hop nodes to 0 capacity
- TaskManager will exclude them from the nodes available for job execution
- TaskManager.reap_jobs_from_orphaned_instances will consider hop nodes
  to be an orphaned instance
- The apply_cluster_membership_policies task will not manipulate hop nodes
- get_broadcast_hosts will ignore hop nodes
- active_count also will ignore hop nodes
This commit is contained in:
Jeff Bradberry
2021-12-16 15:53:15 -05:00
parent c8f1e714e1
commit f340f491dc
9 changed files with 25 additions and 16 deletions

View File

@@ -364,11 +364,14 @@ class InstanceList(ListAPIView):
serializer_class = serializers.InstanceSerializer serializer_class = serializers.InstanceSerializer
search_fields = ('hostname',) search_fields = ('hostname',)
def get_queryset(self):
return super().get_queryset().exclude(node_type='hop')
class InstanceDetail(RetrieveUpdateAPIView): class InstanceDetail(RetrieveUpdateAPIView):
name = _("Instance Detail") name = _("Instance Detail")
model = models.Instance queryset = models.Instance.objects.exclude(node_type='hop')
serializer_class = serializers.InstanceSerializer serializer_class = serializers.InstanceSerializer
def update(self, request, *args, **kwargs): def update(self, request, *args, **kwargs):
@@ -406,13 +409,15 @@ class InstanceInstanceGroupsList(InstanceGroupMembershipMixin, SubListCreateAtta
def is_valid_relation(self, parent, sub, created=False): def is_valid_relation(self, parent, sub, created=False):
if parent.node_type == 'control': if parent.node_type == 'control':
return {'msg': _(f"Cannot change instance group membership of control-only node: {parent.hostname}.")} return {'msg': _(f"Cannot change instance group membership of control-only node: {parent.hostname}.")}
if parent.node_type == 'hop':
return {'msg': _(f"Cannot change instance group membership of hop node: {parent.hostname}.")}
return None return None
class InstanceHealthCheck(GenericAPIView): class InstanceHealthCheck(GenericAPIView):
name = _('Instance Health Check') name = _('Instance Health Check')
model = models.Instance queryset = models.Instance.objects.exclude(node_type='hop')
serializer_class = serializers.InstanceHealthCheckSerializer serializer_class = serializers.InstanceHealthCheckSerializer
permission_classes = (IsSystemAdminOrAuditor,) permission_classes = (IsSystemAdminOrAuditor,)
@@ -503,6 +508,8 @@ class InstanceGroupInstanceList(InstanceGroupMembershipMixin, SubListAttachDetac
def is_valid_relation(self, parent, sub, created=False): def is_valid_relation(self, parent, sub, created=False):
if sub.node_type == 'control': if sub.node_type == 'control':
return {'msg': _(f"Cannot change instance group membership of control-only node: {sub.hostname}.")} return {'msg': _(f"Cannot change instance group membership of control-only node: {sub.hostname}.")}
if sub.node_type == 'hop':
return {'msg': _(f"Cannot change instance group membership of hop node: {sub.hostname}.")}
return None return None

View File

@@ -149,7 +149,7 @@ class ApiV2PingView(APIView):
response = {'ha': is_ha_environment(), 'version': get_awx_version(), 'active_node': settings.CLUSTER_HOST_ID, 'install_uuid': settings.INSTALL_UUID} response = {'ha': is_ha_environment(), 'version': get_awx_version(), 'active_node': settings.CLUSTER_HOST_ID, 'install_uuid': settings.INSTALL_UUID}
response['instances'] = [] response['instances'] = []
for instance in Instance.objects.all(): for instance in Instance.objects.exclude(node_type='hop'):
response['instances'].append( response['instances'].append(
dict( dict(
node=instance.hostname, node=instance.hostname,

View File

@@ -13,7 +13,7 @@ class Ungrouped(object):
@property @property
def instances(self): def instances(self):
return Instance.objects.filter(rampart_groups__isnull=True) return Instance.objects.filter(rampart_groups__isnull=True).exclude(node_type='hop')
@property @property
def capacity(self): def capacity(self):

View File

@@ -188,7 +188,7 @@ class InstanceManager(models.Manager):
def active_count(self): def active_count(self):
"""Return count of active Tower nodes for licensing.""" """Return count of active Tower nodes for licensing."""
return self.all().count() return self.exclude(node_type='hop').count()
class InstanceGroupManager(models.Manager): class InstanceGroupManager(models.Manager):

View File

@@ -29,7 +29,7 @@ from awx.main.models.mixins import RelatedJobsMixin
# ansible-runner # ansible-runner
from ansible_runner.utils.capacity import get_cpu_count, get_mem_in_bytes from ansible_runner.utils.capacity import get_cpu_count, get_mem_in_bytes
__all__ = ('Instance', 'InstanceGroup', 'TowerScheduleState') __all__ = ('Instance', 'InstanceGroup', 'InstanceLink', 'TowerScheduleState')
logger = logging.getLogger('awx.main.models.ha') logger = logging.getLogger('awx.main.models.ha')
@@ -215,7 +215,7 @@ class Instance(HasPolicyEditsMixin, BaseModel):
def set_capacity_value(self): def set_capacity_value(self):
"""Sets capacity according to capacity adjustment rule (no save)""" """Sets capacity according to capacity adjustment rule (no save)"""
if self.enabled: if self.enabled and self.node_type != 'hop':
lower_cap = min(self.mem_capacity, self.cpu_capacity) lower_cap = min(self.mem_capacity, self.cpu_capacity)
higher_cap = max(self.mem_capacity, self.cpu_capacity) higher_cap = max(self.mem_capacity, self.cpu_capacity)
self.capacity = lower_cap + (higher_cap - lower_cap) * self.capacity_adjustment self.capacity = lower_cap + (higher_cap - lower_cap) * self.capacity_adjustment
@@ -320,7 +320,7 @@ class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin):
@property @property
def capacity(self): def capacity(self):
return sum([inst.capacity for inst in self.instances.all()]) return sum(inst.capacity for inst in self.instances.all())
@property @property
def jobs_running(self): def jobs_running(self):

View File

@@ -13,7 +13,6 @@ from django.db import transaction, connection
from django.utils.translation import ugettext_lazy as _, gettext_noop from django.utils.translation import ugettext_lazy as _, gettext_noop
from django.utils.timezone import now as tz_now from django.utils.timezone import now as tz_now
from django.conf import settings from django.conf import settings
from django.db.models import Q
# AWX # AWX
from awx.main.dispatch.reaper import reap_job from awx.main.dispatch.reaper import reap_job
@@ -69,7 +68,7 @@ class TaskManager:
""" """
Init AFTER we know this instance of the task manager will run because the lock is acquired. Init AFTER we know this instance of the task manager will run because the lock is acquired.
""" """
instances = Instance.objects.filter(~Q(hostname=None), enabled=True) instances = Instance.objects.filter(hostname__isnull=False, enabled=True).exclude(node_type='hop')
self.real_instances = {i.hostname: i for i in instances} self.real_instances = {i.hostname: i for i in instances}
instances_partial = [ instances_partial = [
@@ -484,7 +483,7 @@ class TaskManager:
return created_dependencies return created_dependencies
def process_pending_tasks(self, pending_tasks): def process_pending_tasks(self, pending_tasks):
running_workflow_templates = set([wf.unified_job_template_id for wf in self.get_running_workflow_jobs()]) running_workflow_templates = {wf.unified_job_template_id for wf in self.get_running_workflow_jobs()}
tasks_to_update_job_explanation = [] tasks_to_update_job_explanation = []
for task in pending_tasks: for task in pending_tasks:
if self.start_task_limit <= 0: if self.start_task_limit <= 0:
@@ -593,7 +592,7 @@ class TaskManager:
# elsewhere # elsewhere
for j in UnifiedJob.objects.filter( for j in UnifiedJob.objects.filter(
status__in=['pending', 'waiting', 'running'], status__in=['pending', 'waiting', 'running'],
).exclude(execution_node__in=Instance.objects.values_list('hostname', flat=True)): ).exclude(execution_node__in=Instance.objects.exclude(node_type='hop').values_list('hostname', flat=True)):
if j.execution_node and not j.is_container_group_task: if j.execution_node and not j.is_container_group_task:
logger.error(f'{j.execution_node} is not a registered instance; reaping {j.log_format}') logger.error(f'{j.execution_node} is not a registered instance; reaping {j.log_format}')
reap_job(j, 'failed') reap_job(j, 'failed')

View File

@@ -202,7 +202,8 @@ def apply_cluster_membership_policies():
to_log = logger.debug to_log = logger.debug
to_log('Waited {} seconds to obtain lock name: cluster_policy_lock'.format(lock_time)) to_log('Waited {} seconds to obtain lock name: cluster_policy_lock'.format(lock_time))
started_compute = time.time() started_compute = time.time()
all_instances = list(Instance.objects.order_by('id')) # Hop nodes should never get assigned to an InstanceGroup.
all_instances = list(Instance.objects.exclude(node_type='hop').order_by('id'))
all_groups = list(InstanceGroup.objects.prefetch_related('instances')) all_groups = list(InstanceGroup.objects.prefetch_related('instances'))
total_instances = len(all_instances) total_instances = len(all_instances)
@@ -253,7 +254,7 @@ def apply_cluster_membership_policies():
# Finally, process instance policy percentages # Finally, process instance policy percentages
for g in sorted(actual_groups, key=lambda x: len(x.instances)): for g in sorted(actual_groups, key=lambda x: len(x.instances)):
exclude_type = 'execution' if g.obj.name == settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME else 'control' exclude_type = 'execution' if g.obj.name == settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME else 'control'
candidate_pool_ct = len([i for i in actual_instances if i.obj.node_type != exclude_type]) candidate_pool_ct = sum(1 for i in actual_instances if i.obj.node_type != exclude_type)
if not candidate_pool_ct: if not candidate_pool_ct:
continue continue
policy_per_added = [] policy_per_added = []

View File

@@ -84,8 +84,9 @@ def test_instance_dup(org_admin, organization, project, instance_factory, instan
list_response2 = get(reverse('api:instance_list'), user=org_admin) list_response2 = get(reverse('api:instance_list'), user=org_admin)
api_num_instances_oa = list(list_response2.data.items())[0][1] api_num_instances_oa = list(list_response2.data.items())[0][1]
assert actual_num_instances == api_num_instances_auditor assert api_num_instances_auditor == actual_num_instances
# Note: The org_admin will not see the default 'tower' node (instance fixture) because it is not in its group, as expected # Note: The org_admin will not see the default 'tower' node
# (instance fixture) because it is not in its group, as expected
assert api_num_instances_oa == (actual_num_instances - 1) assert api_num_instances_oa == (actual_num_instances - 1)

View File

@@ -35,6 +35,7 @@ def get_broadcast_hosts():
instances = ( instances = (
Instance.objects.exclude(hostname=Instance.objects.me().hostname) Instance.objects.exclude(hostname=Instance.objects.me().hostname)
.exclude(node_type='execution') .exclude(node_type='execution')
.exclude(node_type='hop')
.order_by('hostname') .order_by('hostname')
.values('hostname', 'ip_address') .values('hostname', 'ip_address')
.distinct() .distinct()