Adopt the node_type field in receptor logic (#10802)

* Adopt the node_type field in receptor logic

* Refactor Instance.objects.register so we do not reset capacity to 0
This commit is contained in:
Alan Rominger 2021-08-04 10:59:44 -04:00
parent 5d4ab13386
commit f47eb126e2
No known key found for this signature in database
GPG Key ID: C2D7EAAA12B63559
5 changed files with 26 additions and 23 deletions

View File

@ -77,3 +77,7 @@ LOGGER_BLOCKLIST = (
# loggers that may be called getting logging settings
'awx.conf',
)
# Reported version for node seen in receptor mesh but for which capacity check
# failed or is in progress
RECEPTOR_PENDING = 'ansible-runner-???'

View File

@ -10,6 +10,7 @@ from django.conf import settings
from awx.main.utils.filters import SmartFilter
from awx.main.utils.pglock import advisory_lock
from awx.main.constants import RECEPTOR_PENDING
___all__ = ['HostManager', 'InstanceManager', 'InstanceGroupManager', 'DeferJobCreatedManager']
@ -111,13 +112,11 @@ class InstanceManager(models.Manager):
return node[0]
raise RuntimeError("No instance found with the current cluster host id")
def register(self, uuid=None, hostname=None, ip_address=None, node_type=None):
def register(self, uuid=None, hostname=None, ip_address=None, node_type='hybrid', defaults=None):
if not uuid:
uuid = settings.SYSTEM_UUID
if not hostname:
hostname = settings.CLUSTER_HOST_ID
if not node_type:
node_type = "hybrid"
with advisory_lock('instance_registration_%s' % hostname):
if settings.AWX_AUTO_DEPROVISION_INSTANCES:
# detect any instances with the same IP address.
@ -130,6 +129,7 @@ class InstanceManager(models.Manager):
other_inst.save(update_fields=['ip_address'])
logger.warning("IP address {0} conflict detected, ip address unset for host {1}.".format(ip_address, other_hostname))
# Return existing instance that matches hostname
instance = self.filter(hostname=hostname)
if instance.exists():
instance = instance.get()
@ -145,7 +145,14 @@ class InstanceManager(models.Manager):
return (True, instance)
else:
return (False, instance)
instance = self.create(uuid=uuid, hostname=hostname, ip_address=ip_address, capacity=0, node_type=node_type)
# Create new instance, and fill in default values
create_defaults = dict(capacity=0, uuid=uuid)
if defaults is not None:
create_defaults.update(defaults)
if node_type == 'execution' and 'version' not in create_defaults:
create_defaults['version'] = RECEPTOR_PENDING
instance = self.create(hostname=hostname, ip_address=ip_address, node_type=node_type, **create_defaults)
return (True, instance)
def get_or_register(self):

View File

@ -107,11 +107,6 @@ class Instance(HasPolicyEditsMixin, BaseModel):
def remaining_capacity(self):
return self.capacity - self.consumed_capacity
@property
def role(self):
# NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing
return "awx"
@property
def jobs_running(self):
return UnifiedJob.objects.filter(
@ -128,8 +123,7 @@ class Instance(HasPolicyEditsMixin, BaseModel):
@staticmethod
def choose_online_control_plane_node():
# TODO: update query to use node_type field
return random.choice(Instance.objects.filter(enabled=True).exclude(version__startswith='ansible-runner-').values_list('hostname', flat=True))
return random.choice(Instance.objects.filter(enabled=True).filter(node_type__in=['control', 'hybrid']).values_list('hostname', flat=True))
def is_lost(self, ref_time=None):
if ref_time is None:
@ -206,8 +200,7 @@ class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin):
@property
def execution_capacity(self):
# TODO: update query to exclude based on node_type field
return sum([inst.capacity for inst in self.instances.all()])
return sum([inst.capacity for inst in self.instances.filter(node_type__in=['hybrid', 'execution'])])
@property
def jobs_running(self):
@ -231,7 +224,8 @@ class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin):
def fit_task_to_most_remaining_capacity_instance(task, instances):
instance_most_capacity = None
for i in instances:
# TODO: continue if node is control-only node type
if i.node_type == 'control':
continue
if i.remaining_capacity >= task.task_impact and (
instance_most_capacity is None or i.remaining_capacity > instance_most_capacity.remaining_capacity
):

View File

@ -76,6 +76,7 @@ class TaskManager:
SimpleNamespace(
obj=instance,
version=instance.version,
node_type=instance.node_type,
remaining_capacity=instance.remaining_capacity,
capacity=instance.capacity,
jobs_running=instance.jobs_running,

View File

@ -85,7 +85,7 @@ from awx.main.models import (
SystemJobEvent,
build_safe_env,
)
from awx.main.constants import ACTIVE_STATES
from awx.main.constants import ACTIVE_STATES, RECEPTOR_PENDING
from awx.main.exceptions import AwxTaskError, PostRunError
from awx.main.queue import CallbackQueueDispatcher
from awx.main.dispatch.publish import task
@ -121,7 +121,6 @@ from awx.main.analytics.subsystem_metrics import Metrics
from rest_framework.exceptions import PermissionDenied
RECEPTOR_SOCK = '/var/run/receptor/receptor.sock'
RECEPTOR_PENDING = 'ansible-runner-???'
__all__ = [
@ -422,17 +421,14 @@ def discover_receptor_nodes():
commands = ad['WorkCommands'] or []
if 'ansible-runner' not in commands:
continue
(changed, instance) = Instance.objects.register(hostname=hostname)
(changed, instance) = Instance.objects.register(hostname=hostname, node_type='execution')
was_lost = instance.is_lost(ref_time=nowtime)
if changed:
logger.info("Registered tower execution node '{}'".format(hostname))
instance.capacity = 0
instance.version = RECEPTOR_PENDING
instance.save(update_fields=['capacity', 'version', 'modified'])
logger.info("Registered execution node '{}'".format(hostname))
check_heartbeat.apply_async([hostname])
else:
last_seen = parse_date(ad['Time'])
logger.debug("Updated tower control node '{}' last seen {}".format(hostname, last_seen))
logger.debug("Updated execution node '{}' modified from {} to {}".format(hostname, instance.modified, last_seen))
instance.modified = last_seen
if instance.is_lost(ref_time=nowtime):
# if the instance hasn't advertised in awhile,
@ -466,7 +462,8 @@ def cluster_node_heartbeat():
if inst.hostname == settings.CLUSTER_HOST_ID:
this_inst = inst
instance_list.remove(inst)
elif inst.version.startswith('ansible-runner'): # TODO: use proper field when introduced
elif inst.node_type == 'execution':
# Only considering control plane for this logic
continue
elif inst.is_lost(ref_time=nowtime):
lost_instances.append(inst)