add auto-discovered nodes to default IG

* add advisory_lock to avoid IG update race logic
* update IG by way of policy_instance_list
This commit is contained in:
Jim Ladd 2021-08-24 17:55:09 -07:00
parent 561fc289fb
commit f317fca9e4

View File

@ -429,47 +429,57 @@ def execution_node_health_check(node):
def inspect_execution_nodes(instance_list):
node_lookup = {}
for inst in instance_list:
if inst.node_type == 'execution':
node_lookup[inst.hostname] = inst
with advisory_lock('inspect_execution_nodes_lock', wait=True):
node_lookup = {}
for inst in instance_list:
if inst.node_type == 'execution':
node_lookup[inst.hostname] = inst
ctl = get_receptor_ctl()
connections = ctl.simple_command('status')['Advertisements']
nowtime = now()
for ad in connections:
hostname = ad['NodeID']
commands = ad.get('WorkCommands') or []
if 'ansible-runner' not in commands:
continue
changed = False
if hostname in node_lookup:
instance = node_lookup[hostname]
else:
defaults = dict(enabled=False)
(changed, instance) = Instance.objects.register(hostname=hostname, node_type='execution', defaults=defaults)
was_lost = instance.is_lost(ref_time=nowtime)
last_seen = parse_date(ad['Time'])
ctl = get_receptor_ctl()
connections = ctl.simple_command('status')['Advertisements']
nowtime = now()
for ad in connections:
hostname = ad['NodeID']
commands = ad.get('WorkCommands') or []
if 'ansible-runner' not in commands:
continue
changed = False
if hostname in node_lookup:
instance = node_lookup[hostname]
else:
defaults = dict(enabled=False)
(changed, instance) = Instance.objects.register(hostname=hostname, node_type='execution', defaults=defaults)
logger.warn("Registered execution node '{}' (marked as disabled by default)".format(hostname))
if instance.last_seen and instance.last_seen >= last_seen:
continue
instance.last_seen = last_seen
instance.save(update_fields=['last_seen'])
default_ig = InstanceGroup.objects.get(name='default')
if instance.hostname not in default_ig.policy_instance_list:
default_ig.policy_instance_list += [instance.hostname]
default_ig.save()
logger.warn("Updated `default` instance group's policy_instance_list to include execution node '{}'".format(hostname))
else:
logger.warn("`default` instance group's policy_instance_list already listed execution node '{}'".format(hostname))
if changed:
logger.warn("Registered execution node '{}'".format(hostname))
execution_node_health_check.apply_async([hostname])
elif was_lost:
# if the instance *was* lost, but has appeared again,
# attempt to re-establish the initial capacity and version
# check
logger.warn(f'Execution node attempting to rejoin as instance {hostname}.')
execution_node_health_check.apply_async([hostname])
elif instance.capacity == 0:
# Periodically re-run the health check of errored nodes, in case someone fixed it
# TODO: perhaps decrease the frequency of these checks
logger.debug(f'Restarting health check for execution node {hostname} with known errors.')
execution_node_health_check.apply_async([hostname])
was_lost = instance.is_lost(ref_time=nowtime)
last_seen = parse_date(ad['Time'])
if instance.last_seen and instance.last_seen >= last_seen:
continue
instance.last_seen = last_seen
instance.save(update_fields=['last_seen'])
if changed:
execution_node_health_check.apply_async([hostname])
elif was_lost:
# if the instance *was* lost, but has appeared again,
# attempt to re-establish the initial capacity and version
# check
logger.warn(f'Execution node attempting to rejoin as instance {hostname}.')
execution_node_health_check.apply_async([hostname])
elif instance.capacity == 0:
# Periodically re-run the health check of errored nodes, in case someone fixed it
# TODO: perhaps decrease the frequency of these checks
logger.debug(f'Restarting health check for execution node {hostname} with known errors.')
execution_node_health_check.apply_async([hostname])
@task(queue=get_local_queuename)