Capture hop nodes and links in the automatic discovery machinery

Also, make sure that the control service is turned on in the dev
environment's hop node, so that the node shows up in the
Advertisements list of the mesh status.
This commit is contained in:
Jeff Bradberry
2021-12-20 15:23:32 -05:00
parent 4449555abe
commit ce5aefd3d8
2 changed files with 28 additions and 12 deletions

View File

@@ -67,6 +67,7 @@ from awx.main.models import (
TowerScheduleState, TowerScheduleState,
Instance, Instance,
InstanceGroup, InstanceGroup,
InstanceLink,
UnifiedJob, UnifiedJob,
Notification, Notification,
Inventory, Inventory,
@@ -495,28 +496,29 @@ def execution_node_health_check(node):
def inspect_execution_nodes(instance_list): def inspect_execution_nodes(instance_list):
with advisory_lock('inspect_execution_nodes_lock', wait=False): with advisory_lock('inspect_execution_nodes_lock', wait=False):
node_lookup = {} node_lookup = {inst.hostname: inst for inst in instance_list}
for inst in instance_list:
if inst.node_type == 'execution':
node_lookup[inst.hostname] = inst
ctl = get_receptor_ctl() ctl = get_receptor_ctl()
connections = ctl.simple_command('status')['Advertisements'] mesh_status = ctl.simple_command('status')
nowtime = now() nowtime = now()
for ad in connections: workers = mesh_status['Advertisements']
for ad in workers:
hostname = ad['NodeID'] hostname = ad['NodeID']
commands = ad.get('WorkCommands') or [] if ad.get('WorkCommands') is None:
worktypes = [] node_type = 'hop'
for c in commands: elif any(cmd['WorkType'] == 'ansible-runner' for cmd in ad['WorkCommands'] or []):
worktypes.append(c["WorkType"]) node_type = 'execution'
if 'ansible-runner' not in worktypes: else:
continue continue
changed = False changed = False
if hostname in node_lookup: if hostname in node_lookup:
instance = node_lookup[hostname] instance = node_lookup[hostname]
elif settings.MESH_AUTODISCOVERY_ENABLED: elif settings.MESH_AUTODISCOVERY_ENABLED:
defaults = dict(enabled=False) defaults = dict(enabled=False)
(changed, instance) = Instance.objects.register(hostname=hostname, node_type='execution', defaults=defaults) (changed, instance) = Instance.objects.register(hostname=hostname, node_type=node_type, defaults=defaults)
node_lookup[hostname] = instance
logger.warn(f"Registered execution node '{hostname}' (marked disabled by default)") logger.warn(f"Registered execution node '{hostname}' (marked disabled by default)")
else: else:
logger.warn(f"Unrecognized node on mesh advertising ansible-runner work type: {hostname}") logger.warn(f"Unrecognized node on mesh advertising ansible-runner work type: {hostname}")
@@ -546,6 +548,17 @@ def inspect_execution_nodes(instance_list):
logger.debug(f'Restarting health check for execution node {hostname} with known errors.') logger.debug(f'Restarting health check for execution node {hostname} with known errors.')
execution_node_health_check.apply_async([hostname]) execution_node_health_check.apply_async([hostname])
links = {tuple(sorted((node, peer))) for node, peers in mesh_status['KnownConnectionCosts'].items() for peer in peers}
for a, b in links:
if a not in node_lookup:
logger.warn(f"Cannot link {a} to {b}: {a} not registered.")
continue
if b not in node_lookup:
logger.warn(f"Cannot link {a} to {b}: {b} not registered.")
continue
a_obj, b_obj = node_lookup[a], node_lookup[b]
InstanceLink.objects.get_or_create(source=a_obj, target=b_obj)
@task(queue=get_local_queuename) @task(queue=get_local_queuename)
def cluster_node_heartbeat(): def cluster_node_heartbeat():

View File

@@ -10,3 +10,6 @@
- tcp-listener: - tcp-listener:
port: 5555 port: 5555
- control-service:
service: control