Use the ansible-runner worker --worker-info to perform execution node capacity checks (#10825)

* Introduce utilities for --worker-info health check integration

* Handle case where ansible-runner is not installed

* Add ttl parameter for health check

* Reformulate return data structure and add lots of error cases

* Move up the cleanup tasks, close sockets

* Integrate new --worker-info into the execution node capacity check

* Undo the raw value override from the PoC

* Additional refinement to execution node check frequency

* Put in more complete network diagram

* Followup on comment to remove modified from from health check responsibilities
This commit is contained in:
Alan Rominger 2021-08-11 10:14:20 -04:00
parent 4e84c7c4c4
commit 3b1e40d227
No known key found for this signature in database
GPG Key ID: C2D7EAAA12B63559
5 changed files with 212 additions and 178 deletions

View File

@ -131,6 +131,13 @@ class Instance(HasPolicyEditsMixin, BaseModel):
grace_period = 120
return self.modified < ref_time - timedelta(seconds=grace_period)
def mark_offline(self, on_good_terms=False):
self.cpu = self.cpu_capacity = self.memory = self.mem_capacity = self.capacity = 0
update_fields = ['capacity', 'cpu', 'memory', 'cpu_capacity', 'mem_capacity']
if on_good_terms:
update_fields.append('modified')
self.save()
def refresh_capacity(self):
cpu = get_cpu_capacity()
mem = get_mem_capacity()

View File

@ -52,9 +52,6 @@ from gitdb.exc import BadName as BadGitName
# Runner
import ansible_runner
# Receptor
from receptorctl.socket_interface import ReceptorControl
# dateutil
from dateutil.parser import parse as parse_date
@ -85,7 +82,7 @@ from awx.main.models import (
SystemJobEvent,
build_safe_env,
)
from awx.main.constants import ACTIVE_STATES, RECEPTOR_PENDING
from awx.main.constants import ACTIVE_STATES
from awx.main.exceptions import AwxTaskError, PostRunError
from awx.main.queue import CallbackQueueDispatcher
from awx.main.dispatch.publish import task
@ -101,17 +98,18 @@ from awx.main.utils.common import (
parse_yaml_or_json,
cleanup_new_process,
create_partition,
get_cpu_capacity,
get_mem_capacity,
get_cpu_effective_capacity,
get_mem_effective_capacity,
get_system_task_capacity,
)
from awx.main.utils.execution_environments import get_default_execution_environment, get_default_pod_spec, CONTAINER_ROOT, to_container_path
from awx.main.utils.execution_environments import get_default_pod_spec, CONTAINER_ROOT, to_container_path
from awx.main.utils.ansible import read_ansible_config
from awx.main.utils.external_logging import reconfigure_rsyslog
from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja
from awx.main.utils.reload import stop_local_services
from awx.main.utils.pglock import advisory_lock
from awx.main.utils.handlers import SpecialInventoryHandler
from awx.main.utils.receptor import get_receptor_ctl, worker_info
from awx.main.consumers import emit_channel_notification
from awx.main import analytics
from awx.conf import settings_registry
@ -182,8 +180,7 @@ def dispatch_startup():
def inform_cluster_of_shutdown():
try:
this_inst = Instance.objects.get(hostname=settings.CLUSTER_HOST_ID)
this_inst.capacity = 0 # No thank you to new jobs while shut down
this_inst.save(update_fields=['capacity', 'modified'])
this_inst.mark_offline(on_good_terms=True) # No thank you to new jobs while shut down
try:
reaper.reap(this_inst)
except Exception:
@ -407,7 +404,34 @@ def cleanup_execution_environment_images():
@task(queue=get_local_queuename)
def check_heartbeat(node):
AWXReceptorJob.check_heartbeat(node)
try:
instance = Instance.objects.get(hostname=node)
except Instance.DoesNotExist:
logger.warn(f'Instance record for {node} missing, could not check capacity.')
return
data = worker_info(node)
if data['Errors']:
formatted_error = "\n".join(data["Errors"])
if instance.capacity:
logger.warn(f'Health check marking execution node {node} as lost, errors:\n{formatted_error}')
else:
logger.info(f'Failed to find capacity of new or lost execution node {node}, errors:\n{formatted_error}')
instance.mark_offline()
else:
# TODO: spin off new instance method from refresh_capacity that calculates derived fields
instance.cpu = data['CPU Capacity'] # TODO: rename field on runner side to not say "Capacity"
instance.cpu_capacity = get_cpu_effective_capacity(instance.cpu)
instance.memory = data['Memory Capacity'] * 1000 # TODO: double-check the multiplier here
instance.mem_capacity = get_mem_effective_capacity(instance.memory)
instance.capacity = get_system_task_capacity(
instance.capacity_adjustment,
instance.cpu_capacity,
instance.mem_capacity,
)
instance.version = 'ansible-runner-' + data['Version']
instance.save(update_fields=['capacity', 'version', 'cpu', 'memory', 'cpu_capacity', 'mem_capacity'])
logger.info('Set capacity of execution node {} to {}, worker info data:\n{}'.format(node, instance.capacity, json.dumps(data, indent=2)))
def discover_receptor_nodes():
@ -421,25 +445,30 @@ def discover_receptor_nodes():
continue
(changed, instance) = Instance.objects.register(hostname=hostname, node_type='execution')
was_lost = instance.is_lost(ref_time=nowtime)
if changed:
logger.info("Registered execution node '{}'".format(hostname))
check_heartbeat.apply_async([hostname])
else:
last_seen = parse_date(ad['Time'])
logger.debug("Updated execution node '{}' modified from {} to {}".format(hostname, instance.modified, last_seen))
instance.modified = last_seen
if instance.is_lost(ref_time=nowtime):
# if the instance hasn't advertised in awhile,
# don't save a new modified time
continue
last_seen = parse_date(ad['Time'])
if instance.modified == last_seen:
continue
instance.modified = last_seen
if instance.is_lost(ref_time=nowtime):
# if the instance hasn't advertised in awhile, don't save a new modified time
# this is so multiple cluster nodes do all make repetitive updates
continue
instance.save(update_fields=['modified'])
if was_lost:
# if the instance *was* lost, but has appeared again,
# attempt to re-establish the initial capacity and version
# check
logger.warning('Execution node attempting to rejoin as instance {}.'.format(hostname))
check_heartbeat.apply_async([hostname])
instance.save(update_fields=['modified'])
if changed:
logger.warn("Registered execution node '{}'".format(hostname))
check_heartbeat.apply_async([hostname])
elif was_lost:
# if the instance *was* lost, but has appeared again,
# attempt to re-establish the initial capacity and version
# check
logger.warn(f'Execution node attempting to rejoin as instance {hostname}.')
check_heartbeat.apply_async([hostname])
elif instance.capacity == 0:
# Periodically re-run the health check of errored nodes, in case someone fixed it
# TODO: perhaps decrease the frequency of these checks
logger.debug(f'Restarting health check for execution node {hostname} with known errors.')
check_heartbeat.apply_async([hostname])
@task(queue=get_local_queuename)
@ -460,7 +489,7 @@ def cluster_node_heartbeat():
if inst.hostname == settings.CLUSTER_HOST_ID:
this_inst = inst
instance_list.remove(inst)
elif inst.node_type == 'execution':
elif inst.node_type == 'execution': # TODO: zero out capacity of execution nodes that are MIA
# Only considering control plane for this logic
continue
elif inst.is_lost(ref_time=nowtime):
@ -504,8 +533,7 @@ def cluster_node_heartbeat():
# If auto deprovisining is on, don't bother setting the capacity to 0
# since we will delete the node anyway.
if other_inst.capacity != 0 and not settings.AWX_AUTO_DEPROVISION_INSTANCES:
other_inst.capacity = 0
other_inst.save(update_fields=['capacity'])
other_inst.mark_offline()
logger.error("Host {} last checked in at {}, marked as lost.".format(other_inst.hostname, other_inst.modified))
elif settings.AWX_AUTO_DEPROVISION_INSTANCES:
deprovision_hostname = other_inst.hostname
@ -802,10 +830,6 @@ def with_path_cleanup(f):
return _wrapped
def get_receptor_ctl():
return ReceptorControl('/var/run/receptor/receptor.sock')
class BaseTask(object):
model = None
event_model = None
@ -2988,120 +3012,6 @@ class AWXReceptorJob:
if self.unit_id is not None and settings.RECEPTOR_RELEASE_WORK:
receptor_ctl.simple_command(f"work release {self.unit_id}")
@classmethod
def check_heartbeat(cls, node): # TODO: rename most of these "heartbeat" things
logger.info(f'Checking capacity of execution node {node}')
# make a private data dir and env dir
private_data_dir = tempfile.mkdtemp(prefix='awx_heartbeat_', dir=settings.AWX_ISOLATION_BASE_PATH)
env_path = os.path.join(private_data_dir, 'env')
os.makedirs(os.path.join(env_path), mode=0o700)
# write a cmdline file for adhoc
f = os.fdopen(os.open(os.path.join(env_path, 'cmdline'), os.O_RDWR | os.O_CREAT, stat.S_IREAD | stat.S_IWRITE), 'w')
f.write(ansible_runner.utils.args2cmdline('localhost'))
f.close()
# write a custom facts.d to report the runner version
facts_path = os.path.join(private_data_dir, 'facts.d')
os.makedirs(facts_path, mode=0o700)
with open(os.path.join(facts_path, 'ansible_runner.fact'), 'w') as f:
os.chmod(f.name, 0o700)
f.write("""#!/usr/bin/env sh\necho "{\\"version\\": \\"`ansible-runner --version`\\"}"\n""") # noqa
# write a local inventory
inventory_path = os.path.join(private_data_dir, 'inventory')
os.makedirs(inventory_path, mode=0o700)
fn = os.path.join(inventory_path, 'hosts')
with open(fn, 'w') as f:
os.chmod(fn, stat.S_IRUSR | stat.S_IXUSR | stat.S_IWUSR)
f.write('localhost ansible_connection=local')
# we have to create the project directory because it is --workdir and crun needs it to exist
# https://github.com/ansible/ansible-runner/issues/758
project_path = os.path.join(private_data_dir, 'project')
os.makedirs(project_path, mode=0o700)
runner_params = {
'ident': str(uuid4()),
'private_data_dir': private_data_dir,
'module': 'setup',
'module_args': f'fact_path={private_data_dir}/facts.d',
'inventory': inventory_path,
'only_transmit_kwargs': False,
'settings': {
"container_image": get_default_execution_environment().image,
"container_options": ['--user=root'],
"process_isolation": True,
},
}
class _Instance(object):
pk = -1
job_env = {}
@property
def is_container_group_task(self):
return False
@property
def execution_node(self):
return node
class _BaseTask(object):
instance = _Instance()
cpus = 0
mem_mb = 0
version = RECEPTOR_PENDING
def build_execution_environment_params(self, instance, private_data_dir):
return {}
def event_handler(self, event_data):
if event_data.get('event') == 'runner_on_ok':
facts = event_data.get('event_data', {}).get('res', {}).get('ansible_facts', {})
if facts:
self.cpus = facts.get('ansible_processor_vcpus', 0)
self.mem_mb = facts.get('ansible_memtotal_mb', 0)
version = facts.get('ansible_local', {}).get('ansible_runner', {}).get('version', '') # noqa
if version:
self.version = f'ansible-runner-{version}'
# TODO: save event_data["stdout"] and log when errors happen
def finished_callback(self, runner_obj):
pass
def cancel_callback(self):
pass
def status_handler(self, status_data, runner_config):
# TODO: log error cases
pass
def update_model(self, *args, **kw):
pass
task = _BaseTask()
receptor_job = cls(task, runner_params)
res = receptor_job.run(work_type='ansible-runner')
if res.status == 'successful':
cpu = get_cpu_capacity(task.cpus)
mem = get_mem_capacity(task.mem_mb * 1000000)
logger.info(f'Calculated memory capacity: {task.mem_mb}, out: {mem}')
instance = Instance.objects.get(hostname=node)
instance.cpu = cpu[0]
instance.cpu_capacity = cpu[1]
instance.memory = mem[0]
instance.mem_capacity = mem[1]
instance.capacity = get_system_task_capacity(
instance.capacity_adjustment,
instance.cpu_capacity,
instance.mem_capacity,
)
instance.version = task.version
instance.save(update_fields=['capacity', 'version', 'cpu', 'memory', 'cpu_capacity', 'mem_capacity'])
logger.info(f'Updated capacity of {node} to cpu: {instance.cpu_capacity} mem: {instance.mem_capacity}')
else:
# TODO: error handling like we do with jobs
# receptorctl work results
# receptorctl work list
logger.info(f'Capacity check not successful for execution node {node}')
def _run_internal(self, receptor_ctl, work_type=None):
# Create a socketpair. Where the left side will be used for writing our payload
# (private data dir, kwargs). The right side will be passed to Receptor for

View File

@ -699,12 +699,25 @@ def parse_yaml_or_json(vars_str, silent_failure=True):
return vars_dict
def get_cpu_capacity(raw=None):
def get_cpu_effective_capacity(cpu_count):
from django.conf import settings
settings_forkcpu = getattr(settings, 'SYSTEM_TASK_FORKS_CPU', None)
env_forkcpu = os.getenv('SYSTEM_TASK_FORKS_CPU', None)
if env_forkcpu:
forkcpu = int(env_forkcpu)
elif settings_forkcpu:
forkcpu = int(settings_forkcpu)
else:
forkcpu = 4
return cpu_count * forkcpu
def get_cpu_capacity():
from django.conf import settings
settings_abscpu = getattr(settings, 'SYSTEM_TASK_ABS_CPU', None)
env_abscpu = os.getenv('SYSTEM_TASK_ABS_CPU', None)
@ -713,24 +726,30 @@ def get_cpu_capacity(raw=None):
elif settings_abscpu is not None:
return 0, int(settings_abscpu)
if raw is None:
raw = psutil.cpu_count()
cpu = psutil.cpu_count()
if env_forkcpu:
forkcpu = int(env_forkcpu)
elif settings_forkcpu:
forkcpu = int(settings_forkcpu)
else:
forkcpu = 4
return (raw, raw * forkcpu)
return (cpu, get_cpu_effective_capacity(cpu))
def get_mem_capacity(raw_mb=None):
def get_mem_effective_capacity(mem_mb):
from django.conf import settings
settings_forkmem = getattr(settings, 'SYSTEM_TASK_FORKS_MEM', None)
env_forkmem = os.getenv('SYSTEM_TASK_FORKS_MEM', None)
if env_forkmem:
forkmem = int(env_forkmem)
elif settings_forkmem:
forkmem = int(settings_forkmem)
else:
forkmem = 100
return max(1, ((mem_mb // 1024 // 1024) - 2048) // forkmem)
def get_mem_capacity():
from django.conf import settings
settings_absmem = getattr(settings, 'SYSTEM_TASK_ABS_MEM', None)
env_absmem = os.getenv('SYSTEM_TASK_ABS_MEM', None)
@ -739,16 +758,8 @@ def get_mem_capacity(raw_mb=None):
elif settings_absmem is not None:
return 0, int(settings_absmem)
if env_forkmem:
forkmem = int(env_forkmem)
elif settings_forkmem:
forkmem = int(settings_forkmem)
else:
forkmem = 100
if raw_mb is None:
raw_mb = psutil.virtual_memory().total
return (raw_mb, max(1, ((raw_mb // 1024 // 1024) - 2048) // forkmem))
mem = psutil.virtual_memory().total
return (mem, get_mem_effective_capacity(mem))
def get_system_task_capacity(scale=Decimal(1.0), cpu_capacity=None, mem_capacity=None):

View File

@ -0,0 +1,77 @@
import logging
import yaml
import time
from receptorctl.socket_interface import ReceptorControl
logger = logging.getLogger('awx.main.utils.receptor')
def get_receptor_ctl():
return ReceptorControl('/var/run/receptor/receptor.sock')
def worker_info(node_name):
receptor_ctl = get_receptor_ctl()
transmit_start = time.time()
error_list = []
data = {'Errors': error_list, 'transmit_timing': 0.0}
result = receptor_ctl.submit_work(worktype='ansible-runner', payload='', params={"params": f"--worker-info"}, ttl='20s', node=node_name)
unit_id = result['unitid']
run_start = time.time()
data['transmit_timing'] = run_start - transmit_start
data['run_timing'] = 0.0
try:
resultfile = receptor_ctl.get_work_results(unit_id)
stdout = ''
while data['run_timing'] < 20.0:
status = receptor_ctl.simple_command(f'work status {unit_id}')
state_name = status.get('StateName')
if state_name not in ('Pending', 'Running'):
break
data['run_timing'] = time.time() - run_start
time.sleep(0.5)
else:
error_list.append(f'Timeout getting worker info on {node_name}, state remains in {state_name}')
stdout = resultfile.read()
stdout = str(stdout, encoding='utf-8')
finally:
res = receptor_ctl.simple_command(f"work release {unit_id}")
if res != {'released': unit_id}:
logger.warn(f'Could not confirm release of receptor work unit id {unit_id} from {node_name}, data: {res}')
receptor_ctl.close()
if state_name.lower() == 'failed':
work_detail = status.get('Detail', '')
if not work_detail.startswith('exit status'):
error_list.append(f'Receptor error getting worker info from {node_name}, detail:\n{work_detail}')
elif 'unrecognized arguments: --worker-info' in stdout:
error_list.append(f'Old version (2.0.1 or earlier) of ansible-runner on node {node_name} without --worker-info')
else:
error_list.append(f'Unknown ansible-runner error on node {node_name}, stdout:\n{stdout}')
else:
yaml_stdout = stdout.strip()
remote_data = {}
try:
remote_data = yaml.safe_load(yaml_stdout)
except Exception as json_e:
error_list.append(f'Failed to parse node {node_name} --worker-info output as YAML, error: {json_e}, data:\n{yaml_stdout}')
if not isinstance(remote_data, dict):
error_list.append(f'Remote node {node_name} --worker-info output is not a YAML dict, output:{stdout}')
else:
error_list.extend(remote_data.pop('Errors')) # merge both error lists
data.update(remote_data)
return data

View File

@ -72,9 +72,28 @@ Control nodes check the receptor network (reported via `receptorctl status`) whe
Nodes on the receptor network are compared against the `Instance` model in the database.
If a node appears in the mesh network which is not in the database, then a "health check" is started.
This will submit a work unit to the execution node which then outputs important node data via `ansible-runner`.
The `capacity` field will obtain a non-zero value through this process, which is necessary to run jobs.
#### Health Check Mechanics
All relevant data for health checks is reported from the ansible-runner command:
```
ansible-runner worker --worker-info
```
This will output YAML data to standard out containing CPU, memory, and other metrics used to compute `capacity`.
AWX invokes this command by submitting a receptor work unit (of type `ansible-runner`) to the target execution node.
If you have the development environment running, you can run a one-off health check of a node with this command:
```
echo "from awx.main.utils.receptor import worker_info; worker_info('receptor-1')" | awx-manage shell_plus --quiet
```
This must be ran as the awx user inside one of the hybrid or control nodes.
This will not affect actual `Instance` record, but will just run the command and report the data.
### Development Environment
A "toy" cluster with execution nodes and a hop node is created by the docker-compose Makefile target.
@ -89,13 +108,23 @@ This will spin up a topology represented below.
(names are the receptor node names, which differ from the AWX Instance names and network address in some cases)
```
<awx_1:2222>---v
-----v
<awx_2:2222>
<receptor-hop:5555>
^-------------- <receptor-1>
^-------------- <receptor-2>
^-------------- <receptor-3>
┌──────────────┐
│ │
┌──────────────┐ ┌──────────┤ receptor-1 │
│ │ │ │ │
│ awx_1 │◄──────────┐ │ └──────────────┘
│ │ │ ▼
└──────┬───────┘ ┌──────┴───────┐ ┌──────────────┐
│ │ │ │ │
│ │ receptor-hop │◄───────┤ receptor-2 │
▼ │ │ │ │
┌──────────────┐ └──────────────┘ └──────────────┘
│ │ ▲
│ awx_2 │ │ ┌──────────────┐
│ │ │ │ │
└──────────────┘ └──────────┤ receptor-3 │
│ │
└──────────────┘
```
All execution (`receptor-*`) nodes connect to the hop node.