Merge pull request #3756 from cchurch/job-event-stdout

Capture stdout associated with job and ad hoc command events.
This commit is contained in:
Chris Church
2016-10-29 10:25:22 -04:00
committed by GitHub
21 changed files with 1520 additions and 869 deletions

View File

@@ -189,8 +189,6 @@ class TaskPermission(ModelAccessPermission):
# token. # token.
if view.model == Inventory and request.method.lower() in ('head', 'get'): if view.model == Inventory and request.method.lower() in ('head', 'get'):
return bool(not obj or obj.pk == unified_job.inventory_id) return bool(not obj or obj.pk == unified_job.inventory_id)
elif view.model in (JobEvent, AdHocCommandEvent) and request.method.lower() == 'post':
return bool(not obj or obj.pk == unified_job.pk)
else: else:
return False return False

View File

@@ -2417,7 +2417,9 @@ class JobEventSerializer(BaseSerializer):
model = JobEvent model = JobEvent
fields = ('*', '-name', '-description', 'job', 'event', 'counter', fields = ('*', '-name', '-description', 'job', 'event', 'counter',
'event_display', 'event_data', 'event_level', 'failed', 'event_display', 'event_data', 'event_level', 'failed',
'changed', 'host', 'host_name', 'parent', 'play', 'task', 'role') 'changed', 'uuid', 'host', 'host_name', 'parent', 'playbook',
'play', 'task', 'role', 'stdout', 'start_line', 'end_line',
'verbosity')
def get_related(self, obj): def get_related(self, obj):
res = super(JobEventSerializer, self).get_related(obj) res = super(JobEventSerializer, self).get_related(obj)
@@ -2453,16 +2455,8 @@ class AdHocCommandEventSerializer(BaseSerializer):
model = AdHocCommandEvent model = AdHocCommandEvent
fields = ('*', '-name', '-description', 'ad_hoc_command', 'event', fields = ('*', '-name', '-description', 'ad_hoc_command', 'event',
'counter', 'event_display', 'event_data', 'failed', 'counter', 'event_display', 'event_data', 'failed',
'changed', 'host', 'host_name') 'changed', 'uuid', 'host', 'host_name', 'stdout',
'start_line', 'end_line', 'verbosity')
def to_internal_value(self, data):
ret = super(AdHocCommandEventSerializer, self).to_internal_value(data)
# AdHocCommandAdHocCommandEventsList should be the only view creating
# AdHocCommandEvent instances, so keep the ad_hoc_command it sets, even
# though ad_hoc_command is a read-only field.
if 'ad_hoc_command' in data:
ret['ad_hoc_command'] = data['ad_hoc_command']
return ret
def get_related(self, obj): def get_related(self, obj):
res = super(AdHocCommandEventSerializer, self).get_related(obj) res = super(AdHocCommandEventSerializer, self).get_related(obj)

View File

@@ -3050,21 +3050,6 @@ class GroupJobEventsList(BaseJobEventsList):
class JobJobEventsList(BaseJobEventsList): class JobJobEventsList(BaseJobEventsList):
parent_model = Job parent_model = Job
authentication_classes = [TaskAuthentication] + api_settings.DEFAULT_AUTHENTICATION_CLASSES
permission_classes = (TaskPermission,)
# Post allowed for job event callback only.
def post(self, request, *args, **kwargs):
parent_obj = get_object_or_404(self.parent_model, pk=self.kwargs['pk'])
data = request.data.copy()
data['job'] = parent_obj.pk
serializer = self.get_serializer(data=data)
if serializer.is_valid():
self.instance = serializer.save()
headers = {'Location': serializer.data['url']}
return Response(serializer.data, status=status.HTTP_201_CREATED,
headers=headers)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class JobJobPlaysList(BaseJobEventsList): class JobJobPlaysList(BaseJobEventsList):
@@ -3455,25 +3440,8 @@ class HostAdHocCommandEventsList(BaseAdHocCommandEventsList):
class AdHocCommandAdHocCommandEventsList(BaseAdHocCommandEventsList): class AdHocCommandAdHocCommandEventsList(BaseAdHocCommandEventsList):
parent_model = AdHocCommand parent_model = AdHocCommand
authentication_classes = [TaskAuthentication] + api_settings.DEFAULT_AUTHENTICATION_CLASSES
permission_classes = (TaskPermission,)
new_in_220 = True new_in_220 = True
# Post allowed for ad hoc event callback only.
def post(self, request, *args, **kwargs):
if request.user:
raise PermissionDenied()
parent_obj = get_object_or_404(self.parent_model, pk=self.kwargs['pk'])
data = request.data.copy()
data['ad_hoc_command'] = parent_obj
serializer = self.get_serializer(data=data)
if serializer.is_valid():
self.instance = serializer.save()
headers = {'Location': serializer.data['url']}
return Response(serializer.data, status=status.HTTP_201_CREATED,
headers=headers)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class AdHocCommandActivityStreamList(SubListAPIView): class AdHocCommandActivityStreamList(SubListAPIView):
@@ -3583,7 +3551,11 @@ class UnifiedJobStdout(RetrieveAPIView):
dark_bg = (content_only and dark) or (not content_only and (dark or not dark_val)) dark_bg = (content_only and dark) or (not content_only and (dark or not dark_val))
content, start, end, absolute_end = unified_job.result_stdout_raw_limited(start_line, end_line) content, start, end, absolute_end = unified_job.result_stdout_raw_limited(start_line, end_line)
# Remove any ANSI escape sequences containing job event data.
content = re.sub(r'\x1b\[K(?:[A-Za-z0-9+/=]+\x1b\[\d+D)+\x1b\[K', '', content)
body = ansiconv.to_html(cgi.escape(content)) body = ansiconv.to_html(cgi.escape(content))
context = { context = {
'title': get_view_name(self.__class__), 'title': get_view_name(self.__class__),
'body': mark_safe(body), 'body': mark_safe(body),

22
awx/lib/sitecustomize.py Normal file
View File

@@ -0,0 +1,22 @@
# Python
import os
import sys
# Based on http://stackoverflow.com/a/6879344/131141 -- Initialize tower display
# callback as early as possible to wrap ansible.display.Display methods.
def argv_ready(argv):
    """If argv identifies an ansible CLI, import the Tower display callback.

    Importing tower_display_callback wraps ansible.display.Display methods
    as a side effect, so it must happen before ansible produces any output.
    """
    if not argv:
        return
    program = os.path.basename(argv[0])
    if program in ('ansible', 'ansible-playbook'):
        import tower_display_callback  # noqa
class argv_placeholder(object):
    """Sentinel assigned to sys.argv when it does not exist yet.

    NOTE(review): relies on CPython refcounting -- presumably when the
    interpreter later assigns the real sys.argv, this placeholder is
    garbage-collected and __del__ fires with the real argv in place;
    confirm this ordering holds for the target interpreter.
    """

    def __del__(self):
        argv_ready(sys.argv)
# If sys.argv already exists, check it immediately; otherwise install the
# placeholder so argv_ready() is invoked (via __del__) once the interpreter
# assigns the real sys.argv.
if hasattr(sys, 'argv'):
    argv_ready(sys.argv)
else:
    sys.argv = argv_placeholder()

View File

@@ -0,0 +1,25 @@
# Copyright (c) 2016 Ansible by Red Hat, Inc.
#
# This file is part of Ansible Tower, but depends on code imported from Ansible.
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division, print_function)
# Tower Display Callback
from . import cleanup # noqa (registers control persistent cleanup)
from . import display # noqa (wraps ansible.display.Display methods)
from .module import TowerDefaultCallbackModule, TowerMinimalCallbackModule
__all__ = ['TowerDefaultCallbackModule', 'TowerMinimalCallbackModule']

View File

@@ -0,0 +1,72 @@
# Copyright (c) 2016 Ansible by Red Hat, Inc.
#
# This file is part of Ansible Tower, but depends on code imported from Ansible.
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division, print_function)
# Python
import atexit
import glob
import os
import pwd
# PSUtil
import psutil
__all__ = []
@atexit.register
def terminate_ssh_control_masters():
    """At process exit, terminate any leftover SSH ControlPersist masters.

    Looks for control-path sockets under ANSIBLE_SSH_CONTROL_PATH and
    terminates (then kills) any of this user's ssh processes holding them,
    so the job's SSH connections don't outlive the playbook run.
    """
    # Determine if control persist is being used and if any open sockets
    # exist after running the playbook.
    cp_path = os.environ.get('ANSIBLE_SSH_CONTROL_PATH', '')
    if not cp_path:
        return
    cp_dir = os.path.dirname(cp_path)
    if not os.path.exists(cp_dir):
        return
    cp_pattern = os.path.join(cp_dir, 'ansible-ssh-*')
    cp_files = glob.glob(cp_pattern)
    if not cp_files:
        return
    # Attempt to find any running control master processes owned by us.
    username = pwd.getpwuid(os.getuid())[0]
    ssh_cm_procs = []
    for proc in psutil.process_iter():
        try:
            pname = proc.name()
            pcmdline = proc.cmdline()
            pusername = proc.username()
        except psutil.Error:
            # Catch the whole psutil hierarchy (NoSuchProcess, AccessDenied,
            # zombie processes, ...), not just NoSuchProcess -- an
            # uninspectable process must not crash the atexit handler.
            continue
        if pusername != username:
            continue
        if pname != 'ssh':
            continue
        for cp_file in cp_files:
            if pcmdline and cp_file in pcmdline[0]:
                ssh_cm_procs.append(proc)
                break
    # Terminate then kill control master processes. Workaround older
    # version of psutil that may not have wait_procs implemented.
    for proc in ssh_cm_procs:
        try:
            proc.terminate()
        except psutil.NoSuchProcess:
            # Process exited between discovery and terminate().
            continue
    procs_gone, procs_alive = psutil.wait_procs(ssh_cm_procs, timeout=5)
    for proc in procs_alive:
        try:
            proc.kill()
        except psutil.NoSuchProcess:
            pass

View File

@@ -0,0 +1,92 @@
# Copyright (c) 2016 Ansible by Red Hat, Inc.
#
# This file is part of Ansible Tower, but depends on code imported from Ansible.
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division, print_function)
# Python
import functools
import sys
import uuid
# Ansible
from ansible.utils.display import Display
# Tower Display Callback
from tower_display_callback.events import event_context
__all__ = []
def with_context(**context):
    """Build a decorator that applies *context* to the thread-local event
    context for the duration of each call to the wrapped callable."""
    global event_context

    def decorate(func):
        @functools.wraps(func)
        def inner(*args, **kwargs):
            with event_context.set_local(**context):
                return func(*args, **kwargs)
        return inner

    return decorate
# Wrap every public, callable Display method (except raw output/verbosity
# methods handled separately below, and cowsay/prompt helpers) so output it
# produces is tagged in the event context with a flag named after the method
# (e.g. warning=True, error=True).
for attr in dir(Display):
    if attr.startswith('_') or 'cow' in attr or 'prompt' in attr:
        continue
    if attr in ('display', 'v', 'vv', 'vvv', 'vvvv', 'vvvvv', 'vvvvvv', 'verbose'):
        continue
    if not callable(getattr(Display, attr)):
        continue
    setattr(Display, attr, with_context(**{attr: True})(getattr(Display, attr)))
def with_verbosity(f):
    """Wrap Display.verbose to tag output with verbose/verbosity context.

    NOTE(review): the positional indexing assumes the signature
    Display.verbose(self, msg, host=None, caplevel=2) -- confirm against the
    ansible version in use.
    """
    global event_context

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        # args[0] is self, args[1] is msg; host/caplevel may be positional
        # or keyword.
        host = args[2] if len(args) >= 3 else kwargs.get('host', None)
        caplevel = args[3] if len(args) >= 4 else kwargs.get('caplevel', 2)
        # Tower verbosity is one higher than the ansible capture level.
        context = dict(verbose=True, verbosity=(caplevel + 1))
        if host is not None:
            context['remote_addr'] = host
        with event_context.set_local(**context):
            return f(*args, **kwargs)
    return wrapper

Display.verbose = with_verbosity(Display.verbose)
def display_with_context(f):
    """Wrap Display.display so stray output is bracketed by event markers.

    If no event is already in progress (no uuid in the current context) and
    the output is not log-only, a fresh event uuid is created and begin/end
    event data is dumped around the display call on the same stream the
    output goes to.

    NOTE(review): the positional indexing assumes the signature
    Display.display(self, msg, color=None, stderr=False, screen_only=False,
    log_only=False) -- confirm against the ansible version in use.
    """

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        log_only = args[5] if len(args) >= 6 else kwargs.get('log_only', False)
        stderr = args[3] if len(args) >= 4 else kwargs.get('stderr', False)
        fileobj = sys.stderr if stderr else sys.stdout
        event_uuid = event_context.get().get('uuid', None)
        try:
            if not log_only and not event_uuid:
                # No enclosing event: synthesize one for this output.
                event_context.add_local(uuid=str(uuid.uuid4()))
                event_context.dump_begin(fileobj)
            return f(*args, **kwargs)
        finally:
            if not log_only and not event_uuid:
                event_context.dump_end(fileobj)
                event_context.remove_local(uuid=None)
    return wrapper

Display.display = display_with_context(Display.display)

View File

@@ -0,0 +1,137 @@
# Copyright (c) 2016 Ansible by Red Hat, Inc.
#
# This file is part of Ansible Tower, but depends on code imported from Ansible.
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division, print_function)
# Python
import base64
import contextlib
import datetime
import json
import os
import threading
import uuid
__all__ = ['event_context']
class EventContext(object):
    '''
    Store global and local (per thread/process) data associated with callback
    events and other display output methods.
    '''

    def _local_ctx(self):
        """Return the calling thread's context dict, creating it if needed.

        threading.local() attributes exist only in the thread that assigned
        them, so ``_ctx`` must be initialized per thread -- the original code
        initialized it only in the thread that first created ``self._local``,
        raising AttributeError when another thread called add_local() or
        remove_local().
        """
        if not hasattr(self, '_local'):
            self._local = threading.local()
        if not hasattr(self._local, '_ctx'):
            self._local._ctx = {}
        return self._local._ctx

    def add_local(self, **kwargs):
        """Merge kwargs into the calling thread's local context."""
        self._local_ctx().update(kwargs)

    def remove_local(self, **kwargs):
        """Remove the given keys (values ignored) from the local context."""
        if hasattr(self, '_local'):
            ctx = self._local_ctx()
            for key in kwargs.keys():
                ctx.pop(key, None)

    @contextlib.contextmanager
    def set_local(self, **kwargs):
        """Apply kwargs to the local context for the duration of the block."""
        try:
            self.add_local(**kwargs)
            yield
        finally:
            self.remove_local(**kwargs)

    def get_local(self):
        """Return the calling thread's context dict (empty if unset)."""
        return getattr(getattr(self, '_local', None), '_ctx', {})

    def add_global(self, **kwargs):
        """Merge kwargs into the process-wide context."""
        if not hasattr(self, '_global_ctx'):
            self._global_ctx = {}
        self._global_ctx.update(kwargs)

    def remove_global(self, **kwargs):
        """Remove the given keys (values ignored) from the global context."""
        if hasattr(self, '_global_ctx'):
            for key in kwargs.keys():
                self._global_ctx.pop(key, None)

    @contextlib.contextmanager
    def set_global(self, **kwargs):
        """Apply kwargs to the global context for the duration of the block."""
        try:
            self.add_global(**kwargs)
            yield
        finally:
            self.remove_global(**kwargs)

    def get_global(self):
        """Return the process-wide context dict (empty if unset)."""
        return getattr(self, '_global_ctx', {})

    def get(self):
        """Return merged context; local values override global ones."""
        ctx = {}
        ctx.update(self.get_global())
        ctx.update(self.get_local())
        return ctx

    def get_begin_dict(self):
        """Build the event dict emitted before wrapped display output.

        Pulls job/ad hoc command ids from the environment, fills in
        pid/uuid/created defaults, derives parent_uuid and the event name,
        and splits top-level keys out of event_data.
        """
        event_data = self.get()
        if os.getenv('JOB_ID', ''):
            event_data['job_id'] = int(os.getenv('JOB_ID', '0'))
        if os.getenv('AD_HOC_COMMAND_ID', ''):
            event_data['ad_hoc_command_id'] = int(os.getenv('AD_HOC_COMMAND_ID', '0'))
        event_data.setdefault('pid', os.getpid())
        event_data.setdefault('uuid', str(uuid.uuid4()))
        event_data.setdefault('created', datetime.datetime.utcnow().isoformat())
        if not event_data.get('parent_uuid', None) and event_data.get('job_id', None):
            # Parent is the nearest enclosing task/play/playbook, if any.
            for key in ('task_uuid', 'play_uuid', 'playbook_uuid'):
                parent_uuid = event_data.get(key, None)
                if parent_uuid and parent_uuid != event_data.get('uuid', None):
                    event_data['parent_uuid'] = parent_uuid
                    break
        event = event_data.pop('event', None)
        if not event:
            # No explicit event: classify by the Display method flag set by
            # the with_context() wrappers, defaulting to 'verbose'.
            event = 'verbose'
            for key in ('debug', 'verbose', 'deprecated', 'warning', 'system_warning', 'error'):
                if event_data.get(key, False):
                    event = key
                    break
        event_dict = dict(event=event, event_data=event_data)
        # Iterate a snapshot of the keys: event_data is mutated (pop) inside
        # the loop, which is an error when iterating a live keys() view.
        for key in list(event_data.keys()):
            if key in ('job_id', 'ad_hoc_command_id', 'uuid', 'parent_uuid', 'created', 'artifact_data'):
                event_dict[key] = event_data.pop(key)
            elif key in ('verbosity', 'pid'):
                event_dict[key] = event_data[key]
        return event_dict

    def get_end_dict(self):
        """Build the (empty) event dict emitted after wrapped output."""
        return {}

    def dump(self, fileobj, data, max_width=78):
        """Write data as base64-encoded JSON hidden inside ANSI escapes.

        Each chunk is followed by a cursor-left sequence so the payload is
        invisible on a terminal but recoverable from captured stdout.
        NOTE: Python 2 only (str-based b64encode, xrange).
        """
        b64data = base64.b64encode(json.dumps(data))
        fileobj.write(u'\x1b[K')
        for offset in xrange(0, len(b64data), max_width):
            chunk = b64data[offset:offset + max_width]
            escaped_chunk = u'{}\x1b[{}D'.format(chunk, len(chunk))
            fileobj.write(escaped_chunk)
        fileobj.write(u'\x1b[K')

    def dump_begin(self, fileobj):
        self.dump(fileobj, self.get_begin_dict())

    def dump_end(self, fileobj):
        self.dump(fileobj, self.get_end_dict())


# Shared singleton used by the display wrappers and callback modules.
event_context = EventContext()

View File

@@ -0,0 +1,28 @@
# Copyright (c) 2016 Ansible by Red Hat, Inc.
#
# This file is part of Ansible Tower, but depends on code imported from Ansible.
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division, print_function)
# Python
import os
# Ansible
import ansible
# Because of the way Ansible loads plugins, it's not possible to import
# ansible.plugins.callback.minimal when being loaded as the minimal plugin. Ugh.
# NOTE: execfile() is Python 2 only. This executes Ansible's stock minimal
# callback source in this module's namespace, so its CallbackModule class
# becomes importable from here (see tower_display_callback.module).
execfile(os.path.join(os.path.dirname(ansible.__file__), 'plugins', 'callback', 'minimal.py'))

View File

@@ -0,0 +1,488 @@
# Copyright (c) 2016 Ansible by Red Hat, Inc.
#
# This file is part of Ansible Tower, but depends on code imported from Ansible.
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division, print_function)
# Python
import contextlib
import copy
import re
import sys
import uuid
# Ansible
from ansible.plugins.callback import CallbackBase
from ansible.plugins.callback.default import CallbackModule as DefaultCallbackModule
# Tower Display Callback
from tower_display_callback.events import event_context
from tower_display_callback.minimal import CallbackModule as MinimalCallbackModule
class BaseCallbackModule(CallbackBase):
    '''
    Callback module for logging ansible/ansible-playbook events.

    Each v2 callback hook wraps the parent class's normal stdout output in
    capture_event_data(), which pushes the event name and metadata into the
    shared event_context and dumps begin/end event markers to stdout.
    '''

    CALLBACK_VERSION = 2.0
    CALLBACK_TYPE = 'stdout'

    # These events should never have an associated play.
    EVENTS_WITHOUT_PLAY = [
        'playbook_on_start',
        'playbook_on_stats',
    ]

    # These events should never have an associated task.
    EVENTS_WITHOUT_TASK = EVENTS_WITHOUT_PLAY + [
        'playbook_on_setup',
        'playbook_on_notify',
        'playbook_on_import_for_host',
        'playbook_on_not_import_for_host',
        'playbook_on_no_hosts_matched',
        'playbook_on_no_hosts_remaining',
    ]

    # Result keys that may be kept when censoring a no_log result.
    CENSOR_FIELD_WHITELIST = [
        'msg',
        'failed',
        'changed',
        'results',
        'start',
        'end',
        'delta',
        'cmd',
        '_ansible_no_log',
        'rc',
        'failed_when_result',
        'skipped',
        'skip_reason',
    ]

    def __init__(self):
        super(BaseCallbackModule, self).__init__()
        # Task UUIDs already reported via playbook_on_task_start; used to
        # suppress duplicate task-start events (see v2_playbook_on_task_start).
        self.task_uuids = set()

    def censor_result(self, res, no_log=False):
        """Censor a task result dict when no_log is in effect.

        Non-dict results are replaced with a placeholder string; dict results
        are reduced to CENSOR_FIELD_WHITELIST keys (with 'cmd' truncated to
        its first word), and nested 'results' lists are censored recursively.
        NOTE: mutates nested parts of res in place; callers pass a deepcopy
        (see capture_event_data).
        """
        if not isinstance(res, dict):
            if no_log:
                return "the output has been hidden due to the fact that 'no_log: true' was specified for this result"
            return res
        if res.get('_ansible_no_log', no_log):
            new_res = {}
            for k in self.CENSOR_FIELD_WHITELIST:
                if k in res:
                    new_res[k] = res[k]
                if k == 'cmd' and k in res:
                    if isinstance(res['cmd'], list):
                        res['cmd'] = ' '.join(res['cmd'])
                    if re.search(r'\s', res['cmd']):
                        # Keep only the command name; censor its arguments.
                        new_res['cmd'] = re.sub(r'^(([^\s\\]|\\\s)+).*$',
                                                r'\1 <censored>',
                                                res['cmd'])
            new_res['censored'] = "the output has been hidden due to the fact that 'no_log: true' was specified for this result"
            res = new_res
        if 'results' in res:
            if isinstance(res['results'], list):
                # Python 2 (xrange); index loop because items are replaced
                # in place.
                for i in xrange(len(res['results'])):
                    res['results'][i] = self.censor_result(res['results'][i], res.get('_ansible_no_log', no_log))
            elif res.get('_ansible_no_log', False):
                res['results'] = "the output has been hidden due to the fact that 'no_log: true' was specified for this result"
        return res

    @contextlib.contextmanager
    def capture_event_data(self, event, **event_data):
        """Context manager bracketing parent-class display output with event
        begin/end markers on stdout.

        Censors any 'res' payload, hoists artifact_data, applies task context
        locally (for events that may carry one), and always cleans up the
        local context in the finally block.
        """
        event_data.setdefault('uuid', str(uuid.uuid4()))

        if 'res' in event_data:
            # Deepcopy so censoring never mutates ansible's own result dict.
            event_data['res'] = self.censor_result(copy.deepcopy(event_data['res']))

        res = event_data.get('res', None)
        if res and isinstance(res, dict):
            if 'artifact_data' in res:
                event_data['artifact_data'] = res['artifact_data']

        if event not in self.EVENTS_WITHOUT_TASK:
            task = event_data.pop('task', None)
        else:
            task = None

        try:
            event_context.add_local(event=event, **event_data)
            if task:
                self.set_task(task, local=True)
            event_context.dump_begin(sys.stdout)
            yield
        finally:
            event_context.dump_end(sys.stdout)
            if task:
                self.clear_task(local=True)
            event_context.remove_local(event=None, **event_data)

    def set_playbook(self, playbook):
        """Record playbook name and a generated UUID in the global context."""
        # NOTE: Ansible doesn't generate a UUID for playbook_on_start so do it for them.
        self.playbook_uuid = str(uuid.uuid4())
        file_name = getattr(playbook, '_file_name', '???')
        event_context.add_global(playbook=file_name, playbook_uuid=self.playbook_uuid)
        self.clear_play()

    def set_play(self, play):
        """Record play name/uuid/host pattern in the global context."""
        if hasattr(play, 'hosts'):
            if isinstance(play.hosts, list):
                pattern = ','.join(play.hosts)
            else:
                pattern = play.hosts
        else:
            pattern = ''
        name = play.get_name().strip() or pattern
        event_context.add_global(play=name, play_uuid=str(play._uuid), play_pattern=pattern)
        self.clear_task()

    def clear_play(self):
        """Drop play context (and any task context) from the global context."""
        event_context.remove_global(play=None, play_uuid=None, play_pattern=None)
        self.clear_task()

    def set_task(self, task, local=False):
        """Record task name/path/uuid/action (and args unless no_log, and
        role if available) in the global -- or, with local=True, thread-local
        -- context."""
        # FIXME: Task is "global" unless using free strategy!
        task_ctx = dict(
            task=(task.name or task.action),
            task_path=task.get_path(),
            task_uuid=str(task._uuid),
            task_action=task.action,
        )
        if not task.no_log:
            # Omit task args entirely when no_log is set on the task.
            task_args = ', '.join(('%s=%s' % a for a in task.args.items()))
            task_ctx['task_args'] = task_args
        if getattr(task, '_role', None):
            task_role = task._role._role_name
        else:
            task_role = getattr(task, 'role_name', '')
        if task_role:
            task_ctx['role'] = task_role
        if local:
            event_context.add_local(**task_ctx)
        else:
            event_context.add_global(**task_ctx)

    def clear_task(self, local=False):
        """Drop task context from the global (or local) context."""
        task_ctx = dict(task=None, task_path=None, task_uuid=None, task_action=None, task_args=None, role=None)
        if local:
            event_context.remove_local(**task_ctx)
        else:
            event_context.remove_global(**task_ctx)

    def v2_playbook_on_start(self, playbook):
        """Emit playbook_on_start, reusing the UUID generated in set_playbook."""
        self.set_playbook(playbook)
        event_data = dict(
            uuid=self.playbook_uuid,
        )
        with self.capture_event_data('playbook_on_start', **event_data):
            super(BaseCallbackModule, self).v2_playbook_on_start(playbook)

    def v2_playbook_on_vars_prompt(self, varname, private=True, prompt=None,
                                   encrypt=None, confirm=False, salt_size=None,
                                   salt=None, default=None):
        """Emit playbook_on_vars_prompt with the full prompt configuration."""
        event_data = dict(
            varname=varname,
            private=private,
            prompt=prompt,
            encrypt=encrypt,
            confirm=confirm,
            salt_size=salt_size,
            salt=salt,
            default=default,
        )
        with self.capture_event_data('playbook_on_vars_prompt', **event_data):
            super(BaseCallbackModule, self).v2_playbook_on_vars_prompt(
                varname, private, prompt, encrypt, confirm, salt_size, salt,
                default,
            )

    def v2_playbook_on_include(self, included_file):
        """Emit playbook_on_include for an included file."""
        event_data = dict(
            included_file=included_file,
        )
        with self.capture_event_data('playbook_on_include', **event_data):
            super(BaseCallbackModule, self).v2_playbook_on_include(included_file)

    def v2_playbook_on_play_start(self, play):
        """Emit playbook_on_play_start with the play name/pattern/uuid."""
        self.set_play(play)
        if hasattr(play, 'hosts'):
            if isinstance(play.hosts, list):
                pattern = ','.join(play.hosts)
            else:
                pattern = play.hosts
        else:
            pattern = ''
        name = play.get_name().strip() or pattern
        event_data = dict(
            name=name,
            pattern=pattern,
            uuid=str(play._uuid),
        )
        with self.capture_event_data('playbook_on_play_start', **event_data):
            super(BaseCallbackModule, self).v2_playbook_on_play_start(play)

    def v2_playbook_on_import_for_host(self, result, imported_file):
        # NOTE: Not used by Ansible 2.x.
        with self.capture_event_data('playbook_on_import_for_host'):
            super(BaseCallbackModule, self).v2_playbook_on_import_for_host(result, imported_file)

    def v2_playbook_on_not_import_for_host(self, result, missing_file):
        # NOTE: Not used by Ansible 2.x.
        with self.capture_event_data('playbook_on_not_import_for_host'):
            super(BaseCallbackModule, self).v2_playbook_on_not_import_for_host(result, missing_file)

    def v2_playbook_on_setup(self):
        # NOTE: Not used by Ansible 2.x.
        with self.capture_event_data('playbook_on_setup'):
            super(BaseCallbackModule, self).v2_playbook_on_setup()

    def v2_playbook_on_task_start(self, task, is_conditional):
        """Emit playbook_on_task_start once per task UUID."""
        # FIXME: Flag task path output as vv.
        task_uuid = str(task._uuid)
        if task_uuid in self.task_uuids:
            # FIXME: When this task UUID repeats, it means the play is using the
            # free strategy, so different hosts may be running different tasks
            # within a play.
            return
        self.task_uuids.add(task_uuid)
        self.set_task(task)
        event_data = dict(
            task=task,
            name=task.get_name(),
            is_conditional=is_conditional,
            uuid=task_uuid,
        )
        with self.capture_event_data('playbook_on_task_start', **event_data):
            super(BaseCallbackModule, self).v2_playbook_on_task_start(task, is_conditional)

    def v2_playbook_on_cleanup_task_start(self, task):
        # NOTE: Not used by Ansible 2.x.
        self.set_task(task)
        event_data = dict(
            task=task,
            name=task.get_name(),
            uuid=str(task._uuid),
            is_conditional=True,
        )
        with self.capture_event_data('playbook_on_task_start', **event_data):
            super(BaseCallbackModule, self).v2_playbook_on_cleanup_task_start(task)

    def v2_playbook_on_handler_task_start(self, task):
        # NOTE: Re-using playbook_on_task_start event for this v2-specific
        # event, but setting is_conditional=True, which is how v1 identified a
        # task run as a handler.
        self.set_task(task)
        event_data = dict(
            task=task,
            name=task.get_name(),
            uuid=str(task._uuid),
            is_conditional=True,
        )
        with self.capture_event_data('playbook_on_task_start', **event_data):
            super(BaseCallbackModule, self).v2_playbook_on_handler_task_start(task)

    def v2_playbook_on_no_hosts_matched(self):
        with self.capture_event_data('playbook_on_no_hosts_matched'):
            super(BaseCallbackModule, self).v2_playbook_on_no_hosts_matched()

    def v2_playbook_on_no_hosts_remaining(self):
        with self.capture_event_data('playbook_on_no_hosts_remaining'):
            super(BaseCallbackModule, self).v2_playbook_on_no_hosts_remaining()

    def v2_playbook_on_notify(self, result, handler):
        # NOTE: Not used by Ansible 2.x.
        event_data = dict(
            host=result._host.get_name(),
            task=result._task,
            handler=handler,
        )
        with self.capture_event_data('playbook_on_notify', **event_data):
            super(BaseCallbackModule, self).v2_playbook_on_notify(result, handler)

    def v2_playbook_on_stats(self, stats):
        """Emit playbook_on_stats with the per-host summary counters."""
        self.clear_play()
        # FIXME: Add count of plays/tasks.
        event_data = dict(
            changed=stats.changed,
            dark=stats.dark,
            failures=stats.failures,
            ok=stats.ok,
            processed=stats.processed,
            skipped=stats.skipped,
        )
        with self.capture_event_data('playbook_on_stats', **event_data):
            super(BaseCallbackModule, self).v2_playbook_on_stats(stats)

    def v2_runner_on_ok(self, result):
        """Emit runner_on_ok with host, result, and loop info."""
        # FIXME: Display detailed results or not based on verbosity.
        event_data = dict(
            host=result._host.get_name(),
            remote_addr=result._host.address,
            task=result._task,
            res=result._result,
            event_loop=result._task.loop if hasattr(result._task, 'loop') else None,
        )
        with self.capture_event_data('runner_on_ok', **event_data):
            super(BaseCallbackModule, self).v2_runner_on_ok(result)

    def v2_runner_on_failed(self, result, ignore_errors=False):
        """Emit runner_on_failed with host, result, and ignore_errors flag."""
        # FIXME: Add verbosity for exception/results output.
        event_data = dict(
            host=result._host.get_name(),
            remote_addr=result._host.address,
            res=result._result,
            task=result._task,
            ignore_errors=ignore_errors,
            event_loop=result._task.loop if hasattr(result._task, 'loop') else None,
        )
        with self.capture_event_data('runner_on_failed', **event_data):
            super(BaseCallbackModule, self).v2_runner_on_failed(result, ignore_errors)

    def v2_runner_on_skipped(self, result):
        """Emit runner_on_skipped (no result payload)."""
        event_data = dict(
            host=result._host.get_name(),
            remote_addr=result._host.address,
            task=result._task,
            event_loop=result._task.loop if hasattr(result._task, 'loop') else None,
        )
        with self.capture_event_data('runner_on_skipped', **event_data):
            super(BaseCallbackModule, self).v2_runner_on_skipped(result)

    def v2_runner_on_unreachable(self, result):
        """Emit runner_on_unreachable with host and result."""
        event_data = dict(
            host=result._host.get_name(),
            remote_addr=result._host.address,
            task=result._task,
            res=result._result,
        )
        with self.capture_event_data('runner_on_unreachable', **event_data):
            super(BaseCallbackModule, self).v2_runner_on_unreachable(result)

    def v2_runner_on_no_hosts(self, task):
        # NOTE: Not used by Ansible 2.x.
        event_data = dict(
            task=task,
        )
        with self.capture_event_data('runner_on_no_hosts', **event_data):
            super(BaseCallbackModule, self).v2_runner_on_no_hosts(task)

    def v2_runner_on_async_poll(self, result):
        # NOTE: Not used by Ansible 2.x.
        event_data = dict(
            host=result._host.get_name(),
            task=result._task,
            res=result._result,
            jid=result._result.get('ansible_job_id'),
        )
        with self.capture_event_data('runner_on_async_poll', **event_data):
            super(BaseCallbackModule, self).v2_runner_on_async_poll(result)

    def v2_runner_on_async_ok(self, result):
        # NOTE: Not used by Ansible 2.x.
        event_data = dict(
            host=result._host.get_name(),
            task=result._task,
            res=result._result,
            jid=result._result.get('ansible_job_id'),
        )
        with self.capture_event_data('runner_on_async_ok', **event_data):
            super(BaseCallbackModule, self).v2_runner_on_async_ok(result)

    def v2_runner_on_async_failed(self, result):
        # NOTE: Not used by Ansible 2.x.
        event_data = dict(
            host=result._host.get_name(),
            task=result._task,
            res=result._result,
            jid=result._result.get('ansible_job_id'),
        )
        with self.capture_event_data('runner_on_async_failed', **event_data):
            super(BaseCallbackModule, self).v2_runner_on_async_failed(result)

    def v2_runner_on_file_diff(self, result, diff):
        # NOTE: Not used by Ansible 2.x.
        event_data = dict(
            host=result._host.get_name(),
            task=result._task,
            diff=diff,
        )
        with self.capture_event_data('runner_on_file_diff', **event_data):
            super(BaseCallbackModule, self).v2_runner_on_file_diff(result, diff)

    def v2_on_file_diff(self, result):
        # NOTE: Logged as runner_on_file_diff.
        event_data = dict(
            host=result._host.get_name(),
            task=result._task,
            diff=result._result.get('diff'),
        )
        with self.capture_event_data('runner_on_file_diff', **event_data):
            super(BaseCallbackModule, self).v2_on_file_diff(result)

    def v2_runner_item_on_ok(self, result):
        """Emit runner_item_on_ok for a single loop item."""
        event_data = dict(
            host=result._host.get_name(),
            task=result._task,
            res=result._result,
        )
        with self.capture_event_data('runner_item_on_ok', **event_data):
            super(BaseCallbackModule, self).v2_runner_item_on_ok(result)

    def v2_runner_item_on_failed(self, result):
        """Emit runner_item_on_failed for a single loop item."""
        event_data = dict(
            host=result._host.get_name(),
            task=result._task,
            res=result._result,
        )
        with self.capture_event_data('runner_item_on_failed', **event_data):
            super(BaseCallbackModule, self).v2_runner_item_on_failed(result)

    def v2_runner_item_on_skipped(self, result):
        """Emit runner_item_on_skipped for a single loop item."""
        event_data = dict(
            host=result._host.get_name(),
            task=result._task,
            res=result._result,
        )
        with self.capture_event_data('runner_item_on_skipped', **event_data):
            super(BaseCallbackModule, self).v2_runner_item_on_skipped(result)

    def v2_runner_retry(self, result):
        """Emit runner_retry when a task is retried (until/retries)."""
        event_data = dict(
            host=result._host.get_name(),
            task=result._task,
            res=result._result,
        )
        with self.capture_event_data('runner_retry', **event_data):
            super(BaseCallbackModule, self).v2_runner_retry(result)
class TowerDefaultCallbackModule(BaseCallbackModule, DefaultCallbackModule):
    '''Tower stdout callback for ansible-playbook, layered on the stock
    default callback so its normal output is preserved.'''

    CALLBACK_NAME = 'tower_display'
class TowerMinimalCallbackModule(BaseCallbackModule, MinimalCallbackModule):
    '''Tower stdout callback for ad hoc `ansible` runs, layered on the stock
    minimal callback.'''

    CALLBACK_NAME = 'minimal'

    def v2_playbook_on_play_start(self, play):
        # Ad hoc runs have no meaningful play; emit no event for it.
        pass

    def v2_playbook_on_task_start(self, task, is_conditional):
        # Record task context for subsequent events without emitting a
        # playbook_on_task_start event.
        self.set_task(task)

View File

@@ -2,9 +2,7 @@
# All Rights Reserved. # All Rights Reserved.
# Python # Python
import datetime
import logging import logging
import json
from kombu import Connection, Exchange, Queue from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin from kombu.mixins import ConsumerMixin
@@ -12,10 +10,7 @@ from kombu.mixins import ConsumerMixin
# Django # Django
from django.conf import settings from django.conf import settings
from django.core.management.base import NoArgsCommand from django.core.management.base import NoArgsCommand
from django.core.cache import cache
from django.db import DatabaseError from django.db import DatabaseError
from django.utils.dateparse import parse_datetime
from django.utils.timezone import FixedOffset
# AWX # AWX
from awx.main.models import * # noqa from awx.main.models import * # noqa
@@ -36,112 +31,26 @@ class CallbackBrokerWorker(ConsumerMixin):
def process_task(self, body, message): def process_task(self, body, message):
try: try:
if "event" not in body: if 'event' not in body:
raise Exception("Payload does not have an event") raise Exception('Payload does not have an event')
if "job_id" not in body: if 'job_id' not in body and 'ad_hoc_command_id' not in body:
raise Exception("Payload does not have a job_id") raise Exception('Payload does not have a job_id or ad_hoc_command_id')
if settings.DEBUG: if settings.DEBUG:
logger.info("Body: {}".format(body)) logger.info('Body: {}'.format(body))
logger.info("Message: {}".format(message)) logger.info('Message: {}'.format(message))
self.process_job_event(body) try:
if 'job_id' in body:
JobEvent.create_from_data(**body)
elif 'ad_hoc_command_id' in body:
AdHocCommandEvent.create_from_data(**body)
except DatabaseError as e:
logger.error('Database Error Saving Job Event: {}'.format(e))
except Exception as exc: except Exception as exc:
import traceback import traceback
traceback.print_exc() traceback.print_exc()
logger.error('Callback Task Processor Raised Exception: %r', exc) logger.error('Callback Task Processor Raised Exception: %r', exc)
message.ack() message.ack()
def process_job_event(self, payload):
# Get the correct "verbose" value from the job.
# If for any reason there's a problem, just use 0.
if 'ad_hoc_command_id' in payload:
event_type_key = 'ad_hoc_command_id'
event_object_type = AdHocCommand
else:
event_type_key = 'job_id'
event_object_type = Job
try:
verbose = event_object_type.objects.get(id=payload[event_type_key]).verbosity
except Exception as e:
verbose=0
# TODO: cache
# Convert the datetime for the job event's creation appropriately,
# and include a time zone for it.
#
# In the event of any issue, throw it out, and Django will just save
# the current time.
try:
if not isinstance(payload['created'], datetime.datetime):
payload['created'] = parse_datetime(payload['created'])
if not payload['created'].tzinfo:
payload['created'] = payload['created'].replace(tzinfo=FixedOffset(0))
except (KeyError, ValueError):
payload.pop('created', None)
event_uuid = payload.get("uuid", '')
parent_event_uuid = payload.get("parent_uuid", '')
artifact_data = payload.get("artifact_data", None)
# Sanity check: Don't honor keys that we don't recognize.
for key in payload.keys():
if key not in (event_type_key, 'event', 'event_data',
'created', 'counter', 'uuid'):
payload.pop(key)
try:
# If we're not in verbose mode, wipe out any module
# arguments.
res = payload['event_data'].get('res', {})
if isinstance(res, dict):
i = res.get('invocation', {})
if verbose == 0 and 'module_args' in i:
i['module_args'] = ''
if 'ad_hoc_command_id' in payload:
AdHocCommandEvent.objects.create(**data)
return
j = JobEvent(**payload)
if payload['event'] == 'playbook_on_start':
j.save()
cache.set("{}_{}".format(payload['job_id'], event_uuid), j.id, 300)
return
else:
if parent_event_uuid:
parent_id = cache.get("{}_{}".format(payload['job_id'], parent_event_uuid), None)
if parent_id is None:
parent_id_obj = JobEvent.objects.filter(uuid=parent_event_uuid, job_id=payload['job_id'])
if parent_id_obj.exists(): # Problematic if not there, means the parent hasn't been written yet... TODO
j.parent_id = parent_id_obj[0].id
print("Settings cache: {}_{} with value {}".format(payload['job_id'], parent_event_uuid, j.parent_id))
cache.set("{}_{}".format(payload['job_id'], parent_event_uuid), j.parent_id, 300)
else:
print("Cache hit")
j.parent_id = parent_id
j.save(post_process=True)
if event_uuid:
cache.set("{}_{}".format(payload['job_id'], event_uuid), j.id, 300)
except DatabaseError as e:
logger.error("Database Error Saving Job Event: {}".format(e))
if artifact_data:
try:
self.process_artifacts(artifact_data, res, payload)
except DatabaseError as e:
logger.error("Database Error Saving Job Artifacts: {}".format(e))
def process_artifacts(self, artifact_data, res, payload):
artifact_dict = json.loads(artifact_data)
if res and isinstance(res, dict):
if res.get('_ansible_no_log', False):
artifact_dict['_ansible_no_log'] = True
if artifact_data is not None:
parent_job = Job.objects.filter(pk=payload['job_id']).first()
if parent_job is not None and parent_job.artifacts != artifact_dict:
parent_job.artifacts = artifact_dict
parent_job.save(update_fields=['artifacts'])
class Command(NoArgsCommand): class Command(NoArgsCommand):
''' '''
@@ -158,4 +67,3 @@ class Command(NoArgsCommand):
worker.run() worker.run()
except KeyboardInterrupt: except KeyboardInterrupt:
print('Terminating Callback Receiver') print('Terminating Callback Receiver')

View File

@@ -0,0 +1,96 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('main', '0044_v310_project_playbook_files'),
]
operations = [
migrations.AddField(
model_name='adhoccommandevent',
name='end_line',
field=models.PositiveIntegerField(default=0, editable=False),
),
migrations.AddField(
model_name='adhoccommandevent',
name='start_line',
field=models.PositiveIntegerField(default=0, editable=False),
),
migrations.AddField(
model_name='adhoccommandevent',
name='stdout',
field=models.TextField(default=b'', editable=False),
),
migrations.AddField(
model_name='adhoccommandevent',
name='uuid',
field=models.CharField(default=b'', max_length=1024, editable=False),
),
migrations.AddField(
model_name='adhoccommandevent',
name='verbosity',
field=models.PositiveIntegerField(default=0, editable=False),
),
migrations.AddField(
model_name='jobevent',
name='end_line',
field=models.PositiveIntegerField(default=0, editable=False),
),
migrations.AddField(
model_name='jobevent',
name='playbook',
field=models.CharField(default=b'', max_length=1024, editable=False),
),
migrations.AddField(
model_name='jobevent',
name='start_line',
field=models.PositiveIntegerField(default=0, editable=False),
),
migrations.AddField(
model_name='jobevent',
name='stdout',
field=models.TextField(default=b'', editable=False),
),
migrations.AddField(
model_name='jobevent',
name='verbosity',
field=models.PositiveIntegerField(default=0, editable=False),
),
migrations.AlterField(
model_name='adhoccommandevent',
name='counter',
field=models.PositiveIntegerField(default=0, editable=False),
),
migrations.AlterField(
model_name='adhoccommandevent',
name='event',
field=models.CharField(max_length=100, choices=[(b'runner_on_failed', 'Host Failed'), (b'runner_on_ok', 'Host OK'), (b'runner_on_unreachable', 'Host Unreachable'), (b'runner_on_skipped', 'Host Skipped'), (b'debug', 'Debug'), (b'verbose', 'Verbose'), (b'deprecated', 'Deprecated'), (b'warning', 'Warning'), (b'system_warning', 'System Warning'), (b'error', 'Error')]),
),
migrations.AlterField(
model_name='jobevent',
name='counter',
field=models.PositiveIntegerField(default=0, editable=False),
),
migrations.AlterField(
model_name='jobevent',
name='event',
field=models.CharField(max_length=100, choices=[(b'runner_on_failed', 'Host Failed'), (b'runner_on_ok', 'Host OK'), (b'runner_on_error', 'Host Failure'), (b'runner_on_skipped', 'Host Skipped'), (b'runner_on_unreachable', 'Host Unreachable'), (b'runner_on_no_hosts', 'No Hosts Remaining'), (b'runner_on_async_poll', 'Host Polling'), (b'runner_on_async_ok', 'Host Async OK'), (b'runner_on_async_failed', 'Host Async Failure'), (b'runner_item_on_ok', 'Item OK'), (b'runner_item_on_failed', 'Item Failed'), (b'runner_item_on_skipped', 'Item Skipped'), (b'runner_retry', 'Host Retry'), (b'runner_on_file_diff', 'File Difference'), (b'playbook_on_start', 'Playbook Started'), (b'playbook_on_notify', 'Running Handlers'), (b'playbook_on_include', 'Including File'), (b'playbook_on_no_hosts_matched', 'No Hosts Matched'), (b'playbook_on_no_hosts_remaining', 'No Hosts Remaining'), (b'playbook_on_task_start', 'Task Started'), (b'playbook_on_vars_prompt', 'Variables Prompted'), (b'playbook_on_setup', 'Gathering Facts'), (b'playbook_on_import_for_host', 'internal: on Import for Host'), (b'playbook_on_not_import_for_host', 'internal: on Not Import for Host'), (b'playbook_on_play_start', 'Play Started'), (b'playbook_on_stats', 'Playbook Complete'), (b'debug', 'Debug'), (b'verbose', 'Verbose'), (b'deprecated', 'Deprecated'), (b'warning', 'Warning'), (b'system_warning', 'System Warning'), (b'error', 'Error')]),
),
migrations.AlterUniqueTogether(
name='adhoccommandevent',
unique_together=set([]),
),
migrations.AlterIndexTogether(
name='adhoccommandevent',
index_together=set([('ad_hoc_command', 'event'), ('ad_hoc_command', 'uuid'), ('ad_hoc_command', 'end_line'), ('ad_hoc_command', 'start_line')]),
),
migrations.AlterIndexTogether(
name='jobevent',
index_together=set([('job', 'event'), ('job', 'parent'), ('job', 'start_line'), ('job', 'uuid'), ('job', 'end_line')]),
),
]

View File

@@ -2,6 +2,7 @@
# All Rights Reserved. # All Rights Reserved.
# Python # Python
import datetime
import hmac import hmac
import json import json
import logging import logging
@@ -10,7 +11,9 @@ from urlparse import urljoin
# Django # Django
from django.conf import settings from django.conf import settings
from django.db import models from django.db import models
from django.utils.dateparse import parse_datetime
from django.utils.text import Truncator from django.utils.text import Truncator
from django.utils.timezone import utc
from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext_lazy as _
from django.core.exceptions import ValidationError from django.core.exceptions import ValidationError
from django.core.urlresolvers import reverse from django.core.urlresolvers import reverse
@@ -257,24 +260,38 @@ class AdHocCommandEvent(CreatedModifiedModel):
('runner_on_ok', _('Host OK'), False), ('runner_on_ok', _('Host OK'), False),
('runner_on_unreachable', _('Host Unreachable'), True), ('runner_on_unreachable', _('Host Unreachable'), True),
# Tower won't see no_hosts (check is done earlier without callback). # Tower won't see no_hosts (check is done earlier without callback).
#('runner_on_no_hosts', _('No Hosts Matched'), False), # ('runner_on_no_hosts', _('No Hosts Matched'), False),
# Tower will see skipped (when running in check mode for a module that # Tower will see skipped (when running in check mode for a module that
# does not support check mode). # does not support check mode).
('runner_on_skipped', _('Host Skipped'), False), ('runner_on_skipped', _('Host Skipped'), False),
# Tower does not support async for ad hoc commands. # Tower does not support async for ad hoc commands (not used in v2).
#('runner_on_async_poll', _('Host Polling'), False), # ('runner_on_async_poll', _('Host Polling'), False),
#('runner_on_async_ok', _('Host Async OK'), False), # ('runner_on_async_ok', _('Host Async OK'), False),
#('runner_on_async_failed', _('Host Async Failure'), True), # ('runner_on_async_failed', _('Host Async Failure'), True),
# Tower does not yet support --diff mode # Tower does not yet support --diff mode.
#('runner_on_file_diff', _('File Difference'), False), # ('runner_on_file_diff', _('File Difference'), False),
# Additional event types for captured stdout not directly related to
# runner events.
('debug', _('Debug'), False),
('verbose', _('Verbose'), False),
('deprecated', _('Deprecated'), False),
('warning', _('Warning'), False),
('system_warning', _('System Warning'), False),
('error', _('Error'), False),
] ]
FAILED_EVENTS = [x[0] for x in EVENT_TYPES if x[2]] FAILED_EVENTS = [x[0] for x in EVENT_TYPES if x[2]]
EVENT_CHOICES = [(x[0], x[1]) for x in EVENT_TYPES] EVENT_CHOICES = [(x[0], x[1]) for x in EVENT_TYPES]
class Meta: class Meta:
app_label = 'main' app_label = 'main'
unique_together = [('ad_hoc_command', 'host_name')]
ordering = ('-pk',) ordering = ('-pk',)
index_together = [
('ad_hoc_command', 'event'),
('ad_hoc_command', 'uuid'),
('ad_hoc_command', 'start_line'),
('ad_hoc_command', 'end_line'),
]
ad_hoc_command = models.ForeignKey( ad_hoc_command = models.ForeignKey(
'AdHocCommand', 'AdHocCommand',
@@ -311,8 +328,30 @@ class AdHocCommandEvent(CreatedModifiedModel):
default=False, default=False,
editable=False, editable=False,
) )
uuid = models.CharField(
max_length=1024,
default='',
editable=False,
)
counter = models.PositiveIntegerField( counter = models.PositiveIntegerField(
default=0, default=0,
editable=False,
)
stdout = models.TextField(
default='',
editable=False,
)
verbosity = models.PositiveIntegerField(
default=0,
editable=False,
)
start_line = models.PositiveIntegerField(
default=0,
editable=False,
)
end_line = models.PositiveIntegerField(
default=0,
editable=False,
) )
def get_absolute_url(self): def get_absolute_url(self):
@@ -350,3 +389,28 @@ class AdHocCommandEvent(CreatedModifiedModel):
except (IndexError, AttributeError): except (IndexError, AttributeError):
pass pass
super(AdHocCommandEvent, self).save(*args, **kwargs) super(AdHocCommandEvent, self).save(*args, **kwargs)
@classmethod
def create_from_data(self, **kwargs):
# Convert the datetime for the ad hoc command event's creation
# appropriately, and include a time zone for it.
#
# In the event of any issue, throw it out, and Django will just save
# the current time.
try:
if not isinstance(kwargs['created'], datetime.datetime):
kwargs['created'] = parse_datetime(kwargs['created'])
if not kwargs['created'].tzinfo:
kwargs['created'] = kwargs['created'].replace(tzinfo=utc)
except (KeyError, ValueError):
kwargs.pop('created', None)
# Sanity check: Don't honor keys that we don't recognize.
valid_keys = {'ad_hoc_command_id', 'event', 'event_data', 'created',
'counter', 'uuid', 'stdout', 'start_line', 'end_line',
'verbosity'}
for key in kwargs.keys():
if key not in valid_keys:
kwargs.pop(key)
return AdHocCommandEvent.objects.create(**kwargs)

View File

@@ -2,6 +2,7 @@
# All Rights Reserved. # All Rights Reserved.
# Python # Python
import datetime
import hmac import hmac
import json import json
import yaml import yaml
@@ -11,8 +12,12 @@ from urlparse import urljoin
# Django # Django
from django.conf import settings from django.conf import settings
from django.core.cache import cache
from django.db import models from django.db import models
from django.db.models import Q, Count from django.db.models import Q, Count
from django.utils.dateparse import parse_datetime
from django.utils.encoding import force_text
from django.utils.timezone import utc
from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext_lazy as _
from django.core.exceptions import ValidationError from django.core.exceptions import ValidationError
from django.core.urlresolvers import reverse from django.core.urlresolvers import reverse
@@ -931,24 +936,29 @@ class JobEvent(CreatedModifiedModel):
# - playbook_on_vars_prompt (for each play, but before play starts, we # - playbook_on_vars_prompt (for each play, but before play starts, we
# currently don't handle responding to these prompts) # currently don't handle responding to these prompts)
# - playbook_on_play_start (once for each play) # - playbook_on_play_start (once for each play)
# - playbook_on_import_for_host # - playbook_on_import_for_host (not logged, not used for v2)
# - playbook_on_not_import_for_host # - playbook_on_not_import_for_host (not logged, not used for v2)
# - playbook_on_no_hosts_matched # - playbook_on_no_hosts_matched
# - playbook_on_no_hosts_remaining # - playbook_on_no_hosts_remaining
# - playbook_on_setup # - playbook_on_include (only v2 - only used for handlers?)
# - playbook_on_setup (not used for v2)
# - runner_on* # - runner_on*
# - playbook_on_task_start (once for each task within a play) # - playbook_on_task_start (once for each task within a play)
# - runner_on_failed # - runner_on_failed
# - runner_on_ok # - runner_on_ok
# - runner_on_error # - runner_on_error (not used for v2)
# - runner_on_skipped # - runner_on_skipped
# - runner_on_unreachable # - runner_on_unreachable
# - runner_on_no_hosts # - runner_on_no_hosts (not used for v2)
# - runner_on_async_poll # - runner_on_async_poll (not used for v2)
# - runner_on_async_ok # - runner_on_async_ok (not used for v2)
# - runner_on_async_failed # - runner_on_async_failed (not used for v2)
# - runner_on_file_diff # - runner_on_file_diff (v2 event is v2_on_file_diff)
# - playbook_on_notify (once for each notification from the play) # - runner_item_on_ok (v2 only)
# - runner_item_on_failed (v2 only)
# - runner_item_on_skipped (v2 only)
# - runner_retry (v2 only)
# - playbook_on_notify (once for each notification from the play, not used for v2)
# - playbook_on_stats # - playbook_on_stats
EVENT_TYPES = [ EVENT_TYPES = [
@@ -962,22 +972,34 @@ class JobEvent(CreatedModifiedModel):
(3, 'runner_on_async_poll', _('Host Polling'), False), (3, 'runner_on_async_poll', _('Host Polling'), False),
(3, 'runner_on_async_ok', _('Host Async OK'), False), (3, 'runner_on_async_ok', _('Host Async OK'), False),
(3, 'runner_on_async_failed', _('Host Async Failure'), True), (3, 'runner_on_async_failed', _('Host Async Failure'), True),
# AWX does not yet support --diff mode (3, 'runner_item_on_ok', _('Item OK'), False),
(3, 'runner_item_on_failed', _('Item Failed'), True),
(3, 'runner_item_on_skipped', _('Item Skipped'), False),
(3, 'runner_retry', _('Host Retry'), False),
# Tower does not yet support --diff mode.
(3, 'runner_on_file_diff', _('File Difference'), False), (3, 'runner_on_file_diff', _('File Difference'), False),
(0, 'playbook_on_start', _('Playbook Started'), False), (0, 'playbook_on_start', _('Playbook Started'), False),
(2, 'playbook_on_notify', _('Running Handlers'), False), (2, 'playbook_on_notify', _('Running Handlers'), False),
(2, 'playbook_on_include', _('Including File'), False),
(2, 'playbook_on_no_hosts_matched', _('No Hosts Matched'), False), (2, 'playbook_on_no_hosts_matched', _('No Hosts Matched'), False),
(2, 'playbook_on_no_hosts_remaining', _('No Hosts Remaining'), False), (2, 'playbook_on_no_hosts_remaining', _('No Hosts Remaining'), False),
(2, 'playbook_on_task_start', _('Task Started'), False), (2, 'playbook_on_task_start', _('Task Started'), False),
# AWX does not yet support vars_prompt (and will probably hang :) # Tower does not yet support vars_prompt (and will probably hang :)
(1, 'playbook_on_vars_prompt', _('Variables Prompted'), False), (1, 'playbook_on_vars_prompt', _('Variables Prompted'), False),
(2, 'playbook_on_setup', _('Gathering Facts'), False), (2, 'playbook_on_setup', _('Gathering Facts'), False),
# callback will not record this
(2, 'playbook_on_import_for_host', _('internal: on Import for Host'), False), (2, 'playbook_on_import_for_host', _('internal: on Import for Host'), False),
# callback will not record this
(2, 'playbook_on_not_import_for_host', _('internal: on Not Import for Host'), False), (2, 'playbook_on_not_import_for_host', _('internal: on Not Import for Host'), False),
(1, 'playbook_on_play_start', _('Play Started'), False), (1, 'playbook_on_play_start', _('Play Started'), False),
(1, 'playbook_on_stats', _('Playbook Complete'), False), (1, 'playbook_on_stats', _('Playbook Complete'), False),
# Additional event types for captured stdout not directly related to
# playbook or runner events.
(0, 'debug', _('Debug'), False),
(0, 'verbose', _('Verbose'), False),
(0, 'deprecated', _('Deprecated'), False),
(0, 'warning', _('Warning'), False),
(0, 'system_warning', _('System Warning'), False),
(0, 'error', _('Error'), True),
] ]
FAILED_EVENTS = [x[1] for x in EVENT_TYPES if x[3]] FAILED_EVENTS = [x[1] for x in EVENT_TYPES if x[3]]
EVENT_CHOICES = [(x[1], x[2]) for x in EVENT_TYPES] EVENT_CHOICES = [(x[1], x[2]) for x in EVENT_TYPES]
@@ -986,6 +1008,13 @@ class JobEvent(CreatedModifiedModel):
class Meta: class Meta:
app_label = 'main' app_label = 'main'
ordering = ('pk',) ordering = ('pk',)
index_together = [
('job', 'event'),
('job', 'uuid'),
('job', 'start_line'),
('job', 'end_line'),
('job', 'parent'),
]
job = models.ForeignKey( job = models.ForeignKey(
'Job', 'Job',
@@ -1032,12 +1061,17 @@ class JobEvent(CreatedModifiedModel):
related_name='job_events', related_name='job_events',
editable=False, editable=False,
) )
playbook = models.CharField(
max_length=1024,
default='',
editable=False,
)
play = models.CharField( play = models.CharField(
max_length=1024, max_length=1024,
default='', default='',
editable=False, editable=False,
) )
role = models.CharField( # FIXME: Determine from callback or task name. role = models.CharField(
max_length=1024, max_length=1024,
default='', default='',
editable=False, editable=False,
@@ -1057,8 +1091,24 @@ class JobEvent(CreatedModifiedModel):
) )
counter = models.PositiveIntegerField( counter = models.PositiveIntegerField(
default=0, default=0,
editable=False,
)
stdout = models.TextField(
default='',
editable=False,
)
verbosity = models.PositiveIntegerField(
default=0,
editable=False,
)
start_line = models.PositiveIntegerField(
default=0,
editable=False,
)
end_line = models.PositiveIntegerField(
default=0,
editable=False,
) )
def get_absolute_url(self): def get_absolute_url(self):
return reverse('api:job_event_detail', args=(self.pk,)) return reverse('api:job_event_detail', args=(self.pk,))
@@ -1119,7 +1169,8 @@ class JobEvent(CreatedModifiedModel):
pass pass
return msg return msg
def _find_parent(self): def _find_parent_id(self):
# Find the (most likely) parent event for this event.
parent_events = set() parent_events = set()
if self.event in ('playbook_on_play_start', 'playbook_on_stats', if self.event in ('playbook_on_play_start', 'playbook_on_stats',
'playbook_on_vars_prompt'): 'playbook_on_vars_prompt'):
@@ -1135,101 +1186,55 @@ class JobEvent(CreatedModifiedModel):
parent_events.add('playbook_on_setup') parent_events.add('playbook_on_setup')
parent_events.add('playbook_on_task_start') parent_events.add('playbook_on_task_start')
if parent_events: if parent_events:
try: qs = JobEvent.objects.filter(job_id=self.job_id, event__in=parent_events).order_by('-pk')
qs = JobEvent.objects.filter(job_id=self.job_id) if self.pk:
if self.pk: qs = qs.filter(pk__lt=self.pk)
qs = qs.filter(pk__lt=self.pk, event__in=parent_events) return qs.only('id').values_list('id', flat=True).first()
else:
qs = qs.filter(event__in=parent_events)
return qs.order_by('-pk')[0]
except IndexError:
pass
return None
def save(self, *args, **kwargs): def _update_from_event_data(self):
from awx.main.models.inventory import Host # Update job event model fields from event data.
# If update_fields has been specified, add our field names to it, updated_fields = set()
# if it hasn't been specified, then we're just doing a normal save. job = self.job
update_fields = kwargs.get('update_fields', []) verbosity = job.verbosity
# Skip normal checks on save if we're only updating failed/changed event_data = self.event_data
# flags triggered from a child event. res = event_data.get('res', None)
from_parent_update = kwargs.pop('from_parent_update', False) if self.event in self.FAILED_EVENTS and not event_data.get('ignore_errors', False):
if not from_parent_update: self.failed = True
res = self.event_data.get('res', None) updated_fields.add('failed')
# Workaround for Ansible 1.2, where the runner_on_async_ok event is if isinstance(res, dict):
# created even when the async task failed. Change the event to be if res.get('changed', False):
# correct.
if self.event == 'runner_on_async_ok':
try:
if res.get('failed', False) or res.get('rc', 0) != 0:
self.event = 'runner_on_async_failed'
except (AttributeError, TypeError):
pass
if self.event in self.FAILED_EVENTS:
if not self.event_data.get('ignore_errors', False):
self.failed = True
if 'failed' not in update_fields:
update_fields.append('failed')
if isinstance(res, dict) and res.get('changed', False):
self.changed = True self.changed = True
if 'changed' not in update_fields: updated_fields.add('changed')
update_fields.append('changed') # If we're not in verbose mode, wipe out any module arguments.
if self.event == 'playbook_on_stats': invocation = res.get('invocation', None)
try: if isinstance(invocation, dict) and verbosity == 0 and 'module_args' in invocation:
failures_dict = self.event_data.get('failures', {}) event_data['res']['invocation']['module_args'] = ''
dark_dict = self.event_data.get('dark', {}) self.event_data = event_data
self.failed = bool(sum(failures_dict.values()) + update_fields.add('event_data')
sum(dark_dict.values())) if self.event == 'playbook_on_stats':
if 'failed' not in update_fields:
update_fields.append('failed')
changed_dict = self.event_data.get('changed', {})
self.changed = bool(sum(changed_dict.values()))
if 'changed' not in update_fields:
update_fields.append('changed')
except (AttributeError, TypeError):
pass
self.play = self.event_data.get('play', '').strip()
if 'play' not in update_fields:
update_fields.append('play')
self.task = self.event_data.get('task', '').strip()
if 'task' not in update_fields:
update_fields.append('task')
self.role = self.event_data.get('role', '').strip()
if 'role' not in update_fields:
update_fields.append('role')
self.host_name = self.event_data.get('host', '').strip()
if 'host_name' not in update_fields:
update_fields.append('host_name')
# Only update job event hierarchy and related models during post
# processing (after running job).
post_process = kwargs.pop('post_process', False)
if post_process:
try: try:
if not self.host_id and self.host_name: failures_dict = event_data.get('failures', {})
host_qs = Host.objects.filter(inventory__jobs__id=self.job_id, name=self.host_name) dark_dict = event_data.get('dark', {})
host_id = host_qs.only('id').values_list('id', flat=True) self.failed = bool(sum(failures_dict.values()) +
if host_id.exists(): sum(dark_dict.values()))
self.host_id = host_id[0] updated_fields.add('failed')
if 'host_id' not in update_fields: changed_dict = event_data.get('changed', {})
update_fields.append('host_id') self.changed = bool(sum(changed_dict.values()))
except (IndexError, AttributeError): updated_fields.add('changed')
except (AttributeError, TypeError):
pass pass
if self.parent is None: for field in ('playbook', 'play', 'task', 'role', 'host'):
self.parent = self._find_parent() value = force_text(event_data.get(field, '')).strip()
if 'parent' not in update_fields: if field == 'host':
update_fields.append('parent') field = 'host_name'
super(JobEvent, self).save(*args, **kwargs) if value != getattr(self, field):
if post_process and not from_parent_update: setattr(self, field, value)
self.update_parent_failed_and_changed() updated_fields.add(field)
# FIXME: The update_hosts() call (and its queries) are the current return updated_fields
# performance bottleneck....
if getattr(settings, 'CAPTURE_JOB_EVENT_HOSTS', False):
self.update_hosts()
self.update_host_summary_from_stats()
def update_parent_failed_and_changed(self): def _update_parent_failed_and_changed(self):
# Propagage failed and changed flags to parent events. # Propagate failed and changed flags to parent events.
if self.parent: if self.parent_id:
parent = self.parent parent = self.parent
update_fields = [] update_fields = []
if self.failed and not parent.failed: if self.failed and not parent.failed:
@@ -1240,9 +1245,10 @@ class JobEvent(CreatedModifiedModel):
update_fields.append('changed') update_fields.append('changed')
if update_fields: if update_fields:
parent.save(update_fields=update_fields, from_parent_update=True) parent.save(update_fields=update_fields, from_parent_update=True)
parent.update_parent_failed_and_changed() parent._update_parent_failed_and_changed()
def update_hosts(self, extra_host_pks=None): def _update_hosts(self, extra_host_pks=None):
# Update job event hosts m2m from host_name, propagate to parent events.
from awx.main.models.inventory import Host from awx.main.models.inventory import Host
extra_host_pks = set(extra_host_pks or []) extra_host_pks = set(extra_host_pks or [])
hostnames = set() hostnames = set()
@@ -1256,16 +1262,14 @@ class JobEvent(CreatedModifiedModel):
pass pass
qs = Host.objects.filter(inventory__jobs__id=self.job_id) qs = Host.objects.filter(inventory__jobs__id=self.job_id)
qs = qs.filter(Q(name__in=hostnames) | Q(pk__in=extra_host_pks)) qs = qs.filter(Q(name__in=hostnames) | Q(pk__in=extra_host_pks))
qs = qs.exclude(job_events__pk=self.id) qs = qs.exclude(job_events__pk=self.id).only('id')
for host in qs.only('id'): for host in qs:
self.hosts.add(host) self.hosts.add(host)
if self.parent: if self.parent_id:
self.parent.update_hosts(self.hosts.only('id').values_list('id', flat=True)) self.parent._update_hosts(qs.values_list('id', flat=True))
def update_host_summary_from_stats(self): def _update_host_summary_from_stats(self):
from awx.main.models.inventory import Host from awx.main.models.inventory import Host
if self.event != 'playbook_on_stats':
return
hostnames = set() hostnames = set()
try: try:
for v in self.event_data.values(): for v in self.event_data.values():
@@ -1276,7 +1280,6 @@ class JobEvent(CreatedModifiedModel):
qs = Host.objects.filter(inventory__jobs__id=self.job_id, qs = Host.objects.filter(inventory__jobs__id=self.job_id,
name__in=hostnames) name__in=hostnames)
job = self.job job = self.job
#for host in qs.only('id', 'name'):
for host in hostnames: for host in hostnames:
host_stats = {} host_stats = {}
for stat in ('changed', 'dark', 'failures', 'ok', 'processed', 'skipped'): for stat in ('changed', 'dark', 'failures', 'ok', 'processed', 'skipped'):
@@ -1300,6 +1303,112 @@ class JobEvent(CreatedModifiedModel):
job.inventory.update_computed_fields() job.inventory.update_computed_fields()
emit_channel_notification('jobs-summary', dict(group_name='jobs', unified_job_id=job.id)) emit_channel_notification('jobs-summary', dict(group_name='jobs', unified_job_id=job.id))
def save(self, *args, **kwargs):
from awx.main.models.inventory import Host
# If update_fields has been specified, add our field names to it,
# if it hasn't been specified, then we're just doing a normal save.
update_fields = kwargs.get('update_fields', [])
# Update model fields and related objects unless we're only updating
# failed/changed flags triggered from a child event.
from_parent_update = kwargs.pop('from_parent_update', False)
if not from_parent_update:
# Update model fields from event data.
updated_fields = self._update_from_event_data()
for field in updated_fields:
if field not in update_fields:
update_fields.append(field)
# Update host related field from host_name.
if not self.host_id and self.host_name:
host_qs = Host.objects.filter(inventory__jobs__id=self.job_id, name=self.host_name)
host_id = host_qs.only('id').values_list('id', flat=True).first()
if host_id != self.host_id:
self.host_id = host_id
if 'host_id' not in update_fields:
update_fields.append('host_id')
# Update parent related field if not set.
if self.parent_id is None:
self.parent_id = self._find_parent_id()
if self.parent_id and 'parent_id' not in update_fields:
update_fields.append('parent_id')
super(JobEvent, self).save(*args, **kwargs)
# Update related objects after this event is saved.
if not from_parent_update:
if self.parent_id:
self._update_parent_failed_and_changed()
# FIXME: The update_hosts() call (and its queries) are the current
# performance bottleneck....
if getattr(settings, 'CAPTURE_JOB_EVENT_HOSTS', False):
self._update_hosts()
if self.event == 'playbook_on_stats':
self._update_host_summary_from_stats()
@classmethod
def create_from_data(cls, **kwargs):
    '''
    Create a JobEvent from raw callback event data.

    Expects at least ``job_id``; normalizes the ``created`` timestamp,
    strips unrecognized keys, resolves the parent event from
    ``parent_uuid`` (with a short-lived cache to avoid repeated lookups),
    and stores any ``artifact_data`` on the parent Job.

    Returns the created JobEvent, or None if ``job_id`` is missing.
    '''
    # Must have a job_id specified.
    if not kwargs.get('job_id', None):
        return
    # Convert the datetime for the job event's creation appropriately,
    # and include a time zone for it.
    #
    # In the event of any issue, throw it out, and Django will just save
    # the current time.
    try:
        if not isinstance(kwargs['created'], datetime.datetime):
            kwargs['created'] = parse_datetime(kwargs['created'])
        if not kwargs['created'].tzinfo:
            kwargs['created'] = kwargs['created'].replace(tzinfo=utc)
    except (KeyError, ValueError):
        kwargs.pop('created', None)
    # Save UUID and parent UUID for determining parent-child relationship,
    # and artifact data, before unrecognized keys are pruned below.
    job_event_uuid = kwargs.get('uuid', None)
    parent_event_uuid = kwargs.get('parent_uuid', None)
    artifact_data = kwargs.get('artifact_data', None)
    # Sanity check: Don't honor keys that we don't recognize. Iterate over
    # a copy of the keys since the dict is mutated during iteration.
    valid_keys = {'job_id', 'event', 'event_data', 'playbook', 'play',
                  'role', 'task', 'created', 'counter', 'uuid', 'stdout',
                  'start_line', 'end_line', 'verbosity'}
    for key in list(kwargs.keys()):
        if key not in valid_keys:
            kwargs.pop(key)
    # Try to find a parent event based on UUID, consulting the cache first
    # and only caching IDs resolved via a database query.
    if parent_event_uuid:
        cache_key = '{}_{}'.format(kwargs['job_id'], parent_event_uuid)
        parent_id = cache.get(cache_key)
        if parent_id is None:
            parent_id = cls.objects.filter(job_id=kwargs['job_id'], uuid=parent_event_uuid).only('id').values_list('id', flat=True).first()
            if parent_id:
                cache.set(cache_key, parent_id, 300)
        if parent_id:
            kwargs['parent_id'] = parent_id
    job_event = cls.objects.create(**kwargs)
    # Cache this job event ID vs. UUID for future parent lookups.
    if job_event_uuid:
        cache_key = '{}_{}'.format(kwargs['job_id'], job_event_uuid)
        cache.set(cache_key, job_event.id, 300)
    # Save artifact data to parent job (if provided).
    if artifact_data:
        artifact_dict = json.loads(artifact_data)
        event_data = kwargs.get('event_data', None)
        if event_data and isinstance(event_data, dict):
            res = event_data.get('res', None)
            if res and isinstance(res, dict):
                # Preserve the no_log marker so artifacts stay redacted.
                if res.get('_ansible_no_log', False):
                    artifact_dict['_ansible_no_log'] = True
        parent_job = Job.objects.filter(pk=kwargs['job_id']).first()
        if parent_job and parent_job.artifacts != artifact_dict:
            parent_job.artifacts = artifact_dict
            parent_job.save(update_fields=['artifacts'])
    return job_event
@classmethod @classmethod
def get_startevent_queryset(cls, parent_task, starting_events, ordering=None): def get_startevent_queryset(cls, parent_task, starting_events, ordering=None):
''' '''

View File

@@ -696,8 +696,11 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
return StringIO(msg['missing' if self.finished else 'pending']) return StringIO(msg['missing' if self.finished else 'pending'])
def _escape_ascii(self, content): def _escape_ascii(self, content):
ansi_escape = re.compile(r'\x1b[^m]*m') # Remove ANSI escape sequences used to embed event data.
return ansi_escape.sub('', content) content = re.sub(r'\x1b\[K(?:[A-Za-z0-9+/=]+\x1b\[\d+D)+\x1b\[K', '', content)
# Remove ANSI color escape sequences.
content = re.sub(r'\x1b[^m]*m', '', content)
return content
def _result_stdout_raw(self, redact_sensitive=False, escape_ascii=False): def _result_stdout_raw(self, redact_sensitive=False, escape_ascii=False):
content = self.result_stdout_raw_handle().read() content = self.result_stdout_raw_handle().read()

View File

@@ -1,9 +1,19 @@
# Copyright (c) 2015 Ansible, Inc. # Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved. # All Rights Reserved.
# Python
import json import json
import logging
import os
# Django
from django.conf import settings
# Kombu
from kombu import Connection, Exchange, Producer
__all__ = ['FifoQueue', 'CallbackQueueDispatcher']
__all__ = ['FifoQueue']
# TODO: Figure out wtf to do with this class # TODO: Figure out wtf to do with this class
class FifoQueue(object): class FifoQueue(object):
@@ -33,3 +43,39 @@ class FifoQueue(object):
answer = None answer = None
if answer: if answer:
return json.loads(answer) return json.loads(answer)
class CallbackQueueDispatcher(object):
    '''
    Publish callback event data dicts to the configured Kombu queue.

    Connection settings come from Django settings (CALLBACK_CONNECTION and
    CALLBACK_QUEUE); if either is unset, dispatch() is a no-op.
    '''

    def __init__(self):
        # AMQP (or other kombu transport) connection URL; None disables dispatch.
        self.callback_connection = getattr(settings, 'CALLBACK_CONNECTION', None)
        # Queue/exchange name; also used as the routing key.
        self.connection_queue = getattr(settings, 'CALLBACK_QUEUE', '')
        self.connection = None
        self.exchange = None
        self.logger = logging.getLogger('awx.main.queue.CallbackQueueDispatcher')

    def dispatch(self, obj):
        '''
        Publish *obj* (a JSON-serializable dict) to the callback queue,
        retrying up to 4 times on any publish failure.
        '''
        if not self.callback_connection or not self.connection_queue:
            return
        active_pid = os.getpid()
        for retry_count in xrange(4):
            try:
                # Drop and re-create the connection if this process was
                # forked since the connection was established (a kombu
                # connection must not be shared across processes).
                if not hasattr(self, 'connection_pid'):
                    self.connection_pid = active_pid
                if self.connection_pid != active_pid:
                    self.connection = None
                if self.connection is None:
                    self.connection = Connection(self.callback_connection)
                    self.exchange = Exchange(self.connection_queue, type='direct')
                producer = Producer(self.connection)
                producer.publish(obj,
                                 serializer='json',
                                 compression='bzip2',
                                 exchange=self.exchange,
                                 declare=[self.exchange],
                                 routing_key=self.connection_queue)
                return
            except Exception, e:
                # Best-effort: log and retry; events are dropped after the
                # final attempt rather than crashing the caller.
                self.logger.info('Publish Job Event Exception: %r, retry=%d', e,
                                 retry_count, exc_info=True)

View File

@@ -47,9 +47,11 @@ from django.contrib.auth.models import User
from awx.main.constants import CLOUD_PROVIDERS from awx.main.constants import CLOUD_PROVIDERS
from awx.main.models import * # noqa from awx.main.models import * # noqa
from awx.main.models import UnifiedJob from awx.main.models import UnifiedJob
from awx.main.queue import CallbackQueueDispatcher
from awx.main.task_engine import TaskEnhancer from awx.main.task_engine import TaskEnhancer
from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url, from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url,
check_proot_installed, build_proot_temp_dir, wrap_args_with_proot) check_proot_installed, build_proot_temp_dir, wrap_args_with_proot,
OutputEventFilter)
from awx.main.consumers import emit_channel_notification from awx.main.consumers import emit_channel_notification
__all__ = ['RunJob', 'RunSystemJob', 'RunProjectUpdate', 'RunInventoryUpdate', __all__ = ['RunJob', 'RunSystemJob', 'RunProjectUpdate', 'RunInventoryUpdate',
@@ -397,6 +399,8 @@ class BaseTask(Task):
if os.path.isdir(os.path.join(venv_libdir, python_ver)): if os.path.isdir(os.path.join(venv_libdir, python_ver)):
env['PYTHONPATH'] = os.path.join(venv_libdir, python_ver, "site-packages") + ":" env['PYTHONPATH'] = os.path.join(venv_libdir, python_ver, "site-packages") + ":"
break break
# Add awx/lib to PYTHONPATH.
env['PYTHONPATH'] = ':'.join(filter(None, [self.get_path_to('..', 'lib'), env.get('PYTHONPATH', '')]))
return env return env
def add_tower_venv(self, env): def add_tower_venv(self, env):
@@ -494,6 +498,17 @@ class BaseTask(Task):
''' '''
return OrderedDict() return OrderedDict()
def get_stdout_handle(self, instance):
    '''
    Return an open file object for capturing stdout.

    Creates settings.JOBOUTPUT_ROOT if needed and opens a new UTF-8 text
    file named after the unified job's primary key plus a uuid1 suffix
    (so relaunches of the same job never collide).
    '''
    if not os.path.exists(settings.JOBOUTPUT_ROOT):
        os.makedirs(settings.JOBOUTPUT_ROOT)
    stdout_filename = os.path.join(settings.JOBOUTPUT_ROOT, "%d-%s.out" % (instance.pk, str(uuid.uuid1())))
    stdout_handle = codecs.open(stdout_filename, 'w', encoding='utf-8')
    # Callers read .name off the handle to record result_stdout_file;
    # sanity-check that codecs.open preserved it. NOTE(review): assert is
    # stripped under python -O.
    assert stdout_handle.name == stdout_filename
    return stdout_handle
def run_pexpect(self, instance, args, cwd, env, passwords, stdout_handle, def run_pexpect(self, instance, args, cwd, env, passwords, stdout_handle,
output_replacements=None, extra_update_fields=None): output_replacements=None, extra_update_fields=None):
''' '''
@@ -643,10 +658,7 @@ class BaseTask(Task):
cwd = self.build_cwd(instance, **kwargs) cwd = self.build_cwd(instance, **kwargs)
env = self.build_env(instance, **kwargs) env = self.build_env(instance, **kwargs)
safe_env = self.build_safe_env(instance, **kwargs) safe_env = self.build_safe_env(instance, **kwargs)
if not os.path.exists(settings.JOBOUTPUT_ROOT): stdout_handle = self.get_stdout_handle(instance)
os.makedirs(settings.JOBOUTPUT_ROOT)
stdout_filename = os.path.join(settings.JOBOUTPUT_ROOT, "%d-%s.out" % (pk, str(uuid.uuid1())))
stdout_handle = codecs.open(stdout_filename, 'w', encoding='utf-8')
if self.should_use_proot(instance, **kwargs): if self.should_use_proot(instance, **kwargs):
if not check_proot_installed(): if not check_proot_installed():
raise RuntimeError('proot is not installed') raise RuntimeError('proot is not installed')
@@ -660,7 +672,7 @@ class BaseTask(Task):
args = self.wrap_args_with_ssh_agent(args, ssh_key_path, ssh_auth_sock) args = self.wrap_args_with_ssh_agent(args, ssh_key_path, ssh_auth_sock)
safe_args = self.wrap_args_with_ssh_agent(safe_args, ssh_key_path, ssh_auth_sock) safe_args = self.wrap_args_with_ssh_agent(safe_args, ssh_key_path, ssh_auth_sock)
instance = self.update_model(pk, job_args=json.dumps(safe_args), instance = self.update_model(pk, job_args=json.dumps(safe_args),
job_cwd=cwd, job_env=safe_env, result_stdout_file=stdout_filename) job_cwd=cwd, job_env=safe_env, result_stdout_file=stdout_handle.name)
status, rc = self.run_pexpect(instance, args, cwd, env, kwargs['passwords'], stdout_handle, status, rc = self.run_pexpect(instance, args, cwd, env, kwargs['passwords'], stdout_handle,
extra_update_fields=extra_update_fields) extra_update_fields=extra_update_fields)
except Exception: except Exception:
@@ -779,6 +791,7 @@ class RunJob(BaseTask):
if job.project: if job.project:
env['PROJECT_REVISION'] = job.project.scm_revision env['PROJECT_REVISION'] = job.project.scm_revision
env['ANSIBLE_CALLBACK_PLUGINS'] = plugin_path env['ANSIBLE_CALLBACK_PLUGINS'] = plugin_path
env['ANSIBLE_STDOUT_CALLBACK'] = 'tower_display'
env['REST_API_URL'] = settings.INTERNAL_API_URL env['REST_API_URL'] = settings.INTERNAL_API_URL
env['REST_API_TOKEN'] = job.task_auth_token or '' env['REST_API_TOKEN'] = job.task_auth_token or ''
env['CALLBACK_QUEUE'] = settings.CALLBACK_QUEUE env['CALLBACK_QUEUE'] = settings.CALLBACK_QUEUE
@@ -974,6 +987,25 @@ class RunJob(BaseTask):
d[re.compile(r'^Vault password:\s*?$', re.M)] = 'vault_password' d[re.compile(r'^Vault password:\s*?$', re.M)] = 'vault_password'
return d return d
def get_stdout_handle(self, instance):
    '''
    Wrap stdout file object to capture events.

    Wraps the base stdout file in an OutputEventFilter that decodes job
    events embedded in the stream; each event is either published to the
    callback queue (when USE_CALLBACK_QUEUE is set) or written directly
    to the database via JobEvent.create_from_data.
    '''
    stdout_handle = super(RunJob, self).get_stdout_handle(instance)
    if getattr(settings, 'USE_CALLBACK_QUEUE', False):
        dispatcher = CallbackQueueDispatcher()

        # Closure captures the dispatcher and the job instance; tags each
        # event with the job ID before publishing.
        def job_event_callback(event_data):
            event_data.setdefault('job_id', instance.id)
            dispatcher.dispatch(event_data)
    else:
        # No queue configured: persist events synchronously in-process.
        def job_event_callback(event_data):
            event_data.setdefault('job_id', instance.id)
            JobEvent.create_from_data(**event_data)
    return OutputEventFilter(stdout_handle, job_event_callback)
def get_ssh_key_path(self, instance, **kwargs): def get_ssh_key_path(self, instance, **kwargs):
''' '''
If using an SSH key, return the path for use by ssh-agent. If using an SSH key, return the path for use by ssh-agent.
@@ -1019,11 +1051,6 @@ class RunJob(BaseTask):
pass pass
else: else:
update_inventory_computed_fields.delay(inventory.id, True) update_inventory_computed_fields.delay(inventory.id, True)
# Update job event fields after job has completed (only when using REST
# API callback).
if not getattr(settings, 'CALLBACK_CONSUMER_PORT', None) and not getattr(settings, 'CALLBACK_QUEUE', None):
for job_event in job.job_events.order_by('pk'):
job_event.save(post_process=True)
class RunProjectUpdate(BaseTask): class RunProjectUpdate(BaseTask):
@@ -1597,6 +1624,7 @@ class RunAdHocCommand(BaseTask):
env['INVENTORY_HOSTVARS'] = str(True) env['INVENTORY_HOSTVARS'] = str(True)
env['ANSIBLE_CALLBACK_PLUGINS'] = plugin_dir env['ANSIBLE_CALLBACK_PLUGINS'] = plugin_dir
env['ANSIBLE_LOAD_CALLBACK_PLUGINS'] = '1' env['ANSIBLE_LOAD_CALLBACK_PLUGINS'] = '1'
env['ANSIBLE_STDOUT_CALLBACK'] = 'minimal' # Hardcoded by Ansible for ad-hoc commands (either minimal or oneline).
env['REST_API_URL'] = settings.INTERNAL_API_URL env['REST_API_URL'] = settings.INTERNAL_API_URL
env['REST_API_TOKEN'] = ad_hoc_command.task_auth_token or '' env['REST_API_TOKEN'] = ad_hoc_command.task_auth_token or ''
env['CALLBACK_QUEUE'] = settings.CALLBACK_QUEUE env['CALLBACK_QUEUE'] = settings.CALLBACK_QUEUE
@@ -1693,6 +1721,25 @@ class RunAdHocCommand(BaseTask):
d[re.compile(r'^Password:\s*?$', re.M)] = 'ssh_password' d[re.compile(r'^Password:\s*?$', re.M)] = 'ssh_password'
return d return d
def get_stdout_handle(self, instance):
    '''
    Wrap stdout file object to capture events.

    Same pattern as RunJob.get_stdout_handle, but events are tagged with
    the ad hoc command ID and stored via AdHocCommandEvent when no
    callback queue is in use.
    '''
    stdout_handle = super(RunAdHocCommand, self).get_stdout_handle(instance)
    if getattr(settings, 'USE_CALLBACK_QUEUE', False):
        dispatcher = CallbackQueueDispatcher()

        # Closure captures the dispatcher and the command instance.
        def ad_hoc_command_event_callback(event_data):
            event_data.setdefault('ad_hoc_command_id', instance.id)
            dispatcher.dispatch(event_data)
    else:
        # No queue configured: persist events synchronously in-process.
        def ad_hoc_command_event_callback(event_data):
            event_data.setdefault('ad_hoc_command_id', instance.id)
            AdHocCommandEvent.create_from_data(**event_data)
    return OutputEventFilter(stdout_handle, ad_hoc_command_event_callback)
def get_ssh_key_path(self, instance, **kwargs): def get_ssh_key_path(self, instance, **kwargs):
''' '''
If using an SSH key, return the path for use by ssh-agent. If using an SSH key, return the path for use by ssh-agent.

View File

@@ -4,6 +4,7 @@
# Python # Python
import base64 import base64
import hashlib import hashlib
import json
import logging import logging
import os import os
import re import re
@@ -36,7 +37,7 @@ __all__ = ['get_object_or_400', 'get_object_or_403', 'camelcase_to_underscore',
'get_type_for_model', 'get_model_for_type', 'cache_list_capabilities', 'to_python_boolean', 'get_type_for_model', 'get_model_for_type', 'cache_list_capabilities', 'to_python_boolean',
'ignore_inventory_computed_fields', 'ignore_inventory_group_removal', 'ignore_inventory_computed_fields', 'ignore_inventory_group_removal',
'_inventory_updates', 'get_pk_from_dict', 'getattrd', 'NoDefaultProvided', '_inventory_updates', 'get_pk_from_dict', 'getattrd', 'NoDefaultProvided',
'get_current_apps', 'set_current_apps'] 'get_current_apps', 'set_current_apps', 'OutputEventFilter']
def get_object_or_400(klass, *args, **kwargs): def get_object_or_400(klass, *args, **kwargs):
@@ -640,3 +641,71 @@ def set_current_apps(apps):
def get_current_apps(): def get_current_apps():
global current_apps global current_apps
return current_apps return current_apps
class OutputEventFilter(object):
    '''
    File-like object that looks for encoded job events in stdout data.

    Raw stdout is passed through to the wrapped file object (if any); any
    base64-encoded event payloads embedded by the callback plugin are
    decoded and fed to *event_callback*, annotated with a counter, the
    stdout text belonging to the event and its start/end line numbers.
    Stdout that precedes any event marker is emitted line-by-line as
    'verbose' events.
    '''

    # '\x1b[K' delimiters around one or more base64 chunks, each followed by
    # a cursor-left escape ('\x1b[<n>D'), as written by the display plugin.
    EVENT_DATA_RE = re.compile(r'\x1b\[K((?:[A-Za-z0-9+/=]+\x1b\[\d+D)+)\x1b\[K')

    def __init__(self, fileobj=None, event_callback=None):
        self._fileobj = fileobj
        self._event_callback = event_callback
        self._counter = 1
        self._start_line = 0
        self._buffer = ''
        # Event decoded from the most recent marker; buffered stdout is
        # attributed to it until the next marker arrives.
        self._current_event_data = None

    def __getattr__(self, attr):
        # Delegate any other file-object attributes to the wrapped file.
        return getattr(self._fileobj, attr)

    def write(self, data):
        '''
        Pass *data* through to the wrapped file and scan for event markers.
        '''
        if self._fileobj:
            self._fileobj.write(data)
        self._buffer += data
        while True:
            match = self.EVENT_DATA_RE.search(self._buffer)
            if not match:
                break
            try:
                # Strip the cursor-movement escapes, leaving pure base64.
                base64_data = re.sub(r'\x1b\[\d+D', '', match.group(1))
                event_data = json.loads(base64.b64decode(base64_data))
            except ValueError:
                event_data = {}
            self._emit_event(self._buffer[:match.start()], event_data)
            self._buffer = self._buffer[match.end():]

    def close(self):
        '''
        Close the wrapped file and flush any remaining stdout as events.
        '''
        if self._fileobj:
            self._fileobj.close()
        if self._buffer:
            self._emit_event(self._buffer)
            self._buffer = ''

    def _emit_event(self, buffered_stdout, next_event_data=None):
        # Attribute buffered stdout to the current event, or emit it as
        # 'verbose' events (one per line) when no event is active.
        if self._current_event_data:
            event_data = self._current_event_data
            stdout_chunks = [buffered_stdout]
        elif buffered_stdout:
            event_data = dict(event='verbose')
            stdout_chunks = buffered_stdout.splitlines(True)
        else:
            stdout_chunks = []
        for stdout_chunk in stdout_chunks:
            event_data['counter'] = self._counter
            self._counter += 1
            event_data['stdout'] = stdout_chunk
            n_lines = stdout_chunk.count('\n')
            event_data['start_line'] = self._start_line
            event_data['end_line'] = self._start_line + n_lines
            self._start_line += n_lines
            if self._event_callback:
                self._event_callback(event_data)
        # FIX: next_event_data is None when flushing from close(); the
        # previous unguarded .get() raised AttributeError in that case.
        if next_event_data and next_event_data.get('uuid', None):
            self._current_event_data = next_event_data
        else:
            self._current_event_data = None

View File

@@ -1,579 +0,0 @@
# Copyright (c) 2015 Ansible, Inc.
# This file is a utility Ansible plugin that is not part of the AWX or Ansible
# packages. It does not import any code from either package, nor does its
# license apply to Ansible or AWX.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# Neither the name of the <ORGANIZATION> nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# Python
import datetime
import glob
import json
import logging
import os
import pwd
import urlparse
import re
from copy import deepcopy
from uuid import uuid4
# Kombu
from kombu import Connection, Exchange, Producer
# Requests
import requests
import psutil
# Result keys that are safe to keep when a result is marked no_log.
CENSOR_FIELD_WHITELIST = [
    'msg',
    'failed',
    'changed',
    'results',
    'start',
    'end',
    'delta',
    'cmd',
    '_ansible_no_log',
    'rc',
    'failed_when_result',
    'skipped',
    'skip_reason',
]


def censor(obj, no_log=False):
    '''
    Redact sensitive data from an Ansible result.

    Non-dict results are replaced wholesale when *no_log* is set. Dicts
    flagged with ``_ansible_no_log`` are reduced to the whitelisted keys,
    with ``cmd`` truncated to its first word; item results under
    ``results`` are censored recursively.
    '''
    hidden = "the output has been hidden due to the fact that 'no_log: true' was specified for this result"
    if not isinstance(obj, dict):
        return hidden if no_log else obj
    if obj.get('_ansible_no_log', no_log):
        censored = {}
        for field in CENSOR_FIELD_WHITELIST:
            if field not in obj:
                continue
            censored[field] = obj[field]
            if field == 'cmd':
                # Normalize list form, then keep only the command word.
                if isinstance(obj['cmd'], list):
                    obj['cmd'] = ' '.join(obj['cmd'])
                if re.search(r'\s', obj['cmd']):
                    censored['cmd'] = re.sub(r'^(([^\s\\]|\\\s)+).*$',
                                             r'\1 <censored>',
                                             obj['cmd'])
        censored['censored'] = hidden
        obj = censored
    if 'results' in obj:
        if isinstance(obj['results'], list):
            # Item results inherit the parent's no_log flag.
            inherited = obj.get('_ansible_no_log', no_log)
            for idx, item in enumerate(obj['results']):
                obj['results'][idx] = censor(item, inherited)
        elif obj.get('_ansible_no_log', False):
            obj['results'] = hidden
    return obj
class TokenAuth(requests.auth.AuthBase):
    '''
    Requests auth plugin that sets a Tower token Authorization header.
    '''

    def __init__(self, token):
        self.token = token

    def __call__(self, request):
        # Invoked by requests on each prepared request.
        request.headers['Authorization'] = 'Token %s' % self.token
        return request
# TODO: non v2_ events are deprecated and should be purge/refactored out
class BaseCallbackModule(object):
    '''
    Callback module for logging ansible-playbook job events via the REST API.

    Events are published either to a Kombu queue (when CALLBACK_CONNECTION
    is set in the environment) or POSTed directly to the Tower REST API.
    Subclasses set job_id/ad_hoc_command_id and rest_api_path.
    '''

    def __init__(self):
        self.base_url = os.getenv('REST_API_URL', '')
        self.auth_token = os.getenv('REST_API_TOKEN', '')
        self.callback_connection = os.getenv('CALLBACK_CONNECTION', None)
        self.connection_queue = os.getenv('CALLBACK_QUEUE', '')
        self.connection = None
        self.exchange = None
        self._init_logging()
        self._init_connection()
        # Monotonic event counter included in each queued message.
        self.counter = 0
        # UUIDs of the currently-active playbook/play/task, used to assign
        # each event a parent_uuid.
        self.active_playbook = None
        self.active_play = None
        self.active_task = None

    def _init_logging(self):
        # JOB_CALLBACK_DEBUG: 0=warnings, 1=info, >=2=debug.
        try:
            self.job_callback_debug = int(os.getenv('JOB_CALLBACK_DEBUG', '0'))
        except ValueError:
            self.job_callback_debug = 0
        self.logger = logging.getLogger('awx.plugins.callback.job_event_callback')
        if self.job_callback_debug >= 2:
            self.logger.setLevel(logging.DEBUG)
        elif self.job_callback_debug >= 1:
            self.logger.setLevel(logging.INFO)
        else:
            self.logger.setLevel(logging.WARNING)
        handler = logging.StreamHandler()
        formatter = logging.Formatter('%(levelname)-8s %(process)-8d %(message)s')
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)
        self.logger.propagate = False

    def _init_connection(self):
        # Reset so the next publish re-establishes the connection.
        self.connection = None

    def _start_connection(self):
        self.connection = Connection(self.callback_connection)
        self.exchange = Exchange(self.connection_queue, type='direct')

    def _post_job_event_queue_msg(self, event, event_data):
        '''
        Wrap the event in a message envelope and publish it to the queue,
        retrying on failure.
        '''
        self.counter += 1
        msg = {
            'event': event,
            'event_data': event_data,
            'counter': self.counter,
            'created': datetime.datetime.utcnow().isoformat(),
        }
        # Choose the parent UUID by event type: playbook-level events hang
        # off the playbook, play-level off the play, runner events off the
        # active task.
        if event in ('playbook_on_play_start',
                     'playbook_on_stats',
                     'playbook_on_vars_prompt'):
            msg['parent_uuid'] = str(self.active_playbook)
        elif event in ('playbook_on_notify',
                       'playbook_on_setup',
                       'playbook_on_task_start',
                       'playbook_on_no_hosts_matched',
                       'playbook_on_no_hosts_remaining',
                       'playbook_on_include',
                       'playbook_on_import_for_host',
                       'playbook_on_not_import_for_host'):
            msg['parent_uuid'] = str(self.active_play)
        elif event.startswith('runner_on_') or event.startswith('runner_item_on_'):
            msg['parent_uuid'] = str(self.active_task)
        else:
            msg['parent_uuid'] = ''
        if "uuid" in event_data:
            msg['uuid'] = str(event_data['uuid'])
        else:
            msg['uuid'] = ''
        # Tag the message with whichever job type this callback serves.
        if getattr(self, 'job_id', None):
            msg['job_id'] = self.job_id
        if getattr(self, 'ad_hoc_command_id', None):
            msg['ad_hoc_command_id'] = self.ad_hoc_command_id
        if getattr(self, 'artifact_data', None):
            msg['artifact_data'] = self.artifact_data
        active_pid = os.getpid()
        if self.job_callback_debug:
            msg.update({
                'pid': active_pid,
            })
        for retry_count in xrange(4):
            try:
                # Re-establish the connection if this process was forked
                # since the connection was created.
                if not hasattr(self, 'connection_pid'):
                    self.connection_pid = active_pid
                if self.connection_pid != active_pid:
                    self._init_connection()
                if self.connection is None:
                    self._start_connection()
                producer = Producer(self.connection)
                producer.publish(msg,
                                 serializer='json',
                                 compression='bzip2',
                                 exchange=self.exchange,
                                 declare=[self.exchange],
                                 routing_key=self.connection_queue)
                return
            except Exception, e:
                self.logger.info('Publish Job Event Exception: %r, retry=%d', e,
                                 retry_count, exc_info=True)
                # NOTE(review): incrementing the loop variable here only
                # affects the check below (the for loop overwrites it on
                # the next iteration), so at most 3 attempts are made.
                retry_count += 1
                if retry_count >= 3:
                    break

    def _post_rest_api_event(self, event, event_data):
        # Fallback path: POST the event directly to the Tower REST API,
        # authenticating with URL-embedded credentials or the task token.
        data = json.dumps({
            'event': event,
            'event_data': event_data,
        })
        parts = urlparse.urlsplit(self.base_url)
        if parts.username and parts.password:
            auth = (parts.username, parts.password)
        elif self.auth_token:
            auth = TokenAuth(self.auth_token)
        else:
            auth = None
        port = parts.port or (443 if parts.scheme == 'https' else 80)
        url = urlparse.urlunsplit([parts.scheme,
                                   '%s:%d' % (parts.hostname, port),
                                   parts.path, parts.query, parts.fragment])
        url = urlparse.urljoin(url, self.rest_api_path)
        headers = {'content-type': 'application/json'}
        response = requests.post(url, data=data, headers=headers, auth=auth)
        response.raise_for_status()

    def _log_event(self, event, **event_data):
        # Censor no_log results before they leave the process.
        if 'res' in event_data:
            event_data['res'] = censor(deepcopy(event_data['res']))
        if self.callback_connection:
            self._post_job_event_queue_msg(event, event_data)
        else:
            self._post_rest_api_event(event, event_data)

    def on_any(self, *args, **kwargs):
        pass

    def runner_on_failed(self, host, res, ignore_errors=False):
        self._log_event('runner_on_failed', host=host, res=res,
                        ignore_errors=ignore_errors)

    def v2_runner_on_failed(self, result, ignore_errors=False):
        event_is_loop = result._task.loop if hasattr(result._task, 'loop') else None
        self._log_event('runner_on_failed', host=result._host.name,
                        res=result._result, task=result._task,
                        ignore_errors=ignore_errors, event_loop=event_is_loop)

    def runner_on_ok(self, host, res):
        self._log_event('runner_on_ok', host=host, res=res)

    def v2_runner_on_ok(self, result):
        event_is_loop = result._task.loop if hasattr(result._task, 'loop') else None
        self._log_event('runner_on_ok', host=result._host.name,
                        task=result._task, res=result._result,
                        event_loop=event_is_loop)

    def runner_on_error(self, host, msg):
        self._log_event('runner_on_error', host=host, msg=msg)

    def v2_runner_on_error(self, result):
        pass  # Currently not implemented in v2

    def runner_on_skipped(self, host, item=None):
        self._log_event('runner_on_skipped', host=host, item=item)

    def v2_runner_on_skipped(self, result):
        event_is_loop = result._task.loop if hasattr(result._task, 'loop') else None
        self._log_event('runner_on_skipped', host=result._host.name,
                        task=result._task, event_loop=event_is_loop)

    def runner_on_unreachable(self, host, res):
        self._log_event('runner_on_unreachable', host=host, res=res)

    def v2_runner_on_unreachable(self, result):
        self._log_event('runner_on_unreachable', host=result._host.name,
                        task=result._task, res=result._result)

    def runner_on_no_hosts(self):
        self._log_event('runner_on_no_hosts')

    def v2_runner_on_no_hosts(self, task):
        self._log_event('runner_on_no_hosts', task=task)

    # V2 does not use the _on_async callbacks (yet).
    def runner_on_async_poll(self, host, res, jid, clock):
        self._log_event('runner_on_async_poll', host=host, res=res, jid=jid,
                        clock=clock)

    def runner_on_async_ok(self, host, res, jid):
        self._log_event('runner_on_async_ok', host=host, res=res, jid=jid)

    def runner_on_async_failed(self, host, res, jid):
        self._log_event('runner_on_async_failed', host=host, res=res, jid=jid)

    def runner_on_file_diff(self, host, diff):
        self._log_event('runner_on_file_diff', host=host, diff=diff)

    def v2_runner_on_file_diff(self, result, diff):
        self._log_event('runner_on_file_diff', host=result._host.name,
                        task=result._task, diff=diff)

    def v2_runner_item_on_ok(self, result):
        self._log_event('runner_item_on_ok', res=result._result, host=result._host.name,
                        task=result._task)

    def v2_runner_item_on_failed(self, result):
        self._log_event('runner_item_on_failed', res=result._result, host=result._host.name,
                        task=result._task)

    def v2_runner_item_on_skipped(self, result):
        self._log_event('runner_item_on_skipped', res=result._result, host=result._host.name,
                        task=result._task)

    @staticmethod
    def terminate_ssh_control_masters():
        '''
        Find and terminate any SSH ControlPersist master processes left
        running for this user after the playbook finishes.
        '''
        # Determine if control persist is being used and if any open sockets
        # exist after running the playbook.
        cp_path = os.environ.get('ANSIBLE_SSH_CONTROL_PATH', '')
        if not cp_path:
            return
        cp_dir = os.path.dirname(cp_path)
        if not os.path.exists(cp_dir):
            return
        cp_pattern = os.path.join(cp_dir, 'ansible-ssh-*')
        cp_files = glob.glob(cp_pattern)
        if not cp_files:
            return
        # Attempt to find any running control master processes.
        username = pwd.getpwuid(os.getuid())[0]
        ssh_cm_procs = []
        for proc in psutil.process_iter():
            try:
                pname = proc.name()
                pcmdline = proc.cmdline()
                pusername = proc.username()
            except psutil.NoSuchProcess:
                # Process exited while we were iterating.
                continue
            if pusername != username:
                continue
            if pname != 'ssh':
                continue
            for cp_file in cp_files:
                if pcmdline and cp_file in pcmdline[0]:
                    ssh_cm_procs.append(proc)
                    break
        # Terminate then kill control master processes. Workaround older
        # version of psutil that may not have wait_procs implemented.
        for proc in ssh_cm_procs:
            proc.terminate()
        procs_gone, procs_alive = psutil.wait_procs(ssh_cm_procs, timeout=5)
        for proc in procs_alive:
            proc.kill()
class JobCallbackModule(BaseCallbackModule):
'''
Callback module for logging ansible-playbook job events via the REST API.
'''
# These events should never have an associated play.
EVENTS_WITHOUT_PLAY = [
'playbook_on_start',
'playbook_on_stats',
]
# These events should never have an associated task.
EVENTS_WITHOUT_TASK = EVENTS_WITHOUT_PLAY + [
'playbook_on_setup',
'playbook_on_notify',
'playbook_on_import_for_host',
'playbook_on_not_import_for_host',
'playbook_on_no_hosts_matched',
'playbook_on_no_hosts_remaining',
]
def __init__(self):
self.job_id = int(os.getenv('JOB_ID', '0'))
self.rest_api_path = '/api/v1/jobs/%d/job_events/' % self.job_id
super(JobCallbackModule, self).__init__()
def _log_event(self, event, **event_data):
play = getattr(self, 'play', None)
play_name = getattr(play, 'name', '')
if play_name and event not in self.EVENTS_WITHOUT_PLAY:
event_data['play'] = play_name
task = event_data.pop('task', None) or getattr(self, 'task', None)
task_name = None
role_name = None
if task:
if hasattr(task, 'get_name'):
# in v2, the get_name() method creates the name
task_name = task.get_name()
else:
# v1 datastructure
task_name = getattr(task, 'name', '')
if hasattr(task, '_role') and task._role:
# v2 datastructure
role_name = task._role._role_name
else:
# v1 datastructure
role_name = getattr(task, 'role_name', '')
if task_name and event not in self.EVENTS_WITHOUT_TASK:
event_data['task'] = task_name
if role_name and event not in self.EVENTS_WITHOUT_TASK:
event_data['role'] = role_name
self.artifact_data = None
if 'res' in event_data and 'artifact_data' in event_data['res']:
self.artifact_data = event_data['res']['artifact_data']
super(JobCallbackModule, self)._log_event(event, **event_data)
def playbook_on_start(self):
self._log_event('playbook_on_start')
def v2_playbook_on_start(self, playbook):
# NOTE: the playbook parameter was added late in Ansible 2.0 development
# so we don't currently utilize but could later.
# NOTE: Ansible doesn't generate a UUID for playbook_on_start so we'll do it for them
self.active_playbook = str(uuid4())
self._log_event('playbook_on_start', uuid=self.active_playbook)
def playbook_on_notify(self, host, handler):
self._log_event('playbook_on_notify', host=host, handler=handler)
def v2_playbook_on_notify(self, result, handler):
self._log_event('playbook_on_notify', host=result._host.name,
task=result._task, handler=handler)
def playbook_on_no_hosts_matched(self):
self._log_event('playbook_on_no_hosts_matched')
def v2_playbook_on_no_hosts_matched(self):
# since there is no task/play info, this is currently identical
# to the v1 callback which does the same thing
self.playbook_on_no_hosts_matched()
def playbook_on_no_hosts_remaining(self):
self._log_event('playbook_on_no_hosts_remaining')
def v2_playbook_on_no_hosts_remaining(self):
# since there is no task/play info, this is currently identical
# to the v1 callback which does the same thing
self.playbook_on_no_hosts_remaining()
def playbook_on_task_start(self, name, is_conditional):
self._log_event('playbook_on_task_start', name=name,
is_conditional=is_conditional)
def v2_playbook_on_task_start(self, task, is_conditional):
self.active_task = task._uuid
self._log_event('playbook_on_task_start', task=task, uuid=str(task._uuid),
name=task.get_name(), is_conditional=is_conditional)
def v2_playbook_on_cleanup_task_start(self, task):
# re-using playbook_on_task_start event here for this v2-specific
# event, though we may consider any changes necessary to distinguish
# this from a normal task
self.active_task = task._uuid
self._log_event('playbook_on_task_start', task=task, uuid=str(task._uuid),
name=task.get_name())
def playbook_on_vars_prompt(self, varname, private=True, prompt=None,
encrypt=None, confirm=False, salt_size=None,
salt=None, default=None):
self._log_event('playbook_on_vars_prompt', varname=varname,
private=private, prompt=prompt, encrypt=encrypt,
confirm=confirm, salt_size=salt_size, salt=salt,
default=default)
def v2_playbook_on_vars_prompt(self, varname, private=True, prompt=None,
encrypt=None, confirm=False, salt_size=None,
salt=None, default=None):
pass # not currently used in v2 (yet)
def playbook_on_setup(self):
self._log_event('playbook_on_setup')
def v2_playbook_on_setup(self):
pass # not currently used in v2 (yet)
def playbook_on_import_for_host(self, host, imported_file):
# don't care about recording this one
# self._log_event('playbook_on_import_for_host', host=host,
# imported_file=imported_file)
pass
def v2_playbook_on_import_for_host(self, result, imported_file):
pass # not currently used in v2 (yet)
    def playbook_on_not_import_for_host(self, host, missing_file):
        # Deliberately not recorded: we don't care about capturing this
        # event, so it is not forwarded to _log_event.
        pass
    def v2_playbook_on_not_import_for_host(self, result, missing_file):
        # Intentionally a no-op: not currently used in v2 (yet).
        pass
def playbook_on_play_start(self, name):
# Only play name is passed via callback, get host pattern from the play.
pattern = getattr(getattr(self, 'play', None), 'hosts', name)
self._log_event('playbook_on_play_start', name=name, pattern=pattern)
def v2_playbook_on_play_start(self, play):
setattr(self, 'play', play)
# Ansible 2.0.0.2 doesn't default .name to hosts like it did in 1.9.4,
# though that default will likely return in a future version of Ansible.
if (not hasattr(play, 'name') or not play.name) and hasattr(play, 'hosts'):
if isinstance(play.hosts, list):
play.name = ','.join(play.hosts)
else:
play.name = play.hosts
self.active_play = play._uuid
self._log_event('playbook_on_play_start', name=play.name, uuid=str(play._uuid),
pattern=play.hosts)
def playbook_on_stats(self, stats):
d = {}
for attr in ('changed', 'dark', 'failures', 'ok', 'processed', 'skipped'):
d[attr] = getattr(stats, attr)
self._log_event('playbook_on_stats', **d)
self.terminate_ssh_control_masters()
    def v2_playbook_on_stats(self, stats):
        # v2 delegates directly to the v1 stats handler.
        self.playbook_on_stats(stats)
    def v2_playbook_on_include(self, included_file):
        # Record inclusion of another file during the play.
        self._log_event('playbook_on_include', included_file=included_file)
class AdHocCommandCallbackModule(BaseCallbackModule):
    '''
    Callback module for logging ansible ad hoc events via ZMQ or the REST API.
    '''

    def __init__(self):
        # The runner exports AD_HOC_COMMAND_ID for the command being run;
        # default to 0 when unset so int() never raises here.
        self.ad_hoc_command_id = int(os.getenv('AD_HOC_COMMAND_ID', '0'))
        self.rest_api_path = '/api/v1/ad_hoc_commands/%d/events/' % self.ad_hoc_command_id
        # Hosts already reported as skipped (see runner_on_ok below).
        self.skipped_hosts = set()
        super(AdHocCommandCallbackModule, self).__init__()

    def _log_event(self, event, **event_data):
        # Ad hoc commands have no task context (with v2), so strip it
        # before handing the event to the base implementation.
        event_data.pop('task', None)
        super(AdHocCommandCallbackModule, self)._log_event(event, **event_data)

    def runner_on_file_diff(self, host, diff):
        # File diffs are intentionally ignored for ad hoc commands.
        pass

    def runner_on_ok(self, host, res):
        # When running in check mode using a module that does not support
        # check mode, Ansible v1.9 will call runner_on_skipped followed by
        # runner_on_ok for the same host; capture only the skipped event
        # and drop the ok event.
        if host in self.skipped_hosts:
            return
        super(AdHocCommandCallbackModule, self).runner_on_ok(host, res)

    def runner_on_skipped(self, host, item=None):
        super(AdHocCommandCallbackModule, self).runner_on_skipped(host, item)
        self.skipped_hosts.add(host)
# Select the callback implementation based on which environment variable the
# task runner exported.  NOTE(review): if neither JOB_ID nor
# AD_HOC_COMMAND_ID is set, CallbackModule is left unbound and referencing it
# will raise a NameError -- confirm this is intentional.
if os.getenv('JOB_ID', ''):
    CallbackModule = JobCallbackModule
elif os.getenv('AD_HOC_COMMAND_ID', ''):
    CallbackModule = AdHocCommandCallbackModule

View File

@@ -0,0 +1,30 @@
# Copyright (c) 2016 Ansible by Red Hat, Inc.
#
# This file is part of Ansible Tower, but depends on code imported from Ansible.
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division, print_function)
# Python
import os
import sys
# Add awx/lib to sys.path so the bundled tower_display_callback package can
# be imported without being installed; skip the insert if the path is
# already present (e.g. when this plugin is loaded more than once).
awx_lib_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'lib'))
if awx_lib_path not in sys.path:
    sys.path.insert(0, awx_lib_path)
# Tower Display Callback: expose TowerMinimalCallbackModule as this plugin's
# CallbackModule entry point.
from tower_display_callback import TowerMinimalCallbackModule as CallbackModule # noqa

View File

@@ -0,0 +1,30 @@
# Copyright (c) 2016 Ansible by Red Hat, Inc.
#
# This file is part of Ansible Tower, but depends on code imported from Ansible.
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division, print_function)
# Python
import os
import sys
# Add awx/lib to sys.path so the bundled tower_display_callback package can
# be imported without being installed; skip the insert if the path is
# already present (e.g. when this plugin is loaded more than once).
awx_lib_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'lib'))
if awx_lib_path not in sys.path:
    sys.path.insert(0, awx_lib_path)
# Tower Display Callback: expose TowerDefaultCallbackModule as this plugin's
# CallbackModule entry point.
from tower_display_callback import TowerDefaultCallbackModule as CallbackModule # noqa