implement an initial development environment for receptor-based clusters

This commit is contained in:
Ryan Petrello
2021-03-16 13:30:56 -04:00
committed by Alan Rominger
parent 4a271d6897
commit 05cb876df5
22 changed files with 396 additions and 286 deletions

View File

@@ -55,7 +55,11 @@ import ansible_runner
# Receptor
from receptorctl.socket_interface import ReceptorControl
# dateutil
from dateutil.parser import parse as parse_date
# AWX
from awx import MODE
from awx import __version__ as awx_application_version
from awx.main.constants import PRIVILEGE_ESCALATION_METHODS, STANDARD_INVENTORY_UPDATE_ENV, MINIMAL_EVENTS
from awx.main.access import access_registry
@@ -98,6 +102,9 @@ from awx.main.utils.common import (
parse_yaml_or_json,
cleanup_new_process,
create_partition,
get_mem_capacity,
get_cpu_capacity,
get_system_task_capacity,
)
from awx.main.utils.execution_environments import get_default_pod_spec, CONTAINER_ROOT, to_container_path
from awx.main.utils.ansible import read_ansible_config
@@ -114,6 +121,9 @@ from awx.main.analytics.subsystem_metrics import Metrics
from rest_framework.exceptions import PermissionDenied
RECEPTOR_SOCK = '/var/run/receptor/receptor.sock'
RECEPTOR_PENDING = 'ansible-runner-???'
__all__ = [
'RunJob',
@@ -398,48 +408,74 @@ def cleanup_execution_environment_images():
if process.returncode != 0:
logger.debug(f"Failed to delete image {image_name}")
@task(queue=get_local_queuename)
def check_heartbeat(node):
AWXReceptorJob.check_heartbeat(node)
def discover_receptor_nodes():
ctl = ReceptorControl(RECEPTOR_SOCK)
connections = ctl.simple_command('status')['Advertisements']
nowtime = now()
for ad in connections:
hostname = ad['NodeID']
commands = ad['WorkCommands'] or []
if 'ansible-runner' not in commands:
if 'local' in commands:
# this node is strictly a control plane node, and does not
# provide ansible-runner as a work command
(changed, instance) = Instance.objects.register(hostname=hostname)
if changed:
logger.info("Registered tower control node '{}'".format(hostname))
instance.capacity = instance.cpu = instance.memory = instance.cpu_capacity = instance.mem_capacity = 0 # noqa
instance.version = get_awx_version()
instance.save(update_fields=['capacity', 'version', 'cpu', 'memory', 'cpu_capacity', 'mem_capacity'])
continue
(changed, instance) = Instance.objects.register(hostname=hostname)
was_lost = instance.is_lost(ref_time=nowtime)
if changed:
logger.info("Registered tower execution node '{}'".format(hostname))
instance.capacity = 0
instance.version = RECEPTOR_PENDING
instance.save(update_fields=['capacity', 'version', 'modified'])
check_heartbeat.apply_async([hostname])
else:
last_seen = parse_date(ad['Time'])
logger.debug("Updated tower control node '{}' last seen {}".format(hostname, last_seen))
instance.modified = last_seen
if instance.is_lost(ref_time=nowtime):
# if the instance hasn't advertised in awhile,
# don't save a new modified time
continue
instance.save(update_fields=['modified'])
if was_lost:
# if the instance *was* lost, but has appeared again,
# attempt to re-establish the initial capacity and version
# check
logger.warning('Attempting to rejoin the cluster as instance {}.'.format(hostname))
check_heartbeat.apply_async([hostname])
@task(queue=get_local_queuename)
def cluster_node_heartbeat():
logger.debug("Cluster node heartbeat task.")
nowtime = now()
instance_list = list(Instance.objects.all())
this_inst = None
lost_instances = []
(changed, instance) = Instance.objects.get_or_register()
if changed:
logger.info("Registered tower node '{}'".format(instance.hostname))
logger.info("Registered tower control node '{}'".format(instance.hostname))
discover_receptor_nodes()
for inst in list(instance_list):
if inst.hostname == settings.CLUSTER_HOST_ID:
this_inst = inst
instance_list.remove(inst)
elif inst.is_lost(ref_time=nowtime):
lost_instances.append(inst)
instance_list.remove(inst)
if this_inst:
startup_event = this_inst.is_lost(ref_time=nowtime)
this_inst.refresh_capacity()
if startup_event:
logger.warning('Rejoining the cluster as instance {}.'.format(this_inst.hostname))
return
else:
raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID))
# IFF any node has a greater version than we do, then we'll shutdown services
for other_inst in instance_list:
if other_inst.version == "":
continue
if Version(other_inst.version.split('-', 1)[0]) > Version(awx_application_version.split('-', 1)[0]) and not settings.DEBUG:
logger.error(
"Host {} reports version {}, but this node {} is at {}, shutting down".format(
other_inst.hostname, other_inst.version, this_inst.hostname, this_inst.version
)
)
# Shutdown signal will set the capacity to zero to ensure no Jobs get added to this instance.
# The heartbeat task will reset the capacity to the system capacity after upgrade.
stop_local_services(communicate=False)
raise RuntimeError("Shutting down.")
for other_inst in lost_instances:
try:
reaper.reap(other_inst)
@@ -1720,6 +1756,7 @@ class RunJob(BaseTask):
]
)
params['process_isolation'] = False if MODE == 'development' else True
return params
def pre_run_hook(self, job, private_data_dir):
@@ -2793,6 +2830,10 @@ class RunAdHocCommand(BaseTask):
d[r'Password:\s*?$'] = 'ssh_password'
return d
def build_execution_environment_params(self, instance, private_data_dir):
params = super(RunAdHocCommand, self).build_execution_environment_params(instance)
params['process_isolation'] = False if MODE == 'development' else True
return params
@task(queue=get_local_queuename)
class RunSystemJob(BaseTask):
@@ -2927,18 +2968,120 @@ class AWXReceptorJob:
execution_environment_params = self.task.build_execution_environment_params(self.task.instance, runner_params['private_data_dir'])
self.runner_params['settings'].update(execution_environment_params)
def run(self):
def run(self, work_type=None):
# We establish a connection to the Receptor socket
receptor_ctl = get_receptor_ctl()
# TODO: Seems like we already have a method for doing this now?
# receptor_ctl = get_receptor_ctl()
receptor_ctl = ReceptorControl(RECEPTOR_SOCK)
try:
return self._run_internal(receptor_ctl)
return self._run_internal(receptor_ctl, work_type=work_type)
finally:
# Make sure to always release the work unit if we established it
if self.unit_id is not None and settings.RECEPTOR_RELEASE_WORK:
receptor_ctl.simple_command(f"work release {self.unit_id}")
def _run_internal(self, receptor_ctl):
@classmethod
def check_heartbeat(cls, node):
# make a private data dir and env dir
private_data_dir = tempfile.mkdtemp(prefix='awx_heartbeat_', dir=settings.AWX_ISOLATION_BASE_PATH)
env_path = os.path.join(private_data_dir, 'env')
os.makedirs(os.path.join(env_path), mode=0o700)
# write a cmdline file for adhoc
f = os.fdopen(os.open(os.path.join(env_path, 'cmdline'), os.O_RDWR | os.O_CREAT, stat.S_IREAD | stat.S_IWRITE), 'w')
f.write(ansible_runner.utils.args2cmdline('localhost'))
f.close()
# write a custom facts.d to report the runner version
facts_path = os.path.join(private_data_dir, 'facts.d')
os.makedirs(facts_path, mode=0o700)
with open(os.path.join(facts_path, 'ansible_runner.fact'), 'w') as f:
os.chmod(f.name, 0o700)
f.write("""#!/usr/bin/env sh\necho "{\\"version\\": \\"`ansible-runner --version`\\"}"\n""") # noqa
# write a local inventory
inventory_path = os.path.join(private_data_dir, 'inventory')
os.makedirs(inventory_path, mode=0o700)
fn = os.path.join(inventory_path, 'hosts')
with open(fn, 'w') as f:
os.chmod(fn, stat.S_IRUSR | stat.S_IXUSR | stat.S_IWUSR)
f.write('localhost ansible_connection=local')
runner_params = {
'ident': str(uuid4()),
'private_data_dir': private_data_dir,
'module': 'setup',
'module_args': f'fact_path={private_data_dir}/facts.d',
'inventory': inventory_path,
'only_transmit_kwargs': False,
'settings': {
"container_image": get_default_execution_environment().image,
"container_options": ['--user=root'],
"process_isolation": False if MODE == 'development' else True,
},
}
class _Instance(object):
pk = -1
job_env = {}
@property
def is_container_group_task(self):
return False
@property
def execution_node(self):
return node
class _BaseTask(object):
instance = _Instance()
cpus = 0
mem_mb = 0
version = RECEPTOR_PENDING
def build_execution_environment_params(self, instance, private_data_dir):
return {}
def event_handler(self, event_data):
if event_data.get('event') == 'runner_on_ok':
facts = event_data.get('event_data', {}).get('res', {}).get('ansible_facts', {})
if facts:
self.cpus = facts.get('ansible_processor_vcpus', 0)
self.mem_mb = facts.get('ansible_memtotal_mb', 0)
version = facts.get('ansible_local', {}).get('ansible_runner', {}).get('version', '') # noqa
if version:
self.version = f'ansible-runner-{version}'
def finished_callback(self, runner_obj):
pass
def cancel_callback(self):
pass
def status_handler(self, status_data, runner_config):
pass
def update_model(self, *args, **kw):
pass
task = _BaseTask()
receptor_job = cls(task, runner_params)
res = receptor_job.run(work_type='ansible-runner')
if res.status == 'successful':
cpu = get_cpu_capacity(task.cpus)
mem = get_cpu_capacity(task.mem_mb)
instance = Instance.objects.get(hostname=node)
instance.cpu = cpu[0]
instance.cpu_capacity = cpu[1]
instance.memory = mem[0]
instance.memory_capacity = mem[1]
instance.capacity = get_system_task_capacity(
instance.capacity_adjustment,
instance.cpu_capacity,
instance.mem_capacity,
)
instance.version = task.version
instance.save(update_fields=['capacity', 'version', 'cpu', 'memory', 'cpu_capacity', 'mem_capacity'])
def _run_internal(self, receptor_ctl, work_type=None):
# Create a socketpair. Where the left side will be used for writing our payload
# (private data dir, kwargs). The right side will be passed to Receptor for
# reading.
@@ -2949,7 +3092,14 @@ class AWXReceptorJob:
# submit our work, passing
# in the right side of our socketpair for reading.
result = receptor_ctl.submit_work(worktype=self.work_type, payload=sockout.makefile('rb'), params=self.receptor_params)
_kw = {}
work_type = work_type or self.work_type
if work_type == 'ansible-runner':
_kw['node'] = self.task.instance.execution_node
logger.debug(f'receptorctl.submit_work(node={_kw["node"]})')
else:
logger.debug(f'receptorctl.submit_work({work_type})')
result = receptor_ctl.submit_work(worktype=work_type, payload=sockout.makefile('rb'), params=self.receptor_params, **_kw)
self.unit_id = result['unitid']
self.task.update_model(self.task.instance.pk, work_unit_id=result['unitid'])
@@ -3005,7 +3155,7 @@ class AWXReceptorJob:
# write our payload to the left side of our socketpair.
@cleanup_new_process
def transmit(self, _socket):
if not settings.IS_K8S and self.work_type == 'local':
if not settings.IS_K8S and self.work_type == 'local' and 'only_transmit_kwargs' not in self.runner_params:
self.runner_params['only_transmit_kwargs'] = True
try:
@@ -3052,6 +3202,8 @@ class AWXReceptorJob:
work_type = 'kubernetes-runtime-auth'
else:
work_type = 'kubernetes-incluster-auth'
elif isinstance(self.task.instance, (Job, AdHocCommand)):
work_type = 'ansible-runner'
else:
work_type = 'local'