diff --git a/awx/main/expect/isolated_manager.py b/awx/main/expect/isolated_manager.py index 849b39adbc..0830b390c7 100644 --- a/awx/main/expect/isolated_manager.py +++ b/awx/main/expect/isolated_manager.py @@ -1,3 +1,4 @@ +import fnmatch import json import os import shutil @@ -5,12 +6,11 @@ import stat import tempfile import time import logging -from io import StringIO from django.conf import settings +import ansible_runner import awx -from awx.main.expect import run from awx.main.utils import get_system_task_capacity from awx.main.queue import CallbackQueueDispatcher @@ -18,73 +18,96 @@ logger = logging.getLogger('awx.isolated.manager') playbook_logger = logging.getLogger('awx.isolated.manager.playbooks') +def set_pythonpath(venv_libdir, env): + env.pop('PYTHONPATH', None) # default to none if no python_ver matches + for version in os.listdir(venv_libdir): + if fnmatch.fnmatch(version, 'python[23].*'): + if os.path.isdir(os.path.join(venv_libdir, version)): + env['PYTHONPATH'] = os.path.join(venv_libdir, version, "site-packages") + ":" + break + + class IsolatedManager(object): - def __init__(self, env, cancelled_callback=None, job_timeout=0, - idle_timeout=None): + def __init__(self, cancelled_callback=None): """ - :param env: a dict containing environment variables for the - subprocess, ala `os.environ` :param cancelled_callback: a callable - which returns `True` or `False` - signifying if the job has been prematurely cancelled - :param job_timeout a timeout (in seconds); if the total job runtime - exceeds this, the process will be killed - :param idle_timeout a timeout (in seconds); if new output is not - sent to stdout in this interval, the process - will be terminated """ - self.management_env = self._base_management_env() self.cancelled_callback = cancelled_callback - self.job_timeout = job_timeout - self.idle_timeout = idle_timeout + self.idle_timeout = max(60, 2 * settings.AWX_ISOLATED_CONNECTION_TIMEOUT) self.started_at = None - @staticmethod - def 
_base_management_env(): - ''' - Returns environment variables to use when running a playbook - that manages the isolated instance. - Use of normal job callback and other such configurations are avoided. - ''' + def build_runner_params(self, hosts, verbosity=1): env = dict(os.environ.items()) env['ANSIBLE_RETRY_FILES_ENABLED'] = 'False' env['ANSIBLE_HOST_KEY_CHECKING'] = 'False' env['ANSIBLE_LIBRARY'] = os.path.join(os.path.dirname(awx.__file__), 'plugins', 'isolated') - return env + set_pythonpath(os.path.join(settings.ANSIBLE_VENV_PATH, 'lib'), env) - @staticmethod - def _build_args(playbook, hosts, extra_vars=None): - ''' - Returns list of Ansible CLI command arguments for a management task + def finished_callback(runner_obj): + if runner_obj.status == 'failed' and runner_obj.config.playbook != 'check_isolated.yml': + # failed for check_isolated.yml just means the playbook hasn't + # exited on the isolated host + stdout = runner_obj.stdout.read() + playbook_logger.error(stdout) + elif runner_obj.status == 'timeout': + # this means that the default idle timeout of + # (2 * AWX_ISOLATED_CONNECTION_TIMEOUT) was exceeded + # (meaning, we tried to sync with an isolated node, and we got + # no new output for 2 * AWX_ISOLATED_CONNECTION_TIMEOUT seconds) + # this _usually_ means SSH key auth from the controller -> + # isolated didn't work, and ssh is hung waiting on interactive + # input e.g., + # + # awx@isolated's password: + stdout = runner_obj.stdout.read() + playbook_logger.error(stdout) + else: + playbook_logger.info(runner_obj.stdout.read()) - :param playbook: name of the playbook to run - :param hosts: host pattern to operate on, ex. 
"localhost," - :param extra_vars: optional dictionary of extra_vars to apply - ''' - args = [ - 'ansible-playbook', - playbook, - '-u', settings.AWX_ISOLATED_USERNAME, - '-T', str(settings.AWX_ISOLATED_CONNECTION_TIMEOUT), - '-i', hosts - ] - if extra_vars: - args.extend(['-e', json.dumps(extra_vars)]) - if settings.AWX_ISOLATED_VERBOSITY: - args.append('-%s' % ('v' * min(5, settings.AWX_ISOLATED_VERBOSITY))) - return args + inventory = '\n'.join([ + '{} ansible_ssh_user={}'.format(host, settings.AWX_ISOLATED_USERNAME) + for host in hosts + ]) - @classmethod - def awx_playbook_path(cls): - return os.path.abspath(os.path.join( - os.path.dirname(awx.__file__), - 'playbooks' - )) + return { + 'project_dir': os.path.abspath(os.path.join( + os.path.dirname(awx.__file__), + 'playbooks' + )), + 'inventory': inventory, + 'envvars': env, + 'finished_callback': finished_callback, + 'verbosity': verbosity, + 'cancel_callback': self.cancelled_callback, + 'settings': { + 'idle_timeout': self.idle_timeout, + 'job_timeout': settings.AWX_ISOLATED_LAUNCH_TIMEOUT, + 'pexpect_timeout': getattr(settings, 'PEXPECT_TIMEOUT', 5), + }, + } def path_to(self, *args): return os.path.join(self.private_data_dir, *args) + def run_management_playbook(self, playbook, private_data_dir, **kw): + iso_dir = tempfile.mkdtemp( + prefix=playbook, + dir=private_data_dir + ) + params = self.runner_params.copy() + params['playbook'] = playbook + params['private_data_dir'] = iso_dir + params.update(**kw) + if all([ + getattr(settings, 'AWX_ISOLATED_KEY_GENERATION', False) is True, + getattr(settings, 'AWX_ISOLATED_PRIVATE_KEY', None) + ]): + params['ssh_key'] = settings.AWX_ISOLATED_PRIVATE_KEY + return ansible_runner.interface.run(**params) + def dispatch(self, playbook=None, module=None, module_args=None): ''' Ship the runner payload to a remote host for isolated execution. 
@@ -92,71 +115,7 @@ class IsolatedManager(object): self.handled_events = set() self.started_at = time.time() - self.build_isolated_job_data() - extra_vars = { - 'src': self.private_data_dir, - 'dest': settings.AWX_PROOT_BASE_PATH, - 'ident': self.ident - } - if playbook: - extra_vars['playbook'] = playbook - if module and module_args: - extra_vars['module'] = module - extra_vars['module_args'] = module_args - - # Run ansible-playbook to launch a job on the isolated host. This: - # - # - sets up a temporary directory for proot/bwrap (if necessary) - # - copies encrypted job data from the controlling host to the isolated host (with rsync) - # - writes the encryption secret to a named pipe on the isolated host - # - launches ansible-runner - args = self._build_args('run_isolated.yml', '%s,' % self.host, extra_vars) - if self.instance.verbosity: - args.append('-%s' % ('v' * min(5, self.instance.verbosity))) - buff = StringIO() - logger.debug('Starting job {} on isolated host with `run_isolated.yml` playbook.'.format(self.instance.id)) - status, rc = IsolatedManager.run_pexpect( - args, self.awx_playbook_path(), self.management_env, buff, - idle_timeout=self.idle_timeout, - job_timeout=settings.AWX_ISOLATED_LAUNCH_TIMEOUT, - pexpect_timeout=5 - ) - output = buff.getvalue() - playbook_logger.info('Isolated job {} dispatch:\n{}'.format(self.instance.id, output)) - if status != 'successful': - for event_data in [ - {'event': 'verbose', 'stdout': output}, - {'event': 'EOF', 'final_counter': 1}, - ]: - event_data.setdefault(self.event_data_key, self.instance.id) - CallbackQueueDispatcher().dispatch(event_data) - return status, rc - - @classmethod - def run_pexpect(cls, pexpect_args, *args, **kw): - isolated_ssh_path = None - try: - if all([ - getattr(settings, 'AWX_ISOLATED_KEY_GENERATION', False) is True, - getattr(settings, 'AWX_ISOLATED_PRIVATE_KEY', None) - ]): - isolated_ssh_path = tempfile.mkdtemp(prefix='awx_isolated', dir=settings.AWX_PROOT_BASE_PATH) - 
os.chmod(isolated_ssh_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR) - isolated_key = os.path.join(isolated_ssh_path, '.isolated') - ssh_sock = os.path.join(isolated_ssh_path, '.isolated_ssh_auth.sock') - run.open_fifo_write(isolated_key, settings.AWX_ISOLATED_PRIVATE_KEY) - pexpect_args = run.wrap_args_with_ssh_agent(pexpect_args, isolated_key, ssh_sock, silence_ssh_add=True) - return run.run_pexpect(pexpect_args, *args, **kw) - finally: - if isolated_ssh_path: - shutil.rmtree(isolated_ssh_path) - - def build_isolated_job_data(self): - ''' - Write metadata related to the playbook run into a collection of files - on the local file system. - ''' - + # exclude certain files from the rsync rsync_exclude = [ # don't rsync source control metadata (it can be huge!) '- /project/.git', @@ -176,6 +135,23 @@ class IsolatedManager(object): f.write(data) os.chmod(path, stat.S_IRUSR) + extravars = { + 'src': self.private_data_dir, + 'dest': settings.AWX_PROOT_BASE_PATH, + 'ident': self.ident + } + if playbook: + extravars['playbook'] = playbook + if module and module_args: + extravars['module'] = module + extravars['module_args'] = module_args + + logger.debug('Starting job {} on isolated host with `run_isolated.yml` playbook.'.format(self.instance.id)) + runner_obj = self.run_management_playbook('run_isolated.yml', + self.private_data_dir, + extravars=extravars) + return runner_obj.status, runner_obj.rc + def check(self, interval=None): """ Repeatedly poll the isolated node to determine if the job has run. 
@@ -191,22 +167,12 @@ class IsolatedManager(object): :param interval: an interval (in seconds) to wait between status polls """ interval = interval if interval is not None else settings.AWX_ISOLATED_CHECK_INTERVAL - extra_vars = {'src': self.private_data_dir} - args = self._build_args('check_isolated.yml', '%s,' % self.host, extra_vars) - if self.instance.verbosity: - args.append('-%s' % ('v' * min(5, self.instance.verbosity))) - + extravars = {'src': self.private_data_dir} status = 'failed' - output = '' rc = None - buff = StringIO() last_check = time.time() - job_timeout = remaining = self.job_timeout dispatcher = CallbackQueueDispatcher() while status == 'failed': - if job_timeout != 0: - remaining = max(0, job_timeout - (time.time() - self.started_at)) - canceled = self.cancelled_callback() if self.cancelled_callback else False if not canceled and time.time() - last_check < interval: # If the job isn't cancelled, but we haven't waited `interval` seconds, wait longer @@ -216,18 +182,11 @@ class IsolatedManager(object): if canceled: logger.warning('Isolated job {} was manually cancelled.'.format(self.instance.id)) - buff = StringIO() logger.debug('Checking on isolated job {} with `check_isolated.yml`.'.format(self.instance.id)) - status, rc = IsolatedManager.run_pexpect( - args, self.awx_playbook_path(), self.management_env, buff, - cancelled_callback=self.cancelled_callback, - idle_timeout=remaining, - job_timeout=remaining, - pexpect_timeout=5, - proot_cmd='bwrap' - ) - output = buff.getvalue().encode('utf-8') - playbook_logger.info('Isolated job {} check:\n{}'.format(self.instance.id, output)) + runner_obj = self.run_management_playbook('check_isolated.yml', + self.private_data_dir, + extravars=extravars) + status, rc = runner_obj.status, runner_obj.rc # discover new events and ingest them events_path = self.path_to('artifacts', self.ident, 'job_events') @@ -273,30 +232,21 @@ class IsolatedManager(object): def cleanup(self): # If the job failed for any reason, 
make a last-ditch effort at cleanup - extra_vars = { + extravars = { 'private_data_dir': self.private_data_dir, 'cleanup_dirs': [ self.private_data_dir, ], } - args = self._build_args('clean_isolated.yml', '%s,' % self.host, extra_vars) logger.debug('Cleaning up job {} on isolated host with `clean_isolated.yml` playbook.'.format(self.instance.id)) - buff = StringIO() - timeout = max(60, 2 * settings.AWX_ISOLATED_CONNECTION_TIMEOUT) - status, rc = IsolatedManager.run_pexpect( - args, self.awx_playbook_path(), self.management_env, buff, - idle_timeout=timeout, job_timeout=timeout, - pexpect_timeout=5 + self.run_management_playbook( + 'clean_isolated.yml', + self.private_data_dir, + extravars=extravars ) - output = buff.getvalue().encode('utf-8') - playbook_logger.info('Isolated job {} cleanup:\n{}'.format(self.instance.id, output)) - - if status != 'successful': - # stdout_handle is closed by this point so writing output to logs is our only option - logger.warning('Isolated job {} cleanup error, output:\n{}'.format(self.instance.id, output)) @classmethod - def update_capacity(cls, instance, task_result, awx_application_version): + def update_capacity(cls, instance, task_result): instance.version = 'ansible-runner-{}'.format(task_result['version']) if instance.capacity == 0 and task_result['capacity_cpu']: @@ -308,8 +258,7 @@ class IsolatedManager(object): mem_capacity=int(task_result['capacity_mem'])) instance.save(update_fields=['cpu_capacity', 'mem_capacity', 'capacity', 'version', 'modified']) - @classmethod - def health_check(cls, instance_qs, awx_application_version): + def health_check(self, instance_qs): ''' :param instance_qs: List of Django objects representing the isolated instances to manage @@ -319,58 +268,51 @@ class IsolatedManager(object): - clean up orphaned private files Performs save on each instance to update its capacity. 
''' - hostname_string = '' - for instance in instance_qs: - hostname_string += '{},'.format(instance.hostname) - args = cls._build_args('heartbeat_isolated.yml', hostname_string) - args.extend(['--forks', str(len(instance_qs))]) - env = cls._base_management_env() + # TODO: runner doesn't have a --forks arg + #args.extend(['--forks', str(len(instance_qs))]) try: - facts_path = tempfile.mkdtemp() - env['ANSIBLE_CACHE_PLUGIN'] = 'jsonfile' - env['ANSIBLE_CACHE_PLUGIN_CONNECTION'] = facts_path - - buff = StringIO() - timeout = max(60, 2 * settings.AWX_ISOLATED_CONNECTION_TIMEOUT) - status, rc = IsolatedManager.run_pexpect( - args, cls.awx_playbook_path(), env, buff, - idle_timeout=timeout, job_timeout=timeout, - pexpect_timeout=5 + private_data_dir = tempfile.mkdtemp( + prefix='awx_iso_heartbeat_', + dir=settings.AWX_PROOT_BASE_PATH + ) + self.runner_params = self.build_runner_params([ + instance.hostname for instance in instance_qs + ]) + self.runner_params['private_data_dir'] = private_data_dir + runner_obj = self.run_management_playbook( + 'heartbeat_isolated.yml', + private_data_dir ) - heartbeat_stdout = buff.getvalue().encode('utf-8') - buff.close() - for instance in instance_qs: - output = heartbeat_stdout - task_result = {} - try: - with open(os.path.join(facts_path, instance.hostname), 'r') as facts_data: - output = facts_data.read() - task_result = json.loads(output) - except Exception: - logger.exception('Failed to read status from isolated instances, output:\n {}'.format(output)) - if 'awx_capacity_cpu' in task_result and 'awx_capacity_mem' in task_result: - task_result = { - 'capacity_cpu': task_result['awx_capacity_cpu'], - 'capacity_mem': task_result['awx_capacity_mem'], - 'version': task_result['awx_capacity_version'] - } - cls.update_capacity(instance, task_result, awx_application_version) - logger.debug('Isolated instance {} successful heartbeat'.format(instance.hostname)) - elif instance.capacity == 0: - logger.debug('Isolated instance {} previously 
marked as lost, could not re-join.'.format( - instance.hostname)) - else: - logger.warning('Could not update status of isolated instance {}'.format(instance.hostname)) - if instance.is_lost(isolated=True): - instance.capacity = 0 - instance.save(update_fields=['capacity']) - logger.error('Isolated instance {} last checked in at {}, marked as lost.'.format( - instance.hostname, instance.modified)) + if runner_obj.status == 'successful': + for instance in instance_qs: + task_result = {} + try: + task_result = runner_obj.get_fact_cache(instance.hostname) + except Exception: + logger.exception('Failed to read status from isolated instances') + if 'awx_capacity_cpu' in task_result and 'awx_capacity_mem' in task_result: + task_result = { + 'capacity_cpu': task_result['awx_capacity_cpu'], + 'capacity_mem': task_result['awx_capacity_mem'], + 'version': task_result['awx_capacity_version'] + } + IsolatedManager.update_capacity(instance, task_result) + logger.debug('Isolated instance {} successful heartbeat'.format(instance.hostname)) + elif instance.capacity == 0: + logger.debug('Isolated instance {} previously marked as lost, could not re-join.'.format( + instance.hostname)) + else: + logger.warning('Could not update status of isolated instance {}'.format(instance.hostname)) + if instance.is_lost(isolated=True): + instance.capacity = 0 + instance.save(update_fields=['capacity']) + logger.error('Isolated instance {} last checked in at {}, marked as lost.'.format( + instance.hostname, instance.modified)) finally: - if os.path.exists(facts_path): - shutil.rmtree(facts_path) + if os.path.exists(private_data_dir): + shutil.rmtree(private_data_dir) def run(self, instance, private_data_dir, playbook, module, module_args, event_data_key, ident=None): @@ -393,10 +335,18 @@ class IsolatedManager(object): self.ident = ident self.event_data_key = event_data_key self.instance = instance - self.host = instance.execution_node self.private_data_dir = private_data_dir + self.runner_params = 
self.build_runner_params( + [instance.execution_node], + verbosity=min(5, self.instance.verbosity) + ) status, rc = self.dispatch(playbook, module, module_args) if status == 'successful': status, rc = self.check() + else: + # emit an EOF event + event_data = {'event': 'EOF', 'final_counter': 0} + event_data.setdefault(self.event_data_key, self.instance.id) + CallbackQueueDispatcher().dispatch(event_data) self.cleanup() return status, rc diff --git a/awx/main/expect/run.py b/awx/main/expect/run.py deleted file mode 100755 index 744c145688..0000000000 --- a/awx/main/expect/run.py +++ /dev/null @@ -1,316 +0,0 @@ -#! /usr/bin/env python - -import argparse -import base64 -import codecs -import collections -import logging -import json -import os -import stat -import pipes -import re -import signal -import sys -import threading -import time -from io import StringIO - -import pexpect -import psutil - - -logger = logging.getLogger('awx.main.utils.expect') - - -def args2cmdline(*args): - return ' '.join([pipes.quote(a) for a in args]) - - -def wrap_args_with_ssh_agent(args, ssh_key_path, ssh_auth_sock=None, silence_ssh_add=False): - if ssh_key_path: - ssh_add_command = args2cmdline('ssh-add', ssh_key_path) - if silence_ssh_add: - ssh_add_command = ' '.join([ssh_add_command, '2>/dev/null']) - cmd = ' && '.join([ssh_add_command, - args2cmdline('rm', '-f', ssh_key_path), - args2cmdline(*args)]) - args = ['ssh-agent'] - if ssh_auth_sock: - args.extend(['-a', ssh_auth_sock]) - args.extend(['sh', '-c', cmd]) - return args - - -def open_fifo_write(path, data): - '''open_fifo_write opens the fifo named pipe in a new thread. - This blocks the thread until an external process (such as ssh-agent) - reads data from the pipe. 
- ''' - os.mkfifo(path, 0o600) - threading.Thread( - target=lambda p, d: open(p, 'w').write(d), - args=(path, data) - ).start() - - -def run_pexpect(args, cwd, env, logfile, - cancelled_callback=None, expect_passwords={}, - extra_update_fields=None, idle_timeout=None, job_timeout=0, - pexpect_timeout=5, proot_cmd='bwrap'): - ''' - Run the given command using pexpect to capture output and provide - passwords when requested. - - :param args: a list of `subprocess.call`-style arguments - representing a subprocess e.g., ['ls', '-la'] - :param cwd: the directory in which the subprocess should - run - :param env: a dict containing environment variables for the - subprocess, ala `os.environ` - :param logfile: a file-like object for capturing stdout - :param cancelled_callback: a callable - which returns `True` or `False` - - signifying if the job has been prematurely - cancelled - :param expect_passwords: a dict of regular expression password prompts - to input values, i.e., {r'Password:*?$': - 'some_password'} - :param extra_update_fields: a dict used to specify DB fields which should - be updated on the underlying model - object after execution completes - :param idle_timeout a timeout (in seconds); if new output is not - sent to stdout in this interval, the process - will be terminated - :param job_timeout a timeout (in seconds); if the total job runtime - exceeds this, the process will be killed - :param pexpect_timeout a timeout (in seconds) to wait on - `pexpect.spawn().expect()` calls - :param proot_cmd the command used to isolate processes, `bwrap` - - Returns a tuple (status, return_code) i.e., `('successful', 0)` - ''' - expect_passwords[pexpect.TIMEOUT] = None - expect_passwords[pexpect.EOF] = None - - if not isinstance(expect_passwords, collections.OrderedDict): - # We iterate over `expect_passwords.keys()` and - # `expect_passwords.values()` separately to map matched inputs to - # patterns and choose the proper string to send to the subprocess; - # enforce 
usage of an OrderedDict so that the ordering of elements in - # `keys()` matches `values()`. - expect_passwords = collections.OrderedDict(expect_passwords) - password_patterns = list(expect_passwords.keys()) - password_values = list(expect_passwords.values()) - - child = pexpect.spawn( - args[0], args[1:], cwd=cwd, env=env, ignore_sighup=True, - encoding='utf-8', echo=False, use_poll=True - ) - child.logfile_read = logfile - canceled = False - timed_out = False - errored = False - last_stdout_update = time.time() - - job_start = time.time() - while child.isalive(): - result_id = child.expect(password_patterns, timeout=pexpect_timeout, searchwindowsize=100) - password = password_values[result_id] - if password is not None: - child.sendline(password) - last_stdout_update = time.time() - if cancelled_callback: - try: - canceled = cancelled_callback() - except Exception: - logger.exception('Could not check cancel callback - canceling immediately') - if isinstance(extra_update_fields, dict): - extra_update_fields['job_explanation'] = "System error during job execution, check system logs" - errored = True - else: - canceled = False - if not canceled and job_timeout != 0 and (time.time() - job_start) > job_timeout: - timed_out = True - if isinstance(extra_update_fields, dict): - extra_update_fields['job_explanation'] = "Job terminated due to timeout" - if canceled or timed_out or errored: - handle_termination(child.pid, child.args, proot_cmd, is_cancel=canceled) - if idle_timeout and (time.time() - last_stdout_update) > idle_timeout: - child.close(True) - canceled = True - if errored: - return 'error', child.exitstatus - elif canceled: - return 'canceled', child.exitstatus - elif child.exitstatus == 0 and not timed_out: - return 'successful', child.exitstatus - else: - return 'failed', child.exitstatus - - -def run_isolated_job(private_data_dir, secrets, logfile=sys.stdout): - ''' - Launch `ansible-playbook`, executing a job packaged by - `build_isolated_job_data`. 
- - :param private_data_dir: an absolute path on the local file system where - job metadata exists (i.e., - `/tmp/ansible_awx_xyz/`) - :param secrets: a dict containing sensitive job metadata, { - 'env': { ... } # environment variables, - 'passwords': { ... } # pexpect password prompts - 'ssh_key_data': 'RSA KEY DATA', - } - :param logfile: a file-like object for capturing stdout - - Returns a tuple (status, return_code) i.e., `('successful', 0)` - ''' - with open(os.path.join(private_data_dir, 'args'), 'r') as args: - args = json.load(args) - - env = secrets.get('env', {}) - expect_passwords = { - re.compile(pattern, re.M): password - for pattern, password in secrets.get('passwords', {}).items() - } - - if 'AD_HOC_COMMAND_ID' in env: - cwd = private_data_dir - else: - cwd = os.path.join(private_data_dir, 'project') - - # write the SSH key data into a fifo read by ssh-agent - ssh_key_data = secrets.get('ssh_key_data') - if ssh_key_data: - ssh_key_path = os.path.join(private_data_dir, 'ssh_key_data') - ssh_auth_sock = os.path.join(private_data_dir, 'ssh_auth.sock') - open_fifo_write(ssh_key_path, ssh_key_data) - args = wrap_args_with_ssh_agent(args, ssh_key_path, ssh_auth_sock) - - idle_timeout = secrets.get('idle_timeout', 10) - job_timeout = secrets.get('job_timeout', 10) - pexpect_timeout = secrets.get('pexpect_timeout', 5) - - env['AWX_ISOLATED_DATA_DIR'] = private_data_dir - - venv_path = env.get('VIRTUAL_ENV') - if venv_path and not os.path.exists(venv_path): - raise RuntimeError( - 'a valid Python virtualenv does not exist at {}'.format(venv_path) - ) - - return run_pexpect(args, cwd, env, logfile, - expect_passwords=expect_passwords, - idle_timeout=idle_timeout, - job_timeout=job_timeout, - pexpect_timeout=pexpect_timeout) - - -def handle_termination(pid, args, proot_cmd, is_cancel=True): - ''' - Terminate a subprocess spawned by `pexpect`. - - :param pid: the process id of the running the job. 
- :param args: the args for the job, i.e., ['ansible-playbook', 'abc.yml'] - :param proot_cmd the command used to isolate processes i.e., `bwrap` - :param is_cancel: flag showing whether this termination is caused by - instance's cancel_flag. - ''' - try: - used_proot = proot_cmd.encode('utf-8') in args - if used_proot: - if not psutil: - os.kill(pid, signal.SIGKILL) - else: - try: - main_proc = psutil.Process(pid=pid) - child_procs = main_proc.children(recursive=True) - for child_proc in child_procs: - os.kill(child_proc.pid, signal.SIGKILL) - os.kill(main_proc.pid, signal.SIGKILL) - except (TypeError, psutil.Error): - os.kill(pid, signal.SIGKILL) - else: - os.kill(pid, signal.SIGTERM) - time.sleep(3) - except OSError: - keyword = 'cancel' if is_cancel else 'timeout' - logger.warn("Attempted to %s already finished job, ignoring" % keyword) - - -def __run__(private_data_dir): - buff = StringIO() - with codecs.open(os.path.join(private_data_dir, 'env'), 'r', encoding='utf-8') as f: - for line in f: - buff.write(line) - - artifacts_dir = os.path.join(private_data_dir, 'artifacts') - - # Standard out directed to pickup location without event filtering applied - stdout_filename = os.path.join(artifacts_dir, 'stdout') - os.mknod(stdout_filename, stat.S_IFREG | stat.S_IRUSR | stat.S_IWUSR) - stdout_handle = codecs.open(stdout_filename, 'w', encoding='utf-8') - - status, rc = run_isolated_job( - private_data_dir, - json.loads(base64.b64decode(buff.getvalue())), - stdout_handle - ) - for filename, data in [ - ('status', status), - ('rc', rc), - ]: - artifact_path = os.path.join(private_data_dir, 'artifacts', filename) - os.mknod(artifact_path, stat.S_IFREG | stat.S_IRUSR | stat.S_IWUSR) - with open(artifact_path, 'w') as f: - f.write(str(data)) - - -if __name__ == '__main__': - import awx - __version__ = awx.__version__ - parser = argparse.ArgumentParser(description='manage a daemonized, isolated ansible playbook') - parser.add_argument('--version', action='version', 
version=__version__ + '-isolated') - parser.add_argument('command', choices=['start', 'stop', 'is-alive']) - parser.add_argument('private_data_dir') - args = parser.parse_args() - - private_data_dir = args.private_data_dir - pidfile = os.path.join(private_data_dir, 'pid') - - if args.command == 'start': - # create a file to log stderr in case the daemonized process throws - # an exception before it gets to `pexpect.spawn` - stderr_path = os.path.join(private_data_dir, 'artifacts', 'daemon.log') - if not os.path.exists(stderr_path): - os.mknod(stderr_path, stat.S_IFREG | stat.S_IRUSR | stat.S_IWUSR) - stderr = open(stderr_path, 'w+') - - import daemon - from daemon.pidfile import TimeoutPIDLockFile - context = daemon.DaemonContext( - pidfile=TimeoutPIDLockFile(pidfile), - stderr=stderr - ) - with context: - __run__(private_data_dir) - sys.exit(0) - - try: - with open(pidfile, 'r') as f: - pid = int(f.readline()) - except IOError: - sys.exit(1) - - if args.command == 'stop': - try: - with open(os.path.join(private_data_dir, 'args'), 'r') as args: - handle_termination(pid, json.load(args), 'bwrap') - except IOError: - handle_termination(pid, [], 'bwrap') - elif args.command == 'is-alive': - try: - os.kill(pid, signal.SIG_DFL) - sys.exit(0) - except OSError: - sys.exit(1) diff --git a/awx/main/management/commands/test_isolated_connection.py b/awx/main/management/commands/test_isolated_connection.py index 01047cbc44..8c8c726926 100644 --- a/awx/main/management/commands/test_isolated_connection.py +++ b/awx/main/management/commands/test_isolated_connection.py @@ -1,13 +1,14 @@ import os import shutil -import subprocess import sys import tempfile from django.conf import settings from django.core.management.base import BaseCommand, CommandError -from awx.main.expect import run +import ansible_runner + +from awx.main.expect.isolated_manager import set_pythonpath class Command(BaseCommand): @@ -25,23 +26,24 @@ class Command(BaseCommand): try: path = 
tempfile.mkdtemp(prefix='awx_isolated_ssh', dir=settings.AWX_PROOT_BASE_PATH) - args = [ - 'ansible', 'all', '-i', '{},'.format(hostname), '-u', - settings.AWX_ISOLATED_USERNAME, '-T5', '-m', 'shell', - '-a', 'ansible-runner --version', '-vvv' - ] + ssh_key = None if all([ getattr(settings, 'AWX_ISOLATED_KEY_GENERATION', False) is True, getattr(settings, 'AWX_ISOLATED_PRIVATE_KEY', None) ]): - ssh_key_path = os.path.join(path, '.isolated') - ssh_auth_sock = os.path.join(path, 'ssh_auth.sock') - run.open_fifo_write(ssh_key_path, settings.AWX_ISOLATED_PRIVATE_KEY) - args = run.wrap_args_with_ssh_agent(args, ssh_key_path, ssh_auth_sock) - try: - print(' '.join(args)) - subprocess.check_call(args) - except subprocess.CalledProcessError as e: - sys.exit(e.returncode) + ssh_key = settings.AWX_ISOLATED_PRIVATE_KEY + env = dict(os.environ.items()) + set_pythonpath(os.path.join(settings.ANSIBLE_VENV_PATH, 'lib'), env) + res = ansible_runner.interface.run( + private_data_dir=path, + host_pattern='all', + inventory='{} ansible_ssh_user={}'.format(hostname, settings.AWX_ISOLATED_USERNAME), + module='shell', + module_args='ansible-runner --version', + envvars=env, + verbosity=3, + ssh_key=ssh_key, + ) + sys.exit(res.rc) finally: shutil.rmtree(path) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 53bb04649e..c21b4ae554 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -4,7 +4,6 @@ # Python from collections import OrderedDict, namedtuple import errno -import fnmatch import functools import importlib import json @@ -467,7 +466,7 @@ def awx_isolated_heartbeat(): # Slow pass looping over isolated IGs and their isolated instances if len(isolated_instance_qs) > 0: logger.debug("Managing isolated instances {}.".format(','.join([inst.hostname for inst in isolated_instance_qs]))) - isolated_manager.IsolatedManager.health_check(isolated_instance_qs, awx_application_version) + isolated_manager.IsolatedManager().health_check(isolated_instance_qs) @task() @@ -884,12 +883,8 
@@ class BaseTask(object): raise RuntimeError( 'a valid Python virtualenv does not exist at {}'.format(venv_path) ) - env.pop('PYTHONPATH', None) # default to none if no python_ver matches - for version in os.listdir(venv_libdir): - if fnmatch.fnmatch(version, 'python[23].*'): - if os.path.isdir(os.path.join(venv_libdir, version)): - env['PYTHONPATH'] = os.path.join(venv_libdir, version, "site-packages") + ":" - break + + isolated_manager.set_pythonpath(venv_libdir, env) def add_awx_venv(self, env): env['VIRTUAL_ENV'] = settings.AWX_VENV_PATH @@ -965,9 +960,6 @@ class BaseTask(object): def build_credentials_list(self, instance): return [] - def get_idle_timeout(self): - return None - def get_instance_timeout(self, instance): global_timeout_setting_name = instance._global_timeout_setting() if global_timeout_setting_name: @@ -1168,7 +1160,6 @@ class BaseTask(object): 'finished_callback': self.finished_callback, 'status_handler': self.status_handler, 'settings': { - 'idle_timeout': self.get_idle_timeout() or "", 'job_timeout': self.get_instance_timeout(self.instance), 'pexpect_timeout': getattr(settings, 'PEXPECT_TIMEOUT', 5), **process_isolation_params, @@ -1206,10 +1197,7 @@ class BaseTask(object): copy_tree(cwd, os.path.join(private_data_dir, 'project')) ansible_runner.utils.dump_artifacts(params) manager_instance = isolated_manager.IsolatedManager( - env, - cancelled_callback=lambda: self.update_model(self.instance.pk).cancel_flag, - job_timeout=self.get_instance_timeout(self.instance), - idle_timeout=self.get_idle_timeout(), + cancelled_callback=lambda: self.update_model(self.instance.pk).cancel_flag ) status, rc = manager_instance.run(self.instance, private_data_dir, @@ -1230,7 +1218,6 @@ class BaseTask(object): extra_update_fields['job_explanation'] = self.instance.job_explanation except Exception: - # run_pexpect does not throw exceptions for cancel or timeout # this could catch programming or file system errors tb = traceback.format_exc() logger.exception('%s 
Exception occurred while running task', self.instance.log_format) @@ -1514,9 +1501,6 @@ class RunJob(BaseTask): def build_credentials_list(self, job): return job.credentials.all() - def get_idle_timeout(self): - return getattr(settings, 'JOB_RUN_IDLE_TIMEOUT', None) - def get_password_prompts(self, passwords={}): d = super(RunJob, self).get_password_prompts(passwords) d[r'Enter passphrase for .*:\s*?$'] = 'ssh_key_unlock' @@ -1790,9 +1774,6 @@ class RunProjectUpdate(BaseTask): d[r'^Are you sure you want to continue connecting \(yes/no\)\?\s*?$'] = 'yes' return d - def get_idle_timeout(self): - return getattr(settings, 'PROJECT_UPDATE_IDLE_TIMEOUT', None) - def _update_dependent_inventories(self, project_update, dependent_inventory_sources): scm_revision = project_update.project.scm_revision inv_update_class = InventoryUpdate._get_task_class() @@ -2151,9 +2132,6 @@ class RunInventoryUpdate(BaseTask): # All credentials not used by inventory source injector return inventory_update.get_extra_credentials() - def get_idle_timeout(self): - return getattr(settings, 'INVENTORY_UPDATE_IDLE_TIMEOUT', None) - def pre_run_hook(self, inventory_update): source_project = None if inventory_update.inventory_source: @@ -2341,9 +2319,6 @@ class RunAdHocCommand(BaseTask): def build_playbook_path_relative_to_cwd(self, job, private_data_dir): return None - def get_idle_timeout(self): - return getattr(settings, 'JOB_RUN_IDLE_TIMEOUT', None) - def get_password_prompts(self, passwords={}): d = super(RunAdHocCommand, self).get_password_prompts() d[r'Enter passphrase for .*:\s*?$'] = 'ssh_key_unlock' diff --git a/awx/main/tests/functional/test_tasks.py b/awx/main/tests/functional/test_tasks.py index 8b67ba4183..f1cf382a7c 100644 --- a/awx/main/tests/functional/test_tasks.py +++ b/awx/main/tests/functional/test_tasks.py @@ -126,23 +126,6 @@ class TestIsolatedManagementTask: inst.save() return inst - @pytest.mark.skip(reason='fix after runner merge') - def test_old_version(self, 
control_instance, old_version): - update_capacity = isolated_manager.IsolatedManager.update_capacity - - assert old_version.capacity == 103 - with mock.patch('awx.main.tasks.settings', MockSettings()): - # Isolated node is reporting an older version than the cluster - # instance that issued the health check, set capacity to zero. - update_capacity(old_version, {'version': '1.0.0'}, '3.0.0') - assert old_version.capacity == 0 - - # Upgrade was completed, health check playbook now reports matching - # version, make sure capacity is set. - update_capacity(old_version, {'version': '5.0.0-things', - 'capacity_cpu':103, 'capacity_mem':103}, '5.0.0-stuff') - assert old_version.capacity == 103 - def test_takes_action(self, control_instance, needs_updating): original_isolated_instance = needs_updating.instances.all().first() with mock.patch('awx.main.tasks.settings', MockSettings()): diff --git a/awx/main/tests/unit/expect/test_expect.py b/awx/main/tests/unit/expect/test_expect.py deleted file mode 100644 index f9cab27128..0000000000 --- a/awx/main/tests/unit/expect/test_expect.py +++ /dev/null @@ -1,308 +0,0 @@ -# -*- coding: utf-8 -*- - -import os -import pytest -import shutil -import stat -import tempfile -import time -from io import StringIO -from unittest import mock - -from cryptography.hazmat.primitives.asymmetric import rsa -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import serialization -from django.utils.encoding import smart_str, smart_bytes - -from awx.main.expect import run, isolated_manager - -from django.conf import settings - -HERE, FILENAME = os.path.split(__file__) - - -@pytest.fixture(scope='function') -def rsa_key(request): - passphrase = 'passme' - key = rsa.generate_private_key( - public_exponent=65537, - key_size=1024, - backend=default_backend() - ) - return ( - smart_str(key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.TraditionalOpenSSL, - 
encryption_algorithm=serialization.BestAvailableEncryption(smart_bytes(passphrase)) - )), - passphrase - ) - - -@pytest.fixture(scope='function') -def private_data_dir(request): - path = tempfile.mkdtemp(prefix='ansible_awx_unit_test') - request.addfinalizer(lambda: shutil.rmtree(path)) - return path - - -@pytest.fixture(autouse=True) -def mock_sleep(request): - # the process teardown mechanism uses `time.sleep` to wait on processes to - # respond to SIGTERM; these are tests and don't care about being nice - m = mock.patch('time.sleep') - m.start() - request.addfinalizer(m.stop) - - -def test_simple_spawn(): - stdout = StringIO() - status, rc = run.run_pexpect( - ['ls', '-la'], - HERE, - {}, - stdout, - cancelled_callback=lambda: False, - ) - assert status == 'successful' - assert rc == 0 - # assert FILENAME in stdout.getvalue() - - -def test_error_rc(): - stdout = StringIO() - status, rc = run.run_pexpect( - ['ls', '-nonsense'], - HERE, - {}, - stdout, - cancelled_callback=lambda: False, - ) - assert status == 'failed' - # I'd expect 2, but we shouldn't risk making this test platform-dependent - assert rc > 0 - - -def test_cancel_callback_error(): - stdout = StringIO() - - def bad_callback(): - raise Exception('unique exception') - - extra_fields = {} - status, rc = run.run_pexpect( - ['sleep', '2'], - HERE, - {}, - stdout, - cancelled_callback=bad_callback, - extra_update_fields=extra_fields - ) - assert status == 'error' - assert rc == 0 - assert extra_fields['job_explanation'] == "System error during job execution, check system logs" - - -@pytest.mark.skip(reason='fix after runner merge') -@pytest.mark.timeout(3) # https://github.com/ansible/tower/issues/2391#issuecomment-401946895 -@pytest.mark.parametrize('value', ['abc123', 'Iñtërnâtiônàlizætiøn']) -def test_env_vars(value): - stdout = StringIO() - status, rc = run.run_pexpect( - ['python', '-c', 'import os; print os.getenv("X_MY_ENV")'], - HERE, - {'X_MY_ENV': value}, - stdout, - cancelled_callback=lambda: 
False, - ) - assert status == 'successful' - assert rc == 0 - assert value in stdout.getvalue() - - -def test_manual_cancellation(): - stdout = StringIO() - status, rc = run.run_pexpect( - ['python', '-c', 'print raw_input("Password: ")'], - HERE, - {}, - stdout, - cancelled_callback=lambda: True, # this callable will cause cancellation - # the lack of password inputs will cause stdin to hang - pexpect_timeout=0, - ) - assert status == 'canceled' - - -@pytest.mark.skip(reason='fix after runner merge') -def test_build_isolated_job_data(private_data_dir, rsa_key): - pem, passphrase = rsa_key - mgr = isolated_manager.IsolatedManager( - ['ls', '-la'], HERE, {}, StringIO(), '' - ) - mgr.private_data_dir = private_data_dir - mgr.build_isolated_job_data() - - path = os.path.join(private_data_dir, 'project') - assert os.path.isdir(path) - - # /project is a soft link to HERE, which is the directory - # _this_ test file lives in - assert os.path.exists(os.path.join(path, FILENAME)) - - path = os.path.join(private_data_dir, 'artifacts') - assert os.path.isdir(path) - assert stat.S_IMODE(os.stat(path).st_mode) == stat.S_IXUSR + stat.S_IWUSR + stat.S_IRUSR # user rwx - - path = os.path.join(private_data_dir, 'args') - with open(path, 'r') as f: - assert stat.S_IMODE(os.stat(path).st_mode) == stat.S_IRUSR # user r/o - assert f.read() == '["ls", "-la"]' - - path = os.path.join(private_data_dir, '.rsync-filter') - with open(path, 'r') as f: - data = f.read() - assert data == '\n'.join([ - '- /project/.git', - '- /project/.svn', - '- /project/.hg', - '- /artifacts/job_events/*-partial.json.tmp', - '- /env' - ]) - - -@pytest.mark.skip(reason='fix after runner merge') -def test_run_isolated_job(private_data_dir, rsa_key): - env = {'JOB_ID': '1'} - pem, passphrase = rsa_key - mgr = isolated_manager.IsolatedManager( - ['ls', '-la'], HERE, env, StringIO(), '' - ) - mgr.private_data_dir = private_data_dir - secrets = { - 'env': env, - 'passwords': { - r'Enter passphrase for .*:\s*?$': 
passphrase - }, - 'ssh_key_data': pem - } - mgr.build_isolated_job_data() - stdout = StringIO() - # Mock environment variables for callback module - status, rc = run.run_isolated_job(private_data_dir, secrets, stdout) - assert status == 'successful' - assert rc == 0 - assert FILENAME in stdout.getvalue() - - assert env['AWX_ISOLATED_DATA_DIR'] == private_data_dir - - -@pytest.mark.skip(reason='fix after runner merge') -def test_run_isolated_adhoc_command(private_data_dir, rsa_key): - env = {'AD_HOC_COMMAND_ID': '1'} - pem, passphrase = rsa_key - mgr = isolated_manager.IsolatedManager( - ['pwd'], HERE, env, StringIO(), '' - ) - mgr.private_data_dir = private_data_dir - secrets = { - 'env': env, - 'passwords': { - r'Enter passphrase for .*:\s*?$': passphrase - }, - 'ssh_key_data': pem - } - mgr.build_isolated_job_data() - stdout = StringIO() - # Mock environment variables for callback module - with mock.patch('os.getenv') as env_mock: - env_mock.return_value = '/path/to/awx/lib' - status, rc = run.run_isolated_job(private_data_dir, secrets, stdout) - assert status == 'successful' - assert rc == 0 - - # for ad-hoc jobs, `ansible` is invoked from the `private_data_dir`, so - # an ad-hoc command that runs `pwd` should print `private_data_dir` to stdout - assert private_data_dir in stdout.getvalue() - - assert env['AWX_ISOLATED_DATA_DIR'] == private_data_dir - - -@pytest.mark.skip(reason='fix after runner merge') -def test_check_isolated_job(private_data_dir, rsa_key): - pem, passphrase = rsa_key - stdout = StringIO() - mgr = isolated_manager.IsolatedManager(['ls', '-la'], HERE, {}, stdout, '') - mgr.private_data_dir = private_data_dir - mgr.instance = mock.Mock(id=123, pk=123, verbosity=5, spec_set=['id', 'pk', 'verbosity']) - mgr.started_at = time.time() - mgr.host = 'isolated-host' - - os.mkdir(os.path.join(private_data_dir, 'artifacts')) - with mock.patch('awx.main.expect.run.run_pexpect') as run_pexpect: - - def _synchronize_job_artifacts(args, cwd, env, buff, 
**kw): - buff.write('checking job status...') - for filename, data in ( - ['status', 'failed'], - ['rc', '1'], - ['stdout', 'KABOOM!'], - ): - with open(os.path.join(private_data_dir, 'artifacts', filename), 'w') as f: - f.write(data) - return ('successful', 0) - - run_pexpect.side_effect = _synchronize_job_artifacts - with mock.patch.object(mgr, '_missing_artifacts') as missing_artifacts: - missing_artifacts.return_value = False - status, rc = mgr.check(interval=0) - - assert status == 'failed' - assert rc == 1 - assert stdout.getvalue() == 'KABOOM!' - - run_pexpect.assert_called_with( - [ - 'ansible-playbook', 'check_isolated.yml', - '-u', settings.AWX_ISOLATED_USERNAME, - '-T', str(settings.AWX_ISOLATED_CONNECTION_TIMEOUT), - '-i', 'isolated-host,', - '-e', '{"src": "%s"}' % private_data_dir, - '-vvvvv' - ], - '/awx_devel/awx/playbooks', mgr.management_env, mock.ANY, - cancelled_callback=None, - idle_timeout=0, - job_timeout=0, - pexpect_timeout=5, - proot_cmd='bwrap' - ) - - -@pytest.mark.skip(reason='fix after runner merge') -def test_check_isolated_job_timeout(private_data_dir, rsa_key): - pem, passphrase = rsa_key - stdout = StringIO() - extra_update_fields = {} - mgr = isolated_manager.IsolatedManager(['ls', '-la'], HERE, {}, stdout, '', - job_timeout=1, - extra_update_fields=extra_update_fields) - mgr.private_data_dir = private_data_dir - mgr.instance = mock.Mock(id=123, pk=123, verbosity=5, spec_set=['id', 'pk', 'verbosity']) - mgr.started_at = time.time() - mgr.host = 'isolated-host' - - with mock.patch('awx.main.expect.run.run_pexpect') as run_pexpect: - - def _synchronize_job_artifacts(args, cwd, env, buff, **kw): - buff.write('checking job status...') - return ('failed', 1) - - run_pexpect.side_effect = _synchronize_job_artifacts - status, rc = mgr.check(interval=0) - - assert status == 'failed' - assert rc == 1 - assert stdout.getvalue() == 'checking job status...' - - assert extra_update_fields['job_explanation'] == 'Job terminated due to timeout'