From 422950f45dde64429dc319112284edbddacff059 Mon Sep 17 00:00:00 2001
From: Ryan Petrello
Date: Wed, 14 Jun 2017 11:47:30 -0400
Subject: [PATCH] Support for executing job and adhoc commands on isolated Tower nodes (#6524)

---
 MANIFEST.in                                   |   1 +
 Makefile                                      |  38 +-
 awx/api/serializers.py                        |   4 +-
 awx/lib/isolated_callbacks/minimal.py         |  30 ++
 awx/lib/isolated_callbacks/tower_display.py   |  30 ++
 awx/lib/tower_display_callback/events.py      |  29 +-
 awx/main/isolated/__init__.py                 |   0
 awx/main/isolated/isolated_manager.py         | 352 ++++++++++++++++++
 awx/main/isolated/run.py                      | 260 +++++++++++++
 .../management/commands/register_queue.py     |  11 +
 .../migrations/0043_v320_instancegroups.py    |   1 +
 awx/main/models/ad_hoc_commands.py            |   4 +
 awx/main/models/ha.py                         |   8 +
 awx/main/models/jobs.py                       |   4 +
 awx/main/models/unified_jobs.py               |  17 +-
 awx/main/scheduler/__init__.py                |   6 +-
 awx/main/tasks.py                             | 267 ++++++-------
 awx/main/tests/unit/isolated/test_expect.py   | 303 +++++++++++++++
 awx/main/tests/unit/test_tasks.py             | 274 +++++++++-----
 .../tests/unit/utils/test_event_filter.py     | 112 ++++++
 awx/main/utils/common.py                      |  26 +-
 awx/playbooks/check_isolated.yml              |  28 ++
 awx/playbooks/clean_isolated.yml              |  18 +
 awx/playbooks/library/mkfifo.py               |  24 ++
 awx/playbooks/run_isolated.yml                |  45 +++
 awx/plugins/inventory/awxrest.py              |  18 +-
 awx/settings/defaults.py                      |   6 +
 awx/settings/development.py                   |   2 +
 awx/settings/production.py                    |   2 +
 requirements/requirements_isolated.txt        |   2 +
 tools/docker-compose.yml                      |   1 +
 Procfile => tools/docker-compose/Procfile     |   0
 tools/docker-compose/start_development.sh     |   4 +-
 tools/docker-isolated-override.yml            |  17 +
 tools/docker-isolated/Dockerfile              |  31 ++
 tools/docker-isolated/Procfile                |   8 +
 tools/docker-isolated/README.md               |  71 ++++
 tools/docker-isolated/playbook@.service       |   7 +
 38 files changed, 1794 insertions(+), 267 deletions(-)
 create mode 100644 awx/lib/isolated_callbacks/minimal.py
 create mode 100644 awx/lib/isolated_callbacks/tower_display.py
 create mode 100644 awx/main/isolated/__init__.py
 create mode 100644 awx/main/isolated/isolated_manager.py
 create mode 100644 awx/main/isolated/run.py
 create mode 100644 awx/main/tests/unit/isolated/test_expect.py
 create mode 100644 awx/main/tests/unit/utils/test_event_filter.py
 create mode 100644 awx/playbooks/check_isolated.yml
 create mode 100644 awx/playbooks/clean_isolated.yml
 create mode 100755 awx/playbooks/library/mkfifo.py
 create mode 100644 awx/playbooks/run_isolated.yml
 create mode 100644 requirements/requirements_isolated.txt
 rename Procfile => tools/docker-compose/Procfile (100%)
 create mode 100644 tools/docker-isolated-override.yml
 create mode 100644 tools/docker-isolated/Dockerfile
 create mode 100644 tools/docker-isolated/Procfile
 create mode 100644 tools/docker-isolated/README.md
 create mode 100644 tools/docker-isolated/playbook@.service

diff --git a/MANIFEST.in b/MANIFEST.in
index 27ad03a35d..81483cee39 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -20,6 +20,7 @@ include tools/scripts/request_tower_configuration.ps1
 include tools/scripts/ansible-tower-service
 include tools/scripts/failure-event-handler
 include tools/scripts/tower-python
+include awx/playbooks/library/mkfifo.py
 include tools/sosreport/*
 include COPYING
 include Makefile
diff --git a/Makefile b/Makefile
index 9b48cb4964..9d8d6f665b 100644
--- a/Makefile
+++ b/Makefile
@@ -301,6 +301,15 @@ requirements_ansible_dev:
 	    $(VENV_BASE)/ansible/bin/pip install pytest mock; \
 	fi

+requirements_isolated:
+	if [ !
-d "$(VENV_BASE)/tower_isolated" ]; then \ + virtualenv --system-site-packages $(VENV_BASE)/tower_isolated && \ + $(VENV_BASE)/tower_isolated/bin/pip install $(PIP_OPTIONS) --ignore-installed six packaging appdirs && \ + $(VENV_BASE)/tower_isolated/bin/pip install $(PIP_OPTIONS) --ignore-installed setuptools==35.0.2 && \ + $(VENV_BASE)/tower_isolated/bin/pip install $(PIP_OPTIONS) --ignore-installed pip==9.0.1; \ + fi; + $(VENV_BASE)/tower_isolated/bin/pip install -r requirements/requirements_isolated.txt + # Install third-party requirements needed for Tower's environment. requirements_tower: virtualenv_tower if [[ "$(PIP_OPTIONS)" == *"--no-index"* ]]; then \ @@ -341,6 +350,10 @@ init: fi; \ tower-manage register_instance --hostname=$(COMPOSE_HOST); \ tower-manage register_queue --queuename=tower --hostnames=$(COMPOSE_HOST);\ + if [ "$(DOCKER_TOOLS_DIR)" == "tools/docker-isolated" ]; then \ + tower-manage register_instance --hostname=isolated; \ + tower-manage register_queue --queuename='thepentagon' --hostnames=isolated --controller=tower; \ + fi; # Refresh development environment after pulling new code. refresh: clean requirements_dev version_file develop migrate @@ -399,7 +412,7 @@ honcho: @if [ "$(VENV_BASE)" ]; then \ . $(VENV_BASE)/tower/bin/activate; \ fi; \ - honcho start + honcho start -f tools/docker-compose/Procfile flower: @if [ "$(VENV_BASE)" ]; then \ @@ -443,7 +456,7 @@ celeryd: @if [ "$(VENV_BASE)" ]; then \ . $(VENV_BASE)/tower/bin/activate; \ fi; \ - $(PYTHON) manage.py celeryd -l DEBUG -B -Ofair --autoreload --autoscale=100,4 --schedule=$(CELERY_SCHEDULE_FILE) -Q tower_scheduler,tower_broadcast_all,tower,$(COMPOSE_HOST) -n celery@$(COMPOSE_HOST) + $(PYTHON) manage.py celeryd -l DEBUG -B -Ofair --autoreload --autoscale=100,4 --schedule=$(CELERY_SCHEDULE_FILE) -Q tower_scheduler,tower_broadcast_all,tower,$(COMPOSE_HOST),$(EXTRA_GROUP_QUEUES) -n celery@$(COMPOSE_HOST) #$(PYTHON) manage.py celery multi show projects jobs default -l DEBUG -Q:projects projects -Q:jobs jobs -Q:default default -c:projects 1 -c:jobs 3 -c:default 3 -Ofair -B --schedule=$(CELERY_SCHEDULE_FILE) # Run to start the zeromq callback receiver @@ -932,6 +945,18 @@ install: docker-auth: docker login -e 1234@5678.com -u oauth2accesstoken -p "$(GCLOUD_AUTH)" https://gcr.io +# Docker isolated rampart +docker-isolated: + TAG=$(COMPOSE_TAG) docker-compose -f tools/docker-compose.yml -f tools/docker-isolated-override.yml create + docker start tools_tower_1 + docker start tools_isolated_1 + if [ "`docker exec -i -t tools_isolated_1 cat /root/.ssh/authorized_keys`" != "" ]; then \ + echo "SSH keys already copied to isolated instance"; \ + else \ + docker exec "tools_isolated_1" bash -c "mkdir -p /root/.ssh && echo $$(docker exec -t tools_tower_1 cat /root/.ssh/id_rsa.pub) >> /root/.ssh/authorized_keys"; \ + fi + TAG=$(COMPOSE_TAG) docker-compose -f tools/docker-compose.yml -f tools/docker-isolated-override.yml up + # Docker Compose Development environment docker-compose: docker-auth DOCKER_HOST_IP=$(DOCKER_HOST_IP) TAG=$(COMPOSE_TAG) docker-compose -f tools/docker-compose.yml up --no-recreate tower @@ -942,11 +967,18 @@ docker-compose-cluster: docker-auth docker-compose-test: docker-auth cd tools && DOCKER_HOST_IP=$(DOCKER_HOST_IP) TAG=$(COMPOSE_TAG) docker-compose run --rm --service-ports tower /bin/bash -docker-compose-build: +docker-compose-build: tower-devel-build tower-isolated-build + +tower-devel-build: docker build -t ansible/tower_devel -f tools/docker-compose/Dockerfile . 
docker tag ansible/tower_devel gcr.io/ansible-tower-engineering/tower_devel:$(COMPOSE_TAG) #docker push gcr.io/ansible-tower-engineering/tower_devel:$(COMPOSE_TAG) +tower-isolated-build: + docker build -t ansible/tower_isolated -f tools/docker-isolated/Dockerfile . + docker tag ansible/tower_isolated gcr.io/ansible-tower-engineering/tower_isolated:$(COMPOSE_TAG) + #docker push gcr.io/ansible-tower-engineering/tower_isolated:$(COMPOSE_TAG) + MACHINE?=default docker-clean: eval $$(docker-machine env $(MACHINE)) diff --git a/awx/api/serializers.py b/awx/api/serializers.py index 858cb508cc..14dbd82c35 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -3480,12 +3480,14 @@ class InstanceGroupSerializer(BaseSerializer): class Meta: model = InstanceGroup fields = ("id", "type", "url", "related", "name", "created", "modified", "capacity", "consumed_capacity", - "percent_capacity_remaining", "jobs_running", "instances") + "percent_capacity_remaining", "jobs_running", "instances", "controller") def get_related(self, obj): res = super(InstanceGroupSerializer, self).get_related(obj) res['jobs'] = self.reverse('api:instance_group_unified_jobs_list', kwargs={'pk': obj.pk}) res['instances'] = self.reverse('api:instance_group_instance_list', kwargs={'pk': obj.pk}) + if obj.controller_id: + res['controller'] = self.reverse('api:instance_group_detail', kwargs={'pk': obj.controller_id}) return res def get_consumed_capacity(self, obj): diff --git a/awx/lib/isolated_callbacks/minimal.py b/awx/lib/isolated_callbacks/minimal.py new file mode 100644 index 0000000000..6c136b7824 --- /dev/null +++ b/awx/lib/isolated_callbacks/minimal.py @@ -0,0 +1,30 @@ +# Copyright (c) 2017 Ansible by Red Hat +# +# This file is part of Ansible Tower, but depends on code imported from Ansible. +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +from __future__ import (absolute_import, division, print_function) + +# Python +import os +import sys + +# Add awx/lib to sys.path. +awx_lib_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) +if awx_lib_path not in sys.path: + sys.path.insert(0, awx_lib_path) + +# Tower Display Callback +from tower_display_callback import TowerMinimalCallbackModule as CallbackModule # noqa diff --git a/awx/lib/isolated_callbacks/tower_display.py b/awx/lib/isolated_callbacks/tower_display.py new file mode 100644 index 0000000000..dbe463303d --- /dev/null +++ b/awx/lib/isolated_callbacks/tower_display.py @@ -0,0 +1,30 @@ +# Copyright (c) 2017 Ansible by Red Hat +# +# This file is part of Ansible Tower, but depends on code imported from Ansible. +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. 
+# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +from __future__ import (absolute_import, division, print_function) + +# Python +import os +import sys + +# Add awx/lib to sys.path. +awx_lib_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) +if awx_lib_path not in sys.path: + sys.path.insert(0, awx_lib_path) + +# Tower Display Callback +from tower_display_callback import TowerDefaultCallbackModule as CallbackModule # noqa diff --git a/awx/lib/tower_display_callback/events.py b/awx/lib/tower_display_callback/events.py index a419b33e85..2ef7220f9a 100644 --- a/awx/lib/tower_display_callback/events.py +++ b/awx/lib/tower_display_callback/events.py @@ -25,6 +25,7 @@ import json import logging import multiprocessing import os +import stat import threading import uuid import memcache @@ -92,6 +93,28 @@ class CallbackQueueEventDispatcher(object): break +class IsolatedFileWrite: + ''' + Stand-in class that will write partial event data to a file as a + replacement for memcache when a job is running on an isolated host. + ''' + + def __init__(self): + self.private_data_dir = os.getenv('AWX_ISOLATED_DATA_DIR') + + def set(self, key, value): + # Strip off the leading memcache key identifying characters :1:ev- + event_uuid = key[len(':1:ev-'):] + # Write data in a staging area and then atomic move to pickup directory + filename = '{}-partial.json'.format(event_uuid) + dropoff_location = os.path.join(self.private_data_dir, 'artifacts', 'job_events', filename) + write_location = '.'.join([dropoff_location, 'tmp']) + partial_data = json.dumps(value) + with os.fdopen(os.open(write_location, os.O_WRONLY | os.O_CREAT, stat.S_IRUSR | stat.S_IWUSR), 'w') as f: + f.write(partial_data) + os.rename(write_location, dropoff_location) + + class EventContext(object): ''' Store global and local (per thread/process) data associated with callback @@ -102,7 +125,10 @@ class EventContext(object): self.display_lock = multiprocessing.RLock() self.dispatcher = CallbackQueueEventDispatcher() cache_actual = os.getenv('CACHE', '127.0.0.1:11211') - self.cache = memcache.Client([cache_actual], debug=0) + if os.getenv('AWX_ISOLATED_DATA_DIR', False): + self.cache = IsolatedFileWrite() + else: + self.cache = memcache.Client([cache_actual], debug=0) def add_local(self, **kwargs): if not hasattr(self, '_local'): @@ -193,6 +219,7 @@ class EventContext(object): def dump(self, fileobj, data, max_width=78, flush=False): b64data = base64.b64encode(json.dumps(data)) with self.display_lock: + # pattern corresponding to OutputEventFilter expectation fileobj.write(u'\x1b[K') for offset in xrange(0, len(b64data), max_width): chunk = b64data[offset:offset + max_width] diff --git a/awx/main/isolated/__init__.py b/awx/main/isolated/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/awx/main/isolated/isolated_manager.py b/awx/main/isolated/isolated_manager.py new file mode 100644 index 0000000000..e033ed890c --- /dev/null +++ b/awx/main/isolated/isolated_manager.py @@ -0,0 +1,352 @@ +import base64 +import cStringIO +import codecs +import StringIO +import json +import os +import re +import stat +import time +import logging + +from django.conf import settings + +import awx +from awx.main.isolated import 
run +from awx.main.utils import OutputEventFilter +from awx.main.queue import CallbackQueueDispatcher + +logger = logging.getLogger('awx.isolated.manager') + + +class IsolatedManager(object): + + def __init__(self, args, cwd, env, stdout_handle, ssh_key_path, + expect_passwords={}, cancelled_callback=None, job_timeout=0, + idle_timeout=None, extra_update_fields=None, + pexpect_timeout=5, proot_cmd='bwrap'): + """ + :param args: a list of `subprocess.call`-style arguments + representing a subprocess e.g., + ['ansible-playbook', '...'] + :param cwd: the directory where the subprocess should run, + generally the directory where playbooks exist + :param env: a dict containing environment variables for the + subprocess, ala `os.environ` + :param stdout_handle: a file-like object for capturing stdout + :param ssh_key_path: a filepath where SSH key data can be read + :param expect_passwords: a dict of regular expression password prompts + to input values, i.e., {r'Password:\s*?$': + 'some_password'} + :param cancelled_callback: a callable - which returns `True` or `False` + - signifying if the job has been prematurely + cancelled + :param job_timeout a timeout (in seconds); if the total job runtime + exceeds this, the process will be killed + :param idle_timeout a timeout (in seconds); if new output is not + sent to stdout in this interval, the process + will be terminated + :param extra_update_fields: a dict used to specify DB fields which should + be updated on the underlying model + object after execution completes + :param pexpect_timeout a timeout (in seconds) to wait on + `pexpect.spawn().expect()` calls + :param proot_cmd the command used to isolate processes, `bwrap` + """ + self.args = args + self.cwd = cwd + self.env = env.copy() + # Do not use callbacks for controller's management jobs + self.env['ANSIBLE_CALLBACK_PLUGINS'] = '' + self.env['CALLBACK_QUEUE'] = '' + self.env['CALLBACK_CONNECTION'] = '' + self.stdout_handle = stdout_handle + self.ssh_key_path = ssh_key_path + self.expect_passwords = {k.pattern: v for k, v in expect_passwords.items()} + self.cancelled_callback = cancelled_callback + self.job_timeout = job_timeout + self.idle_timeout = idle_timeout + self.extra_update_fields = extra_update_fields + self.pexpect_timeout = pexpect_timeout + self.proot_cmd = proot_cmd + self.started_at = None + + @property + def awx_playbook_path(self): + return os.path.join( + os.path.dirname(awx.__file__), + 'playbooks' + ) + + def path_to(self, *args): + return os.path.join(self.private_data_dir, *args) + + def dispatch(self): + ''' + Compile the playbook, its environment, and metadata into a series + of files, and ship to a remote host for isolated execution. 
+ ''' + self.started_at = time.time() + secrets = { + 'env': self.env.copy(), + 'passwords': self.expect_passwords, + 'ssh_key_data': None, + 'idle_timeout': self.idle_timeout, + 'job_timeout': self.job_timeout, + 'pexpect_timeout': self.pexpect_timeout + } + + # if an ssh private key fifo exists, read its contents and delete it + if self.ssh_key_path: + buff = cStringIO.StringIO() + with open(self.ssh_key_path, 'r') as fifo: + for line in fifo: + buff.write(line) + secrets['ssh_key_data'] = buff.getvalue() + os.remove(self.ssh_key_path) + + # strip some environment variables that aren't applicable to isolated + # execution + for var in ( + 'RABBITMQ_HOST', 'RABBITMQ_PASS', 'RABBITMQ_USER', 'CACHE', + 'DJANGO_PROJECT_DIR', 'DJANGO_SETTINGS_MODULE', 'RABBITMQ_VHOST'): + secrets['env'].pop(var, None) + self.build_isolated_job_data() + + extra_vars = { + 'src': self.private_data_dir, + 'dest': os.path.split(self.private_data_dir)[0], + 'job_id': str(self.instance.pk) + } + if self.proot_temp_dir: + extra_vars['proot_temp_dir'] = self.proot_temp_dir + + # Run ansible-playbook to launch a job on the isolated host. This: + # + # - sets up a temporary directory for proot/bwrap (if necessary) + # - copies encrypted job data from the controlling host to the isolated host (with rsync) + # - writes the encryption secret to a named pipe on the isolated host + # - launches the isolated playbook runner via `systemctl start playbook@.service` + args = ['ansible-playbook', '-u', settings.AWX_ISOLATED_USERNAME, '-i', + '%s,' % self.host, 'run_isolated.yml', '-e', + json.dumps(extra_vars)] + if self.instance.verbosity: + args.append('-%s' % ('v' * min(5, self.instance.verbosity))) + buff = StringIO.StringIO() + logger.debug('Starting job on isolated host with `run_isolated.yml` playbook.') + status, rc = run.run_pexpect( + args, self.awx_playbook_path, self.env, buff, + expect_passwords={ + re.compile(r'Secret:\s*?$', re.M): base64.b64encode(json.dumps(secrets)) + }, + idle_timeout=self.idle_timeout, + job_timeout=settings.AWX_ISOLATED_LAUNCH_TIMEOUT, + pexpect_timeout=5 + ) + if status != 'successful': + self.stdout_handle.write(buff.getvalue()) + return status, rc + + def build_isolated_job_data(self): + ''' + Write the playbook and metadata into a collection of files on the local + file system. + + This function is intended to be used to compile job data so that it + can be shipped to a remote, isolated host (via ssh). + ''' + + rsync_exclude = [ + # don't rsync source control metadata (it can be huge!) 
+ '- /project/.git', + '- /project/.svn', + '- /project/.hg', + # don't rsync job events that are in the process of being written + '- /artifacts/job_events/*-partial.json.tmp' + ] + + for filename, data in ( + ['.rsync-filter', '\n'.join(rsync_exclude)], + ['args', json.dumps(self.args)] + ): + path = self.path_to(filename) + with open(path, 'w') as f: + f.write(data) + os.chmod(path, stat.S_IRUSR) + + # symlink the scm checkout (if there is one) so that it's rsync'ed over, too + if 'AD_HOC_COMMAND_ID' not in self.env: + os.symlink(self.cwd, self.path_to('project')) + + # create a directory for build artifacts to live in + os.mkdir(self.path_to('artifacts'), stat.S_IRUSR | stat.S_IWUSR) + + def _missing_artifacts(self, path_list, buff): + missing_artifacts = filter(lambda path: not os.path.exists(path), path_list) + for path in missing_artifacts: + self.stdout_handle.write('ERROR running isolated job, missing `{}`.\n'.format(path)) + if missing_artifacts: + systemctl_path = self.path_to('artifacts', 'systemctl_logs') + if os.path.exists(systemctl_path): + # If available, show log files from the run.py call + with codecs.open(systemctl_path, 'r', encoding='utf-8') as f: + self.stdout_handle.write(f.read()) + else: + # Provide the management playbook standard out if not available + self.stdout_handle.write(buff.getvalue()) + return True + return False + + def check(self): + """ + Repeatedly poll the isolated node to determine if the job has run. + + On success, copy job artifacts to the controlling node. + On failure, continue to poll the isolated node (until the job timeout + is exceeded). + + For a completed job run, this function returns (status, rc), + representing the status and return code of the isolated + `ansible-playbook` run. + """ + + extra_vars = { + 'src': self.private_data_dir, + 'job_id': str(self.instance.pk) + } + args = ['ansible-playbook', '-u', settings.AWX_ISOLATED_USERNAME, '-i', + '%s,' % self.host, 'check_isolated.yml', '-e', + json.dumps(extra_vars)] + if self.instance.verbosity: + args.append('-%s' % ('v' * min(5, self.instance.verbosity))) + + status = 'failed' + rc = None + buff = None + seek = 0 + job_timeout = remaining = self.job_timeout + while status == 'failed': + if job_timeout != 0: + remaining = max(0, job_timeout - (time.time() - self.started_at)) + if remaining == 0: + # if it takes longer than $REMAINING_JOB_TIMEOUT to retrieve + # job artifacts from the host, consider the job failed + if isinstance(self.extra_update_fields, dict): + self.extra_update_fields['job_explanation'] = "Job terminated due to timeout" + status = 'failed' + break + + time.sleep(settings.AWX_ISOLATED_CHECK_INTERVAL) + + buff = cStringIO.StringIO() + logger.debug('Checking job on isolated host with `check_isolated.yml` playbook.') + status, rc = run.run_pexpect( + args, self.awx_playbook_path, self.env, buff, + cancelled_callback=self.cancelled_callback, + idle_timeout=remaining, + job_timeout=remaining, + pexpect_timeout=5, + proot_cmd=self.proot_cmd + ) + + path = self.path_to('artifacts', 'stdout') + if os.path.exists(path): + with codecs.open(path, 'r', encoding='utf-8') as f: + f.seek(seek) + for line in f: + self.stdout_handle.write(line) + seek += len(line) + + if status == 'successful': + status_path = self.path_to('artifacts', 'status') + rc_path = self.path_to('artifacts', 'rc') + if self._missing_artifacts([status_path, rc_path], buff): + status = 'failed' + rc = 1 + else: + with open(status_path, 'r') as f: + status = f.readline() + with open(rc_path, 'r') as f: + 
rc = int(f.readline()) + elif status == 'failed': + # if we were unable to retrieve job reults from the isolated host, + # print stdout of the `check_isolated.yml` playbook for clues + self.stdout_handle.write(buff.getvalue()) + + return status, rc + + def cleanup(self): + # If the job failed for any reason, make a last-ditch effort at cleanup + extra_vars = { + 'private_dirs': [ + '/tmp/ansible_tower/jobs/%s' % self.instance.pk, + self.private_data_dir, + self.proot_temp_dir, + ], + 'job_id': str(self.instance.pk), + } + args = ['ansible-playbook', '-u', settings.AWX_ISOLATED_USERNAME, '-i', + '%s,' % self.host, 'clean_isolated.yml', '-e', + json.dumps(extra_vars)] + logger.debug('Cleaning up job on isolated host with `clean_isolated.yml` playbook.') + buff = cStringIO.StringIO() + status, rc = run.run_pexpect( + args, self.awx_playbook_path, self.env, buff, + idle_timeout=60, job_timeout=60, + pexpect_timeout=5 + ) + + if status != 'successful': + # stdout_handle is closed by this point so writing output to logs is our only option + logger.warning('Cleanup from isolated job encountered error, output:\n{}'.format(buff.getvalue())) + + @staticmethod + def wrap_stdout_handle(instance, private_data_dir, stdout_handle): + dispatcher = CallbackQueueDispatcher() + + def job_event_callback(event_data): + event_data.setdefault('job_id', instance.id) + if 'uuid' in event_data: + filename = '{}-partial.json'.format(event_data['uuid']) + partial_filename = os.path.join(private_data_dir, 'artifacts', 'job_events', filename) + try: + with codecs.open(partial_filename, 'r', encoding='utf-8') as f: + partial_event_data = json.load(f) + event_data.update(partial_event_data) + except IOError: + if event_data.get('event', '') != 'verbose': + logger.error('Missing callback data for event type `{}`, uuid {}, job {}.\nevent_data: {}'.format( + event_data.get('event', ''), event_data['uuid'], instance.id, event_data)) + dispatcher.dispatch(event_data) + + return OutputEventFilter(stdout_handle, job_event_callback) + + def run(self, instance, host, private_data_dir, proot_temp_dir): + """ + Run a job on an isolated host. + + :param instance: a `model.Job` instance + :param host: the hostname (or IP address) to run the + isolated job on + :param private_data_dir: an absolute path on the local file system + where job-specific data should be written + (i.e., `/tmp/ansible_tower_xyz/`) + :param proot_temp_dir: a temporary directory which bwrap maps + restricted paths to + + For a completed job run, this function returns (status, rc), + representing the status and return code of the isolated + `ansible-playbook` run. 
+ """ + self.instance = instance + self.host = host + self.private_data_dir = private_data_dir + self.proot_temp_dir = proot_temp_dir + status, rc = self.dispatch() + if status == 'successful': + status, rc = self.check() + else: + # If dispatch fails, attempt to consume artifacts that *might* exist + self.check() + self.cleanup() + return status, rc diff --git a/awx/main/isolated/run.py b/awx/main/isolated/run.py new file mode 100644 index 0000000000..fdf4603e2c --- /dev/null +++ b/awx/main/isolated/run.py @@ -0,0 +1,260 @@ +import argparse +import base64 +import codecs +import collections +import cStringIO +import logging +import json +import os +import stat +import pipes +import re +import signal +import sys +import thread +import time + +import pexpect +import psutil + + +logger = logging.getLogger('awx.main.utils.expect') + + +def args2cmdline(*args): + return ' '.join([pipes.quote(a) for a in args]) + + +def wrap_args_with_ssh_agent(args, ssh_key_path, ssh_auth_sock=None): + if ssh_key_path: + cmd = ' && '.join([args2cmdline('ssh-add', ssh_key_path), + args2cmdline('rm', '-f', ssh_key_path), + args2cmdline(*args)]) + args = ['ssh-agent'] + if ssh_auth_sock: + args.extend(['-a', ssh_auth_sock]) + args.extend(['sh', '-c', cmd]) + return args + + +def open_fifo_write(path, data): + '''open_fifo_write opens the fifo named pipe in a new thread. + This blocks the thread until an external process (such as ssh-agent) + reads data from the pipe. + ''' + os.mkfifo(path, 0600) + thread.start_new_thread(lambda p, d: open(p, 'w').write(d), (path, data)) + + +def run_pexpect(args, cwd, env, logfile, + cancelled_callback=None, expect_passwords={}, + extra_update_fields=None, idle_timeout=None, job_timeout=0, + pexpect_timeout=5, proot_cmd='bwrap'): + ''' + Run the given command using pexpect to capture output and provide + passwords when requested. + + :param args: a list of `subprocess.call`-style arguments + representing a subprocess e.g., ['ls', '-la'] + :param cwd: the directory in which the subprocess should + run + :param env: a dict containing environment variables for the + subprocess, ala `os.environ` + :param logfile: a file-like object for capturing stdout + :param cancelled_callback: a callable - which returns `True` or `False` + - signifying if the job has been prematurely + cancelled + :param expect_passwords: a dict of regular expression password prompts + to input values, i.e., {r'Password:\s*?$': + 'some_password'} + :param extra_update_fields: a dict used to specify DB fields which should + be updated on the underlying model + object after execution completes + :param idle_timeout a timeout (in seconds); if new output is not + sent to stdout in this interval, the process + will be terminated + :param job_timeout a timeout (in seconds); if the total job runtime + exceeds this, the process will be killed + :param pexpect_timeout a timeout (in seconds) to wait on + `pexpect.spawn().expect()` calls + :param proot_cmd the command used to isolate processes, `bwrap` + + Returns a tuple (status, return_code) i.e., `('successful', 0)` + ''' + expect_passwords[pexpect.TIMEOUT] = None + expect_passwords[pexpect.EOF] = None + + if not isinstance(expect_passwords, collections.OrderedDict): + # We iterate over `expect_passwords.keys()` and + # `expect_passwords.values()` separately to map matched inputs to + # patterns and choose the proper string to send to the subprocess; + # enforce usage of an OrderedDict so that the ordering of elements in + # `keys()` matches `values()`. 
+ expect_passwords = collections.OrderedDict(expect_passwords) + password_patterns = expect_passwords.keys() + password_values = expect_passwords.values() + + logfile_pos = logfile.tell() + child = pexpect.spawn( + args[0], args[1:], cwd=cwd, env=env, ignore_sighup=True, + encoding='utf-8', echo=False, + ) + child.logfile_read = logfile + canceled = False + timed_out = False + last_stdout_update = time.time() + + job_start = time.time() + while child.isalive(): + result_id = child.expect(password_patterns, timeout=pexpect_timeout, searchwindowsize=100) + password = password_values[result_id] + if password is not None: + child.sendline(password) + if logfile_pos != logfile.tell(): + logfile_pos = logfile.tell() + last_stdout_update = time.time() + canceled = cancelled_callback() if cancelled_callback else False + if not canceled and job_timeout != 0 and (time.time() - job_start) > job_timeout: + timed_out = True + if isinstance(extra_update_fields, dict): + extra_update_fields['job_explanation'] = "Job terminated due to timeout" + if canceled or timed_out: + handle_termination(child, proot_cmd, is_cancel=canceled) + if idle_timeout and (time.time() - last_stdout_update) > idle_timeout: + child.close(True) + canceled = True + if canceled: + return 'canceled', child.exitstatus + elif child.exitstatus == 0 and not timed_out: + return 'successful', child.exitstatus + else: + return 'failed', child.exitstatus + + +def run_isolated_job(private_data_dir, secrets, logfile=sys.stdout): + ''' + Launch `ansible-playbook`, executing a job packaged by + `build_isolated_job_data`. + + :param private_data_dir: an absolute path on the local file system where + job metadata exists (i.e., + `/tmp/ansible_tower_xyz/`) + :param secrets: a dict containing sensitive job metadata, { + 'env': { ... } # environment variables, + 'passwords': { ... 
} # pexpect password prompts + 'ssh_key_data': 'RSA KEY DATA', + } + :param logfile: a file-like object for capturing stdout + + Returns a tuple (status, return_code) i.e., `('successful', 0)` + ''' + with open(os.path.join(private_data_dir, 'args'), 'r') as args: + args = json.load(args) + + env = secrets.get('env', {}) + expect_passwords = { + re.compile(pattern, re.M): password + for pattern, password in secrets.get('passwords', {}).items() + } + + if 'AD_HOC_COMMAND_ID' in env: + cwd = private_data_dir + else: + cwd = os.path.join(private_data_dir, 'project') + + # write the SSH key data into a fifo read by ssh-agent + ssh_key_data = secrets.get('ssh_key_data') + if ssh_key_data: + ssh_key_path = os.path.join(private_data_dir, 'ssh_key_data') + ssh_auth_sock = os.path.join(private_data_dir, 'ssh_auth.sock') + open_fifo_write(ssh_key_path, ssh_key_data) + args = wrap_args_with_ssh_agent(args, ssh_key_path, ssh_auth_sock) + + idle_timeout = secrets.get('idle_timeout', 10) + job_timeout = secrets.get('job_timeout', 10) + pexpect_timeout = secrets.get('pexpect_timeout', 5) + + # Use local callback directory + callback_dir = os.getenv('TOWER_LIB_DIRECTORY') + if callback_dir is None: + raise RuntimeError('Location for Tower Ansible callbacks must be specified ' + 'by environment variable TOWER_LIB_DIRECTORY.') + env['ANSIBLE_CALLBACK_PLUGINS'] = os.path.join(callback_dir, 'isolated_callbacks') + if 'AD_HOC_COMMAND_ID' in env: + env['ANSIBLE_STDOUT_CALLBACK'] = 'minimal' + else: + env['ANSIBLE_STDOUT_CALLBACK'] = 'tower_display' + env['AWX_ISOLATED_DATA_DIR'] = private_data_dir + env['PYTHONPATH'] = env.get('PYTHONPATH', '') + callback_dir + ':' + + return run_pexpect(args, cwd, env, logfile, + expect_passwords=expect_passwords, + idle_timeout=idle_timeout, + job_timeout=job_timeout, + pexpect_timeout=pexpect_timeout) + + +def handle_termination(job, proot_cmd, is_cancel=True): + ''' + Terminate a subprocess spawned by `pexpect`. + + :param job: the pexpect subprocess running the job. + :param proot_cmd the command used to isolate processes i.e., `bwrap` + :param is_cancel: flag showing whether this termination is caused by + instance's cancel_flag. 
+ ''' + try: + if proot_cmd in ' '.join(job.args): + if not psutil: + os.kill(job.pid, signal.SIGKILL) + else: + try: + main_proc = psutil.Process(pid=job.pid) + child_procs = main_proc.children(recursive=True) + for child_proc in child_procs: + os.kill(child_proc.pid, signal.SIGKILL) + os.kill(main_proc.pid, signal.SIGKILL) + except (TypeError, psutil.Error): + os.kill(job.pid, signal.SIGKILL) + else: + os.kill(job.pid, signal.SIGTERM) + time.sleep(3) + except OSError: + keyword = 'cancel' if is_cancel else 'timeout' + logger.warn("Attempted to %s already finished job, ignoring" % keyword) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='run an isolated ansible playbook') + parser.add_argument('job_id') + args = parser.parse_args() + private_data_dir = os.readlink('/tmp/ansible_tower/jobs/%s' % args.job_id) + + buff = cStringIO.StringIO() + with open(os.path.join(private_data_dir, 'env'), 'r') as f: + for line in f: + buff.write(line) + + artifacts_dir = os.path.join(private_data_dir, 'artifacts') + job_events_dir = os.path.join(artifacts_dir, 'job_events') + if not os.path.exists(job_events_dir): + os.makedirs(job_events_dir, mode=stat.S_IRUSR | stat.S_IWUSR) + + # Standard out directed to pickup location without event filtering applied + stdout_filename = os.path.join(artifacts_dir, 'stdout') + os.mknod(stdout_filename, stat.S_IFREG | stat.S_IRUSR | stat.S_IWUSR) + stdout_handle = codecs.open(stdout_filename, 'w', encoding='utf-8') + + status, rc = run_isolated_job( + private_data_dir, + json.loads(base64.b64decode(buff.getvalue())), + stdout_handle + ) + for filename, data in [ + ('status', status), + ('rc', rc), + ]: + artifact_path = os.path.join(private_data_dir, 'artifacts', filename) + os.mknod(artifact_path, stat.S_IFREG | stat.S_IRUSR | stat.S_IWUSR) + with open(artifact_path, 'w') as f: + f.write(str(data)) diff --git a/awx/main/management/commands/register_queue.py b/awx/main/management/commands/register_queue.py index 7eae1e1d97..e0ca862a37 100644 --- a/awx/main/management/commands/register_queue.py +++ b/awx/main/management/commands/register_queue.py @@ -15,16 +15,27 @@ class Command(BaseCommand): help='Queue to create/update'), make_option('--hostnames', dest='hostnames', type='string', help='Comma-Delimited Hosts to add to the Queue'), + make_option('--controller', dest='controller', type='string', default='', + help='The controlling group (makes this an isolated group)'), ) def handle(self, **options): ig = InstanceGroup.objects.filter(name=options.get('queuename')) + control_ig = None + if options.get('controller'): + control_ig = InstanceGroup.objects.filter(name=options.get('controller')).first() if ig.exists(): print("Instance Group already registered {}".format(ig[0])) ig = ig[0] + if control_ig and ig.controller_id != control_ig.pk: + ig.controller = control_ig + ig.save() + print("Set controller group {} on {}.".format(control_ig, ig)) else: print("Creating instance group {}".format(options.get('queuename'))) ig = InstanceGroup(name=options.get('queuename')) + if control_ig: + ig.controller = control_ig ig.save() hostname_list = [] if options.get('hostnames'): diff --git a/awx/main/migrations/0043_v320_instancegroups.py b/awx/main/migrations/0043_v320_instancegroups.py index a89f64ba17..bf0585acf6 100644 --- a/awx/main/migrations/0043_v320_instancegroups.py +++ b/awx/main/migrations/0043_v320_instancegroups.py @@ -19,6 +19,7 @@ class Migration(migrations.Migration): ('name', models.CharField(unique=True, max_length=250)), ('created', 
models.DateTimeField(auto_now_add=True)), ('modified', models.DateTimeField(auto_now=True)), + ('controller', models.ForeignKey(related_name='controlled_groups', default=None, editable=False, to='main.InstanceGroup', help_text='Instance Group to remotely control this group.', null=True)), ('instances', models.ManyToManyField(help_text='Instances that are members of this InstanceGroup', related_name='rampart_groups', editable=False, to='main.Instance')), ], ), diff --git a/awx/main/models/ad_hoc_commands.py b/awx/main/models/ad_hoc_commands.py index 8b04f457e1..d5662c7faa 100644 --- a/awx/main/models/ad_hoc_commands.py +++ b/awx/main/models/ad_hoc_commands.py @@ -143,6 +143,10 @@ class AdHocCommand(UnifiedJob, JobNotificationMixin): from awx.main.tasks import RunAdHocCommand return RunAdHocCommand + @classmethod + def supports_isolation(cls): + return True + def get_absolute_url(self, request=None): return reverse('api:ad_hoc_command_detail', kwargs={'pk': self.pk}, request=request) diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index 1456b68d2b..134646d2cb 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -60,6 +60,14 @@ class InstanceGroup(models.Model): editable=False, help_text=_('Instances that are members of this InstanceGroup'), ) + controller = models.ForeignKey( + 'InstanceGroup', + related_name='controlled_groups', + help_text=_('Instance Group to remotely control this group.'), + editable=False, + default=None, + null=True + ) def get_absolute_url(self, request=None): return reverse('api:instance_group_detail', kwargs={'pk': self.pk}, request=request) diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index 78d0d44192..5a83491a97 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -494,6 +494,10 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin): from awx.main.tasks import RunJob return RunJob + @classmethod + def supports_isolation(cls): + return True + def _global_timeout_setting(self): return 'DEFAULT_JOB_TIMEOUT' diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index edcecbcc02..55c53f1d8f 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -594,6 +594,10 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique def _get_task_class(cls): raise NotImplementedError # Implement in subclasses. + @classmethod + def supports_isolation(cls): + return False + @classmethod def _get_parent_field_name(cls): return 'unified_job_template' # Override in subclasses. 
@@ -952,7 +956,18 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique def start_celery_task(self, opts, error_callback, success_callback, queue): task_class = self._get_task_class() - task_class().apply_async((self.pk,), opts, link_error=error_callback, link=success_callback, queue=queue) + from awx.main.models.ha import InstanceGroup + ig = InstanceGroup.objects.get(name=queue) + args = [self.pk] + if ig.controller_id: + if self.supports_isolation(): # case of jobs and ad hoc commands + # TODO: dock capacity from controller instance, use capacity to select isolated node + import random + isolated_instance = random.choice(ig.instances.all()) + args.append(isolated_instance.hostname) + else: # proj & inv updates, system jobs run on controller + queue = ig.controller.name + task_class().apply_async(args, opts, link_error=error_callback, link=success_callback, queue=queue) def start(self, error_callback, success_callback, **kwargs): ''' diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index e3d262615d..d3ad7de17c 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -215,7 +215,11 @@ class TaskManager(): else: if type(task) is WorkflowJob: task.status = 'running' - task.instance_group = rampart_group + if not task.supports_isolation() and rampart_group.controller_id: + # non-Ansible jobs on isolated instances run on controller + task.instance_group = rampart_group.controller + else: + task.instance_group = rampart_group task.save() self.consume_capacity(task, rampart_group.name) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 96df154179..8445e6ce19 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -6,16 +6,14 @@ import codecs from collections import OrderedDict import ConfigParser import cStringIO +import imp import json import logging import os -import signal -import pipes import re import shutil import stat import tempfile -import thread import time import traceback import urlparse @@ -29,9 +27,6 @@ try: except: psutil = None -# Pexpect -import pexpect - # Celery from celery import Task, task from celery.signals import celeryd_init, worker_process_init @@ -54,9 +49,11 @@ from awx.main.models import * # noqa from awx.main.models.unified_jobs import ACTIVE_STATES from awx.main.queue import CallbackQueueDispatcher from awx.main.task_engine import TaskEnhancer +from awx.main.isolated import run, isolated_manager from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url, - check_proot_installed, build_proot_temp_dir, wrap_args_with_proot, - get_system_task_capacity, OutputEventFilter, parse_yaml_or_json) + check_proot_installed, build_proot_temp_dir, + wrap_args_with_proot, get_system_task_capacity, OutputEventFilter, + parse_yaml_or_json) from awx.main.utils.reload import restart_local_services, stop_local_services from awx.main.utils.handlers import configure_external_logger from awx.main.consumers import emit_channel_notification @@ -407,7 +404,7 @@ class BaseTask(Task): ''' Create a temporary directory for job-related files. ''' - path = tempfile.mkdtemp(prefix='ansible_tower_') + path = tempfile.mkdtemp(prefix='ansible_tower_%s_' % instance.pk) os.chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR) return path @@ -445,7 +442,7 @@ class BaseTask(Task): # will be read then closed, instead of leaving the SSH key on disk. 
if credential.kind in ('ssh', 'scm') and not ssh_too_old: path = os.path.join(kwargs.get('private_data_dir', tempfile.gettempdir()), name) - self.open_fifo_write(path, data) + run.open_fifo_write(path, data) private_data_files['credentials']['ssh'] = path # Ansible network modules do not yet support ssh-agent. # Instead, ssh private key file is explicitly passed via an @@ -459,14 +456,6 @@ class BaseTask(Task): private_data_files['credentials'][credential] = path return private_data_files - def open_fifo_write(self, path, data): - '''open_fifo_write opens the fifo named pipe in a new thread. - This blocks until the the calls to ssh-agent/ssh-add have read the - credential information from the pipe. - ''' - os.mkfifo(path, 0600) - thread.start_new_thread(lambda p, d: open(p, 'w').write(d), (path, data)) - def build_passwords(self, instance, **kwargs): ''' Build a dictionary of passwords for responding to prompts. @@ -477,7 +466,7 @@ class BaseTask(Task): '': '', } - def add_ansible_venv(self, env): + def add_ansible_venv(self, env, add_tower_lib=True): if settings.ANSIBLE_USE_VENV: env['VIRTUAL_ENV'] = settings.ANSIBLE_VENV_PATH env['PATH'] = os.path.join(settings.ANSIBLE_VENV_PATH, "bin") + ":" + env['PATH'] @@ -488,7 +477,8 @@ class BaseTask(Task): env['PYTHONPATH'] = os.path.join(venv_libdir, python_ver, "site-packages") + ":" break # Add awx/lib to PYTHONPATH. - env['PYTHONPATH'] = ':'.join(filter(None, [self.get_path_to('..', 'lib'), env.get('PYTHONPATH', '')])) + if add_tower_lib: + env['PYTHONPATH'] = env.get('PYTHONPATH', '') + self.get_path_to('..', 'lib') + ':' return env def add_tower_venv(self, env): @@ -537,26 +527,39 @@ class BaseTask(Task): safe_env[k] = urlpass_re.sub(HIDDEN_PASSWORD, v) return safe_env - def args2cmdline(self, *args): - return ' '.join([pipes.quote(a) for a in args]) - - def wrap_args_with_ssh_agent(self, args, ssh_key_path, ssh_auth_sock=None): - if ssh_key_path: - cmd = ' && '.join([self.args2cmdline('ssh-add', ssh_key_path), - self.args2cmdline('rm', '-f', ssh_key_path), - self.args2cmdline(*args)]) - args = ['ssh-agent'] - if ssh_auth_sock: - args.extend(['-a', ssh_auth_sock]) - args.extend(['sh', '-c', cmd]) - return args - def should_use_proot(self, instance, **kwargs): ''' Return whether this task should use proot. ''' return False + def build_inventory(self, instance, **kwargs): + plugin = self.get_path_to('..', 'plugins', 'inventory', 'awxrest.py') + if kwargs.get('isolated') is True: + # For isolated jobs, we have to interact w/ the REST API from the + # controlling node and ship the static JSON inventory to the + # isolated host (because the isolated host itself can't reach the + # Tower REST API to fetch the inventory). + path = os.path.join(kwargs['private_data_dir'], 'inventory') + if os.path.exists(path): + return path + awxrest = imp.load_source('awxrest', plugin) + with open(path, 'w') as f: + buff = cStringIO.StringIO() + awxrest.InventoryScript(**{ + 'base_url': settings.INTERNAL_API_URL, + 'authtoken': instance.task_auth_token or '', + 'inventory_id': str(instance.inventory.pk), + 'list': True, + 'hostvars': True, + }).run(buff) + json_data = buff.getvalue().strip() + f.write("#! 
/usr/bin/env python\nprint '''%s'''\n" % json_data) + os.chmod(path, stat.S_IRUSR | stat.S_IXUSR) + return path + else: + return plugin + def build_args(self, instance, **kwargs): raise NotImplementedError @@ -572,6 +575,17 @@ class BaseTask(Task): def get_idle_timeout(self): return None + def get_instance_timeout(self, instance): + global_timeout_setting_name = instance._global_timeout_setting() + if global_timeout_setting_name: + global_timeout = getattr(settings, global_timeout_setting_name, 0) + local_timeout = getattr(instance, 'timeout', 0) + job_timeout = global_timeout if local_timeout == 0 else local_timeout + job_timeout = 0 if local_timeout < 0 else job_timeout + else: + job_timeout = 0 + return job_timeout + def get_password_prompts(self): ''' Return a dictionary where keys are strings or regular expressions for @@ -591,105 +605,6 @@ class BaseTask(Task): assert stdout_handle.name == stdout_filename return stdout_handle - def run_pexpect(self, instance, args, cwd, env, passwords, stdout_handle, - output_replacements=None, extra_update_fields=None): - ''' - Run the given command using pexpect to capture output and provide - passwords when requested. - ''' - logfile = stdout_handle - logfile_pos = logfile.tell() - global_timeout_setting_name = instance._global_timeout_setting() - if global_timeout_setting_name: - global_timeout = getattr(settings, global_timeout_setting_name, 0) - local_timeout = getattr(instance, 'timeout', 0) - job_timeout = global_timeout if local_timeout == 0 else local_timeout - job_timeout = 0 if local_timeout < 0 else job_timeout - else: - job_timeout = 0 - child = pexpect.spawn( - args[0], args[1:], cwd=cwd, env=env, ignore_sighup=True, - encoding='utf-8', echo=False, - ) - child.logfile_read = logfile - canceled = False - timed_out = False - last_stdout_update = time.time() - idle_timeout = self.get_idle_timeout() - expect_list = [] - expect_passwords = {} - pexpect_timeout = getattr(settings, 'PEXPECT_TIMEOUT', 5) - for n, item in enumerate(self.get_password_prompts().items()): - expect_list.append(item[0]) - expect_passwords[n] = passwords.get(item[1], '') or '' - expect_list.extend([pexpect.TIMEOUT, pexpect.EOF]) - instance = self.update_model(instance.pk, status='running', - execution_node=settings.CLUSTER_HOST_ID, - output_replacements=output_replacements) - job_start = time.time() - while child.isalive(): - result_id = child.expect(expect_list, timeout=pexpect_timeout, searchwindowsize=100) - if result_id in expect_passwords: - child.sendline(expect_passwords[result_id]) - if logfile_pos != logfile.tell(): - logfile_pos = logfile.tell() - last_stdout_update = time.time() - # Refresh model instance from the database (to check cancel flag). - instance = self.update_model(instance.pk) - if instance.cancel_flag: - canceled = True - elif job_timeout != 0 and (time.time() - job_start) > job_timeout: - timed_out = True - if isinstance(extra_update_fields, dict): - extra_update_fields['job_explanation'] = "Job terminated due to timeout" - if canceled or timed_out: - self._handle_termination(instance, child, is_cancel=canceled) - if idle_timeout and (time.time() - last_stdout_update) > idle_timeout: - child.close(True) - canceled = True - if canceled: - return 'canceled', child.exitstatus - elif child.exitstatus == 0 and not timed_out: - return 'successful', child.exitstatus - else: - return 'failed', child.exitstatus - - def _handle_termination(self, instance, job, is_cancel=True): - '''Helper function to properly terminate specified job. 
- - Args: - instance: The corresponding model instance of this task. - job: The pexpect subprocess running the job. - is_cancel: Flag showing whether this termination is caused by instance's - cancel_flag. - - Return: - None. - ''' - try: - if settings.AWX_PROOT_ENABLED and self.should_use_proot(instance): - # NOTE: Refactor this once we get a newer psutil across the board - if not psutil: - os.kill(job.pid, signal.SIGKILL) - else: - try: - main_proc = psutil.Process(pid=job.pid) - if hasattr(main_proc, "children"): - child_procs = main_proc.children(recursive=True) - else: - child_procs = main_proc.get_children(recursive=True) - for child_proc in child_procs: - os.kill(child_proc.pid, signal.SIGKILL) - os.kill(main_proc.pid, signal.SIGKILL) - except (TypeError, psutil.Error): - os.kill(job.pid, signal.SIGKILL) - else: - os.kill(job.pid, signal.SIGTERM) - time.sleep(3) - except OSError: - keyword = 'cancel' if is_cancel else 'timeout' - logger.warn("Attempted to %s already finished job, ignoring" % keyword) - def pre_run_hook(self, instance, **kwargs): ''' Hook for any steps to run before the job/task starts @@ -705,7 +620,7 @@ class BaseTask(Task): Hook for any steps to run after job/task is marked as complete. ''' - def run(self, pk, **kwargs): + def run(self, pk, isolated_host=None, **kwargs): ''' Run the job/task and capture its output. ''' @@ -716,6 +631,7 @@ class BaseTask(Task): output_replacements = [] extra_update_fields = {} try: + kwargs['isolated'] = isolated_host is not None self.pre_run_hook(instance, **kwargs) if instance.cancel_flag: instance = self.update_model(instance.pk, status='canceled') @@ -754,7 +670,12 @@ class BaseTask(Task): credential, env, safe_env, args, safe_args, kwargs['private_data_dir'] ) - stdout_handle = self.get_stdout_handle(instance) + if isolated_host is None: + stdout_handle = self.get_stdout_handle(instance) + else: + base_handle = super(self.__class__, self).get_stdout_handle(instance) + stdout_handle = isolated_manager.IsolatedManager.wrap_stdout_handle( + instance, kwargs['private_data_dir'], base_handle) if self.should_use_proot(instance, **kwargs): if not check_proot_installed(): raise RuntimeError('bubblewrap is not installed') @@ -763,14 +684,42 @@ class BaseTask(Task): safe_args = wrap_args_with_proot(safe_args, cwd, **kwargs) # If there is an SSH key path defined, wrap args with ssh-agent. 
ssh_key_path = self.get_ssh_key_path(instance, **kwargs) - if ssh_key_path: + # If we're executing on an isolated host, don't bother adding the + # key to the agent in this environment + if ssh_key_path and isolated_host is None: ssh_auth_sock = os.path.join(kwargs['private_data_dir'], 'ssh_auth.sock') - args = self.wrap_args_with_ssh_agent(args, ssh_key_path, ssh_auth_sock) - safe_args = self.wrap_args_with_ssh_agent(safe_args, ssh_key_path, ssh_auth_sock) + args = run.wrap_args_with_ssh_agent(args, ssh_key_path, ssh_auth_sock) + safe_args = run.wrap_args_with_ssh_agent(safe_args, ssh_key_path, ssh_auth_sock) instance = self.update_model(pk, job_args=json.dumps(safe_args), job_cwd=cwd, job_env=safe_env, result_stdout_file=stdout_handle.name) - status, rc = self.run_pexpect(instance, args, cwd, env, kwargs['passwords'], stdout_handle, - extra_update_fields=extra_update_fields) + + expect_passwords = {} + for k, v in self.get_password_prompts().items(): + expect_passwords[k] = kwargs['passwords'].get(v, '') or '' + _kw = dict( + expect_passwords=expect_passwords, + cancelled_callback=lambda: self.update_model(instance.pk).cancel_flag, + job_timeout=self.get_instance_timeout(instance), + idle_timeout=self.get_idle_timeout(), + extra_update_fields=extra_update_fields, + pexpect_timeout=getattr(settings, 'PEXPECT_TIMEOUT', 5), + proot_cmd=getattr(settings, 'AWX_PROOT_CMD', 'bwrap'), + ) + instance = self.update_model(instance.pk, status='running', + execution_node=settings.CLUSTER_HOST_ID, + output_replacements=output_replacements) + if isolated_host: + manager_instance = isolated_manager.IsolatedManager( + args, cwd, env, stdout_handle, ssh_key_path, **_kw + ) + status, rc = manager_instance.run(instance, isolated_host, + kwargs['private_data_dir'], + kwargs.get('proot_temp_dir')) + else: + status, rc = run.run_pexpect( + args, cwd, env, stdout_handle, **_kw + ) + except Exception: if status != 'canceled': tb = traceback.format_exc() @@ -901,7 +850,7 @@ class RunJob(BaseTask): plugin_dirs.extend(settings.AWX_ANSIBLE_CALLBACK_PLUGINS) plugin_path = ':'.join(plugin_dirs) env = super(RunJob, self).build_env(job, **kwargs) - env = self.add_ansible_venv(env) + env = self.add_ansible_venv(env, add_tower_lib=kwargs.get('isolated', False)) # Set environment variables needed for inventory and job event # callbacks to work. 
env['JOB_ID'] = str(job.pk) @@ -909,14 +858,15 @@ class RunJob(BaseTask): if job.project: env['PROJECT_REVISION'] = job.project.scm_revision env['ANSIBLE_RETRY_FILES_ENABLED'] = "False" - env['ANSIBLE_CALLBACK_PLUGINS'] = plugin_path - env['ANSIBLE_STDOUT_CALLBACK'] = 'tower_display' - env['REST_API_URL'] = settings.INTERNAL_API_URL - env['REST_API_TOKEN'] = job.task_auth_token or '' - env['TOWER_HOST'] = settings.TOWER_URL_BASE env['MAX_EVENT_RES'] = str(settings.MAX_EVENT_RES_DATA) - env['CALLBACK_QUEUE'] = settings.CALLBACK_QUEUE - env['CALLBACK_CONNECTION'] = settings.BROKER_URL + if not kwargs.get('isolated'): + env['ANSIBLE_CALLBACK_PLUGINS'] = plugin_path + env['ANSIBLE_STDOUT_CALLBACK'] = 'tower_display' + env['REST_API_URL'] = settings.INTERNAL_API_URL + env['REST_API_TOKEN'] = job.task_auth_token or '' + env['TOWER_HOST'] = settings.TOWER_URL_BASE + env['CALLBACK_QUEUE'] = settings.CALLBACK_QUEUE + env['CALLBACK_CONNECTION'] = settings.BROKER_URL env['CACHE'] = settings.CACHES['default']['LOCATION'] if 'LOCATION' in settings.CACHES['default'] else '' if getattr(settings, 'JOB_CALLBACK_DEBUG', False): env['JOB_CALLBACK_DEBUG'] = '2' @@ -980,7 +930,7 @@ class RunJob(BaseTask): env['ANSIBLE_NET_AUTH_PASS'] = decrypt_field(network_cred, 'authorize_password') # Set environment variables related to gathering facts from the cache - if job.job_type == PERM_INVENTORY_SCAN or job.store_facts is True: + if (job.job_type == PERM_INVENTORY_SCAN or job.store_facts is True) and not kwargs.get('isolated'): env['FACT_QUEUE'] = settings.FACT_QUEUE env['ANSIBLE_LIBRARY'] = self.get_path_to('..', 'plugins', 'library') env['ANSIBLE_CACHE_PLUGINS'] = self.get_path_to('..', 'plugins', 'fact_caching') @@ -1007,9 +957,7 @@ class RunJob(BaseTask): # it doesn't make sense to rely on ansible-playbook's default of using # the current user. ssh_username = ssh_username or 'root' - inventory_script = self.get_path_to('..', 'plugins', 'inventory', - 'awxrest.py') - args = ['ansible-playbook', '-i', inventory_script] + args = ['ansible-playbook', '-i', self.build_inventory(job, **kwargs)] if job.job_type == 'check': args.append('--check') args.extend(['-u', ssh_username]) @@ -1144,12 +1092,15 @@ class RunJob(BaseTask): def pre_run_hook(self, job, **kwargs): if job.project and job.project.scm_type: job_request_id = '' if self.request.id is None else self.request.id + pu_ig = job.instance_group + if kwargs['isolated']: + pu_ig = pu_ig.controller local_project_sync = job.project.create_project_update( launch_type="sync", _eager_fields=dict( job_type='run', status='running', - instance_group = job.instance_group, + instance_group = pu_ig, celery_task_id=job_request_id)) # save the associated job before calling run() so that a # cancel() call on the job can cancel the project update @@ -1266,12 +1217,15 @@ class RunProjectUpdate(BaseTask): return scm_url, extra_vars + def build_inventory(self, instance, **kwargs): + return 'localhost,' + def build_args(self, project_update, **kwargs): ''' Build command line argument list for running ansible-playbook, optionally using ssh-agent for public/private key authentication. ''' - args = ['ansible-playbook', '-i', 'localhost,'] + args = ['ansible-playbook', '-i', self.build_inventory(project_update, **kwargs)] if getattr(settings, 'PROJECT_UPDATE_VVV', False): args.append('-vvv') else: @@ -1885,7 +1839,6 @@ class RunInventoryUpdate(BaseTask): pass - class RunAdHocCommand(BaseTask): ''' Celery task to run an ad hoc command using ansible. 
@@ -1984,9 +1937,7 @@ class RunAdHocCommand(BaseTask): # it doesn't make sense to rely on ansible's default of using the # current user. ssh_username = ssh_username or 'root' - inventory_script = self.get_path_to('..', 'plugins', 'inventory', - 'awxrest.py') - args = ['ansible', '-i', inventory_script] + args = ['ansible', '-i', self.build_inventory(ad_hoc_command, **kwargs)] if ad_hoc_command.job_type == 'check': args.append('--check') args.extend(['-u', ssh_username]) diff --git a/awx/main/tests/unit/isolated/test_expect.py b/awx/main/tests/unit/isolated/test_expect.py new file mode 100644 index 0000000000..a34d4f3881 --- /dev/null +++ b/awx/main/tests/unit/isolated/test_expect.py @@ -0,0 +1,303 @@ +import cStringIO +import mock +import os +import pytest +import re +import shutil +import stat +import tempfile +import time +from collections import OrderedDict + +from Crypto.PublicKey import RSA +from Crypto import Random + +from awx.main.isolated import run, isolated_manager + +HERE, FILENAME = os.path.split(__file__) + + +@pytest.fixture(scope='function') +def rsa_key(request): + passphrase = 'passme' + key = RSA.generate(1024, Random.new().read) + return (key.exportKey('PEM', passphrase, pkcs=1), passphrase) + + +@pytest.fixture(scope='function') +def private_data_dir(request): + path = tempfile.mkdtemp(prefix='ansible_tower_unit_test') + request.addfinalizer(lambda: shutil.rmtree(path)) + return path + + +@pytest.fixture(autouse=True) +def mock_sleep(request): + # the process teardown mechanism uses `time.sleep` to wait on processes to + # respond to SIGTERM; these are tests and don't care about being nice + m = mock.patch('time.sleep') + m.start() + request.addfinalizer(m.stop) + + +def test_simple_spawn(): + stdout = cStringIO.StringIO() + status, rc = run.run_pexpect( + ['ls', '-la'], + HERE, + {}, + stdout, + cancelled_callback=lambda: False, + ) + assert status == 'successful' + assert rc == 0 + assert FILENAME in stdout.getvalue() + + +def test_error_rc(): + stdout = cStringIO.StringIO() + status, rc = run.run_pexpect( + ['ls', '-nonsense'], + HERE, + {}, + stdout, + cancelled_callback=lambda: False, + ) + assert status == 'failed' + # I'd expect 2, but we shouldn't risk making this test platform-dependent + assert rc > 0 + + +def test_env_vars(): + stdout = cStringIO.StringIO() + status, rc = run.run_pexpect( + ['python', '-c', 'import os; print os.getenv("X_MY_ENV")'], + HERE, + {'X_MY_ENV': 'abc123'}, + stdout, + cancelled_callback=lambda: False, + ) + assert status == 'successful' + assert rc == 0 + assert 'abc123' in stdout.getvalue() + + +def test_password_prompt(): + stdout = cStringIO.StringIO() + expect_passwords = OrderedDict() + expect_passwords[re.compile(r'Password:\s*?$', re.M)] = 'secret123' + status, rc = run.run_pexpect( + ['python', '-c', 'print raw_input("Password: ")'], + HERE, + {}, + stdout, + cancelled_callback=lambda: False, + expect_passwords=expect_passwords + ) + assert status == 'successful' + assert rc == 0 + assert 'secret123' in stdout.getvalue() + + +def test_job_timeout(): + stdout = cStringIO.StringIO() + extra_update_fields={} + status, rc = run.run_pexpect( + ['python', '-c', 'import time; time.sleep(5)'], + HERE, + {}, + stdout, + cancelled_callback=lambda: False, + extra_update_fields=extra_update_fields, + job_timeout=.01, + pexpect_timeout=0, + ) + assert status == 'failed' + assert extra_update_fields == {'job_explanation': 'Job terminated due to timeout'} + + +def test_manual_cancellation(): + stdout = cStringIO.StringIO() + status, rc = 
run.run_pexpect( + ['python', '-c', 'print raw_input("Password: ")'], + HERE, + {}, + stdout, + cancelled_callback=lambda: True, # this callable will cause cancellation + # the lack of password inputs will cause stdin to hang + pexpect_timeout=0, + ) + assert status == 'canceled' + + +def test_build_isolated_job_data(private_data_dir, rsa_key): + pem, passphrase = rsa_key + mgr = isolated_manager.IsolatedManager( + ['ls', '-la'], HERE, {}, cStringIO.StringIO(), '' + ) + mgr.private_data_dir = private_data_dir + mgr.build_isolated_job_data() + + path = os.path.join(private_data_dir, 'project') + assert os.path.isdir(path) + + # /project is a soft link to HERE, which is the directory + # _this_ test file lives in + assert os.path.exists(os.path.join(path, FILENAME)) + + path = os.path.join(private_data_dir, 'artifacts') + assert os.path.isdir(path) + assert stat.S_IMODE(os.stat(path).st_mode) == stat.S_IRUSR + stat.S_IWUSR # user rw + + path = os.path.join(private_data_dir, 'args') + with open(path, 'r') as f: + assert stat.S_IMODE(os.stat(path).st_mode) == stat.S_IRUSR # user r/o + assert f.read() == '["ls", "-la"]' + + path = os.path.join(private_data_dir, '.rsync-filter') + with open(path, 'r') as f: + data = f.read() + assert data == '\n'.join([ + '- /project/.git', + '- /project/.svn', + '- /project/.hg', + '- /artifacts/job_events/*-partial.json.tmp' + ]) + + +def test_run_isolated_job(private_data_dir, rsa_key): + env = {'JOB_ID': '1'} + pem, passphrase = rsa_key + mgr = isolated_manager.IsolatedManager( + ['ls', '-la'], HERE, env, cStringIO.StringIO(), '' + ) + mgr.private_data_dir = private_data_dir + secrets = { + 'env': env, + 'passwords': { + r'Enter passphrase for .*:\s*?$': passphrase + }, + 'ssh_key_data': pem + } + mgr.build_isolated_job_data() + stdout = cStringIO.StringIO() + # Mock environment variables for callback module + with mock.patch('os.getenv') as env_mock: + env_mock.return_value = '/path/to/awx/lib' + status, rc = run.run_isolated_job(private_data_dir, secrets, stdout) + assert status == 'successful' + assert rc == 0 + assert FILENAME in stdout.getvalue() + + assert '/path/to/awx/lib' in env['PYTHONPATH'] + assert env['ANSIBLE_STDOUT_CALLBACK'] == 'tower_display' + assert env['ANSIBLE_CALLBACK_PLUGINS'] == '/path/to/awx/lib/isolated_callbacks' + assert env['AWX_ISOLATED_DATA_DIR'] == private_data_dir + + +def test_run_isolated_adhoc_command(private_data_dir, rsa_key): + env = {'AD_HOC_COMMAND_ID': '1'} + pem, passphrase = rsa_key + mgr = isolated_manager.IsolatedManager( + ['pwd'], HERE, env, cStringIO.StringIO(), '' + ) + mgr.private_data_dir = private_data_dir + secrets = { + 'env': env, + 'passwords': { + r'Enter passphrase for .*:\s*?$': passphrase + }, + 'ssh_key_data': pem + } + mgr.build_isolated_job_data() + stdout = cStringIO.StringIO() + # Mock environment variables for callback module + with mock.patch('os.getenv') as env_mock: + env_mock.return_value = '/path/to/awx/lib' + status, rc = run.run_isolated_job(private_data_dir, secrets, stdout) + assert status == 'successful' + assert rc == 0 + + # for ad-hoc jobs, `ansible` is invoked from the `private_data_dir`, so + # an ad-hoc command that runs `pwd` should print `private_data_dir` to stdout + assert private_data_dir in stdout.getvalue() + + assert '/path/to/awx/lib' in env['PYTHONPATH'] + assert env['ANSIBLE_STDOUT_CALLBACK'] == 'minimal' + assert env['ANSIBLE_CALLBACK_PLUGINS'] == '/path/to/awx/lib/isolated_callbacks' + assert env['AWX_ISOLATED_DATA_DIR'] == private_data_dir + + +def 
test_check_isolated_job(private_data_dir, rsa_key): + pem, passphrase = rsa_key + stdout = cStringIO.StringIO() + mgr = isolated_manager.IsolatedManager(['ls', '-la'], HERE, {}, stdout, '') + mgr.private_data_dir = private_data_dir + mgr.instance = mock.Mock(pk=123, verbosity=5, spec_set=['pk', 'verbosity']) + mgr.started_at = time.time() + mgr.host = 'isolated-host' + + os.mkdir(os.path.join(private_data_dir, 'artifacts')) + with mock.patch('awx.main.isolated.run.run_pexpect') as run_pexpect: + + def _synchronize_job_artifacts(args, cwd, env, buff, **kw): + buff.write('checking job status...') + for filename, data in ( + ['status', 'failed'], + ['rc', '1'], + ['stdout', 'KABOOM!'], + ): + with open(os.path.join(private_data_dir, 'artifacts', filename), 'w') as f: + f.write(data) + return ('successful', 0) + + run_pexpect.side_effect = _synchronize_job_artifacts + with mock.patch.object(mgr, '_missing_artifacts') as missing_artifacts: + missing_artifacts.return_value = False + status, rc = mgr.check() + + assert status == 'failed' + assert rc == 1 + assert stdout.getvalue() == 'KABOOM!' + + run_pexpect.assert_called_with( + [ + 'ansible-playbook', '-u', 'root', '-i', 'isolated-host,', + 'check_isolated.yml', '-e', '{"src": "%s", "job_id": "123"}' % private_data_dir, + '-vvvvv' + ], + '/tower_devel/awx/playbooks', mgr.env, mock.ANY, + cancelled_callback=None, + idle_timeout=0, + job_timeout=0, + pexpect_timeout=5, + proot_cmd='bwrap' + ) + + +def test_check_isolated_job_timeout(private_data_dir, rsa_key): + pem, passphrase = rsa_key + stdout = cStringIO.StringIO() + extra_update_fields = {} + mgr = isolated_manager.IsolatedManager(['ls', '-la'], HERE, {}, stdout, '', + job_timeout=1, + extra_update_fields=extra_update_fields) + mgr.private_data_dir = private_data_dir + mgr.instance = mock.Mock(pk=123, verbosity=5, spec_set=['pk', 'verbosity']) + mgr.started_at = time.time() + mgr.host = 'isolated-host' + + with mock.patch('awx.main.isolated.run.run_pexpect') as run_pexpect: + + def _synchronize_job_artifacts(args, cwd, env, buff, **kw): + buff.write('checking job status...') + return ('failed', 1) + + run_pexpect.side_effect = _synchronize_job_artifacts + status, rc = mgr.check() + + assert status == 'failed' + assert rc == 1 + assert stdout.getvalue() == 'checking job status...' + + assert extra_update_fields['job_explanation'] == 'Job terminated due to timeout' diff --git a/awx/main/tests/unit/test_tasks.py b/awx/main/tests/unit/test_tasks.py index 9b575da56d..d576f661ce 100644 --- a/awx/main/tests/unit/test_tasks.py +++ b/awx/main/tests/unit/test_tasks.py @@ -3,14 +3,16 @@ from datetime import datetime from functools import partial import ConfigParser import json +import os +import shutil import tempfile -import os import fcntl -import pytest import mock +import pytest import yaml + from awx.main.models import ( Credential, CredentialType, @@ -198,11 +200,21 @@ class TestJobExecution: EXAMPLE_PRIVATE_KEY = '-----BEGIN PRIVATE KEY-----\nxyz==\n-----END PRIVATE KEY-----' def setup_method(self, method): + self.project_path = tempfile.mkdtemp(prefix='ansible_tower_project_') + with open(os.path.join(self.project_path, 'helloworld.yml'), 'w') as f: + f.write('---') + + # The primary goal of these tests is to mock our `run_pexpect` call + # and make assertions about the arguments and environment passed to it. 
+ self.run_pexpect = mock.Mock() + self.run_pexpect.return_value = ['successful', 0] + self.patches = [ - mock.patch.object(Project, 'get_project_path', lambda *a, **kw: '/tmp/'), + mock.patch.object(Project, 'get_project_path', lambda *a, **kw: self.project_path), # don't emit websocket statuses; they use the DB and complicate testing mock.patch.object(UnifiedJob, 'websocket_emit_status', mock.Mock()), - mock.patch.object(Job, 'inventory', mock.Mock(pk=1, spec_set=['pk'])) + mock.patch.object(Job, 'inventory', mock.Mock(pk=1, spec_set=['pk'])), + mock.patch('awx.main.isolated.run.run_pexpect', self.run_pexpect) ] for p in self.patches: p.start() @@ -220,16 +232,13 @@ class TestJobExecution: self.task = self.TASK_CLS() self.task.update_model = mock.Mock(side_effect=status_side_effect) - # The primary goal of these tests is to mock our `run_pexpect` call - # and make assertions about the arguments and environment passed to it. - self.task.run_pexpect = mock.Mock(return_value=['successful', 0]) - # ignore pre-run and post-run hooks, they complicate testing in a variety of ways self.task.pre_run_hook = self.task.post_run_hook = self.task.final_run_hook = mock.Mock() def teardown_method(self, method): for p in self.patches: p.stop() + shutil.rmtree(self.project_path, True) def get_instance(self): job = Job( @@ -238,7 +247,9 @@ class TestJobExecution: status='new', job_type='run', cancel_flag=False, - project=Project() + project=Project(), + playbook='helloworld.yml', + verbosity=3 ) # mock the job.extra_credentials M2M relation so we can avoid DB access @@ -273,12 +284,107 @@ class TestGenericRun(TestJobExecution): def test_uses_bubblewrap(self): self.task.run(self.pk) - assert self.task.run_pexpect.call_count == 1 - call_args, _ = self.task.run_pexpect.call_args_list[0] - job, args, cwd, env, passwords, stdout = call_args + assert self.run_pexpect.call_count == 1 + call_args, _ = self.run_pexpect.call_args_list[0] + args, cwd, env, stdout = call_args assert args[0] == 'bwrap' +class TestIsolatedExecution(TestJobExecution): + + REMOTE_HOST = 'some-isolated-host' + + def test_with_ssh_credentials(self): + mock_get = mock.Mock() + ssh = CredentialType.defaults['ssh']() + credential = Credential( + pk=1, + credential_type=ssh, + inputs = { + 'username': 'bob', + 'password': 'secret', + 'ssh_key_data': self.EXAMPLE_PRIVATE_KEY + } + ) + credential.inputs['password'] = encrypt_field(credential, 'password') + self.instance.credential = credential + + private_data = tempfile.mkdtemp(prefix='ansible_tower_') + self.task.build_private_data_dir = mock.Mock(return_value=private_data) + inventory = json.dumps({"all": {"hosts": ["localhost"]}}) + + def _mock_job_artifacts(*args, **kw): + artifacts = os.path.join(private_data, 'artifacts') + if not os.path.exists(artifacts): + os.makedirs(artifacts) + if 'run_isolated.yml' in args[0]: + for filename, data in ( + ['status', 'successful'], + ['rc', '0'], + ['stdout', 'IT WORKED!'], + ): + with open(os.path.join(artifacts, filename), 'w') as f: + f.write(data) + return ('successful', 0) + self.run_pexpect.side_effect = _mock_job_artifacts + + with mock.patch('time.sleep'): + with mock.patch('requests.get') as mock_get: + mock_get.return_value = mock.Mock(content=inventory) + self.task.run(self.pk, self.REMOTE_HOST) + assert mock_get.call_count == 1 + assert mock.call( + 'http://127.0.0.1:8013/api/v1/inventories/1/script/?hostvars=1', + auth=mock.ANY + ) in mock_get.call_args_list + + playbook_run = self.run_pexpect.call_args_list[0][0] + assert ' 
'.join(playbook_run[0]).startswith(' '.join([ + 'ansible-playbook', '-u', 'root', '-i', self.REMOTE_HOST + ',', + 'run_isolated.yml', '-e', + ])) + extra_vars = playbook_run[0][playbook_run[0].index('-e') + 1] + extra_vars = json.loads(extra_vars) + assert extra_vars['dest'] == '/tmp' + assert extra_vars['src'] == private_data + assert extra_vars['proot_temp_dir'].startswith('/tmp/ansible_tower_proot_') + assert extra_vars['job_id'] == '1' + + def test_systemctl_failure(self): + # If systemctl fails, read the contents of `artifacts/systemctl_logs` + mock_get = mock.Mock() + ssh = CredentialType.defaults['ssh']() + credential = Credential( + pk=1, + credential_type=ssh, + inputs = {'username': 'bob',} + ) + self.instance.credential = credential + + private_data = tempfile.mkdtemp(prefix='ansible_tower_') + self.task.build_private_data_dir = mock.Mock(return_value=private_data) + inventory = json.dumps({"all": {"hosts": ["localhost"]}}) + + def _mock_job_artifacts(*args, **kw): + artifacts = os.path.join(private_data, 'artifacts') + if not os.path.exists(artifacts): + os.makedirs(artifacts) + if 'run_isolated.yml' in args[0]: + for filename, data in ( + ['systemctl_logs', 'ERROR IN EXPECT.PY'], + ): + with open(os.path.join(artifacts, filename), 'w') as f: + f.write(data) + return ('successful', 0) + self.run_pexpect.side_effect = _mock_job_artifacts + + with mock.patch('time.sleep'): + with mock.patch('requests.get') as mock_get: + mock_get.return_value = mock.Mock(content=inventory) + with pytest.raises(Exception): + self.task.run(self.pk, self.REMOTE_HOST) + + class TestJobCredentials(TestJobExecution): parametrize = { @@ -301,11 +407,11 @@ class TestJobCredentials(TestJobExecution): self.instance.credential = credential self.task.run(self.pk) - assert self.task.run_pexpect.call_count == 1 - call_args, _ = self.task.run_pexpect.call_args_list[0] - job, args, cwd, env, passwords, stdout = call_args + assert self.run_pexpect.call_count == 1 + call_args, call_kwargs = self.run_pexpect.call_args_list[0] + args, cwd, env, stdout = call_args - assert passwords[password_name] == 'secret' + assert 'secret' in call_kwargs.get('expect_passwords').values() assert '-u bob' in ' '.join(args) if expected_flag: assert expected_flag in ' '.join(args) @@ -324,7 +430,7 @@ class TestJobCredentials(TestJobExecution): self.instance.credential = credential def run_pexpect_side_effect(private_data, *args, **kwargs): - job, args, cwd, env, passwords, stdout = args + args, cwd, env, stdout = args ssh_key_data_fifo = '/'.join([private_data, 'credential_1']) assert open(ssh_key_data_fifo, 'r').read() == self.EXAMPLE_PRIVATE_KEY assert ' '.join(args).startswith( @@ -338,9 +444,7 @@ class TestJobCredentials(TestJobExecution): private_data = tempfile.mkdtemp(prefix='ansible_tower_') self.task.build_private_data_dir = mock.Mock(return_value=private_data) - self.task.run_pexpect = mock.Mock( - side_effect=partial(run_pexpect_side_effect, private_data) - ) + self.run_pexpect.side_effect = partial(run_pexpect_side_effect, private_data) self.task.run(self.pk, private_data_dir=private_data) def test_aws_cloud_credential(self): @@ -354,9 +458,9 @@ class TestJobCredentials(TestJobExecution): self.instance.extra_credentials.add(credential) self.task.run(self.pk) - assert self.task.run_pexpect.call_count == 1 - call_args, _ = self.task.run_pexpect.call_args_list[0] - job, args, cwd, env, passwords, stdout = call_args + assert self.run_pexpect.call_count == 1 + call_args, _ = self.run_pexpect.call_args_list[0] + args, cwd, env, 
stdout = call_args assert env['AWS_ACCESS_KEY'] == 'bob' assert env['AWS_SECRET_KEY'] == 'secret' @@ -374,9 +478,9 @@ class TestJobCredentials(TestJobExecution): self.instance.extra_credentials.add(credential) self.task.run(self.pk) - assert self.task.run_pexpect.call_count == 1 - call_args, _ = self.task.run_pexpect.call_args_list[0] - job, args, cwd, env, passwords, stdout = call_args + assert self.run_pexpect.call_count == 1 + call_args, _ = self.run_pexpect.call_args_list[0] + args, cwd, env, stdout = call_args assert env['AWS_ACCESS_KEY'] == 'bob' assert env['AWS_SECRET_KEY'] == 'secret' @@ -397,14 +501,14 @@ class TestJobCredentials(TestJobExecution): self.instance.extra_credentials.add(credential) def run_pexpect_side_effect(*args, **kwargs): - job, args, cwd, env, passwords, stdout = args + args, cwd, env, stdout = args assert env['GCE_EMAIL'] == 'bob' assert env['GCE_PROJECT'] == 'some-project' ssh_key_data = env['GCE_PEM_FILE_PATH'] assert open(ssh_key_data, 'rb').read() == self.EXAMPLE_PRIVATE_KEY return ['successful', 0] - self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect) + self.run_pexpect.side_effect = run_pexpect_side_effect self.task.run(self.pk) def test_azure_credentials(self): @@ -421,13 +525,13 @@ class TestJobCredentials(TestJobExecution): self.instance.extra_credentials.add(credential) def run_pexpect_side_effect(*args, **kwargs): - job, args, cwd, env, passwords, stdout = args + args, cwd, env, stdout = args assert env['AZURE_SUBSCRIPTION_ID'] == 'bob' ssh_key_data = env['AZURE_CERT_PATH'] assert open(ssh_key_data, 'rb').read() == self.EXAMPLE_PRIVATE_KEY return ['successful', 0] - self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect) + self.run_pexpect.side_effect = run_pexpect_side_effect self.task.run(self.pk) def test_azure_rm_with_tenant(self): @@ -447,9 +551,9 @@ class TestJobCredentials(TestJobExecution): self.task.run(self.pk) - assert self.task.run_pexpect.call_count == 1 - call_args, _ = self.task.run_pexpect.call_args_list[0] - job, args, cwd, env, passwords, stdout = call_args + assert self.run_pexpect.call_count == 1 + call_args, _ = self.run_pexpect.call_args_list[0] + args, cwd, env, stdout = call_args assert env['AZURE_CLIENT_ID'] == 'some-client' assert env['AZURE_SECRET'] == 'some-secret' @@ -472,9 +576,9 @@ class TestJobCredentials(TestJobExecution): self.task.run(self.pk) - assert self.task.run_pexpect.call_count == 1 - call_args, _ = self.task.run_pexpect.call_args_list[0] - job, args, cwd, env, passwords, stdout = call_args + assert self.run_pexpect.call_count == 1 + call_args, _ = self.run_pexpect.call_args_list[0] + args, cwd, env, stdout = call_args assert env['AZURE_SUBSCRIPTION_ID'] == 'some-subscription' assert env['AZURE_AD_USER'] == 'bob' @@ -491,9 +595,9 @@ class TestJobCredentials(TestJobExecution): self.instance.extra_credentials.add(credential) self.task.run(self.pk) - assert self.task.run_pexpect.call_count == 1 - call_args, _ = self.task.run_pexpect.call_args_list[0] - job, args, cwd, env, passwords, stdout = call_args + assert self.run_pexpect.call_count == 1 + call_args, _ = self.run_pexpect.call_args_list[0] + args, cwd, env, stdout = call_args assert env['VMWARE_USER'] == 'bob' assert env['VMWARE_PASSWORD'] == 'secret' @@ -515,7 +619,7 @@ class TestJobCredentials(TestJobExecution): self.instance.extra_credentials.add(credential) def run_pexpect_side_effect(*args, **kwargs): - job, args, cwd, env, passwords, stdout = args + args, cwd, env, stdout = args shade_config = 
open(env['OS_CLIENT_CONFIG_FILE'], 'rb').read() assert shade_config == '\n'.join([ 'clouds:', @@ -529,7 +633,7 @@ class TestJobCredentials(TestJobExecution): ]) return ['successful', 0] - self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect) + self.run_pexpect.side_effect = run_pexpect_side_effect self.task.run(self.pk) def test_net_credentials(self): @@ -550,7 +654,7 @@ class TestJobCredentials(TestJobExecution): self.instance.extra_credentials.add(credential) def run_pexpect_side_effect(*args, **kwargs): - job, args, cwd, env, passwords, stdout = args + args, cwd, env, stdout = args assert env['ANSIBLE_NET_USERNAME'] == 'bob' assert env['ANSIBLE_NET_PASSWORD'] == 'secret' assert env['ANSIBLE_NET_AUTHORIZE'] == '1' @@ -558,7 +662,7 @@ class TestJobCredentials(TestJobExecution): assert open(env['ANSIBLE_NET_SSH_KEYFILE'], 'rb').read() == self.EXAMPLE_PRIVATE_KEY return ['successful', 0] - self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect) + self.run_pexpect.side_effect = run_pexpect_side_effect self.task.run(self.pk) def test_custom_environment_injectors_with_jinja_syntax_error(self): @@ -614,9 +718,9 @@ class TestJobCredentials(TestJobExecution): self.instance.extra_credentials.add(credential) self.task.run(self.pk) - assert self.task.run_pexpect.call_count == 1 - call_args, _ = self.task.run_pexpect.call_args_list[0] - job, args, cwd, env, passwords, stdout = call_args + assert self.run_pexpect.call_count == 1 + call_args, _ = self.run_pexpect.call_args_list[0] + args, cwd, env, stdout = call_args assert env['MY_CLOUD_API_TOKEN'] == 'ABC123' @@ -646,9 +750,9 @@ class TestJobCredentials(TestJobExecution): self.instance.extra_credentials.add(credential) self.task.run(self.pk) - assert self.task.run_pexpect.call_count == 1 - call_args, _ = self.task.run_pexpect.call_args_list[0] - job, args, cwd, env, passwords, stdout = call_args + assert self.run_pexpect.call_count == 1 + call_args, _ = self.run_pexpect.call_args_list[0] + args, cwd, env, stdout = call_args assert env['JOB_ID'] == str(self.instance.pk) @@ -680,9 +784,9 @@ class TestJobCredentials(TestJobExecution): self.instance.extra_credentials.add(credential) self.task.run(self.pk) - assert self.task.run_pexpect.call_count == 1 - call_args, _ = self.task.run_pexpect.call_args_list[0] - job, args, cwd, env, passwords, stdout = call_args + assert self.run_pexpect.call_count == 1 + call_args, _ = self.run_pexpect.call_args_list[0] + args, cwd, env, stdout = call_args assert env['MY_CLOUD_PRIVATE_VAR'] == 'SUPER-SECRET-123' assert 'SUPER-SECRET-123' not in json.dumps(self.task.update_model.call_args_list) @@ -713,9 +817,9 @@ class TestJobCredentials(TestJobExecution): self.instance.extra_credentials.add(credential) self.task.run(self.pk) - assert self.task.run_pexpect.call_count == 1 - call_args, _ = self.task.run_pexpect.call_args_list[0] - job, args, cwd, env, passwords, stdout = call_args + assert self.run_pexpect.call_count == 1 + call_args, _ = self.run_pexpect.call_args_list[0] + args, cwd, env, stdout = call_args assert '-e {"api_token": "ABC123"}' in ' '.join(args) @@ -750,9 +854,9 @@ class TestJobCredentials(TestJobExecution): self.instance.extra_credentials.add(credential) self.task.run(self.pk) - assert self.task.run_pexpect.call_count == 1 - call_args, _ = self.task.run_pexpect.call_args_list[0] - job, args, cwd, env, passwords, stdout = call_args + assert self.run_pexpect.call_count == 1 + call_args, _ = self.run_pexpect.call_args_list[0] + args, cwd, env, stdout = call_args assert '-e 
{"password": "SUPER-SECRET-123"}' in ' '.join(args) assert 'SUPER-SECRET-123' not in json.dumps(self.task.update_model.call_args_list) @@ -787,11 +891,11 @@ class TestJobCredentials(TestJobExecution): self.task.run(self.pk) def run_pexpect_side_effect(*args, **kwargs): - job, args, cwd, env, passwords, stdout = args + args, cwd, env, stdout = args assert open(env['MY_CLOUD_INI_FILE'], 'rb').read() == '[mycloud]\nABC123' return ['successful', 0] - self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect) + self.run_pexpect.side_effect = run_pexpect_side_effect self.task.run(self.pk) def test_multi_cloud(self): @@ -821,7 +925,7 @@ class TestJobCredentials(TestJobExecution): self.instance.extra_credentials.add(azure_credential) def run_pexpect_side_effect(*args, **kwargs): - job, args, cwd, env, passwords, stdout = args + args, cwd, env, stdout = args assert env['GCE_EMAIL'] == 'bob' assert env['GCE_PROJECT'] == 'some-project' @@ -834,7 +938,7 @@ class TestJobCredentials(TestJobExecution): return ['successful', 0] - self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect) + self.run_pexpect.side_effect = run_pexpect_side_effect self.task.run(self.pk) @@ -874,12 +978,12 @@ class TestProjectUpdateCredentials(TestJobExecution): ) self.task.run(self.pk) - assert self.task.run_pexpect.call_count == 1 - call_args, _ = self.task.run_pexpect.call_args_list[0] - job, args, cwd, env, passwords, stdout = call_args + assert self.run_pexpect.call_count == 1 + call_args, call_kwargs = self.run_pexpect.call_args_list[0] + args, cwd, env, stdout = call_args - assert passwords.get('scm_username') == 'bob' - assert passwords.get('scm_password') == 'secret' + assert 'bob' in call_kwargs.get('expect_passwords').values() + assert 'secret' in call_kwargs.get('expect_passwords').values() def test_ssh_key_auth(self, scm_type): ssh = CredentialType.defaults['ssh']() @@ -897,7 +1001,7 @@ class TestProjectUpdateCredentials(TestJobExecution): ) def run_pexpect_side_effect(private_data, *args, **kwargs): - job, args, cwd, env, passwords, stdout = args + args, cwd, env, stdout = args ssh_key_data_fifo = '/'.join([private_data, 'credential_1']) assert open(ssh_key_data_fifo, 'r').read() == self.EXAMPLE_PRIVATE_KEY assert ' '.join(args).startswith( @@ -907,14 +1011,12 @@ class TestProjectUpdateCredentials(TestJobExecution): ssh_key_data_fifo ) ) - assert passwords.get('scm_username') == 'bob' + assert 'bob' in kwargs.get('expect_passwords').values() return ['successful', 0] private_data = tempfile.mkdtemp(prefix='ansible_tower_') self.task.build_private_data_dir = mock.Mock(return_value=private_data) - self.task.run_pexpect = mock.Mock( - side_effect=partial(run_pexpect_side_effect, private_data) - ) + self.run_pexpect.side_effect = partial(run_pexpect_side_effect, private_data) self.task.run(self.pk) @@ -944,7 +1046,7 @@ class TestInventoryUpdateCredentials(TestJobExecution): ) def run_pexpect_side_effect(*args, **kwargs): - job, args, cwd, env, passwords, stdout = args + args, cwd, env, stdout = args assert env['AWS_ACCESS_KEY_ID'] == 'bob' assert env['AWS_SECRET_ACCESS_KEY'] == 'secret' @@ -955,7 +1057,7 @@ class TestInventoryUpdateCredentials(TestJobExecution): assert 'ec2' in config.sections() return ['successful', 0] - self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect) + self.run_pexpect.side_effect = run_pexpect_side_effect self.task.run(self.pk) def test_vmware_source(self): @@ -971,7 +1073,7 @@ class TestInventoryUpdateCredentials(TestJobExecution): ) def 
run_pexpect_side_effect(*args, **kwargs): - job, args, cwd, env, passwords, stdout = args + args, cwd, env, stdout = args config = ConfigParser.ConfigParser() config.read(env['VMWARE_INI_PATH']) @@ -980,7 +1082,7 @@ class TestInventoryUpdateCredentials(TestJobExecution): assert config.get('vmware', 'server') == 'https://example.org' return ['successful', 0] - self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect) + self.run_pexpect.side_effect = run_pexpect_side_effect self.task.run(self.pk) def test_azure_source(self): @@ -999,13 +1101,13 @@ class TestInventoryUpdateCredentials(TestJobExecution): ) def run_pexpect_side_effect(*args, **kwargs): - job, args, cwd, env, passwords, stdout = args + args, cwd, env, stdout = args assert env['AZURE_SUBSCRIPTION_ID'] == 'bob' ssh_key_data = env['AZURE_CERT_PATH'] assert open(ssh_key_data, 'rb').read() == self.EXAMPLE_PRIVATE_KEY return ['successful', 0] - self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect) + self.run_pexpect.side_effect = run_pexpect_side_effect self.task.run(self.pk) def test_gce_source(self): @@ -1025,14 +1127,14 @@ class TestInventoryUpdateCredentials(TestJobExecution): ) def run_pexpect_side_effect(*args, **kwargs): - job, args, cwd, env, passwords, stdout = args + args, cwd, env, stdout = args assert env['GCE_EMAIL'] == 'bob' assert env['GCE_PROJECT'] == 'some-project' ssh_key_data = env['GCE_PEM_FILE_PATH'] assert open(ssh_key_data, 'rb').read() == self.EXAMPLE_PRIVATE_KEY return ['successful', 0] - self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect) + self.run_pexpect.side_effect = run_pexpect_side_effect self.task.run(self.pk) def test_openstack_source(self): @@ -1053,7 +1155,7 @@ class TestInventoryUpdateCredentials(TestJobExecution): ) def run_pexpect_side_effect(*args, **kwargs): - job, args, cwd, env, passwords, stdout = args + args, cwd, env, stdout = args shade_config = open(env['OS_CLIENT_CONFIG_FILE'], 'rb').read() assert '\n'.join([ 'clouds:', @@ -1067,7 +1169,7 @@ class TestInventoryUpdateCredentials(TestJobExecution): ]) in shade_config return ['successful', 0] - self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect) + self.run_pexpect.side_effect = run_pexpect_side_effect self.task.run(self.pk) def test_satellite6_source(self): @@ -1087,7 +1189,7 @@ class TestInventoryUpdateCredentials(TestJobExecution): ) def run_pexpect_side_effect(*args, **kwargs): - job, args, cwd, env, passwords, stdout = args + args, cwd, env, stdout = args config = ConfigParser.ConfigParser() config.read(env['FOREMAN_INI_PATH']) assert config.get('foreman', 'url') == 'https://example.org' @@ -1095,7 +1197,7 @@ class TestInventoryUpdateCredentials(TestJobExecution): assert config.get('foreman', 'password') == 'secret' return ['successful', 0] - self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect) + self.run_pexpect.side_effect = run_pexpect_side_effect self.task.run(self.pk) def test_cloudforms_source(self): @@ -1115,7 +1217,7 @@ class TestInventoryUpdateCredentials(TestJobExecution): ) def run_pexpect_side_effect(*args, **kwargs): - job, args, cwd, env, passwords, stdout = args + args, cwd, env, stdout = args config = ConfigParser.ConfigParser() config.read(env['CLOUDFORMS_INI_PATH']) assert config.get('cloudforms', 'url') == 'https://example.org' @@ -1124,7 +1226,7 @@ class TestInventoryUpdateCredentials(TestJobExecution): assert config.get('cloudforms', 'ssl_verify') == 'false' return ['successful', 0] - self.task.run_pexpect = 
mock.Mock(side_effect=run_pexpect_side_effect) + self.run_pexpect.side_effect = run_pexpect_side_effect self.task.run(self.pk) diff --git a/awx/main/tests/unit/utils/test_event_filter.py b/awx/main/tests/unit/utils/test_event_filter.py new file mode 100644 index 0000000000..d0b2b890aa --- /dev/null +++ b/awx/main/tests/unit/utils/test_event_filter.py @@ -0,0 +1,112 @@ +import cStringIO +import pytest +import base64 +import json + +from awx.main.utils import OutputEventFilter + +MAX_WIDTH = 78 +EXAMPLE_UUID = '890773f5-fe6d-4091-8faf-bdc8021d65dd' + + +def write_encoded_event_data(fileobj, data): + b64data = base64.b64encode(json.dumps(data)) + # pattern corresponding to OutputEventFilter expectation + fileobj.write(u'\x1b[K') + for offset in xrange(0, len(b64data), MAX_WIDTH): + chunk = b64data[offset:offset + MAX_WIDTH] + escaped_chunk = u'{}\x1b[{}D'.format(chunk, len(chunk)) + fileobj.write(escaped_chunk) + fileobj.write(u'\x1b[K') + + +@pytest.fixture +def fake_callback(): + return [] + + +@pytest.fixture +def fake_cache(): + return {} + + +@pytest.fixture +def wrapped_handle(job_event_callback): + # Preliminary creation of resources usually done in tasks.py + stdout_handle = cStringIO.StringIO() + return OutputEventFilter(stdout_handle, job_event_callback) + + +@pytest.fixture +def job_event_callback(fake_callback, fake_cache): + def method(event_data): + if 'uuid' in event_data: + cache_event = fake_cache.get(':1:ev-{}'.format(event_data['uuid']), None) + if cache_event is not None: + event_data.update(cache_event) + fake_callback.append(event_data) + return method + + +def test_event_recomb(fake_callback, fake_cache, wrapped_handle): + # Pretend that this is done by the Ansible callback module + fake_cache[':1:ev-{}'.format(EXAMPLE_UUID)] = {'event': 'foo'} + write_encoded_event_data(wrapped_handle, { + 'uuid': EXAMPLE_UUID + }) + wrapped_handle.write('\r\nTASK [Gathering Facts] *********************************************************\n') + wrapped_handle.write('\u001b[0;33mchanged: [localhost]\u001b[0m\n') + write_encoded_event_data(wrapped_handle, {}) + # stop pretending + + assert len(fake_callback) == 1 + recomb_data = fake_callback[0] + assert 'event' in recomb_data + assert recomb_data['event'] == 'foo' + + +def test_separate_verbose_events(fake_callback, wrapped_handle): + # Pretend that this is done by the Ansible callback module + wrapped_handle.write('Using /etc/ansible/ansible.cfg as config file\n') + wrapped_handle.write('SSH password: \n') + write_encoded_event_data(wrapped_handle, { # associated with _next_ event + 'uuid': EXAMPLE_UUID + }) + # stop pretending + + assert len(fake_callback) == 2 + for event_data in fake_callback: + assert 'event' in event_data + assert event_data['event'] == 'verbose' + + +def test_verbose_event_no_markings(fake_callback, wrapped_handle): + ''' + This occurs with jobs that do not have events but still generate + and output stream, like system jobs + ''' + wrapped_handle.write('Running tower-manage command \n') + assert wrapped_handle._fileobj.getvalue() == 'Running tower-manage command \n' + + +def test_large_data_payload(fake_callback, fake_cache, wrapped_handle): + # Pretend that this is done by the Ansible callback module + fake_cache[':1:ev-{}'.format(EXAMPLE_UUID)] = {'event': 'foo'} + event_data_to_encode = { + 'uuid': EXAMPLE_UUID, + 'host': 'localhost', + 'role': 'some_path_to_role' + } + assert len(json.dumps(event_data_to_encode)) > MAX_WIDTH + write_encoded_event_data(wrapped_handle, event_data_to_encode) + 
wrapped_handle.write('\r\nTASK [Gathering Facts] *********************************************************\n') + wrapped_handle.write('\u001b[0;33mchanged: [localhost]\u001b[0m\n') + write_encoded_event_data(wrapped_handle, {}) + # stop pretending + + assert len(fake_callback) == 1 + recomb_data = fake_callback[0] + assert 'role' in recomb_data + assert recomb_data['role'] == 'some_path_to_role' + assert 'event' in recomb_data + assert recomb_data['event'] == 'foo' diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py index 83e27558cb..b78f5b4ba5 100644 --- a/awx/main/utils/common.py +++ b/awx/main/utils/common.py @@ -609,17 +609,19 @@ def build_proot_temp_dir(): def wrap_args_with_proot(args, cwd, **kwargs): ''' Wrap existing command line with proot to restrict access to: + - /tmp (except for own tmp files) + For non-isolated nodes: - /etc/tower (to prevent obtaining db info or secret key) - /var/lib/awx (except for current project) - /var/log/tower - /var/log/supervisor - - /tmp (except for own tmp files) ''' from django.conf import settings new_args = [getattr(settings, 'AWX_PROOT_CMD', 'bwrap'), '--unshare-pid', '--dev-bind', '/', '/'] - hide_paths = ['/etc/tower', '/var/lib/awx', '/var/log', - tempfile.gettempdir(), settings.PROJECTS_ROOT, - settings.JOBOUTPUT_ROOT] + hide_paths = [tempfile.gettempdir()] + if not kwargs.get('isolated'): + hide_paths.extend(['/etc/tower', '/var/lib/awx', '/var/log', + settings.PROJECTS_ROOT, settings.JOBOUTPUT_ROOT]) hide_paths.extend(getattr(settings, 'AWX_PROOT_HIDE_PATHS', None) or []) for path in sorted(set(hide_paths)): if not os.path.exists(path): @@ -633,13 +635,15 @@ def wrap_args_with_proot(args, cwd, **kwargs): os.close(handle) os.chmod(new_path, stat.S_IRUSR | stat.S_IWUSR) new_args.extend(['--bind', '%s' %(new_path,), '%s' % (path,)]) - if 'private_data_dir' in kwargs: + if kwargs.get('isolated'): + show_paths = [kwargs['private_data_dir']] + elif 'private_data_dir' in kwargs: show_paths = [cwd, kwargs['private_data_dir']] else: show_paths = [cwd] if settings.ANSIBLE_USE_VENV: show_paths.append(settings.ANSIBLE_VENV_PATH) - if settings.TOWER_USE_VENV: + if settings.TOWER_USE_VENV and not kwargs.get('isolated'): show_paths.append(settings.TOWER_VENV_PATH) show_paths.extend(getattr(settings, 'AWX_PROOT_SHOW_PATHS', None) or []) for path in sorted(set(show_paths)): @@ -647,7 +651,15 @@ def wrap_args_with_proot(args, cwd, **kwargs): continue path = os.path.realpath(path) new_args.extend(['--bind', '%s' % (path,), '%s' % (path,)]) - new_args.extend(['--chdir', cwd]) + if kwargs.get('isolated'): + if 'ansible-playbook' in args: + # playbook runs should cwd to the SCM checkout dir + new_args.extend(['--chdir', os.path.join(kwargs['private_data_dir'], 'project')]) + else: + # ad-hoc runs should cwd to the root of the private data dir + new_args.extend(['--chdir', kwargs['private_data_dir']]) + else: + new_args.extend(['--chdir', cwd]) new_args.extend(args) return new_args diff --git a/awx/playbooks/check_isolated.yml b/awx/playbooks/check_isolated.yml new file mode 100644 index 0000000000..f928da3572 --- /dev/null +++ b/awx/playbooks/check_isolated.yml @@ -0,0 +1,28 @@ +--- + +# The following variables will be set by the runner of this playbook: +# src: /tmp/some/path/private_data_dir/ +# job_id: + +- hosts: all + gather_facts: false + + tasks: + + - name: copy artifacts from the isolated host + synchronize: + src: "{{src}}/artifacts/" + dest: "{{src}}/artifacts/" + mode: pull + recursive: yes + + - name: check to see if the job is 
inactive + command: "systemctl is-active playbook@{{job_id}}.service" + register: is_active + failed_when: "is_active.rc == 0" + + - shell: journalctl -u playbook@{{job_id}}.service --no-pager + register: systemctl_logs + ignore_errors: True + + - local_action: copy content={{ systemctl_logs.stdout }} dest={{src}}/artifacts/systemctl_logs mode=0600 diff --git a/awx/playbooks/clean_isolated.yml b/awx/playbooks/clean_isolated.yml new file mode 100644 index 0000000000..d0df3775f3 --- /dev/null +++ b/awx/playbooks/clean_isolated.yml @@ -0,0 +1,18 @@ +--- + +# The following variables will be set by the runner of this playbook: +# private_dirs: ['/tmp/path/private_data_dir/', '/tmp//path/proot_temp_dir/'] +# job_id: + +- hosts: all + gather_facts: false + + tasks: + + - name: cancel the job + command: "systemctl stop playbook@{{job_id}}.service" + ignore_errors: yes + + - name: remove build artifacts + file: path="{{item}}" state=absent + with_items: "{{private_dirs}}" diff --git a/awx/playbooks/library/mkfifo.py b/awx/playbooks/library/mkfifo.py new file mode 100755 index 0000000000..3d5997d6c8 --- /dev/null +++ b/awx/playbooks/library/mkfifo.py @@ -0,0 +1,24 @@ +import os +import stat + +from ansible.module_utils.basic import AnsibleModule + + +def main(): + module = AnsibleModule( + argument_spec={ + 'path': {'required': True, 'type': 'str'}, + 'content': {'required': True, 'type': 'str'} + }, + supports_check_mode=False + ) + + path = module.params['path'] + os.chmod(path, stat.S_IRUSR | stat.S_IWUSR) + with open(path, 'w') as fifo: + fifo.write(module.params['content']) + module.exit_json(dest=path, changed=True) + + +if __name__ == '__main__': + main() diff --git a/awx/playbooks/run_isolated.yml b/awx/playbooks/run_isolated.yml new file mode 100644 index 0000000000..5928edbc31 --- /dev/null +++ b/awx/playbooks/run_isolated.yml @@ -0,0 +1,45 @@ +--- + +# The following variables will be set by the runner of this playbook: +# src: /tmp/some/path/private_data_dir +# dest: /tmp/some/path/ +# proot_temp_dir: /tmp/some/path +# job_id: + +- hosts: all + gather_facts: false + vars_prompt: + - prompt: "Secret" + name: "secret" + + tasks: + + - name: create a proot/bwrap temp dir (if necessary) + synchronize: + src: "{{proot_temp_dir}}" + dest: "/tmp" + when: proot_temp_dir is defined + + - name: synchronize job environment with isolated host + synchronize: + copy_links: true + src: "{{src}}" + dest: "{{dest}}" + + - name: create a directory for running jobs + file: path=/tmp/ansible_tower/jobs state=directory mode=0700 + + - name: create symlink keyed by job ID + file: src="{{src}}" dest="/tmp/ansible_tower/jobs/{{job_id}}" state=link + + - name: create a named pipe for secret environment data + command: "mkfifo /tmp/ansible_tower/jobs/{{job_id}}/env" + + - name: spawn the playbook + command: "systemctl start playbook@{{job_id}}.service" + + - name: write the secret environment data + mkfifo: + content: "{{secret}}" + path: "/tmp/ansible_tower/jobs/{{job_id}}/env" + no_log: True diff --git a/awx/plugins/inventory/awxrest.py b/awx/plugins/inventory/awxrest.py index 1f3630b6a6..9121e2e4e2 100755 --- a/awx/plugins/inventory/awxrest.py +++ b/awx/plugins/inventory/awxrest.py @@ -65,7 +65,7 @@ class InventoryScript(object): def __init__(self, **options): self.options = options - def get_data(self): + def get_data(self, output): parts = urlparse.urlsplit(self.base_url) if parts.username and parts.password: auth = (parts.username, parts.password) @@ -89,10 +89,10 @@ class InventoryScript(object): url = 
urlparse.urljoin(url, url_path) response = requests.get(url, auth=auth) response.raise_for_status() - sys.stdout.write(json.dumps(json.loads(response.content), + output.write(json.dumps(json.loads(response.content), indent=self.indent) + '\n') - def run(self): + def run(self, output=sys.stdout): try: self.base_url = self.options.get('base_url', '') or \ os.getenv('REST_API_URL', '') @@ -123,11 +123,11 @@ class InventoryScript(object): if self.list_ and self.hostname: raise RuntimeError('Only --list or --host may be specified') elif self.list_ or self.hostname: - self.get_data() + self.get_data(output) else: raise RuntimeError('Either --list or --host must be specified') except Exception, e: - sys.stdout.write('%s\n' % json.dumps(dict(failed=True))) + output.write('%s\n' % json.dumps(dict(failed=True))) if self.options.get('traceback', False): sys.stderr.write(traceback.format_exc()) else: @@ -137,7 +137,7 @@ class InventoryScript(object): sys.stderr.write('%s\n' % e.response.content) else: sys.stderr.write('%s\n' % e.response) - sys.exit(1) + raise def main(): parser = optparse.OptionParser() @@ -173,7 +173,11 @@ def main(): parser.add_option('--indent', dest='indent', type='int', default=None, help='Indentation level for pretty printing output') options, args = parser.parse_args() - InventoryScript(**vars(options)).run() + try: + InventoryScript(**vars(options)).run() + except Exception: + sys.exit(1) + if __name__ == '__main__': main() diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 2df269ca33..1297bbcd9f 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -604,6 +604,12 @@ AWX_ANSIBLE_CALLBACK_PLUGINS = "" # Time at which an HA node is considered active AWX_ACTIVE_NODE_TIME = 7200 +# The number of seconds to sleep between status checks for jobs running on isolated nodes +AWX_ISOLATED_CHECK_INTERVAL = 5 + +# The timeout (in seconds) for launching jobs on isolated nodes +AWX_ISOLATED_LAUNCH_TIMEOUT = 600 + # Enable Pendo on the UI, possible values are 'off', 'anonymous', and 'detailed' # Note: This setting may be overridden by database settings. PENDO_TRACKING_STATE = "off" diff --git a/awx/settings/development.py b/awx/settings/development.py index 0a0bc748f2..dbfcfa2c9e 100644 --- a/awx/settings/development.py +++ b/awx/settings/development.py @@ -46,6 +46,8 @@ CALLBACK_QUEUE = "callback_tasks" # Note: This setting may be overridden by database settings. AWX_PROOT_ENABLED = True +AWX_ISOLATED_USERNAME = 'root' + # Disable Pendo on the UI for development/test. # Note: This setting may be overridden by database settings. 
PENDO_TRACKING_STATE = "off" diff --git a/awx/settings/production.py b/awx/settings/production.py index 19afcab9c9..ad668784d2 100644 --- a/awx/settings/production.py +++ b/awx/settings/production.py @@ -50,6 +50,8 @@ ANSIBLE_VENV_PATH = "/var/lib/awx/venv/ansible" TOWER_USE_VENV = True TOWER_VENV_PATH = "/var/lib/awx/venv/tower" +AWX_ISOLATED_USERNAME = 'awx' + LOGGING['handlers']['tower_warnings']['filename'] = '/var/log/tower/tower.log' LOGGING['handlers']['callback_receiver']['filename'] = '/var/log/tower/callback_receiver.log' LOGGING['handlers']['task_system']['filename'] = '/var/log/tower/task_system.log' diff --git a/requirements/requirements_isolated.txt b/requirements/requirements_isolated.txt new file mode 100644 index 0000000000..13209d2fd8 --- /dev/null +++ b/requirements/requirements_isolated.txt @@ -0,0 +1,2 @@ +psutil==5.2.2 +pexpect==4.2.1 \ No newline at end of file diff --git a/tools/docker-compose.yml b/tools/docker-compose.yml index d0b5416abc..c7c209355c 100644 --- a/tools/docker-compose.yml +++ b/tools/docker-compose.yml @@ -10,6 +10,7 @@ services: RABBITMQ_PASS: guest RABBITMQ_VHOST: / CELERY_RDB_HOST: 0.0.0.0 + DOCKER_TOOLS_DIR: tools/docker-compose ports: - "8080:8080" - "5555:5555" diff --git a/Procfile b/tools/docker-compose/Procfile similarity index 100% rename from Procfile rename to tools/docker-compose/Procfile diff --git a/tools/docker-compose/start_development.sh b/tools/docker-compose/start_development.sh index 966f11c9ba..3246164806 100755 --- a/tools/docker-compose/start_development.sh +++ b/tools/docker-compose/start_development.sh @@ -31,7 +31,7 @@ yes | cp -rf /tower_devel/tools/docker-compose/supervisor.conf /supervisor.conf # Tower bootstrapping make version_file make migrate -make init +make init DOCKER_TOOLS_DIR=${DOCKER_TOOLS_DIR} mkdir -p /tower_devel/awx/public/static mkdir -p /tower_devel/awx/ui/static @@ -41,5 +41,5 @@ mkdir -p /tower_devel/awx/ui/static if [ -f "/tower_devel/tools/docker-compose/use_dev_supervisor.txt" ]; then make supervisor else - make honcho + honcho start -f "${DOCKER_TOOLS_DIR}/Procfile" fi diff --git a/tools/docker-isolated-override.yml b/tools/docker-isolated-override.yml new file mode 100644 index 0000000000..dab80afcde --- /dev/null +++ b/tools/docker-isolated-override.yml @@ -0,0 +1,17 @@ +version: '3' +services: + # Primary Tower Development Container link + tower: + environment: + DOCKER_TOOLS_DIR: tools/docker-isolated + links: + - isolated + # Isolated Rampart Container + isolated: + image: gcr.io/ansible-tower-engineering/tower_isolated:${TAG} + hostname: isolated + volumes: + - "../awx/main/isolated:/tower_isolated" + - "../awx/lib:/tower_lib" + - "/sys/fs/cgroup:/sys/fs/cgroup:ro" + privileged: true diff --git a/tools/docker-isolated/Dockerfile b/tools/docker-isolated/Dockerfile new file mode 100644 index 0000000000..f130bb4e3e --- /dev/null +++ b/tools/docker-isolated/Dockerfile @@ -0,0 +1,31 @@ +FROM centos/systemd +RUN yum clean all + +ADD Makefile /tmp/Makefile +RUN mkdir /tmp/requirements +ADD requirements/requirements_ansible.txt requirements/requirements_ansible_git.txt requirements/requirements_ansible_uninstall.txt requirements/requirements_isolated.txt /tmp/requirements/ +RUN yum -y update && yum -y install curl epel-release +RUN curl --silent --location https://rpm.nodesource.com/setup_6.x | bash - +RUN yum -y update && yum -y install openssh-server ansible mg vim tmux git python-devel python-psycopg2 make python-psutil libxml2-devel libxslt-devel libstdc++.so.6 gcc cyrus-sasl-devel cyrus-sasl 
openldap-devel libffi-devel zeromq-devel python-pip xmlsec1-devel swig krb5-devel xmlsec1-openssl xmlsec1 xmlsec1-openssl-devel libtool-ltdl-devel bubblewrap zanata-python-client gettext gcc-c++ +RUN pip install virtualenv +WORKDIR /tmp +RUN make requirements_ansible +RUN make requirements_isolated +RUN localedef -c -i en_US -f UTF-8 en_US.UTF-8 +ENV LANG en_US.UTF-8 +ENV LANGUAGE en_US:en +ENV LC_ALL en_US.UTF-8 +WORKDIR / +EXPOSE 22 +ADD tools/docker-isolated/playbook@.service /lib/systemd/system/playbook@.service + +RUN rm -f /etc/ssh/ssh_host_ecdsa_key /etc/ssh/ssh_host_rsa_key +RUN ssh-keygen -q -N "" -t dsa -f /etc/ssh/ssh_host_ecdsa_key +RUN ssh-keygen -q -N "" -t rsa -f /etc/ssh/ssh_host_rsa_key +RUN sed -i "s/#UsePrivilegeSeparation.*/UsePrivilegeSeparation no/g" /etc/ssh/sshd_config +RUN sed -i "s/UsePAM.*/UsePAM yes/g" /etc/ssh/sshd_config +RUN ssh-keygen -A +RUN mkdir -p /root/.ssh +RUN touch /root/.ssh/authorized_keys + +CMD ["/usr/sbin/init"] diff --git a/tools/docker-isolated/Procfile b/tools/docker-isolated/Procfile new file mode 100644 index 0000000000..35d7db873c --- /dev/null +++ b/tools/docker-isolated/Procfile @@ -0,0 +1,8 @@ +nginx: make nginx +runworker: make runworker +daphne: make daphne +celeryd: make celeryd EXTRA_GROUP_QUEUES=thepentagon +receiver: make receiver +factcacher: make factcacher +flower: make flower +uwsgi: make uwsgi diff --git a/tools/docker-isolated/README.md b/tools/docker-isolated/README.md new file mode 100644 index 0000000000..af5f47a437 --- /dev/null +++ b/tools/docker-isolated/README.md @@ -0,0 +1,71 @@ +## Instructions on using an isolated node + +The building of the isolated node is done in the `make docker-compose-build` +target. Its image uses a different tag from the tools_tower container. + +Given that the images are built, you can run the combined docker compose target. This uses +the base `docker-compose.yml` with modifications found in `docker-isolated-override.yml`. +You will still need to give COMPOSE_TAG with whatever your intended +base branch is. For example: + +```bash +make docker-isolated COMPOSE_TAG=devel +``` + +This will automatically exchange the keys in order for the `tools_tower_1` +container to access the `tools_isolated_1` container over ssh. +After that, it will bring up all the containers like the normal docker-compose +workflow. + +### Running a job on the Isolated Node + +Create a job template that runs normally. Add the id of the instance +group named `thepentagon` to the JT's instance groups. To do this, POST +the id (probably id=2) to `/api/v2/job_templates/N/instance_groups/`. +After that, run the job template. + +The models are automatically created when running the Makefile target, +and they are structured as follows: + + +-------+ +-------------+ + | tower |<----+ thepentagon | + +-------+ +-------------+ + ^ ^ + | | + | | + +---+---+ +-----+----+ + | tower | | isolated | + +-------+ +----------+ + +The `controller` for the group "thepentagon" and all hosts therein is +determined by a ForeignKey within the instance group. + +## Development Testing Notes + +### Test the SSH connection between containers + +While the environment is running, you can test the connection like so: + +```bash +docker exec -i -t tools_tower_1 /bin/bash +``` + +Inside the context of that container: + +```bash +ssh root@isolated +``` + +(note: awx user has been deprecated) + +This should give a shell to the `tools_isolated_1` container, as the +`tools_tower_1` container sees it. 
+ +### Start the playbook service + +The following command would run the playbook for job 57. + +```bash +systemctl start playbook@57.service +``` + diff --git a/tools/docker-isolated/playbook@.service b/tools/docker-isolated/playbook@.service new file mode 100644 index 0000000000..fa84635c8c --- /dev/null +++ b/tools/docker-isolated/playbook@.service @@ -0,0 +1,7 @@ +[Unit] +Description=Run of Ansible Tower job %I + +[Service] +ExecStart=/venv/tower_isolated/bin/python /tower_isolated/run.py %I +Restart=no +Environment=TOWER_LIB_DIRECTORY=/tower_lib \ No newline at end of file