From 99478f5d255986cba796b78551a4e2eaf5ef5f17 Mon Sep 17 00:00:00 2001 From: Ryan Petrello Date: Wed, 27 Mar 2019 18:16:09 -0400 Subject: [PATCH] replace our usage of pexpect in IsolatedManager with ansible-runner --- awx/main/expect/isolated_manager.py | 319 +++++++----------- awx/main/expect/run.py | 316 ----------------- .../commands/test_isolated_connection.py | 30 +- awx/main/tasks.py | 5 +- awx/main/tests/functional/test_tasks.py | 17 - 5 files changed, 136 insertions(+), 551 deletions(-) delete mode 100755 awx/main/expect/run.py diff --git a/awx/main/expect/isolated_manager.py b/awx/main/expect/isolated_manager.py index 849b39adbc..0554aa1109 100644 --- a/awx/main/expect/isolated_manager.py +++ b/awx/main/expect/isolated_manager.py @@ -5,12 +5,11 @@ import stat import tempfile import time import logging -from io import StringIO from django.conf import settings +import ansible_runner import awx -from awx.main.expect import run from awx.main.utils import get_system_task_capacity from awx.main.queue import CallbackQueueDispatcher @@ -20,71 +19,75 @@ playbook_logger = logging.getLogger('awx.isolated.manager.playbooks') class IsolatedManager(object): - def __init__(self, env, cancelled_callback=None, job_timeout=0, - idle_timeout=None): + def __init__(self, cancelled_callback=None, idle_timeout=None): """ - :param env: a dict containing environment variables for the - subprocess, ala `os.environ` :param cancelled_callback: a callable - which returns `True` or `False` - signifying if the job has been prematurely cancelled - :param job_timeout a timeout (in seconds); if the total job runtime - exceeds this, the process will be killed :param idle_timeout a timeout (in seconds); if new output is not sent to stdout in this interval, the process will be terminated """ - self.management_env = self._base_management_env() self.cancelled_callback = cancelled_callback - self.job_timeout = job_timeout - self.idle_timeout = idle_timeout + self.idle_timeout = idle_timeout or max(60, 2 * settings.AWX_ISOLATED_CONNECTION_TIMEOUT) self.started_at = None - @staticmethod - def _base_management_env(): - ''' - Returns environment variables to use when running a playbook - that manages the isolated instance. - Use of normal job callback and other such configurations are avoided. - ''' + def build_runner_params(self, hosts, verbosity=1): env = dict(os.environ.items()) env['ANSIBLE_RETRY_FILES_ENABLED'] = 'False' env['ANSIBLE_HOST_KEY_CHECKING'] = 'False' env['ANSIBLE_LIBRARY'] = os.path.join(os.path.dirname(awx.__file__), 'plugins', 'isolated') - return env - @staticmethod - def _build_args(playbook, hosts, extra_vars=None): - ''' - Returns list of Ansible CLI command arguments for a management task + def finished_callback(runner_obj): + if runner_obj.status == 'failed': + stdout = runner_obj.stdout.read() + playbook_logger.error(stdout) + event_data = {'event': 'verbose', 'stdout': stdout, self.event_data_key: self.instance.id} + CallbackQueueDispatcher().dispatch(event_data) + else: + playbook_logger.info(runner_obj.stdout.read()) - :param playbook: name of the playbook to run - :param hosts: host pattern to operate on, ex. "localhost," - :param extra_vars: optional dictionary of extra_vars to apply - ''' - args = [ - 'ansible-playbook', - playbook, - '-u', settings.AWX_ISOLATED_USERNAME, - '-T', str(settings.AWX_ISOLATED_CONNECTION_TIMEOUT), - '-i', hosts - ] - if extra_vars: - args.extend(['-e', json.dumps(extra_vars)]) - if settings.AWX_ISOLATED_VERBOSITY: - args.append('-%s' % ('v' * min(5, settings.AWX_ISOLATED_VERBOSITY))) - return args + inventory = '\n'.join([ + '{} ansible_ssh_user={}'.format(host, settings.AWX_ISOLATED_USERNAME) + for host in hosts + ]) - @classmethod - def awx_playbook_path(cls): - return os.path.abspath(os.path.join( - os.path.dirname(awx.__file__), - 'playbooks' - )) + return { + 'project_dir': os.path.abspath(os.path.join( + os.path.dirname(awx.__file__), + 'playbooks' + )), + 'inventory': inventory, + 'envvars': env, + 'finished_callback': finished_callback, + 'verbosity': verbosity, + 'cancel_callback': self.cancelled_callback, + 'settings': { + 'idle_timeout': self.idle_timeout, + 'job_timeout': settings.AWX_ISOLATED_LAUNCH_TIMEOUT, + 'pexpect_timeout': getattr(settings, 'PEXPECT_TIMEOUT', 5), + }, + } def path_to(self, *args): return os.path.join(self.private_data_dir, *args) + def run_management_playbook(self, playbook, private_data_dir, **kw): + iso_dir = tempfile.mkdtemp( + prefix=playbook, + dir=private_data_dir + ) + params = self.runner_params.copy() + params['playbook'] = playbook + params['private_data_dir'] = iso_dir + params.update(**kw) + if all([ + getattr(settings, 'AWX_ISOLATED_KEY_GENERATION', False) is True, + getattr(settings, 'AWX_ISOLATED_PRIVATE_KEY', None) + ]): + params['ssh_key'] = settings.AWX_ISOLATED_PRIVATE_KEY + return ansible_runner.interface.run(**params) + def dispatch(self, playbook=None, module=None, module_args=None): ''' Ship the runner payload to a remote host for isolated execution. @@ -92,71 +95,7 @@ class IsolatedManager(object): self.handled_events = set() self.started_at = time.time() - self.build_isolated_job_data() - extra_vars = { - 'src': self.private_data_dir, - 'dest': settings.AWX_PROOT_BASE_PATH, - 'ident': self.ident - } - if playbook: - extra_vars['playbook'] = playbook - if module and module_args: - extra_vars['module'] = module - extra_vars['module_args'] = module_args - - # Run ansible-playbook to launch a job on the isolated host. This: - # - # - sets up a temporary directory for proot/bwrap (if necessary) - # - copies encrypted job data from the controlling host to the isolated host (with rsync) - # - writes the encryption secret to a named pipe on the isolated host - # - launches ansible-runner - args = self._build_args('run_isolated.yml', '%s,' % self.host, extra_vars) - if self.instance.verbosity: - args.append('-%s' % ('v' * min(5, self.instance.verbosity))) - buff = StringIO() - logger.debug('Starting job {} on isolated host with `run_isolated.yml` playbook.'.format(self.instance.id)) - status, rc = IsolatedManager.run_pexpect( - args, self.awx_playbook_path(), self.management_env, buff, - idle_timeout=self.idle_timeout, - job_timeout=settings.AWX_ISOLATED_LAUNCH_TIMEOUT, - pexpect_timeout=5 - ) - output = buff.getvalue() - playbook_logger.info('Isolated job {} dispatch:\n{}'.format(self.instance.id, output)) - if status != 'successful': - for event_data in [ - {'event': 'verbose', 'stdout': output}, - {'event': 'EOF', 'final_counter': 1}, - ]: - event_data.setdefault(self.event_data_key, self.instance.id) - CallbackQueueDispatcher().dispatch(event_data) - return status, rc - - @classmethod - def run_pexpect(cls, pexpect_args, *args, **kw): - isolated_ssh_path = None - try: - if all([ - getattr(settings, 'AWX_ISOLATED_KEY_GENERATION', False) is True, - getattr(settings, 'AWX_ISOLATED_PRIVATE_KEY', None) - ]): - isolated_ssh_path = tempfile.mkdtemp(prefix='awx_isolated', dir=settings.AWX_PROOT_BASE_PATH) - os.chmod(isolated_ssh_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR) - isolated_key = os.path.join(isolated_ssh_path, '.isolated') - ssh_sock = os.path.join(isolated_ssh_path, '.isolated_ssh_auth.sock') - run.open_fifo_write(isolated_key, settings.AWX_ISOLATED_PRIVATE_KEY) - pexpect_args = run.wrap_args_with_ssh_agent(pexpect_args, isolated_key, ssh_sock, silence_ssh_add=True) - return run.run_pexpect(pexpect_args, *args, **kw) - finally: - if isolated_ssh_path: - shutil.rmtree(isolated_ssh_path) - - def build_isolated_job_data(self): - ''' - Write metadata related to the playbook run into a collection of files - on the local file system. - ''' - + # exclude certain files from the rsync rsync_exclude = [ # don't rsync source control metadata (it can be huge!) '- /project/.git', @@ -176,6 +115,23 @@ class IsolatedManager(object): f.write(data) os.chmod(path, stat.S_IRUSR) + extravars = { + 'src': self.private_data_dir, + 'dest': settings.AWX_PROOT_BASE_PATH, + 'ident': self.ident + } + if playbook: + extravars['playbook'] = playbook + if module and module_args: + extravars['module'] = module + extravars['module_args'] = module_args + + logger.debug('Starting job {} on isolated host with `run_isolated.yml` playbook.'.format(self.instance.id)) + runner_obj = self.run_management_playbook('run_isolated.yml', + self.private_data_dir, + extravars=extravars) + return runner_obj.status, runner_obj.rc + def check(self, interval=None): """ Repeatedly poll the isolated node to determine if the job has run. @@ -191,22 +147,12 @@ class IsolatedManager(object): :param interval: an interval (in seconds) to wait between status polls """ interval = interval if interval is not None else settings.AWX_ISOLATED_CHECK_INTERVAL - extra_vars = {'src': self.private_data_dir} - args = self._build_args('check_isolated.yml', '%s,' % self.host, extra_vars) - if self.instance.verbosity: - args.append('-%s' % ('v' * min(5, self.instance.verbosity))) - + extravars = {'src': self.private_data_dir} status = 'failed' - output = '' rc = None - buff = StringIO() last_check = time.time() - job_timeout = remaining = self.job_timeout dispatcher = CallbackQueueDispatcher() while status == 'failed': - if job_timeout != 0: - remaining = max(0, job_timeout - (time.time() - self.started_at)) - canceled = self.cancelled_callback() if self.cancelled_callback else False if not canceled and time.time() - last_check < interval: # If the job isn't cancelled, but we haven't waited `interval` seconds, wait longer @@ -216,18 +162,11 @@ class IsolatedManager(object): if canceled: logger.warning('Isolated job {} was manually cancelled.'.format(self.instance.id)) - buff = StringIO() logger.debug('Checking on isolated job {} with `check_isolated.yml`.'.format(self.instance.id)) - status, rc = IsolatedManager.run_pexpect( - args, self.awx_playbook_path(), self.management_env, buff, - cancelled_callback=self.cancelled_callback, - idle_timeout=remaining, - job_timeout=remaining, - pexpect_timeout=5, - proot_cmd='bwrap' - ) - output = buff.getvalue().encode('utf-8') - playbook_logger.info('Isolated job {} check:\n{}'.format(self.instance.id, output)) + runner_obj = self.run_management_playbook('check_isolated.yml', + self.private_data_dir, + extravars=extravars) + status, rc = runner_obj.status, runner_obj.rc # discover new events and ingest them events_path = self.path_to('artifacts', self.ident, 'job_events') @@ -273,30 +212,21 @@ class IsolatedManager(object): def cleanup(self): # If the job failed for any reason, make a last-ditch effort at cleanup - extra_vars = { + extravars = { 'private_data_dir': self.private_data_dir, 'cleanup_dirs': [ self.private_data_dir, ], } - args = self._build_args('clean_isolated.yml', '%s,' % self.host, extra_vars) logger.debug('Cleaning up job {} on isolated host with `clean_isolated.yml` playbook.'.format(self.instance.id)) - buff = StringIO() - timeout = max(60, 2 * settings.AWX_ISOLATED_CONNECTION_TIMEOUT) - status, rc = IsolatedManager.run_pexpect( - args, self.awx_playbook_path(), self.management_env, buff, - idle_timeout=timeout, job_timeout=timeout, - pexpect_timeout=5 + self.run_management_playbook( + 'clean_isolated.yml', + self.private_data_dir, + extravars=extravars ) - output = buff.getvalue().encode('utf-8') - playbook_logger.info('Isolated job {} cleanup:\n{}'.format(self.instance.id, output)) - - if status != 'successful': - # stdout_handle is closed by this point so writing output to logs is our only option - logger.warning('Isolated job {} cleanup error, output:\n{}'.format(self.instance.id, output)) @classmethod - def update_capacity(cls, instance, task_result, awx_application_version): + def update_capacity(cls, instance, task_result): instance.version = 'ansible-runner-{}'.format(task_result['version']) if instance.capacity == 0 and task_result['capacity_cpu']: @@ -308,8 +238,7 @@ class IsolatedManager(object): mem_capacity=int(task_result['capacity_mem'])) instance.save(update_fields=['cpu_capacity', 'mem_capacity', 'capacity', 'version', 'modified']) - @classmethod - def health_check(cls, instance_qs, awx_application_version): + def health_check(self, instance_qs): ''' :param instance_qs: List of Django objects representing the isolated instances to manage @@ -319,58 +248,51 @@ class IsolatedManager(object): - clean up orphaned private files Performs save on each instance to update its capacity. ''' - hostname_string = '' - for instance in instance_qs: - hostname_string += '{},'.format(instance.hostname) - args = cls._build_args('heartbeat_isolated.yml', hostname_string) - args.extend(['--forks', str(len(instance_qs))]) - env = cls._base_management_env() + # TODO: runner doesn't have a --forks arg + #args.extend(['--forks', str(len(instance_qs))]) try: - facts_path = tempfile.mkdtemp() - env['ANSIBLE_CACHE_PLUGIN'] = 'jsonfile' - env['ANSIBLE_CACHE_PLUGIN_CONNECTION'] = facts_path - - buff = StringIO() - timeout = max(60, 2 * settings.AWX_ISOLATED_CONNECTION_TIMEOUT) - status, rc = IsolatedManager.run_pexpect( - args, cls.awx_playbook_path(), env, buff, - idle_timeout=timeout, job_timeout=timeout, - pexpect_timeout=5 + private_data_dir = tempfile.mkdtemp( + prefix='awx_iso_heartbeat_', + dir=settings.AWX_PROOT_BASE_PATH + ) + self.runner_params = self.build_runner_params([ + instance.hostname for instance in instance_qs + ]) + self.runner_params['private_data_dir'] = private_data_dir + runner_obj = self.run_management_playbook( + 'heartbeat_isolated.yml', + private_data_dir ) - heartbeat_stdout = buff.getvalue().encode('utf-8') - buff.close() - for instance in instance_qs: - output = heartbeat_stdout - task_result = {} - try: - with open(os.path.join(facts_path, instance.hostname), 'r') as facts_data: - output = facts_data.read() - task_result = json.loads(output) - except Exception: - logger.exception('Failed to read status from isolated instances, output:\n {}'.format(output)) - if 'awx_capacity_cpu' in task_result and 'awx_capacity_mem' in task_result: - task_result = { - 'capacity_cpu': task_result['awx_capacity_cpu'], - 'capacity_mem': task_result['awx_capacity_mem'], - 'version': task_result['awx_capacity_version'] - } - cls.update_capacity(instance, task_result, awx_application_version) - logger.debug('Isolated instance {} successful heartbeat'.format(instance.hostname)) - elif instance.capacity == 0: - logger.debug('Isolated instance {} previously marked as lost, could not re-join.'.format( - instance.hostname)) - else: - logger.warning('Could not update status of isolated instance {}'.format(instance.hostname)) - if instance.is_lost(isolated=True): - instance.capacity = 0 - instance.save(update_fields=['capacity']) - logger.error('Isolated instance {} last checked in at {}, marked as lost.'.format( - instance.hostname, instance.modified)) + if runner_obj.status == 'successful': + for instance in instance_qs: + task_result = {} + try: + task_result = runner_obj.get_fact_cache(instance.hostname) + except Exception: + logger.exception('Failed to read status from isolated instances') + if 'awx_capacity_cpu' in task_result and 'awx_capacity_mem' in task_result: + task_result = { + 'capacity_cpu': task_result['awx_capacity_cpu'], + 'capacity_mem': task_result['awx_capacity_mem'], + 'version': task_result['awx_capacity_version'] + } + IsolatedManager.update_capacity(instance, task_result) + logger.debug('Isolated instance {} successful heartbeat'.format(instance.hostname)) + elif instance.capacity == 0: + logger.debug('Isolated instance {} previously marked as lost, could not re-join.'.format( + instance.hostname)) + else: + logger.warning('Could not update status of isolated instance {}'.format(instance.hostname)) + if instance.is_lost(isolated=True): + instance.capacity = 0 + instance.save(update_fields=['capacity']) + logger.error('Isolated instance {} last checked in at {}, marked as lost.'.format( + instance.hostname, instance.modified)) finally: - if os.path.exists(facts_path): - shutil.rmtree(facts_path) + if os.path.exists(private_data_dir): + shutil.rmtree(private_data_dir) def run(self, instance, private_data_dir, playbook, module, module_args, event_data_key, ident=None): @@ -393,8 +315,11 @@ class IsolatedManager(object): self.ident = ident self.event_data_key = event_data_key self.instance = instance - self.host = instance.execution_node self.private_data_dir = private_data_dir + self.runner_params = self.build_runner_params( + [instance.execution_node], + verbosity=min(5, self.instance.verbosity) + ) status, rc = self.dispatch(playbook, module, module_args) if status == 'successful': status, rc = self.check() diff --git a/awx/main/expect/run.py b/awx/main/expect/run.py deleted file mode 100755 index 744c145688..0000000000 --- a/awx/main/expect/run.py +++ /dev/null @@ -1,316 +0,0 @@ -#! /usr/bin/env python - -import argparse -import base64 -import codecs -import collections -import logging -import json -import os -import stat -import pipes -import re -import signal -import sys -import threading -import time -from io import StringIO - -import pexpect -import psutil - - -logger = logging.getLogger('awx.main.utils.expect') - - -def args2cmdline(*args): - return ' '.join([pipes.quote(a) for a in args]) - - -def wrap_args_with_ssh_agent(args, ssh_key_path, ssh_auth_sock=None, silence_ssh_add=False): - if ssh_key_path: - ssh_add_command = args2cmdline('ssh-add', ssh_key_path) - if silence_ssh_add: - ssh_add_command = ' '.join([ssh_add_command, '2>/dev/null']) - cmd = ' && '.join([ssh_add_command, - args2cmdline('rm', '-f', ssh_key_path), - args2cmdline(*args)]) - args = ['ssh-agent'] - if ssh_auth_sock: - args.extend(['-a', ssh_auth_sock]) - args.extend(['sh', '-c', cmd]) - return args - - -def open_fifo_write(path, data): - '''open_fifo_write opens the fifo named pipe in a new thread. - This blocks the thread until an external process (such as ssh-agent) - reads data from the pipe. - ''' - os.mkfifo(path, 0o600) - threading.Thread( - target=lambda p, d: open(p, 'w').write(d), - args=(path, data) - ).start() - - -def run_pexpect(args, cwd, env, logfile, - cancelled_callback=None, expect_passwords={}, - extra_update_fields=None, idle_timeout=None, job_timeout=0, - pexpect_timeout=5, proot_cmd='bwrap'): - ''' - Run the given command using pexpect to capture output and provide - passwords when requested. - - :param args: a list of `subprocess.call`-style arguments - representing a subprocess e.g., ['ls', '-la'] - :param cwd: the directory in which the subprocess should - run - :param env: a dict containing environment variables for the - subprocess, ala `os.environ` - :param logfile: a file-like object for capturing stdout - :param cancelled_callback: a callable - which returns `True` or `False` - - signifying if the job has been prematurely - cancelled - :param expect_passwords: a dict of regular expression password prompts - to input values, i.e., {r'Password:*?$': - 'some_password'} - :param extra_update_fields: a dict used to specify DB fields which should - be updated on the underlying model - object after execution completes - :param idle_timeout a timeout (in seconds); if new output is not - sent to stdout in this interval, the process - will be terminated - :param job_timeout a timeout (in seconds); if the total job runtime - exceeds this, the process will be killed - :param pexpect_timeout a timeout (in seconds) to wait on - `pexpect.spawn().expect()` calls - :param proot_cmd the command used to isolate processes, `bwrap` - - Returns a tuple (status, return_code) i.e., `('successful', 0)` - ''' - expect_passwords[pexpect.TIMEOUT] = None - expect_passwords[pexpect.EOF] = None - - if not isinstance(expect_passwords, collections.OrderedDict): - # We iterate over `expect_passwords.keys()` and - # `expect_passwords.values()` separately to map matched inputs to - # patterns and choose the proper string to send to the subprocess; - # enforce usage of an OrderedDict so that the ordering of elements in - # `keys()` matches `values()`. - expect_passwords = collections.OrderedDict(expect_passwords) - password_patterns = list(expect_passwords.keys()) - password_values = list(expect_passwords.values()) - - child = pexpect.spawn( - args[0], args[1:], cwd=cwd, env=env, ignore_sighup=True, - encoding='utf-8', echo=False, use_poll=True - ) - child.logfile_read = logfile - canceled = False - timed_out = False - errored = False - last_stdout_update = time.time() - - job_start = time.time() - while child.isalive(): - result_id = child.expect(password_patterns, timeout=pexpect_timeout, searchwindowsize=100) - password = password_values[result_id] - if password is not None: - child.sendline(password) - last_stdout_update = time.time() - if cancelled_callback: - try: - canceled = cancelled_callback() - except Exception: - logger.exception('Could not check cancel callback - canceling immediately') - if isinstance(extra_update_fields, dict): - extra_update_fields['job_explanation'] = "System error during job execution, check system logs" - errored = True - else: - canceled = False - if not canceled and job_timeout != 0 and (time.time() - job_start) > job_timeout: - timed_out = True - if isinstance(extra_update_fields, dict): - extra_update_fields['job_explanation'] = "Job terminated due to timeout" - if canceled or timed_out or errored: - handle_termination(child.pid, child.args, proot_cmd, is_cancel=canceled) - if idle_timeout and (time.time() - last_stdout_update) > idle_timeout: - child.close(True) - canceled = True - if errored: - return 'error', child.exitstatus - elif canceled: - return 'canceled', child.exitstatus - elif child.exitstatus == 0 and not timed_out: - return 'successful', child.exitstatus - else: - return 'failed', child.exitstatus - - -def run_isolated_job(private_data_dir, secrets, logfile=sys.stdout): - ''' - Launch `ansible-playbook`, executing a job packaged by - `build_isolated_job_data`. - - :param private_data_dir: an absolute path on the local file system where - job metadata exists (i.e., - `/tmp/ansible_awx_xyz/`) - :param secrets: a dict containing sensitive job metadata, { - 'env': { ... } # environment variables, - 'passwords': { ... } # pexpect password prompts - 'ssh_key_data': 'RSA KEY DATA', - } - :param logfile: a file-like object for capturing stdout - - Returns a tuple (status, return_code) i.e., `('successful', 0)` - ''' - with open(os.path.join(private_data_dir, 'args'), 'r') as args: - args = json.load(args) - - env = secrets.get('env', {}) - expect_passwords = { - re.compile(pattern, re.M): password - for pattern, password in secrets.get('passwords', {}).items() - } - - if 'AD_HOC_COMMAND_ID' in env: - cwd = private_data_dir - else: - cwd = os.path.join(private_data_dir, 'project') - - # write the SSH key data into a fifo read by ssh-agent - ssh_key_data = secrets.get('ssh_key_data') - if ssh_key_data: - ssh_key_path = os.path.join(private_data_dir, 'ssh_key_data') - ssh_auth_sock = os.path.join(private_data_dir, 'ssh_auth.sock') - open_fifo_write(ssh_key_path, ssh_key_data) - args = wrap_args_with_ssh_agent(args, ssh_key_path, ssh_auth_sock) - - idle_timeout = secrets.get('idle_timeout', 10) - job_timeout = secrets.get('job_timeout', 10) - pexpect_timeout = secrets.get('pexpect_timeout', 5) - - env['AWX_ISOLATED_DATA_DIR'] = private_data_dir - - venv_path = env.get('VIRTUAL_ENV') - if venv_path and not os.path.exists(venv_path): - raise RuntimeError( - 'a valid Python virtualenv does not exist at {}'.format(venv_path) - ) - - return run_pexpect(args, cwd, env, logfile, - expect_passwords=expect_passwords, - idle_timeout=idle_timeout, - job_timeout=job_timeout, - pexpect_timeout=pexpect_timeout) - - -def handle_termination(pid, args, proot_cmd, is_cancel=True): - ''' - Terminate a subprocess spawned by `pexpect`. - - :param pid: the process id of the running the job. - :param args: the args for the job, i.e., ['ansible-playbook', 'abc.yml'] - :param proot_cmd the command used to isolate processes i.e., `bwrap` - :param is_cancel: flag showing whether this termination is caused by - instance's cancel_flag. - ''' - try: - used_proot = proot_cmd.encode('utf-8') in args - if used_proot: - if not psutil: - os.kill(pid, signal.SIGKILL) - else: - try: - main_proc = psutil.Process(pid=pid) - child_procs = main_proc.children(recursive=True) - for child_proc in child_procs: - os.kill(child_proc.pid, signal.SIGKILL) - os.kill(main_proc.pid, signal.SIGKILL) - except (TypeError, psutil.Error): - os.kill(pid, signal.SIGKILL) - else: - os.kill(pid, signal.SIGTERM) - time.sleep(3) - except OSError: - keyword = 'cancel' if is_cancel else 'timeout' - logger.warn("Attempted to %s already finished job, ignoring" % keyword) - - -def __run__(private_data_dir): - buff = StringIO() - with codecs.open(os.path.join(private_data_dir, 'env'), 'r', encoding='utf-8') as f: - for line in f: - buff.write(line) - - artifacts_dir = os.path.join(private_data_dir, 'artifacts') - - # Standard out directed to pickup location without event filtering applied - stdout_filename = os.path.join(artifacts_dir, 'stdout') - os.mknod(stdout_filename, stat.S_IFREG | stat.S_IRUSR | stat.S_IWUSR) - stdout_handle = codecs.open(stdout_filename, 'w', encoding='utf-8') - - status, rc = run_isolated_job( - private_data_dir, - json.loads(base64.b64decode(buff.getvalue())), - stdout_handle - ) - for filename, data in [ - ('status', status), - ('rc', rc), - ]: - artifact_path = os.path.join(private_data_dir, 'artifacts', filename) - os.mknod(artifact_path, stat.S_IFREG | stat.S_IRUSR | stat.S_IWUSR) - with open(artifact_path, 'w') as f: - f.write(str(data)) - - -if __name__ == '__main__': - import awx - __version__ = awx.__version__ - parser = argparse.ArgumentParser(description='manage a daemonized, isolated ansible playbook') - parser.add_argument('--version', action='version', version=__version__ + '-isolated') - parser.add_argument('command', choices=['start', 'stop', 'is-alive']) - parser.add_argument('private_data_dir') - args = parser.parse_args() - - private_data_dir = args.private_data_dir - pidfile = os.path.join(private_data_dir, 'pid') - - if args.command == 'start': - # create a file to log stderr in case the daemonized process throws - # an exception before it gets to `pexpect.spawn` - stderr_path = os.path.join(private_data_dir, 'artifacts', 'daemon.log') - if not os.path.exists(stderr_path): - os.mknod(stderr_path, stat.S_IFREG | stat.S_IRUSR | stat.S_IWUSR) - stderr = open(stderr_path, 'w+') - - import daemon - from daemon.pidfile import TimeoutPIDLockFile - context = daemon.DaemonContext( - pidfile=TimeoutPIDLockFile(pidfile), - stderr=stderr - ) - with context: - __run__(private_data_dir) - sys.exit(0) - - try: - with open(pidfile, 'r') as f: - pid = int(f.readline()) - except IOError: - sys.exit(1) - - if args.command == 'stop': - try: - with open(os.path.join(private_data_dir, 'args'), 'r') as args: - handle_termination(pid, json.load(args), 'bwrap') - except IOError: - handle_termination(pid, [], 'bwrap') - elif args.command == 'is-alive': - try: - os.kill(pid, signal.SIG_DFL) - sys.exit(0) - except OSError: - sys.exit(1) diff --git a/awx/main/management/commands/test_isolated_connection.py b/awx/main/management/commands/test_isolated_connection.py index 01047cbc44..aa6a936b3d 100644 --- a/awx/main/management/commands/test_isolated_connection.py +++ b/awx/main/management/commands/test_isolated_connection.py @@ -1,13 +1,11 @@ -import os import shutil -import subprocess import sys import tempfile from django.conf import settings from django.core.management.base import BaseCommand, CommandError -from awx.main.expect import run +import ansible_runner class Command(BaseCommand): @@ -25,23 +23,21 @@ class Command(BaseCommand): try: path = tempfile.mkdtemp(prefix='awx_isolated_ssh', dir=settings.AWX_PROOT_BASE_PATH) - args = [ - 'ansible', 'all', '-i', '{},'.format(hostname), '-u', - settings.AWX_ISOLATED_USERNAME, '-T5', '-m', 'shell', - '-a', 'ansible-runner --version', '-vvv' - ] + ssh_key = None if all([ getattr(settings, 'AWX_ISOLATED_KEY_GENERATION', False) is True, getattr(settings, 'AWX_ISOLATED_PRIVATE_KEY', None) ]): - ssh_key_path = os.path.join(path, '.isolated') - ssh_auth_sock = os.path.join(path, 'ssh_auth.sock') - run.open_fifo_write(ssh_key_path, settings.AWX_ISOLATED_PRIVATE_KEY) - args = run.wrap_args_with_ssh_agent(args, ssh_key_path, ssh_auth_sock) - try: - print(' '.join(args)) - subprocess.check_call(args) - except subprocess.CalledProcessError as e: - sys.exit(e.returncode) + ssh_key = settings.AWX_ISOLATED_PRIVATE_KEY + res = ansible_runner.interface.run( + private_data_dir=path, + host_pattern='all', + inventory='{} ansible_ssh_user={}'.format(hostname, settings.AWX_ISOLATED_USERNAME), + module='shell', + module_args='ansible-runner --version', + verbosity=3, + ssh_key=ssh_key, + ) + sys.exit(res.rc) finally: shutil.rmtree(path) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 41b48c2489..c8e2f748f6 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -453,7 +453,7 @@ def awx_isolated_heartbeat(): # Slow pass looping over isolated IGs and their isolated instances if len(isolated_instance_qs) > 0: logger.debug("Managing isolated instances {}.".format(','.join([inst.hostname for inst in isolated_instance_qs]))) - isolated_manager.IsolatedManager.health_check(isolated_instance_qs, awx_application_version) + isolated_manager.IsolatedManager().health_check(isolated_instance_qs) @task() @@ -1192,9 +1192,7 @@ class BaseTask(object): copy_tree(cwd, os.path.join(private_data_dir, 'project')) ansible_runner.utils.dump_artifacts(params) manager_instance = isolated_manager.IsolatedManager( - env, cancelled_callback=lambda: self.update_model(self.instance.pk).cancel_flag, - job_timeout=self.get_instance_timeout(self.instance), idle_timeout=self.get_idle_timeout(), ) status, rc = manager_instance.run(self.instance, @@ -1216,7 +1214,6 @@ class BaseTask(object): extra_update_fields['job_explanation'] = self.instance.job_explanation except Exception: - # run_pexpect does not throw exceptions for cancel or timeout # this could catch programming or file system errors tb = traceback.format_exc() logger.exception('%s Exception occurred while running task', self.instance.log_format) diff --git a/awx/main/tests/functional/test_tasks.py b/awx/main/tests/functional/test_tasks.py index 8b67ba4183..f1cf382a7c 100644 --- a/awx/main/tests/functional/test_tasks.py +++ b/awx/main/tests/functional/test_tasks.py @@ -126,23 +126,6 @@ class TestIsolatedManagementTask: inst.save() return inst - @pytest.mark.skip(reason='fix after runner merge') - def test_old_version(self, control_instance, old_version): - update_capacity = isolated_manager.IsolatedManager.update_capacity - - assert old_version.capacity == 103 - with mock.patch('awx.main.tasks.settings', MockSettings()): - # Isolated node is reporting an older version than the cluster - # instance that issued the health check, set capacity to zero. - update_capacity(old_version, {'version': '1.0.0'}, '3.0.0') - assert old_version.capacity == 0 - - # Upgrade was completed, health check playbook now reports matching - # version, make sure capacity is set. - update_capacity(old_version, {'version': '5.0.0-things', - 'capacity_cpu':103, 'capacity_mem':103}, '5.0.0-stuff') - assert old_version.capacity == 103 - def test_takes_action(self, control_instance, needs_updating): original_isolated_instance = needs_updating.instances.all().first() with mock.patch('awx.main.tasks.settings', MockSettings()):