When deprovisioning a node, kick off a task that waits on running jobs

After all jobs on the node are complete, delete the node then
broadcast the write_receptor_config task.

Also, make sure that write_receptor_config updates the state of links
that are in 'adding' state.
This commit is contained in:
Jeff Bradberry 2022-08-29 16:40:08 -04:00
parent b6946c7e35
commit 1b650d6927
2 changed files with 54 additions and 26 deletions

View File

@ -423,12 +423,19 @@ def on_instance_group_saved(sender, instance, created=False, raw=False, **kwargs
@receiver(post_save, sender=Instance)
def on_instance_saved(sender, instance, created=False, raw=False, **kwargs):
# TODO: handle update to instance
if settings.IS_K8S and created and instance.node_type in (Instance.Types.EXECUTION,):
from awx.main.tasks.receptor import write_receptor_config # prevents circular import
if settings.IS_K8S and instance.node_type in (Instance.Types.EXECUTION,):
if instance.node_state == Instance.States.DEPROVISIONING:
from awx.main.tasks.receptor import wait_for_jobs # prevents circular import
# on commit broadcast to all control instance to update their receptor configs
connection.on_commit(lambda: write_receptor_config.apply_async(queue='tower_broadcast_all'))
# wait for jobs on the node to complete, then delete the
# node and kick off write_receptor_config
connection.on_commit(lambda: wait_for_jobs.apply_async(instance.hostname))
if instance.node_state == Instance.States.INSTALLED:
from awx.main.tasks.receptor import write_receptor_config # prevents circular import
# broadcast to all control instances to update their receptor configs
connection.on_commit(lambda: write_receptor_config.apply_async(queue='tower_broadcast_all'))
if created or instance.has_policy_changes():
schedule_policy_task()

View File

@ -27,7 +27,7 @@ from awx.main.utils.common import (
)
from awx.main.constants import MAX_ISOLATED_PATH_COLON_DELIMITER
from awx.main.tasks.signals import signal_state, signal_callback, SignalExit
from awx.main.models import Instance
from awx.main.models import Instance, InstanceLink, UnifiedJob
from awx.main.dispatch.publish import task
# Receptorctl
@ -639,29 +639,50 @@ RECEPTOR_CONFIG_STARTER = (
@task()
def write_receptor_config():
receptor_config = list(RECEPTOR_CONFIG_STARTER)
instances = Instance.objects.filter(node_type=Instance.Types.EXECUTION)
for instance in instances:
peer = {'tcp-peer': {'address': f'{instance.hostname}:{instance.listener_port}', 'tls': 'tlsclient'}}
receptor_config.append(peer)
lock = FileLock(__RECEPTOR_CONF_LOCKFILE)
with lock:
receptor_config = list(RECEPTOR_CONFIG_STARTER)
this_inst = Instance.objects.me()
instances = Instance.objects.filter(node_type=Instance.Types.EXECUTION)
for instance in instances:
peer = {'tcp-peer': {'address': f'{instance.hostname}:{instance.listener_port}', 'tls': 'tlsclient'}}
receptor_config.append(peer)
with open(__RECEPTOR_CONF, 'w') as file:
yaml.dump(receptor_config, file, default_flow_style=False)
receptor_ctl = get_receptor_ctl()
receptor_ctl = get_receptor_ctl()
attempts = 10
backoff = 1
for attempt in range(attempts):
try:
receptor_ctl.simple_command("reload")
break
except ValueError:
logger.warning(f"Unable to reload Receptor configuration. {attempts-attempt} attempts left.")
time.sleep(backoff)
backoff += 1
else:
raise RuntimeError("Receptor reload failed")
attempts = 10
for backoff in range(1, attempts + 1):
try:
receptor_ctl.simple_command("reload")
break
except ValueError:
logger.warning(f"Unable to reload Receptor configuration. {attempts-backoff} attempts left.")
time.sleep(backoff)
else:
raise RuntimeError("Receptor reload failed")
links = InstanceLink.objects.filter(source=this_inst, target__in=instances, link_state=InstanceLink.States.ADDING)
links.update(link_state=InstanceLink.States.ESTABLISHED)
@task()
def wait_for_jobs(hostname):
node_jobs = UnifiedJob.objects.filter(
execution_node=hostname,
status__in=(
'running',
'waiting',
),
)
while node_jobs.exists():
time.sleep(60)
# This will as a side effect also delete the InstanceLinks that are tied to it.
Instance.objects.filter(hostname=hostname).delete()
# Update the receptor configs for all of the control-plane.
write_receptor_config.apply_async(queue='tower_broadcast_all')