From f47eb126e2d08976b5ce8c2ddf33664480bc88dc Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Wed, 4 Aug 2021 10:59:44 -0400 Subject: [PATCH] Adopt the node_type field in receptor logic (#10802) * Adopt the node_type field in receptor logic * Refactor Instance.objects.register so we do not reset capacity to 0 --- awx/main/constants.py | 4 ++++ awx/main/managers.py | 15 +++++++++++---- awx/main/models/ha.py | 14 ++++---------- awx/main/scheduler/task_manager.py | 1 + awx/main/tasks.py | 15 ++++++--------- 5 files changed, 26 insertions(+), 23 deletions(-) diff --git a/awx/main/constants.py b/awx/main/constants.py index f2af99d167..b7a5813f7f 100644 --- a/awx/main/constants.py +++ b/awx/main/constants.py @@ -77,3 +77,7 @@ LOGGER_BLOCKLIST = ( # loggers that may be called getting logging settings 'awx.conf', ) + +# Reported version for node seen in receptor mesh but for which capacity check +# failed or is in progress +RECEPTOR_PENDING = 'ansible-runner-???' diff --git a/awx/main/managers.py b/awx/main/managers.py index b26d476fd1..01fb223df5 100644 --- a/awx/main/managers.py +++ b/awx/main/managers.py @@ -10,6 +10,7 @@ from django.conf import settings from awx.main.utils.filters import SmartFilter from awx.main.utils.pglock import advisory_lock +from awx.main.constants import RECEPTOR_PENDING ___all__ = ['HostManager', 'InstanceManager', 'InstanceGroupManager', 'DeferJobCreatedManager'] @@ -111,13 +112,11 @@ class InstanceManager(models.Manager): return node[0] raise RuntimeError("No instance found with the current cluster host id") - def register(self, uuid=None, hostname=None, ip_address=None, node_type=None): + def register(self, uuid=None, hostname=None, ip_address=None, node_type='hybrid', defaults=None): if not uuid: uuid = settings.SYSTEM_UUID if not hostname: hostname = settings.CLUSTER_HOST_ID - if not node_type: - node_type = "hybrid" with advisory_lock('instance_registration_%s' % hostname): if settings.AWX_AUTO_DEPROVISION_INSTANCES: # detect any instances with the same IP address. @@ -130,6 +129,7 @@ class InstanceManager(models.Manager): other_inst.save(update_fields=['ip_address']) logger.warning("IP address {0} conflict detected, ip address unset for host {1}.".format(ip_address, other_hostname)) + # Return existing instance that matches hostname instance = self.filter(hostname=hostname) if instance.exists(): instance = instance.get() @@ -145,7 +145,14 @@ class InstanceManager(models.Manager): return (True, instance) else: return (False, instance) - instance = self.create(uuid=uuid, hostname=hostname, ip_address=ip_address, capacity=0, node_type=node_type) + + # Create new instance, and fill in default values + create_defaults = dict(capacity=0, uuid=uuid) + if defaults is not None: + create_defaults.update(defaults) + if node_type == 'execution' and 'version' not in create_defaults: + create_defaults['version'] = RECEPTOR_PENDING + instance = self.create(hostname=hostname, ip_address=ip_address, node_type=node_type, **create_defaults) return (True, instance) def get_or_register(self): diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index 36217ebf45..ebef8dc7be 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -107,11 +107,6 @@ class Instance(HasPolicyEditsMixin, BaseModel): def remaining_capacity(self): return self.capacity - self.consumed_capacity - @property - def role(self): - # NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing - return "awx" - @property def jobs_running(self): return UnifiedJob.objects.filter( @@ -128,8 +123,7 @@ class Instance(HasPolicyEditsMixin, BaseModel): @staticmethod def choose_online_control_plane_node(): - # TODO: update query to use node_type field - return random.choice(Instance.objects.filter(enabled=True).exclude(version__startswith='ansible-runner-').values_list('hostname', flat=True)) + return random.choice(Instance.objects.filter(enabled=True).filter(node_type__in=['control', 'hybrid']).values_list('hostname', flat=True)) def is_lost(self, ref_time=None): if ref_time is None: @@ -206,8 +200,7 @@ class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin): @property def execution_capacity(self): - # TODO: update query to exclude based on node_type field - return sum([inst.capacity for inst in self.instances.all()]) + return sum([inst.capacity for inst in self.instances.filter(node_type__in=['hybrid', 'execution'])]) @property def jobs_running(self): @@ -231,7 +224,8 @@ class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin): def fit_task_to_most_remaining_capacity_instance(task, instances): instance_most_capacity = None for i in instances: - # TODO: continue if node is control-only node type + if i.node_type == 'control': + continue if i.remaining_capacity >= task.task_impact and ( instance_most_capacity is None or i.remaining_capacity > instance_most_capacity.remaining_capacity ): diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 56a18a1412..4eaf2743cc 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -76,6 +76,7 @@ class TaskManager: SimpleNamespace( obj=instance, version=instance.version, + node_type=instance.node_type, remaining_capacity=instance.remaining_capacity, capacity=instance.capacity, jobs_running=instance.jobs_running, diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 1e9e57ba45..830bc2ad0c 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -85,7 +85,7 @@ from awx.main.models import ( SystemJobEvent, build_safe_env, ) -from awx.main.constants import ACTIVE_STATES +from awx.main.constants import ACTIVE_STATES, RECEPTOR_PENDING from awx.main.exceptions import AwxTaskError, PostRunError from awx.main.queue import CallbackQueueDispatcher from awx.main.dispatch.publish import task @@ -121,7 +121,6 @@ from awx.main.analytics.subsystem_metrics import Metrics from rest_framework.exceptions import PermissionDenied RECEPTOR_SOCK = '/var/run/receptor/receptor.sock' -RECEPTOR_PENDING = 'ansible-runner-???' __all__ = [ @@ -422,17 +421,14 @@ def discover_receptor_nodes(): commands = ad['WorkCommands'] or [] if 'ansible-runner' not in commands: continue - (changed, instance) = Instance.objects.register(hostname=hostname) + (changed, instance) = Instance.objects.register(hostname=hostname, node_type='execution') was_lost = instance.is_lost(ref_time=nowtime) if changed: - logger.info("Registered tower execution node '{}'".format(hostname)) - instance.capacity = 0 - instance.version = RECEPTOR_PENDING - instance.save(update_fields=['capacity', 'version', 'modified']) + logger.info("Registered execution node '{}'".format(hostname)) check_heartbeat.apply_async([hostname]) else: last_seen = parse_date(ad['Time']) - logger.debug("Updated tower control node '{}' last seen {}".format(hostname, last_seen)) + logger.debug("Updated execution node '{}' modified from {} to {}".format(hostname, instance.modified, last_seen)) instance.modified = last_seen if instance.is_lost(ref_time=nowtime): # if the instance hasn't advertised in awhile, @@ -466,7 +462,8 @@ def cluster_node_heartbeat(): if inst.hostname == settings.CLUSTER_HOST_ID: this_inst = inst instance_list.remove(inst) - elif inst.version.startswith('ansible-runner'): # TODO: use proper field when introduced + elif inst.node_type == 'execution': + # Only considering control plane for this logic continue elif inst.is_lost(ref_time=nowtime): lost_instances.append(inst)