mirror of
https://github.com/ansible/awx.git
synced 2026-06-27 01:18:02 -02:30
Add functional API tests
add tests for calling write_receptor_config add write_receptor_config test Do not set default listener_port on control node
This commit is contained in:
@@ -30,6 +30,7 @@ from awx.main.tasks.signals import signal_state, signal_callback, SignalExit
|
||||
from awx.main.models import Instance, InstanceLink, UnifiedJob
|
||||
from awx.main.dispatch import get_task_queuename
|
||||
from awx.main.dispatch.publish import task
|
||||
from awx.main.utils.pglock import advisory_lock
|
||||
|
||||
# Receptorctl
|
||||
from receptorctl.socket_interface import ReceptorControl
|
||||
@@ -675,55 +676,80 @@ RECEPTOR_CONFIG_STARTER = (
|
||||
)
|
||||
|
||||
|
||||
def should_update_config(instances):
|
||||
'''
|
||||
checks that the list of instances matches the list of
|
||||
tcp-peers in the config
|
||||
'''
|
||||
current_config = read_receptor_config() # this gets receptor conf lock
|
||||
current_peers = []
|
||||
for config_entry in current_config:
|
||||
for key, value in config_entry.items():
|
||||
if key.endswith('-peer'):
|
||||
current_peers.append(value['address'])
|
||||
intended_peers = [f"{i.hostname}:{i.listener_port}" for i in instances]
|
||||
# TODO remove this logging line
|
||||
logger.warning(f"current {current_peers} intended {intended_peers}")
|
||||
if set(current_peers) == set(intended_peers):
|
||||
return False # config file is already update to date
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def generate_config_data():
|
||||
# returns two values
|
||||
# receptor config - based on current database peers
|
||||
# should_update - If True, receptor_config differs from the receptor conf file on disk
|
||||
instances = Instance.objects.filter(node_type__in=(Instance.Types.EXECUTION, Instance.Types.HOP), peers_from_control_nodes=True)
|
||||
|
||||
receptor_config = list(RECEPTOR_CONFIG_STARTER)
|
||||
for instance in instances:
|
||||
peer = {'tcp-peer': {'address': f'{instance.hostname}:{instance.listener_port}', 'tls': 'tlsclient'}}
|
||||
receptor_config.append(peer)
|
||||
should_update = should_update_config(instances)
|
||||
return receptor_config, should_update
|
||||
|
||||
|
||||
def reload_receptor():
|
||||
logger.warning("Receptor config changed, reloading receptor")
|
||||
|
||||
# This needs to be outside of the lock because this function itself will acquire the lock.
|
||||
receptor_ctl = get_receptor_ctl()
|
||||
|
||||
attempts = 10
|
||||
for backoff in range(1, attempts + 1):
|
||||
try:
|
||||
receptor_ctl.simple_command("reload")
|
||||
break
|
||||
except ValueError:
|
||||
logger.warning(f"Unable to reload Receptor configuration. {attempts-backoff} attempts left.")
|
||||
time.sleep(backoff)
|
||||
else:
|
||||
raise RuntimeError("Receptor reload failed")
|
||||
|
||||
|
||||
@task()
|
||||
def write_receptor_config(force=False):
|
||||
def write_receptor_config():
|
||||
"""
|
||||
only control nodes will run this
|
||||
force=True means to call receptorctl reload
|
||||
This task runs async on each control node, K8S only.
|
||||
It is triggered whenever remote is added or removed, or if peers_from_control_nodes
|
||||
is flipped.
|
||||
It is possible for write_receptor_config to be called multiple times.
|
||||
For example, if new instances are added in quick succession.
|
||||
To prevent that case, each control node first grabs a DB advisory lock, specific
|
||||
to just that control node (i.e. multiple control nodes can run this function
|
||||
at the same time, since it only writes the local receptor config file)
|
||||
"""
|
||||
lock = FileLock(__RECEPTOR_CONF_LOCKFILE)
|
||||
with lock:
|
||||
receptor_config = list(RECEPTOR_CONFIG_STARTER)
|
||||
with advisory_lock(f"{settings.CLUSTER_HOST_ID}_write_receptor_config", wait=True):
|
||||
# Config file needs to be updated
|
||||
receptor_config, should_update = generate_config_data()
|
||||
if should_update:
|
||||
lock = FileLock(__RECEPTOR_CONF_LOCKFILE)
|
||||
with lock:
|
||||
with open(__RECEPTOR_CONF, 'w') as file:
|
||||
yaml.dump(receptor_config, file, default_flow_style=False)
|
||||
|
||||
this_inst = Instance.objects.me()
|
||||
instances = Instance.objects.filter(node_type__in=(Instance.Types.EXECUTION, Instance.Types.HOP))
|
||||
existing_peers = this_inst.peers.all()
|
||||
|
||||
links_added = []
|
||||
links_removed = False
|
||||
for instance in instances:
|
||||
if not instance.peers_from_control_nodes and instance in existing_peers:
|
||||
this_inst.peers.remove(instance)
|
||||
links_removed = True
|
||||
if instance.peers_from_control_nodes:
|
||||
peer = {'tcp-peer': {'address': f'{instance.hostname}:{instance.listener_port}', 'tls': 'tlsclient'}}
|
||||
receptor_config.append(peer)
|
||||
if instance not in existing_peers:
|
||||
links_added.append(InstanceLink(source=this_inst, target=instance, link_state=InstanceLink.States.ADDING))
|
||||
|
||||
InstanceLink.objects.bulk_create(links_added)
|
||||
|
||||
with open(__RECEPTOR_CONF, 'w') as file:
|
||||
yaml.dump(receptor_config, file, default_flow_style=False)
|
||||
|
||||
if force or links_removed or links_added:
|
||||
logger.debug("Receptor config changed, reloading receptor")
|
||||
# This needs to be outside of the lock because this function itself will acquire the lock.
|
||||
receptor_ctl = get_receptor_ctl()
|
||||
|
||||
attempts = 10
|
||||
for backoff in range(1, attempts + 1):
|
||||
try:
|
||||
receptor_ctl.simple_command("reload")
|
||||
break
|
||||
except ValueError:
|
||||
logger.warning(f"Unable to reload Receptor configuration. {attempts-backoff} attempts left.")
|
||||
time.sleep(backoff)
|
||||
else:
|
||||
raise RuntimeError("Receptor reload failed")
|
||||
|
||||
links = InstanceLink.objects.filter(source=this_inst, target__in=instances, link_state=InstanceLink.States.ADDING)
|
||||
links.update(link_state=InstanceLink.States.ESTABLISHED)
|
||||
reload_receptor()
|
||||
|
||||
|
||||
@task(queue=get_task_queuename)
|
||||
@@ -743,6 +769,3 @@ def remove_deprovisioned_node(hostname):
|
||||
|
||||
# This will as a side effect also delete the InstanceLinks that are tied to it.
|
||||
Instance.objects.filter(hostname=hostname).delete()
|
||||
|
||||
# Update the receptor configs for all of the control-plane.
|
||||
write_receptor_config.apply_async(queue='tower_broadcast_all')
|
||||
|
||||
Reference in New Issue
Block a user