From 1b650d69270d700602483bf4f97572579fce0cf6 Mon Sep 17 00:00:00 2001
From: Jeff Bradberry
Date: Mon, 29 Aug 2022 16:40:08 -0400
Subject: [PATCH] When deprovisioning a node, kick off a task that waits on running jobs

After all jobs on the node are complete, delete the node then broadcast
the write_receptor_config task.

Also, make sure that write_receptor_config updates the state of links
that are in 'adding' state.
---
 awx/main/models/ha.py      | 17 +++++++---
 awx/main/tasks/receptor.py | 63 +++++++++++++++++++++++++-------------
 2 files changed, 54 insertions(+), 26 deletions(-)

diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py
index e09b7cb9d4..cd6313ecaa 100644
--- a/awx/main/models/ha.py
+++ b/awx/main/models/ha.py
@@ -423,12 +423,19 @@ def on_instance_group_saved(sender, instance, created=False, raw=False, **kwargs
 
 @receiver(post_save, sender=Instance)
 def on_instance_saved(sender, instance, created=False, raw=False, **kwargs):
-    # TODO: handle update to instance
-    if settings.IS_K8S and created and instance.node_type in (Instance.Types.EXECUTION,):
-        from awx.main.tasks.receptor import write_receptor_config  # prevents circular import
+    if settings.IS_K8S and instance.node_type in (Instance.Types.EXECUTION,):
+        if instance.node_state == Instance.States.DEPROVISIONING:
+            from awx.main.tasks.receptor import wait_for_jobs  # prevents circular import
 
-        # on commit broadcast to all control instance to update their receptor configs
-        connection.on_commit(lambda: write_receptor_config.apply_async(queue='tower_broadcast_all'))
+            # wait for jobs on the node to complete, then delete the
+            # node and kick off write_receptor_config
+            connection.on_commit(lambda: wait_for_jobs.apply_async(instance.hostname))
+
+        if instance.node_state == Instance.States.INSTALLED:
+            from awx.main.tasks.receptor import write_receptor_config  # prevents circular import
+
+            # broadcast to all control instances to update their receptor configs
+            connection.on_commit(lambda: write_receptor_config.apply_async(queue='tower_broadcast_all'))
 
     if created or instance.has_policy_changes():
         schedule_policy_task()
diff --git a/awx/main/tasks/receptor.py b/awx/main/tasks/receptor.py
index 95f211bed2..11a896ee1a 100644
--- a/awx/main/tasks/receptor.py
+++ b/awx/main/tasks/receptor.py
@@ -27,7 +27,7 @@ from awx.main.utils.common import (
 )
 from awx.main.constants import MAX_ISOLATED_PATH_COLON_DELIMITER
 from awx.main.tasks.signals import signal_state, signal_callback, SignalExit
-from awx.main.models import Instance
+from awx.main.models import Instance, InstanceLink, UnifiedJob
 from awx.main.dispatch.publish import task
 
 # Receptorctl
@@ -639,29 +639,50 @@ RECEPTOR_CONFIG_STARTER = (
 
 @task()
 def write_receptor_config():
-    receptor_config = list(RECEPTOR_CONFIG_STARTER)
-
-    instances = Instance.objects.filter(node_type=Instance.Types.EXECUTION)
-    for instance in instances:
-        peer = {'tcp-peer': {'address': f'{instance.hostname}:{instance.listener_port}', 'tls': 'tlsclient'}}
-        receptor_config.append(peer)
-
     lock = FileLock(__RECEPTOR_CONF_LOCKFILE)
     with lock:
+        receptor_config = list(RECEPTOR_CONFIG_STARTER)
+
+        this_inst = Instance.objects.me()
+        instances = Instance.objects.filter(node_type=Instance.Types.EXECUTION)
+        for instance in instances:
+            peer = {'tcp-peer': {'address': f'{instance.hostname}:{instance.listener_port}', 'tls': 'tlsclient'}}
+            receptor_config.append(peer)
+
         with open(__RECEPTOR_CONF, 'w') as file:
             yaml.dump(receptor_config, file, default_flow_style=False)
 
-    receptor_ctl = get_receptor_ctl()
+        receptor_ctl = get_receptor_ctl()
 
-    attempts = 10
-    backoff = 1
-    for attempt in range(attempts):
-        try:
-            receptor_ctl.simple_command("reload")
-            break
-        except ValueError:
-            logger.warning(f"Unable to reload Receptor configuration. {attempts-attempt} attempts left.")
-            time.sleep(backoff)
-            backoff += 1
-    else:
-        raise RuntimeError("Receptor reload failed")
+        attempts = 10
+        for backoff in range(1, attempts + 1):
+            try:
+                receptor_ctl.simple_command("reload")
+                break
+            except ValueError:
+                logger.warning(f"Unable to reload Receptor configuration. {attempts-backoff} attempts left.")
+                time.sleep(backoff)
+        else:
+            raise RuntimeError("Receptor reload failed")
+
+        links = InstanceLink.objects.filter(source=this_inst, target__in=instances, link_state=InstanceLink.States.ADDING)
+        links.update(link_state=InstanceLink.States.ESTABLISHED)
+
+
+@task()
+def wait_for_jobs(hostname):
+    node_jobs = UnifiedJob.objects.filter(
+        execution_node=hostname,
+        status__in=(
+            'running',
+            'waiting',
+        ),
+    )
+    while node_jobs.exists():
+        time.sleep(60)
+
+    # This will as a side effect also delete the InstanceLinks that are tied to it.
+    Instance.objects.filter(hostname=hostname).delete()
+
+    # Update the receptor configs for all of the control-plane.
+    write_receptor_config.apply_async(queue='tower_broadcast_all')
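
A minimal sketch of how the new deprovisioning path would be exercised (illustrative only; the hostname is made up, not part of the patch): setting node_state to DEPROVISIONING on an execution node fires the post_save handler above, which schedules wait_for_jobs once the transaction commits; when no jobs on that node remain 'running' or 'waiting', the Instance row is deleted and write_receptor_config is broadcast to the control plane so peers drop the removed node.

    # Illustrative sketch only; 'execution-node-1.example.org' is a hypothetical hostname.
    from awx.main.models import Instance

    inst = Instance.objects.get(hostname='execution-node-1.example.org')
    inst.node_state = Instance.States.DEPROVISIONING
    inst.save(update_fields=['node_state'])  # post_save handler schedules wait_for_jobs on commit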