Model changes for instance last_seen field to replace modified (#10870)

* Model changes for instance last_seen field to replace modified

* Break up refresh_capacity into smaller units

* Rename execution node methods, fix last_seen clustering

* Use update_fields to make it clear save only affects capacity

* Restructing to pass unit tests

* Fix bug where a PATCH did not update capacity value
This commit is contained in:
Alan Rominger 2021-08-17 11:52:57 -04:00
parent 1a9fcdccc2
commit 928c35ede5
No known key found for this signature in database
GPG Key ID: C2D7EAAA12B63559
11 changed files with 246 additions and 127 deletions

View File

@ -370,6 +370,15 @@ class InstanceDetail(RetrieveUpdateAPIView):
model = models.Instance
serializer_class = serializers.InstanceSerializer
def update(self, request, *args, **kwargs):
r = super(InstanceDetail, self).update(request, *args, **kwargs)
if status.is_success(r.status_code):
obj = self.get_object()
obj.set_capacity_value()
obj.save(update_fields=['capacity'])
r.data = serializers.InstanceSerializer(obj, context=self.get_serializer_context()).to_representation(obj)
return r
class InstanceUnifiedJobsList(SubListAPIView):

View File

@ -105,7 +105,7 @@ class InstanceManager(models.Manager):
"""Return the currently active instance."""
# If we are running unit tests, return a stub record.
if settings.IS_TESTING(sys.argv) or hasattr(sys, '_called_from_test'):
return self.model(id=1, hostname='localhost', uuid='00000000-0000-0000-0000-000000000000')
return self.model(id=1, hostname=settings.CLUSTER_HOST_ID, uuid='00000000-0000-0000-0000-000000000000')
node = self.filter(hostname=settings.CLUSTER_HOST_ID)
if node.exists():

View File

@ -0,0 +1,27 @@
# Generated by Django 2.2.20 on 2021-08-12 13:55
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('main', '0152_instance_node_type'),
]
operations = [
migrations.AddField(
model_name='instance',
name='last_seen',
field=models.DateTimeField(
editable=False,
help_text='Last time instance ran its heartbeat task for main cluster nodes. Last known connection to receptor mesh for execution nodes.',
null=True,
),
),
migrations.AlterField(
model_name='instance',
name='memory',
field=models.BigIntegerField(default=0, editable=False, help_text='Total system memory of this instance in bytes.'),
),
]

View File

@ -21,7 +21,7 @@ from awx.main.managers import InstanceManager, InstanceGroupManager
from awx.main.fields import JSONField
from awx.main.models.base import BaseModel, HasEditsMixin, prevent_search
from awx.main.models.unified_jobs import UnifiedJob
from awx.main.utils import get_cpu_capacity, get_mem_capacity, get_system_task_capacity
from awx.main.utils.common import measure_cpu, get_corrected_cpu, get_cpu_effective_capacity, measure_memory, get_corrected_memory, get_mem_effective_capacity
from awx.main.models.mixins import RelatedJobsMixin
__all__ = ('Instance', 'InstanceGroup', 'TowerScheduleState')
@ -52,6 +52,7 @@ class Instance(HasPolicyEditsMixin, BaseModel):
objects = InstanceManager()
# Fields set in instance registration
uuid = models.CharField(max_length=40)
hostname = models.CharField(max_length=250, unique=True)
ip_address = models.CharField(
@ -61,16 +62,11 @@ class Instance(HasPolicyEditsMixin, BaseModel):
max_length=50,
unique=True,
)
# Auto-fields, implementation is different from BaseModel
created = models.DateTimeField(auto_now_add=True)
modified = models.DateTimeField(auto_now=True)
# Fields defined in health check or heartbeat
version = models.CharField(max_length=120, blank=True)
capacity = models.PositiveIntegerField(
default=100,
editable=False,
)
capacity_adjustment = models.DecimalField(default=Decimal(1.0), max_digits=3, decimal_places=2, validators=[MinValueValidator(0)])
enabled = models.BooleanField(default=True)
managed_by_policy = models.BooleanField(default=True)
cpu = models.IntegerField(
default=0,
editable=False,
@ -78,7 +74,22 @@ class Instance(HasPolicyEditsMixin, BaseModel):
memory = models.BigIntegerField(
default=0,
editable=False,
help_text=_('Total system memory of this instance in bytes.'),
)
last_seen = models.DateTimeField(
null=True,
editable=False,
help_text=_('Last time instance ran its heartbeat task for main cluster nodes. Last known connection to receptor mesh for execution nodes.'),
)
# Capacity management
capacity = models.PositiveIntegerField(
default=100,
editable=False,
)
capacity_adjustment = models.DecimalField(default=Decimal(1.0), max_digits=3, decimal_places=2, validators=[MinValueValidator(0)])
enabled = models.BooleanField(default=True)
managed_by_policy = models.BooleanField(default=True)
cpu_capacity = models.IntegerField(
default=0,
editable=False,
@ -126,39 +137,83 @@ class Instance(HasPolicyEditsMixin, BaseModel):
return random.choice(Instance.objects.filter(enabled=True).filter(node_type__in=['control', 'hybrid']).values_list('hostname', flat=True))
def is_lost(self, ref_time=None):
if self.last_seen is None:
return True
if ref_time is None:
ref_time = now()
grace_period = 120
return self.modified < ref_time - timedelta(seconds=grace_period)
grace_period = settings.CLUSTER_NODE_HEARTBEAT_PERIOD * 2
if self.node_type == 'execution':
grace_period += settings.RECEPTOR_SERVICE_ADVERTISEMENT_PERIOD
return self.last_seen < ref_time - timedelta(seconds=grace_period)
def mark_offline(self, on_good_terms=False):
self.cpu = self.cpu_capacity = self.memory = self.mem_capacity = self.capacity = 0
update_fields = ['capacity', 'cpu', 'memory', 'cpu_capacity', 'mem_capacity']
if on_good_terms:
update_fields.append('modified')
self.save()
def mark_offline(self, update_last_seen=False, perform_save=True):
if self.cpu_capacity == 0 and self.mem_capacity == 0 and self.capacity == 0 and (not update_last_seen):
return
self.cpu_capacity = self.mem_capacity = self.capacity = 0
if update_last_seen:
self.last_seen = now()
def refresh_capacity(self):
cpu = get_cpu_capacity()
mem = get_mem_capacity()
if perform_save:
update_fields = ['capacity', 'cpu_capacity', 'mem_capacity']
if update_last_seen:
update_fields += ['last_seen']
self.save(update_fields=update_fields)
def set_capacity_value(self):
"""Sets capacity according to capacity adjustment rule (no save)"""
if self.enabled:
self.capacity = get_system_task_capacity(self.capacity_adjustment)
lower_cap = min(self.mem_capacity, self.cpu_capacity)
higher_cap = max(self.mem_capacity, self.cpu_capacity)
self.capacity = lower_cap + (higher_cap - lower_cap) * self.capacity_adjustment
else:
self.capacity = 0
def refresh_capacity_fields(self):
"""Update derived capacity fields from cpu and memory (no save)"""
self.cpu_capacity = get_cpu_effective_capacity(self.cpu)
self.mem_capacity = get_mem_effective_capacity(self.memory)
self.set_capacity_value()
def save_health_data(self, version, cpu, memory, last_seen=None, has_error=False):
update_fields = []
if last_seen is not None and self.last_seen != last_seen:
self.last_seen = last_seen
update_fields.append('last_seen')
if self.version != version:
self.version = version
update_fields.append('version')
new_cpu = get_corrected_cpu(cpu)
if new_cpu != self.cpu:
self.cpu = new_cpu
update_fields.append('cpu')
new_memory = get_corrected_memory(memory)
if new_memory != self.memory:
self.memory = new_memory
update_fields.append('memory')
if not has_error:
self.refresh_capacity_fields()
else:
self.mark_offline(perform_save=False)
update_fields.extend(['cpu_capacity', 'mem_capacity', 'capacity'])
self.save(update_fields=update_fields)
def local_health_check(self):
"""Only call this method on the instance that this record represents"""
has_error = False
try:
# if redis is down for some reason, that means we can't persist
# playbook event data; we should consider this a zero capacity event
redis.Redis.from_url(settings.BROKER_URL).ping()
except redis.ConnectionError:
self.capacity = 0
has_error = True
self.cpu = cpu[0]
self.memory = mem[0]
self.cpu_capacity = cpu[1]
self.mem_capacity = mem[1]
self.version = awx_application_version
self.save(update_fields=['capacity', 'version', 'modified', 'cpu', 'memory', 'cpu_capacity', 'mem_capacity'])
self.save_health_data(awx_application_version, measure_cpu(), measure_memory(), last_seen=now(), has_error=has_error)
class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin):

View File

@ -98,9 +98,6 @@ from awx.main.utils.common import (
parse_yaml_or_json,
cleanup_new_process,
create_partition,
get_cpu_effective_capacity,
get_mem_effective_capacity,
get_system_task_capacity,
)
from awx.main.utils.execution_environments import get_default_pod_spec, CONTAINER_ROOT, to_container_path
from awx.main.utils.ansible import read_ansible_config
@ -180,7 +177,7 @@ def dispatch_startup():
def inform_cluster_of_shutdown():
try:
this_inst = Instance.objects.get(hostname=settings.CLUSTER_HOST_ID)
this_inst.mark_offline(on_good_terms=True) # No thank you to new jobs while shut down
this_inst.mark_offline(update_last_seen=True) # No thank you to new jobs while shut down
try:
reaper.reap(this_inst)
except Exception:
@ -403,7 +400,7 @@ def cleanup_execution_environment_images():
@task(queue=get_local_queuename)
def check_heartbeat(node):
def execution_node_health_check(node):
try:
instance = Instance.objects.get(hostname=node)
except Instance.DoesNotExist:
@ -411,64 +408,66 @@ def check_heartbeat(node):
return
data = worker_info(node)
prior_capacity = instance.capacity
instance.save_health_data(
'ansible-runner-' + data.get('Version', '???'),
data.get('CPU Capacity', 0), # TODO: rename field on runner side to not say "Capacity"
data.get('Memory Capacity', 0) * 1000, # TODO: double-check the multiplier here
has_error=bool(data.get('Errors')),
)
if data['Errors']:
formatted_error = "\n".join(data["Errors"])
if instance.capacity:
if prior_capacity:
logger.warn(f'Health check marking execution node {node} as lost, errors:\n{formatted_error}')
else:
logger.info(f'Failed to find capacity of new or lost execution node {node}, errors:\n{formatted_error}')
instance.mark_offline()
else:
# TODO: spin off new instance method from refresh_capacity that calculates derived fields
instance.cpu = data['CPU Capacity'] # TODO: rename field on runner side to not say "Capacity"
instance.cpu_capacity = get_cpu_effective_capacity(instance.cpu)
instance.memory = data['Memory Capacity'] * 1000 # TODO: double-check the multiplier here
instance.mem_capacity = get_mem_effective_capacity(instance.memory)
instance.capacity = get_system_task_capacity(
instance.capacity_adjustment,
instance.cpu_capacity,
instance.mem_capacity,
)
instance.version = 'ansible-runner-' + data['Version']
instance.save(update_fields=['capacity', 'version', 'cpu', 'memory', 'cpu_capacity', 'mem_capacity'])
logger.info('Set capacity of execution node {} to {}, worker info data:\n{}'.format(node, instance.capacity, json.dumps(data, indent=2)))
def discover_receptor_nodes():
def inspect_execution_nodes(instance_list):
node_lookup = {}
for inst in instance_list:
if inst.node_type == 'execution':
node_lookup[inst.hostname] = inst
ctl = get_receptor_ctl()
connections = ctl.simple_command('status')['Advertisements']
nowtime = now()
for ad in connections:
hostname = ad['NodeID']
commands = ad['WorkCommands'] or []
commands = ad.get('WorkCommands') or []
if 'ansible-runner' not in commands:
continue
(changed, instance) = Instance.objects.register(hostname=hostname, node_type='execution')
changed = False
if hostname in node_lookup:
instance = node_lookup[hostname]
else:
(changed, instance) = Instance.objects.register(hostname=hostname, node_type='execution')
was_lost = instance.is_lost(ref_time=nowtime)
last_seen = parse_date(ad['Time'])
if instance.modified == last_seen:
continue
instance.modified = last_seen
if instance.is_lost(ref_time=nowtime):
# if the instance hasn't advertised in awhile, don't save a new modified time
# this is so multiple cluster nodes do all make repetitive updates
continue
instance.save(update_fields=['modified'])
if instance.last_seen and instance.last_seen >= last_seen:
continue
instance.last_seen = last_seen
instance.save(update_fields=['last_seen'])
if changed:
logger.warn("Registered execution node '{}'".format(hostname))
check_heartbeat.apply_async([hostname])
execution_node_health_check.apply_async([hostname])
elif was_lost:
# if the instance *was* lost, but has appeared again,
# attempt to re-establish the initial capacity and version
# check
logger.warn(f'Execution node attempting to rejoin as instance {hostname}.')
check_heartbeat.apply_async([hostname])
execution_node_health_check.apply_async([hostname])
elif instance.capacity == 0:
# Periodically re-run the health check of errored nodes, in case someone fixed it
# TODO: perhaps decrease the frequency of these checks
logger.debug(f'Restarting health check for execution node {hostname} with known errors.')
check_heartbeat.apply_async([hostname])
execution_node_health_check.apply_async([hostname])
@task(queue=get_local_queuename)
@ -479,34 +478,34 @@ def cluster_node_heartbeat():
this_inst = None
lost_instances = []
(changed, instance) = Instance.objects.get_or_register()
if changed:
logger.info("Registered tower control node '{}'".format(instance.hostname))
discover_receptor_nodes()
for inst in list(instance_list):
for inst in instance_list:
if inst.hostname == settings.CLUSTER_HOST_ID:
this_inst = inst
instance_list.remove(inst)
elif inst.node_type == 'execution': # TODO: zero out capacity of execution nodes that are MIA
# Only considering control plane for this logic
continue
elif inst.is_lost(ref_time=nowtime):
break
else:
(changed, this_inst) = Instance.objects.get_or_register()
if changed:
logger.info("Registered tower control node '{}'".format(this_inst.hostname))
inspect_execution_nodes(instance_list)
for inst in list(instance_list):
if inst.is_lost(ref_time=nowtime):
lost_instances.append(inst)
instance_list.remove(inst)
if this_inst:
startup_event = this_inst.is_lost(ref_time=nowtime)
this_inst.refresh_capacity()
if startup_event:
this_inst.local_health_check()
if startup_event and this_inst.capacity != 0:
logger.warning('Rejoining the cluster as instance {}.'.format(this_inst.hostname))
return
else:
raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID))
# IFF any node has a greater version than we do, then we'll shutdown services
for other_inst in instance_list:
if other_inst.version == "" or other_inst.version.startswith('ansible-runner'):
if other_inst.version == "" or other_inst.version.startswith('ansible-runner') or other_inst.node_type == 'execution':
continue
if Version(other_inst.version.split('-', 1)[0]) > Version(awx_application_version.split('-', 1)[0]) and not settings.DEBUG:
logger.error(
@ -534,7 +533,7 @@ def cluster_node_heartbeat():
# since we will delete the node anyway.
if other_inst.capacity != 0 and not settings.AWX_AUTO_DEPROVISION_INSTANCES:
other_inst.mark_offline()
logger.error("Host {} last checked in at {}, marked as lost.".format(other_inst.hostname, other_inst.modified))
logger.error("Host {} last checked in at {}, marked as lost.".format(other_inst.hostname, other_inst.last_seen))
elif settings.AWX_AUTO_DEPROVISION_INSTANCES:
deprovision_hostname = other_inst.hostname
other_inst.delete()

View File

@ -0,0 +1,32 @@
import pytest
from awx.api.versioning import reverse
from awx.main.models.ha import Instance
@pytest.mark.django_db
def test_disabled_zeros_capacity(patch, admin_user):
instance = Instance.objects.create(hostname='example-host', cpu=6, memory=36000000000, cpu_capacity=6, mem_capacity=42)
url = reverse('api:instance_detail', kwargs={'pk': instance.pk})
r = patch(url=url, data={'enabled': False}, user=admin_user)
assert r.data['capacity'] == 0
instance.refresh_from_db()
assert instance.capacity == 0
@pytest.mark.django_db
def test_enabled_sets_capacity(patch, admin_user):
instance = Instance.objects.create(hostname='example-host', enabled=False, cpu=6, memory=36000000000, cpu_capacity=6, mem_capacity=42, capacity=0)
assert instance.capacity == 0
url = reverse('api:instance_detail', kwargs={'pk': instance.pk})
r = patch(url=url, data={'enabled': True}, user=admin_user)
assert r.data['capacity'] > 0
instance.refresh_from_db()
assert instance.capacity > 0

View File

@ -20,24 +20,27 @@ def test_orphan_unified_job_creation(instance, inventory):
@pytest.mark.django_db
@mock.patch('awx.main.utils.common.get_cpu_capacity', lambda: (2, 8))
@mock.patch('awx.main.utils.common.get_mem_capacity', lambda: (8000, 62))
@mock.patch('awx.main.tasks.inspect_execution_nodes', lambda *args, **kwargs: None)
@mock.patch('awx.main.models.ha.get_cpu_effective_capacity', lambda cpu: 8)
@mock.patch('awx.main.models.ha.get_mem_effective_capacity', lambda mem: 62)
def test_job_capacity_and_with_inactive_node():
i = Instance.objects.create(hostname='test-1')
with mock.patch.object(redis.client.Redis, 'ping', lambda self: True):
i.refresh_capacity()
i.save_health_data('18.0.1', 2, 8000)
assert i.enabled is True
assert i.capacity_adjustment == 1.0
assert i.capacity == 62
i.enabled = False
i.save()
with override_settings(CLUSTER_HOST_ID=i.hostname):
cluster_node_heartbeat()
with mock.patch.object(redis.client.Redis, 'ping', lambda self: True):
cluster_node_heartbeat()
i = Instance.objects.get(id=i.id)
assert i.capacity == 0
@pytest.mark.django_db
@mock.patch('awx.main.utils.common.get_cpu_capacity', lambda: (2, 8))
@mock.patch('awx.main.utils.common.get_mem_capacity', lambda: (8000, 62))
@mock.patch('awx.main.models.ha.get_cpu_effective_capacity', lambda cpu: 8)
@mock.patch('awx.main.models.ha.get_mem_effective_capacity', lambda mem: 62)
def test_job_capacity_with_redis_disabled():
i = Instance.objects.create(hostname='test-1')
@ -45,7 +48,7 @@ def test_job_capacity_with_redis_disabled():
raise redis.ConnectionError()
with mock.patch.object(redis.client.Redis, 'ping', _raise):
i.refresh_capacity()
i.local_health_check()
assert i.capacity == 0

View File

@ -1,10 +1,19 @@
import pytest
from unittest import mock
from unittest.mock import Mock
from decimal import Decimal
from awx.main.models import (
InstanceGroup,
)
from awx.main.models import InstanceGroup, Instance
@pytest.mark.parametrize('capacity_adjustment', [0.0, 0.25, 0.5, 0.75, 1, 1.5, 3])
def test_capacity_adjustment_no_save(capacity_adjustment):
inst = Instance(hostname='test-host', capacity_adjustment=Decimal(capacity_adjustment), capacity=0, cpu_capacity=10, mem_capacity=1000)
assert inst.capacity == 0
assert inst.capacity_adjustment == capacity_adjustment # sanity
inst.set_capacity_value()
assert inst.capacity > 0
assert inst.capacity == (float(inst.capacity_adjustment) * abs(inst.mem_capacity - inst.cpu_capacity) + min(inst.mem_capacity, inst.cpu_capacity))
def T(impact):

View File

@ -18,8 +18,6 @@ import tempfile
import psutil
from functools import reduce, wraps
from decimal import Decimal
# Django
from django.core.exceptions import ObjectDoesNotExist, FieldDoesNotExist
from django.utils.dateparse import parse_datetime
@ -72,9 +70,6 @@ __all__ = [
'set_current_apps',
'extract_ansible_vars',
'get_search_fields',
'get_system_task_capacity',
'get_cpu_capacity',
'get_mem_capacity',
'model_to_dict',
'NullablePromptPseudoField',
'model_instance_diff',
@ -715,7 +710,14 @@ def get_cpu_effective_capacity(cpu_count):
return cpu_count * forkcpu
def get_cpu_capacity():
def measure_cpu(): # TODO: replace with import from ansible-runner
return psutil.cpu_count()
def get_corrected_cpu(cpu_count): # formerlly get_cpu_capacity
"""Some environments will do a correction to the reported CPU number
because the given OpenShift value is a lie
"""
from django.conf import settings
settings_abscpu = getattr(settings, 'SYSTEM_TASK_ABS_CPU', None)
@ -726,9 +728,7 @@ def get_cpu_capacity():
elif settings_abscpu is not None:
return 0, int(settings_abscpu)
cpu = psutil.cpu_count()
return (cpu, get_cpu_effective_capacity(cpu))
return cpu_count # no correction
def get_mem_effective_capacity(mem_mb):
@ -747,7 +747,11 @@ def get_mem_effective_capacity(mem_mb):
return max(1, ((mem_mb // 1024 // 1024) - 2048) // forkmem)
def get_mem_capacity():
def measure_memory(): # TODO: replace with import from ansible-runner
return psutil.virtual_memory().total
def get_corrected_memory(memory):
from django.conf import settings
settings_absmem = getattr(settings, 'SYSTEM_TASK_ABS_MEM', None)
@ -758,33 +762,7 @@ def get_mem_capacity():
elif settings_absmem is not None:
return 0, int(settings_absmem)
mem = psutil.virtual_memory().total
return (mem, get_mem_effective_capacity(mem))
def get_system_task_capacity(scale=Decimal(1.0), cpu_capacity=None, mem_capacity=None):
"""
Measure system memory and use it as a baseline for determining the system's capacity
"""
from django.conf import settings
settings_forks = getattr(settings, 'SYSTEM_TASK_FORKS_CAPACITY', None)
env_forks = os.getenv('SYSTEM_TASK_FORKS_CAPACITY', None)
if env_forks:
return int(env_forks)
elif settings_forks:
return int(settings_forks)
if cpu_capacity is None:
_, cpu_cap = get_cpu_capacity()
else:
cpu_cap = cpu_capacity
if mem_capacity is None:
_, mem_cap = get_mem_capacity()
else:
mem_cap = mem_capacity
return min(mem_cap, cpu_cap) + ((max(mem_cap, cpu_cap) - min(mem_cap, cpu_cap)) * scale)
return memory
_inventory_updates = threading.local()

View File

@ -419,10 +419,18 @@ DEVSERVER_DEFAULT_PORT = '8013'
# Set default ports for live server tests.
os.environ.setdefault('DJANGO_LIVE_TEST_SERVER_ADDRESS', 'localhost:9013-9199')
# heartbeat period can factor into some forms of logic, so it is maintained as a setting here
CLUSTER_NODE_HEARTBEAT_PERIOD = 60
RECEPTOR_SERVICE_ADVERTISEMENT_PERIOD = 60 # https://github.com/ansible/receptor/blob/aa1d589e154d8a0cb99a220aff8f98faf2273be6/pkg/netceptor/netceptor.go#L34
BROKER_URL = 'unix:///var/run/redis/redis.sock'
CELERYBEAT_SCHEDULE = {
'tower_scheduler': {'task': 'awx.main.tasks.awx_periodic_scheduler', 'schedule': timedelta(seconds=30), 'options': {'expires': 20}},
'cluster_heartbeat': {'task': 'awx.main.tasks.cluster_node_heartbeat', 'schedule': timedelta(seconds=60), 'options': {'expires': 50}},
'cluster_heartbeat': {
'task': 'awx.main.tasks.cluster_node_heartbeat',
'schedule': timedelta(seconds=CLUSTER_NODE_HEARTBEAT_PERIOD),
'options': {'expires': 50},
},
'gather_analytics': {'task': 'awx.main.tasks.gather_analytics', 'schedule': timedelta(minutes=5)},
'task_manager': {'task': 'awx.main.scheduler.tasks.run_task_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}},
'k8s_reaper': {'task': 'awx.main.tasks.awx_k8s_reaper', 'schedule': timedelta(seconds=60), 'options': {'expires': 50}},

View File

@ -2,8 +2,7 @@
- node:
id: awx_{{ item }}
- log-level:
debug
- log-level: info
- tcp-listener:
port: 2222