isolated ramparts: replace systemd unit with a tower-expect binary

instead of launching isolated tasks via `systemctl`, treat
`awx.main.isolated.run` as an executable that knows how to daemonize

additionally, add `setup.py isolated_build` for isolated Tower source
distribution
This commit is contained in:
Ryan Petrello 2017-06-14 14:48:47 -04:00
parent 422950f45d
commit 44e0c8621a
13 changed files with 107 additions and 111 deletions

View File

@ -22,7 +22,6 @@ import base64
import contextlib
import datetime
import json
import logging
import multiprocessing
import os
import stat
@ -30,69 +29,9 @@ import threading
import uuid
import memcache
# Kombu
from kombu import Connection, Exchange, Producer
__all__ = ['event_context']
class CallbackQueueEventDispatcher(object):
    """Publish job event payloads to the configured AMQP callback queue.

    Configuration is read from the environment:
    - ``CALLBACK_CONNECTION``: kombu connection URL; publishing is a no-op
      when unset.
    - ``CALLBACK_QUEUE``: queue/exchange name (also used as the routing key);
      publishing is a no-op when unset.
    - ``JOB_CALLBACK_DEBUG``: 0/1/2 to raise logging verbosity.
    """

    def __init__(self):
        self.callback_connection = os.getenv('CALLBACK_CONNECTION', None)
        self.connection_queue = os.getenv('CALLBACK_QUEUE', '')
        self.connection = None
        self.exchange = None
        self._init_logging()

    def _init_logging(self):
        # JOB_CALLBACK_DEBUG: >=2 -> DEBUG, >=1 -> INFO, otherwise WARNING.
        try:
            self.job_callback_debug = int(os.getenv('JOB_CALLBACK_DEBUG', '0'))
        except ValueError:
            self.job_callback_debug = 0
        self.logger = logging.getLogger('awx.plugins.callback.job_event_callback')
        if self.job_callback_debug >= 2:
            self.logger.setLevel(logging.DEBUG)
        elif self.job_callback_debug >= 1:
            self.logger.setLevel(logging.INFO)
        else:
            self.logger.setLevel(logging.WARNING)
        handler = logging.StreamHandler()
        formatter = logging.Formatter('%(levelname)-8s %(process)-8d %(message)s')
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)
        self.logger.propagate = False

    def dispatch(self, obj):
        """Publish ``obj`` (a JSON-serializable payload) to the callback queue.

        Makes up to 4 publish attempts; does nothing when the callback
        connection or queue is not configured.
        """
        if not self.callback_connection or not self.connection_queue:
            return
        active_pid = os.getpid()
        for retry_count in range(4):
            try:
                # An AMQP connection cannot safely survive a fork; if the pid
                # changed since the connection was created, rebuild it.
                if not hasattr(self, 'connection_pid'):
                    self.connection_pid = active_pid
                if self.connection_pid != active_pid:
                    self.connection = None
                if self.connection is None:
                    self.connection = Connection(self.callback_connection)
                    self.exchange = Exchange(self.connection_queue, type='direct')
                producer = Producer(self.connection)
                producer.publish(obj,
                                 serializer='json',
                                 compression='bzip2',
                                 exchange=self.exchange,
                                 declare=[self.exchange],
                                 routing_key=self.connection_queue)
                return
            except Exception as e:
                # BUGFIX: the original incremented the loop variable by hand
                # and broke out when it reached 3, which capped publishing at
                # 3 attempts despite the xrange(4) loop. Let the for loop
                # drive all 4 attempts instead.
                self.logger.info('Publish Job Event Exception: %r, retry=%d', e,
                                 retry_count, exc_info=True)
class IsolatedFileWrite:
'''
Stand-in class that will write partial event data to a file as a
@ -123,7 +62,6 @@ class EventContext(object):
def __init__(self):
self.display_lock = multiprocessing.RLock()
self.dispatcher = CallbackQueueEventDispatcher()
cache_actual = os.getenv('CACHE', '127.0.0.1:11211')
if os.getenv('AWX_ISOLATED_DATA_DIR', False):
self.cache = IsolatedFileWrite()

View File

@ -108,7 +108,7 @@ class IsolatedManager(object):
# strip some environment variables that aren't applicable to isolated
# execution
for var in (
'RABBITMQ_HOST', 'RABBITMQ_PASS', 'RABBITMQ_USER', 'CACHE',
'HOME', 'RABBITMQ_HOST', 'RABBITMQ_PASS', 'RABBITMQ_USER', 'CACHE',
'DJANGO_PROJECT_DIR', 'DJANGO_SETTINGS_MODULE', 'RABBITMQ_VHOST'):
secrets['env'].pop(var, None)
self.build_isolated_job_data()
@ -126,7 +126,7 @@ class IsolatedManager(object):
# - sets up a temporary directory for proot/bwrap (if necessary)
# - copies encrypted job data from the controlling host to the isolated host (with rsync)
# - writes the encryption secret to a named pipe on the isolated host
# - launches the isolated playbook runner via `systemctl start playbook@<job-id>.service`
# - launches the isolated playbook runner via `tower-expect start <job-id>`
args = ['ansible-playbook', '-u', settings.AWX_ISOLATED_USERNAME, '-i',
'%s,' % self.host, 'run_isolated.yml', '-e',
json.dumps(extra_vars)]
@ -178,18 +178,18 @@ class IsolatedManager(object):
if 'AD_HOC_COMMAND_ID' not in self.env:
os.symlink(self.cwd, self.path_to('project'))
# create a directory for build artifacts to live in
os.mkdir(self.path_to('artifacts'), stat.S_IRUSR | stat.S_IWUSR)
# create directories for build artifacts to live in
os.makedirs(self.path_to('artifacts', 'job_events'), mode=stat.S_IRUSR | stat.S_IWUSR)
def _missing_artifacts(self, path_list, buff):
missing_artifacts = filter(lambda path: not os.path.exists(path), path_list)
for path in missing_artifacts:
self.stdout_handle.write('ERROR running isolated job, missing `{}`.\n'.format(path))
self.stdout_handle.write('ansible did not exit cleanly, missing `{}`.\n'.format(path))
if missing_artifacts:
systemctl_path = self.path_to('artifacts', 'systemctl_logs')
if os.path.exists(systemctl_path):
daemon_path = self.path_to('artifacts', 'daemon.log')
if os.path.exists(daemon_path):
# If available, show log files from the run.py call
with codecs.open(systemctl_path, 'r', encoding='utf-8') as f:
with codecs.open(daemon_path, 'r', encoding='utf-8') as f:
self.stdout_handle.write(f.read())
else:
# Provide the management playbook standard out if not available

75
awx/main/isolated/run.py Normal file → Executable file
View File

@ -1,3 +1,5 @@
#! /usr/bin/env python
import argparse
import base64
import codecs
@ -119,7 +121,7 @@ def run_pexpect(args, cwd, env, logfile,
if isinstance(extra_update_fields, dict):
extra_update_fields['job_explanation'] = "Job terminated due to timeout"
if canceled or timed_out:
handle_termination(child, proot_cmd, is_cancel=canceled)
handle_termination(child.pid, child.args, proot_cmd, is_cancel=canceled)
if idle_timeout and (time.time() - last_stdout_update) > idle_timeout:
child.close(True)
canceled = True
@ -194,51 +196,44 @@ def run_isolated_job(private_data_dir, secrets, logfile=sys.stdout):
pexpect_timeout=pexpect_timeout)
def handle_termination(pid, args, proot_cmd, is_cancel=True):
    '''
    Terminate a subprocess spawned by `pexpect`.

    :param pid: the process id of the running job.
    :param args: the args for the job, i.e., ['ansible-playbook', 'abc.yml']
    :param proot_cmd: the command used to isolate processes, i.e., `bwrap`
    :param is_cancel: flag showing whether this termination is caused by
                      instance's cancel_flag.
    '''
    try:
        if proot_cmd in ' '.join(args):
            # The job runs inside a process isolation container; SIGKILL the
            # whole process tree so no containerized children are left behind.
            if not psutil:
                os.kill(pid, signal.SIGKILL)
            else:
                try:
                    main_proc = psutil.Process(pid=pid)
                    child_procs = main_proc.children(recursive=True)
                    for child_proc in child_procs:
                        os.kill(child_proc.pid, signal.SIGKILL)
                    os.kill(main_proc.pid, signal.SIGKILL)
                except (TypeError, psutil.Error):
                    # fall back to killing just the main pid
                    os.kill(pid, signal.SIGKILL)
        else:
            # non-isolated job: ask politely first, give it time to clean up
            os.kill(pid, signal.SIGTERM)
            time.sleep(3)
    except OSError:
        # the process already exited; nothing to do
        keyword = 'cancel' if is_cancel else 'timeout'
        logger.warn("Attempted to %s already finished job, ignoring" % keyword)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='run an isolated ansible playbook')
parser.add_argument('job_id')
args = parser.parse_args()
private_data_dir = os.readlink('/tmp/ansible_tower/jobs/%s' % args.job_id)
def __run__(private_data_dir):
buff = cStringIO.StringIO()
with open(os.path.join(private_data_dir, 'env'), 'r') as f:
for line in f:
buff.write(line)
artifacts_dir = os.path.join(private_data_dir, 'artifacts')
job_events_dir = os.path.join(artifacts_dir, 'job_events')
if not os.path.exists(job_events_dir):
os.makedirs(job_events_dir, mode=stat.S_IRUSR | stat.S_IWUSR)
# Standard out directed to pickup location without event filtering applied
stdout_filename = os.path.join(artifacts_dir, 'stdout')
@ -258,3 +253,49 @@ if __name__ == '__main__':
os.mknod(artifact_path, stat.S_IFREG | stat.S_IRUSR | stat.S_IWUSR)
with open(artifact_path, 'w') as f:
f.write(str(data))
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='manage a daemonized, isolated ansible playbook')
    parser.add_argument('command', choices=['start', 'stop', 'is-alive'])
    parser.add_argument('job_id')
    args = parser.parse_args()
    # the controlling node symlinks /tmp/ansible_tower/jobs/<id> to the
    # private data directory for the job
    private_data_dir = os.readlink('/tmp/ansible_tower/jobs/%s' % args.job_id)
    pidfile = os.path.join(private_data_dir, 'pid')

    if args.command == 'start':
        # create a file to log stderr in case the daemonized process throws
        # an exception before it gets to `pexpect.spawn`
        stderr_path = os.path.join(private_data_dir, 'artifacts', 'daemon.log')
        if not os.path.exists(stderr_path):
            os.mknod(stderr_path, stat.S_IFREG | stat.S_IRUSR | stat.S_IWUSR)
        stderr = open(stderr_path, 'w+')

        # imported here so `stop` / `is-alive` don't require python-daemon
        import daemon
        from daemon.pidfile import TimeoutPIDLockFile
        context = daemon.DaemonContext(
            pidfile=TimeoutPIDLockFile(pidfile),
            stderr=stderr
        )
        with context:
            __run__(private_data_dir)
        sys.exit(0)

    # `stop` and `is-alive` both operate on the pid recorded at `start` time
    try:
        with open(pidfile, 'r') as f:
            pid = int(f.readline())
    except IOError:
        sys.exit(1)

    if args.command == 'stop':
        try:
            # BUGFIX: the original bound the open file handle to the name
            # `args`, shadowing the argparse namespace parsed above.
            with open(os.path.join(private_data_dir, 'args'), 'r') as args_file:
                handle_termination(pid, json.load(args_file), 'bwrap')
        except IOError:
            # no recorded args; terminate with an empty arg list
            handle_termination(pid, [], 'bwrap')
    elif args.command == 'is-alive':
        try:
            # signal 0 (SIG_DFL) delivers nothing; it only checks that the
            # process exists and we may signal it
            os.kill(pid, signal.SIG_DFL)
            sys.exit(0)
        except OSError:
            sys.exit(1)

View File

@ -371,7 +371,7 @@ class TestIsolatedExecution(TestJobExecution):
os.makedirs(artifacts)
if 'run_isolated.yml' in args[0]:
for filename, data in (
['systemctl_logs', 'ERROR IN EXPECT.PY'],
['daemon.log', 'ERROR IN RUN.PY'],
):
with open(os.path.join(artifacts, filename), 'w') as f:
f.write(data)

View File

@ -16,13 +16,6 @@
mode: pull
recursive: yes
- name: check to see if the job is inactive
command: "systemctl is-active playbook@{{job_id}}.service"
register: is_active
failed_when: "is_active.rc == 0"
- shell: journalctl -u playbook@{{job_id}}.service --no-pager
register: systemctl_logs
ignore_errors: True
- local_action: copy content={{ systemctl_logs.stdout }} dest={{src}}/artifacts/systemctl_logs mode=0600
- shell: "tower-expect is-alive {{job_id}}"
register: is_alive
failed_when: "is_alive.rc == 0"

View File

@ -10,7 +10,7 @@
tasks:
- name: cancel the job
command: "systemctl stop playbook@{{job_id}}.service"
command: "tower-expect stop {{job_id}}"
ignore_errors: yes
- name: remove build artifacts

View File

@ -36,7 +36,7 @@
command: "mkfifo /tmp/ansible_tower/jobs/{{job_id}}/env"
- name: spawn the playbook
command: "systemctl start playbook@{{job_id}}.service"
command: "tower-expect start {{job_id}}"
- name: write the secret environment data
mkfifo:

View File

@ -1,2 +1,3 @@
psutil==5.2.2
pexpect==4.2.1
pexpect==4.2.1
python-daemon==2.1.2

View File

@ -8,6 +8,7 @@ import datetime
import glob
import sys
from setuptools import setup
from distutils.command.sdist import sdist
from awx import __version__
@ -38,6 +39,30 @@ else:
# The .spec will create symlinks to support multiple versions of sosreport
sosconfig = "/usr/share/sosreport/sos/plugins"
#####################################################################
# Isolated packaging
#####################################################################
class sdist_isolated(sdist):
    """An sdist variant that packages only what an isolated node needs:
    the expect runner plus its supporting awx library modules.
    """

    # MANIFEST template lines describing the minimal isolated payload
    includes = [
        'include awx/__init__.py',
        'include awx/main/isolated/run.py',
        'recursive-include awx/lib *.py',
    ]

    def get_file_list(self):
        """Build the file list from our template lines (plus setup.py
        itself), bypassing the default sdist file discovery."""
        for template_line in ['include setup.py'] + self.includes:
            self.filelist.process_template_line(template_line)
        self.write_manifest()

    def make_release_tree(self, base_dir, files):
        """Create the release tree, then drop a MANIFEST.in into it so the
        isolated tarball describes its own file set."""
        sdist.make_release_tree(self, base_dir, files)
        manifest_path = os.path.join(base_dir, 'MANIFEST.in')
        with open(manifest_path, 'w') as f:
            f.write('\n'.join(self.includes))
#####################################################################
# Helper Functions
@ -112,7 +137,7 @@ setup(
entry_points = {
'console_scripts': [
'awx-manage = awx:manage',
'tower-manage = awx:manage',
'tower-manage = awx:manage'
],
},
data_files = proc_data_files([
@ -129,6 +154,7 @@ setup(
"tools/scripts/tower-python",
"tools/scripts/ansible-tower-setup"]),
("%s" % sosconfig, ["tools/sosreport/tower.py"])]),
cmdclass = {'sdist_isolated': sdist_isolated},
options = {
'egg_info': {
'tag_build': build_timestamp,
@ -136,6 +162,7 @@ setup(
'aliases': {
'dev_build': 'clean --all egg_info sdist',
'release_build': 'clean --all egg_info -b "" sdist',
'isolated_build': 'clean --all egg_info -b "" sdist_isolated',
},
'build_scripts': {
'executable': '/usr/bin/tower-python',

View File

@ -17,7 +17,7 @@ ENV LANGUAGE en_US:en
ENV LC_ALL en_US.UTF-8
WORKDIR /
EXPOSE 22
ADD tools/docker-isolated/playbook@.service /lib/systemd/system/playbook@.service
ADD tools/docker-isolated/tower-expect /usr/local/bin/tower-expect
RUN rm -f /etc/ssh/ssh_host_ecdsa_key /etc/ssh/ssh_host_rsa_key
RUN ssh-keygen -q -N "" -t dsa -f /etc/ssh/ssh_host_ecdsa_key

View File

@ -66,6 +66,6 @@ This should give a shell to the `tools_isolated_1` container, as the
The following command would run the playbook for job 57.
```bash
systemctl start playbook@57.service
tower-expect start 57
```

View File

@ -1,7 +0,0 @@
[Unit]
Description=Run of Ansible Tower job %I
[Service]
ExecStart=/venv/tower_isolated/bin/python /tower_isolated/run.py %I
Restart=no
Environment=TOWER_LIB_DIRECTORY=/tower_lib

View File

@ -0,0 +1,3 @@
#!/bin/bash
# tower-expect: wrapper for managing isolated job runs on this host
# (invoked as e.g. `tower-expect start <job-id>`).
# Activates the isolated virtualenv, then exec's the run.py manager with
# TOWER_LIB_DIRECTORY set, forwarding all command-line arguments.
. /venv/tower_isolated/bin/activate
exec env TOWER_LIB_DIRECTORY=/tower_lib /tower_isolated/run.py "$@"