Support for executing job and adhoc commands on isolated Tower nodes (#6524)

Ryan Petrello
2017-06-14 11:47:30 -04:00
committed by GitHub
parent aa962a26f1
commit 422950f45d
38 changed files with 1794 additions and 267 deletions

@@ -6,16 +6,14 @@ import codecs
from collections import OrderedDict
import ConfigParser
import cStringIO
import imp
import json
import logging
import os
import signal
import pipes
import re
import shutil
import stat
import tempfile
import thread
import time
import traceback
import urlparse
@@ -29,9 +27,6 @@ try:
except:
psutil = None
# Pexpect
import pexpect
# Celery
from celery import Task, task
from celery.signals import celeryd_init, worker_process_init
@@ -54,9 +49,11 @@ from awx.main.models import * # noqa
from awx.main.models.unified_jobs import ACTIVE_STATES
from awx.main.queue import CallbackQueueDispatcher
from awx.main.task_engine import TaskEnhancer
from awx.main.isolated import run, isolated_manager
from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url,
check_proot_installed, build_proot_temp_dir, wrap_args_with_proot,
get_system_task_capacity, OutputEventFilter, parse_yaml_or_json)
check_proot_installed, build_proot_temp_dir,
wrap_args_with_proot, get_system_task_capacity, OutputEventFilter,
parse_yaml_or_json)
from awx.main.utils.reload import restart_local_services, stop_local_services
from awx.main.utils.handlers import configure_external_logger
from awx.main.consumers import emit_channel_notification
@@ -407,7 +404,7 @@ class BaseTask(Task):
'''
Create a temporary directory for job-related files.
'''
path = tempfile.mkdtemp(prefix='ansible_tower_')
path = tempfile.mkdtemp(prefix='ansible_tower_%s_' % instance.pk)
os.chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
return path
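
The temp-dir prefix now embeds the job's primary key, presumably so a private data directory can be tied back to a specific job once it starts being copied to and cleaned up on other machines. A minimal sketch of the new naming, using an illustrative pk of 42:

    import os
    import stat
    import tempfile

    # New behaviour: directory names look like /tmp/ansible_tower_42_XXXXXX
    # instead of the old anonymous /tmp/ansible_tower_XXXXXX.
    path = tempfile.mkdtemp(prefix='ansible_tower_%s_' % 42)
    os.chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)  # owner-only rwx (0700)
    print(path)
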
@@ -445,7 +442,7 @@ class BaseTask(Task):
# will be read then closed, instead of leaving the SSH key on disk.
if credential.kind in ('ssh', 'scm') and not ssh_too_old:
path = os.path.join(kwargs.get('private_data_dir', tempfile.gettempdir()), name)
self.open_fifo_write(path, data)
run.open_fifo_write(path, data)
private_data_files['credentials']['ssh'] = path
# Ansible network modules do not yet support ssh-agent.
# Instead, ssh private key file is explicitly passed via an
@@ -459,14 +456,6 @@ class BaseTask(Task):
private_data_files['credentials'][credential] = path
return private_data_files
def open_fifo_write(self, path, data):
'''open_fifo_write opens the fifo named pipe in a new thread.
This blocks until the calls to ssh-agent/ssh-add have read the
credential information from the pipe.
'''
os.mkfifo(path, 0600)
thread.start_new_thread(lambda p, d: open(p, 'w').write(d), (path, data))
def build_passwords(self, instance, **kwargs):
'''
Build a dictionary of passwords for responding to prompts.
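
open_fifo_write is dropped from BaseTask; judging by the run.open_fifo_write(path, data) call above, it now lives in awx.main.isolated.run with the other process helpers this diff relocates. Based on the removed method, the relocated helper is presumably equivalent to this (Python 2, like the rest of the module):

    import os
    import thread  # Python 2 low-level threading module


    def open_fifo_write(path, data):
        '''
        Open the named pipe in a background thread and write the secret into it.
        The write blocks until ssh-add (running under ssh-agent) reads the
        credential from the pipe, so the key material never has to sit on disk.
        '''
        os.mkfifo(path, 0600)
        thread.start_new_thread(lambda p, d: open(p, 'w').write(d), (path, data))
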
@@ -477,7 +466,7 @@ class BaseTask(Task):
'': '',
}
def add_ansible_venv(self, env):
def add_ansible_venv(self, env, add_tower_lib=True):
if settings.ANSIBLE_USE_VENV:
env['VIRTUAL_ENV'] = settings.ANSIBLE_VENV_PATH
env['PATH'] = os.path.join(settings.ANSIBLE_VENV_PATH, "bin") + ":" + env['PATH']
@@ -488,7 +477,8 @@ class BaseTask(Task):
env['PYTHONPATH'] = os.path.join(venv_libdir, python_ver, "site-packages") + ":"
break
# Add awx/lib to PYTHONPATH.
env['PYTHONPATH'] = ':'.join(filter(None, [self.get_path_to('..', 'lib'), env.get('PYTHONPATH', '')]))
if add_tower_lib:
env['PYTHONPATH'] = env.get('PYTHONPATH', '') + self.get_path_to('..', 'lib') + ':'
return env
def add_tower_venv(self, env):
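
Adding awx/lib to PYTHONPATH is now controlled by the add_tower_lib flag (still on by default), and the path is appended rather than prepended; the RunJob call site later in this diff keys the flag off the isolated kwarg. A toy illustration of the new branch, with an assumed library path:

    def extend_pythonpath(env, add_tower_lib, tower_lib='/var/lib/awx/lib'):
        # Mirrors the new branch above: the library path is appended only on request.
        if add_tower_lib:
            env['PYTHONPATH'] = env.get('PYTHONPATH', '') + tower_lib + ':'
        return env


    print(extend_pythonpath({}, add_tower_lib=True))   # {'PYTHONPATH': '/var/lib/awx/lib:'}
    print(extend_pythonpath({}, add_tower_lib=False))  # {}
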
@@ -537,26 +527,39 @@ class BaseTask(Task):
safe_env[k] = urlpass_re.sub(HIDDEN_PASSWORD, v)
return safe_env
def args2cmdline(self, *args):
return ' '.join([pipes.quote(a) for a in args])
def wrap_args_with_ssh_agent(self, args, ssh_key_path, ssh_auth_sock=None):
if ssh_key_path:
cmd = ' && '.join([self.args2cmdline('ssh-add', ssh_key_path),
self.args2cmdline('rm', '-f', ssh_key_path),
self.args2cmdline(*args)])
args = ['ssh-agent']
if ssh_auth_sock:
args.extend(['-a', ssh_auth_sock])
args.extend(['sh', '-c', cmd])
return args
def should_use_proot(self, instance, **kwargs):
'''
Return whether this task should use proot.
'''
return False
def build_inventory(self, instance, **kwargs):
plugin = self.get_path_to('..', 'plugins', 'inventory', 'awxrest.py')
if kwargs.get('isolated') is True:
# For isolated jobs, we have to interact w/ the REST API from the
# controlling node and ship the static JSON inventory to the
# isolated host (because the isolated host itself can't reach the
# Tower REST API to fetch the inventory).
path = os.path.join(kwargs['private_data_dir'], 'inventory')
if os.path.exists(path):
return path
awxrest = imp.load_source('awxrest', plugin)
with open(path, 'w') as f:
buff = cStringIO.StringIO()
awxrest.InventoryScript(**{
'base_url': settings.INTERNAL_API_URL,
'authtoken': instance.task_auth_token or '',
'inventory_id': str(instance.inventory.pk),
'list': True,
'hostvars': True,
}).run(buff)
json_data = buff.getvalue().strip()
f.write("#! /usr/bin/env python\nprint '''%s'''\n" % json_data)
os.chmod(path, stat.S_IRUSR | stat.S_IXUSR)
return path
else:
return plugin
def build_args(self, instance, **kwargs):
raise NotImplementedError
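
The new build_inventory is the core of the isolated flow: the inventory is resolved on the controller (which can reach the Tower REST API) and frozen into an executable script inside the private data dir, so ansible-playbook -i <path> works unchanged once the directory is copied to the isolated host. A minimal sketch of the artifact it produces, with illustrative inventory data standing in for what awxrest.InventoryScript would return:

    import json
    import os
    import stat
    import tempfile

    # Stand-in for the JSON the awxrest.py inventory plugin would emit after
    # querying the Tower REST API from the controller.
    json_data = json.dumps({
        'all': {'hosts': ['web-1.example.org']},
        '_meta': {'hostvars': {'web-1.example.org': {}}},
    })

    private_data_dir = tempfile.mkdtemp(prefix='ansible_tower_42_')  # illustrative
    path = os.path.join(private_data_dir, 'inventory')
    with open(path, 'w') as f:
        # The generated file is itself a tiny Python 2 script that just prints
        # the frozen JSON, so the isolated host never needs to reach the API.
        f.write("#! /usr/bin/env python\nprint '''%s'''\n" % json_data)
    os.chmod(path, stat.S_IRUSR | stat.S_IXUSR)
    print(path)
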
@@ -572,6 +575,17 @@ class BaseTask(Task):
def get_idle_timeout(self):
return None
def get_instance_timeout(self, instance):
global_timeout_setting_name = instance._global_timeout_setting()
if global_timeout_setting_name:
global_timeout = getattr(settings, global_timeout_setting_name, 0)
local_timeout = getattr(instance, 'timeout', 0)
job_timeout = global_timeout if local_timeout == 0 else local_timeout
job_timeout = 0 if local_timeout < 0 else job_timeout
else:
job_timeout = 0
return job_timeout
def get_password_prompts(self):
'''
Return a dictionary where keys are strings or regular expressions for
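
get_instance_timeout lifts the timeout arithmetic out of the old run_pexpect (removed in the next hunk) so the same precedence can be handed to either execution path: a nonzero per-job timeout overrides the global setting, and a negative per-job timeout disables timeouts altogether. The same rule restated standalone, with a few worked cases:

    def resolve_job_timeout(global_timeout, local_timeout):
        # Same precedence as get_instance_timeout above.
        job_timeout = global_timeout if local_timeout == 0 else local_timeout
        return 0 if local_timeout < 0 else job_timeout


    assert resolve_job_timeout(global_timeout=600, local_timeout=0) == 600    # fall back to the global setting
    assert resolve_job_timeout(global_timeout=600, local_timeout=120) == 120  # per-job value wins
    assert resolve_job_timeout(global_timeout=600, local_timeout=-1) == 0     # negative disables the timeout
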
@@ -591,105 +605,6 @@ class BaseTask(Task):
assert stdout_handle.name == stdout_filename
return stdout_handle
def run_pexpect(self, instance, args, cwd, env, passwords, stdout_handle,
output_replacements=None, extra_update_fields=None):
'''
Run the given command using pexpect to capture output and provide
passwords when requested.
'''
logfile = stdout_handle
logfile_pos = logfile.tell()
global_timeout_setting_name = instance._global_timeout_setting()
if global_timeout_setting_name:
global_timeout = getattr(settings, global_timeout_setting_name, 0)
local_timeout = getattr(instance, 'timeout', 0)
job_timeout = global_timeout if local_timeout == 0 else local_timeout
job_timeout = 0 if local_timeout < 0 else job_timeout
else:
job_timeout = 0
child = pexpect.spawn(
args[0], args[1:], cwd=cwd, env=env, ignore_sighup=True,
encoding='utf-8', echo=False,
)
child.logfile_read = logfile
canceled = False
timed_out = False
last_stdout_update = time.time()
idle_timeout = self.get_idle_timeout()
expect_list = []
expect_passwords = {}
pexpect_timeout = getattr(settings, 'PEXPECT_TIMEOUT', 5)
for n, item in enumerate(self.get_password_prompts().items()):
expect_list.append(item[0])
expect_passwords[n] = passwords.get(item[1], '') or ''
expect_list.extend([pexpect.TIMEOUT, pexpect.EOF])
instance = self.update_model(instance.pk, status='running',
execution_node=settings.CLUSTER_HOST_ID,
output_replacements=output_replacements)
job_start = time.time()
while child.isalive():
result_id = child.expect(expect_list, timeout=pexpect_timeout, searchwindowsize=100)
if result_id in expect_passwords:
child.sendline(expect_passwords[result_id])
if logfile_pos != logfile.tell():
logfile_pos = logfile.tell()
last_stdout_update = time.time()
# Refresh model instance from the database (to check cancel flag).
instance = self.update_model(instance.pk)
if instance.cancel_flag:
canceled = True
elif job_timeout != 0 and (time.time() - job_start) > job_timeout:
timed_out = True
if isinstance(extra_update_fields, dict):
extra_update_fields['job_explanation'] = "Job terminated due to timeout"
if canceled or timed_out:
self._handle_termination(instance, child, is_cancel=canceled)
if idle_timeout and (time.time() - last_stdout_update) > idle_timeout:
child.close(True)
canceled = True
if canceled:
return 'canceled', child.exitstatus
elif child.exitstatus == 0 and not timed_out:
return 'successful', child.exitstatus
else:
return 'failed', child.exitstatus
def _handle_termination(self, instance, job, is_cancel=True):
'''Helper function to properly terminate specified job.
Args:
instance: The corresponding model instance of this task.
job: The pexpect subprocess running the job.
is_cancel: Flag showing whether this termination is caused by instance's
cancel_flag.
Return:
None.
'''
try:
if settings.AWX_PROOT_ENABLED and self.should_use_proot(instance):
# NOTE: Refactor this once we get a newer psutil across the board
if not psutil:
os.kill(job.pid, signal.SIGKILL)
else:
try:
main_proc = psutil.Process(pid=job.pid)
if hasattr(main_proc, "children"):
child_procs = main_proc.children(recursive=True)
else:
child_procs = main_proc.get_children(recursive=True)
for child_proc in child_procs:
os.kill(child_proc.pid, signal.SIGKILL)
os.kill(main_proc.pid, signal.SIGKILL)
except (TypeError, psutil.Error):
os.kill(job.pid, signal.SIGKILL)
else:
os.kill(job.pid, signal.SIGTERM)
time.sleep(3)
except OSError:
keyword = 'cancel' if is_cancel else 'timeout'
logger.warn("Attempted to %s already finished job, ignoring" % keyword)
def pre_run_hook(self, instance, **kwargs):
'''
Hook for any steps to run before the job/task starts
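
The pexpect loop and the proot/psutil termination handling above are not gone; they move into awx.main.isolated.run so local and isolated execution share one implementation, and everything the loop used to read off the model mid-run (cancel flag, timeouts, prompts) is now passed in as plain values and callables. A condensed sketch of the relocated function, with its signature inferred from the call site later in this diff and the timeout/termination bookkeeping omitted:

    import pexpect


    def run_pexpect(args, cwd, env, logfile, expect_passwords=None,
                    cancelled_callback=None, extra_update_fields=None,
                    idle_timeout=None, job_timeout=0, pexpect_timeout=5,
                    proot_cmd='bwrap'):
        # Condensed from the block removed above; timeout handling and the
        # proot/psutil process-tree termination are left out for brevity.
        expect_passwords = expect_passwords or {}
        prompts = list(expect_passwords.keys())
        answers = list(expect_passwords.values())
        child = pexpect.spawn(args[0], args[1:], cwd=cwd, env=env,
                              ignore_sighup=True, encoding='utf-8', echo=False)
        child.logfile_read = logfile
        canceled = False
        while child.isalive():
            result_id = child.expect(prompts + [pexpect.TIMEOUT, pexpect.EOF],
                                     timeout=pexpect_timeout, searchwindowsize=100)
            if result_id < len(prompts):
                child.sendline(answers[result_id])
            if cancelled_callback is not None and cancelled_callback():
                canceled = True
                child.close(True)
        if canceled:
            return 'canceled', child.exitstatus
        return ('successful' if child.exitstatus == 0 else 'failed'), child.exitstatus
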
@@ -705,7 +620,7 @@ class BaseTask(Task):
Hook for any steps to run after job/task is marked as complete.
'''
def run(self, pk, **kwargs):
def run(self, pk, isolated_host=None, **kwargs):
'''
Run the job/task and capture its output.
'''
@@ -716,6 +631,7 @@ class BaseTask(Task):
output_replacements = []
extra_update_fields = {}
try:
kwargs['isolated'] = isolated_host is not None
self.pre_run_hook(instance, **kwargs)
if instance.cancel_flag:
instance = self.update_model(instance.pk, status='canceled')
@@ -754,7 +670,12 @@ class BaseTask(Task):
credential, env, safe_env, args, safe_args, kwargs['private_data_dir']
)
stdout_handle = self.get_stdout_handle(instance)
if isolated_host is None:
stdout_handle = self.get_stdout_handle(instance)
else:
base_handle = super(self.__class__, self).get_stdout_handle(instance)
stdout_handle = isolated_manager.IsolatedManager.wrap_stdout_handle(
instance, kwargs['private_data_dir'], base_handle)
if self.should_use_proot(instance, **kwargs):
if not check_proot_installed():
raise RuntimeError('bubblewrap is not installed')
@@ -763,14 +684,42 @@ class BaseTask(Task):
safe_args = wrap_args_with_proot(safe_args, cwd, **kwargs)
# If there is an SSH key path defined, wrap args with ssh-agent.
ssh_key_path = self.get_ssh_key_path(instance, **kwargs)
if ssh_key_path:
# If we're executing on an isolated host, don't bother adding the
# key to the agent in this environment
if ssh_key_path and isolated_host is None:
ssh_auth_sock = os.path.join(kwargs['private_data_dir'], 'ssh_auth.sock')
args = self.wrap_args_with_ssh_agent(args, ssh_key_path, ssh_auth_sock)
safe_args = self.wrap_args_with_ssh_agent(safe_args, ssh_key_path, ssh_auth_sock)
args = run.wrap_args_with_ssh_agent(args, ssh_key_path, ssh_auth_sock)
safe_args = run.wrap_args_with_ssh_agent(safe_args, ssh_key_path, ssh_auth_sock)
instance = self.update_model(pk, job_args=json.dumps(safe_args),
job_cwd=cwd, job_env=safe_env, result_stdout_file=stdout_handle.name)
status, rc = self.run_pexpect(instance, args, cwd, env, kwargs['passwords'], stdout_handle,
extra_update_fields=extra_update_fields)
expect_passwords = {}
for k, v in self.get_password_prompts().items():
expect_passwords[k] = kwargs['passwords'].get(v, '') or ''
_kw = dict(
expect_passwords=expect_passwords,
cancelled_callback=lambda: self.update_model(instance.pk).cancel_flag,
job_timeout=self.get_instance_timeout(instance),
idle_timeout=self.get_idle_timeout(),
extra_update_fields=extra_update_fields,
pexpect_timeout=getattr(settings, 'PEXPECT_TIMEOUT', 5),
proot_cmd=getattr(settings, 'AWX_PROOT_CMD', 'bwrap'),
)
instance = self.update_model(instance.pk, status='running',
execution_node=settings.CLUSTER_HOST_ID,
output_replacements=output_replacements)
if isolated_host:
manager_instance = isolated_manager.IsolatedManager(
args, cwd, env, stdout_handle, ssh_key_path, **_kw
)
status, rc = manager_instance.run(instance, isolated_host,
kwargs['private_data_dir'],
kwargs.get('proot_temp_dir'))
else:
status, rc = run.run_pexpect(
args, cwd, env, stdout_handle, **_kw
)
except Exception:
if status != 'canceled':
tb = traceback.format_exc()
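
run() now accepts an optional isolated_host: when it is set, the fully prepared command, environment, key material, and private data dir are handed to IsolatedManager, which executes on that host and reports status and return code back; when it is None, the old local run_pexpect behaviour is preserved. A hypothetical controller-side dispatch (the hostname, pk, and the use of apply_async are illustrative, not taken from this diff):

    from awx.main.tasks import RunJob

    # Runs job 42 on an isolated node; omit isolated_host to run it locally
    # on the controller exactly as before.
    RunJob().apply_async(args=[42], kwargs={'isolated_host': 'isolated-node-1.example.org'})
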
@@ -901,7 +850,7 @@ class RunJob(BaseTask):
plugin_dirs.extend(settings.AWX_ANSIBLE_CALLBACK_PLUGINS)
plugin_path = ':'.join(plugin_dirs)
env = super(RunJob, self).build_env(job, **kwargs)
env = self.add_ansible_venv(env)
env = self.add_ansible_venv(env, add_tower_lib=kwargs.get('isolated', False))
# Set environment variables needed for inventory and job event
# callbacks to work.
env['JOB_ID'] = str(job.pk)
@@ -909,14 +858,15 @@ class RunJob(BaseTask):
if job.project:
env['PROJECT_REVISION'] = job.project.scm_revision
env['ANSIBLE_RETRY_FILES_ENABLED'] = "False"
env['ANSIBLE_CALLBACK_PLUGINS'] = plugin_path
env['ANSIBLE_STDOUT_CALLBACK'] = 'tower_display'
env['REST_API_URL'] = settings.INTERNAL_API_URL
env['REST_API_TOKEN'] = job.task_auth_token or ''
env['TOWER_HOST'] = settings.TOWER_URL_BASE
env['MAX_EVENT_RES'] = str(settings.MAX_EVENT_RES_DATA)
env['CALLBACK_QUEUE'] = settings.CALLBACK_QUEUE
env['CALLBACK_CONNECTION'] = settings.BROKER_URL
if not kwargs.get('isolated'):
env['ANSIBLE_CALLBACK_PLUGINS'] = plugin_path
env['ANSIBLE_STDOUT_CALLBACK'] = 'tower_display'
env['REST_API_URL'] = settings.INTERNAL_API_URL
env['REST_API_TOKEN'] = job.task_auth_token or ''
env['TOWER_HOST'] = settings.TOWER_URL_BASE
env['CALLBACK_QUEUE'] = settings.CALLBACK_QUEUE
env['CALLBACK_CONNECTION'] = settings.BROKER_URL
env['CACHE'] = settings.CACHES['default']['LOCATION'] if 'LOCATION' in settings.CACHES['default'] else ''
if getattr(settings, 'JOB_CALLBACK_DEBUG', False):
env['JOB_CALLBACK_DEBUG'] = '2'
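
The REST and callback-queue settings are now injected only for controller-side runs: an isolated node is assumed to be unable to reach INTERNAL_API_URL or BROKER_URL, so its results presumably flow back through the private data dir and the wrapped stdout handle shown earlier rather than through a live callback connection. A self-contained sketch of the split, with placeholder values standing in for the settings referenced above:

    def build_callback_env(job_id, isolated, settings):
        # `settings` is a plain dict of placeholders here, not django.conf.settings.
        env = {'JOB_ID': str(job_id), 'ANSIBLE_RETRY_FILES_ENABLED': 'False'}
        if not isolated:
            # Endpoints only reachable from the controller itself:
            env['ANSIBLE_STDOUT_CALLBACK'] = 'tower_display'
            env['REST_API_URL'] = settings['INTERNAL_API_URL']
            env['CALLBACK_CONNECTION'] = settings['BROKER_URL']
        return env


    controller_env = build_callback_env(42, False, {'INTERNAL_API_URL': 'http://127.0.0.1',
                                                    'BROKER_URL': 'amqp://localhost:5672//'})
    isolated_env = build_callback_env(42, True, {})
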
@@ -980,7 +930,7 @@ class RunJob(BaseTask):
env['ANSIBLE_NET_AUTH_PASS'] = decrypt_field(network_cred, 'authorize_password')
# Set environment variables related to gathering facts from the cache
if job.job_type == PERM_INVENTORY_SCAN or job.store_facts is True:
if (job.job_type == PERM_INVENTORY_SCAN or job.store_facts is True) and not kwargs.get('isolated'):
env['FACT_QUEUE'] = settings.FACT_QUEUE
env['ANSIBLE_LIBRARY'] = self.get_path_to('..', 'plugins', 'library')
env['ANSIBLE_CACHE_PLUGINS'] = self.get_path_to('..', 'plugins', 'fact_caching')
@@ -1007,9 +957,7 @@ class RunJob(BaseTask):
# it doesn't make sense to rely on ansible-playbook's default of using
# the current user.
ssh_username = ssh_username or 'root'
inventory_script = self.get_path_to('..', 'plugins', 'inventory',
'awxrest.py')
args = ['ansible-playbook', '-i', inventory_script]
args = ['ansible-playbook', '-i', self.build_inventory(job, **kwargs)]
if job.job_type == 'check':
args.append('--check')
args.extend(['-u', ssh_username])
@@ -1144,12 +1092,15 @@ class RunJob(BaseTask):
def pre_run_hook(self, job, **kwargs):
if job.project and job.project.scm_type:
job_request_id = '' if self.request.id is None else self.request.id
pu_ig = job.instance_group
if kwargs['isolated']:
pu_ig = pu_ig.controller
local_project_sync = job.project.create_project_update(
launch_type="sync",
_eager_fields=dict(
job_type='run',
status='running',
instance_group = job.instance_group,
instance_group = pu_ig,
celery_task_id=job_request_id))
# save the associated job before calling run() so that a
# cancel() call on the job can cancel the project update
@@ -1266,12 +1217,15 @@ class RunProjectUpdate(BaseTask):
return scm_url, extra_vars
def build_inventory(self, instance, **kwargs):
return 'localhost,'
def build_args(self, project_update, **kwargs):
'''
Build command line argument list for running ansible-playbook,
optionally using ssh-agent for public/private key authentication.
'''
args = ['ansible-playbook', '-i', 'localhost,']
args = ['ansible-playbook', '-i', self.build_inventory(project_update, **kwargs)]
if getattr(settings, 'PROJECT_UPDATE_VVV', False):
args.append('-vvv')
else:
@@ -1885,7 +1839,6 @@ class RunInventoryUpdate(BaseTask):
pass
class RunAdHocCommand(BaseTask):
'''
Celery task to run an ad hoc command using ansible.
@@ -1984,9 +1937,7 @@ class RunAdHocCommand(BaseTask):
# it doesn't make sense to rely on ansible's default of using the
# current user.
ssh_username = ssh_username or 'root'
inventory_script = self.get_path_to('..', 'plugins', 'inventory',
'awxrest.py')
args = ['ansible', '-i', inventory_script]
args = ['ansible', '-i', self.build_inventory(ad_hoc_command, **kwargs)]
if ad_hoc_command.job_type == 'check':
args.append('--check')
args.extend(['-u', ssh_username])