From 602ef9750f93fdb02e7e88633007a5a735e115a5 Mon Sep 17 00:00:00 2001 From: Ryan Petrello Date: Fri, 1 Mar 2019 13:08:26 -0500 Subject: [PATCH] update isolated task execution for ansible-runner --- Makefile | 8 - awx/lib/awx_display_callback/__init__.py | 25 - awx/lib/awx_display_callback/cleanup.py | 85 --- awx/lib/awx_display_callback/display.py | 98 ---- awx/lib/awx_display_callback/events.py | 186 ------ awx/lib/awx_display_callback/minimal.py | 29 - awx/lib/awx_display_callback/module.py | 535 ------------------ awx/main/expect/isolated_manager.py | 222 ++------ .../commands/test_isolated_connection.py | 2 +- awx/main/tasks.py | 259 ++++----- awx/playbooks/check_isolated.yml | 3 +- awx/playbooks/clean_isolated.yml | 2 +- awx/playbooks/run_isolated.yml | 20 +- awx/plugins/isolated/awx_capacity.py | 2 +- awx/plugins/isolated/awx_isolated_cleanup.py | 2 +- docs/custom_virtualenvs.md | 3 - setup.py | 36 +- tools/docker-isolated/Dockerfile | 10 +- tools/docker-isolated/README.md | 4 +- tools/docker-isolated/awx-expect | 3 - 20 files changed, 190 insertions(+), 1344 deletions(-) delete mode 100644 awx/lib/awx_display_callback/__init__.py delete mode 100644 awx/lib/awx_display_callback/cleanup.py delete mode 100644 awx/lib/awx_display_callback/display.py delete mode 100644 awx/lib/awx_display_callback/events.py delete mode 100644 awx/lib/awx_display_callback/minimal.py delete mode 100644 awx/lib/awx_display_callback/module.py delete mode 100755 tools/docker-isolated/awx-expect diff --git a/Makefile b/Makefile index 4c7dd3770b..7578b5aa21 100644 --- a/Makefile +++ b/Makefile @@ -168,13 +168,6 @@ requirements_ansible_dev: $(VENV_BASE)/ansible/bin/pip install pytest mock; \ fi -requirements_isolated: - if [ ! -d "$(VENV_BASE)/awx" ]; then \ - $(PYTHON) -m venv $(VENV_BASE)/awx; \ - fi; - echo "include-system-site-packages = true" >> $(VENV_BASE)/awx/lib/python$(PYTHON_VERSION)/pyvenv.cfg - $(VENV_BASE)/awx/bin/pip install -r requirements/requirements_isolated.txt - # Install third-party requirements needed for AWX's environment. requirements_awx: virtualenv_awx if [[ "$(PIP_OPTIONS)" == *"--no-index"* ]]; then \ @@ -570,7 +563,6 @@ docker-isolated: TAG=$(COMPOSE_TAG) DEV_DOCKER_TAG_BASE=$(DEV_DOCKER_TAG_BASE) docker-compose -f tools/docker-compose.yml -f tools/docker-isolated-override.yml create docker start tools_awx_1 docker start tools_isolated_1 - echo "__version__ = '`git describe --long | cut -d - -f 1-1`'" | docker exec -i tools_isolated_1 /bin/bash -c "cat > /venv/awx/lib/python$(PYTHON_VERSION)/site-packages/awx.py" CURRENT_UID=$(shell id -u) TAG=$(COMPOSE_TAG) DEV_DOCKER_TAG_BASE=$(DEV_DOCKER_TAG_BASE) docker-compose -f tools/docker-compose.yml -f tools/docker-isolated-override.yml up # Docker Compose Development environment diff --git a/awx/lib/awx_display_callback/__init__.py b/awx/lib/awx_display_callback/__init__.py deleted file mode 100644 index b7cbf97b9b..0000000000 --- a/awx/lib/awx_display_callback/__init__.py +++ /dev/null @@ -1,25 +0,0 @@ -# Copyright (c) 2016 Ansible by Red Hat, Inc. -# -# This file is part of Ansible Tower, but depends on code imported from Ansible. -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. 
-# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . - -from __future__ import (absolute_import, division, print_function) - -# AWX Display Callback -from . import cleanup # noqa (registers control persistent cleanup) -from . import display # noqa (wraps ansible.display.Display methods) -from .module import AWXDefaultCallbackModule, AWXMinimalCallbackModule - -__all__ = ['AWXDefaultCallbackModule', 'AWXMinimalCallbackModule'] diff --git a/awx/lib/awx_display_callback/cleanup.py b/awx/lib/awx_display_callback/cleanup.py deleted file mode 100644 index 497401feea..0000000000 --- a/awx/lib/awx_display_callback/cleanup.py +++ /dev/null @@ -1,85 +0,0 @@ -# Copyright (c) 2016 Ansible by Red Hat, Inc. -# -# This file is part of Ansible Tower, but depends on code imported from Ansible. -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . - -from __future__ import (absolute_import, division, print_function) - -# Python -import atexit -import glob -import os -import pwd - -# PSUtil -try: - import psutil -except ImportError: - raise ImportError('psutil is missing; {}bin/pip install psutil'.format( - os.environ['VIRTUAL_ENV'] - )) - -__all__ = [] - -main_pid = os.getpid() - - -@atexit.register -def terminate_ssh_control_masters(): - # Only run this cleanup from the main process. - if os.getpid() != main_pid: - return - # Determine if control persist is being used and if any open sockets - # exist after running the playbook. - cp_path = os.environ.get('ANSIBLE_SSH_CONTROL_PATH', '') - if not cp_path: - return - cp_dir = os.path.dirname(cp_path) - if not os.path.exists(cp_dir): - return - cp_pattern = os.path.join(cp_dir, 'ansible-ssh-*') - cp_files = glob.glob(cp_pattern) - if not cp_files: - return - - # Attempt to find any running control master processes. - username = pwd.getpwuid(os.getuid())[0] - ssh_cm_procs = [] - for proc in psutil.process_iter(): - try: - pname = proc.name() - pcmdline = proc.cmdline() - pusername = proc.username() - except psutil.NoSuchProcess: - continue - if pusername != username: - continue - if pname != 'ssh': - continue - for cp_file in cp_files: - if pcmdline and cp_file in pcmdline[0]: - ssh_cm_procs.append(proc) - break - - # Terminate then kill control master processes. Workaround older - # version of psutil that may not have wait_procs implemented. 
- for proc in ssh_cm_procs: - try: - proc.terminate() - except psutil.NoSuchProcess: - continue - procs_gone, procs_alive = psutil.wait_procs(ssh_cm_procs, timeout=5) - for proc in procs_alive: - proc.kill() diff --git a/awx/lib/awx_display_callback/display.py b/awx/lib/awx_display_callback/display.py deleted file mode 100644 index ad5e8ba37a..0000000000 --- a/awx/lib/awx_display_callback/display.py +++ /dev/null @@ -1,98 +0,0 @@ -# Copyright (c) 2016 Ansible by Red Hat, Inc. -# -# This file is part of Ansible Tower, but depends on code imported from Ansible. -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . - -from __future__ import (absolute_import, division, print_function) - -# Python -import functools -import sys -import uuid - -# Ansible -from ansible.utils.display import Display - -# Tower Display Callback -from .events import event_context - -__all__ = [] - - -def with_context(**context): - global event_context - - def wrap(f): - @functools.wraps(f) - def wrapper(*args, **kwargs): - with event_context.set_local(**context): - return f(*args, **kwargs) - return wrapper - return wrap - - -for attr in dir(Display): - if attr.startswith('_') or 'cow' in attr or 'prompt' in attr: - continue - if attr in ('display', 'v', 'vv', 'vvv', 'vvvv', 'vvvvv', 'vvvvvv', 'verbose'): - continue - if not callable(getattr(Display, attr)): - continue - setattr(Display, attr, with_context(**{attr: True})(getattr(Display, attr))) - - -def with_verbosity(f): - global event_context - - @functools.wraps(f) - def wrapper(*args, **kwargs): - host = args[2] if len(args) >= 3 else kwargs.get('host', None) - caplevel = args[3] if len(args) >= 4 else kwargs.get('caplevel', 2) - context = dict(verbose=True, verbosity=(caplevel + 1)) - if host is not None: - context['remote_addr'] = host - with event_context.set_local(**context): - return f(*args, **kwargs) - return wrapper - - -Display.verbose = with_verbosity(Display.verbose) - - -def display_with_context(f): - - @functools.wraps(f) - def wrapper(*args, **kwargs): - log_only = args[5] if len(args) >= 6 else kwargs.get('log_only', False) - stderr = args[3] if len(args) >= 4 else kwargs.get('stderr', False) - event_uuid = event_context.get().get('uuid', None) - with event_context.display_lock: - # If writing only to a log file or there is already an event UUID - # set (from a callback module method), skip dumping the event data. 
- if log_only or event_uuid: - return f(*args, **kwargs) - try: - fileobj = sys.stderr if stderr else sys.stdout - event_context.add_local(uuid=str(uuid.uuid4())) - event_context.dump_begin(fileobj) - return f(*args, **kwargs) - finally: - event_context.dump_end(fileobj) - event_context.remove_local(uuid=None) - - return wrapper - - -Display.display = display_with_context(Display.display) diff --git a/awx/lib/awx_display_callback/events.py b/awx/lib/awx_display_callback/events.py deleted file mode 100644 index 178da75a97..0000000000 --- a/awx/lib/awx_display_callback/events.py +++ /dev/null @@ -1,186 +0,0 @@ -# Copyright (c) 2016 Ansible by Red Hat, Inc. -# -# This file is part of Ansible Tower, but depends on code imported from Ansible. -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . - -from __future__ import (absolute_import, division, print_function) - -# Python -import base64 -import contextlib -import datetime -import json -import multiprocessing -import os -import stat -import threading -import uuid - -try: - import memcache -except ImportError: - raise ImportError('python-memcached is missing; {}bin/pip install python-memcached'.format( - os.environ['VIRTUAL_ENV'] - )) - -__all__ = ['event_context'] - - -class IsolatedFileWrite: - ''' - Stand-in class that will write partial event data to a file as a - replacement for memcache when a job is running on an isolated host. - ''' - - def __init__(self): - self.private_data_dir = os.getenv('AWX_ISOLATED_DATA_DIR') - - def set(self, key, value): - # Strip off the leading memcache key identifying characters :1:ev- - event_uuid = key[len(':1:ev-'):] - # Write data in a staging area and then atomic move to pickup directory - filename = '{}-partial.json'.format(event_uuid) - dropoff_location = os.path.join(self.private_data_dir, 'artifacts', 'job_events', filename) - write_location = '.'.join([dropoff_location, 'tmp']) - with os.fdopen(os.open(write_location, os.O_WRONLY | os.O_CREAT, stat.S_IRUSR | stat.S_IWUSR), 'w') as f: - f.write(value) - os.rename(write_location, dropoff_location) - - -class EventContext(object): - ''' - Store global and local (per thread/process) data associated with callback - events and other display output methods. 
- ''' - - def __init__(self): - self.display_lock = multiprocessing.RLock() - cache_actual = os.getenv('CACHE', '127.0.0.1:11211') - if os.getenv('AWX_ISOLATED_DATA_DIR', False): - self.cache = IsolatedFileWrite() - else: - self.cache = memcache.Client([cache_actual], debug=0) - - def add_local(self, **kwargs): - if not hasattr(self, '_local'): - self._local = threading.local() - self._local._ctx = {} - self._local._ctx.update(kwargs) - - def remove_local(self, **kwargs): - if hasattr(self, '_local'): - for key in kwargs.keys(): - self._local._ctx.pop(key, None) - - @contextlib.contextmanager - def set_local(self, **kwargs): - try: - self.add_local(**kwargs) - yield - finally: - self.remove_local(**kwargs) - - def get_local(self): - return getattr(getattr(self, '_local', None), '_ctx', {}) - - def add_global(self, **kwargs): - if not hasattr(self, '_global_ctx'): - self._global_ctx = {} - self._global_ctx.update(kwargs) - - def remove_global(self, **kwargs): - if hasattr(self, '_global_ctx'): - for key in kwargs.keys(): - self._global_ctx.pop(key, None) - - @contextlib.contextmanager - def set_global(self, **kwargs): - try: - self.add_global(**kwargs) - yield - finally: - self.remove_global(**kwargs) - - def get_global(self): - return getattr(self, '_global_ctx', {}) - - def get(self): - ctx = {} - ctx.update(self.get_global()) - ctx.update(self.get_local()) - return ctx - - def get_begin_dict(self): - event_data = self.get() - if os.getenv('JOB_ID', ''): - event_data['job_id'] = int(os.getenv('JOB_ID', '0')) - if os.getenv('AD_HOC_COMMAND_ID', ''): - event_data['ad_hoc_command_id'] = int(os.getenv('AD_HOC_COMMAND_ID', '0')) - if os.getenv('PROJECT_UPDATE_ID', ''): - event_data['project_update_id'] = int(os.getenv('PROJECT_UPDATE_ID', '0')) - event_data.setdefault('pid', os.getpid()) - event_data.setdefault('uuid', str(uuid.uuid4())) - event_data.setdefault('created', datetime.datetime.utcnow().isoformat()) - if not event_data.get('parent_uuid', None) and event_data.get('job_id', None): - for key in ('task_uuid', 'play_uuid', 'playbook_uuid'): - parent_uuid = event_data.get(key, None) - if parent_uuid and parent_uuid != event_data.get('uuid', None): - event_data['parent_uuid'] = parent_uuid - break - - event = event_data.pop('event', None) - if not event: - event = 'verbose' - for key in ('debug', 'verbose', 'deprecated', 'warning', 'system_warning', 'error'): - if event_data.get(key, False): - event = key - break - max_res = int(os.getenv("MAX_EVENT_RES", 700000)) - if event not in ('playbook_on_stats',) and "res" in event_data and len(str(event_data['res'])) > max_res: - event_data['res'] = {} - event_dict = dict(event=event, event_data=event_data) - for key in list(event_data.keys()): - if key in ('job_id', 'ad_hoc_command_id', 'project_update_id', 'uuid', 'parent_uuid', 'created',): - event_dict[key] = event_data.pop(key) - elif key in ('verbosity', 'pid'): - event_dict[key] = event_data[key] - return event_dict - - def get_end_dict(self): - return {} - - def dump(self, fileobj, data, max_width=78, flush=False): - b64data = base64.b64encode(json.dumps(data).encode('utf-8')).decode() - with self.display_lock: - # pattern corresponding to OutputEventFilter expectation - fileobj.write(u'\x1b[K') - for offset in range(0, len(b64data), max_width): - chunk = b64data[offset:offset + max_width] - escaped_chunk = u'{}\x1b[{}D'.format(chunk, len(chunk)) - fileobj.write(escaped_chunk) - fileobj.write(u'\x1b[K') - if flush: - fileobj.flush() - - def dump_begin(self, fileobj): - begin_dict = 
self.get_begin_dict() - self.cache.set(":1:ev-{}".format(begin_dict['uuid']), json.dumps(begin_dict)) - self.dump(fileobj, {'uuid': begin_dict['uuid']}) - - def dump_end(self, fileobj): - self.dump(fileobj, self.get_end_dict(), flush=True) - - -event_context = EventContext() diff --git a/awx/lib/awx_display_callback/minimal.py b/awx/lib/awx_display_callback/minimal.py deleted file mode 100644 index 579feeea24..0000000000 --- a/awx/lib/awx_display_callback/minimal.py +++ /dev/null @@ -1,29 +0,0 @@ -# Copyright (c) 2016 Ansible by Red Hat, Inc. -# -# This file is part of Ansible Tower, but depends on code imported from Ansible. -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . - -from __future__ import (absolute_import, division, print_function) - -# Python -import os - -# Ansible -import ansible - -# Because of the way Ansible loads plugins, it's not possible to import -# ansible.plugins.callback.minimal when being loaded as the minimal plugin. Ugh. -with open(os.path.join(os.path.dirname(ansible.__file__), 'plugins', 'callback', 'minimal.py')) as in_file: - exec(in_file.read()) diff --git a/awx/lib/awx_display_callback/module.py b/awx/lib/awx_display_callback/module.py deleted file mode 100644 index b113502c6c..0000000000 --- a/awx/lib/awx_display_callback/module.py +++ /dev/null @@ -1,535 +0,0 @@ -# Copyright (c) 2016 Ansible by Red Hat, Inc. -# -# This file is part of Ansible Tower, but depends on code imported from Ansible. -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . - -from __future__ import (absolute_import, division, print_function) - -# Python -import codecs -import collections -import contextlib -import json -import os -import stat -import sys -import uuid -from copy import copy - -# Ansible -from ansible import constants as C -from ansible.plugins.callback import CallbackBase -from ansible.plugins.callback.default import CallbackModule as DefaultCallbackModule - -# AWX Display Callback -from .events import event_context -from .minimal import CallbackModule as MinimalCallbackModule - -CENSORED = "the output has been hidden due to the fact that 'no_log: true' was specified for this result" # noqa - - -class BaseCallbackModule(CallbackBase): - ''' - Callback module for logging ansible/ansible-playbook events. - ''' - - CALLBACK_VERSION = 2.0 - CALLBACK_TYPE = 'stdout' - - # These events should never have an associated play. 
- EVENTS_WITHOUT_PLAY = [ - 'playbook_on_start', - 'playbook_on_stats', - ] - - # These events should never have an associated task. - EVENTS_WITHOUT_TASK = EVENTS_WITHOUT_PLAY + [ - 'playbook_on_setup', - 'playbook_on_notify', - 'playbook_on_import_for_host', - 'playbook_on_not_import_for_host', - 'playbook_on_no_hosts_matched', - 'playbook_on_no_hosts_remaining', - ] - - def __init__(self): - super(BaseCallbackModule, self).__init__() - self.task_uuids = set() - self.duplicate_task_counts = collections.defaultdict(lambda: 1) - - self.play_uuids = set() - self.duplicate_play_counts = collections.defaultdict(lambda: 1) - - @contextlib.contextmanager - def capture_event_data(self, event, **event_data): - event_data.setdefault('uuid', str(uuid.uuid4())) - - if event not in self.EVENTS_WITHOUT_TASK: - task = event_data.pop('task', None) - else: - task = None - - if event_data.get('res'): - if event_data['res'].get('_ansible_no_log', False): - event_data['res'] = {'censored': CENSORED} - if event_data['res'].get('results', []): - event_data['res']['results'] = copy(event_data['res']['results']) - for i, item in enumerate(event_data['res'].get('results', [])): - if isinstance(item, dict) and item.get('_ansible_no_log', False): - event_data['res']['results'][i] = {'censored': CENSORED} - - with event_context.display_lock: - try: - event_context.add_local(event=event, **event_data) - if task: - self.set_task(task, local=True) - event_context.dump_begin(sys.stdout) - yield - finally: - event_context.dump_end(sys.stdout) - if task: - self.clear_task(local=True) - event_context.remove_local(event=None, **event_data) - - def set_playbook(self, playbook): - # NOTE: Ansible doesn't generate a UUID for playbook_on_start so do it for them. - self.playbook_uuid = str(uuid.uuid4()) - file_name = getattr(playbook, '_file_name', '???') - event_context.add_global(playbook=file_name, playbook_uuid=self.playbook_uuid) - self.clear_play() - - def set_play(self, play): - if hasattr(play, 'hosts'): - if isinstance(play.hosts, list): - pattern = ','.join(play.hosts) - else: - pattern = play.hosts - else: - pattern = '' - name = play.get_name().strip() or pattern - event_context.add_global(play=name, play_uuid=str(play._uuid), play_pattern=pattern) - self.clear_task() - - def clear_play(self): - event_context.remove_global(play=None, play_uuid=None, play_pattern=None) - self.clear_task() - - def set_task(self, task, local=False): - # FIXME: Task is "global" unless using free strategy! 
- task_ctx = dict( - task=(task.name or task.action), - task_uuid=str(task._uuid), - task_action=task.action, - task_args='', - ) - try: - task_ctx['task_path'] = task.get_path() - except AttributeError: - pass - - if C.DISPLAY_ARGS_TO_STDOUT: - if task.no_log: - task_ctx['task_args'] = "the output has been hidden due to the fact that 'no_log: true' was specified for this result" - else: - task_args = ', '.join(('%s=%s' % a for a in task.args.items())) - task_ctx['task_args'] = task_args - if getattr(task, '_role', None): - task_role = task._role._role_name - else: - task_role = getattr(task, 'role_name', '') - if task_role: - task_ctx['role'] = task_role - if local: - event_context.add_local(**task_ctx) - else: - event_context.add_global(**task_ctx) - - def clear_task(self, local=False): - task_ctx = dict(task=None, task_path=None, task_uuid=None, task_action=None, task_args=None, role=None) - if local: - event_context.remove_local(**task_ctx) - else: - event_context.remove_global(**task_ctx) - - def v2_playbook_on_start(self, playbook): - self.set_playbook(playbook) - event_data = dict( - uuid=self.playbook_uuid, - ) - with self.capture_event_data('playbook_on_start', **event_data): - super(BaseCallbackModule, self).v2_playbook_on_start(playbook) - - def v2_playbook_on_vars_prompt(self, varname, private=True, prompt=None, - encrypt=None, confirm=False, salt_size=None, - salt=None, default=None): - event_data = dict( - varname=varname, - private=private, - prompt=prompt, - encrypt=encrypt, - confirm=confirm, - salt_size=salt_size, - salt=salt, - default=default, - ) - with self.capture_event_data('playbook_on_vars_prompt', **event_data): - super(BaseCallbackModule, self).v2_playbook_on_vars_prompt( - varname, private, prompt, encrypt, confirm, salt_size, salt, - default, - ) - - def v2_playbook_on_include(self, included_file): - event_data = dict( - included_file=included_file._filename if included_file is not None else None, - ) - with self.capture_event_data('playbook_on_include', **event_data): - super(BaseCallbackModule, self).v2_playbook_on_include(included_file) - - def v2_playbook_on_play_start(self, play): - play_uuid = str(play._uuid) - if play_uuid in self.play_uuids: - # When this play UUID repeats, it means the play is using the - # free strategy (or serial:1) so different hosts may be running - # different tasks within a play (where duplicate UUIDS are common). - # - # When this is the case, modify the UUID slightly to append - # a counter so we can still _track_ duplicate events, but also - # avoid breaking the display in these scenarios. - self.duplicate_play_counts[play_uuid] += 1 - - play_uuid = '_'.join([ - play_uuid, - str(self.duplicate_play_counts[play_uuid]) - ]) - self.play_uuids.add(play_uuid) - play._uuid = play_uuid - - self.set_play(play) - if hasattr(play, 'hosts'): - if isinstance(play.hosts, list): - pattern = ','.join(play.hosts) - else: - pattern = play.hosts - else: - pattern = '' - name = play.get_name().strip() or pattern - event_data = dict( - name=name, - pattern=pattern, - uuid=str(play._uuid), - ) - with self.capture_event_data('playbook_on_play_start', **event_data): - super(BaseCallbackModule, self).v2_playbook_on_play_start(play) - - def v2_playbook_on_import_for_host(self, result, imported_file): - # NOTE: Not used by Ansible 2.x. 
- with self.capture_event_data('playbook_on_import_for_host'): - super(BaseCallbackModule, self).v2_playbook_on_import_for_host(result, imported_file) - - def v2_playbook_on_not_import_for_host(self, result, missing_file): - # NOTE: Not used by Ansible 2.x. - with self.capture_event_data('playbook_on_not_import_for_host'): - super(BaseCallbackModule, self).v2_playbook_on_not_import_for_host(result, missing_file) - - def v2_playbook_on_setup(self): - # NOTE: Not used by Ansible 2.x. - with self.capture_event_data('playbook_on_setup'): - super(BaseCallbackModule, self).v2_playbook_on_setup() - - def v2_playbook_on_task_start(self, task, is_conditional): - # FIXME: Flag task path output as vv. - task_uuid = str(task._uuid) - if task_uuid in self.task_uuids: - # When this task UUID repeats, it means the play is using the - # free strategy (or serial:1) so different hosts may be running - # different tasks within a play (where duplicate UUIDS are common). - # - # When this is the case, modify the UUID slightly to append - # a counter so we can still _track_ duplicate events, but also - # avoid breaking the display in these scenarios. - self.duplicate_task_counts[task_uuid] += 1 - - task_uuid = '_'.join([ - task_uuid, - str(self.duplicate_task_counts[task_uuid]) - ]) - self.task_uuids.add(task_uuid) - self.set_task(task) - event_data = dict( - task=task, - name=task.get_name(), - is_conditional=is_conditional, - uuid=task_uuid, - ) - with self.capture_event_data('playbook_on_task_start', **event_data): - super(BaseCallbackModule, self).v2_playbook_on_task_start(task, is_conditional) - - def v2_playbook_on_cleanup_task_start(self, task): - # NOTE: Not used by Ansible 2.x. - self.set_task(task) - event_data = dict( - task=task, - name=task.get_name(), - uuid=str(task._uuid), - is_conditional=True, - ) - with self.capture_event_data('playbook_on_task_start', **event_data): - super(BaseCallbackModule, self).v2_playbook_on_cleanup_task_start(task) - - def v2_playbook_on_handler_task_start(self, task): - # NOTE: Re-using playbook_on_task_start event for this v2-specific - # event, but setting is_conditional=True, which is how v1 identified a - # task run as a handler. - self.set_task(task) - event_data = dict( - task=task, - name=task.get_name(), - uuid=str(task._uuid), - is_conditional=True, - ) - with self.capture_event_data('playbook_on_task_start', **event_data): - super(BaseCallbackModule, self).v2_playbook_on_handler_task_start(task) - - def v2_playbook_on_no_hosts_matched(self): - with self.capture_event_data('playbook_on_no_hosts_matched'): - super(BaseCallbackModule, self).v2_playbook_on_no_hosts_matched() - - def v2_playbook_on_no_hosts_remaining(self): - with self.capture_event_data('playbook_on_no_hosts_remaining'): - super(BaseCallbackModule, self).v2_playbook_on_no_hosts_remaining() - - def v2_playbook_on_notify(self, handler, host): - # NOTE: Not used by Ansible < 2.5. - event_data = dict( - host=host.get_name(), - handler=handler.get_name(), - ) - with self.capture_event_data('playbook_on_notify', **event_data): - super(BaseCallbackModule, self).v2_playbook_on_notify(handler, host) - - ''' - ansible_stats is, retoractively, added in 2.2 - ''' - def v2_playbook_on_stats(self, stats): - self.clear_play() - # FIXME: Add count of plays/tasks. 
- event_data = dict( - changed=stats.changed, - dark=stats.dark, - failures=stats.failures, - ignored=getattr(stats, 'ignored', 0), - ok=stats.ok, - processed=stats.processed, - rescued=getattr(stats, 'rescued', 0), - skipped=stats.skipped - ) - - # write custom set_stat artifact data to the local disk so that it can - # be persisted by awx after the process exits - custom_artifact_data = stats.custom.get('_run', {}) if hasattr(stats, 'custom') else {} - if custom_artifact_data: - # create the directory for custom stats artifacts to live in (if it doesn't exist) - custom_artifacts_dir = os.path.join(os.getenv('AWX_PRIVATE_DATA_DIR'), 'artifacts') - if not os.path.isdir(custom_artifacts_dir): - os.makedirs(custom_artifacts_dir, mode=stat.S_IXUSR + stat.S_IWUSR + stat.S_IRUSR) - - custom_artifacts_path = os.path.join(custom_artifacts_dir, 'custom') - with codecs.open(custom_artifacts_path, 'w', encoding='utf-8') as f: - os.chmod(custom_artifacts_path, stat.S_IRUSR | stat.S_IWUSR) - json.dump(custom_artifact_data, f) - - with self.capture_event_data('playbook_on_stats', **event_data): - super(BaseCallbackModule, self).v2_playbook_on_stats(stats) - - @staticmethod - def _get_event_loop(task): - if hasattr(task, 'loop_with'): # Ansible >=2.5 - return task.loop_with - elif hasattr(task, 'loop'): # Ansible <2.4 - return task.loop - return None - - def v2_runner_on_ok(self, result): - # FIXME: Display detailed results or not based on verbosity. - - # strip environment vars from the job event; it already exists on the - # job and sensitive values are filtered there - if result._task.action in ('setup', 'gather_facts'): - result._result.get('ansible_facts', {}).pop('ansible_env', None) - - event_data = dict( - host=result._host.get_name(), - remote_addr=result._host.address, - task=result._task, - res=result._result, - event_loop=self._get_event_loop(result._task), - ) - with self.capture_event_data('runner_on_ok', **event_data): - super(BaseCallbackModule, self).v2_runner_on_ok(result) - - def v2_runner_on_failed(self, result, ignore_errors=False): - # FIXME: Add verbosity for exception/results output. - event_data = dict( - host=result._host.get_name(), - remote_addr=result._host.address, - res=result._result, - task=result._task, - ignore_errors=ignore_errors, - event_loop=self._get_event_loop(result._task), - ) - with self.capture_event_data('runner_on_failed', **event_data): - super(BaseCallbackModule, self).v2_runner_on_failed(result, ignore_errors) - - def v2_runner_on_skipped(self, result): - event_data = dict( - host=result._host.get_name(), - remote_addr=result._host.address, - task=result._task, - event_loop=self._get_event_loop(result._task), - ) - with self.capture_event_data('runner_on_skipped', **event_data): - super(BaseCallbackModule, self).v2_runner_on_skipped(result) - - def v2_runner_on_unreachable(self, result): - event_data = dict( - host=result._host.get_name(), - remote_addr=result._host.address, - task=result._task, - res=result._result, - ) - with self.capture_event_data('runner_on_unreachable', **event_data): - super(BaseCallbackModule, self).v2_runner_on_unreachable(result) - - def v2_runner_on_no_hosts(self, task): - # NOTE: Not used by Ansible 2.x. - event_data = dict( - task=task, - ) - with self.capture_event_data('runner_on_no_hosts', **event_data): - super(BaseCallbackModule, self).v2_runner_on_no_hosts(task) - - def v2_runner_on_async_poll(self, result): - # NOTE: Not used by Ansible 2.x. 
- event_data = dict( - host=result._host.get_name(), - task=result._task, - res=result._result, - jid=result._result.get('ansible_job_id'), - ) - with self.capture_event_data('runner_on_async_poll', **event_data): - super(BaseCallbackModule, self).v2_runner_on_async_poll(result) - - def v2_runner_on_async_ok(self, result): - # NOTE: Not used by Ansible 2.x. - event_data = dict( - host=result._host.get_name(), - task=result._task, - res=result._result, - jid=result._result.get('ansible_job_id'), - ) - with self.capture_event_data('runner_on_async_ok', **event_data): - super(BaseCallbackModule, self).v2_runner_on_async_ok(result) - - def v2_runner_on_async_failed(self, result): - # NOTE: Not used by Ansible 2.x. - event_data = dict( - host=result._host.get_name(), - task=result._task, - res=result._result, - jid=result._result.get('ansible_job_id'), - ) - with self.capture_event_data('runner_on_async_failed', **event_data): - super(BaseCallbackModule, self).v2_runner_on_async_failed(result) - - def v2_runner_on_file_diff(self, result, diff): - # NOTE: Not used by Ansible 2.x. - event_data = dict( - host=result._host.get_name(), - task=result._task, - diff=diff, - ) - with self.capture_event_data('runner_on_file_diff', **event_data): - super(BaseCallbackModule, self).v2_runner_on_file_diff(result, diff) - - def v2_on_file_diff(self, result): - # NOTE: Logged as runner_on_file_diff. - event_data = dict( - host=result._host.get_name(), - task=result._task, - diff=result._result.get('diff'), - ) - with self.capture_event_data('runner_on_file_diff', **event_data): - super(BaseCallbackModule, self).v2_on_file_diff(result) - - def v2_runner_item_on_ok(self, result): - event_data = dict( - host=result._host.get_name(), - task=result._task, - res=result._result, - ) - with self.capture_event_data('runner_item_on_ok', **event_data): - super(BaseCallbackModule, self).v2_runner_item_on_ok(result) - - def v2_runner_item_on_failed(self, result): - event_data = dict( - host=result._host.get_name(), - task=result._task, - res=result._result, - ) - with self.capture_event_data('runner_item_on_failed', **event_data): - super(BaseCallbackModule, self).v2_runner_item_on_failed(result) - - def v2_runner_item_on_skipped(self, result): - event_data = dict( - host=result._host.get_name(), - task=result._task, - res=result._result, - ) - with self.capture_event_data('runner_item_on_skipped', **event_data): - super(BaseCallbackModule, self).v2_runner_item_on_skipped(result) - - def v2_runner_retry(self, result): - event_data = dict( - host=result._host.get_name(), - task=result._task, - res=result._result, - ) - with self.capture_event_data('runner_retry', **event_data): - super(BaseCallbackModule, self).v2_runner_retry(result) - - def v2_runner_on_start(self, host, task): - event_data = dict( - host=host.get_name(), - task=task - ) - with self.capture_event_data('runner_on_start', **event_data): - super(BaseCallbackModule, self).v2_runner_on_start(host, task) - - - -class AWXDefaultCallbackModule(BaseCallbackModule, DefaultCallbackModule): - - CALLBACK_NAME = 'awx_display' - - -class AWXMinimalCallbackModule(BaseCallbackModule, MinimalCallbackModule): - - CALLBACK_NAME = 'minimal' - - def v2_playbook_on_play_start(self, play): - pass - - def v2_playbook_on_task_start(self, task, is_conditional): - self.set_task(task) diff --git a/awx/main/expect/isolated_manager.py b/awx/main/expect/isolated_manager.py index 42a5c8a29c..e75cd2cf79 100644 --- a/awx/main/expect/isolated_manager.py +++ 
b/awx/main/expect/isolated_manager.py @@ -6,8 +6,8 @@ import shutil import stat import tempfile import time +import uuid import logging -from distutils.version import LooseVersion as Version from io import StringIO from django.conf import settings @@ -24,23 +24,12 @@ playbook_logger = logging.getLogger('awx.isolated.manager.playbooks') class IsolatedManager(object): - def __init__(self, args, cwd, env, stdout_handle, ssh_key_path, - expect_passwords={}, cancelled_callback=None, job_timeout=0, + def __init__(self, env, cancelled_callback=None, job_timeout=0, idle_timeout=None, extra_update_fields=None, pexpect_timeout=5, proot_cmd='bwrap'): """ - :param args: a list of `subprocess.call`-style arguments - representing a subprocess e.g., - ['ansible-playbook', '...'] - :param cwd: the directory where the subprocess should run, - generally the directory where playbooks exist :param env: a dict containing environment variables for the subprocess, ala `os.environ` - :param stdout_handle: a file-like object for capturing stdout - :param ssh_key_path: a filepath where SSH key data can be read - :param expect_passwords: a dict of regular expression password prompts - to input values, i.e., {r'Password:*?$': - 'some_password'} :param cancelled_callback: a callable - which returns `True` or `False` - signifying if the job has been prematurely cancelled @@ -56,13 +45,7 @@ class IsolatedManager(object): `pexpect.spawn().expect()` calls :param proot_cmd the command used to isolate processes, `bwrap` """ - self.args = args - self.cwd = cwd - self.isolated_env = self._redact_isolated_env(env.copy()) self.management_env = self._base_management_env() - self.stdout_handle = stdout_handle - self.ssh_key_path = ssh_key_path - self.expect_passwords = {k.pattern: v for k, v in expect_passwords.items()} self.cancelled_callback = cancelled_callback self.job_timeout = job_timeout self.idle_timeout = idle_timeout @@ -106,18 +89,6 @@ class IsolatedManager(object): args.append('-%s' % ('v' * min(5, settings.AWX_ISOLATED_VERBOSITY))) return args - @staticmethod - def _redact_isolated_env(env): - ''' - strips some environment variables that aren't applicable to - job execution within the isolated instance - ''' - for var in ( - 'HOME', 'RABBITMQ_HOST', 'RABBITMQ_PASS', 'RABBITMQ_USER', 'CACHE', - 'DJANGO_PROJECT_DIR', 'DJANGO_SETTINGS_MODULE', 'RABBITMQ_VHOST'): - env.pop(var, None) - return env - @classmethod def awx_playbook_path(cls): return os.path.abspath(os.path.join( @@ -128,55 +99,26 @@ class IsolatedManager(object): def path_to(self, *args): return os.path.join(self.private_data_dir, *args) - def dispatch(self): + def dispatch(self, playbook): ''' - Compile the playbook, its environment, and metadata into a series - of files, and ship to a remote host for isolated execution. + Ship the runner payload to a remote host for isolated execution. 
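        The dispatch itself is delegated to `run_isolated.yml` (invoked
        below via `ansible-playbook`), which rsyncs `private_data_dir`
        to the isolated host and starts the job there with
        `ansible-runner start`.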
''' self.started_at = time.time() - secrets = { - 'env': self.isolated_env, - 'passwords': self.expect_passwords, - 'ssh_key_data': None, - 'idle_timeout': self.idle_timeout, - 'job_timeout': self.job_timeout, - 'pexpect_timeout': self.pexpect_timeout - } - - # if an ssh private key fifo exists, read its contents and delete it - if self.ssh_key_path: - buff = StringIO() - with open(self.ssh_key_path, 'r') as fifo: - for line in fifo: - buff.write(line) - secrets['ssh_key_data'] = buff.getvalue() - os.remove(self.ssh_key_path) - - # write the entire secret payload to a named pipe - # the run_isolated.yml playbook will use a lookup to read this data - # into a variable, and will replicate the data into a named pipe on the - # isolated instance - secrets_path = os.path.join(self.private_data_dir, 'env') - run.open_fifo_write( - secrets_path, - smart_str(base64.b64encode(smart_bytes(json.dumps(secrets)))) - ) self.build_isolated_job_data() - extra_vars = { 'src': self.private_data_dir, 'dest': settings.AWX_PROOT_BASE_PATH, + 'playbook': playbook, + 'ident': self.ident } - if self.proot_temp_dir: - extra_vars['proot_temp_dir'] = self.proot_temp_dir # Run ansible-playbook to launch a job on the isolated host. This: # # - sets up a temporary directory for proot/bwrap (if necessary) # - copies encrypted job data from the controlling host to the isolated host (with rsync) # - writes the encryption secret to a named pipe on the isolated host - # - launches the isolated playbook runner via `awx-expect start ` + # - launches ansible-runner args = self._build_args('run_isolated.yml', '%s,' % self.host, extra_vars) if self.instance.verbosity: args.append('-%s' % ('v' * min(5, self.instance.verbosity))) @@ -188,10 +130,15 @@ class IsolatedManager(object): job_timeout=settings.AWX_ISOLATED_LAUNCH_TIMEOUT, pexpect_timeout=5 ) - output = buff.getvalue().encode('utf-8') + output = buff.getvalue() playbook_logger.info('Isolated job {} dispatch:\n{}'.format(self.instance.id, output)) if status != 'successful': - self.stdout_handle.write(output) + event_data = { + 'event': 'verbose', + 'stdout': output + } + event_data.setdefault(self.event_data_key, self.instance.id) + CallbackQueueDispatcher().dispatch(event_data) return status, rc @classmethod @@ -215,11 +162,8 @@ class IsolatedManager(object): def build_isolated_job_data(self): ''' - Write the playbook and metadata into a collection of files on the local - file system. - - This function is intended to be used to compile job data so that it - can be shipped to a remote, isolated host (via ssh). + Write metadata related to the playbook run into a collection of files + on the local file system. 
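        A rough sketch of the expected layout, assuming a runner-style
        private data dir (paths as referenced elsewhere in this patch):

            private_data_dir/
            |-- .rsync-filter          # exclusions written by this method
            |-- env/ssh_key            # named pipe; excluded from rsync
            |-- project/               # copied in before dispatch
            `-- artifacts/<ident>/     # status, rc, and job_events/ show
                                       # up here as the job runs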
''' rsync_exclude = [ @@ -229,42 +173,18 @@ class IsolatedManager(object): '- /project/.hg', # don't rsync job events that are in the process of being written '- /artifacts/job_events/*-partial.json.tmp', - # rsync can't copy named pipe data - we're replicating this manually ourselves in the playbook - '- /env' + # don't rsync the ssh_key FIFO + '- /env/ssh_key', ] for filename, data in ( ['.rsync-filter', '\n'.join(rsync_exclude)], - ['args', json.dumps(self.args)] ): path = self.path_to(filename) with open(path, 'w') as f: f.write(data) os.chmod(path, stat.S_IRUSR) - # symlink the scm checkout (if there is one) so that it's rsync'ed over, too - if 'AD_HOC_COMMAND_ID' not in self.isolated_env: - os.symlink(self.cwd, self.path_to('project')) - - # create directories for build artifacts to live in - os.makedirs(self.path_to('artifacts', 'job_events'), mode=stat.S_IXUSR + stat.S_IWUSR + stat.S_IRUSR) - - def _missing_artifacts(self, path_list, output): - missing_artifacts = list(filter(lambda path: not os.path.exists(path), path_list)) - for path in missing_artifacts: - self.stdout_handle.write('ansible did not exit cleanly, missing `{}`.\n'.format(path)) - if missing_artifacts: - daemon_path = self.path_to('artifacts', 'daemon.log') - if os.path.exists(daemon_path): - # If available, show log files from the run.py call - with codecs.open(daemon_path, 'r', encoding='utf-8') as f: - self.stdout_handle.write(f.read()) - else: - # Provide the management playbook standard out if not available - self.stdout_handle.write(output) - return True - return False - def check(self, interval=None): """ Repeatedly poll the isolated node to determine if the job has run. @@ -290,8 +210,9 @@ class IsolatedManager(object): rc = None buff = StringIO() last_check = time.time() - seek = 0 job_timeout = remaining = self.job_timeout + handled_events = set() + dispatcher = CallbackQueueDispatcher() while status == 'failed': if job_timeout != 0: remaining = max(0, job_timeout - (time.time() - self.started_at)) @@ -322,31 +243,35 @@ class IsolatedManager(object): output = buff.getvalue().encode('utf-8') playbook_logger.info('Isolated job {} check:\n{}'.format(self.instance.id, output)) - path = self.path_to('artifacts', 'stdout') - if os.path.exists(path): - with codecs.open(path, 'r', encoding='utf-8') as f: - f.seek(seek) - for line in f: - self.stdout_handle.write(line) - seek += len(line) + # discover new events and ingest them + events_path = self.path_to('artifacts', self.ident, 'job_events') + for event in set(os.listdir(events_path)) - handled_events: + path = os.path.join(events_path, event) + if os.path.exists(path): + event_data = json.load( + open(os.path.join(events_path, event), 'r') + ) + event_data.setdefault(self.event_data_key, self.instance.id) + dispatcher.dispatch(event_data) + handled_events.add(event) last_check = time.time() if status == 'successful': - status_path = self.path_to('artifacts', 'status') - rc_path = self.path_to('artifacts', 'rc') - if self._missing_artifacts([status_path, rc_path], output): - status = 'failed' - rc = 1 - else: - with open(status_path, 'r') as f: - status = f.readline() - with open(rc_path, 'r') as f: - rc = int(f.readline()) - elif status == 'failed': - # if we were unable to retrieve job reults from the isolated host, - # print stdout of the `check_isolated.yml` playbook for clues - self.stdout_handle.write(smart_str(output)) + status_path = self.path_to('artifacts', self.ident, 'status') + rc_path = self.path_to('artifacts', self.ident, 'rc') + with 
open(status_path, 'r') as f: + status = f.readline() + with open(rc_path, 'r') as f: + rc = int(f.readline()) + + # emit an EOF event + event_data = { + 'event': 'EOF', + 'final_counter': len(handled_events) + } + event_data.setdefault(self.event_data_key, self.instance.id) + dispatcher.dispatch(event_data) return status, rc @@ -356,7 +281,6 @@ class IsolatedManager(object): 'private_data_dir': self.private_data_dir, 'cleanup_dirs': [ self.private_data_dir, - self.proot_temp_dir, ], } args = self._build_args('clean_isolated.yml', '%s,' % self.host, extra_vars) @@ -377,23 +301,15 @@ class IsolatedManager(object): @classmethod def update_capacity(cls, instance, task_result, awx_application_version): - instance.version = task_result['version'] + instance.version = 'ansible-runner-{}'.format(task_result['version']) - isolated_version = instance.version.split("-", 1)[0] - cluster_version = awx_application_version.split("-", 1)[0] - - if Version(cluster_version) > Version(isolated_version): - err_template = "Isolated instance {} reports version {}, cluster node is at {}, setting capacity to zero." - logger.error(err_template.format(instance.hostname, instance.version, awx_application_version)) - instance.capacity = 0 - else: - if instance.capacity == 0 and task_result['capacity_cpu']: - logger.warning('Isolated instance {} has re-joined.'.format(instance.hostname)) - instance.cpu_capacity = int(task_result['capacity_cpu']) - instance.mem_capacity = int(task_result['capacity_mem']) - instance.capacity = get_system_task_capacity(scale=instance.capacity_adjustment, - cpu_capacity=int(task_result['capacity_cpu']), - mem_capacity=int(task_result['capacity_mem'])) + if instance.capacity == 0 and task_result['capacity_cpu']: + logger.warning('Isolated instance {} has re-joined.'.format(instance.hostname)) + instance.cpu_capacity = int(task_result['capacity_cpu']) + instance.mem_capacity = int(task_result['capacity_mem']) + instance.capacity = get_system_task_capacity(scale=instance.capacity_adjustment, + cpu_capacity=int(task_result['capacity_cpu']), + mem_capacity=int(task_result['capacity_mem'])) instance.save(update_fields=['cpu_capacity', 'mem_capacity', 'capacity', 'version', 'modified']) @classmethod @@ -460,28 +376,7 @@ class IsolatedManager(object): if os.path.exists(facts_path): shutil.rmtree(facts_path) - @staticmethod - def get_stdout_handle(instance, private_data_dir, event_data_key='job_id'): - dispatcher = CallbackQueueDispatcher() - - def job_event_callback(event_data): - event_data.setdefault(event_data_key, instance.id) - if 'uuid' in event_data: - filename = '{}-partial.json'.format(event_data['uuid']) - partial_filename = os.path.join(private_data_dir, 'artifacts', 'job_events', filename) - try: - with codecs.open(partial_filename, 'r', encoding='utf-8') as f: - partial_event_data = json.load(f) - event_data.update(partial_event_data) - except IOError: - if event_data.get('event', '') != 'verbose': - logger.error('Missing callback data for event type `{}`, uuid {}, job {}.\nevent_data: {}'.format( - event_data.get('event', ''), event_data['uuid'], instance.id, event_data)) - dispatcher.dispatch(event_data) - - return OutputEventFilter(job_event_callback) - - def run(self, instance, private_data_dir, proot_temp_dir): + def run(self, instance, private_data_dir, playbook, event_data_key): """ Run a job on an isolated host. 
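        The flow below is dispatch() -> check() -> cleanup(). A minimal
        usage sketch, assuming the constructor arguments used in
        tasks.py (the playbook name here is illustrative):

            mgr = IsolatedManager(env, cancelled_callback=cancel_callback,
                                  job_timeout=job_timeout)
            status, rc = mgr.run(instance, private_data_dir,
                                 playbook='run.yml',
                                 event_data_key='job_id')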
@@ -489,18 +384,19 @@ class IsolatedManager(object): :param private_data_dir: an absolute path on the local file system where job-specific data should be written (i.e., `/tmp/ansible_awx_xyz/`) - :param proot_temp_dir: a temporary directory which bwrap maps - restricted paths to + :param playbook: the playbook to run + :param event_data_key: e.g., job_id, inventory_id, ... For a completed job run, this function returns (status, rc), representing the status and return code of the isolated `ansible-playbook` run. """ + self.ident = str(uuid.uuid4()) + self.event_data_key = event_data_key self.instance = instance self.host = instance.execution_node self.private_data_dir = private_data_dir - self.proot_temp_dir = proot_temp_dir - status, rc = self.dispatch() + status, rc = self.dispatch(playbook) if status == 'successful': status, rc = self.check() self.cleanup() diff --git a/awx/main/management/commands/test_isolated_connection.py b/awx/main/management/commands/test_isolated_connection.py index efaf881535..01047cbc44 100644 --- a/awx/main/management/commands/test_isolated_connection.py +++ b/awx/main/management/commands/test_isolated_connection.py @@ -28,7 +28,7 @@ class Command(BaseCommand): args = [ 'ansible', 'all', '-i', '{},'.format(hostname), '-u', settings.AWX_ISOLATED_USERNAME, '-T5', '-m', 'shell', - '-a', 'awx-expect -h', '-vvv' + '-a', 'ansible-runner --version', '-vvv' ] if all([ getattr(settings, 'AWX_ISOLATED_KEY_GENERATION', False) is True, diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 9e02d9c591..ee24feef44 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -16,6 +16,7 @@ import stat import tempfile import time import traceback +from distutils.dir_util import copy_tree from distutils.version import LooseVersion as Version import yaml import fcntl @@ -776,7 +777,7 @@ class BaseTask(object): if e.errno != errno.EEXIST: raise path = os.path.join(private_data_dir, 'env', 'ssh_key') - ansible_runner.utils.open_fifo_write(path, data.encode()) + ansible_runner.utils.open_fifo_write(path, data.encode()) private_data_files['credentials']['ssh'] = path # Ansible network modules do not yet support ssh-agent. # Instead, ssh private key file is explicitly passed via an @@ -936,29 +937,6 @@ class BaseTask(object): ''' return OrderedDict() - def get_stdout_handle(self, instance): - ''' - Return an virtual file object for capturing stdout and/or events. - ''' - dispatcher = CallbackQueueDispatcher() - - if isinstance(instance, (Job, AdHocCommand, ProjectUpdate)): - def event_callback(event_data): - event_data.setdefault(self.event_data_key, instance.id) - if 'uuid' in event_data: - cache_event = cache.get('ev-{}'.format(event_data['uuid']), None) - if cache_event is not None: - event_data.update(json.loads(cache_event)) - dispatcher.dispatch(event_data) - - return OutputEventFilter(event_callback) - else: - def event_callback(event_data): - event_data.setdefault(self.event_data_key, instance.id) - dispatcher.dispatch(event_data) - - return OutputVerboseFilter(event_callback) - def pre_run_hook(self, instance): ''' Hook for any steps to run before the job/task starts @@ -1056,13 +1034,6 @@ class BaseTask(object): ) self.write_args_file(private_data_dir, args) - if instance.is_isolated() is False: - stdout_handle = self.get_stdout_handle(instance) - else: - stdout_handle = isolated_manager.IsolatedManager.get_stdout_handle( - instance, private_data_dir, event_data_key=self.event_data_key) - # If there is an SSH key path defined, wrap args with ssh-agent. 
- ssh_key_path = self.get_ssh_key_path(instance, private_data_files) # If we're executing on an isolated host, don't bother adding the # key to the agent in this environment instance = self.update_model(pk, job_args=json.dumps(safe_args), @@ -1078,122 +1049,123 @@ class BaseTask(object): ) instance = self.update_model(instance.pk, output_replacements=output_replacements) - # TODO: Satisfy isolated, refactor this to a single should_use_proot() - # call when isolated migrated to runner + def event_handler(self, instance, event_data): + should_write_event = False + dispatcher = CallbackQueueDispatcher() + event_data.setdefault(self.event_data_key, instance.id) + dispatcher.dispatch(event_data) + self.event_ct += 1 + + ''' + Handle artifacts + ''' + if event_data.get('event_data', {}).get('artifact_data', {}): + instance.artifacts = event_data['event_data']['artifact_data'] + instance.save(update_fields=['artifacts']) + + return should_write_event + + def cancel_callback(instance): + instance = self.update_model(pk) + if instance.cancel_flag or instance.status == 'canceled': + cancel_wait = (now() - instance.modified).seconds if instance.modified else 0 + if cancel_wait > 5: + logger.warn('Request to cancel {} took {} seconds to complete.'.format(instance.log_format, cancel_wait)) + return True + return False + + def finished_callback(self, instance, runner_obj): + dispatcher = CallbackQueueDispatcher() + event_data = { + 'event': 'EOF', + 'final_counter': self.event_ct, + } + event_data.setdefault(self.event_data_key, instance.id) + dispatcher.dispatch(event_data) + + params = { + 'ident': instance.id, + 'private_data_dir': private_data_dir, + 'project_dir': cwd, + 'playbook': self.build_playbook_path_relative_to_cwd(instance, private_data_dir), + 'inventory': self.build_inventory(instance, private_data_dir), + 'passwords': expect_passwords, + 'envvars': env, + 'event_handler': functools.partial(event_handler, self, instance), + 'cancel_callback': functools.partial(cancel_callback, instance), + 'finished_callback': functools.partial(finished_callback, self, instance), + 'settings': { + 'idle_timeout': self.get_idle_timeout() or "", + 'job_timeout': self.get_instance_timeout(instance), + 'pexpect_timeout': getattr(settings, 'PEXPECT_TIMEOUT', 5), + } + } + if self.should_use_proot(instance): - proot_temp_dir = build_proot_temp_dir() + process_isolation_params = { + 'process_isolation': True, + 'process_isolation_path': settings.AWX_PROOT_BASE_PATH, + 'process_isolation_show_paths': self.proot_show_paths + [private_data_dir, cwd], + 'process_isolation_hide_paths': [ + settings.AWX_PROOT_BASE_PATH, + '/etc/tower', + '/var/lib/awx', + '/var/log', + settings.PROJECTS_ROOT, + settings.JOBOUTPUT_ROOT, + ] + getattr(settings, 'AWX_PROOT_HIDE_PATHS', None) or [], + 'process_isolation_ro_paths': [], + } + if settings.AWX_PROOT_SHOW_PATHS: + process_isolation_params['process_isolation_show_paths'].extend(settings.AWX_PROOT_SHOW_PATHS) + if settings.ANSIBLE_VENV_PATH: + process_isolation_params['process_isolation_ro_paths'].append(settings.ANSIBLE_VENV_PATH) + if settings.AWX_VENV_PATH: + process_isolation_params['process_isolation_ro_paths'].append(settings.AWX_VENV_PATH) + if proot_custom_virtualenv: + process_isolation_params['process_isolation_ro_paths'].append(proot_custom_virtualenv) + params = {**params, **process_isolation_params} + + if isinstance(instance, AdHocCommand): + params['module'] = self.build_module_name(instance) + params['module_args'] = self.build_module_args(instance) + + if 
getattr(instance, 'use_fact_cache', False): + # Enable Ansible fact cache. + params['fact_cache_type'] = 'jsonfile' + else: + # Disable Ansible fact cache. + params['fact_cache_type'] = '' + + ''' + Delete parameters if the values are None or empty array + ''' + for v in ['passwords', 'playbook', 'inventory']: + if not params[v]: + del params[v] if instance.is_isolated() is True: - manager_instance = isolated_manager.IsolatedManager( - args, cwd, env, stdout_handle, ssh_key_path, **_kw + playbook = params['playbook'] + shutil.move( + params.pop('inventory'), + os.path.join(private_data_dir, 'inventory') ) + copy_tree(cwd, os.path.join(private_data_dir, 'project')) + ansible_runner.utils.dump_artifacts(params) + manager_instance = isolated_manager.IsolatedManager(env, **_kw) status, rc = manager_instance.run(instance, private_data_dir, - proot_temp_dir) + playbook, + event_data_key=self.event_data_key) else: - def event_handler(self, instance, event_data): - should_write_event = False - dispatcher = CallbackQueueDispatcher() - event_data.setdefault(self.event_data_key, instance.id) - dispatcher.dispatch(event_data) - self.event_ct += 1 - - ''' - Handle artifacts - ''' - if event_data.get('event_data', {}).get('artifact_data', {}): - instance.artifacts = event_data['event_data']['artifact_data'] - instance.save(update_fields=['artifacts']) - - return should_write_event - - def cancel_callback(instance): - instance = self.update_model(pk) - if instance.cancel_flag or instance.status == 'canceled': - cancel_wait = (now() - instance.modified).seconds if instance.modified else 0 - if cancel_wait > 5: - logger.warn('Request to cancel {} took {} seconds to complete.'.format(instance.log_format, cancel_wait)) - return True - return False - - def finished_callback(self, instance, runner_obj): - dispatcher = CallbackQueueDispatcher() - event_data = { - 'event': 'EOF', - 'final_counter': self.event_ct, - } - event_data.setdefault(self.event_data_key, instance.id) - dispatcher.dispatch(event_data) - - params = { - 'ident': instance.id, - 'private_data_dir': private_data_dir, - 'project_dir': cwd, - 'playbook': self.build_playbook_path_relative_to_cwd(instance, private_data_dir), - 'inventory': self.build_inventory(instance, private_data_dir), - 'passwords': expect_passwords, - 'envvars': env, - 'event_handler': functools.partial(event_handler, self, instance), - 'cancel_callback': functools.partial(cancel_callback, instance), - 'finished_callback': functools.partial(finished_callback, self, instance), - 'settings': { - 'idle_timeout': self.get_idle_timeout() or "", - 'job_timeout': self.get_instance_timeout(instance), - 'pexpect_timeout': getattr(settings, 'PEXPECT_TIMEOUT', 5), - } - } - - if self.should_use_proot(instance): - process_isolation_params = { - 'process_isolation': True, - 'process_isolation_path': settings.AWX_PROOT_BASE_PATH, - 'process_isolation_show_paths': self.proot_show_paths + [private_data_dir, cwd], - 'process_isolation_hide_paths': [ - settings.AWX_PROOT_BASE_PATH, - '/etc/tower', - '/var/lib/awx', - '/var/log', - settings.PROJECTS_ROOT, - settings.JOBOUTPUT_ROOT, - ] + getattr(settings, 'AWX_PROOT_HIDE_PATHS', None) or [], - 'process_isolation_ro_paths': [], - } - if settings.AWX_PROOT_SHOW_PATHS: - process_isolation_params['process_isolation_show_paths'].extend(settings.AWX_PROOT_SHOW_PATHS) - if settings.ANSIBLE_VENV_PATH: - process_isolation_params['process_isolation_ro_paths'].append(settings.ANSIBLE_VENV_PATH) - if settings.AWX_VENV_PATH: - 
diff --git a/awx/playbooks/check_isolated.yml b/awx/playbooks/check_isolated.yml
index 775389893c..7cb3724da2 100644
--- a/awx/playbooks/check_isolated.yml
+++ b/awx/playbooks/check_isolated.yml
@@ -1,5 +1,4 @@
 ---
-
 # The following variables will be set by the runner of this playbook:
 # src: /tmp/some/path/private_data_dir/
 
@@ -10,7 +9,7 @@
 
   tasks:
     - name: Determine if daemon process is alive.
-      shell: "awx-expect is-alive {{src}}"
+      shell: "ansible-runner is-alive {{src}}"
       register: is_alive
       ignore_errors: true
diff --git a/awx/playbooks/clean_isolated.yml b/awx/playbooks/clean_isolated.yml
index 205dd7199e..2d6767351e 100644
--- a/awx/playbooks/clean_isolated.yml
+++ b/awx/playbooks/clean_isolated.yml
@@ -11,7 +11,7 @@
 
   tasks:
     - name: cancel the job
-      command: "awx-expect stop {{private_data_dir}}"
+      command: "ansible-runner stop {{private_data_dir}}"
       ignore_errors: yes
 
     - name: remove build artifacts
diff --git a/awx/playbooks/run_isolated.yml b/awx/playbooks/run_isolated.yml
index bdcc798339..8d5a515bc4 100644
--- a/awx/playbooks/run_isolated.yml
+++ b/awx/playbooks/run_isolated.yml
@@ -3,36 +3,34 @@
 # The following variables will be set by the runner of this playbook:
 # src: /tmp/some/path/private_data_dir
 # dest: /tmp/some/path/
-# proot_temp_dir: /tmp/some/path
 
 - name: Prepare data, dispatch job in isolated environment.
   hosts: all
   gather_facts: false
   vars:
-    secret: "{{ lookup('pipe', 'cat ' + src + '/env') }}"
+    secret: "{{ lookup('pipe', 'cat ' + src + '/env/ssh_key') }}"
 
   tasks:
-    - name: create a proot/bwrap temp dir (if necessary)
-      synchronize:
-        src: "{{proot_temp_dir}}"
-        dest: "{{dest}}"
-      when: proot_temp_dir is defined
-
     - name: synchronize job environment with isolated host
       synchronize:
         copy_links: true
         src: "{{src}}"
         dest: "{{dest}}"
 
+    - stat: path="{{src}}/env/ssh_key"
+      register: key
+
     - name: create a named pipe for secret environment data
-      command: "mkfifo {{src}}/env"
+      command: "mkfifo {{src}}/env/ssh_key"
+      when: key.stat.exists
 
     - name: spawn the playbook
-      command: "awx-expect start {{src}}"
+      command: "ansible-runner start {{src}} -p {{playbook}} -i {{ident}}"
 
     - name: write the secret environment data
       mkfifo:
        content: "{{secret}}"
-        path: "{{src}}/env"
+        path: "{{src}}/env/ssh_key"
+      when: key.stat.exists
       no_log: True
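The named-pipe handoff above keeps the SSH key off disk: the key material is streamed through a FIFO and consumed once by the reader. A rough Python equivalent of what the playbook does, with purely illustrative paths and key data:

```python
# Rough equivalent of the playbook's mkfifo handoff; paths and the key
# literal are illustrative placeholders, not AWX's actual values.
import os

ssh_key_data = '-----BEGIN RSA PRIVATE KEY-----\n...'  # placeholder
fifo = '/tmp/demo/env/ssh_key'                         # hypothetical dir

os.mkfifo(fifo, 0o600)
# open() blocks here until the reader side (the process started via
# 'ansible-runner start') opens the pipe; the secret never persists.
with open(fifo, 'w') as f:
    f.write(ssh_key_data)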
-d "$(VENV_BASE)/awx" ]; then \ - virtualenv --system-site-packages $(VENV_BASE)/awx && \ diff --git a/installer/image_build/templates/Dockerfile.j2 b/installer/image_build/templates/Dockerfile.j2 index d69e2c9..a08bae5 100644 --- a/installer/image_build/templates/Dockerfile.j2 diff --git a/setup.py b/setup.py index 6c8d226588..ae73ef6a42 100755 --- a/setup.py +++ b/setup.py @@ -40,38 +40,6 @@ else: # The .spec will create symlinks to support multiple versions of sosreport sosconfig = "/usr/share/sosreport/sos/plugins" -##################################################################### -# Isolated packaging -##################################################################### - - -class sdist_isolated(sdist): - includes = [ - 'include VERSION', - 'include Makefile', - 'include awx/__init__.py', - 'include awx/main/expect/run.py', - 'include tools/scripts/awx-expect', - 'include requirements/requirements_isolated.txt', - 'recursive-include awx/lib *.py', - ] - - def __init__(self, dist): - sdist.__init__(self, dist) - dist.metadata.version = get_version() - - def get_file_list(self): - self.filelist.process_template_line('include setup.py') - for line in self.includes: - self.filelist.process_template_line(line) - self.write_manifest() - - def make_release_tree(self, base_dir, files): - sdist.make_release_tree(self, base_dir, files) - with open(os.path.join(base_dir, 'MANIFEST.in'), 'w') as f: - f.write('\n'.join(self.includes)) - - ##################################################################### # Helper Functions @@ -160,12 +128,10 @@ setup( "tools/scripts/awx-python", "tools/scripts/ansible-tower-setup"]), ("%s" % sosconfig, ["tools/sosreport/tower.py"])]), - cmdclass = {'sdist_isolated': sdist_isolated}, options = { 'aliases': { 'dev_build': 'clean --all egg_info sdist', - 'release_build': 'clean --all egg_info -b "" sdist', - 'isolated_build': 'clean --all egg_info -b "" sdist_isolated', + 'release_build': 'clean --all egg_info -b "" sdist' }, 'build_scripts': { 'executable': '/usr/bin/awx-python', diff --git a/tools/docker-isolated/Dockerfile b/tools/docker-isolated/Dockerfile index 072915e3b8..e617a88d37 100644 --- a/tools/docker-isolated/Dockerfile +++ b/tools/docker-isolated/Dockerfile @@ -3,23 +3,22 @@ RUN yum clean all ADD Makefile /tmp/Makefile RUN mkdir /tmp/requirements -ADD requirements/requirements_ansible.txt requirements/requirements_ansible_git.txt requirements/requirements_ansible_uninstall.txt requirements/requirements_isolated.txt /tmp/requirements/ +ADD requirements/requirements_ansible.txt requirements/requirements_ansible_git.txt requirements/requirements_ansible_uninstall.txt /tmp/requirements/ RUN yum -y update && yum -y install curl epel-release RUN yum -y update && yum -y install openssh-server ansible mg vim tmux git python-devel python36 python36-devel python-psycopg2 make python-psutil libxml2-devel libxslt-devel libstdc++.so.6 gcc cyrus-sasl-devel cyrus-sasl openldap-devel libffi-devel zeromq-devel python-pip xmlsec1-devel swig krb5-devel xmlsec1-openssl xmlsec1 xmlsec1-openssl-devel libtool-ltdl-devel bubblewrap zanata-python-client gettext gcc-c++ libcurl-devel python-pycurl bzip2 RUN ln -s /usr/bin/python36 /usr/bin/python3 RUN python36 -m ensurepip RUN pip3 install virtualenv +RUN pip3 install git+https://github.com/ansible/ansible-runner@master#egg=ansible_runner WORKDIR /tmp RUN make requirements_ansible -RUN make requirements_isolated RUN localedef -c -i en_US -f UTF-8 en_US.UTF-8 ENV LANG en_US.UTF-8 ENV LANGUAGE en_US:en ENV LC_ALL 
diff --git a/docs/custom_virtualenvs.md b/docs/custom_virtualenvs.md
index b89ac670c1..42521bdaf5 100644
--- a/docs/custom_virtualenvs.md
+++ b/docs/custom_virtualenvs.md
@@ -63,9 +63,6 @@ index aa8b304..eb05f91 100644
 +	virtualenv $(VENV_BASE)/my-custom-env
 +	$(VENV_BASE)/my-custom-env/bin/pip install python-memcached psutil
 +
-requirements_isolated:
-	if [ ! -d "$(VENV_BASE)/awx" ]; then \
-		virtualenv --system-site-packages $(VENV_BASE)/awx && \
 diff --git a/installer/image_build/templates/Dockerfile.j2 b/installer/image_build/templates/Dockerfile.j2
 index d69e2c9..a08bae5 100644
 --- a/installer/image_build/templates/Dockerfile.j2
diff --git a/setup.py b/setup.py
index 6c8d226588..ae73ef6a42 100755
--- a/setup.py
+++ b/setup.py
@@ -40,38 +40,6 @@ else:
     # The .spec will create symlinks to support multiple versions of sosreport
     sosconfig = "/usr/share/sosreport/sos/plugins"
 
-#####################################################################
-# Isolated packaging
-#####################################################################
-
-
-class sdist_isolated(sdist):
-    includes = [
-        'include VERSION',
-        'include Makefile',
-        'include awx/__init__.py',
-        'include awx/main/expect/run.py',
-        'include tools/scripts/awx-expect',
-        'include requirements/requirements_isolated.txt',
-        'recursive-include awx/lib *.py',
-    ]
-
-    def __init__(self, dist):
-        sdist.__init__(self, dist)
-        dist.metadata.version = get_version()
-
-    def get_file_list(self):
-        self.filelist.process_template_line('include setup.py')
-        for line in self.includes:
-            self.filelist.process_template_line(line)
-        self.write_manifest()
-
-    def make_release_tree(self, base_dir, files):
-        sdist.make_release_tree(self, base_dir, files)
-        with open(os.path.join(base_dir, 'MANIFEST.in'), 'w') as f:
-            f.write('\n'.join(self.includes))
-
-
 #####################################################################
 # Helper Functions
@@ -160,12 +128,10 @@ setup(
                              "tools/scripts/awx-python",
                              "tools/scripts/ansible-tower-setup"]),
                 ("%s" % sosconfig, ["tools/sosreport/tower.py"])]),
-    cmdclass = {'sdist_isolated': sdist_isolated},
     options = {
         'aliases': {
             'dev_build': 'clean --all egg_info sdist',
-            'release_build': 'clean --all egg_info -b "" sdist',
-            'isolated_build': 'clean --all egg_info -b "" sdist_isolated',
+            'release_build': 'clean --all egg_info -b "" sdist'
         },
         'build_scripts': {
             'executable': '/usr/bin/awx-python',
diff --git a/tools/docker-isolated/Dockerfile b/tools/docker-isolated/Dockerfile
index 072915e3b8..e617a88d37 100644
--- a/tools/docker-isolated/Dockerfile
+++ b/tools/docker-isolated/Dockerfile
@@ -3,23 +3,22 @@
 RUN yum clean all
 ADD Makefile /tmp/Makefile
 RUN mkdir /tmp/requirements
-ADD requirements/requirements_ansible.txt requirements/requirements_ansible_git.txt requirements/requirements_ansible_uninstall.txt requirements/requirements_isolated.txt /tmp/requirements/
+ADD requirements/requirements_ansible.txt requirements/requirements_ansible_git.txt requirements/requirements_ansible_uninstall.txt /tmp/requirements/
 RUN yum -y update && yum -y install curl epel-release
 RUN yum -y update && yum -y install openssh-server ansible mg vim tmux git python-devel python36 python36-devel python-psycopg2 make python-psutil libxml2-devel libxslt-devel libstdc++.so.6 gcc cyrus-sasl-devel cyrus-sasl openldap-devel libffi-devel zeromq-devel python-pip xmlsec1-devel swig krb5-devel xmlsec1-openssl xmlsec1 xmlsec1-openssl-devel libtool-ltdl-devel bubblewrap zanata-python-client gettext gcc-c++ libcurl-devel python-pycurl bzip2
 RUN ln -s /usr/bin/python36 /usr/bin/python3
 RUN python36 -m ensurepip
 RUN pip3 install virtualenv
+RUN pip3 install git+https://github.com/ansible/ansible-runner@master#egg=ansible_runner
 WORKDIR /tmp
 RUN make requirements_ansible
-RUN make requirements_isolated
 RUN localedef -c -i en_US -f UTF-8 en_US.UTF-8
 ENV LANG en_US.UTF-8
 ENV LANGUAGE en_US:en
 ENV LC_ALL en_US.UTF-8
 
 WORKDIR /
 EXPOSE 22
-ADD tools/docker-isolated/awx-expect /usr/local/bin/awx-expect
 
 RUN rm -f /etc/ssh/ssh_host_ecdsa_key /etc/ssh/ssh_host_rsa_key
 RUN ssh-keygen -q -N "" -t dsa -f /etc/ssh/ssh_host_ecdsa_key
@@ -30,4 +29,7 @@ RUN sed -i "s/#StrictModes.*/StrictModes no/g" /etc/ssh/sshd_config
 RUN mkdir -p /root/.ssh
 RUN ln -s /awx_devel/authorized_keys /root/.ssh/authorized_keys
 
-CMD ["/usr/sbin/init"]
+ADD https://github.com/krallin/tini/releases/download/v0.14.0/tini /tini
+RUN chmod +x /tini
+ENTRYPOINT ["/tini", "--"]
+CMD ["/usr/sbin/sshd", "-D"]
diff --git a/tools/docker-isolated/README.md b/tools/docker-isolated/README.md
index 397c4485cb..1bed743c61 100644
--- a/tools/docker-isolated/README.md
+++ b/tools/docker-isolated/README.md
@@ -61,7 +61,7 @@ Example location of a private data directory:
 The following command would run the playbook corresponding to that job.
 
 ```bash
-awx-expect start /tmp/ansible_awx_29_OM6Mnx/
+ansible-runner start /tmp/ansible_awx_29_OM6Mnx/ -p some_playbook.yml
 ```
 
-Other awx-expect commands include `start`, `is-alive`, and `stop`.
+Other ansible-runner commands include `start`, `is-alive`, and `stop`.
diff --git a/tools/docker-isolated/awx-expect b/tools/docker-isolated/awx-expect
deleted file mode 100755
index bf2efb54d2..0000000000
--- a/tools/docker-isolated/awx-expect
+++ /dev/null
@@ -1,3 +0,0 @@
-#!/bin/bash
-. /venv/awx/bin/activate
-exec env AWX_LIB_DIRECTORY=/awx_lib /awx_devel/run.py "$@"