mirror of
https://github.com/ansible/awx.git
synced 2026-03-29 14:55:09 -02:30
Fill in errors for hop nodes when Last Seen is out of date, and clear them when not (#11714)
* Process unresponsive and newly responsive hop nodes
* Use more natural way to zero hop node capacity, add test
* Use warning as opposed to warn for log messages
This commit is contained in:
@@ -233,13 +233,19 @@ class Instance(HasPolicyEditsMixin, BaseModel):
|
|||||||
|
|
||||||
def refresh_capacity_fields(self):
|
def refresh_capacity_fields(self):
|
||||||
"""Update derived capacity fields from cpu and memory (no save)"""
|
"""Update derived capacity fields from cpu and memory (no save)"""
|
||||||
self.cpu_capacity = get_cpu_effective_capacity(self.cpu)
|
if self.node_type == 'hop':
|
||||||
self.mem_capacity = get_mem_effective_capacity(self.memory)
|
self.cpu_capacity = 0
|
||||||
|
self.mem_capacity = 0 # formula has a non-zero offset, so we make sure it is 0 for hop nodes
|
||||||
|
else:
|
||||||
|
self.cpu_capacity = get_cpu_effective_capacity(self.cpu)
|
||||||
|
self.mem_capacity = get_mem_effective_capacity(self.memory)
|
||||||
self.set_capacity_value()
|
self.set_capacity_value()
|
||||||
|
|
||||||
def save_health_data(self, version, cpu, memory, uuid=None, update_last_seen=False, errors=''):
|
def save_health_data(self, version=None, cpu=0, memory=0, uuid=None, update_last_seen=False, errors=''):
|
||||||
self.last_health_check = now()
|
update_fields = ['errors']
|
||||||
update_fields = ['last_health_check']
|
if self.node_type != 'hop':
|
||||||
|
self.last_health_check = now()
|
||||||
|
update_fields.append('last_health_check')
|
||||||
|
|
||||||
if update_last_seen:
|
if update_last_seen:
|
||||||
self.last_seen = self.last_health_check
|
self.last_seen = self.last_health_check
|
||||||
@@ -251,7 +257,7 @@ class Instance(HasPolicyEditsMixin, BaseModel):
|
|||||||
self.uuid = uuid
|
self.uuid = uuid
|
||||||
update_fields.append('uuid')
|
update_fields.append('uuid')
|
||||||
|
|
||||||
if self.version != version:
|
if version is not None and self.version != version:
|
||||||
self.version = version
|
self.version = version
|
||||||
update_fields.append('version')
|
update_fields.append('version')
|
||||||
|
|
||||||
@@ -270,7 +276,7 @@ class Instance(HasPolicyEditsMixin, BaseModel):
|
|||||||
self.errors = ''
|
self.errors = ''
|
||||||
else:
|
else:
|
||||||
self.mark_offline(perform_save=False, errors=errors)
|
self.mark_offline(perform_save=False, errors=errors)
|
||||||
update_fields.extend(['cpu_capacity', 'mem_capacity', 'capacity', 'errors'])
|
update_fields.extend(['cpu_capacity', 'mem_capacity', 'capacity'])
|
||||||
|
|
||||||
# disabling activity stream will avoid extra queries, which is important for heatbeat actions
|
# disabling activity stream will avoid extra queries, which is important for heatbeat actions
|
||||||
from awx.main.signals import disable_activity_stream
|
from awx.main.signals import disable_activity_stream
|
||||||
|
|||||||
@@ -436,7 +436,6 @@ def inspect_execution_nodes(instance_list):
|
|||||||
workers = mesh_status['Advertisements']
|
workers = mesh_status['Advertisements']
|
||||||
for ad in workers:
|
for ad in workers:
|
||||||
hostname = ad['NodeID']
|
hostname = ad['NodeID']
|
||||||
changed = False
|
|
||||||
|
|
||||||
if hostname in node_lookup:
|
if hostname in node_lookup:
|
||||||
instance = node_lookup[hostname]
|
instance = node_lookup[hostname]
|
||||||
@@ -458,11 +457,11 @@ def inspect_execution_nodes(instance_list):
|
|||||||
|
|
||||||
# Only execution nodes should be dealt with by execution_node_health_check
|
# Only execution nodes should be dealt with by execution_node_health_check
|
||||||
if instance.node_type == 'hop':
|
if instance.node_type == 'hop':
|
||||||
|
logger.warning(f'Hop node {hostname}, has rejoined the receptor mesh')
|
||||||
|
instance.save_health_data(errors='')
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if changed:
|
if was_lost:
|
||||||
execution_node_health_check.apply_async([hostname])
|
|
||||||
elif was_lost:
|
|
||||||
# if the instance *was* lost, but has appeared again,
|
# if the instance *was* lost, but has appeared again,
|
||||||
# attempt to re-establish the initial capacity and version
|
# attempt to re-establish the initial capacity and version
|
||||||
# check
|
# check
|
||||||
@@ -534,20 +533,14 @@ def cluster_node_heartbeat():
|
|||||||
except Exception:
|
except Exception:
|
||||||
logger.exception('failed to reap jobs for {}'.format(other_inst.hostname))
|
logger.exception('failed to reap jobs for {}'.format(other_inst.hostname))
|
||||||
try:
|
try:
|
||||||
# Capacity could already be 0 because:
|
if settings.AWX_AUTO_DEPROVISION_INSTANCES:
|
||||||
# * It's a new node and it never had a heartbeat
|
|
||||||
# * It was set to 0 by another tower node running this method
|
|
||||||
# * It was set to 0 by this node, but auto deprovisioning is off
|
|
||||||
#
|
|
||||||
# If auto deprovisioning is on, don't bother setting the capacity to 0
|
|
||||||
# since we will delete the node anyway.
|
|
||||||
if other_inst.capacity != 0 and not settings.AWX_AUTO_DEPROVISION_INSTANCES:
|
|
||||||
other_inst.mark_offline(errors=_('Another cluster node has determined this instance to be unresponsive'))
|
|
||||||
logger.error("Host {} last checked in at {}, marked as lost.".format(other_inst.hostname, other_inst.last_seen))
|
|
||||||
elif settings.AWX_AUTO_DEPROVISION_INSTANCES:
|
|
||||||
deprovision_hostname = other_inst.hostname
|
deprovision_hostname = other_inst.hostname
|
||||||
other_inst.delete()
|
other_inst.delete()
|
||||||
logger.info("Host {} Automatically Deprovisioned.".format(deprovision_hostname))
|
logger.info("Host {} Automatically Deprovisioned.".format(deprovision_hostname))
|
||||||
|
elif other_inst.capacity != 0 or (not other_inst.errors):
|
||||||
|
other_inst.mark_offline(errors=_('Another cluster node has determined this instance to be unresponsive'))
|
||||||
|
logger.error("Host {} last checked in at {}, marked as lost.".format(other_inst.hostname, other_inst.last_seen))
|
||||||
|
|
||||||
except DatabaseError as e:
|
except DatabaseError as e:
|
||||||
if 'did not affect any rows' in str(e):
|
if 'did not affect any rows' in str(e):
|
||||||
logger.debug('Another instance has marked {} as lost'.format(other_inst.hostname))
|
logger.debug('Another instance has marked {} as lost'.format(other_inst.hostname))
|
||||||
|
|||||||
@@ -363,6 +363,23 @@ def test_health_check_oh_no():
|
|||||||
assert instance.errors == 'This it not a real instance!'
|
assert instance.errors == 'This it not a real instance!'
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_errors_field_alone():
|
||||||
|
instance = Instance.objects.create(hostname='foo-1', enabled=True, node_type='hop')
|
||||||
|
|
||||||
|
instance.save_health_data(errors='Node went missing!')
|
||||||
|
assert instance.errors == 'Node went missing!'
|
||||||
|
assert instance.capacity == 0
|
||||||
|
assert instance.memory == instance.mem_capacity == 0
|
||||||
|
assert instance.cpu == instance.cpu_capacity == 0
|
||||||
|
|
||||||
|
instance.save_health_data(errors='')
|
||||||
|
assert not instance.errors
|
||||||
|
assert instance.capacity == 0
|
||||||
|
assert instance.memory == instance.mem_capacity == 0
|
||||||
|
assert instance.cpu == instance.cpu_capacity == 0
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
class TestInstanceGroupOrdering:
|
class TestInstanceGroupOrdering:
|
||||||
def test_ad_hoc_instance_groups(self, instance_group_factory, inventory, default_instance_group):
|
def test_ad_hoc_instance_groups(self, instance_group_factory, inventory, default_instance_group):
|
||||||
|
|||||||
Reference in New Issue
Block a user