Merge pull request #3534 from ryanpetrello/iso-pexpect-cleanup

replace our usage of pexpect in IsolatedManager with ansible-runner

Reviewed-by: https://github.com/softwarefactory-project-zuul[bot]
This commit is contained in:
softwarefactory-project-zuul[bot] 2019-03-29 13:17:05 +00:00 committed by GitHub
commit 2ab290ff2d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 172 additions and 886 deletions

View File

@ -1,3 +1,4 @@
import fnmatch
import json
import os
import shutil
@ -5,12 +6,11 @@ import stat
import tempfile
import time
import logging
from io import StringIO
from django.conf import settings
import ansible_runner
import awx
from awx.main.expect import run
from awx.main.utils import get_system_task_capacity
from awx.main.queue import CallbackQueueDispatcher
@ -18,73 +18,96 @@ logger = logging.getLogger('awx.isolated.manager')
playbook_logger = logging.getLogger('awx.isolated.manager.playbooks')
def set_pythonpath(venv_libdir, env):
    """Point PYTHONPATH at the site-packages dir of the venv at *venv_libdir*.

    Scans ``venv_libdir`` for a ``pythonX.Y`` directory (X in {2, 3}) and sets
    ``env['PYTHONPATH']`` to that directory's ``site-packages`` path (with a
    trailing ``:`` so further entries can be appended).  If no matching
    directory exists, any inherited ``PYTHONPATH`` is removed.

    :param venv_libdir: path to the virtualenv's ``lib`` directory
    :param env: environment dict (mutated in place, ala ``os.environ``)
    """
    env.pop('PYTHONPATH', None)  # default to none if no python_ver matches
    # sort the candidates: os.listdir order is arbitrary, so without this the
    # chosen interpreter dir is nondeterministic when several are present
    for version in sorted(os.listdir(venv_libdir)):
        if fnmatch.fnmatch(version, 'python[23].*'):
            if os.path.isdir(os.path.join(venv_libdir, version)):
                env['PYTHONPATH'] = os.path.join(venv_libdir, version, "site-packages") + ":"
                break
class IsolatedManager(object):
def __init__(self, env, cancelled_callback=None, job_timeout=0,
idle_timeout=None):
def __init__(self, cancelled_callback=None):
"""
:param env: a dict containing environment variables for the
subprocess, ala `os.environ`
:param cancelled_callback: a callable - which returns `True` or `False`
- signifying if the job has been prematurely
cancelled
:param job_timeout a timeout (in seconds); if the total job runtime
exceeds this, the process will be killed
:param idle_timeout a timeout (in seconds); if new output is not
sent to stdout in this interval, the process
will be terminated
"""
self.management_env = self._base_management_env()
self.cancelled_callback = cancelled_callback
self.job_timeout = job_timeout
self.idle_timeout = idle_timeout
self.idle_timeout = max(60, 2 * settings.AWX_ISOLATED_CONNECTION_TIMEOUT)
self.started_at = None
@staticmethod
def _base_management_env():
'''
Returns environment variables to use when running a playbook
that manages the isolated instance.
Use of normal job callback and other such configurations are avoided.
'''
def build_runner_params(self, hosts, verbosity=1):
env = dict(os.environ.items())
env['ANSIBLE_RETRY_FILES_ENABLED'] = 'False'
env['ANSIBLE_HOST_KEY_CHECKING'] = 'False'
env['ANSIBLE_LIBRARY'] = os.path.join(os.path.dirname(awx.__file__), 'plugins', 'isolated')
return env
set_pythonpath(os.path.join(settings.ANSIBLE_VENV_PATH, 'lib'), env)
@staticmethod
def _build_args(playbook, hosts, extra_vars=None):
'''
Returns list of Ansible CLI command arguments for a management task
def finished_callback(runner_obj):
if runner_obj.status == 'failed' and runner_obj.config.playbook != 'check_isolated.yml':
# failed for clean_isolated.yml just means the playbook hasn't
# exited on the isolated host
stdout = runner_obj.stdout.read()
playbook_logger.error(stdout)
elif runner_obj.status == 'timeout':
# this means that the default idle timeout of
# (2 * AWX_ISOLATED_CONNECTION_TIMEOUT) was exceeded
# (meaning, we tried to sync with an isolated node, and we got
# no new output for 2 * AWX_ISOLATED_CONNECTION_TIMEOUT seconds)
# this _usually_ means SSH key auth from the controller ->
# isolated didn't work, and ssh is hung waiting on interactive
# input e.g.,
#
# awx@isolated's password:
stdout = runner_obj.stdout.read()
playbook_logger.error(stdout)
else:
playbook_logger.info(runner_obj.stdout.read())
:param playbook: name of the playbook to run
:param hosts: host pattern to operate on, ex. "localhost,"
:param extra_vars: optional dictionary of extra_vars to apply
'''
args = [
'ansible-playbook',
playbook,
'-u', settings.AWX_ISOLATED_USERNAME,
'-T', str(settings.AWX_ISOLATED_CONNECTION_TIMEOUT),
'-i', hosts
]
if extra_vars:
args.extend(['-e', json.dumps(extra_vars)])
if settings.AWX_ISOLATED_VERBOSITY:
args.append('-%s' % ('v' * min(5, settings.AWX_ISOLATED_VERBOSITY)))
return args
inventory = '\n'.join([
'{} ansible_ssh_user={}'.format(host, settings.AWX_ISOLATED_USERNAME)
for host in hosts
])
@classmethod
def awx_playbook_path(cls):
return os.path.abspath(os.path.join(
os.path.dirname(awx.__file__),
'playbooks'
))
return {
'project_dir': os.path.abspath(os.path.join(
os.path.dirname(awx.__file__),
'playbooks'
)),
'inventory': inventory,
'envvars': env,
'finished_callback': finished_callback,
'verbosity': verbosity,
'cancel_callback': self.cancelled_callback,
'settings': {
'idle_timeout': self.idle_timeout,
'job_timeout': settings.AWX_ISOLATED_LAUNCH_TIMEOUT,
'pexpect_timeout': getattr(settings, 'PEXPECT_TIMEOUT', 5),
},
}
def path_to(self, *args):
return os.path.join(self.private_data_dir, *args)
def run_management_playbook(self, playbook, private_data_dir, **kw):
iso_dir = tempfile.mkdtemp(
prefix=playbook,
dir=private_data_dir
)
params = self.runner_params.copy()
params['playbook'] = playbook
params['private_data_dir'] = iso_dir
params.update(**kw)
if all([
getattr(settings, 'AWX_ISOLATED_KEY_GENERATION', False) is True,
getattr(settings, 'AWX_ISOLATED_PRIVATE_KEY', None)
]):
params['ssh_key'] = settings.AWX_ISOLATED_PRIVATE_KEY
return ansible_runner.interface.run(**params)
def dispatch(self, playbook=None, module=None, module_args=None):
'''
Ship the runner payload to a remote host for isolated execution.
@ -92,71 +115,7 @@ class IsolatedManager(object):
self.handled_events = set()
self.started_at = time.time()
self.build_isolated_job_data()
extra_vars = {
'src': self.private_data_dir,
'dest': settings.AWX_PROOT_BASE_PATH,
'ident': self.ident
}
if playbook:
extra_vars['playbook'] = playbook
if module and module_args:
extra_vars['module'] = module
extra_vars['module_args'] = module_args
# Run ansible-playbook to launch a job on the isolated host. This:
#
# - sets up a temporary directory for proot/bwrap (if necessary)
# - copies encrypted job data from the controlling host to the isolated host (with rsync)
# - writes the encryption secret to a named pipe on the isolated host
# - launches ansible-runner
args = self._build_args('run_isolated.yml', '%s,' % self.host, extra_vars)
if self.instance.verbosity:
args.append('-%s' % ('v' * min(5, self.instance.verbosity)))
buff = StringIO()
logger.debug('Starting job {} on isolated host with `run_isolated.yml` playbook.'.format(self.instance.id))
status, rc = IsolatedManager.run_pexpect(
args, self.awx_playbook_path(), self.management_env, buff,
idle_timeout=self.idle_timeout,
job_timeout=settings.AWX_ISOLATED_LAUNCH_TIMEOUT,
pexpect_timeout=5
)
output = buff.getvalue()
playbook_logger.info('Isolated job {} dispatch:\n{}'.format(self.instance.id, output))
if status != 'successful':
for event_data in [
{'event': 'verbose', 'stdout': output},
{'event': 'EOF', 'final_counter': 1},
]:
event_data.setdefault(self.event_data_key, self.instance.id)
CallbackQueueDispatcher().dispatch(event_data)
return status, rc
@classmethod
def run_pexpect(cls, pexpect_args, *args, **kw):
isolated_ssh_path = None
try:
if all([
getattr(settings, 'AWX_ISOLATED_KEY_GENERATION', False) is True,
getattr(settings, 'AWX_ISOLATED_PRIVATE_KEY', None)
]):
isolated_ssh_path = tempfile.mkdtemp(prefix='awx_isolated', dir=settings.AWX_PROOT_BASE_PATH)
os.chmod(isolated_ssh_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
isolated_key = os.path.join(isolated_ssh_path, '.isolated')
ssh_sock = os.path.join(isolated_ssh_path, '.isolated_ssh_auth.sock')
run.open_fifo_write(isolated_key, settings.AWX_ISOLATED_PRIVATE_KEY)
pexpect_args = run.wrap_args_with_ssh_agent(pexpect_args, isolated_key, ssh_sock, silence_ssh_add=True)
return run.run_pexpect(pexpect_args, *args, **kw)
finally:
if isolated_ssh_path:
shutil.rmtree(isolated_ssh_path)
def build_isolated_job_data(self):
'''
Write metadata related to the playbook run into a collection of files
on the local file system.
'''
# exclude certain files from the rsync
rsync_exclude = [
# don't rsync source control metadata (it can be huge!)
'- /project/.git',
@ -176,6 +135,23 @@ class IsolatedManager(object):
f.write(data)
os.chmod(path, stat.S_IRUSR)
extravars = {
'src': self.private_data_dir,
'dest': settings.AWX_PROOT_BASE_PATH,
'ident': self.ident
}
if playbook:
extravars['playbook'] = playbook
if module and module_args:
extravars['module'] = module
extravars['module_args'] = module_args
logger.debug('Starting job {} on isolated host with `run_isolated.yml` playbook.'.format(self.instance.id))
runner_obj = self.run_management_playbook('run_isolated.yml',
self.private_data_dir,
extravars=extravars)
return runner_obj.status, runner_obj.rc
def check(self, interval=None):
"""
Repeatedly poll the isolated node to determine if the job has run.
@ -191,22 +167,12 @@ class IsolatedManager(object):
:param interval: an interval (in seconds) to wait between status polls
"""
interval = interval if interval is not None else settings.AWX_ISOLATED_CHECK_INTERVAL
extra_vars = {'src': self.private_data_dir}
args = self._build_args('check_isolated.yml', '%s,' % self.host, extra_vars)
if self.instance.verbosity:
args.append('-%s' % ('v' * min(5, self.instance.verbosity)))
extravars = {'src': self.private_data_dir}
status = 'failed'
output = ''
rc = None
buff = StringIO()
last_check = time.time()
job_timeout = remaining = self.job_timeout
dispatcher = CallbackQueueDispatcher()
while status == 'failed':
if job_timeout != 0:
remaining = max(0, job_timeout - (time.time() - self.started_at))
canceled = self.cancelled_callback() if self.cancelled_callback else False
if not canceled and time.time() - last_check < interval:
# If the job isn't cancelled, but we haven't waited `interval` seconds, wait longer
@ -216,18 +182,11 @@ class IsolatedManager(object):
if canceled:
logger.warning('Isolated job {} was manually cancelled.'.format(self.instance.id))
buff = StringIO()
logger.debug('Checking on isolated job {} with `check_isolated.yml`.'.format(self.instance.id))
status, rc = IsolatedManager.run_pexpect(
args, self.awx_playbook_path(), self.management_env, buff,
cancelled_callback=self.cancelled_callback,
idle_timeout=remaining,
job_timeout=remaining,
pexpect_timeout=5,
proot_cmd='bwrap'
)
output = buff.getvalue().encode('utf-8')
playbook_logger.info('Isolated job {} check:\n{}'.format(self.instance.id, output))
runner_obj = self.run_management_playbook('check_isolated.yml',
self.private_data_dir,
extravars=extravars)
status, rc = runner_obj.status, runner_obj.rc
# discover new events and ingest them
events_path = self.path_to('artifacts', self.ident, 'job_events')
@ -273,30 +232,21 @@ class IsolatedManager(object):
def cleanup(self):
# If the job failed for any reason, make a last-ditch effort at cleanup
extra_vars = {
extravars = {
'private_data_dir': self.private_data_dir,
'cleanup_dirs': [
self.private_data_dir,
],
}
args = self._build_args('clean_isolated.yml', '%s,' % self.host, extra_vars)
logger.debug('Cleaning up job {} on isolated host with `clean_isolated.yml` playbook.'.format(self.instance.id))
buff = StringIO()
timeout = max(60, 2 * settings.AWX_ISOLATED_CONNECTION_TIMEOUT)
status, rc = IsolatedManager.run_pexpect(
args, self.awx_playbook_path(), self.management_env, buff,
idle_timeout=timeout, job_timeout=timeout,
pexpect_timeout=5
self.run_management_playbook(
'clean_isolated.yml',
self.private_data_dir,
extravars=extravars
)
output = buff.getvalue().encode('utf-8')
playbook_logger.info('Isolated job {} cleanup:\n{}'.format(self.instance.id, output))
if status != 'successful':
# stdout_handle is closed by this point so writing output to logs is our only option
logger.warning('Isolated job {} cleanup error, output:\n{}'.format(self.instance.id, output))
@classmethod
def update_capacity(cls, instance, task_result, awx_application_version):
def update_capacity(cls, instance, task_result):
instance.version = 'ansible-runner-{}'.format(task_result['version'])
if instance.capacity == 0 and task_result['capacity_cpu']:
@ -308,8 +258,7 @@ class IsolatedManager(object):
mem_capacity=int(task_result['capacity_mem']))
instance.save(update_fields=['cpu_capacity', 'mem_capacity', 'capacity', 'version', 'modified'])
@classmethod
def health_check(cls, instance_qs, awx_application_version):
def health_check(self, instance_qs):
'''
:param instance_qs: List of Django objects representing the
isolated instances to manage
@ -319,58 +268,51 @@ class IsolatedManager(object):
- clean up orphaned private files
Performs save on each instance to update its capacity.
'''
hostname_string = ''
for instance in instance_qs:
hostname_string += '{},'.format(instance.hostname)
args = cls._build_args('heartbeat_isolated.yml', hostname_string)
args.extend(['--forks', str(len(instance_qs))])
env = cls._base_management_env()
# TODO: runner doesn't have a --forks arg
#args.extend(['--forks', str(len(instance_qs))])
try:
facts_path = tempfile.mkdtemp()
env['ANSIBLE_CACHE_PLUGIN'] = 'jsonfile'
env['ANSIBLE_CACHE_PLUGIN_CONNECTION'] = facts_path
buff = StringIO()
timeout = max(60, 2 * settings.AWX_ISOLATED_CONNECTION_TIMEOUT)
status, rc = IsolatedManager.run_pexpect(
args, cls.awx_playbook_path(), env, buff,
idle_timeout=timeout, job_timeout=timeout,
pexpect_timeout=5
private_data_dir = tempfile.mkdtemp(
prefix='awx_iso_heartbeat_',
dir=settings.AWX_PROOT_BASE_PATH
)
self.runner_params = self.build_runner_params([
instance.hostname for instance in instance_qs
])
self.runner_params['private_data_dir'] = private_data_dir
runner_obj = self.run_management_playbook(
'heartbeat_isolated.yml',
private_data_dir
)
heartbeat_stdout = buff.getvalue().encode('utf-8')
buff.close()
for instance in instance_qs:
output = heartbeat_stdout
task_result = {}
try:
with open(os.path.join(facts_path, instance.hostname), 'r') as facts_data:
output = facts_data.read()
task_result = json.loads(output)
except Exception:
logger.exception('Failed to read status from isolated instances, output:\n {}'.format(output))
if 'awx_capacity_cpu' in task_result and 'awx_capacity_mem' in task_result:
task_result = {
'capacity_cpu': task_result['awx_capacity_cpu'],
'capacity_mem': task_result['awx_capacity_mem'],
'version': task_result['awx_capacity_version']
}
cls.update_capacity(instance, task_result, awx_application_version)
logger.debug('Isolated instance {} successful heartbeat'.format(instance.hostname))
elif instance.capacity == 0:
logger.debug('Isolated instance {} previously marked as lost, could not re-join.'.format(
instance.hostname))
else:
logger.warning('Could not update status of isolated instance {}'.format(instance.hostname))
if instance.is_lost(isolated=True):
instance.capacity = 0
instance.save(update_fields=['capacity'])
logger.error('Isolated instance {} last checked in at {}, marked as lost.'.format(
instance.hostname, instance.modified))
if runner_obj.status == 'successful':
for instance in instance_qs:
task_result = {}
try:
task_result = runner_obj.get_fact_cache(instance.hostname)
except Exception:
logger.exception('Failed to read status from isolated instances')
if 'awx_capacity_cpu' in task_result and 'awx_capacity_mem' in task_result:
task_result = {
'capacity_cpu': task_result['awx_capacity_cpu'],
'capacity_mem': task_result['awx_capacity_mem'],
'version': task_result['awx_capacity_version']
}
IsolatedManager.update_capacity(instance, task_result)
logger.debug('Isolated instance {} successful heartbeat'.format(instance.hostname))
elif instance.capacity == 0:
logger.debug('Isolated instance {} previously marked as lost, could not re-join.'.format(
instance.hostname))
else:
logger.warning('Could not update status of isolated instance {}'.format(instance.hostname))
if instance.is_lost(isolated=True):
instance.capacity = 0
instance.save(update_fields=['capacity'])
logger.error('Isolated instance {} last checked in at {}, marked as lost.'.format(
instance.hostname, instance.modified))
finally:
if os.path.exists(facts_path):
shutil.rmtree(facts_path)
if os.path.exists(private_data_dir):
shutil.rmtree(private_data_dir)
def run(self, instance, private_data_dir, playbook, module, module_args,
event_data_key, ident=None):
@ -393,10 +335,18 @@ class IsolatedManager(object):
self.ident = ident
self.event_data_key = event_data_key
self.instance = instance
self.host = instance.execution_node
self.private_data_dir = private_data_dir
self.runner_params = self.build_runner_params(
[instance.execution_node],
verbosity=min(5, self.instance.verbosity)
)
status, rc = self.dispatch(playbook, module, module_args)
if status == 'successful':
status, rc = self.check()
else:
# emit an EOF event
event_data = {'event': 'EOF', 'final_counter': 0}
event_data.setdefault(self.event_data_key, self.instance.id)
CallbackQueueDispatcher().dispatch(event_data)
self.cleanup()
return status, rc

View File

@ -1,316 +0,0 @@
#! /usr/bin/env python
import argparse
import base64
import codecs
import collections
import logging
import json
import os
import stat
import pipes
import re
import signal
import sys
import threading
import time
from io import StringIO
import pexpect
import psutil
logger = logging.getLogger('awx.main.utils.expect')
def args2cmdline(*args):
    """Join *args* into a single shell-safe command line string.

    Each argument is individually quoted so embedded whitespace or shell
    metacharacters survive a round trip through ``sh -c``.

    :param args: individual command-line tokens, e.g. ``('rm', '-f', path)``
    :return: a single space-joined, shell-quoted string
    """
    # shlex.quote replaces pipes.quote (deprecated since 3.11, removed in
    # 3.13); the two produce identical output
    return ' '.join(shlex.quote(a) for a in args)
def wrap_args_with_ssh_agent(args, ssh_key_path, ssh_auth_sock=None, silence_ssh_add=False):
    """Wrap *args* so the command runs under ``ssh-agent`` with a key loaded.

    The returned argv runs ``ssh-add <key> && rm -f <key> && <args>`` inside
    ``ssh-agent sh -c ...``; the key file is deleted immediately after being
    added so it does not linger on disk.

    :param args: the command argv to wrap, e.g. ``['ansible-playbook', 'x.yml']``
    :param ssh_key_path: path to the private key file (falsy => return args unchanged)
    :param ssh_auth_sock: optional path for the agent's auth socket (``-a``)
    :param silence_ssh_add: discard ssh-add's stderr chatter when True
    :return: a new argv list (the original is not mutated)
    """
    if ssh_key_path:
        # quote inline with shlex.quote (pipes.quote, used by the old
        # args2cmdline helper, is deprecated/removed in newer Pythons)
        ssh_add_command = ' '.join(shlex.quote(a) for a in ('ssh-add', ssh_key_path))
        if silence_ssh_add:
            ssh_add_command = ' '.join([ssh_add_command, '2>/dev/null'])
        rm_command = ' '.join(shlex.quote(a) for a in ('rm', '-f', ssh_key_path))
        cmd = ' && '.join([ssh_add_command,
                           rm_command,
                           ' '.join(shlex.quote(a) for a in args)])
        args = ['ssh-agent']
        if ssh_auth_sock:
            args.extend(['-a', ssh_auth_sock])
        args.extend(['sh', '-c', cmd])
    return args
def open_fifo_write(path, data):
    '''open_fifo_write opens the fifo named pipe in a new thread.

    This blocks the thread until an external process (such as ssh-agent)
    reads data from the pipe.
    '''
    os.mkfifo(path, 0o600)

    def _feed_pipe():
        # open() on a fifo blocks until a reader attaches, which is why this
        # runs on a background thread instead of the caller's thread
        open(path, 'w').write(data)

    writer = threading.Thread(target=_feed_pipe)
    writer.start()
def run_pexpect(args, cwd, env, logfile,
                cancelled_callback=None, expect_passwords={},
                extra_update_fields=None, idle_timeout=None, job_timeout=0,
                pexpect_timeout=5, proot_cmd='bwrap'):
    '''
    Run the given command using pexpect to capture output and provide
    passwords when requested.

    :param args: a list of `subprocess.call`-style arguments
                 representing a subprocess e.g., ['ls', '-la']
    :param cwd: the directory in which the subprocess should run
    :param env: a dict containing environment variables for the
                subprocess, ala `os.environ`
    :param logfile: a file-like object for capturing stdout
    :param cancelled_callback: a callable - which returns `True` or `False`
                               - signifying if the job has been prematurely
                               cancelled
    :param expect_passwords: a dict of regular expression password prompts
                             to input values, i.e., {r'Password:*?$':
                             'some_password'}
    :param extra_update_fields: a dict used to specify DB fields which should
                                be updated on the underlying model
                                object after execution completes
    :param idle_timeout: a timeout (in seconds); if new output is not
                         sent to stdout in this interval, the process
                         will be terminated
    :param job_timeout: a timeout (in seconds); if the total job runtime
                        exceeds this, the process will be killed
    :param pexpect_timeout: a timeout (in seconds) to wait on
                            `pexpect.spawn().expect()` calls
    :param proot_cmd: the command used to isolate processes, `bwrap`

    Returns a tuple (status, return_code) i.e., `('successful', 0)`
    '''
    # Map pexpect's TIMEOUT/EOF pseudo-patterns to "no password to send" so
    # expect() below simply returns control to the loop on either event.
    # NOTE(review): this mutates the caller's dict — and, when no argument is
    # passed, the shared mutable default — confirm callers always pass a dict.
    expect_passwords[pexpect.TIMEOUT] = None
    expect_passwords[pexpect.EOF] = None

    if not isinstance(expect_passwords, collections.OrderedDict):
        # We iterate over `expect_passwords.keys()` and
        # `expect_passwords.values()` separately to map matched inputs to
        # patterns and choose the proper string to send to the subprocess;
        # enforce usage of an OrderedDict so that the ordering of elements in
        # `keys()` matches `values()`.
        expect_passwords = collections.OrderedDict(expect_passwords)
    password_patterns = list(expect_passwords.keys())
    password_values = list(expect_passwords.values())

    child = pexpect.spawn(
        args[0], args[1:], cwd=cwd, env=env, ignore_sighup=True,
        encoding='utf-8', echo=False, use_poll=True
    )
    # mirror everything the child writes into the provided logfile
    child.logfile_read = logfile
    canceled = False
    timed_out = False
    errored = False
    last_stdout_update = time.time()

    job_start = time.time()
    while child.isalive():
        # block up to pexpect_timeout waiting for a password prompt,
        # EOF, or timeout; result_id indexes into password_patterns
        result_id = child.expect(password_patterns, timeout=pexpect_timeout, searchwindowsize=100)
        password = password_values[result_id]
        if password is not None:
            child.sendline(password)
            # NOTE(review): this timestamp only refreshes when a password is
            # actually sent, so the "idle" check below keys off prompt
            # matches rather than raw stdout activity — confirm intended.
            last_stdout_update = time.time()
        if cancelled_callback:
            try:
                canceled = cancelled_callback()
            except Exception:
                # a broken cancel callback is treated as a system error and
                # terminates the job rather than letting it run unmonitored
                logger.exception('Could not check cancel callback - canceling immediately')
                if isinstance(extra_update_fields, dict):
                    extra_update_fields['job_explanation'] = "System error during job execution, check system logs"
                errored = True
        else:
            canceled = False
        if not canceled and job_timeout != 0 and (time.time() - job_start) > job_timeout:
            timed_out = True
            if isinstance(extra_update_fields, dict):
                extra_update_fields['job_explanation'] = "Job terminated due to timeout"
        if canceled or timed_out or errored:
            handle_termination(child.pid, child.args, proot_cmd, is_cancel=canceled)
        if idle_timeout and (time.time() - last_stdout_update) > idle_timeout:
            # idle too long: force-close the child and report as canceled
            child.close(True)
            canceled = True
    if errored:
        return 'error', child.exitstatus
    elif canceled:
        return 'canceled', child.exitstatus
    elif child.exitstatus == 0 and not timed_out:
        return 'successful', child.exitstatus
    else:
        return 'failed', child.exitstatus
def run_isolated_job(private_data_dir, secrets, logfile=sys.stdout):
    '''
    Launch `ansible-playbook`, executing a job packaged by
    `build_isolated_job_data`.

    :param private_data_dir: an absolute path on the local file system where
                             job metadata exists (i.e.,
                             `/tmp/ansible_awx_xyz/`)
    :param secrets: a dict containing sensitive job metadata, {
                        'env': { ... }  # environment variables,
                        'passwords': { ... }  # pexpect password prompts
                        'ssh_key_data': 'RSA KEY DATA',
                    }
    :param logfile: a file-like object for capturing stdout

    Returns a tuple (status, return_code) i.e., `('successful', 0)`
    '''
    # the `args` file holds the serialized ansible-playbook argv; the name
    # `args` is rebound from file handle to the deserialized list
    with open(os.path.join(private_data_dir, 'args'), 'r') as args:
        args = json.load(args)

    env = secrets.get('env', {})
    # compile prompt patterns with re.M so a prompt matches at any line
    # boundary of the child's output
    expect_passwords = {
        re.compile(pattern, re.M): password
        for pattern, password in secrets.get('passwords', {}).items()
    }

    # ad hoc commands run from the private data dir itself; playbook runs
    # execute from the synced project checkout
    if 'AD_HOC_COMMAND_ID' in env:
        cwd = private_data_dir
    else:
        cwd = os.path.join(private_data_dir, 'project')

    # write the SSH key data into a fifo read by ssh-agent
    ssh_key_data = secrets.get('ssh_key_data')
    if ssh_key_data:
        ssh_key_path = os.path.join(private_data_dir, 'ssh_key_data')
        ssh_auth_sock = os.path.join(private_data_dir, 'ssh_auth.sock')
        open_fifo_write(ssh_key_path, ssh_key_data)
        args = wrap_args_with_ssh_agent(args, ssh_key_path, ssh_auth_sock)

    idle_timeout = secrets.get('idle_timeout', 10)
    job_timeout = secrets.get('job_timeout', 10)
    pexpect_timeout = secrets.get('pexpect_timeout', 5)
    env['AWX_ISOLATED_DATA_DIR'] = private_data_dir

    # fail fast if the job was packaged against a venv that doesn't exist
    # on this host
    venv_path = env.get('VIRTUAL_ENV')
    if venv_path and not os.path.exists(venv_path):
        raise RuntimeError(
            'a valid Python virtualenv does not exist at {}'.format(venv_path)
        )

    return run_pexpect(args, cwd, env, logfile,
                       expect_passwords=expect_passwords,
                       idle_timeout=idle_timeout,
                       job_timeout=job_timeout,
                       pexpect_timeout=pexpect_timeout)
def handle_termination(pid, args, proot_cmd, is_cancel=True):
    '''
    Terminate a subprocess spawned by `pexpect`.

    :param pid: the process id of the running the job.
    :param args: the args for the job, i.e., ['ansible-playbook', 'abc.yml']
    :param proot_cmd: the command used to isolate processes i.e., `bwrap`
    :param is_cancel: flag showing whether this termination is caused by
                      instance's cancel_flag.
    '''
    try:
        # NOTE(review): `args` elements are typically str (json.load output),
        # so testing the *bytes* result of encode() against them looks like
        # it is always False on Python 3 — confirm whether callers ever pass
        # a bytes argv here, otherwise the bwrap branch is dead.
        used_proot = proot_cmd.encode('utf-8') in args
        if used_proot:
            # NOTE(review): psutil is imported unconditionally at module
            # scope, so `not psutil` appears unreachable — presumably a
            # leftover from a guarded `try: import psutil` pattern; verify.
            if not psutil:
                os.kill(pid, signal.SIGKILL)
            else:
                try:
                    # kill the whole process tree, children first, so the
                    # bwrap-wrapped workers don't outlive the parent
                    main_proc = psutil.Process(pid=pid)
                    child_procs = main_proc.children(recursive=True)
                    for child_proc in child_procs:
                        os.kill(child_proc.pid, signal.SIGKILL)
                    os.kill(main_proc.pid, signal.SIGKILL)
                except (TypeError, psutil.Error):
                    # fall back to killing just the main pid if psutil can't
                    # inspect the process
                    os.kill(pid, signal.SIGKILL)
        else:
            # non-isolated process: ask politely, then give it a moment to
            # exit before the caller proceeds
            os.kill(pid, signal.SIGTERM)
            time.sleep(3)
    except OSError:
        # the process already exited between our check and the kill
        keyword = 'cancel' if is_cancel else 'timeout'
        logger.warn("Attempted to %s already finished job, ignoring" % keyword)
def __run__(private_data_dir):
    """Decode the packaged job payload and execute it, recording artifacts.

    Reads the base64-encoded ``env`` file from *private_data_dir*, runs the
    job via `run_isolated_job` with stdout captured to
    ``artifacts/stdout``, and writes the final ``status`` and ``rc`` values
    into the artifacts directory for the controller to collect.
    """
    buff = StringIO()
    with codecs.open(os.path.join(private_data_dir, 'env'), 'r', encoding='utf-8') as f:
        for line in f:
            buff.write(line)

    artifacts_dir = os.path.join(private_data_dir, 'artifacts')

    # Standard out directed to pickup location without event filtering applied
    stdout_filename = os.path.join(artifacts_dir, 'stdout')
    # create the artifact file owner-read/write only before opening it
    os.mknod(stdout_filename, stat.S_IFREG | stat.S_IRUSR | stat.S_IWUSR)
    stdout_handle = codecs.open(stdout_filename, 'w', encoding='utf-8')

    # the env file's contents are a base64-encoded JSON secrets payload
    status, rc = run_isolated_job(
        private_data_dir,
        json.loads(base64.b64decode(buff.getvalue())),
        stdout_handle
    )

    # persist the final status and return code as individual artifact files
    for filename, data in [
        ('status', status),
        ('rc', rc),
    ]:
        artifact_path = os.path.join(private_data_dir, 'artifacts', filename)
        os.mknod(artifact_path, stat.S_IFREG | stat.S_IRUSR | stat.S_IWUSR)
        with open(artifact_path, 'w') as f:
            f.write(str(data))
if __name__ == '__main__':
    # CLI entry point: `start` daemonizes and runs an isolated job; `stop`
    # and `is-alive` manage an existing daemon via its pidfile.
    import awx
    __version__ = awx.__version__
    parser = argparse.ArgumentParser(description='manage a daemonized, isolated ansible playbook')
    parser.add_argument('--version', action='version', version=__version__ + '-isolated')
    parser.add_argument('command', choices=['start', 'stop', 'is-alive'])
    parser.add_argument('private_data_dir')
    args = parser.parse_args()

    private_data_dir = args.private_data_dir
    pidfile = os.path.join(private_data_dir, 'pid')

    if args.command == 'start':
        # create a file to log stderr in case the daemonized process throws
        # an exception before it gets to `pexpect.spawn`
        stderr_path = os.path.join(private_data_dir, 'artifacts', 'daemon.log')
        if not os.path.exists(stderr_path):
            os.mknod(stderr_path, stat.S_IFREG | stat.S_IRUSR | stat.S_IWUSR)
        stderr = open(stderr_path, 'w+')

        import daemon
        from daemon.pidfile import TimeoutPIDLockFile
        context = daemon.DaemonContext(
            pidfile=TimeoutPIDLockFile(pidfile),
            stderr=stderr
        )
        with context:
            __run__(private_data_dir)
        sys.exit(0)

    # `stop` and `is-alive` both need the daemon's pid; a missing or
    # unreadable pidfile means there is nothing to manage
    try:
        with open(pidfile, 'r') as f:
            pid = int(f.readline())
    except IOError:
        sys.exit(1)

    if args.command == 'stop':
        try:
            # pass the recorded argv so termination can detect bwrap usage
            with open(os.path.join(private_data_dir, 'args'), 'r') as args:
                handle_termination(pid, json.load(args), 'bwrap')
        except IOError:
            handle_termination(pid, [], 'bwrap')
    elif args.command == 'is-alive':
        try:
            # signal 0: existence check only, nothing is delivered
            os.kill(pid, signal.SIG_DFL)
            sys.exit(0)
        except OSError:
            sys.exit(1)

View File

@ -1,13 +1,14 @@
import os
import shutil
import subprocess
import sys
import tempfile
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from awx.main.expect import run
import ansible_runner
from awx.main.expect.isolated_manager import set_pythonpath
class Command(BaseCommand):
@ -25,23 +26,24 @@ class Command(BaseCommand):
try:
path = tempfile.mkdtemp(prefix='awx_isolated_ssh', dir=settings.AWX_PROOT_BASE_PATH)
args = [
'ansible', 'all', '-i', '{},'.format(hostname), '-u',
settings.AWX_ISOLATED_USERNAME, '-T5', '-m', 'shell',
'-a', 'ansible-runner --version', '-vvv'
]
ssh_key = None
if all([
getattr(settings, 'AWX_ISOLATED_KEY_GENERATION', False) is True,
getattr(settings, 'AWX_ISOLATED_PRIVATE_KEY', None)
]):
ssh_key_path = os.path.join(path, '.isolated')
ssh_auth_sock = os.path.join(path, 'ssh_auth.sock')
run.open_fifo_write(ssh_key_path, settings.AWX_ISOLATED_PRIVATE_KEY)
args = run.wrap_args_with_ssh_agent(args, ssh_key_path, ssh_auth_sock)
try:
print(' '.join(args))
subprocess.check_call(args)
except subprocess.CalledProcessError as e:
sys.exit(e.returncode)
ssh_key = settings.AWX_ISOLATED_PRIVATE_KEY
env = dict(os.environ.items())
set_pythonpath(os.path.join(settings.ANSIBLE_VENV_PATH, 'lib'), env)
res = ansible_runner.interface.run(
private_data_dir=path,
host_pattern='all',
inventory='{} ansible_ssh_user={}'.format(hostname, settings.AWX_ISOLATED_USERNAME),
module='shell',
module_args='ansible-runner --version',
envvars=env,
verbosity=3,
ssh_key=ssh_key,
)
sys.exit(res.rc)
finally:
shutil.rmtree(path)

View File

@ -4,7 +4,6 @@
# Python
from collections import OrderedDict, namedtuple
import errno
import fnmatch
import functools
import importlib
import json
@ -467,7 +466,7 @@ def awx_isolated_heartbeat():
# Slow pass looping over isolated IGs and their isolated instances
if len(isolated_instance_qs) > 0:
logger.debug("Managing isolated instances {}.".format(','.join([inst.hostname for inst in isolated_instance_qs])))
isolated_manager.IsolatedManager.health_check(isolated_instance_qs, awx_application_version)
isolated_manager.IsolatedManager().health_check(isolated_instance_qs)
@task()
@ -884,12 +883,8 @@ class BaseTask(object):
raise RuntimeError(
'a valid Python virtualenv does not exist at {}'.format(venv_path)
)
env.pop('PYTHONPATH', None) # default to none if no python_ver matches
for version in os.listdir(venv_libdir):
if fnmatch.fnmatch(version, 'python[23].*'):
if os.path.isdir(os.path.join(venv_libdir, version)):
env['PYTHONPATH'] = os.path.join(venv_libdir, version, "site-packages") + ":"
break
isolated_manager.set_pythonpath(venv_libdir, env)
def add_awx_venv(self, env):
env['VIRTUAL_ENV'] = settings.AWX_VENV_PATH
@ -965,9 +960,6 @@ class BaseTask(object):
def build_credentials_list(self, instance):
return []
def get_idle_timeout(self):
return None
def get_instance_timeout(self, instance):
global_timeout_setting_name = instance._global_timeout_setting()
if global_timeout_setting_name:
@ -1168,7 +1160,6 @@ class BaseTask(object):
'finished_callback': self.finished_callback,
'status_handler': self.status_handler,
'settings': {
'idle_timeout': self.get_idle_timeout() or "",
'job_timeout': self.get_instance_timeout(self.instance),
'pexpect_timeout': getattr(settings, 'PEXPECT_TIMEOUT', 5),
**process_isolation_params,
@ -1206,10 +1197,7 @@ class BaseTask(object):
copy_tree(cwd, os.path.join(private_data_dir, 'project'))
ansible_runner.utils.dump_artifacts(params)
manager_instance = isolated_manager.IsolatedManager(
env,
cancelled_callback=lambda: self.update_model(self.instance.pk).cancel_flag,
job_timeout=self.get_instance_timeout(self.instance),
idle_timeout=self.get_idle_timeout(),
cancelled_callback=lambda: self.update_model(self.instance.pk).cancel_flag
)
status, rc = manager_instance.run(self.instance,
private_data_dir,
@ -1230,7 +1218,6 @@ class BaseTask(object):
extra_update_fields['job_explanation'] = self.instance.job_explanation
except Exception:
# run_pexpect does not throw exceptions for cancel or timeout
# this could catch programming or file system errors
tb = traceback.format_exc()
logger.exception('%s Exception occurred while running task', self.instance.log_format)
@ -1514,9 +1501,6 @@ class RunJob(BaseTask):
def build_credentials_list(self, job):
return job.credentials.all()
def get_idle_timeout(self):
return getattr(settings, 'JOB_RUN_IDLE_TIMEOUT', None)
def get_password_prompts(self, passwords={}):
d = super(RunJob, self).get_password_prompts(passwords)
d[r'Enter passphrase for .*:\s*?$'] = 'ssh_key_unlock'
@ -1790,9 +1774,6 @@ class RunProjectUpdate(BaseTask):
d[r'^Are you sure you want to continue connecting \(yes/no\)\?\s*?$'] = 'yes'
return d
def get_idle_timeout(self):
return getattr(settings, 'PROJECT_UPDATE_IDLE_TIMEOUT', None)
def _update_dependent_inventories(self, project_update, dependent_inventory_sources):
scm_revision = project_update.project.scm_revision
inv_update_class = InventoryUpdate._get_task_class()
@ -2151,9 +2132,6 @@ class RunInventoryUpdate(BaseTask):
# All credentials not used by inventory source injector
return inventory_update.get_extra_credentials()
def get_idle_timeout(self):
return getattr(settings, 'INVENTORY_UPDATE_IDLE_TIMEOUT', None)
def pre_run_hook(self, inventory_update):
source_project = None
if inventory_update.inventory_source:
@ -2341,9 +2319,6 @@ class RunAdHocCommand(BaseTask):
def build_playbook_path_relative_to_cwd(self, job, private_data_dir):
    # This task type executes no playbook, so there is no path to report.
    return None
def get_idle_timeout(self):
    # Ad hoc commands share the job-run idle timeout setting; None (the
    # default when the setting is absent) disables idle-based termination.
    return getattr(settings, 'JOB_RUN_IDLE_TIMEOUT', None)
def get_password_prompts(self, passwords={}):
d = super(RunAdHocCommand, self).get_password_prompts()
d[r'Enter passphrase for .*:\s*?$'] = 'ssh_key_unlock'

View File

@ -126,23 +126,6 @@ class TestIsolatedManagementTask:
inst.save()
return inst
@pytest.mark.skip(reason='fix after runner merge')
def test_old_version(self, control_instance, old_version):
    """Capacity is zeroed while an isolated node reports an older version
    than the controlling cluster node, and restored once the health-check
    playbook reports a matching version with capacity figures."""
    update_capacity = isolated_manager.IsolatedManager.update_capacity
    assert old_version.capacity == 103
    with mock.patch('awx.main.tasks.settings', MockSettings()):
        # Isolated node is reporting an older version than the cluster
        # instance that issued the health check, set capacity to zero.
        update_capacity(old_version, {'version': '1.0.0'}, '3.0.0')
        assert old_version.capacity == 0
        # Upgrade was completed, health check playbook now reports matching
        # version, make sure capacity is set.
        update_capacity(old_version, {'version': '5.0.0-things',
                                      'capacity_cpu':103, 'capacity_mem':103}, '5.0.0-stuff')
        assert old_version.capacity == 103
def test_takes_action(self, control_instance, needs_updating):
original_isolated_instance = needs_updating.instances.all().first()
with mock.patch('awx.main.tasks.settings', MockSettings()):

View File

@ -1,308 +0,0 @@
# -*- coding: utf-8 -*-
import os
import pytest
import shutil
import stat
import tempfile
import time
from io import StringIO
from unittest import mock
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from django.utils.encoding import smart_str, smart_bytes
from awx.main.expect import run, isolated_manager
from django.conf import settings
HERE, FILENAME = os.path.split(__file__)
@pytest.fixture(scope='function')
def rsa_key(request):
    """Generate a passphrase-protected RSA private key.

    Returns a ``(pem_text, passphrase)`` tuple; the PEM payload is
    encrypted with the passphrase.
    """
    passphrase = 'passme'
    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=1024,
        backend=default_backend()
    )
    pem_bytes = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.BestAvailableEncryption(smart_bytes(passphrase))
    )
    return smart_str(pem_bytes), passphrase
@pytest.fixture(scope='function')
def private_data_dir(request):
    """Provide a throwaway private data directory, removed at teardown."""
    path = tempfile.mkdtemp(prefix='ansible_awx_unit_test')

    def _cleanup():
        shutil.rmtree(path)

    request.addfinalizer(_cleanup)
    return path
@pytest.fixture(autouse=True)
def mock_sleep(request):
    """Neutralize time.sleep for every test in this module.

    The process teardown mechanism uses `time.sleep` to wait on processes
    to respond to SIGTERM; these are tests and don't care about being nice.
    """
    patcher = mock.patch('time.sleep')
    patcher.start()
    request.addfinalizer(patcher.stop)
def test_simple_spawn():
    """A trivial command exits 0 and is reported as 'successful'."""
    buff = StringIO()
    status, rc = run.run_pexpect(
        ['ls', '-la'],
        HERE,
        {},
        buff,
        cancelled_callback=lambda: False,
    )
    assert rc == 0
    assert status == 'successful'
    # assert FILENAME in buff.getvalue()
def test_error_rc():
    """A command that fails is reported 'failed' with a non-zero rc."""
    buff = StringIO()
    argv = ['ls', '-nonsense']
    status, rc = run.run_pexpect(
        argv,
        HERE,
        {},
        buff,
        cancelled_callback=lambda: False,
    )
    assert status == 'failed'
    # I'd expect 2, but we shouldn't risk making this test platform-dependent
    assert rc > 0
def test_cancel_callback_error():
    """A cancelled_callback that raises marks the run as a system error
    and records an explanation in extra_update_fields."""
    buff = StringIO()
    extra_fields = {}

    def exploding_callback():
        raise Exception('unique exception')

    status, rc = run.run_pexpect(
        ['sleep', '2'],
        HERE,
        {},
        buff,
        cancelled_callback=exploding_callback,
        extra_update_fields=extra_fields
    )
    assert status == 'error'
    assert rc == 0
    assert extra_fields['job_explanation'] == "System error during job execution, check system logs"
@pytest.mark.skip(reason='fix after runner merge')
@pytest.mark.timeout(3)  # https://github.com/ansible/tower/issues/2391#issuecomment-401946895
@pytest.mark.parametrize('value', ['abc123', 'Iñtërnâtiônàlizætiøn'])
def test_env_vars(value):
    """Environment variables passed to run_pexpect reach the child process.

    The inline child script previously used the Python 2 `print` statement,
    which is a SyntaxError under Python 3; it now uses the print() function.
    """
    stdout = StringIO()
    status, rc = run.run_pexpect(
        ['python', '-c', 'import os; print(os.getenv("X_MY_ENV"))'],
        HERE,
        {'X_MY_ENV': value},
        stdout,
        cancelled_callback=lambda: False,
    )
    assert status == 'successful'
    assert rc == 0
    assert value in stdout.getvalue()
def test_manual_cancellation():
    """A truthy cancelled_callback cancels a child blocked on stdin.

    The inline child script previously used Python 2 `print raw_input(...)`;
    under Python 3 that crashes immediately instead of blocking, so the
    cancellation path would never be exercised. It now uses print()/input().
    """
    stdout = StringIO()
    status, rc = run.run_pexpect(
        ['python', '-c', 'print(input("Password: "))'],
        HERE,
        {},
        stdout,
        cancelled_callback=lambda: True,  # this callable will cause cancellation
        # the lack of password inputs will cause stdin to hang
        pexpect_timeout=0,
    )
    assert status == 'canceled'
@pytest.mark.skip(reason='fix after runner merge')
def test_build_isolated_job_data(private_data_dir, rsa_key):
    """build_isolated_job_data lays out the private data directory:
    a project link, a user-only artifacts directory, a read-only args
    file, and an .rsync-filter that excludes SCM metadata and partial
    job-event files."""
    pem, passphrase = rsa_key
    mgr = isolated_manager.IsolatedManager(
        ['ls', '-la'], HERE, {}, StringIO(), ''
    )
    mgr.private_data_dir = private_data_dir
    mgr.build_isolated_job_data()
    path = os.path.join(private_data_dir, 'project')
    assert os.path.isdir(path)
    # <private_data_dir>/project is a soft link to HERE, which is the directory
    # _this_ test file lives in
    assert os.path.exists(os.path.join(path, FILENAME))
    path = os.path.join(private_data_dir, 'artifacts')
    assert os.path.isdir(path)
    assert stat.S_IMODE(os.stat(path).st_mode) == stat.S_IXUSR + stat.S_IWUSR + stat.S_IRUSR # user rwx
    path = os.path.join(private_data_dir, 'args')
    with open(path, 'r') as f:
        assert stat.S_IMODE(os.stat(path).st_mode) == stat.S_IRUSR # user r/o
        assert f.read() == '["ls", "-la"]'
    path = os.path.join(private_data_dir, '.rsync-filter')
    with open(path, 'r') as f:
        data = f.read()
        assert data == '\n'.join([
            '- /project/.git',
            '- /project/.svn',
            '- /project/.hg',
            '- /artifacts/job_events/*-partial.json.tmp',
            '- /env'
        ])
@pytest.mark.skip(reason='fix after runner merge')
def test_run_isolated_job(private_data_dir, rsa_key):
    """run_isolated_job executes the staged job, streams its stdout, and
    exports AWX_ISOLATED_DATA_DIR into the job environment."""
    env = {'JOB_ID': '1'}
    pem, passphrase = rsa_key
    mgr = isolated_manager.IsolatedManager(
        ['ls', '-la'], HERE, env, StringIO(), ''
    )
    mgr.private_data_dir = private_data_dir
    # secrets mirror what the controlling node ships to the isolated node:
    # job env, password-prompt responses, and the SSH key material
    secrets = {
        'env': env,
        'passwords': {
            r'Enter passphrase for .*:\s*?$': passphrase
        },
        'ssh_key_data': pem
    }
    mgr.build_isolated_job_data()
    stdout = StringIO()
    # Mock environment variables for callback module
    status, rc = run.run_isolated_job(private_data_dir, secrets, stdout)
    assert status == 'successful'
    assert rc == 0
    assert FILENAME in stdout.getvalue()
    assert env['AWX_ISOLATED_DATA_DIR'] == private_data_dir
@pytest.mark.skip(reason='fix after runner merge')
def test_run_isolated_adhoc_command(private_data_dir, rsa_key):
    """Ad-hoc commands are invoked with the private data directory as the
    working directory, and AWX_ISOLATED_DATA_DIR is exported as well."""
    env = {'AD_HOC_COMMAND_ID': '1'}
    pem, passphrase = rsa_key
    mgr = isolated_manager.IsolatedManager(
        ['pwd'], HERE, env, StringIO(), ''
    )
    mgr.private_data_dir = private_data_dir
    # secrets mirror what the controlling node ships to the isolated node:
    # job env, password-prompt responses, and the SSH key material
    secrets = {
        'env': env,
        'passwords': {
            r'Enter passphrase for .*:\s*?$': passphrase
        },
        'ssh_key_data': pem
    }
    mgr.build_isolated_job_data()
    stdout = StringIO()
    # Mock environment variables for callback module
    with mock.patch('os.getenv') as env_mock:
        env_mock.return_value = '/path/to/awx/lib'
        status, rc = run.run_isolated_job(private_data_dir, secrets, stdout)
    assert status == 'successful'
    assert rc == 0
    # for ad-hoc jobs, `ansible` is invoked from the `private_data_dir`, so
    # an ad-hoc command that runs `pwd` should print `private_data_dir` to stdout
    assert private_data_dir in stdout.getvalue()
    assert env['AWX_ISOLATED_DATA_DIR'] == private_data_dir
@pytest.mark.skip(reason='fix after runner merge')
def test_check_isolated_job(private_data_dir, rsa_key):
    """IsolatedManager.check() polls the isolated host via the
    check_isolated.yml playbook; once the synchronized artifacts report a
    final status/rc, check() returns them and replays the remote stdout."""
    pem, passphrase = rsa_key
    stdout = StringIO()
    mgr = isolated_manager.IsolatedManager(['ls', '-la'], HERE, {}, stdout, '')
    mgr.private_data_dir = private_data_dir
    mgr.instance = mock.Mock(id=123, pk=123, verbosity=5, spec_set=['id', 'pk', 'verbosity'])
    mgr.started_at = time.time()
    mgr.host = 'isolated-host'
    os.mkdir(os.path.join(private_data_dir, 'artifacts'))
    with mock.patch('awx.main.expect.run.run_pexpect') as run_pexpect:
        # stand-in for the real artifact sync: pretend the remote job
        # finished as 'failed' (rc 1) and produced 'KABOOM!' on stdout
        def _synchronize_job_artifacts(args, cwd, env, buff, **kw):
            buff.write('checking job status...')
            for filename, data in (
                ['status', 'failed'],
                ['rc', '1'],
                ['stdout', 'KABOOM!'],
            ):
                with open(os.path.join(private_data_dir, 'artifacts', filename), 'w') as f:
                    f.write(data)
            return ('successful', 0)
        run_pexpect.side_effect = _synchronize_job_artifacts
        with mock.patch.object(mgr, '_missing_artifacts') as missing_artifacts:
            missing_artifacts.return_value = False
            status, rc = mgr.check(interval=0)
    assert status == 'failed'
    assert rc == 1
    # the locally-buffered 'checking job status...' is replaced by the
    # remote job's actual stdout
    assert stdout.getvalue() == 'KABOOM!'
    # the management playbook invocation must target the isolated host
    # with the configured user, timeout, and verbosity
    run_pexpect.assert_called_with(
        [
            'ansible-playbook', 'check_isolated.yml',
            '-u', settings.AWX_ISOLATED_USERNAME,
            '-T', str(settings.AWX_ISOLATED_CONNECTION_TIMEOUT),
            '-i', 'isolated-host,',
            '-e', '{"src": "%s"}' % private_data_dir,
            '-vvvvv'
        ],
        '/awx_devel/awx/playbooks', mgr.management_env, mock.ANY,
        cancelled_callback=None,
        idle_timeout=0,
        job_timeout=0,
        pexpect_timeout=5,
        proot_cmd='bwrap'
    )
@pytest.mark.skip(reason='fix after runner merge')
def test_check_isolated_job_timeout(private_data_dir, rsa_key):
    """When job_timeout elapses before the isolated job completes, check()
    reports failure and records a timeout explanation in
    extra_update_fields."""
    pem, passphrase = rsa_key
    stdout = StringIO()
    extra_update_fields = {}
    # job_timeout=1 combined with started_at=now means the very first
    # status poll already exceeds the deadline
    mgr = isolated_manager.IsolatedManager(['ls', '-la'], HERE, {}, stdout, '',
                                           job_timeout=1,
                                           extra_update_fields=extra_update_fields)
    mgr.private_data_dir = private_data_dir
    mgr.instance = mock.Mock(id=123, pk=123, verbosity=5, spec_set=['id', 'pk', 'verbosity'])
    mgr.started_at = time.time()
    mgr.host = 'isolated-host'
    with mock.patch('awx.main.expect.run.run_pexpect') as run_pexpect:
        # stand-in for the artifact sync: report a failed poll
        def _synchronize_job_artifacts(args, cwd, env, buff, **kw):
            buff.write('checking job status...')
            return ('failed', 1)
        run_pexpect.side_effect = _synchronize_job_artifacts
        status, rc = mgr.check(interval=0)
    assert status == 'failed'
    assert rc == 1
    assert stdout.getvalue() == 'checking job status...'
    assert extra_update_fields['job_explanation'] == 'Job terminated due to timeout'