Allow manually running a health check, and make other adjustments to the health check trigger (#11002)

* Fully finalize the planned work for health checks of execution nodes

* Implementation of instance health_check endpoint

* Also make version handling conditional on node_type

* Do not use the receptor mesh to check the health of main cluster nodes

* Fix bugs from testing health check of cluster nodes, add doc

* Add a few previously missed fields to the health check serializer

* Light refactoring of error field processing

* Fix clearing of the errors field, write more unit tests

* Update health check info in docs

* Bump migration of health check after rebase

* Mark string for translation

* Add related health_check link for system auditors too

* Handle health_check cluster node timeout, add errors for peer judgement
Author: Alan Rominger
Date: 2021-09-03 16:37:37 -04:00 (committed by GitHub)
Parent: 169c0f6642
Commit: 6a17e5b65b
15 changed files with 285 additions and 53 deletions
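For orientation (not part of the diff): based on the API tests further down, the new per-instance health check can be exercised roughly as follows. The /api/v2/instances/<id>/health_check/ path is inferred from the 'api:instance_health_check' URL name used in those tests; the host and token are placeholders.

# Hedged usage sketch; endpoint path inferred, host and token are placeholders.
import requests

BASE = 'https://awx.example.com/api/v2'
HEADERS = {'Authorization': 'Bearer TOKEN'}

# GET returns the stored health data; system auditors may read it too.
r = requests.get(f'{BASE}/instances/1/health_check/', headers=HEADERS)
print(r.json().get('last_health_check'), r.json().get('errors'))

# POST triggers a fresh check on the target node; auditors receive a 403.
requests.post(f'{BASE}/instances/1/health_check/', headers=HEADERS)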


@@ -0,0 +1,25 @@
+# Generated by Django 2.2.20 on 2021-08-31 17:41
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('main', '0154_set_default_uuid'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='instance',
+            name='errors',
+            field=models.TextField(blank=True, default='', editable=False, help_text='Any error details from the last health check.'),
+        ),
+        migrations.AddField(
+            model_name='instance',
+            name='last_health_check',
+            field=models.DateTimeField(
+                editable=False, help_text='Last time a health check was run on this instance to refresh cpu, memory, and capacity.', null=True
+            ),
+        ),
+    ]
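Once migrated, the new columns are ordinary model fields. A hedged ORM sketch (field names come from the migration above; the query itself is illustrative, not from this commit):

# List instances whose last health check recorded an error.
from awx.main.models.ha import Instance

for inst in Instance.objects.exclude(errors=''):
    print(inst.hostname, inst.last_health_check, inst.errors)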


@@ -82,11 +82,22 @@ class Instance(HasPolicyEditsMixin, BaseModel):
         editable=False,
         help_text=_('Total system memory of this instance in bytes.'),
     )
+    errors = models.TextField(
+        default='',
+        blank=True,
+        editable=False,
+        help_text=_('Any error details from the last health check.'),
+    )
     last_seen = models.DateTimeField(
         null=True,
         editable=False,
         help_text=_('Last time instance ran its heartbeat task for main cluster nodes. Last known connection to receptor mesh for execution nodes.'),
     )
+    last_health_check = models.DateTimeField(
+        null=True,
+        editable=False,
+        help_text=_('Last time a health check was run on this instance to refresh cpu, memory, and capacity.'),
+    )
     # Capacity management
     capacity = models.PositiveIntegerField(
         default=100,
@@ -152,15 +163,16 @@ class Instance(HasPolicyEditsMixin, BaseModel):
             grace_period += settings.RECEPTOR_SERVICE_ADVERTISEMENT_PERIOD
         return self.last_seen < ref_time - timedelta(seconds=grace_period)

-    def mark_offline(self, update_last_seen=False, perform_save=True):
-        if self.cpu_capacity == 0 and self.mem_capacity == 0 and self.capacity == 0 and (not update_last_seen):
+    def mark_offline(self, update_last_seen=False, perform_save=True, errors=''):
+        if self.cpu_capacity == 0 and self.mem_capacity == 0 and self.capacity == 0 and self.errors == errors and (not update_last_seen):
             return
         self.cpu_capacity = self.mem_capacity = self.capacity = 0
+        self.errors = errors
         if update_last_seen:
             self.last_seen = now()
         if perform_save:
-            update_fields = ['capacity', 'cpu_capacity', 'mem_capacity']
+            update_fields = ['capacity', 'cpu_capacity', 'mem_capacity', 'errors']
             if update_last_seen:
                 update_fields += ['last_seen']
             self.save(update_fields=update_fields)
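The guard at the top of mark_offline() now also compares the stored error text, so repeated calls with identical errors skip the save entirely. A standalone sketch (not AWX code) of that rule, with illustrative names:

# Minimal sketch of the save-skipping guard added above.
def should_skip_save(capacity, cpu_capacity, mem_capacity, stored_errors, new_errors, update_last_seen):
    zeroed = capacity == cpu_capacity == mem_capacity == 0
    return zeroed and stored_errors == new_errors and not update_last_seen

assert should_skip_save(0, 0, 0, 'node lost', 'node lost', False)       # no-op
assert not should_skip_save(0, 0, 0, 'node lost', 'new reason', False)  # must persist new error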
@@ -180,11 +192,12 @@ class Instance(HasPolicyEditsMixin, BaseModel):
         self.mem_capacity = get_mem_effective_capacity(self.memory)
         self.set_capacity_value()

-    def save_health_data(self, version, cpu, memory, uuid=None, last_seen=None, has_error=False):
-        update_fields = []
+    def save_health_data(self, version, cpu, memory, uuid=None, update_last_seen=False, errors=''):
+        self.last_health_check = now()
+        update_fields = ['last_health_check']
-        if last_seen is not None and self.last_seen != last_seen:
-            self.last_seen = last_seen
+        if update_last_seen:
+            self.last_seen = self.last_health_check
             update_fields.append('last_seen')
         if uuid is not None and self.uuid != uuid:
@@ -207,25 +220,26 @@ class Instance(HasPolicyEditsMixin, BaseModel):
             self.memory = new_memory
             update_fields.append('memory')
-        if not has_error:
+        if not errors:
             self.refresh_capacity_fields()
+            self.errors = ''
         else:
-            self.mark_offline(perform_save=False)
-        update_fields.extend(['cpu_capacity', 'mem_capacity', 'capacity'])
+            self.mark_offline(perform_save=False, errors=errors)
+        update_fields.extend(['cpu_capacity', 'mem_capacity', 'capacity', 'errors'])
         self.save(update_fields=update_fields)

     def local_health_check(self):
         """Only call this method on the instance that this record represents"""
-        has_error = False
+        errors = None
         try:
             # if redis is down for some reason, that means we can't persist
             # playbook event data; we should consider this a zero capacity event
             redis.Redis.from_url(settings.BROKER_URL).ping()
         except redis.ConnectionError:
-            has_error = True
+            errors = _('Failed to connect to Redis')
-        self.save_health_data(awx_application_version, get_cpu_count(), get_mem_in_bytes(), last_seen=now(), has_error=has_error)
+        self.save_health_data(awx_application_version, get_cpu_count(), get_mem_in_bytes(), update_last_seen=True, errors=errors)


 class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin):
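The net effect of the model changes above: a successful check refreshes capacity and clears any stale error text, while a failed check zeroes capacity and records the reason. A standalone sketch (not AWX code) of that contract:

# Illustrative stand-in for the new save_health_data() error routing.
class FakeInstance:
    def __init__(self):
        self.capacity, self.errors = 100, 'stale error'

    def save_health_data(self, errors=''):
        if not errors:
            self.capacity = 100  # stands in for refresh_capacity_fields()
            self.errors = ''     # healthy checks clear old errors
        else:
            self.capacity = 0    # stands in for mark_offline()
            self.errors = errors

inst = FakeInstance()
inst.save_health_data(errors='')
assert inst.errors == '' and inst.capacity == 100
inst.save_health_data(errors='Failed to connect to Redis')
assert inst.capacity == 0 and inst.errors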


@@ -177,7 +177,7 @@ def dispatch_startup():
 def inform_cluster_of_shutdown():
     try:
         this_inst = Instance.objects.get(hostname=settings.CLUSTER_HOST_ID)
-        this_inst.mark_offline(update_last_seen=True)  # No thank you to new jobs while shut down
+        this_inst.mark_offline(update_last_seen=True, errors=_('Instance received normal shutdown signal'))
         try:
             reaper.reap(this_inst)
         except Exception:
@@ -408,14 +408,37 @@ def cleanup_execution_environment_images():
logger.debug(f"Failed to delete image {image_name}")
@task(queue=get_local_queuename)
def cluster_node_health_check(node):
'''
Used for the health check endpoint, refreshes the status of the instance, but must be ran on target node
'''
if node == '':
logger.warn('Local health check incorrectly called with blank string')
return
elif node != settings.CLUSTER_HOST_ID:
logger.warn(f'Local health check for {node} incorrectly sent to {settings.CLUSTER_HOST_ID}')
return
try:
this_inst = Instance.objects.me()
except Instance.DoesNotExist:
logger.warn(f'Instance record for {node} missing, could not check capacity.')
return
this_inst.local_health_check()
@task(queue=get_local_queuename)
def execution_node_health_check(node):
if node == '':
logger.warn('Remote health check incorrectly called with blank string')
return
try:
instance = Instance.objects.get(hostname=node)
except Instance.DoesNotExist:
logger.warn(f'Instance record for {node} missing, could not check capacity.')
return
data = worker_info(node)
data = worker_info(node, work_type='ansible-runner' if instance.node_type == 'execution' else 'local')
prior_capacity = instance.capacity
@@ -424,7 +447,7 @@ def execution_node_health_check(node):
         cpu=data.get('cpu_count', 0),
         memory=data.get('mem_in_bytes', 0),
         uuid=data.get('uuid'),
-        has_error=bool(data.get('errors')),
+        errors='\n'.join(data.get('errors', [])),
     )

     if data['errors']:
@@ -436,6 +459,8 @@ def execution_node_health_check(node):
     else:
         logger.info('Set capacity of execution node {} to {}, worker info data:\n{}'.format(node, instance.capacity, json.dumps(data, indent=2)))

+    return data
+

 def inspect_execution_nodes(instance_list):
     with advisory_lock('inspect_execution_nodes_lock', wait=False):
@@ -488,10 +513,12 @@ def inspect_execution_nodes(instance_list):
                 logger.warn(f'Execution node attempting to rejoin as instance {hostname}.')
                 execution_node_health_check.apply_async([hostname])
             elif instance.capacity == 0:
-                # Periodically re-run the health check of errored nodes, in case someone fixed it
-                # TODO: perhaps decrease the frequency of these checks
-                logger.debug(f'Restarting health check for execution node {hostname} with known errors.')
-                execution_node_health_check.apply_async([hostname])
+                # Nodes with a proven connection that still need remediation run health checks at a reduced frequency
+                if not instance.last_health_check or (nowtime - instance.last_health_check).total_seconds() >= settings.EXECUTION_NODE_REMEDIATION_CHECKS:
+                    # Periodically re-run the health check of errored nodes, in case someone fixed it
+                    # TODO: perhaps decrease the frequency of these checks
+                    logger.debug(f'Restarting health check for execution node {hostname} with known errors.')
+                    execution_node_health_check.apply_async([hostname])


 @task(queue=get_local_queuename)
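The condition added above is a simple time gate: errored nodes are only re-checked once EXECUTION_NODE_REMEDIATION_CHECKS seconds (a settings value) have passed since the last recorded check, or immediately if no check was ever recorded. A standalone sketch (not AWX code), with illustrative names:

# Minimal sketch of the remediation throttle.
from datetime import datetime, timedelta

def remediation_check_due(last_health_check, nowtime, interval_seconds):
    if last_health_check is None:
        return True  # never checked: run one now
    return (nowtime - last_health_check).total_seconds() >= interval_seconds

now = datetime(2021, 9, 3, 12, 0)
assert remediation_check_due(None, now, 600)
assert not remediation_check_due(now - timedelta(minutes=5), now, 600)
assert remediation_check_due(now - timedelta(minutes=15), now, 600)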
@@ -556,7 +583,7 @@ def cluster_node_heartbeat():
             # If auto deprovisioning is on, don't bother setting the capacity to 0
             # since we will delete the node anyway.
             if other_inst.capacity != 0 and not settings.AWX_AUTO_DEPROVISION_INSTANCES:
-                other_inst.mark_offline()
+                other_inst.mark_offline(errors=_('Another cluster node has determined this instance to be unresponsive'))
                 logger.error("Host {} last checked in at {}, marked as lost.".format(other_inst.hostname, other_inst.last_seen))
             elif settings.AWX_AUTO_DEPROVISION_INSTANCES:
                 deprovision_hostname = other_inst.hostname
@@ -3028,12 +3055,17 @@ class AWXReceptorJob:
         # We establish a connection to the Receptor socket
         receptor_ctl = get_receptor_ctl()

+        res = None
         try:
-            return self._run_internal(receptor_ctl)
+            res = self._run_internal(receptor_ctl)
+            return res
         finally:
             # Make sure to always release the work unit if we established it
             if self.unit_id is not None and settings.RECEPTOR_RELEASE_WORK:
                 receptor_ctl.simple_command(f"work release {self.unit_id}")
+            # If an error occurred without the job itself failing, it could be a broken instance
+            if self.work_type == 'ansible-runner' and (res is None or getattr(res, 'rc', None) is None):
+                execution_node_health_check(self.task.instance.execution_node)

     def _run_internal(self, receptor_ctl):
         # Create a socketpair. Where the left side will be used for writing our payload
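The try/finally rewrite above keeps the work-unit release unconditional while adding a post-mortem: if an ansible-runner work unit produced no result or no return code, the execution node itself is suspect and gets a health check. A standalone sketch (not AWX code) of the shape, with illustrative names:

# Minimal sketch of the release-then-postmortem pattern.
def run_with_postmortem(run_internal, release_work, health_check):
    res = None
    try:
        res = run_internal()
        return res
    finally:
        release_work()  # always release the work unit
        if res is None or getattr(res, 'rc', None) is None:
            health_check()  # no result or no rc: the node may be broken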


@@ -1,13 +1,22 @@
 import pytest

-from awx.api.versioning import reverse
+from unittest import mock
+
+from awx.api.versioning import reverse
+from awx.main.models.ha import Instance
+
+import redis
+
+# Django
+from django.test.utils import override_settings
+
+
+INSTANCE_KWARGS = dict(hostname='example-host', cpu=6, memory=36000000000, cpu_capacity=6, mem_capacity=42)


 @pytest.mark.django_db
 def test_disabled_zeros_capacity(patch, admin_user):
-    instance = Instance.objects.create(hostname='example-host', cpu=6, memory=36000000000, cpu_capacity=6, mem_capacity=42)
+    instance = Instance.objects.create(**INSTANCE_KWARGS)
     url = reverse('api:instance_detail', kwargs={'pk': instance.pk})
@@ -20,7 +29,7 @@ def test_disabled_zeros_capacity(patch, admin_user):
 @pytest.mark.django_db
 def test_enabled_sets_capacity(patch, admin_user):
-    instance = Instance.objects.create(hostname='example-host', enabled=False, cpu=6, memory=36000000000, cpu_capacity=6, mem_capacity=42, capacity=0)
+    instance = Instance.objects.create(enabled=False, capacity=0, **INSTANCE_KWARGS)
     assert instance.capacity == 0
     url = reverse('api:instance_detail', kwargs={'pk': instance.pk})
@@ -30,3 +39,25 @@ def test_enabled_sets_capacity(patch, admin_user):
     instance.refresh_from_db()
     assert instance.capacity > 0

+
+@pytest.mark.django_db
+def test_auditor_user_health_check(get, post, system_auditor):
+    instance = Instance.objects.create(**INSTANCE_KWARGS)
+    url = reverse('api:instance_health_check', kwargs={'pk': instance.pk})
+    r = get(url=url, user=system_auditor, expect=200)
+    assert r.data['cpu_capacity'] == instance.cpu_capacity
+    post(url=url, user=system_auditor, expect=403)
+
+
+@pytest.mark.django_db
+@mock.patch.object(redis.client.Redis, 'ping', lambda self: True)
+def test_health_check_usage(get, post, admin_user):
+    instance = Instance.objects.create(**INSTANCE_KWARGS)
+    url = reverse('api:instance_health_check', kwargs={'pk': instance.pk})
+    r = get(url=url, user=admin_user, expect=200)
+    assert r.data['cpu_capacity'] == instance.cpu_capacity
+    assert r.data['last_health_check'] is None
+    with override_settings(CLUSTER_HOST_ID=instance.hostname):  # force direct call of cluster_node_health_check
+        r = post(url=url, user=admin_user, expect=200)
+    assert r.data['last_health_check'] is not None


@@ -324,6 +324,23 @@ def test_instance_group_capacity(instance_factory, instance_group_factory):
     assert ig_single.capacity == 100


+@pytest.mark.django_db
+def test_health_check_clears_errors():
+    instance = Instance.objects.create(hostname='foo-1', enabled=True, capacity=0, errors='something went wrong')
+    data = dict(version='ansible-runner-4.2', cpu=782, memory=int(39e9), uuid='asdfasdfasdfasdfasdf', errors='')
+    instance.save_health_data(**data)
+    for k, v in data.items():
+        assert getattr(instance, k) == v
+
+
+@pytest.mark.django_db
+def test_health_check_oh_no():
+    instance = Instance.objects.create(hostname='foo-2', enabled=True, capacity=52, cpu=8, memory=int(40e9))
+    instance.save_health_data('', 0, 0, errors='This is not a real instance!')
+    assert instance.capacity == instance.cpu_capacity == 0
+    assert instance.errors == 'This is not a real instance!'
+
+
 @pytest.mark.django_db
 class TestInstanceGroupOrdering:
     def test_ad_hoc_instance_groups(self, instance_group_factory, inventory, default_instance_group):


@@ -28,13 +28,16 @@ def get_receptor_ctl():
     return ReceptorControl(receptor_sockfile)


-def worker_info(node_name):
+def worker_info(node_name, work_type='ansible-runner'):
     receptor_ctl = get_receptor_ctl()

     transmit_start = time.time()
     error_list = []
     data = {'errors': error_list, 'transmit_timing': 0.0}

-    result = receptor_ctl.submit_work(worktype='ansible-runner', payload='', params={"params": f"--worker-info"}, ttl='20s', node=node_name)
+    kwargs = {}
+    if work_type != 'local':
+        kwargs['ttl'] = '20s'
+    result = receptor_ctl.submit_work(worktype=work_type, payload='', params={"params": f"--worker-info"}, node=node_name, **kwargs)

     unit_id = result['unitid']
     run_start = time.time()
@@ -90,9 +93,11 @@ def worker_info(node_name):
         error_list.extend(remote_data.pop('errors', []))  # merge both error lists
         data.update(remote_data)

-    # see tasks.py usage of keys
-    missing_keys = set(('runner_version', 'mem_in_bytes', 'cpu_count')) - set(data.keys())
-    if missing_keys:
-        data['errors'].append('Worker failed to return keys {}'.format(' '.join(missing_keys)))
+    # If we have a connection error, missing keys would be a trivial consequence of that
+    if not data['errors']:
+        # see tasks.py usage of keys
+        missing_keys = set(('runner_version', 'mem_in_bytes', 'cpu_count')) - set(data.keys())
+        if missing_keys:
+            data['errors'].append('Worker failed to return keys {}'.format(' '.join(missing_keys)))

     return data
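A hedged usage sketch of the updated helper (calling context is illustrative; the returned keys are the ones this diff references):

# worker_info returns a dict with 'errors', 'transmit_timing', and, on
# success, 'runner_version', 'cpu_count', and 'mem_in_bytes'.
data = worker_info('exec-node-1', work_type='ansible-runner')
if data['errors']:
    print('node unhealthy:', '; '.join(data['errors']))
else:
    print('cpus:', data['cpu_count'], 'mem:', data['mem_in_bytes'])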