update isolated task execution for ansible-runner

This commit is contained in:
Ryan Petrello
2019-03-01 13:08:26 -05:00
committed by chris meyers
parent 8fb65b40de
commit 602ef9750f
20 changed files with 190 additions and 1344 deletions

View File

@@ -168,13 +168,6 @@ requirements_ansible_dev:
$(VENV_BASE)/ansible/bin/pip install pytest mock; \ $(VENV_BASE)/ansible/bin/pip install pytest mock; \
fi fi
requirements_isolated:
if [ ! -d "$(VENV_BASE)/awx" ]; then \
$(PYTHON) -m venv $(VENV_BASE)/awx; \
fi;
echo "include-system-site-packages = true" >> $(VENV_BASE)/awx/lib/python$(PYTHON_VERSION)/pyvenv.cfg
$(VENV_BASE)/awx/bin/pip install -r requirements/requirements_isolated.txt
# Install third-party requirements needed for AWX's environment. # Install third-party requirements needed for AWX's environment.
requirements_awx: virtualenv_awx requirements_awx: virtualenv_awx
if [[ "$(PIP_OPTIONS)" == *"--no-index"* ]]; then \ if [[ "$(PIP_OPTIONS)" == *"--no-index"* ]]; then \
@@ -570,7 +563,6 @@ docker-isolated:
TAG=$(COMPOSE_TAG) DEV_DOCKER_TAG_BASE=$(DEV_DOCKER_TAG_BASE) docker-compose -f tools/docker-compose.yml -f tools/docker-isolated-override.yml create TAG=$(COMPOSE_TAG) DEV_DOCKER_TAG_BASE=$(DEV_DOCKER_TAG_BASE) docker-compose -f tools/docker-compose.yml -f tools/docker-isolated-override.yml create
docker start tools_awx_1 docker start tools_awx_1
docker start tools_isolated_1 docker start tools_isolated_1
echo "__version__ = '`git describe --long | cut -d - -f 1-1`'" | docker exec -i tools_isolated_1 /bin/bash -c "cat > /venv/awx/lib/python$(PYTHON_VERSION)/site-packages/awx.py"
CURRENT_UID=$(shell id -u) TAG=$(COMPOSE_TAG) DEV_DOCKER_TAG_BASE=$(DEV_DOCKER_TAG_BASE) docker-compose -f tools/docker-compose.yml -f tools/docker-isolated-override.yml up CURRENT_UID=$(shell id -u) TAG=$(COMPOSE_TAG) DEV_DOCKER_TAG_BASE=$(DEV_DOCKER_TAG_BASE) docker-compose -f tools/docker-compose.yml -f tools/docker-isolated-override.yml up
# Docker Compose Development environment # Docker Compose Development environment

View File

@@ -1,25 +0,0 @@
# Copyright (c) 2016 Ansible by Red Hat, Inc.
#
# This file is part of Ansible Tower, but depends on code imported from Ansible.
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division, print_function)
# AWX Display Callback
from . import cleanup # noqa (registers control persistent cleanup)
from . import display # noqa (wraps ansible.display.Display methods)
from .module import AWXDefaultCallbackModule, AWXMinimalCallbackModule
__all__ = ['AWXDefaultCallbackModule', 'AWXMinimalCallbackModule']

View File

@@ -1,85 +0,0 @@
# Copyright (c) 2016 Ansible by Red Hat, Inc.
#
# This file is part of Ansible Tower, but depends on code imported from Ansible.
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division, print_function)
# Python
import atexit
import glob
import os
import pwd
# PSUtil
try:
import psutil
except ImportError:
raise ImportError('psutil is missing; {}bin/pip install psutil'.format(
os.environ['VIRTUAL_ENV']
))
__all__ = []
main_pid = os.getpid()
@atexit.register
def terminate_ssh_control_masters():
# Only run this cleanup from the main process.
if os.getpid() != main_pid:
return
# Determine if control persist is being used and if any open sockets
# exist after running the playbook.
cp_path = os.environ.get('ANSIBLE_SSH_CONTROL_PATH', '')
if not cp_path:
return
cp_dir = os.path.dirname(cp_path)
if not os.path.exists(cp_dir):
return
cp_pattern = os.path.join(cp_dir, 'ansible-ssh-*')
cp_files = glob.glob(cp_pattern)
if not cp_files:
return
# Attempt to find any running control master processes.
username = pwd.getpwuid(os.getuid())[0]
ssh_cm_procs = []
for proc in psutil.process_iter():
try:
pname = proc.name()
pcmdline = proc.cmdline()
pusername = proc.username()
except psutil.NoSuchProcess:
continue
if pusername != username:
continue
if pname != 'ssh':
continue
for cp_file in cp_files:
if pcmdline and cp_file in pcmdline[0]:
ssh_cm_procs.append(proc)
break
# Terminate then kill control master processes. Workaround older
# version of psutil that may not have wait_procs implemented.
for proc in ssh_cm_procs:
try:
proc.terminate()
except psutil.NoSuchProcess:
continue
procs_gone, procs_alive = psutil.wait_procs(ssh_cm_procs, timeout=5)
for proc in procs_alive:
proc.kill()

View File

@@ -1,98 +0,0 @@
# Copyright (c) 2016 Ansible by Red Hat, Inc.
#
# This file is part of Ansible Tower, but depends on code imported from Ansible.
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division, print_function)
# Python
import functools
import sys
import uuid
# Ansible
from ansible.utils.display import Display
# Tower Display Callback
from .events import event_context
__all__ = []
def with_context(**context):
global event_context
def wrap(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
with event_context.set_local(**context):
return f(*args, **kwargs)
return wrapper
return wrap
for attr in dir(Display):
if attr.startswith('_') or 'cow' in attr or 'prompt' in attr:
continue
if attr in ('display', 'v', 'vv', 'vvv', 'vvvv', 'vvvvv', 'vvvvvv', 'verbose'):
continue
if not callable(getattr(Display, attr)):
continue
setattr(Display, attr, with_context(**{attr: True})(getattr(Display, attr)))
def with_verbosity(f):
global event_context
@functools.wraps(f)
def wrapper(*args, **kwargs):
host = args[2] if len(args) >= 3 else kwargs.get('host', None)
caplevel = args[3] if len(args) >= 4 else kwargs.get('caplevel', 2)
context = dict(verbose=True, verbosity=(caplevel + 1))
if host is not None:
context['remote_addr'] = host
with event_context.set_local(**context):
return f(*args, **kwargs)
return wrapper
Display.verbose = with_verbosity(Display.verbose)
def display_with_context(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
log_only = args[5] if len(args) >= 6 else kwargs.get('log_only', False)
stderr = args[3] if len(args) >= 4 else kwargs.get('stderr', False)
event_uuid = event_context.get().get('uuid', None)
with event_context.display_lock:
# If writing only to a log file or there is already an event UUID
# set (from a callback module method), skip dumping the event data.
if log_only or event_uuid:
return f(*args, **kwargs)
try:
fileobj = sys.stderr if stderr else sys.stdout
event_context.add_local(uuid=str(uuid.uuid4()))
event_context.dump_begin(fileobj)
return f(*args, **kwargs)
finally:
event_context.dump_end(fileobj)
event_context.remove_local(uuid=None)
return wrapper
Display.display = display_with_context(Display.display)

View File

@@ -1,186 +0,0 @@
# Copyright (c) 2016 Ansible by Red Hat, Inc.
#
# This file is part of Ansible Tower, but depends on code imported from Ansible.
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division, print_function)
# Python
import base64
import contextlib
import datetime
import json
import multiprocessing
import os
import stat
import threading
import uuid
try:
import memcache
except ImportError:
raise ImportError('python-memcached is missing; {}bin/pip install python-memcached'.format(
os.environ['VIRTUAL_ENV']
))
__all__ = ['event_context']
class IsolatedFileWrite:
'''
Stand-in class that will write partial event data to a file as a
replacement for memcache when a job is running on an isolated host.
'''
def __init__(self):
self.private_data_dir = os.getenv('AWX_ISOLATED_DATA_DIR')
def set(self, key, value):
# Strip off the leading memcache key identifying characters :1:ev-
event_uuid = key[len(':1:ev-'):]
# Write data in a staging area and then atomic move to pickup directory
filename = '{}-partial.json'.format(event_uuid)
dropoff_location = os.path.join(self.private_data_dir, 'artifacts', 'job_events', filename)
write_location = '.'.join([dropoff_location, 'tmp'])
with os.fdopen(os.open(write_location, os.O_WRONLY | os.O_CREAT, stat.S_IRUSR | stat.S_IWUSR), 'w') as f:
f.write(value)
os.rename(write_location, dropoff_location)
class EventContext(object):
'''
Store global and local (per thread/process) data associated with callback
events and other display output methods.
'''
def __init__(self):
self.display_lock = multiprocessing.RLock()
cache_actual = os.getenv('CACHE', '127.0.0.1:11211')
if os.getenv('AWX_ISOLATED_DATA_DIR', False):
self.cache = IsolatedFileWrite()
else:
self.cache = memcache.Client([cache_actual], debug=0)
def add_local(self, **kwargs):
if not hasattr(self, '_local'):
self._local = threading.local()
self._local._ctx = {}
self._local._ctx.update(kwargs)
def remove_local(self, **kwargs):
if hasattr(self, '_local'):
for key in kwargs.keys():
self._local._ctx.pop(key, None)
@contextlib.contextmanager
def set_local(self, **kwargs):
try:
self.add_local(**kwargs)
yield
finally:
self.remove_local(**kwargs)
def get_local(self):
return getattr(getattr(self, '_local', None), '_ctx', {})
def add_global(self, **kwargs):
if not hasattr(self, '_global_ctx'):
self._global_ctx = {}
self._global_ctx.update(kwargs)
def remove_global(self, **kwargs):
if hasattr(self, '_global_ctx'):
for key in kwargs.keys():
self._global_ctx.pop(key, None)
@contextlib.contextmanager
def set_global(self, **kwargs):
try:
self.add_global(**kwargs)
yield
finally:
self.remove_global(**kwargs)
def get_global(self):
return getattr(self, '_global_ctx', {})
def get(self):
ctx = {}
ctx.update(self.get_global())
ctx.update(self.get_local())
return ctx
def get_begin_dict(self):
event_data = self.get()
if os.getenv('JOB_ID', ''):
event_data['job_id'] = int(os.getenv('JOB_ID', '0'))
if os.getenv('AD_HOC_COMMAND_ID', ''):
event_data['ad_hoc_command_id'] = int(os.getenv('AD_HOC_COMMAND_ID', '0'))
if os.getenv('PROJECT_UPDATE_ID', ''):
event_data['project_update_id'] = int(os.getenv('PROJECT_UPDATE_ID', '0'))
event_data.setdefault('pid', os.getpid())
event_data.setdefault('uuid', str(uuid.uuid4()))
event_data.setdefault('created', datetime.datetime.utcnow().isoformat())
if not event_data.get('parent_uuid', None) and event_data.get('job_id', None):
for key in ('task_uuid', 'play_uuid', 'playbook_uuid'):
parent_uuid = event_data.get(key, None)
if parent_uuid and parent_uuid != event_data.get('uuid', None):
event_data['parent_uuid'] = parent_uuid
break
event = event_data.pop('event', None)
if not event:
event = 'verbose'
for key in ('debug', 'verbose', 'deprecated', 'warning', 'system_warning', 'error'):
if event_data.get(key, False):
event = key
break
max_res = int(os.getenv("MAX_EVENT_RES", 700000))
if event not in ('playbook_on_stats',) and "res" in event_data and len(str(event_data['res'])) > max_res:
event_data['res'] = {}
event_dict = dict(event=event, event_data=event_data)
for key in list(event_data.keys()):
if key in ('job_id', 'ad_hoc_command_id', 'project_update_id', 'uuid', 'parent_uuid', 'created',):
event_dict[key] = event_data.pop(key)
elif key in ('verbosity', 'pid'):
event_dict[key] = event_data[key]
return event_dict
def get_end_dict(self):
return {}
def dump(self, fileobj, data, max_width=78, flush=False):
b64data = base64.b64encode(json.dumps(data).encode('utf-8')).decode()
with self.display_lock:
# pattern corresponding to OutputEventFilter expectation
fileobj.write(u'\x1b[K')
for offset in range(0, len(b64data), max_width):
chunk = b64data[offset:offset + max_width]
escaped_chunk = u'{}\x1b[{}D'.format(chunk, len(chunk))
fileobj.write(escaped_chunk)
fileobj.write(u'\x1b[K')
if flush:
fileobj.flush()
def dump_begin(self, fileobj):
begin_dict = self.get_begin_dict()
self.cache.set(":1:ev-{}".format(begin_dict['uuid']), json.dumps(begin_dict))
self.dump(fileobj, {'uuid': begin_dict['uuid']})
def dump_end(self, fileobj):
self.dump(fileobj, self.get_end_dict(), flush=True)
event_context = EventContext()

View File

@@ -1,29 +0,0 @@
# Copyright (c) 2016 Ansible by Red Hat, Inc.
#
# This file is part of Ansible Tower, but depends on code imported from Ansible.
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division, print_function)
# Python
import os
# Ansible
import ansible
# Because of the way Ansible loads plugins, it's not possible to import
# ansible.plugins.callback.minimal when being loaded as the minimal plugin. Ugh.
with open(os.path.join(os.path.dirname(ansible.__file__), 'plugins', 'callback', 'minimal.py')) as in_file:
exec(in_file.read())

View File

@@ -1,535 +0,0 @@
# Copyright (c) 2016 Ansible by Red Hat, Inc.
#
# This file is part of Ansible Tower, but depends on code imported from Ansible.
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division, print_function)
# Python
import codecs
import collections
import contextlib
import json
import os
import stat
import sys
import uuid
from copy import copy
# Ansible
from ansible import constants as C
from ansible.plugins.callback import CallbackBase
from ansible.plugins.callback.default import CallbackModule as DefaultCallbackModule
# AWX Display Callback
from .events import event_context
from .minimal import CallbackModule as MinimalCallbackModule
CENSORED = "the output has been hidden due to the fact that 'no_log: true' was specified for this result" # noqa
class BaseCallbackModule(CallbackBase):
'''
Callback module for logging ansible/ansible-playbook events.
'''
CALLBACK_VERSION = 2.0
CALLBACK_TYPE = 'stdout'
# These events should never have an associated play.
EVENTS_WITHOUT_PLAY = [
'playbook_on_start',
'playbook_on_stats',
]
# These events should never have an associated task.
EVENTS_WITHOUT_TASK = EVENTS_WITHOUT_PLAY + [
'playbook_on_setup',
'playbook_on_notify',
'playbook_on_import_for_host',
'playbook_on_not_import_for_host',
'playbook_on_no_hosts_matched',
'playbook_on_no_hosts_remaining',
]
def __init__(self):
super(BaseCallbackModule, self).__init__()
self.task_uuids = set()
self.duplicate_task_counts = collections.defaultdict(lambda: 1)
self.play_uuids = set()
self.duplicate_play_counts = collections.defaultdict(lambda: 1)
@contextlib.contextmanager
def capture_event_data(self, event, **event_data):
event_data.setdefault('uuid', str(uuid.uuid4()))
if event not in self.EVENTS_WITHOUT_TASK:
task = event_data.pop('task', None)
else:
task = None
if event_data.get('res'):
if event_data['res'].get('_ansible_no_log', False):
event_data['res'] = {'censored': CENSORED}
if event_data['res'].get('results', []):
event_data['res']['results'] = copy(event_data['res']['results'])
for i, item in enumerate(event_data['res'].get('results', [])):
if isinstance(item, dict) and item.get('_ansible_no_log', False):
event_data['res']['results'][i] = {'censored': CENSORED}
with event_context.display_lock:
try:
event_context.add_local(event=event, **event_data)
if task:
self.set_task(task, local=True)
event_context.dump_begin(sys.stdout)
yield
finally:
event_context.dump_end(sys.stdout)
if task:
self.clear_task(local=True)
event_context.remove_local(event=None, **event_data)
def set_playbook(self, playbook):
# NOTE: Ansible doesn't generate a UUID for playbook_on_start so do it for them.
self.playbook_uuid = str(uuid.uuid4())
file_name = getattr(playbook, '_file_name', '???')
event_context.add_global(playbook=file_name, playbook_uuid=self.playbook_uuid)
self.clear_play()
def set_play(self, play):
if hasattr(play, 'hosts'):
if isinstance(play.hosts, list):
pattern = ','.join(play.hosts)
else:
pattern = play.hosts
else:
pattern = ''
name = play.get_name().strip() or pattern
event_context.add_global(play=name, play_uuid=str(play._uuid), play_pattern=pattern)
self.clear_task()
def clear_play(self):
event_context.remove_global(play=None, play_uuid=None, play_pattern=None)
self.clear_task()
def set_task(self, task, local=False):
# FIXME: Task is "global" unless using free strategy!
task_ctx = dict(
task=(task.name or task.action),
task_uuid=str(task._uuid),
task_action=task.action,
task_args='',
)
try:
task_ctx['task_path'] = task.get_path()
except AttributeError:
pass
if C.DISPLAY_ARGS_TO_STDOUT:
if task.no_log:
task_ctx['task_args'] = "the output has been hidden due to the fact that 'no_log: true' was specified for this result"
else:
task_args = ', '.join(('%s=%s' % a for a in task.args.items()))
task_ctx['task_args'] = task_args
if getattr(task, '_role', None):
task_role = task._role._role_name
else:
task_role = getattr(task, 'role_name', '')
if task_role:
task_ctx['role'] = task_role
if local:
event_context.add_local(**task_ctx)
else:
event_context.add_global(**task_ctx)
def clear_task(self, local=False):
task_ctx = dict(task=None, task_path=None, task_uuid=None, task_action=None, task_args=None, role=None)
if local:
event_context.remove_local(**task_ctx)
else:
event_context.remove_global(**task_ctx)
def v2_playbook_on_start(self, playbook):
self.set_playbook(playbook)
event_data = dict(
uuid=self.playbook_uuid,
)
with self.capture_event_data('playbook_on_start', **event_data):
super(BaseCallbackModule, self).v2_playbook_on_start(playbook)
def v2_playbook_on_vars_prompt(self, varname, private=True, prompt=None,
encrypt=None, confirm=False, salt_size=None,
salt=None, default=None):
event_data = dict(
varname=varname,
private=private,
prompt=prompt,
encrypt=encrypt,
confirm=confirm,
salt_size=salt_size,
salt=salt,
default=default,
)
with self.capture_event_data('playbook_on_vars_prompt', **event_data):
super(BaseCallbackModule, self).v2_playbook_on_vars_prompt(
varname, private, prompt, encrypt, confirm, salt_size, salt,
default,
)
def v2_playbook_on_include(self, included_file):
event_data = dict(
included_file=included_file._filename if included_file is not None else None,
)
with self.capture_event_data('playbook_on_include', **event_data):
super(BaseCallbackModule, self).v2_playbook_on_include(included_file)
def v2_playbook_on_play_start(self, play):
play_uuid = str(play._uuid)
if play_uuid in self.play_uuids:
# When this play UUID repeats, it means the play is using the
# free strategy (or serial:1) so different hosts may be running
# different tasks within a play (where duplicate UUIDS are common).
#
# When this is the case, modify the UUID slightly to append
# a counter so we can still _track_ duplicate events, but also
# avoid breaking the display in these scenarios.
self.duplicate_play_counts[play_uuid] += 1
play_uuid = '_'.join([
play_uuid,
str(self.duplicate_play_counts[play_uuid])
])
self.play_uuids.add(play_uuid)
play._uuid = play_uuid
self.set_play(play)
if hasattr(play, 'hosts'):
if isinstance(play.hosts, list):
pattern = ','.join(play.hosts)
else:
pattern = play.hosts
else:
pattern = ''
name = play.get_name().strip() or pattern
event_data = dict(
name=name,
pattern=pattern,
uuid=str(play._uuid),
)
with self.capture_event_data('playbook_on_play_start', **event_data):
super(BaseCallbackModule, self).v2_playbook_on_play_start(play)
def v2_playbook_on_import_for_host(self, result, imported_file):
# NOTE: Not used by Ansible 2.x.
with self.capture_event_data('playbook_on_import_for_host'):
super(BaseCallbackModule, self).v2_playbook_on_import_for_host(result, imported_file)
def v2_playbook_on_not_import_for_host(self, result, missing_file):
# NOTE: Not used by Ansible 2.x.
with self.capture_event_data('playbook_on_not_import_for_host'):
super(BaseCallbackModule, self).v2_playbook_on_not_import_for_host(result, missing_file)
def v2_playbook_on_setup(self):
# NOTE: Not used by Ansible 2.x.
with self.capture_event_data('playbook_on_setup'):
super(BaseCallbackModule, self).v2_playbook_on_setup()
def v2_playbook_on_task_start(self, task, is_conditional):
# FIXME: Flag task path output as vv.
task_uuid = str(task._uuid)
if task_uuid in self.task_uuids:
# When this task UUID repeats, it means the play is using the
# free strategy (or serial:1) so different hosts may be running
# different tasks within a play (where duplicate UUIDS are common).
#
# When this is the case, modify the UUID slightly to append
# a counter so we can still _track_ duplicate events, but also
# avoid breaking the display in these scenarios.
self.duplicate_task_counts[task_uuid] += 1
task_uuid = '_'.join([
task_uuid,
str(self.duplicate_task_counts[task_uuid])
])
self.task_uuids.add(task_uuid)
self.set_task(task)
event_data = dict(
task=task,
name=task.get_name(),
is_conditional=is_conditional,
uuid=task_uuid,
)
with self.capture_event_data('playbook_on_task_start', **event_data):
super(BaseCallbackModule, self).v2_playbook_on_task_start(task, is_conditional)
def v2_playbook_on_cleanup_task_start(self, task):
# NOTE: Not used by Ansible 2.x.
self.set_task(task)
event_data = dict(
task=task,
name=task.get_name(),
uuid=str(task._uuid),
is_conditional=True,
)
with self.capture_event_data('playbook_on_task_start', **event_data):
super(BaseCallbackModule, self).v2_playbook_on_cleanup_task_start(task)
def v2_playbook_on_handler_task_start(self, task):
# NOTE: Re-using playbook_on_task_start event for this v2-specific
# event, but setting is_conditional=True, which is how v1 identified a
# task run as a handler.
self.set_task(task)
event_data = dict(
task=task,
name=task.get_name(),
uuid=str(task._uuid),
is_conditional=True,
)
with self.capture_event_data('playbook_on_task_start', **event_data):
super(BaseCallbackModule, self).v2_playbook_on_handler_task_start(task)
def v2_playbook_on_no_hosts_matched(self):
with self.capture_event_data('playbook_on_no_hosts_matched'):
super(BaseCallbackModule, self).v2_playbook_on_no_hosts_matched()
def v2_playbook_on_no_hosts_remaining(self):
with self.capture_event_data('playbook_on_no_hosts_remaining'):
super(BaseCallbackModule, self).v2_playbook_on_no_hosts_remaining()
def v2_playbook_on_notify(self, handler, host):
# NOTE: Not used by Ansible < 2.5.
event_data = dict(
host=host.get_name(),
handler=handler.get_name(),
)
with self.capture_event_data('playbook_on_notify', **event_data):
super(BaseCallbackModule, self).v2_playbook_on_notify(handler, host)
'''
ansible_stats is, retoractively, added in 2.2
'''
def v2_playbook_on_stats(self, stats):
self.clear_play()
# FIXME: Add count of plays/tasks.
event_data = dict(
changed=stats.changed,
dark=stats.dark,
failures=stats.failures,
ignored=getattr(stats, 'ignored', 0),
ok=stats.ok,
processed=stats.processed,
rescued=getattr(stats, 'rescued', 0),
skipped=stats.skipped
)
# write custom set_stat artifact data to the local disk so that it can
# be persisted by awx after the process exits
custom_artifact_data = stats.custom.get('_run', {}) if hasattr(stats, 'custom') else {}
if custom_artifact_data:
# create the directory for custom stats artifacts to live in (if it doesn't exist)
custom_artifacts_dir = os.path.join(os.getenv('AWX_PRIVATE_DATA_DIR'), 'artifacts')
if not os.path.isdir(custom_artifacts_dir):
os.makedirs(custom_artifacts_dir, mode=stat.S_IXUSR + stat.S_IWUSR + stat.S_IRUSR)
custom_artifacts_path = os.path.join(custom_artifacts_dir, 'custom')
with codecs.open(custom_artifacts_path, 'w', encoding='utf-8') as f:
os.chmod(custom_artifacts_path, stat.S_IRUSR | stat.S_IWUSR)
json.dump(custom_artifact_data, f)
with self.capture_event_data('playbook_on_stats', **event_data):
super(BaseCallbackModule, self).v2_playbook_on_stats(stats)
@staticmethod
def _get_event_loop(task):
if hasattr(task, 'loop_with'): # Ansible >=2.5
return task.loop_with
elif hasattr(task, 'loop'): # Ansible <2.4
return task.loop
return None
def v2_runner_on_ok(self, result):
# FIXME: Display detailed results or not based on verbosity.
# strip environment vars from the job event; it already exists on the
# job and sensitive values are filtered there
if result._task.action in ('setup', 'gather_facts'):
result._result.get('ansible_facts', {}).pop('ansible_env', None)
event_data = dict(
host=result._host.get_name(),
remote_addr=result._host.address,
task=result._task,
res=result._result,
event_loop=self._get_event_loop(result._task),
)
with self.capture_event_data('runner_on_ok', **event_data):
super(BaseCallbackModule, self).v2_runner_on_ok(result)
def v2_runner_on_failed(self, result, ignore_errors=False):
# FIXME: Add verbosity for exception/results output.
event_data = dict(
host=result._host.get_name(),
remote_addr=result._host.address,
res=result._result,
task=result._task,
ignore_errors=ignore_errors,
event_loop=self._get_event_loop(result._task),
)
with self.capture_event_data('runner_on_failed', **event_data):
super(BaseCallbackModule, self).v2_runner_on_failed(result, ignore_errors)
def v2_runner_on_skipped(self, result):
event_data = dict(
host=result._host.get_name(),
remote_addr=result._host.address,
task=result._task,
event_loop=self._get_event_loop(result._task),
)
with self.capture_event_data('runner_on_skipped', **event_data):
super(BaseCallbackModule, self).v2_runner_on_skipped(result)
def v2_runner_on_unreachable(self, result):
event_data = dict(
host=result._host.get_name(),
remote_addr=result._host.address,
task=result._task,
res=result._result,
)
with self.capture_event_data('runner_on_unreachable', **event_data):
super(BaseCallbackModule, self).v2_runner_on_unreachable(result)
def v2_runner_on_no_hosts(self, task):
# NOTE: Not used by Ansible 2.x.
event_data = dict(
task=task,
)
with self.capture_event_data('runner_on_no_hosts', **event_data):
super(BaseCallbackModule, self).v2_runner_on_no_hosts(task)
def v2_runner_on_async_poll(self, result):
# NOTE: Not used by Ansible 2.x.
event_data = dict(
host=result._host.get_name(),
task=result._task,
res=result._result,
jid=result._result.get('ansible_job_id'),
)
with self.capture_event_data('runner_on_async_poll', **event_data):
super(BaseCallbackModule, self).v2_runner_on_async_poll(result)
def v2_runner_on_async_ok(self, result):
# NOTE: Not used by Ansible 2.x.
event_data = dict(
host=result._host.get_name(),
task=result._task,
res=result._result,
jid=result._result.get('ansible_job_id'),
)
with self.capture_event_data('runner_on_async_ok', **event_data):
super(BaseCallbackModule, self).v2_runner_on_async_ok(result)
def v2_runner_on_async_failed(self, result):
# NOTE: Not used by Ansible 2.x.
event_data = dict(
host=result._host.get_name(),
task=result._task,
res=result._result,
jid=result._result.get('ansible_job_id'),
)
with self.capture_event_data('runner_on_async_failed', **event_data):
super(BaseCallbackModule, self).v2_runner_on_async_failed(result)
def v2_runner_on_file_diff(self, result, diff):
# NOTE: Not used by Ansible 2.x.
event_data = dict(
host=result._host.get_name(),
task=result._task,
diff=diff,
)
with self.capture_event_data('runner_on_file_diff', **event_data):
super(BaseCallbackModule, self).v2_runner_on_file_diff(result, diff)
def v2_on_file_diff(self, result):
# NOTE: Logged as runner_on_file_diff.
event_data = dict(
host=result._host.get_name(),
task=result._task,
diff=result._result.get('diff'),
)
with self.capture_event_data('runner_on_file_diff', **event_data):
super(BaseCallbackModule, self).v2_on_file_diff(result)
def v2_runner_item_on_ok(self, result):
event_data = dict(
host=result._host.get_name(),
task=result._task,
res=result._result,
)
with self.capture_event_data('runner_item_on_ok', **event_data):
super(BaseCallbackModule, self).v2_runner_item_on_ok(result)
def v2_runner_item_on_failed(self, result):
event_data = dict(
host=result._host.get_name(),
task=result._task,
res=result._result,
)
with self.capture_event_data('runner_item_on_failed', **event_data):
super(BaseCallbackModule, self).v2_runner_item_on_failed(result)
def v2_runner_item_on_skipped(self, result):
event_data = dict(
host=result._host.get_name(),
task=result._task,
res=result._result,
)
with self.capture_event_data('runner_item_on_skipped', **event_data):
super(BaseCallbackModule, self).v2_runner_item_on_skipped(result)
def v2_runner_retry(self, result):
event_data = dict(
host=result._host.get_name(),
task=result._task,
res=result._result,
)
with self.capture_event_data('runner_retry', **event_data):
super(BaseCallbackModule, self).v2_runner_retry(result)
def v2_runner_on_start(self, host, task):
event_data = dict(
host=host.get_name(),
task=task
)
with self.capture_event_data('runner_on_start', **event_data):
super(BaseCallbackModule, self).v2_runner_on_start(host, task)
class AWXDefaultCallbackModule(BaseCallbackModule, DefaultCallbackModule):
CALLBACK_NAME = 'awx_display'
class AWXMinimalCallbackModule(BaseCallbackModule, MinimalCallbackModule):
CALLBACK_NAME = 'minimal'
def v2_playbook_on_play_start(self, play):
pass
def v2_playbook_on_task_start(self, task, is_conditional):
self.set_task(task)

View File

@@ -6,8 +6,8 @@ import shutil
import stat import stat
import tempfile import tempfile
import time import time
import uuid
import logging import logging
from distutils.version import LooseVersion as Version
from io import StringIO from io import StringIO
from django.conf import settings from django.conf import settings
@@ -24,23 +24,12 @@ playbook_logger = logging.getLogger('awx.isolated.manager.playbooks')
class IsolatedManager(object): class IsolatedManager(object):
def __init__(self, args, cwd, env, stdout_handle, ssh_key_path, def __init__(self, env, cancelled_callback=None, job_timeout=0,
expect_passwords={}, cancelled_callback=None, job_timeout=0,
idle_timeout=None, extra_update_fields=None, idle_timeout=None, extra_update_fields=None,
pexpect_timeout=5, proot_cmd='bwrap'): pexpect_timeout=5, proot_cmd='bwrap'):
""" """
:param args: a list of `subprocess.call`-style arguments
representing a subprocess e.g.,
['ansible-playbook', '...']
:param cwd: the directory where the subprocess should run,
generally the directory where playbooks exist
:param env: a dict containing environment variables for the :param env: a dict containing environment variables for the
subprocess, ala `os.environ` subprocess, ala `os.environ`
:param stdout_handle: a file-like object for capturing stdout
:param ssh_key_path: a filepath where SSH key data can be read
:param expect_passwords: a dict of regular expression password prompts
to input values, i.e., {r'Password:*?$':
'some_password'}
:param cancelled_callback: a callable - which returns `True` or `False` :param cancelled_callback: a callable - which returns `True` or `False`
- signifying if the job has been prematurely - signifying if the job has been prematurely
cancelled cancelled
@@ -56,13 +45,7 @@ class IsolatedManager(object):
`pexpect.spawn().expect()` calls `pexpect.spawn().expect()` calls
:param proot_cmd the command used to isolate processes, `bwrap` :param proot_cmd the command used to isolate processes, `bwrap`
""" """
self.args = args
self.cwd = cwd
self.isolated_env = self._redact_isolated_env(env.copy())
self.management_env = self._base_management_env() self.management_env = self._base_management_env()
self.stdout_handle = stdout_handle
self.ssh_key_path = ssh_key_path
self.expect_passwords = {k.pattern: v for k, v in expect_passwords.items()}
self.cancelled_callback = cancelled_callback self.cancelled_callback = cancelled_callback
self.job_timeout = job_timeout self.job_timeout = job_timeout
self.idle_timeout = idle_timeout self.idle_timeout = idle_timeout
@@ -106,18 +89,6 @@ class IsolatedManager(object):
args.append('-%s' % ('v' * min(5, settings.AWX_ISOLATED_VERBOSITY))) args.append('-%s' % ('v' * min(5, settings.AWX_ISOLATED_VERBOSITY)))
return args return args
@staticmethod
def _redact_isolated_env(env):
'''
strips some environment variables that aren't applicable to
job execution within the isolated instance
'''
for var in (
'HOME', 'RABBITMQ_HOST', 'RABBITMQ_PASS', 'RABBITMQ_USER', 'CACHE',
'DJANGO_PROJECT_DIR', 'DJANGO_SETTINGS_MODULE', 'RABBITMQ_VHOST'):
env.pop(var, None)
return env
@classmethod @classmethod
def awx_playbook_path(cls): def awx_playbook_path(cls):
return os.path.abspath(os.path.join( return os.path.abspath(os.path.join(
@@ -128,55 +99,26 @@ class IsolatedManager(object):
def path_to(self, *args): def path_to(self, *args):
return os.path.join(self.private_data_dir, *args) return os.path.join(self.private_data_dir, *args)
def dispatch(self): def dispatch(self, playbook):
''' '''
Compile the playbook, its environment, and metadata into a series Ship the runner payload to a remote host for isolated execution.
of files, and ship to a remote host for isolated execution.
''' '''
self.started_at = time.time() self.started_at = time.time()
secrets = {
'env': self.isolated_env,
'passwords': self.expect_passwords,
'ssh_key_data': None,
'idle_timeout': self.idle_timeout,
'job_timeout': self.job_timeout,
'pexpect_timeout': self.pexpect_timeout
}
# if an ssh private key fifo exists, read its contents and delete it
if self.ssh_key_path:
buff = StringIO()
with open(self.ssh_key_path, 'r') as fifo:
for line in fifo:
buff.write(line)
secrets['ssh_key_data'] = buff.getvalue()
os.remove(self.ssh_key_path)
# write the entire secret payload to a named pipe
# the run_isolated.yml playbook will use a lookup to read this data
# into a variable, and will replicate the data into a named pipe on the
# isolated instance
secrets_path = os.path.join(self.private_data_dir, 'env')
run.open_fifo_write(
secrets_path,
smart_str(base64.b64encode(smart_bytes(json.dumps(secrets))))
)
self.build_isolated_job_data() self.build_isolated_job_data()
extra_vars = { extra_vars = {
'src': self.private_data_dir, 'src': self.private_data_dir,
'dest': settings.AWX_PROOT_BASE_PATH, 'dest': settings.AWX_PROOT_BASE_PATH,
'playbook': playbook,
'ident': self.ident
} }
if self.proot_temp_dir:
extra_vars['proot_temp_dir'] = self.proot_temp_dir
# Run ansible-playbook to launch a job on the isolated host. This: # Run ansible-playbook to launch a job on the isolated host. This:
# #
# - sets up a temporary directory for proot/bwrap (if necessary) # - sets up a temporary directory for proot/bwrap (if necessary)
# - copies encrypted job data from the controlling host to the isolated host (with rsync) # - copies encrypted job data from the controlling host to the isolated host (with rsync)
# - writes the encryption secret to a named pipe on the isolated host # - writes the encryption secret to a named pipe on the isolated host
# - launches the isolated playbook runner via `awx-expect start <job-id>` # - launches ansible-runner
args = self._build_args('run_isolated.yml', '%s,' % self.host, extra_vars) args = self._build_args('run_isolated.yml', '%s,' % self.host, extra_vars)
if self.instance.verbosity: if self.instance.verbosity:
args.append('-%s' % ('v' * min(5, self.instance.verbosity))) args.append('-%s' % ('v' * min(5, self.instance.verbosity)))
@@ -188,10 +130,15 @@ class IsolatedManager(object):
job_timeout=settings.AWX_ISOLATED_LAUNCH_TIMEOUT, job_timeout=settings.AWX_ISOLATED_LAUNCH_TIMEOUT,
pexpect_timeout=5 pexpect_timeout=5
) )
output = buff.getvalue().encode('utf-8') output = buff.getvalue()
playbook_logger.info('Isolated job {} dispatch:\n{}'.format(self.instance.id, output)) playbook_logger.info('Isolated job {} dispatch:\n{}'.format(self.instance.id, output))
if status != 'successful': if status != 'successful':
self.stdout_handle.write(output) event_data = {
'event': 'verbose',
'stdout': output
}
event_data.setdefault(self.event_data_key, self.instance.id)
CallbackQueueDispatcher().dispatch(event_data)
return status, rc return status, rc
@classmethod @classmethod
@@ -215,11 +162,8 @@ class IsolatedManager(object):
def build_isolated_job_data(self): def build_isolated_job_data(self):
''' '''
Write the playbook and metadata into a collection of files on the local Write metadata related to the playbook run into a collection of files
file system. on the local file system.
This function is intended to be used to compile job data so that it
can be shipped to a remote, isolated host (via ssh).
''' '''
rsync_exclude = [ rsync_exclude = [
@@ -229,42 +173,18 @@ class IsolatedManager(object):
'- /project/.hg', '- /project/.hg',
# don't rsync job events that are in the process of being written # don't rsync job events that are in the process of being written
'- /artifacts/job_events/*-partial.json.tmp', '- /artifacts/job_events/*-partial.json.tmp',
# rsync can't copy named pipe data - we're replicating this manually ourselves in the playbook # don't rsync the ssh_key FIFO
'- /env' '- /env/ssh_key',
] ]
for filename, data in ( for filename, data in (
['.rsync-filter', '\n'.join(rsync_exclude)], ['.rsync-filter', '\n'.join(rsync_exclude)],
['args', json.dumps(self.args)]
): ):
path = self.path_to(filename) path = self.path_to(filename)
with open(path, 'w') as f: with open(path, 'w') as f:
f.write(data) f.write(data)
os.chmod(path, stat.S_IRUSR) os.chmod(path, stat.S_IRUSR)
# symlink the scm checkout (if there is one) so that it's rsync'ed over, too
if 'AD_HOC_COMMAND_ID' not in self.isolated_env:
os.symlink(self.cwd, self.path_to('project'))
# create directories for build artifacts to live in
os.makedirs(self.path_to('artifacts', 'job_events'), mode=stat.S_IXUSR + stat.S_IWUSR + stat.S_IRUSR)
def _missing_artifacts(self, path_list, output):
missing_artifacts = list(filter(lambda path: not os.path.exists(path), path_list))
for path in missing_artifacts:
self.stdout_handle.write('ansible did not exit cleanly, missing `{}`.\n'.format(path))
if missing_artifacts:
daemon_path = self.path_to('artifacts', 'daemon.log')
if os.path.exists(daemon_path):
# If available, show log files from the run.py call
with codecs.open(daemon_path, 'r', encoding='utf-8') as f:
self.stdout_handle.write(f.read())
else:
# Provide the management playbook standard out if not available
self.stdout_handle.write(output)
return True
return False
def check(self, interval=None): def check(self, interval=None):
""" """
Repeatedly poll the isolated node to determine if the job has run. Repeatedly poll the isolated node to determine if the job has run.
@@ -290,8 +210,9 @@ class IsolatedManager(object):
rc = None rc = None
buff = StringIO() buff = StringIO()
last_check = time.time() last_check = time.time()
seek = 0
job_timeout = remaining = self.job_timeout job_timeout = remaining = self.job_timeout
handled_events = set()
dispatcher = CallbackQueueDispatcher()
while status == 'failed': while status == 'failed':
if job_timeout != 0: if job_timeout != 0:
remaining = max(0, job_timeout - (time.time() - self.started_at)) remaining = max(0, job_timeout - (time.time() - self.started_at))
@@ -322,31 +243,35 @@ class IsolatedManager(object):
output = buff.getvalue().encode('utf-8') output = buff.getvalue().encode('utf-8')
playbook_logger.info('Isolated job {} check:\n{}'.format(self.instance.id, output)) playbook_logger.info('Isolated job {} check:\n{}'.format(self.instance.id, output))
path = self.path_to('artifacts', 'stdout') # discover new events and ingest them
if os.path.exists(path): events_path = self.path_to('artifacts', self.ident, 'job_events')
with codecs.open(path, 'r', encoding='utf-8') as f: for event in set(os.listdir(events_path)) - handled_events:
f.seek(seek) path = os.path.join(events_path, event)
for line in f: if os.path.exists(path):
self.stdout_handle.write(line) event_data = json.load(
seek += len(line) open(os.path.join(events_path, event), 'r')
)
event_data.setdefault(self.event_data_key, self.instance.id)
dispatcher.dispatch(event_data)
handled_events.add(event)
last_check = time.time() last_check = time.time()
if status == 'successful': if status == 'successful':
status_path = self.path_to('artifacts', 'status') status_path = self.path_to('artifacts', self.ident, 'status')
rc_path = self.path_to('artifacts', 'rc') rc_path = self.path_to('artifacts', self.ident, 'rc')
if self._missing_artifacts([status_path, rc_path], output): with open(status_path, 'r') as f:
status = 'failed' status = f.readline()
rc = 1 with open(rc_path, 'r') as f:
else: rc = int(f.readline())
with open(status_path, 'r') as f:
status = f.readline() # emit an EOF event
with open(rc_path, 'r') as f: event_data = {
rc = int(f.readline()) 'event': 'EOF',
elif status == 'failed': 'final_counter': len(handled_events)
# if we were unable to retrieve job reults from the isolated host, }
# print stdout of the `check_isolated.yml` playbook for clues event_data.setdefault(self.event_data_key, self.instance.id)
self.stdout_handle.write(smart_str(output)) dispatcher.dispatch(event_data)
return status, rc return status, rc
@@ -356,7 +281,6 @@ class IsolatedManager(object):
'private_data_dir': self.private_data_dir, 'private_data_dir': self.private_data_dir,
'cleanup_dirs': [ 'cleanup_dirs': [
self.private_data_dir, self.private_data_dir,
self.proot_temp_dir,
], ],
} }
args = self._build_args('clean_isolated.yml', '%s,' % self.host, extra_vars) args = self._build_args('clean_isolated.yml', '%s,' % self.host, extra_vars)
@@ -377,23 +301,15 @@ class IsolatedManager(object):
@classmethod @classmethod
def update_capacity(cls, instance, task_result, awx_application_version): def update_capacity(cls, instance, task_result, awx_application_version):
instance.version = task_result['version'] instance.version = 'ansible-runner-{}'.format(task_result['version'])
isolated_version = instance.version.split("-", 1)[0] if instance.capacity == 0 and task_result['capacity_cpu']:
cluster_version = awx_application_version.split("-", 1)[0] logger.warning('Isolated instance {} has re-joined.'.format(instance.hostname))
instance.cpu_capacity = int(task_result['capacity_cpu'])
if Version(cluster_version) > Version(isolated_version): instance.mem_capacity = int(task_result['capacity_mem'])
err_template = "Isolated instance {} reports version {}, cluster node is at {}, setting capacity to zero." instance.capacity = get_system_task_capacity(scale=instance.capacity_adjustment,
logger.error(err_template.format(instance.hostname, instance.version, awx_application_version)) cpu_capacity=int(task_result['capacity_cpu']),
instance.capacity = 0 mem_capacity=int(task_result['capacity_mem']))
else:
if instance.capacity == 0 and task_result['capacity_cpu']:
logger.warning('Isolated instance {} has re-joined.'.format(instance.hostname))
instance.cpu_capacity = int(task_result['capacity_cpu'])
instance.mem_capacity = int(task_result['capacity_mem'])
instance.capacity = get_system_task_capacity(scale=instance.capacity_adjustment,
cpu_capacity=int(task_result['capacity_cpu']),
mem_capacity=int(task_result['capacity_mem']))
instance.save(update_fields=['cpu_capacity', 'mem_capacity', 'capacity', 'version', 'modified']) instance.save(update_fields=['cpu_capacity', 'mem_capacity', 'capacity', 'version', 'modified'])
@classmethod @classmethod
@@ -460,28 +376,7 @@ class IsolatedManager(object):
if os.path.exists(facts_path): if os.path.exists(facts_path):
shutil.rmtree(facts_path) shutil.rmtree(facts_path)
@staticmethod def run(self, instance, private_data_dir, playbook, event_data_key):
def get_stdout_handle(instance, private_data_dir, event_data_key='job_id'):
dispatcher = CallbackQueueDispatcher()
def job_event_callback(event_data):
event_data.setdefault(event_data_key, instance.id)
if 'uuid' in event_data:
filename = '{}-partial.json'.format(event_data['uuid'])
partial_filename = os.path.join(private_data_dir, 'artifacts', 'job_events', filename)
try:
with codecs.open(partial_filename, 'r', encoding='utf-8') as f:
partial_event_data = json.load(f)
event_data.update(partial_event_data)
except IOError:
if event_data.get('event', '') != 'verbose':
logger.error('Missing callback data for event type `{}`, uuid {}, job {}.\nevent_data: {}'.format(
event_data.get('event', ''), event_data['uuid'], instance.id, event_data))
dispatcher.dispatch(event_data)
return OutputEventFilter(job_event_callback)
def run(self, instance, private_data_dir, proot_temp_dir):
""" """
Run a job on an isolated host. Run a job on an isolated host.
@@ -489,18 +384,19 @@ class IsolatedManager(object):
:param private_data_dir: an absolute path on the local file system :param private_data_dir: an absolute path on the local file system
where job-specific data should be written where job-specific data should be written
(i.e., `/tmp/ansible_awx_xyz/`) (i.e., `/tmp/ansible_awx_xyz/`)
:param proot_temp_dir: a temporary directory which bwrap maps :param playbook: the playbook to run
restricted paths to :param event_data_key: e.g., job_id, inventory_id, ...
For a completed job run, this function returns (status, rc), For a completed job run, this function returns (status, rc),
representing the status and return code of the isolated representing the status and return code of the isolated
`ansible-playbook` run. `ansible-playbook` run.
""" """
self.ident = str(uuid.uuid4())
self.event_data_key = event_data_key
self.instance = instance self.instance = instance
self.host = instance.execution_node self.host = instance.execution_node
self.private_data_dir = private_data_dir self.private_data_dir = private_data_dir
self.proot_temp_dir = proot_temp_dir status, rc = self.dispatch(playbook)
status, rc = self.dispatch()
if status == 'successful': if status == 'successful':
status, rc = self.check() status, rc = self.check()
self.cleanup() self.cleanup()

View File

@@ -28,7 +28,7 @@ class Command(BaseCommand):
args = [ args = [
'ansible', 'all', '-i', '{},'.format(hostname), '-u', 'ansible', 'all', '-i', '{},'.format(hostname), '-u',
settings.AWX_ISOLATED_USERNAME, '-T5', '-m', 'shell', settings.AWX_ISOLATED_USERNAME, '-T5', '-m', 'shell',
'-a', 'awx-expect -h', '-vvv' '-a', 'ansible-runner --version', '-vvv'
] ]
if all([ if all([
getattr(settings, 'AWX_ISOLATED_KEY_GENERATION', False) is True, getattr(settings, 'AWX_ISOLATED_KEY_GENERATION', False) is True,

View File

@@ -16,6 +16,7 @@ import stat
import tempfile import tempfile
import time import time
import traceback import traceback
from distutils.dir_util import copy_tree
from distutils.version import LooseVersion as Version from distutils.version import LooseVersion as Version
import yaml import yaml
import fcntl import fcntl
@@ -936,29 +937,6 @@ class BaseTask(object):
''' '''
return OrderedDict() return OrderedDict()
def get_stdout_handle(self, instance):
'''
Return an virtual file object for capturing stdout and/or events.
'''
dispatcher = CallbackQueueDispatcher()
if isinstance(instance, (Job, AdHocCommand, ProjectUpdate)):
def event_callback(event_data):
event_data.setdefault(self.event_data_key, instance.id)
if 'uuid' in event_data:
cache_event = cache.get('ev-{}'.format(event_data['uuid']), None)
if cache_event is not None:
event_data.update(json.loads(cache_event))
dispatcher.dispatch(event_data)
return OutputEventFilter(event_callback)
else:
def event_callback(event_data):
event_data.setdefault(self.event_data_key, instance.id)
dispatcher.dispatch(event_data)
return OutputVerboseFilter(event_callback)
def pre_run_hook(self, instance): def pre_run_hook(self, instance):
''' '''
Hook for any steps to run before the job/task starts Hook for any steps to run before the job/task starts
@@ -1056,13 +1034,6 @@ class BaseTask(object):
) )
self.write_args_file(private_data_dir, args) self.write_args_file(private_data_dir, args)
if instance.is_isolated() is False:
stdout_handle = self.get_stdout_handle(instance)
else:
stdout_handle = isolated_manager.IsolatedManager.get_stdout_handle(
instance, private_data_dir, event_data_key=self.event_data_key)
# If there is an SSH key path defined, wrap args with ssh-agent.
ssh_key_path = self.get_ssh_key_path(instance, private_data_files)
# If we're executing on an isolated host, don't bother adding the # If we're executing on an isolated host, don't bother adding the
# key to the agent in this environment # key to the agent in this environment
instance = self.update_model(pk, job_args=json.dumps(safe_args), instance = self.update_model(pk, job_args=json.dumps(safe_args),
@@ -1078,122 +1049,123 @@ class BaseTask(object):
) )
instance = self.update_model(instance.pk, output_replacements=output_replacements) instance = self.update_model(instance.pk, output_replacements=output_replacements)
# TODO: Satisfy isolated, refactor this to a single should_use_proot() def event_handler(self, instance, event_data):
# call when isolated migrated to runner should_write_event = False
dispatcher = CallbackQueueDispatcher()
event_data.setdefault(self.event_data_key, instance.id)
dispatcher.dispatch(event_data)
self.event_ct += 1
'''
Handle artifacts
'''
if event_data.get('event_data', {}).get('artifact_data', {}):
instance.artifacts = event_data['event_data']['artifact_data']
instance.save(update_fields=['artifacts'])
return should_write_event
def cancel_callback(instance):
instance = self.update_model(pk)
if instance.cancel_flag or instance.status == 'canceled':
cancel_wait = (now() - instance.modified).seconds if instance.modified else 0
if cancel_wait > 5:
logger.warn('Request to cancel {} took {} seconds to complete.'.format(instance.log_format, cancel_wait))
return True
return False
def finished_callback(self, instance, runner_obj):
dispatcher = CallbackQueueDispatcher()
event_data = {
'event': 'EOF',
'final_counter': self.event_ct,
}
event_data.setdefault(self.event_data_key, instance.id)
dispatcher.dispatch(event_data)
params = {
'ident': instance.id,
'private_data_dir': private_data_dir,
'project_dir': cwd,
'playbook': self.build_playbook_path_relative_to_cwd(instance, private_data_dir),
'inventory': self.build_inventory(instance, private_data_dir),
'passwords': expect_passwords,
'envvars': env,
'event_handler': functools.partial(event_handler, self, instance),
'cancel_callback': functools.partial(cancel_callback, instance),
'finished_callback': functools.partial(finished_callback, self, instance),
'settings': {
'idle_timeout': self.get_idle_timeout() or "",
'job_timeout': self.get_instance_timeout(instance),
'pexpect_timeout': getattr(settings, 'PEXPECT_TIMEOUT', 5),
}
}
if self.should_use_proot(instance): if self.should_use_proot(instance):
proot_temp_dir = build_proot_temp_dir() process_isolation_params = {
'process_isolation': True,
'process_isolation_path': settings.AWX_PROOT_BASE_PATH,
'process_isolation_show_paths': self.proot_show_paths + [private_data_dir, cwd],
'process_isolation_hide_paths': [
settings.AWX_PROOT_BASE_PATH,
'/etc/tower',
'/var/lib/awx',
'/var/log',
settings.PROJECTS_ROOT,
settings.JOBOUTPUT_ROOT,
] + getattr(settings, 'AWX_PROOT_HIDE_PATHS', None) or [],
'process_isolation_ro_paths': [],
}
if settings.AWX_PROOT_SHOW_PATHS:
process_isolation_params['process_isolation_show_paths'].extend(settings.AWX_PROOT_SHOW_PATHS)
if settings.ANSIBLE_VENV_PATH:
process_isolation_params['process_isolation_ro_paths'].append(settings.ANSIBLE_VENV_PATH)
if settings.AWX_VENV_PATH:
process_isolation_params['process_isolation_ro_paths'].append(settings.AWX_VENV_PATH)
if proot_custom_virtualenv:
process_isolation_params['process_isolation_ro_paths'].append(proot_custom_virtualenv)
params = {**params, **process_isolation_params}
if isinstance(instance, AdHocCommand):
params['module'] = self.build_module_name(instance)
params['module_args'] = self.build_module_args(instance)
if getattr(instance, 'use_fact_cache', False):
# Enable Ansible fact cache.
params['fact_cache_type'] = 'jsonfile'
else:
# Disable Ansible fact cache.
params['fact_cache_type'] = ''
'''
Delete parameters if the values are None or empty array
'''
for v in ['passwords', 'playbook', 'inventory']:
if not params[v]:
del params[v]
if instance.is_isolated() is True: if instance.is_isolated() is True:
manager_instance = isolated_manager.IsolatedManager( playbook = params['playbook']
args, cwd, env, stdout_handle, ssh_key_path, **_kw shutil.move(
params.pop('inventory'),
os.path.join(private_data_dir, 'inventory')
) )
copy_tree(cwd, os.path.join(private_data_dir, 'project'))
ansible_runner.utils.dump_artifacts(params)
manager_instance = isolated_manager.IsolatedManager(env, **_kw)
status, rc = manager_instance.run(instance, status, rc = manager_instance.run(instance,
private_data_dir, private_data_dir,
proot_temp_dir) playbook,
event_data_key=self.event_data_key)
else: else:
def event_handler(self, instance, event_data):
should_write_event = False
dispatcher = CallbackQueueDispatcher()
event_data.setdefault(self.event_data_key, instance.id)
dispatcher.dispatch(event_data)
self.event_ct += 1
'''
Handle artifacts
'''
if event_data.get('event_data', {}).get('artifact_data', {}):
instance.artifacts = event_data['event_data']['artifact_data']
instance.save(update_fields=['artifacts'])
return should_write_event
def cancel_callback(instance):
instance = self.update_model(pk)
if instance.cancel_flag or instance.status == 'canceled':
cancel_wait = (now() - instance.modified).seconds if instance.modified else 0
if cancel_wait > 5:
logger.warn('Request to cancel {} took {} seconds to complete.'.format(instance.log_format, cancel_wait))
return True
return False
def finished_callback(self, instance, runner_obj):
dispatcher = CallbackQueueDispatcher()
event_data = {
'event': 'EOF',
'final_counter': self.event_ct,
}
event_data.setdefault(self.event_data_key, instance.id)
dispatcher.dispatch(event_data)
params = {
'ident': instance.id,
'private_data_dir': private_data_dir,
'project_dir': cwd,
'playbook': self.build_playbook_path_relative_to_cwd(instance, private_data_dir),
'inventory': self.build_inventory(instance, private_data_dir),
'passwords': expect_passwords,
'envvars': env,
'event_handler': functools.partial(event_handler, self, instance),
'cancel_callback': functools.partial(cancel_callback, instance),
'finished_callback': functools.partial(finished_callback, self, instance),
'settings': {
'idle_timeout': self.get_idle_timeout() or "",
'job_timeout': self.get_instance_timeout(instance),
'pexpect_timeout': getattr(settings, 'PEXPECT_TIMEOUT', 5),
}
}
if self.should_use_proot(instance):
process_isolation_params = {
'process_isolation': True,
'process_isolation_path': settings.AWX_PROOT_BASE_PATH,
'process_isolation_show_paths': self.proot_show_paths + [private_data_dir, cwd],
'process_isolation_hide_paths': [
settings.AWX_PROOT_BASE_PATH,
'/etc/tower',
'/var/lib/awx',
'/var/log',
settings.PROJECTS_ROOT,
settings.JOBOUTPUT_ROOT,
] + getattr(settings, 'AWX_PROOT_HIDE_PATHS', None) or [],
'process_isolation_ro_paths': [],
}
if settings.AWX_PROOT_SHOW_PATHS:
process_isolation_params['process_isolation_show_paths'].extend(settings.AWX_PROOT_SHOW_PATHS)
if settings.ANSIBLE_VENV_PATH:
process_isolation_params['process_isolation_ro_paths'].append(settings.ANSIBLE_VENV_PATH)
if settings.AWX_VENV_PATH:
process_isolation_params['process_isolation_ro_paths'].append(settings.AWX_VENV_PATH)
if proot_custom_virtualenv:
process_isolation_params['process_isolation_ro_paths'].append(proot_custom_virtualenv)
params = {**params, **process_isolation_params}
if isinstance(instance, AdHocCommand):
params['module'] = self.build_module_name(instance)
params['module_args'] = self.build_module_args(instance)
if getattr(instance, 'use_fact_cache', False):
# Enable Ansible fact cache.
params['fact_cache_type'] = 'jsonfile'
else:
# Disable Ansible fact cache.
params['fact_cache_type'] = ''
'''
Delete parameters if the values are None or empty array
'''
for v in ['passwords', 'playbook', 'inventory']:
if not params[v]:
del params[v]
res = ansible_runner.interface.run(**params) res = ansible_runner.interface.run(**params)
status = res.status status = res.status
rc = res.rc rc = res.rc
if status == 'timeout': if status == 'timeout':
instance.job_explanation = "Job terminated due to timeout" instance.job_explanation = "Job terminated due to timeout"
status = 'failed' status = 'failed'
extra_update_fields['job_explanation'] = instance.job_explanation extra_update_fields['job_explanation'] = instance.job_explanation
except Exception: except Exception:
# run_pexpect does not throw exceptions for cancel or timeout # run_pexpect does not throw exceptions for cancel or timeout
@@ -1226,21 +1198,6 @@ class BaseTask(object):
else: else:
raise AwxTaskError.TaskError(instance, rc) raise AwxTaskError.TaskError(instance, rc)
def get_ssh_key_path(self, instance, private_data_files):
'''
If using an SSH key, return the path for use by ssh-agent.
'''
if 'ssh' in private_data_files.get('credentials', {}):
return private_data_files['credentials']['ssh']
'''
Note: Don't inject network ssh key data into ssh-agent for network
credentials because the ansible modules do not yet support it.
We will want to add back in support when/if Ansible network modules
support this.
'''
return ''
@task() @task()
class RunJob(BaseTask): class RunJob(BaseTask):

View File

@@ -1,5 +1,4 @@
--- ---
# The following variables will be set by the runner of this playbook: # The following variables will be set by the runner of this playbook:
# src: /tmp/some/path/private_data_dir/ # src: /tmp/some/path/private_data_dir/
@@ -10,7 +9,7 @@
tasks: tasks:
- name: Determine if daemon process is alive. - name: Determine if daemon process is alive.
shell: "awx-expect is-alive {{src}}" shell: "ansible-runner is-alive {{src}}"
register: is_alive register: is_alive
ignore_errors: true ignore_errors: true

View File

@@ -11,7 +11,7 @@
tasks: tasks:
- name: cancel the job - name: cancel the job
command: "awx-expect stop {{private_data_dir}}" command: "ansible-runner stop {{private_data_dir}}"
ignore_errors: yes ignore_errors: yes
- name: remove build artifacts - name: remove build artifacts

View File

@@ -3,36 +3,34 @@
# The following variables will be set by the runner of this playbook: # The following variables will be set by the runner of this playbook:
# src: /tmp/some/path/private_data_dir # src: /tmp/some/path/private_data_dir
# dest: /tmp/some/path/ # dest: /tmp/some/path/
# proot_temp_dir: /tmp/some/path
- name: Prepare data, dispatch job in isolated environment. - name: Prepare data, dispatch job in isolated environment.
hosts: all hosts: all
gather_facts: false gather_facts: false
vars: vars:
secret: "{{ lookup('pipe', 'cat ' + src + '/env') }}" secret: "{{ lookup('pipe', 'cat ' + src + '/env/ssh_key') }}"
tasks: tasks:
- name: create a proot/bwrap temp dir (if necessary)
synchronize:
src: "{{proot_temp_dir}}"
dest: "{{dest}}"
when: proot_temp_dir is defined
- name: synchronize job environment with isolated host - name: synchronize job environment with isolated host
synchronize: synchronize:
copy_links: true copy_links: true
src: "{{src}}" src: "{{src}}"
dest: "{{dest}}" dest: "{{dest}}"
- stat: path="{{src}}/env/ssh_key"
register: key
- name: create a named pipe for secret environment data - name: create a named pipe for secret environment data
command: "mkfifo {{src}}/env" command: "mkfifo {{src}}/env/ssh_key"
when: key.stat.exists
- name: spawn the playbook - name: spawn the playbook
command: "awx-expect start {{src}}" command: "ansible-runner start {{src}} -p {{playbook}} -i {{ident}}"
- name: write the secret environment data - name: write the secret environment data
mkfifo: mkfifo:
content: "{{secret}}" content: "{{secret}}"
path: "{{src}}/env" path: "{{src}}/env/ssh_key"
when: key.stat.exists
no_log: True no_log: True

View File

@@ -50,7 +50,7 @@ def main():
) )
try: try:
version = subprocess.check_output( version = subprocess.check_output(
['awx-expect', '--version'], ['ansible-runner', '--version'],
stderr=subprocess.STDOUT stderr=subprocess.STDOUT
).strip() ).strip()
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:

View File

@@ -51,7 +51,7 @@ def main():
try: try:
re_match = re.match(r'\/tmp\/ansible_awx_\d+_.+', path) re_match = re.match(r'\/tmp\/ansible_awx_\d+_.+', path)
if re_match is not None: if re_match is not None:
if subprocess.check_call(['awx-expect', 'is-alive', path]) == 0: if subprocess.check_call(['ansible-runner', 'is-alive', path]) == 0:
continue continue
else: else:
module.debug('Deleting path {} its job has completed.'.format(path)) module.debug('Deleting path {} its job has completed.'.format(path))

View File

@@ -63,9 +63,6 @@ index aa8b304..eb05f91 100644
+ virtualenv $(VENV_BASE)/my-custom-env + virtualenv $(VENV_BASE)/my-custom-env
+ $(VENV_BASE)/my-custom-env/bin/pip install python-memcached psutil + $(VENV_BASE)/my-custom-env/bin/pip install python-memcached psutil
+ +
requirements_isolated:
if [ ! -d "$(VENV_BASE)/awx" ]; then \
virtualenv --system-site-packages $(VENV_BASE)/awx && \
diff --git a/installer/image_build/templates/Dockerfile.j2 b/installer/image_build/templates/Dockerfile.j2 diff --git a/installer/image_build/templates/Dockerfile.j2 b/installer/image_build/templates/Dockerfile.j2
index d69e2c9..a08bae5 100644 index d69e2c9..a08bae5 100644
--- a/installer/image_build/templates/Dockerfile.j2 --- a/installer/image_build/templates/Dockerfile.j2

View File

@@ -40,38 +40,6 @@ else:
# The .spec will create symlinks to support multiple versions of sosreport # The .spec will create symlinks to support multiple versions of sosreport
sosconfig = "/usr/share/sosreport/sos/plugins" sosconfig = "/usr/share/sosreport/sos/plugins"
#####################################################################
# Isolated packaging
#####################################################################
class sdist_isolated(sdist):
includes = [
'include VERSION',
'include Makefile',
'include awx/__init__.py',
'include awx/main/expect/run.py',
'include tools/scripts/awx-expect',
'include requirements/requirements_isolated.txt',
'recursive-include awx/lib *.py',
]
def __init__(self, dist):
sdist.__init__(self, dist)
dist.metadata.version = get_version()
def get_file_list(self):
self.filelist.process_template_line('include setup.py')
for line in self.includes:
self.filelist.process_template_line(line)
self.write_manifest()
def make_release_tree(self, base_dir, files):
sdist.make_release_tree(self, base_dir, files)
with open(os.path.join(base_dir, 'MANIFEST.in'), 'w') as f:
f.write('\n'.join(self.includes))
##################################################################### #####################################################################
# Helper Functions # Helper Functions
@@ -160,12 +128,10 @@ setup(
"tools/scripts/awx-python", "tools/scripts/awx-python",
"tools/scripts/ansible-tower-setup"]), "tools/scripts/ansible-tower-setup"]),
("%s" % sosconfig, ["tools/sosreport/tower.py"])]), ("%s" % sosconfig, ["tools/sosreport/tower.py"])]),
cmdclass = {'sdist_isolated': sdist_isolated},
options = { options = {
'aliases': { 'aliases': {
'dev_build': 'clean --all egg_info sdist', 'dev_build': 'clean --all egg_info sdist',
'release_build': 'clean --all egg_info -b "" sdist', 'release_build': 'clean --all egg_info -b "" sdist'
'isolated_build': 'clean --all egg_info -b "" sdist_isolated',
}, },
'build_scripts': { 'build_scripts': {
'executable': '/usr/bin/awx-python', 'executable': '/usr/bin/awx-python',

View File

@@ -3,23 +3,22 @@ RUN yum clean all
ADD Makefile /tmp/Makefile ADD Makefile /tmp/Makefile
RUN mkdir /tmp/requirements RUN mkdir /tmp/requirements
ADD requirements/requirements_ansible.txt requirements/requirements_ansible_git.txt requirements/requirements_ansible_uninstall.txt requirements/requirements_isolated.txt /tmp/requirements/ ADD requirements/requirements_ansible.txt requirements/requirements_ansible_git.txt requirements/requirements_ansible_uninstall.txt /tmp/requirements/
RUN yum -y update && yum -y install curl epel-release RUN yum -y update && yum -y install curl epel-release
RUN yum -y update && yum -y install openssh-server ansible mg vim tmux git python-devel python36 python36-devel python-psycopg2 make python-psutil libxml2-devel libxslt-devel libstdc++.so.6 gcc cyrus-sasl-devel cyrus-sasl openldap-devel libffi-devel zeromq-devel python-pip xmlsec1-devel swig krb5-devel xmlsec1-openssl xmlsec1 xmlsec1-openssl-devel libtool-ltdl-devel bubblewrap zanata-python-client gettext gcc-c++ libcurl-devel python-pycurl bzip2 RUN yum -y update && yum -y install openssh-server ansible mg vim tmux git python-devel python36 python36-devel python-psycopg2 make python-psutil libxml2-devel libxslt-devel libstdc++.so.6 gcc cyrus-sasl-devel cyrus-sasl openldap-devel libffi-devel zeromq-devel python-pip xmlsec1-devel swig krb5-devel xmlsec1-openssl xmlsec1 xmlsec1-openssl-devel libtool-ltdl-devel bubblewrap zanata-python-client gettext gcc-c++ libcurl-devel python-pycurl bzip2
RUN ln -s /usr/bin/python36 /usr/bin/python3 RUN ln -s /usr/bin/python36 /usr/bin/python3
RUN python36 -m ensurepip RUN python36 -m ensurepip
RUN pip3 install virtualenv RUN pip3 install virtualenv
RUN pip3 install git+https://github.com/ansible/ansible-runner@master#egg=ansible_runner
WORKDIR /tmp WORKDIR /tmp
RUN make requirements_ansible RUN make requirements_ansible
RUN make requirements_isolated
RUN localedef -c -i en_US -f UTF-8 en_US.UTF-8 RUN localedef -c -i en_US -f UTF-8 en_US.UTF-8
ENV LANG en_US.UTF-8 ENV LANG en_US.UTF-8
ENV LANGUAGE en_US:en ENV LANGUAGE en_US:en
ENV LC_ALL en_US.UTF-8 ENV LC_ALL en_US.UTF-8
WORKDIR / WORKDIR /
EXPOSE 22 EXPOSE 22
ADD tools/docker-isolated/awx-expect /usr/local/bin/awx-expect
RUN rm -f /etc/ssh/ssh_host_ecdsa_key /etc/ssh/ssh_host_rsa_key RUN rm -f /etc/ssh/ssh_host_ecdsa_key /etc/ssh/ssh_host_rsa_key
RUN ssh-keygen -q -N "" -t dsa -f /etc/ssh/ssh_host_ecdsa_key RUN ssh-keygen -q -N "" -t dsa -f /etc/ssh/ssh_host_ecdsa_key
@@ -30,4 +29,7 @@ RUN sed -i "s/#StrictModes.*/StrictModes no/g" /etc/ssh/sshd_config
RUN mkdir -p /root/.ssh RUN mkdir -p /root/.ssh
RUN ln -s /awx_devel/authorized_keys /root/.ssh/authorized_keys RUN ln -s /awx_devel/authorized_keys /root/.ssh/authorized_keys
CMD ["/usr/sbin/init"] ADD https://github.com/krallin/tini/releases/download/v0.14.0/tini /tini
RUN chmod +x /tini
ENTRYPOINT ["/tini", "--"]
CMD ["/usr/sbin/sshd", "-D"]

View File

@@ -61,7 +61,7 @@ Example location of a private data directory:
The following command would run the playbook corresponding to that job. The following command would run the playbook corresponding to that job.
```bash ```bash
awx-expect start /tmp/ansible_awx_29_OM6Mnx/ ansible-runner start /tmp/ansible_awx_29_OM6Mnx/ -p some_playbook.yml
``` ```
Other awx-expect commands include `start`, `is-alive`, and `stop`. Other ansible-runner commands include `start`, `is-alive`, and `stop`.

View File

@@ -1,3 +0,0 @@
#!/bin/bash
. /venv/awx/bin/activate
exec env AWX_LIB_DIRECTORY=/awx_lib /awx_devel/run.py "$@"