Patch summary (leading text; ignored by patch tools):
  * Runs the whole body of inspect_execution_nodes() inside
    advisory_lock('inspect_execution_nodes_lock', wait=True).
  * Logs the "Registered execution node" warning directly after
    Instance.objects.register() instead of under `if changed:`.
  * Appends the node's hostname to the `default` InstanceGroup's
    policy_instance_list when it is not already listed.
NOTE(review): line breaks/indentation were reconstructed from a collapsed copy;
  the `default_ig` group is assumed to sit inside the `else:` branch - confirm.
NOTE(review): InstanceGroup.objects.get(name='default') looks like an ORM lookup
  that raises when no such group exists - confirm a `default` group is guaranteed.
NOTE(review): logger.warn is a deprecated alias of logger.warning if `logger`
  is a stdlib logging.Logger - verify before reusing this pattern.

diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index aaa09485e8..62e056f738 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -429,47 +429,57 @@ def execution_node_health_check(node):
 
 
 def inspect_execution_nodes(instance_list):
-    node_lookup = {}
-    for inst in instance_list:
-        if inst.node_type == 'execution':
-            node_lookup[inst.hostname] = inst
+    with advisory_lock('inspect_execution_nodes_lock', wait=True):
+        node_lookup = {}
+        for inst in instance_list:
+            if inst.node_type == 'execution':
+                node_lookup[inst.hostname] = inst
 
-    ctl = get_receptor_ctl()
-    connections = ctl.simple_command('status')['Advertisements']
-    nowtime = now()
-    for ad in connections:
-        hostname = ad['NodeID']
-        commands = ad.get('WorkCommands') or []
-        if 'ansible-runner' not in commands:
-            continue
-        changed = False
-        if hostname in node_lookup:
-            instance = node_lookup[hostname]
-        else:
-            defaults = dict(enabled=False)
-            (changed, instance) = Instance.objects.register(hostname=hostname, node_type='execution', defaults=defaults)
-        was_lost = instance.is_lost(ref_time=nowtime)
-        last_seen = parse_date(ad['Time'])
+        ctl = get_receptor_ctl()
+        connections = ctl.simple_command('status')['Advertisements']
+        nowtime = now()
+        for ad in connections:
+            hostname = ad['NodeID']
+            commands = ad.get('WorkCommands') or []
+            if 'ansible-runner' not in commands:
+                continue
+            changed = False
+            if hostname in node_lookup:
+                instance = node_lookup[hostname]
+            else:
+                defaults = dict(enabled=False)
+                (changed, instance) = Instance.objects.register(hostname=hostname, node_type='execution', defaults=defaults)
+                logger.warn("Registered execution node '{}' (marked as disabled by default)".format(hostname))
 
-        if instance.last_seen and instance.last_seen >= last_seen:
-            continue
-        instance.last_seen = last_seen
-        instance.save(update_fields=['last_seen'])
+                default_ig = InstanceGroup.objects.get(name='default')
+                if instance.hostname not in default_ig.policy_instance_list:
+                    default_ig.policy_instance_list += [instance.hostname]
+                    default_ig.save()
+                    logger.warn("Updated `default` instance group's policy_instance_list to include execution node '{}'".format(hostname))
+                else:
+                    logger.warn("`default` instance group's policy_instance_list already listed execution node '{}'".format(hostname))
 
-        if changed:
-            logger.warn("Registered execution node '{}'".format(hostname))
-            execution_node_health_check.apply_async([hostname])
-        elif was_lost:
-            # if the instance *was* lost, but has appeared again,
-            # attempt to re-establish the initial capacity and version
-            # check
-            logger.warn(f'Execution node attempting to rejoin as instance {hostname}.')
-            execution_node_health_check.apply_async([hostname])
-        elif instance.capacity == 0:
-            # Periodically re-run the health check of errored nodes, in case someone fixed it
-            # TODO: perhaps decrease the frequency of these checks
-            logger.debug(f'Restarting health check for execution node {hostname} with known errors.')
-            execution_node_health_check.apply_async([hostname])
+            was_lost = instance.is_lost(ref_time=nowtime)
+            last_seen = parse_date(ad['Time'])
+
+            if instance.last_seen and instance.last_seen >= last_seen:
+                continue
+            instance.last_seen = last_seen
+            instance.save(update_fields=['last_seen'])
+
+            if changed:
+                execution_node_health_check.apply_async([hostname])
+            elif was_lost:
+                # if the instance *was* lost, but has appeared again,
+                # attempt to re-establish the initial capacity and version
+                # check
+                logger.warn(f'Execution node attempting to rejoin as instance {hostname}.')
+                execution_node_health_check.apply_async([hostname])
+            elif instance.capacity == 0:
+                # Periodically re-run the health check of errored nodes, in case someone fixed it
+                # TODO: perhaps decrease the frequency of these checks
+                logger.debug(f'Restarting health check for execution node {hostname} with known errors.')
+                execution_node_health_check.apply_async([hostname])
 
 
 @task(queue=get_local_queuename)