From 3b1e40d227cc22510d03fe6b375a17e43c6b0661 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Wed, 11 Aug 2021 10:14:20 -0400 Subject: [PATCH] Use the ansible-runner worker --worker-info to perform execution node capacity checks (#10825) * Introduce utilities for --worker-info health check integration * Handle case where ansible-runner is not installed * Add ttl parameter for health check * Reformulate return data structure and add lots of error cases * Move up the cleanup tasks, close sockets * Integrate new --worker-info into the execution node capacity check * Undo the raw value override from the PoC * Additional refinement to execution node check frequency * Put in more complete network diagram * Followup on comment to remove modified from from health check responsibilities --- awx/main/models/ha.py | 7 ++ awx/main/tasks.py | 208 +++++++++++-------------------------- awx/main/utils/common.py | 53 ++++++---- awx/main/utils/receptor.py | 77 ++++++++++++++ docs/receptor_mesh.md | 45 ++++++-- 5 files changed, 212 insertions(+), 178 deletions(-) create mode 100644 awx/main/utils/receptor.py diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index ebef8dc7be..aaf0b990ec 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -131,6 +131,13 @@ class Instance(HasPolicyEditsMixin, BaseModel): grace_period = 120 return self.modified < ref_time - timedelta(seconds=grace_period) + def mark_offline(self, on_good_terms=False): + self.cpu = self.cpu_capacity = self.memory = self.mem_capacity = self.capacity = 0 + update_fields = ['capacity', 'cpu', 'memory', 'cpu_capacity', 'mem_capacity'] + if on_good_terms: + update_fields.append('modified') + self.save() + def refresh_capacity(self): cpu = get_cpu_capacity() mem = get_mem_capacity() diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 6ca32e83b2..56f10dacc0 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -52,9 +52,6 @@ from gitdb.exc import BadName as BadGitName # Runner import 
ansible_runner -# Receptor -from receptorctl.socket_interface import ReceptorControl - # dateutil from dateutil.parser import parse as parse_date @@ -85,7 +82,7 @@ from awx.main.models import ( SystemJobEvent, build_safe_env, ) -from awx.main.constants import ACTIVE_STATES, RECEPTOR_PENDING +from awx.main.constants import ACTIVE_STATES from awx.main.exceptions import AwxTaskError, PostRunError from awx.main.queue import CallbackQueueDispatcher from awx.main.dispatch.publish import task @@ -101,17 +98,18 @@ from awx.main.utils.common import ( parse_yaml_or_json, cleanup_new_process, create_partition, - get_cpu_capacity, - get_mem_capacity, + get_cpu_effective_capacity, + get_mem_effective_capacity, get_system_task_capacity, ) -from awx.main.utils.execution_environments import get_default_execution_environment, get_default_pod_spec, CONTAINER_ROOT, to_container_path +from awx.main.utils.execution_environments import get_default_pod_spec, CONTAINER_ROOT, to_container_path from awx.main.utils.ansible import read_ansible_config from awx.main.utils.external_logging import reconfigure_rsyslog from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja from awx.main.utils.reload import stop_local_services from awx.main.utils.pglock import advisory_lock from awx.main.utils.handlers import SpecialInventoryHandler +from awx.main.utils.receptor import get_receptor_ctl, worker_info from awx.main.consumers import emit_channel_notification from awx.main import analytics from awx.conf import settings_registry @@ -182,8 +180,7 @@ def dispatch_startup(): def inform_cluster_of_shutdown(): try: this_inst = Instance.objects.get(hostname=settings.CLUSTER_HOST_ID) - this_inst.capacity = 0 # No thank you to new jobs while shut down - this_inst.save(update_fields=['capacity', 'modified']) + this_inst.mark_offline(on_good_terms=True) # No thank you to new jobs while shut down try: reaper.reap(this_inst) except Exception: @@ -407,7 +404,34 @@ def cleanup_execution_environment_images(): 
@task(queue=get_local_queuename) def check_heartbeat(node): - AWXReceptorJob.check_heartbeat(node) + try: + instance = Instance.objects.get(hostname=node) + except Instance.DoesNotExist: + logger.warn(f'Instance record for {node} missing, could not check capacity.') + return + data = worker_info(node) + + if data['Errors']: + formatted_error = "\n".join(data["Errors"]) + if instance.capacity: + logger.warn(f'Health check marking execution node {node} as lost, errors:\n{formatted_error}') + else: + logger.info(f'Failed to find capacity of new or lost execution node {node}, errors:\n{formatted_error}') + instance.mark_offline() + else: + # TODO: spin off new instance method from refresh_capacity that calculates derived fields + instance.cpu = data['CPU Capacity'] # TODO: rename field on runner side to not say "Capacity" + instance.cpu_capacity = get_cpu_effective_capacity(instance.cpu) + instance.memory = data['Memory Capacity'] * 1000 # TODO: double-check the multiplier here + instance.mem_capacity = get_mem_effective_capacity(instance.memory) + instance.capacity = get_system_task_capacity( + instance.capacity_adjustment, + instance.cpu_capacity, + instance.mem_capacity, + ) + instance.version = 'ansible-runner-' + data['Version'] + instance.save(update_fields=['capacity', 'version', 'cpu', 'memory', 'cpu_capacity', 'mem_capacity']) + logger.info('Set capacity of execution node {} to {}, worker info data:\n{}'.format(node, instance.capacity, json.dumps(data, indent=2))) def discover_receptor_nodes(): @@ -421,25 +445,30 @@ def discover_receptor_nodes(): continue (changed, instance) = Instance.objects.register(hostname=hostname, node_type='execution') was_lost = instance.is_lost(ref_time=nowtime) - if changed: - logger.info("Registered execution node '{}'".format(hostname)) - check_heartbeat.apply_async([hostname]) - else: - last_seen = parse_date(ad['Time']) - logger.debug("Updated execution node '{}' modified from {} to {}".format(hostname, instance.modified, 
last_seen)) - instance.modified = last_seen - if instance.is_lost(ref_time=nowtime): - # if the instance hasn't advertised in awhile, - # don't save a new modified time - continue + last_seen = parse_date(ad['Time']) + if instance.modified == last_seen: + continue + instance.modified = last_seen + if instance.is_lost(ref_time=nowtime): + # if the instance hasn't advertised in awhile, don't save a new modified time + # this is so multiple cluster nodes do not all make repetitive updates + continue - instance.save(update_fields=['modified']) - if was_lost: - # if the instance *was* lost, but has appeared again, - # attempt to re-establish the initial capacity and version - # check - logger.warning('Execution node attempting to rejoin as instance {}.'.format(hostname)) - check_heartbeat.apply_async([hostname]) + instance.save(update_fields=['modified']) + if changed: + logger.warn("Registered execution node '{}'".format(hostname)) + check_heartbeat.apply_async([hostname]) + elif was_lost: + # if the instance *was* lost, but has appeared again, + # attempt to re-establish the initial capacity and version + # check + logger.warn(f'Execution node attempting to rejoin as instance {hostname}.') + check_heartbeat.apply_async([hostname]) + elif instance.capacity == 0: + # Periodically re-run the health check of errored nodes, in case someone fixed it + # TODO: perhaps decrease the frequency of these checks + logger.debug(f'Restarting health check for execution node {hostname} with known errors.') + check_heartbeat.apply_async([hostname]) @task(queue=get_local_queuename) @@ -460,7 +489,7 @@ def cluster_node_heartbeat(): if inst.hostname == settings.CLUSTER_HOST_ID: this_inst = inst instance_list.remove(inst) - elif inst.node_type == 'execution': + elif inst.node_type == 'execution': # TODO: zero out capacity of execution nodes that are MIA # Only considering control plane for this logic continue elif inst.is_lost(ref_time=nowtime): @@ -504,8 +533,7 @@ def 
cluster_node_heartbeat(): # If auto deprovisining is on, don't bother setting the capacity to 0 # since we will delete the node anyway. if other_inst.capacity != 0 and not settings.AWX_AUTO_DEPROVISION_INSTANCES: - other_inst.capacity = 0 - other_inst.save(update_fields=['capacity']) + other_inst.mark_offline() logger.error("Host {} last checked in at {}, marked as lost.".format(other_inst.hostname, other_inst.modified)) elif settings.AWX_AUTO_DEPROVISION_INSTANCES: deprovision_hostname = other_inst.hostname @@ -802,10 +830,6 @@ def with_path_cleanup(f): return _wrapped -def get_receptor_ctl(): - return ReceptorControl('/var/run/receptor/receptor.sock') - - class BaseTask(object): model = None event_model = None @@ -2988,120 +3012,6 @@ class AWXReceptorJob: if self.unit_id is not None and settings.RECEPTOR_RELEASE_WORK: receptor_ctl.simple_command(f"work release {self.unit_id}") - @classmethod - def check_heartbeat(cls, node): # TODO: rename most of these "heartbeat" things - logger.info(f'Checking capacity of execution node {node}') - # make a private data dir and env dir - private_data_dir = tempfile.mkdtemp(prefix='awx_heartbeat_', dir=settings.AWX_ISOLATION_BASE_PATH) - env_path = os.path.join(private_data_dir, 'env') - os.makedirs(os.path.join(env_path), mode=0o700) - # write a cmdline file for adhoc - f = os.fdopen(os.open(os.path.join(env_path, 'cmdline'), os.O_RDWR | os.O_CREAT, stat.S_IREAD | stat.S_IWRITE), 'w') - f.write(ansible_runner.utils.args2cmdline('localhost')) - f.close() - # write a custom facts.d to report the runner version - facts_path = os.path.join(private_data_dir, 'facts.d') - os.makedirs(facts_path, mode=0o700) - with open(os.path.join(facts_path, 'ansible_runner.fact'), 'w') as f: - os.chmod(f.name, 0o700) - f.write("""#!/usr/bin/env sh\necho "{\\"version\\": \\"`ansible-runner --version`\\"}"\n""") # noqa - # write a local inventory - inventory_path = os.path.join(private_data_dir, 'inventory') - os.makedirs(inventory_path, mode=0o700) 
- fn = os.path.join(inventory_path, 'hosts') - with open(fn, 'w') as f: - os.chmod(fn, stat.S_IRUSR | stat.S_IXUSR | stat.S_IWUSR) - f.write('localhost ansible_connection=local') - # we have to create the project directory because it is --workdir and crun needs it to exist - # https://github.com/ansible/ansible-runner/issues/758 - project_path = os.path.join(private_data_dir, 'project') - os.makedirs(project_path, mode=0o700) - - runner_params = { - 'ident': str(uuid4()), - 'private_data_dir': private_data_dir, - 'module': 'setup', - 'module_args': f'fact_path={private_data_dir}/facts.d', - 'inventory': inventory_path, - 'only_transmit_kwargs': False, - 'settings': { - "container_image": get_default_execution_environment().image, - "container_options": ['--user=root'], - "process_isolation": True, - }, - } - - class _Instance(object): - pk = -1 - job_env = {} - - @property - def is_container_group_task(self): - return False - - @property - def execution_node(self): - return node - - class _BaseTask(object): - instance = _Instance() - cpus = 0 - mem_mb = 0 - version = RECEPTOR_PENDING - - def build_execution_environment_params(self, instance, private_data_dir): - return {} - - def event_handler(self, event_data): - if event_data.get('event') == 'runner_on_ok': - facts = event_data.get('event_data', {}).get('res', {}).get('ansible_facts', {}) - if facts: - self.cpus = facts.get('ansible_processor_vcpus', 0) - self.mem_mb = facts.get('ansible_memtotal_mb', 0) - version = facts.get('ansible_local', {}).get('ansible_runner', {}).get('version', '') # noqa - if version: - self.version = f'ansible-runner-{version}' - # TODO: save event_data["stdout"] and log when errors happen - - def finished_callback(self, runner_obj): - pass - - def cancel_callback(self): - pass - - def status_handler(self, status_data, runner_config): - # TODO: log error cases - pass - - def update_model(self, *args, **kw): - pass - - task = _BaseTask() - receptor_job = cls(task, runner_params) - res = 
receptor_job.run(work_type='ansible-runner') - if res.status == 'successful': - cpu = get_cpu_capacity(task.cpus) - mem = get_mem_capacity(task.mem_mb * 1000000) - logger.info(f'Calculated memory capacity: {task.mem_mb}, out: {mem}') - instance = Instance.objects.get(hostname=node) - instance.cpu = cpu[0] - instance.cpu_capacity = cpu[1] - instance.memory = mem[0] - instance.mem_capacity = mem[1] - instance.capacity = get_system_task_capacity( - instance.capacity_adjustment, - instance.cpu_capacity, - instance.mem_capacity, - ) - instance.version = task.version - instance.save(update_fields=['capacity', 'version', 'cpu', 'memory', 'cpu_capacity', 'mem_capacity']) - logger.info(f'Updated capacity of {node} to cpu: {instance.cpu_capacity} mem: {instance.mem_capacity}') - else: - # TODO: error handling like we do with jobs - # receptorctl work results - # receptorctl work list - logger.info(f'Capacity check not successful for execution node {node}') - def _run_internal(self, receptor_ctl, work_type=None): # Create a socketpair. Where the left side will be used for writing our payload # (private data dir, kwargs). 
The right side will be passed to Receptor for diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py index e034dda02c..9194fa51d9 100644 --- a/awx/main/utils/common.py +++ b/awx/main/utils/common.py @@ -699,12 +699,25 @@ def parse_yaml_or_json(vars_str, silent_failure=True): return vars_dict -def get_cpu_capacity(raw=None): +def get_cpu_effective_capacity(cpu_count): from django.conf import settings settings_forkcpu = getattr(settings, 'SYSTEM_TASK_FORKS_CPU', None) env_forkcpu = os.getenv('SYSTEM_TASK_FORKS_CPU', None) + if env_forkcpu: + forkcpu = int(env_forkcpu) + elif settings_forkcpu: + forkcpu = int(settings_forkcpu) + else: + forkcpu = 4 + + return cpu_count * forkcpu + + +def get_cpu_capacity(): + from django.conf import settings + settings_abscpu = getattr(settings, 'SYSTEM_TASK_ABS_CPU', None) env_abscpu = os.getenv('SYSTEM_TASK_ABS_CPU', None) @@ -713,24 +726,30 @@ def get_cpu_capacity(raw=None): elif settings_abscpu is not None: return 0, int(settings_abscpu) - if raw is None: - raw = psutil.cpu_count() + cpu = psutil.cpu_count() - if env_forkcpu: - forkcpu = int(env_forkcpu) - elif settings_forkcpu: - forkcpu = int(settings_forkcpu) - else: - forkcpu = 4 - return (raw, raw * forkcpu) + return (cpu, get_cpu_effective_capacity(cpu)) -def get_mem_capacity(raw_mb=None): +def get_mem_effective_capacity(mem_mb): from django.conf import settings settings_forkmem = getattr(settings, 'SYSTEM_TASK_FORKS_MEM', None) env_forkmem = os.getenv('SYSTEM_TASK_FORKS_MEM', None) + if env_forkmem: + forkmem = int(env_forkmem) + elif settings_forkmem: + forkmem = int(settings_forkmem) + else: + forkmem = 100 + + return max(1, ((mem_mb // 1024 // 1024) - 2048) // forkmem) + + +def get_mem_capacity(): + from django.conf import settings + settings_absmem = getattr(settings, 'SYSTEM_TASK_ABS_MEM', None) env_absmem = os.getenv('SYSTEM_TASK_ABS_MEM', None) @@ -739,16 +758,8 @@ def get_mem_capacity(raw_mb=None): elif settings_absmem is not None: return 0, 
int(settings_absmem) - if env_forkmem: - forkmem = int(env_forkmem) - elif settings_forkmem: - forkmem = int(settings_forkmem) - else: - forkmem = 100 - - if raw_mb is None: - raw_mb = psutil.virtual_memory().total - return (raw_mb, max(1, ((raw_mb // 1024 // 1024) - 2048) // forkmem)) + mem = psutil.virtual_memory().total + return (mem, get_mem_effective_capacity(mem)) def get_system_task_capacity(scale=Decimal(1.0), cpu_capacity=None, mem_capacity=None): diff --git a/awx/main/utils/receptor.py b/awx/main/utils/receptor.py new file mode 100644 index 0000000000..8a205334c1 --- /dev/null +++ b/awx/main/utils/receptor.py @@ -0,0 +1,77 @@ +import logging +import yaml +import time + +from receptorctl.socket_interface import ReceptorControl + + +logger = logging.getLogger('awx.main.utils.receptor') + + +def get_receptor_ctl(): + return ReceptorControl('/var/run/receptor/receptor.sock') + + +def worker_info(node_name): + receptor_ctl = get_receptor_ctl() + transmit_start = time.time() + error_list = [] + data = {'Errors': error_list, 'transmit_timing': 0.0} + + result = receptor_ctl.submit_work(worktype='ansible-runner', payload='', params={"params": f"--worker-info"}, ttl='20s', node=node_name) + + unit_id = result['unitid'] + run_start = time.time() + data['transmit_timing'] = run_start - transmit_start + data['run_timing'] = 0.0 + + try: + + resultfile = receptor_ctl.get_work_results(unit_id) + + stdout = '' + + while data['run_timing'] < 20.0: + status = receptor_ctl.simple_command(f'work status {unit_id}') + state_name = status.get('StateName') + if state_name not in ('Pending', 'Running'): + break + data['run_timing'] = time.time() - run_start + time.sleep(0.5) + else: + error_list.append(f'Timeout getting worker info on {node_name}, state remains in {state_name}') + + stdout = resultfile.read() + stdout = str(stdout, encoding='utf-8') + + finally: + + res = receptor_ctl.simple_command(f"work release {unit_id}") + if res != {'released': unit_id}: + 
logger.warn(f'Could not confirm release of receptor work unit id {unit_id} from {node_name}, data: {res}') + + receptor_ctl.close() + + if state_name.lower() == 'failed': + work_detail = status.get('Detail', '') + if not work_detail.startswith('exit status'): + error_list.append(f'Receptor error getting worker info from {node_name}, detail:\n{work_detail}') + elif 'unrecognized arguments: --worker-info' in stdout: + error_list.append(f'Old version (2.0.1 or earlier) of ansible-runner on node {node_name} without --worker-info') + else: + error_list.append(f'Unknown ansible-runner error on node {node_name}, stdout:\n{stdout}') + else: + yaml_stdout = stdout.strip() + remote_data = {} + try: + remote_data = yaml.safe_load(yaml_stdout) + except Exception as json_e: + error_list.append(f'Failed to parse node {node_name} --worker-info output as YAML, error: {json_e}, data:\n{yaml_stdout}') + + if not isinstance(remote_data, dict): + error_list.append(f'Remote node {node_name} --worker-info output is not a YAML dict, output:{stdout}') + else: + error_list.extend(remote_data.pop('Errors')) # merge both error lists + data.update(remote_data) + + return data diff --git a/docs/receptor_mesh.md b/docs/receptor_mesh.md index f1023eb7f9..78c893b119 100644 --- a/docs/receptor_mesh.md +++ b/docs/receptor_mesh.md @@ -72,9 +72,28 @@ Control nodes check the receptor network (reported via `receptorctl status`) whe Nodes on the receptor network are compared against the `Instance` model in the database. If a node appears in the mesh network which is not in the database, then a "health check" is started. -This will submit a work unit to the execution node which then outputs important node data via `ansible-runner`. The `capacity` field will obtain a non-zero value through this process, which is necessary to run jobs. 
+#### Health Check Mechanics + +All relevant data for health checks is reported from the ansible-runner command: + +``` +ansible-runner worker --worker-info +``` + +This will output YAML data to standard out containing CPU, memory, and other metrics used to compute `capacity`. + +AWX invokes this command by submitting a receptor work unit (of type `ansible-runner`) to the target execution node. +If you have the development environment running, you can run a one-off health check of a node with this command: + +``` +echo "from awx.main.utils.receptor import worker_info; worker_info('receptor-1')" | awx-manage shell_plus --quiet +``` + +This must be run as the awx user inside one of the hybrid or control nodes. +This will not affect the actual `Instance` record, but will just run the command and report the data. + ### Development Environment A "toy" cluster with execution nodes and a hop node is created by the docker-compose Makefile target. @@ -89,13 +108,23 @@ This will spin up a topology represented below. (names are the receptor node names, which differ from the AWX Instance names and network address in some cases) ``` ----v - -----v - - - ^-------------- - ^-------------- - ^-------------- + ┌──────────────┐ + │ │ +┌──────────────┐ ┌──────────┤ receptor-1 │ +│ │ │ │ │ +│ awx_1 │◄──────────┐ │ └──────────────┘ +│ │ │ ▼ +└──────┬───────┘ ┌──────┴───────┐ ┌──────────────┐ + │ │ │ │ │ + │ │ receptor-hop │◄───────┤ receptor-2 │ + ▼ │ │ │ │ +┌──────────────┐ └──────────────┘ └──────────────┘ +│ │ ▲ +│ awx_2 │ │ ┌──────────────┐ +│ │ │ │ │ +└──────────────┘ └──────────┤ receptor-3 │ + │ │ + └──────────────┘ ``` All execution (`receptor-*`) nodes connect to the hop node.