Add support for capturing stdout associated with job events and ad hoc command events.

* New event types for stdout lines not associated with a callback event.
* New stdout, start_line, end_line and verbosity fields for job/ahc events.
* Callback plugins to wrap Ansible default/minimal stdout callbacks and embed callback event data using ANSI escape sequences.
* Callback plugin library to wrap ansible.display.Display class methods.
* Output filter to extract event data from stdout and create job/ahc events.
* Update stdout formats to strip new ANSI escape sequences.
This commit is contained in:
Chris Church
2016-10-22 00:15:49 -04:00
parent d253eabe5d
commit c18b6c1352
20 changed files with 1387 additions and 850 deletions

View File

@@ -189,8 +189,6 @@ class TaskPermission(ModelAccessPermission):
# token.
if view.model == Inventory and request.method.lower() in ('head', 'get'):
return bool(not obj or obj.pk == unified_job.inventory_id)
elif view.model in (JobEvent, AdHocCommandEvent) and request.method.lower() == 'post':
return bool(not obj or obj.pk == unified_job.pk)
else:
return False

View File

@@ -2417,7 +2417,9 @@ class JobEventSerializer(BaseSerializer):
model = JobEvent
fields = ('*', '-name', '-description', 'job', 'event', 'counter',
'event_display', 'event_data', 'event_level', 'failed',
'changed', 'host', 'host_name', 'parent', 'play', 'task', 'role')
'changed', 'uuid', 'host', 'host_name', 'parent', 'playbook',
'play', 'task', 'role', 'stdout', 'start_line', 'end_line',
'verbosity')
def get_related(self, obj):
res = super(JobEventSerializer, self).get_related(obj)
@@ -2453,16 +2455,8 @@ class AdHocCommandEventSerializer(BaseSerializer):
model = AdHocCommandEvent
fields = ('*', '-name', '-description', 'ad_hoc_command', 'event',
'counter', 'event_display', 'event_data', 'failed',
'changed', 'host', 'host_name')
def to_internal_value(self, data):
ret = super(AdHocCommandEventSerializer, self).to_internal_value(data)
# AdHocCommandAdHocCommandEventsList should be the only view creating
# AdHocCommandEvent instances, so keep the ad_hoc_command it sets, even
# though ad_hoc_command is a read-only field.
if 'ad_hoc_command' in data:
ret['ad_hoc_command'] = data['ad_hoc_command']
return ret
'changed', 'uuid', 'host', 'host_name', 'stdout',
'start_line', 'end_line', 'verbosity')
def get_related(self, obj):
res = super(AdHocCommandEventSerializer, self).get_related(obj)

View File

@@ -10,8 +10,9 @@ import time
import socket
import sys
import logging
from base64 import b64encode
from base64 import b64encode, b64decode
from collections import OrderedDict
from HTMLParser import HTMLParser
# Django
from django.conf import settings
@@ -3050,21 +3051,6 @@ class GroupJobEventsList(BaseJobEventsList):
class JobJobEventsList(BaseJobEventsList):
parent_model = Job
authentication_classes = [TaskAuthentication] + api_settings.DEFAULT_AUTHENTICATION_CLASSES
permission_classes = (TaskPermission,)
# Post allowed for job event callback only.
def post(self, request, *args, **kwargs):
parent_obj = get_object_or_404(self.parent_model, pk=self.kwargs['pk'])
data = request.data.copy()
data['job'] = parent_obj.pk
serializer = self.get_serializer(data=data)
if serializer.is_valid():
self.instance = serializer.save()
headers = {'Location': serializer.data['url']}
return Response(serializer.data, status=status.HTTP_201_CREATED,
headers=headers)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class JobJobPlaysList(BaseJobEventsList):
@@ -3455,25 +3441,8 @@ class HostAdHocCommandEventsList(BaseAdHocCommandEventsList):
class AdHocCommandAdHocCommandEventsList(BaseAdHocCommandEventsList):
parent_model = AdHocCommand
authentication_classes = [TaskAuthentication] + api_settings.DEFAULT_AUTHENTICATION_CLASSES
permission_classes = (TaskPermission,)
new_in_220 = True
# Post allowed for ad hoc event callback only.
def post(self, request, *args, **kwargs):
if request.user:
raise PermissionDenied()
parent_obj = get_object_or_404(self.parent_model, pk=self.kwargs['pk'])
data = request.data.copy()
data['ad_hoc_command'] = parent_obj
serializer = self.get_serializer(data=data)
if serializer.is_valid():
self.instance = serializer.save()
headers = {'Location': serializer.data['url']}
return Response(serializer.data, status=status.HTTP_201_CREATED,
headers=headers)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class AdHocCommandActivityStreamList(SubListAPIView):
@@ -3583,7 +3552,11 @@ class UnifiedJobStdout(RetrieveAPIView):
dark_bg = (content_only and dark) or (not content_only and (dark or not dark_val))
content, start, end, absolute_end = unified_job.result_stdout_raw_limited(start_line, end_line)
# Remove any ANSI escape sequences containing job event data.
content = re.sub(r'\x1b\[K(?:[A-Za-z0-9+/=]+\x1b\[\d+D)+\x1b\[K', '', content)
body = ansiconv.to_html(cgi.escape(content))
context = {
'title': get_view_name(self.__class__),
'body': mark_safe(body),

22
awx/lib/sitecustomize.py Normal file
View File

@@ -0,0 +1,22 @@
# Python
import os
import sys
# Based on http://stackoverflow.com/a/6879344/131141 -- Initialize tower display
# callback as early as possible to wrap ansible.display.Display methods.
def argv_ready(argv):
if argv and os.path.basename(argv[0]) in {'ansible', 'ansible-playbook'}:
import tower_display_callback
class argv_placeholder(object):
def __del__(self):
argv_ready(sys.argv)
if hasattr(sys, 'argv'):
argv_ready(sys.argv)
else:
sys.argv = argv_placeholder()

View File

@@ -0,0 +1,25 @@
# Copyright (c) 2016 Ansible by Red Hat, Inc.
#
# This file is part of Ansible Tower, but depends on code imported from Ansible.
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division, print_function)
# Tower Display Callback
from . import cleanup # to register control persistent cleanup.
from . import display # to wrap ansible.display.Display methods.
from .module import TowerDefaultCallbackModule, TowerMinimalCallbackModule
__all__ = ['TowerDefaultCallbackModule', 'TowerMinimalCallbackModule']

View File

@@ -0,0 +1,72 @@
# Copyright (c) 2016 Ansible by Red Hat, Inc.
#
# This file is part of Ansible Tower, but depends on code imported from Ansible.
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division, print_function)
# Python
import atexit
import glob
import os
import pwd
# PSUtil
import psutil
__all__ = []
@atexit.register
def terminate_ssh_control_masters():
# Determine if control persist is being used and if any open sockets
# exist after running the playbook.
cp_path = os.environ.get('ANSIBLE_SSH_CONTROL_PATH', '')
if not cp_path:
return
cp_dir = os.path.dirname(cp_path)
if not os.path.exists(cp_dir):
return
cp_pattern = os.path.join(cp_dir, 'ansible-ssh-*')
cp_files = glob.glob(cp_pattern)
if not cp_files:
return
# Attempt to find any running control master processes.
username = pwd.getpwuid(os.getuid())[0]
ssh_cm_procs = []
for proc in psutil.process_iter():
try:
pname = proc.name()
pcmdline = proc.cmdline()
pusername = proc.username()
except psutil.NoSuchProcess:
continue
if pusername != username:
continue
if pname != 'ssh':
continue
for cp_file in cp_files:
if pcmdline and cp_file in pcmdline[0]:
ssh_cm_procs.append(proc)
break
# Terminate then kill control master processes. Workaround older
# version of psutil that may not have wait_procs implemented.
for proc in ssh_cm_procs:
proc.terminate()
procs_gone, procs_alive = psutil.wait_procs(ssh_cm_procs, timeout=5)
for proc in procs_alive:
proc.kill()

View File

@@ -0,0 +1,92 @@
# Copyright (c) 2016 Ansible by Red Hat, Inc.
#
# This file is part of Ansible Tower, but depends on code imported from Ansible.
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division, print_function)
# Python
import cgi
import contextlib
import functools
import json
import sys
import uuid
# Ansible
from ansible.utils.display import Display
# Tower Display Callback
from tower_display_callback.events import event_context
__all__ = []
def with_context(**context):
global event_context
def wrap(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
with event_context.set_local(**context):
return f(*args, **kwargs)
return wrapper
return wrap
for attr in dir(Display):
if attr.startswith('_') or 'cow' in attr or 'prompt' in attr:
continue
if attr in ('display', 'v', 'vv', 'vvv', 'vvvv', 'vvvvv', 'vvvvvv', 'verbose'):
continue
if not callable(getattr(Display, attr)):
continue
setattr(Display, attr, with_context(**{attr: True})(getattr(Display, attr)))
def with_verbosity(f):
global event_context
@functools.wraps(f)
def wrapper(*args, **kwargs):
host = args[2] if len(args) >= 3 else kwargs.get('host', None)
caplevel = args[3] if len(args) >= 4 else kwargs.get('caplevel', 2)
context = dict(verbose=True, verbosity=(caplevel + 1))
if host is not None:
context['remote_addr'] = host
with event_context.set_local(**context):
return f(*args, **kwargs)
return wrapper
Display.verbose = with_verbosity(Display.verbose)
def display_with_context(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
log_only = args[5] if len(args) >= 6 else kwargs.get('log_only', False)
stderr = args[3] if len(args) >= 4 else kwargs.get('stderr', False)
fileobj = sys.stderr if stderr else sys.stdout
event_uuid = event_context.get().get('uuid', None)
try:
if not log_only and not event_uuid:
event_context.add_local(uuid=str(uuid.uuid4()))
event_context.dump_begin(fileobj)
return f(*args, **kwargs)
finally:
if not log_only and not event_uuid:
event_context.dump_end(fileobj)
event_context.remove_local(uuid=None)
return wrapper
Display.display = display_with_context(Display.display)

View File

@@ -0,0 +1,138 @@
# Copyright (c) 2016 Ansible by Red Hat, Inc.
#
# This file is part of Ansible Tower, but depends on code imported from Ansible.
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division, print_function)
# Python
import base64
import cgi
import contextlib
import datetime
import json
import os
import threading
import uuid
__all__ = ['event_context']
class EventContext(object):
'''
Store global and local (per thread/process) data associated with callback
events and other display output methods.
'''
def add_local(self, **kwargs):
if not hasattr(self, '_local'):
self._local = threading.local()
self._local._ctx = {}
self._local._ctx.update(kwargs)
def remove_local(self, **kwargs):
if hasattr(self, '_local'):
for key in kwargs.keys():
self._local._ctx.pop(key, None)
@contextlib.contextmanager
def set_local(self, **kwargs):
try:
self.add_local(**kwargs)
yield
finally:
self.remove_local(**kwargs)
def get_local(self):
return getattr(getattr(self, '_local', None), '_ctx', {})
def add_global(self, **kwargs):
if not hasattr(self, '_global_ctx'):
self._global_ctx = {}
self._global_ctx.update(kwargs)
def remove_global(self, **kwargs):
if hasattr(self, '_global_ctx'):
for key in kwargs.keys():
self._global_ctx.pop(key, None)
@contextlib.contextmanager
def set_global(self, **kwargs):
try:
self.add_global(**kwargs)
yield
finally:
self.remove_global(**kwargs)
def get_global(self):
return getattr(self, '_global_ctx', {})
def get(self):
ctx = {}
ctx.update(self.get_global())
ctx.update(self.get_local())
return ctx
def get_begin_dict(self):
event_data = self.get()
if os.getenv('JOB_ID', ''):
event_data['job_id'] = int(os.getenv('JOB_ID', '0'))
if os.getenv('AD_HOC_COMMAND_ID', ''):
event_data['ad_hoc_command_id'] = int(os.getenv('AD_HOC_COMMAND_ID', '0'))
event_data.setdefault('pid', os.getpid())
event_data.setdefault('uuid', str(uuid.uuid4()))
event_data.setdefault('created', datetime.datetime.utcnow().isoformat())
if not event_data.get('parent_uuid', None) and event_data.get('job_id', None):
for key in ('task_uuid', 'play_uuid', 'playbook_uuid'):
parent_uuid = event_data.get(key, None)
if parent_uuid and parent_uuid != event_data.get('uuid', None):
event_data['parent_uuid'] = parent_uuid
break
event = event_data.pop('event', None)
if not event:
event = 'verbose'
for key in ('debug', 'verbose', 'deprecated', 'warning', 'system_warning', 'error'):
if event_data.get(key, False):
event = key
break
event_dict = dict(event=event, event_data=event_data)
for key in event_data.keys():
if key in ('job_id', 'ad_hoc_command_id', 'uuid', 'parent_uuid', 'created', 'artifact_data'):
event_dict[key] = event_data.pop(key)
elif key in ('verbosity', 'pid'):
event_dict[key] = event_data[key]
return event_dict
def get_end_dict(self):
return {}
def dump(self, fileobj, data, max_width=78):
b64data = base64.b64encode(json.dumps(data))
fileobj.write(u'\x1b[K')
for offset in xrange(0, len(b64data), max_width):
chunk = b64data[offset:offset + max_width]
escaped_chunk = u'{}\x1b[{}D'.format(chunk, len(chunk))
fileobj.write(escaped_chunk)
fileobj.write(u'\x1b[K')
def dump_begin(self, fileobj):
self.dump(fileobj, self.get_begin_dict())
def dump_end(self, fileobj):
self.dump(fileobj, self.get_end_dict())
event_context = EventContext()

View File

@@ -0,0 +1,28 @@
# Copyright (c) 2016 Ansible by Red Hat, Inc.
#
# This file is part of Ansible Tower, but depends on code imported from Ansible.
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division, print_function)
# Python
import os
# Ansible
import ansible
# Because of the way Ansible loads plugins, it's not possible to import
# ansible.plugins.callback.minimal when being loaded as the minimal plugin. Ugh.
execfile(os.path.join(os.path.dirname(ansible.__file__), 'plugins', 'callback', 'minimal.py'))

View File

@@ -0,0 +1,443 @@
# Copyright (c) 2016 Ansible by Red Hat, Inc.
#
# This file is part of Ansible Tower, but depends on code imported from Ansible.
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division, print_function)
# Python
import contextlib
import copy
import os
import re
import sys
import uuid
# Ansible
from ansible.plugins.callback import CallbackBase
from ansible.plugins.callback.default import CallbackModule as DefaultCallbackModule
# Tower Display Callback
from tower_display_callback.events import event_context
from tower_display_callback.minimal import CallbackModule as MinimalCallbackModule
class BaseCallbackModule(CallbackBase):
'''
Callback module for logging ansible/ansible-playbook events.
'''
CALLBACK_VERSION = 2.0
CALLBACK_TYPE = 'stdout'
# These events should never have an associated play.
EVENTS_WITHOUT_PLAY = [
'playbook_on_start',
'playbook_on_stats',
]
# These events should never have an associated task.
EVENTS_WITHOUT_TASK = EVENTS_WITHOUT_PLAY + [
'playbook_on_setup',
'playbook_on_notify',
'playbook_on_import_for_host',
'playbook_on_not_import_for_host',
'playbook_on_no_hosts_matched',
'playbook_on_no_hosts_remaining',
]
CENSOR_FIELD_WHITELIST = [
'msg',
'failed',
'changed',
'results',
'start',
'end',
'delta',
'cmd',
'_ansible_no_log',
'rc',
'failed_when_result',
'skipped',
'skip_reason',
]
def __init__(self):
super(BaseCallbackModule, self).__init__()
self.task_uuids = set()
def censor_result(self, res, no_log=False):
if not isinstance(res, dict):
if no_log:
return "the output has been hidden due to the fact that 'no_log: true' was specified for this result"
return res
if res.get('_ansible_no_log', no_log):
new_res = {}
for k in self.CENSOR_FIELD_WHITELIST:
if k in res:
new_res[k] = res[k]
if k == 'cmd' and k in res:
if isinstance(res['cmd'], list):
res['cmd'] = ' '.join(res['cmd'])
if re.search(r'\s', res['cmd']):
new_res['cmd'] = re.sub(r'^(([^\s\\]|\\\s)+).*$',
r'\1 <censored>',
res['cmd'])
new_res['censored'] = "the output has been hidden due to the fact that 'no_log: true' was specified for this result"
res = new_res
if 'results' in res:
if isinstance(res['results'], list):
for i in xrange(len(res['results'])):
res['results'][i] = self.censor_result(res['results'][i], res.get('_ansible_no_log', no_log))
elif res.get('_ansible_no_log', False):
res['results'] = "the output has been hidden due to the fact that 'no_log: true' was specified for this result"
return res
@contextlib.contextmanager
def capture_event_data(self, event, **event_data):
event_data.setdefault('uuid', str(uuid.uuid4()))
if 'res' in event_data:
event_data['res'] = self.censor_result(copy.deepcopy(event_data['res']))
res = event_data.get('res', None)
if res and isinstance(res, dict):
if 'artifact_data' in res:
event_data['artifact_data'] = res['artifact_data']
if event not in self.EVENTS_WITHOUT_TASK:
task = event_data.pop('task', None)
else:
task = None
try:
event_context.add_local(event=event, **event_data)
if task:
self.set_task(task, local=True)
event_context.dump_begin(sys.stdout)
yield
finally:
event_context.dump_end(sys.stdout)
if task:
self.clear_task(local=True)
event_context.remove_local(event=None, **event_data)
def set_playbook(self, playbook):
# NOTE: Ansible doesn't generate a UUID for playbook_on_start so do it for them.
self.playbook_uuid = str(uuid.uuid4())
file_name = getattr(playbook, '_file_name', '???')
event_context.add_global(playbook=file_name, playbook_uuid=self.playbook_uuid)
self.clear_play()
def set_play(self, play):
if hasattr(play, 'hosts'):
if isinstance(play.hosts, list):
pattern = ','.join(play.hosts)
else:
pattern = play.hosts
else:
pattern = ''
name = play.get_name().strip() or pattern
event_context.add_global(play=name, play_uuid=str(play._uuid), play_pattern=pattern)
self.clear_task()
def clear_play(self):
event_context.remove_global(play=None, play_uuid=None, play_pattern=None)
self.clear_task()
def set_task(self, task, local=False):
# FIXME: Task is "global" unless using free strategy!
task_ctx = dict(
task=(task.name or task.action),
task_path=task.get_path(),
task_uuid=str(task._uuid),
task_action=task.action,
)
if not task.no_log:
task_args = ', '.join(('%s=%s' % a for a in task.args.items()))
task_ctx['task_args'] = task_args
if getattr(task, '_role', None):
task_role = task._role._role_name
else:
task_role = getattr(task, 'role_name', '')
if task_role:
task_ctx['role'] = task_role
if local:
event_context.add_local(**task_ctx)
else:
event_context.add_global(**task_ctx)
def clear_task(self, local=False):
task_ctx = dict(task=None, task_path=None, task_uuid=None, task_action=None, task_args=None, role=None)
if local:
event_context.remove_local(**task_ctx)
else:
event_context.remove_global(**task_ctx)
def v2_playbook_on_start(self, playbook):
self.set_playbook(playbook)
event_data = dict(
uuid=self.playbook_uuid,
)
with self.capture_event_data('playbook_on_start', **event_data):
super(BaseCallbackModule, self).v2_playbook_on_start(playbook)
def v2_playbook_on_vars_prompt(self, varname, private=True, prompt=None,
encrypt=None, confirm=False, salt_size=None,
salt=None, default=None):
return # not currently used in v2 (yet) - FIXME: Confirm this is still the case?
event_data = dict(
varname=varname,
private=private,
prompt=prompt,
encrypt=encrypt,
confirm=confirm,
salt_size=salt_size,
salt=salt,
default=default,
)
with self.capture_event_data('playbook_on_vars_prompt', **event_data):
super(BaseCallbackModule, self).v2_playbook_on_vars_prompt(varname,
private, prompt, encrypt, confirm, salt_size, salt, default)
def v2_playbook_on_include(self, included_file):
event_data = dict(
included_file=included_file,
)
with self.capture_event_data('playbook_on_include', **event_data):
super(BaseCallbackModule, self).v2_playbook_on_include(included_file)
def v2_playbook_on_play_start(self, play):
self.set_play(play)
if hasattr(play, 'hosts'):
if isinstance(play.hosts, list):
pattern = ','.join(play.hosts)
else:
pattern = play.hosts
else:
pattern = ''
name = play.get_name().strip() or pattern
event_data = dict(
name=name,
pattern=pattern,
uuid=str(play._uuid),
)
with self.capture_event_data('playbook_on_play_start', **event_data):
super(BaseCallbackModule, self).v2_playbook_on_play_start(play)
def v2_playbook_on_import_for_host(self, result, imported_file):
return # not currently used in v2 (yet) / don't care about recording this one
with self.capture_event_data('playbook_on_import_for_host'):
super(BaseCallbackModule, self).v2_playbook_on_import_for_host(result, imported_file)
def v2_playbook_on_not_import_for_host(self, result, missing_file):
return # not currently used in v2 (yet) / don't care about recording this one
with self.capture_event_data('playbook_on_not_import_for_host'):
super(BaseCallbackModule, self).v2_playbook_on_not_import_for_host(result, missing_file)
def v2_playbook_on_setup(self):
return # not currently used in v2 (yet)
with self.capture_event_data('playbook_on_setup'):
super(BaseCallbackModule, self).v2_playbook_on_setup()
def v2_playbook_on_task_start(self, task, is_conditional):
# FIXME: Flag task path output as vv.
task_uuid = str(task._uuid)
if task_uuid in self.task_uuids:
return
self.task_uuids.add(task_uuid)
self.set_task(task)
event_data = dict(
task=task,
name=task.get_name(),
is_conditional=is_conditional,
uuid=task_uuid,
)
with self.capture_event_data('playbook_on_task_start', **event_data):
super(BaseCallbackModule, self).v2_playbook_on_task_start(task, is_conditional)
def v2_playbook_on_cleanup_task_start(self, task):
# re-using playbook_on_task_start event here for this v2-specific
# event, though we may consider any changes necessary to distinguish
# this from a normal task FIXME!
self.set_task(task)
event_data = dict(
task=task,
name=task.get_name(),
uuid=str(task._uuid),
)
with self.capture_event_data('playbook_on_task_start', **event_data):
super(BaseCallbackModule, self).v2_playbook_on_cleanup_task_start(task)
def v2_playbook_on_handler_task_start(self, task):
# re-using playbook_on_task_start event here for this v2-specific
# event, though we may consider any changes necessary to distinguish
# this from a normal task FIXME!
self.set_task(task)
event_data = dict(
task=task,
name=task.get_name(),
uuid=str(task._uuid),
)
with self.capture_event_data('playbook_on_task_start', **event_data):
super(BaseCallbackModule, self).v2_playbook_on_handler_task_start(task)
def v2_playbook_on_no_hosts_matched(self):
with self.capture_event_data('playbook_on_no_hosts_matched'):
super(BaseCallbackModule, self).v2_playbook_on_no_hosts_matched()
def v2_playbook_on_no_hosts_remaining(self):
with self.capture_event_data('playbook_on_no_hosts_remaining'):
super(BaseCallbackModule, self).v2_playbook_on_no_hosts_remaining()
def v2_playbook_on_notify(self, result, handler):
event_data = dict(
host=result._host.name,
task=result._task,
handler=handler,
)
with self.capture_event_data('playbook_on_notify', **event_data):
super(BaseCallbackModule, self).v2_playbook_on_notify(result, handler)
def v2_playbook_on_stats(self, stats):
self.clear_play()
# FIXME: Add count of plays/tasks.
event_data = dict(
changed=stats.changed,
dark=stats.dark,
failures=stats.failures,
ok=stats.ok,
processed=stats.processed,
skipped=stats.skipped,
)
with self.capture_event_data('playbook_on_stats', **event_data):
super(BaseCallbackModule, self).v2_playbook_on_stats(stats)
def v2_runner_on_ok(self, result):
# FIXME: Display detailed results or not based on verbosity.
event_data = dict(
host=result._host.name,
remote_addr=result._host.address,
task=result._task,
res=result._result,
event_loop=result._task.loop if hasattr(result._task, 'loop') else None,
)
with self.capture_event_data('runner_on_ok', **event_data):
super(BaseCallbackModule, self).v2_runner_on_ok(result)
def v2_runner_on_failed(self, result, ignore_errors=False):
# FIXME: Add verbosity for exception/results output.
event_data = dict(
host=result._host.name,
res=result._result,
task=result._task,
ignore_errors=ignore_errors,
event_loop=result._task.loop if hasattr(result._task, 'loop') else None,
)
with self.capture_event_data('runner_on_failed', **event_data):
super(BaseCallbackModule, self).v2_runner_on_failed(result, ignore_errors)
def v2_runner_on_error(self, result):
pass # Not implemented in v2.
def v2_runner_on_skipped(self, result):
event_data = dict(
host=result._host.name,
task=result._task,
event_loop=result._task.loop if hasattr(result._task, 'loop') else None,
)
with self.capture_event_data('runner_on_skipped', **event_data):
super(BaseCallbackModule, self).v2_runner_on_skipped(result)
def v2_runner_on_unreachable(self, result):
event_data = dict(
host=result._host.name,
task=result._task,
res=result._result,
)
with self.capture_event_data('runner_on_unreachable', **event_data):
super(BaseCallbackModule, self).v2_runner_on_unreachable(result)
def v2_runner_on_no_hosts(self, task):
event_data = dict(
task=task,
)
with self.capture_event_data('runner_on_no_hosts', **event_data):
super(BaseCallbackModule, self).v2_runner_on_no_hosts(task)
def v2_runner_on_file_diff(self, result, diff):
# FIXME: Ignore file diff for ad hoc commands?
event_data = dict(
host=result._host.name,
task=result._task,
diff=diff,
)
with self.capture_event_data('runner_on_file_diff', **event_data):
super(BaseCallbackModule, self).v2_runner_on_file_diff(result, diff)
def v2_runner_item_on_ok(self, result):
event_data = dict(
host=result._host.name,
task=result._task,
res=result._result,
)
with self.capture_event_data('runner_item_on_ok', **event_data):
super(BaseCallbackModule, self).v2_runner_item_on_ok(result)
def v2_runner_item_on_failed(self, result):
event_data = dict(
host=result._host.name,
task=result._task,
res=result._result,
)
with self.capture_event_data('runner_item_on_failed', **event_data):
super(BaseCallbackModule, self).v2_runner_item_on_failed(result)
def v2_runner_item_on_skipped(self, result):
event_data = dict(
host=result._host.name,
task=result._task,
res=result._result,
)
with self.capture_event_data('runner_item_on_skipped', **event_data):
super(BaseCallbackModule, self).v2_runner_item_on_skipped(result)
# V2 does not use the _on_async callbacks (yet).
def runner_on_async_poll(self, host, res, jid, clock):
self._log_event('runner_on_async_poll', host=host, res=res, jid=jid,
clock=clock)
def runner_on_async_ok(self, host, res, jid):
self._log_event('runner_on_async_ok', host=host, res=res, jid=jid)
def runner_on_async_failed(self, host, res, jid):
self._log_event('runner_on_async_failed', host=host, res=res, jid=jid)
class TowerDefaultCallbackModule(BaseCallbackModule, DefaultCallbackModule):
CALLBACK_NAME = 'tower_display'
class TowerMinimalCallbackModule(BaseCallbackModule, MinimalCallbackModule):
CALLBACK_NAME = 'minimal'
def v2_playbook_on_play_start(self, play):
pass
def v2_playbook_on_task_start(self, task, is_conditional):
self.set_task(task)

View File

@@ -2,7 +2,6 @@
# All Rights Reserved.
# Python
import datetime
import logging
import json
@@ -12,10 +11,7 @@ from kombu.mixins import ConsumerMixin
# Django
from django.conf import settings
from django.core.management.base import NoArgsCommand
from django.core.cache import cache
from django.db import DatabaseError
from django.utils.dateparse import parse_datetime
from django.utils.timezone import FixedOffset
# AWX
from awx.main.models import * # noqa
@@ -36,112 +32,26 @@ class CallbackBrokerWorker(ConsumerMixin):
def process_task(self, body, message):
try:
if "event" not in body:
raise Exception("Payload does not have an event")
if "job_id" not in body:
raise Exception("Payload does not have a job_id")
if 'event' not in body:
raise Exception('Payload does not have an event')
if 'job_id' not in body and 'ad_hoc_command_id' not in body:
raise Exception('Payload does not have a job_id or ad_hoc_command_id')
if settings.DEBUG:
logger.info("Body: {}".format(body))
logger.info("Message: {}".format(message))
self.process_job_event(body)
logger.info('Body: {}'.format(body))
logger.info('Message: {}'.format(message))
try:
if 'job_id' in body:
JobEvent.create_from_data(**body)
elif 'ad_hoc_command_id' in body:
AdHocCommandEvent.create_from_data(**body)
except DatabaseError as e:
logger.error('Database Error Saving Job Event: {}'.format(e))
except Exception as exc:
import traceback
traceback.print_exc()
logger.error('Callback Task Processor Raised Exception: %r', exc)
message.ack()
def process_job_event(self, payload):
# Get the correct "verbose" value from the job.
# If for any reason there's a problem, just use 0.
if 'ad_hoc_command_id' in payload:
event_type_key = 'ad_hoc_command_id'
event_object_type = AdHocCommand
else:
event_type_key = 'job_id'
event_object_type = Job
try:
verbose = event_object_type.objects.get(id=payload[event_type_key]).verbosity
except Exception as e:
verbose=0
# TODO: cache
# Convert the datetime for the job event's creation appropriately,
# and include a time zone for it.
#
# In the event of any issue, throw it out, and Django will just save
# the current time.
try:
if not isinstance(payload['created'], datetime.datetime):
payload['created'] = parse_datetime(payload['created'])
if not payload['created'].tzinfo:
payload['created'] = payload['created'].replace(tzinfo=FixedOffset(0))
except (KeyError, ValueError):
payload.pop('created', None)
event_uuid = payload.get("uuid", '')
parent_event_uuid = payload.get("parent_uuid", '')
artifact_data = payload.get("artifact_data", None)
# Sanity check: Don't honor keys that we don't recognize.
for key in payload.keys():
if key not in (event_type_key, 'event', 'event_data',
'created', 'counter', 'uuid'):
payload.pop(key)
try:
# If we're not in verbose mode, wipe out any module
# arguments.
res = payload['event_data'].get('res', {})
if isinstance(res, dict):
i = res.get('invocation', {})
if verbose == 0 and 'module_args' in i:
i['module_args'] = ''
if 'ad_hoc_command_id' in payload:
AdHocCommandEvent.objects.create(**data)
return
j = JobEvent(**payload)
if payload['event'] == 'playbook_on_start':
j.save()
cache.set("{}_{}".format(payload['job_id'], event_uuid), j.id, 300)
return
else:
if parent_event_uuid:
parent_id = cache.get("{}_{}".format(payload['job_id'], parent_event_uuid), None)
if parent_id is None:
parent_id_obj = JobEvent.objects.filter(uuid=parent_event_uuid, job_id=payload['job_id'])
if parent_id_obj.exists(): # Problematic if not there, means the parent hasn't been written yet... TODO
j.parent_id = parent_id_obj[0].id
print("Settings cache: {}_{} with value {}".format(payload['job_id'], parent_event_uuid, j.parent_id))
cache.set("{}_{}".format(payload['job_id'], parent_event_uuid), j.parent_id, 300)
else:
print("Cache hit")
j.parent_id = parent_id
j.save(post_process=True)
if event_uuid:
cache.set("{}_{}".format(payload['job_id'], event_uuid), j.id, 300)
except DatabaseError as e:
logger.error("Database Error Saving Job Event: {}".format(e))
if artifact_data:
try:
self.process_artifacts(artifact_data, res, payload)
except DatabaseError as e:
logger.error("Database Error Saving Job Artifacts: {}".format(e))
def process_artifacts(self, artifact_data, res, payload):
artifact_dict = json.loads(artifact_data)
if res and isinstance(res, dict):
if res.get('_ansible_no_log', False):
artifact_dict['_ansible_no_log'] = True
if artifact_data is not None:
parent_job = Job.objects.filter(pk=payload['job_id']).first()
if parent_job is not None and parent_job.artifacts != artifact_dict:
parent_job.artifacts = artifact_dict
parent_job.save(update_fields=['artifacts'])
class Command(NoArgsCommand):
'''
@@ -158,4 +68,3 @@ class Command(NoArgsCommand):
worker.run()
except KeyboardInterrupt:
print('Terminating Callback Receiver')

View File

@@ -0,0 +1,96 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('main', '0043_v310_scm_revision'),
]
operations = [
migrations.AddField(
model_name='adhoccommandevent',
name='end_line',
field=models.PositiveIntegerField(default=0, editable=False),
),
migrations.AddField(
model_name='adhoccommandevent',
name='start_line',
field=models.PositiveIntegerField(default=0, editable=False),
),
migrations.AddField(
model_name='adhoccommandevent',
name='stdout',
field=models.TextField(default=b'', editable=False),
),
migrations.AddField(
model_name='adhoccommandevent',
name='uuid',
field=models.CharField(default=b'', max_length=1024, editable=False),
),
migrations.AddField(
model_name='adhoccommandevent',
name='verbosity',
field=models.PositiveIntegerField(default=0, editable=False),
),
migrations.AddField(
model_name='jobevent',
name='end_line',
field=models.PositiveIntegerField(default=0, editable=False),
),
migrations.AddField(
model_name='jobevent',
name='playbook',
field=models.CharField(default=b'', max_length=1024, editable=False),
),
migrations.AddField(
model_name='jobevent',
name='start_line',
field=models.PositiveIntegerField(default=0, editable=False),
),
migrations.AddField(
model_name='jobevent',
name='stdout',
field=models.TextField(default=b'', editable=False),
),
migrations.AddField(
model_name='jobevent',
name='verbosity',
field=models.PositiveIntegerField(default=0, editable=False),
),
migrations.AlterField(
model_name='adhoccommandevent',
name='counter',
field=models.PositiveIntegerField(default=0, editable=False),
),
migrations.AlterField(
model_name='adhoccommandevent',
name='event',
field=models.CharField(max_length=100, choices=[(b'runner_on_failed', 'Host Failed'), (b'runner_on_ok', 'Host OK'), (b'runner_on_unreachable', 'Host Unreachable'), (b'runner_on_skipped', 'Host Skipped'), (b'debug', 'Debug'), (b'verbose', 'Verbose'), (b'deprecated', 'Deprecated'), (b'warning', 'Warning'), (b'system_warning', 'System Warning'), (b'error', 'Error')]),
),
migrations.AlterField(
model_name='jobevent',
name='counter',
field=models.PositiveIntegerField(default=0, editable=False),
),
migrations.AlterField(
model_name='jobevent',
name='event',
field=models.CharField(max_length=100, choices=[(b'runner_on_failed', 'Host Failed'), (b'runner_on_ok', 'Host OK'), (b'runner_on_error', 'Host Failure'), (b'runner_on_skipped', 'Host Skipped'), (b'runner_on_unreachable', 'Host Unreachable'), (b'runner_on_no_hosts', 'No Hosts Remaining'), (b'runner_on_async_poll', 'Host Polling'), (b'runner_on_async_ok', 'Host Async OK'), (b'runner_on_async_failed', 'Host Async Failure'), (b'runner_on_file_diff', 'File Difference'), (b'playbook_on_start', 'Playbook Started'), (b'playbook_on_notify', 'Running Handlers'), (b'playbook_on_no_hosts_matched', 'No Hosts Matched'), (b'playbook_on_no_hosts_remaining', 'No Hosts Remaining'), (b'playbook_on_task_start', 'Task Started'), (b'playbook_on_vars_prompt', 'Variables Prompted'), (b'playbook_on_setup', 'Gathering Facts'), (b'playbook_on_import_for_host', 'internal: on Import for Host'), (b'playbook_on_not_import_for_host', 'internal: on Not Import for Host'), (b'playbook_on_play_start', 'Play Started'), (b'playbook_on_stats', 'Playbook Complete'), (b'debug', 'Debug'), (b'verbose', 'Verbose'), (b'deprecated', 'Deprecated'), (b'warning', 'Warning'), (b'system_warning', 'System Warning'), (b'error', 'Error')]),
),
migrations.AlterUniqueTogether(
name='adhoccommandevent',
unique_together=set([]),
),
migrations.AlterIndexTogether(
name='adhoccommandevent',
index_together=set([('ad_hoc_command', 'event'), ('ad_hoc_command', 'uuid'), ('ad_hoc_command', 'end_line'), ('ad_hoc_command', 'start_line')]),
),
migrations.AlterIndexTogether(
name='jobevent',
index_together=set([('job', 'event'), ('job', 'parent'), ('job', 'start_line'), ('job', 'uuid'), ('job', 'end_line')]),
),
]

View File

@@ -2,6 +2,7 @@
# All Rights Reserved.
# Python
import datetime
import hmac
import json
import logging
@@ -9,8 +10,11 @@ from urlparse import urljoin
# Django
from django.conf import settings
from django.core.cache import cache
from django.db import models
from django.utils.dateparse import parse_datetime
from django.utils.text import Truncator
from django.utils.timezone import utc
from django.utils.translation import ugettext_lazy as _
from django.core.exceptions import ValidationError
from django.core.urlresolvers import reverse
@@ -267,14 +271,28 @@ class AdHocCommandEvent(CreatedModifiedModel):
#('runner_on_async_failed', _('Host Async Failure'), True),
# Tower does not yet support --diff mode
#('runner_on_file_diff', _('File Difference'), False),
# Additional event types for captured stdout not directly related to
# runner events.
('debug', _('Debug'), False),
('verbose', _('Verbose'), False),
('deprecated', _('Deprecated'), False),
('warning', _('Warning'), False),
('system_warning', _('System Warning'), False),
('error', _('Error'), False),
]
FAILED_EVENTS = [x[0] for x in EVENT_TYPES if x[2]]
EVENT_CHOICES = [(x[0], x[1]) for x in EVENT_TYPES]
class Meta:
app_label = 'main'
unique_together = [('ad_hoc_command', 'host_name')]
ordering = ('-pk',)
index_together = [
('ad_hoc_command', 'event'),
('ad_hoc_command', 'uuid'),
('ad_hoc_command', 'start_line'),
('ad_hoc_command', 'end_line'),
]
ad_hoc_command = models.ForeignKey(
'AdHocCommand',
@@ -311,8 +329,30 @@ class AdHocCommandEvent(CreatedModifiedModel):
default=False,
editable=False,
)
uuid = models.CharField(
max_length=1024,
default='',
editable=False,
)
counter = models.PositiveIntegerField(
default=0,
editable=False,
)
stdout = models.TextField(
default='',
editable=False,
)
verbosity = models.PositiveIntegerField(
default=0,
editable=False,
)
start_line = models.PositiveIntegerField(
default=0,
editable=False,
)
end_line = models.PositiveIntegerField(
default=0,
editable=False,
)
def get_absolute_url(self):
@@ -350,3 +390,28 @@ class AdHocCommandEvent(CreatedModifiedModel):
except (IndexError, AttributeError):
pass
super(AdHocCommandEvent, self).save(*args, **kwargs)
@classmethod
def create_from_data(self, **kwargs):
# Convert the datetime for the ad hoc command event's creation
# appropriately, and include a time zone for it.
#
# In the event of any issue, throw it out, and Django will just save
# the current time.
try:
if not isinstance(kwargs['created'], datetime.datetime):
kwargs['created'] = parse_datetime(kwargs['created'])
if not kwargs['created'].tzinfo:
kwargs['created'] = kwargs['created'].replace(tzinfo=utc)
except (KeyError, ValueError):
kwargs.pop('created', None)
# Sanity check: Don't honor keys that we don't recognize.
valid_keys = {'ad_hoc_command_id', 'event', 'event_data', 'created',
'counter', 'uuid', 'stdout', 'start_line', 'end_line',
'verbosity'}
for key in kwargs.keys():
if key not in valid_keys:
kwargs.pop(key)
return AdHocCommandEvent.objects.create(**kwargs)

View File

@@ -2,6 +2,7 @@
# All Rights Reserved.
# Python
import datetime
import hmac
import json
import yaml
@@ -11,8 +12,12 @@ from urlparse import urljoin
# Django
from django.conf import settings
from django.core.cache import cache
from django.db import models
from django.db.models import Q, Count
from django.utils.dateparse import parse_datetime
from django.utils.encoding import force_text
from django.utils.timezone import utc
from django.utils.translation import ugettext_lazy as _
from django.core.exceptions import ValidationError
from django.core.urlresolvers import reverse
@@ -940,7 +945,7 @@ class JobEvent(CreatedModifiedModel):
# - playbook_on_task_start (once for each task within a play)
# - runner_on_failed
# - runner_on_ok
# - runner_on_error
# - runner_on_error (not used for v2)
# - runner_on_skipped
# - runner_on_unreachable
# - runner_on_no_hosts
@@ -962,14 +967,14 @@ class JobEvent(CreatedModifiedModel):
(3, 'runner_on_async_poll', _('Host Polling'), False),
(3, 'runner_on_async_ok', _('Host Async OK'), False),
(3, 'runner_on_async_failed', _('Host Async Failure'), True),
# AWX does not yet support --diff mode
# Tower does not yet support --diff mode
(3, 'runner_on_file_diff', _('File Difference'), False),
(0, 'playbook_on_start', _('Playbook Started'), False),
(2, 'playbook_on_notify', _('Running Handlers'), False),
(2, 'playbook_on_no_hosts_matched', _('No Hosts Matched'), False),
(2, 'playbook_on_no_hosts_remaining', _('No Hosts Remaining'), False),
(2, 'playbook_on_task_start', _('Task Started'), False),
# AWX does not yet support vars_prompt (and will probably hang :)
# Tower does not yet support vars_prompt (and will probably hang :)
(1, 'playbook_on_vars_prompt', _('Variables Prompted'), False),
(2, 'playbook_on_setup', _('Gathering Facts'), False),
# callback will not record this
@@ -978,6 +983,15 @@ class JobEvent(CreatedModifiedModel):
(2, 'playbook_on_not_import_for_host', _('internal: on Not Import for Host'), False),
(1, 'playbook_on_play_start', _('Play Started'), False),
(1, 'playbook_on_stats', _('Playbook Complete'), False),
# Additional event types for captured stdout not directly related to
# playbook or runner events.
(0, 'debug', _('Debug'), False),
(0, 'verbose', _('Verbose'), False),
(0, 'deprecated', _('Deprecated'), False),
(0, 'warning', _('Warning'), False),
(0, 'system_warning', _('System Warning'), False),
(0, 'error', _('Error'), True),
]
FAILED_EVENTS = [x[1] for x in EVENT_TYPES if x[3]]
EVENT_CHOICES = [(x[1], x[2]) for x in EVENT_TYPES]
@@ -986,6 +1000,13 @@ class JobEvent(CreatedModifiedModel):
class Meta:
app_label = 'main'
ordering = ('pk',)
index_together = [
('job', 'event'),
('job', 'uuid'),
('job', 'start_line'),
('job', 'end_line'),
('job', 'parent'),
]
job = models.ForeignKey(
'Job',
@@ -1032,12 +1053,17 @@ class JobEvent(CreatedModifiedModel):
related_name='job_events',
editable=False,
)
playbook = models.CharField(
max_length=1024,
default='',
editable=False,
)
play = models.CharField(
max_length=1024,
default='',
editable=False,
)
role = models.CharField( # FIXME: Determine from callback or task name.
role = models.CharField(
max_length=1024,
default='',
editable=False,
@@ -1057,8 +1083,24 @@ class JobEvent(CreatedModifiedModel):
)
counter = models.PositiveIntegerField(
default=0,
editable=False,
)
stdout = models.TextField(
default='',
editable=False,
)
verbosity = models.PositiveIntegerField(
default=0,
editable=False,
)
start_line = models.PositiveIntegerField(
default=0,
editable=False,
)
end_line = models.PositiveIntegerField(
default=0,
editable=False,
)
def get_absolute_url(self):
return reverse('api:job_event_detail', args=(self.pk,))
@@ -1119,7 +1161,8 @@ class JobEvent(CreatedModifiedModel):
pass
return msg
def _find_parent(self):
def _find_parent_id(self):
# Find the (most likely) parent event for this event.
parent_events = set()
if self.event in ('playbook_on_play_start', 'playbook_on_stats',
'playbook_on_vars_prompt'):
@@ -1135,101 +1178,55 @@ class JobEvent(CreatedModifiedModel):
parent_events.add('playbook_on_setup')
parent_events.add('playbook_on_task_start')
if parent_events:
try:
qs = JobEvent.objects.filter(job_id=self.job_id)
if self.pk:
qs = qs.filter(pk__lt=self.pk, event__in=parent_events)
else:
qs = qs.filter(event__in=parent_events)
return qs.order_by('-pk')[0]
except IndexError:
pass
return None
qs = JobEvent.objects.filter(job_id=self.job_id, event__in=parent_events).order_by('-pk')
if self.pk:
qs = qs.filter(pk__lt=self.pk)
return qs.only('id').values_list('id', flat=True).first()
def save(self, *args, **kwargs):
from awx.main.models.inventory import Host
# If update_fields has been specified, add our field names to it,
# if it hasn't been specified, then we're just doing a normal save.
update_fields = kwargs.get('update_fields', [])
# Skip normal checks on save if we're only updating failed/changed
# flags triggered from a child event.
from_parent_update = kwargs.pop('from_parent_update', False)
if not from_parent_update:
res = self.event_data.get('res', None)
# Workaround for Ansible 1.2, where the runner_on_async_ok event is
# created even when the async task failed. Change the event to be
# correct.
if self.event == 'runner_on_async_ok':
try:
if res.get('failed', False) or res.get('rc', 0) != 0:
self.event = 'runner_on_async_failed'
except (AttributeError, TypeError):
pass
if self.event in self.FAILED_EVENTS:
if not self.event_data.get('ignore_errors', False):
self.failed = True
if 'failed' not in update_fields:
update_fields.append('failed')
if isinstance(res, dict) and res.get('changed', False):
def _update_from_event_data(self):
# Update job event model fields from event data.
updated_fields = set()
job = self.job
verbosity = job.verbosity
event_data = self.event_data
res = event_data.get('res', None)
if self.event in self.FAILED_EVENTS and not event_data.get('ignore_errors', False):
self.failed = True
updated_fields.add('failed')
if isinstance(res, dict):
if res.get('changed', False):
self.changed = True
if 'changed' not in update_fields:
update_fields.append('changed')
if self.event == 'playbook_on_stats':
try:
failures_dict = self.event_data.get('failures', {})
dark_dict = self.event_data.get('dark', {})
self.failed = bool(sum(failures_dict.values()) +
sum(dark_dict.values()))
if 'failed' not in update_fields:
update_fields.append('failed')
changed_dict = self.event_data.get('changed', {})
self.changed = bool(sum(changed_dict.values()))
if 'changed' not in update_fields:
update_fields.append('changed')
except (AttributeError, TypeError):
pass
self.play = self.event_data.get('play', '').strip()
if 'play' not in update_fields:
update_fields.append('play')
self.task = self.event_data.get('task', '').strip()
if 'task' not in update_fields:
update_fields.append('task')
self.role = self.event_data.get('role', '').strip()
if 'role' not in update_fields:
update_fields.append('role')
self.host_name = self.event_data.get('host', '').strip()
if 'host_name' not in update_fields:
update_fields.append('host_name')
# Only update job event hierarchy and related models during post
# processing (after running job).
post_process = kwargs.pop('post_process', False)
if post_process:
updated_fields.add('changed')
# If we're not in verbose mode, wipe out any module arguments.
invocation = res.get('invocation', None)
if isinstance(invocation, dict) and verbosity == 0 and 'module_args' in invocation:
event_data['res']['invocation']['module_args'] = ''
self.event_data = event_data
update_fields.add('event_data')
if self.event == 'playbook_on_stats':
try:
if not self.host_id and self.host_name:
host_qs = Host.objects.filter(inventory__jobs__id=self.job_id, name=self.host_name)
host_id = host_qs.only('id').values_list('id', flat=True)
if host_id.exists():
self.host_id = host_id[0]
if 'host_id' not in update_fields:
update_fields.append('host_id')
except (IndexError, AttributeError):
failures_dict = event_data.get('failures', {})
dark_dict = event_data.get('dark', {})
self.failed = bool(sum(failures_dict.values()) +
sum(dark_dict.values()))
updated_fields.add('failed')
changed_dict = event_data.get('changed', {})
self.changed = bool(sum(changed_dict.values()))
updated_fields.add('changed')
except (AttributeError, TypeError):
pass
if self.parent is None:
self.parent = self._find_parent()
if 'parent' not in update_fields:
update_fields.append('parent')
super(JobEvent, self).save(*args, **kwargs)
if post_process and not from_parent_update:
self.update_parent_failed_and_changed()
# FIXME: The update_hosts() call (and its queries) are the current
# performance bottleneck....
if getattr(settings, 'CAPTURE_JOB_EVENT_HOSTS', False):
self.update_hosts()
self.update_host_summary_from_stats()
for field in ('playbook', 'play', 'task', 'role', 'host'):
value = force_text(event_data.get(field, '')).strip()
if field == 'host':
field = 'host_name'
if value != getattr(self, field):
setattr(self, field, value)
updated_fields.add(field)
return updated_fields
def update_parent_failed_and_changed(self):
# Propagage failed and changed flags to parent events.
if self.parent:
def _update_parent_failed_and_changed(self):
# Propagate failed and changed flags to parent events.
if self.parent_id:
parent = self.parent
update_fields = []
if self.failed and not parent.failed:
@@ -1240,9 +1237,10 @@ class JobEvent(CreatedModifiedModel):
update_fields.append('changed')
if update_fields:
parent.save(update_fields=update_fields, from_parent_update=True)
parent.update_parent_failed_and_changed()
parent._update_parent_failed_and_changed()
def update_hosts(self, extra_host_pks=None):
def _update_hosts(self, extra_host_pks=None):
# Update job event hosts m2m from host_name, propagate to parent events.
from awx.main.models.inventory import Host
extra_host_pks = set(extra_host_pks or [])
hostnames = set()
@@ -1256,16 +1254,14 @@ class JobEvent(CreatedModifiedModel):
pass
qs = Host.objects.filter(inventory__jobs__id=self.job_id)
qs = qs.filter(Q(name__in=hostnames) | Q(pk__in=extra_host_pks))
qs = qs.exclude(job_events__pk=self.id)
for host in qs.only('id'):
qs = qs.exclude(job_events__pk=self.id).only('id')
for host in qs:
self.hosts.add(host)
if self.parent:
self.parent.update_hosts(self.hosts.only('id').values_list('id', flat=True))
if self.parent_id:
self.parent._update_hosts(qs.values_list('id', flat=True))
def update_host_summary_from_stats(self):
def _update_host_summary_from_stats(self):
from awx.main.models.inventory import Host
if self.event != 'playbook_on_stats':
return
hostnames = set()
try:
for v in self.event_data.values():
@@ -1276,7 +1272,6 @@ class JobEvent(CreatedModifiedModel):
qs = Host.objects.filter(inventory__jobs__id=self.job_id,
name__in=hostnames)
job = self.job
#for host in qs.only('id', 'name'):
for host in hostnames:
host_stats = {}
for stat in ('changed', 'dark', 'failures', 'ok', 'processed', 'skipped'):
@@ -1300,6 +1295,112 @@ class JobEvent(CreatedModifiedModel):
job.inventory.update_computed_fields()
emit_channel_notification('jobs-summary', dict(group_name='jobs', unified_job_id=job.id))
def save(self, *args, **kwargs):
from awx.main.models.inventory import Host
# If update_fields has been specified, add our field names to it,
# if it hasn't been specified, then we're just doing a normal save.
update_fields = kwargs.get('update_fields', [])
# Update model fields and related objects unless we're only updating
# failed/changed flags triggered from a child event.
from_parent_update = kwargs.pop('from_parent_update', False)
if not from_parent_update:
# Update model fields from event data.
updated_fields = self._update_from_event_data()
for field in updated_fields:
if field not in update_fields:
update_fields.append(field)
# Update host related field from host_name.
if not self.host_id and self.host_name:
host_qs = Host.objects.filter(inventory__jobs__id=self.job_id, name=self.host_name)
host_id = host_qs.only('id').values_list('id', flat=True).first()
if host_id != self.host_id:
self.host_id = host_id
if 'host_id' not in update_fields:
update_fields.append('host_id')
# Update parent related field if not set.
if self.parent_id is None:
self.parent_id = self._find_parent_id()
if self.parent_id and 'parent_id' not in update_fields:
update_fields.append('parent_id')
super(JobEvent, self).save(*args, **kwargs)
# Update related objects after this event is saved.
if not from_parent_update:
if self.parent_id:
self._update_parent_failed_and_changed()
# FIXME: The update_hosts() call (and its queries) are the current
# performance bottleneck....
if getattr(settings, 'CAPTURE_JOB_EVENT_HOSTS', False):
self._update_hosts()
if self.event == 'playbook_on_stats':
self._update_host_summary_from_stats()
@classmethod
def create_from_data(self, **kwargs):
# Must have a job_id specified.
if not kwargs.get('job_id', None):
return
# Convert the datetime for the job event's creation appropriately,
# and include a time zone for it.
#
# In the event of any issue, throw it out, and Django will just save
# the current time.
try:
if not isinstance(kwargs['created'], datetime.datetime):
kwargs['created'] = parse_datetime(kwargs['created'])
if not kwargs['created'].tzinfo:
kwargs['created'] = kwargs['created'].replace(tzinfo=utc)
except (KeyError, ValueError):
kwargs.pop('created', None)
# Save UUID and parent UUID for determining parent-child relationship.
job_event_uuid = kwargs.get('uuid', None)
parent_event_uuid = kwargs.get('parent_uuid', None)
artifact_data = kwargs.get('artifact_data', None)
# Sanity check: Don't honor keys that we don't recognize.
valid_keys = {'job_id', 'event', 'event_data', 'playbook', 'play',
'role', 'task', 'created', 'counter', 'uuid', 'stdout',
'start_line', 'end_line', 'verbosity'}
for key in kwargs.keys():
if key not in valid_keys:
kwargs.pop(key)
# Try to find a parent event based on UUID.
if parent_event_uuid:
cache_key = '{}_{}'.format(kwargs['job_id'], parent_event_uuid)
parent_id = cache.get(cache_key)
if parent_id is None:
parent_id = JobEvent.objects.filter(job_id=kwargs['job_id'], uuid=parent_event_uuid).only('id').values_list('id', flat=True).first()
if parent_id:
print("Settings cache: {} with value {}".format(cache_key, parent_id))
cache.set(cache_key, parent_id, 300)
if parent_id:
kwargs['parent_id'] = parent_id
job_event = JobEvent.objects.create(**kwargs)
# Cache this job event ID vs. UUID for future parent lookups.
if job_event_uuid:
cache_key = '{}_{}'.format(kwargs['job_id'], job_event_uuid)
cache.set(cache_key, job_event.id, 300)
# Save artifact data to parent job (if provided).
if artifact_data:
artifact_dict = json.loads(artifact_data)
event_data = kwargs.get('event_data', None)
if event_data and isinstance(event_data, dict):
res = event_data.get('res', None)
if res and isinstance(res, dict):
if res.get('_ansible_no_log', False):
artifact_dict['_ansible_no_log'] = True
parent_job = Job.objects.filter(pk=kwargs['job_id']).first()
if parent_job and parent_job.artifacts != artifact_dict:
parent_job.artifacts = artifact_dict
parent_job.save(update_fields=['artifacts'])
return job_event
@classmethod
def get_startevent_queryset(cls, parent_task, starting_events, ordering=None):
'''

View File

@@ -696,8 +696,11 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
return StringIO(msg['missing' if self.finished else 'pending'])
def _escape_ascii(self, content):
ansi_escape = re.compile(r'\x1b[^m]*m')
return ansi_escape.sub('', content)
# Remove ANSI escape sequences used to embed event data.
content = re.sub(r'\x1b\[K(?:[A-Za-z0-9+/=]+\x1b\[\d+D)+\x1b\[K', '', content)
# Remove ANSI color escape sequences.
content = re.sub(r'\x1b[^m]*m', '', content)
return content
def _result_stdout_raw(self, redact_sensitive=False, escape_ascii=False):
content = self.result_stdout_raw_handle().read()

View File

@@ -49,7 +49,8 @@ from awx.main.models import * # noqa
from awx.main.models import UnifiedJob
from awx.main.task_engine import TaskEnhancer
from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url,
check_proot_installed, build_proot_temp_dir, wrap_args_with_proot)
check_proot_installed, build_proot_temp_dir, wrap_args_with_proot,
OutputEventFilter)
from awx.main.consumers import emit_channel_notification
__all__ = ['RunJob', 'RunSystemJob', 'RunProjectUpdate', 'RunInventoryUpdate',
@@ -397,6 +398,8 @@ class BaseTask(Task):
if os.path.isdir(os.path.join(venv_libdir, python_ver)):
env['PYTHONPATH'] = os.path.join(venv_libdir, python_ver, "site-packages") + ":"
break
# Add awx/lib to PYTHONPATH.
env['PYTHONPATH'] = ':'.join(filter(None, [self.get_path_to('..', 'lib'), env.get('PYTHONPATH', '')]))
return env
def add_tower_venv(self, env):
@@ -494,6 +497,17 @@ class BaseTask(Task):
'''
return OrderedDict()
def get_stdout_handle(self, instance):
'''
Return an open file object for capturing stdout.
'''
if not os.path.exists(settings.JOBOUTPUT_ROOT):
os.makedirs(settings.JOBOUTPUT_ROOT)
stdout_filename = os.path.join(settings.JOBOUTPUT_ROOT, "%d-%s.out" % (instance.pk, str(uuid.uuid1())))
stdout_handle = codecs.open(stdout_filename, 'w', encoding='utf-8')
assert stdout_handle.name == stdout_filename
return stdout_handle
def run_pexpect(self, instance, args, cwd, env, passwords, stdout_handle,
output_replacements=None, extra_update_fields=None):
'''
@@ -643,10 +657,7 @@ class BaseTask(Task):
cwd = self.build_cwd(instance, **kwargs)
env = self.build_env(instance, **kwargs)
safe_env = self.build_safe_env(instance, **kwargs)
if not os.path.exists(settings.JOBOUTPUT_ROOT):
os.makedirs(settings.JOBOUTPUT_ROOT)
stdout_filename = os.path.join(settings.JOBOUTPUT_ROOT, "%d-%s.out" % (pk, str(uuid.uuid1())))
stdout_handle = codecs.open(stdout_filename, 'w', encoding='utf-8')
stdout_handle = self.get_stdout_handle(instance)
if self.should_use_proot(instance, **kwargs):
if not check_proot_installed():
raise RuntimeError('proot is not installed')
@@ -660,7 +671,7 @@ class BaseTask(Task):
args = self.wrap_args_with_ssh_agent(args, ssh_key_path, ssh_auth_sock)
safe_args = self.wrap_args_with_ssh_agent(safe_args, ssh_key_path, ssh_auth_sock)
instance = self.update_model(pk, job_args=json.dumps(safe_args),
job_cwd=cwd, job_env=safe_env, result_stdout_file=stdout_filename)
job_cwd=cwd, job_env=safe_env, result_stdout_file=stdout_handle.name)
status, rc = self.run_pexpect(instance, args, cwd, env, kwargs['passwords'], stdout_handle,
extra_update_fields=extra_update_fields)
except Exception:
@@ -779,6 +790,7 @@ class RunJob(BaseTask):
if job.project:
env['PROJECT_REVISION'] = job.project.scm_revision
env['ANSIBLE_CALLBACK_PLUGINS'] = plugin_path
env['ANSIBLE_STDOUT_CALLBACK'] = 'tower_display'
env['REST_API_URL'] = settings.INTERNAL_API_URL
env['REST_API_TOKEN'] = job.task_auth_token or ''
env['CALLBACK_QUEUE'] = settings.CALLBACK_QUEUE
@@ -974,6 +986,16 @@ class RunJob(BaseTask):
d[re.compile(r'^Vault password:\s*?$', re.M)] = 'vault_password'
return d
def get_stdout_handle(self, instance):
'''
Wrap stdout file object to capture events.
'''
stdout_handle = super(RunJob, self).get_stdout_handle(instance)
def job_event_callback(event_data):
event_data.setdefault('job_id', instance.id)
JobEvent.create_from_data(**event_data)
return OutputEventFilter(stdout_handle, job_event_callback)
def get_ssh_key_path(self, instance, **kwargs):
'''
If using an SSH key, return the path for use by ssh-agent.
@@ -1019,11 +1041,6 @@ class RunJob(BaseTask):
pass
else:
update_inventory_computed_fields.delay(inventory.id, True)
# Update job event fields after job has completed (only when using REST
# API callback).
if not getattr(settings, 'CALLBACK_CONSUMER_PORT', None) and not getattr(settings, 'CALLBACK_QUEUE', None):
for job_event in job.job_events.order_by('pk'):
job_event.save(post_process=True)
class RunProjectUpdate(BaseTask):
@@ -1597,6 +1614,7 @@ class RunAdHocCommand(BaseTask):
env['INVENTORY_HOSTVARS'] = str(True)
env['ANSIBLE_CALLBACK_PLUGINS'] = plugin_dir
env['ANSIBLE_LOAD_CALLBACK_PLUGINS'] = '1'
env['ANSIBLE_STDOUT_CALLBACK'] = 'minimal' # Hardcoded by Ansible for ad-hoc commands (either minimal or oneline).
env['REST_API_URL'] = settings.INTERNAL_API_URL
env['REST_API_TOKEN'] = ad_hoc_command.task_auth_token or ''
env['CALLBACK_QUEUE'] = settings.CALLBACK_QUEUE
@@ -1693,6 +1711,16 @@ class RunAdHocCommand(BaseTask):
d[re.compile(r'^Password:\s*?$', re.M)] = 'ssh_password'
return d
def get_stdout_handle(self, instance):
'''
Wrap stdout file object to capture events.
'''
stdout_handle = super(RunAdHocCommand, self).get_stdout_handle(instance)
def ad_hoc_command_event_callback(event_data):
event_data.setdefault('ad_hoc_command_id', instance.id)
AdHocCommandEvent.create_from_data(**event_data)
return OutputEventFilter(stdout_handle, ad_hoc_command_event_callback)
def get_ssh_key_path(self, instance, **kwargs):
'''
If using an SSH key, return the path for use by ssh-agent.

View File

@@ -4,6 +4,7 @@
# Python
import base64
import hashlib
import json
import logging
import os
import re
@@ -36,7 +37,7 @@ __all__ = ['get_object_or_400', 'get_object_or_403', 'camelcase_to_underscore',
'get_type_for_model', 'get_model_for_type', 'cache_list_capabilities', 'to_python_boolean',
'ignore_inventory_computed_fields', 'ignore_inventory_group_removal',
'_inventory_updates', 'get_pk_from_dict', 'getattrd', 'NoDefaultProvided',
'get_current_apps', 'set_current_apps']
'get_current_apps', 'set_current_apps', 'OutputEventFilter']
def get_object_or_400(klass, *args, **kwargs):
@@ -640,3 +641,71 @@ def set_current_apps(apps):
def get_current_apps():
global current_apps
return current_apps
class OutputEventFilter(object):
'''
File-like object that looks for encoded job events in stdout data.
'''
EVENT_DATA_RE = re.compile(r'\x1b\[K((?:[A-Za-z0-9+/=]+\x1b\[\d+D)+)\x1b\[K')
def __init__(self, fileobj=None, event_callback=None):
self._fileobj = fileobj
self._event_callback = event_callback
self._counter = 1
self._start_line = 0
self._buffer = ''
self._current_event_data = None
def __getattr__(self, attr):
return getattr(self._fileobj, attr)
def write(self, data):
if self._fileobj:
self._fileobj.write(data)
self._buffer += data
while True:
match = self.EVENT_DATA_RE.search(self._buffer)
if not match:
break
try:
base64_data = re.sub(r'\x1b\[\d+D', '', match.group(1))
event_data = json.loads(base64.b64decode(base64_data))
except ValueError:
event_data = {}
self._emit_event(self._buffer[:match.start()], event_data)
self._buffer = self._buffer[match.end():]
def close(self):
if self._fileobj:
self._fileobj.close()
if self._buffer:
self._emit_event(self._buffer)
self._buffer = ''
def _emit_event(self, buffered_stdout, next_event_data=None):
if self._current_event_data:
event_data = self._current_event_data
stdout_chunks = [buffered_stdout]
elif buffered_stdout:
event_data = dict(event='verbose')
stdout_chunks = buffered_stdout.splitlines(True)
else:
stdout_chunks = []
for stdout_chunk in stdout_chunks:
event_data['counter'] = self._counter
self._counter += 1
event_data['stdout'] = stdout_chunk
n_lines = stdout_chunk.count('\n')
event_data['start_line'] = self._start_line
event_data['end_line'] = self._start_line + n_lines
self._start_line += n_lines
if self._event_callback:
self._event_callback(event_data)
if next_event_data.get('uuid', None):
self._current_event_data = next_event_data
else:
self._current_event_data = None

View File

@@ -1,579 +0,0 @@
# Copyright (c) 2015 Ansible, Inc.
# This file is a utility Ansible plugin that is not part of the AWX or Ansible
# packages. It does not import any code from either package, nor does its
# license apply to Ansible or AWX.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# Neither the name of the <ORGANIZATION> nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# Python
import datetime
import glob
import json
import logging
import os
import pwd
import urlparse
import re
from copy import deepcopy
from uuid import uuid4
# Kombu
from kombu import Connection, Exchange, Producer
# Requests
import requests
import psutil
CENSOR_FIELD_WHITELIST = [
'msg',
'failed',
'changed',
'results',
'start',
'end',
'delta',
'cmd',
'_ansible_no_log',
'rc',
'failed_when_result',
'skipped',
'skip_reason',
]
def censor(obj, no_log=False):
if not isinstance(obj, dict):
if no_log:
return "the output has been hidden due to the fact that 'no_log: true' was specified for this result"
return obj
if obj.get('_ansible_no_log', no_log):
new_obj = {}
for k in CENSOR_FIELD_WHITELIST:
if k in obj:
new_obj[k] = obj[k]
if k == 'cmd' and k in obj:
if isinstance(obj['cmd'], list):
obj['cmd'] = ' '.join(obj['cmd'])
if re.search(r'\s', obj['cmd']):
new_obj['cmd'] = re.sub(r'^(([^\s\\]|\\\s)+).*$',
r'\1 <censored>',
obj['cmd'])
new_obj['censored'] = "the output has been hidden due to the fact that 'no_log: true' was specified for this result"
obj = new_obj
if 'results' in obj:
if isinstance(obj['results'], list):
for i in xrange(len(obj['results'])):
obj['results'][i] = censor(obj['results'][i], obj.get('_ansible_no_log', no_log))
elif obj.get('_ansible_no_log', False):
obj['results'] = "the output has been hidden due to the fact that 'no_log: true' was specified for this result"
return obj
class TokenAuth(requests.auth.AuthBase):
def __init__(self, token):
self.token = token
def __call__(self, request):
request.headers['Authorization'] = 'Token %s' % self.token
return request
# TODO: non v2_ events are deprecated and should be purge/refactored out
class BaseCallbackModule(object):
'''
Callback module for logging ansible-playbook job events via the REST API.
'''
def __init__(self):
self.base_url = os.getenv('REST_API_URL', '')
self.auth_token = os.getenv('REST_API_TOKEN', '')
self.callback_connection = os.getenv('CALLBACK_CONNECTION', None)
self.connection_queue = os.getenv('CALLBACK_QUEUE', '')
self.connection = None
self.exchange = None
self._init_logging()
self._init_connection()
self.counter = 0
self.active_playbook = None
self.active_play = None
self.active_task = None
def _init_logging(self):
try:
self.job_callback_debug = int(os.getenv('JOB_CALLBACK_DEBUG', '0'))
except ValueError:
self.job_callback_debug = 0
self.logger = logging.getLogger('awx.plugins.callback.job_event_callback')
if self.job_callback_debug >= 2:
self.logger.setLevel(logging.DEBUG)
elif self.job_callback_debug >= 1:
self.logger.setLevel(logging.INFO)
else:
self.logger.setLevel(logging.WARNING)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(levelname)-8s %(process)-8d %(message)s')
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.propagate = False
def _init_connection(self):
self.connection = None
def _start_connection(self):
self.connection = Connection(self.callback_connection)
self.exchange = Exchange(self.connection_queue, type='direct')
def _post_job_event_queue_msg(self, event, event_data):
self.counter += 1
msg = {
'event': event,
'event_data': event_data,
'counter': self.counter,
'created': datetime.datetime.utcnow().isoformat(),
}
if event in ('playbook_on_play_start',
'playbook_on_stats',
'playbook_on_vars_prompt'):
msg['parent_uuid'] = str(self.active_playbook)
elif event in ('playbook_on_notify',
'playbook_on_setup',
'playbook_on_task_start',
'playbook_on_no_hosts_matched',
'playbook_on_no_hosts_remaining',
'playbook_on_include',
'playbook_on_import_for_host',
'playbook_on_not_import_for_host'):
msg['parent_uuid'] = str(self.active_play)
elif event.startswith('runner_on_') or event.startswith('runner_item_on_'):
msg['parent_uuid'] = str(self.active_task)
else:
msg['parent_uuid'] = ''
if "uuid" in event_data:
msg['uuid'] = str(event_data['uuid'])
else:
msg['uuid'] = ''
if getattr(self, 'job_id', None):
msg['job_id'] = self.job_id
if getattr(self, 'ad_hoc_command_id', None):
msg['ad_hoc_command_id'] = self.ad_hoc_command_id
if getattr(self, 'artifact_data', None):
msg['artifact_data'] = self.artifact_data
active_pid = os.getpid()
if self.job_callback_debug:
msg.update({
'pid': active_pid,
})
for retry_count in xrange(4):
try:
if not hasattr(self, 'connection_pid'):
self.connection_pid = active_pid
if self.connection_pid != active_pid:
self._init_connection()
if self.connection is None:
self._start_connection()
producer = Producer(self.connection)
producer.publish(msg,
serializer='json',
compression='bzip2',
exchange=self.exchange,
declare=[self.exchange],
routing_key=self.connection_queue)
return
except Exception, e:
self.logger.info('Publish Job Event Exception: %r, retry=%d', e,
retry_count, exc_info=True)
retry_count += 1
if retry_count >= 3:
break
def _post_rest_api_event(self, event, event_data):
data = json.dumps({
'event': event,
'event_data': event_data,
})
parts = urlparse.urlsplit(self.base_url)
if parts.username and parts.password:
auth = (parts.username, parts.password)
elif self.auth_token:
auth = TokenAuth(self.auth_token)
else:
auth = None
port = parts.port or (443 if parts.scheme == 'https' else 80)
url = urlparse.urlunsplit([parts.scheme,
'%s:%d' % (parts.hostname, port),
parts.path, parts.query, parts.fragment])
url = urlparse.urljoin(url, self.rest_api_path)
headers = {'content-type': 'application/json'}
response = requests.post(url, data=data, headers=headers, auth=auth)
response.raise_for_status()
def _log_event(self, event, **event_data):
if 'res' in event_data:
event_data['res'] = censor(deepcopy(event_data['res']))
if self.callback_connection:
self._post_job_event_queue_msg(event, event_data)
else:
self._post_rest_api_event(event, event_data)
def on_any(self, *args, **kwargs):
pass
def runner_on_failed(self, host, res, ignore_errors=False):
self._log_event('runner_on_failed', host=host, res=res,
ignore_errors=ignore_errors)
def v2_runner_on_failed(self, result, ignore_errors=False):
event_is_loop = result._task.loop if hasattr(result._task, 'loop') else None
self._log_event('runner_on_failed', host=result._host.name,
res=result._result, task=result._task,
ignore_errors=ignore_errors, event_loop=event_is_loop)
def runner_on_ok(self, host, res):
self._log_event('runner_on_ok', host=host, res=res)
def v2_runner_on_ok(self, result):
event_is_loop = result._task.loop if hasattr(result._task, 'loop') else None
self._log_event('runner_on_ok', host=result._host.name,
task=result._task, res=result._result,
event_loop=event_is_loop)
def runner_on_error(self, host, msg):
self._log_event('runner_on_error', host=host, msg=msg)
def v2_runner_on_error(self, result):
pass # Currently not implemented in v2
def runner_on_skipped(self, host, item=None):
self._log_event('runner_on_skipped', host=host, item=item)
def v2_runner_on_skipped(self, result):
event_is_loop = result._task.loop if hasattr(result._task, 'loop') else None
self._log_event('runner_on_skipped', host=result._host.name,
task=result._task, event_loop=event_is_loop)
def runner_on_unreachable(self, host, res):
self._log_event('runner_on_unreachable', host=host, res=res)
def v2_runner_on_unreachable(self, result):
self._log_event('runner_on_unreachable', host=result._host.name,
task=result._task, res=result._result)
def runner_on_no_hosts(self):
self._log_event('runner_on_no_hosts')
def v2_runner_on_no_hosts(self, task):
self._log_event('runner_on_no_hosts', task=task)
# V2 does not use the _on_async callbacks (yet).
def runner_on_async_poll(self, host, res, jid, clock):
self._log_event('runner_on_async_poll', host=host, res=res, jid=jid,
clock=clock)
def runner_on_async_ok(self, host, res, jid):
self._log_event('runner_on_async_ok', host=host, res=res, jid=jid)
def runner_on_async_failed(self, host, res, jid):
self._log_event('runner_on_async_failed', host=host, res=res, jid=jid)
def runner_on_file_diff(self, host, diff):
self._log_event('runner_on_file_diff', host=host, diff=diff)
def v2_runner_on_file_diff(self, result, diff):
self._log_event('runner_on_file_diff', host=result._host.name,
task=result._task, diff=diff)
def v2_runner_item_on_ok(self, result):
self._log_event('runner_item_on_ok', res=result._result, host=result._host.name,
task=result._task)
def v2_runner_item_on_failed(self, result):
self._log_event('runner_item_on_failed', res=result._result, host=result._host.name,
task=result._task)
def v2_runner_item_on_skipped(self, result):
self._log_event('runner_item_on_skipped', res=result._result, host=result._host.name,
task=result._task)
@staticmethod
def terminate_ssh_control_masters():
# Determine if control persist is being used and if any open sockets
# exist after running the playbook.
cp_path = os.environ.get('ANSIBLE_SSH_CONTROL_PATH', '')
if not cp_path:
return
cp_dir = os.path.dirname(cp_path)
if not os.path.exists(cp_dir):
return
cp_pattern = os.path.join(cp_dir, 'ansible-ssh-*')
cp_files = glob.glob(cp_pattern)
if not cp_files:
return
# Attempt to find any running control master processes.
username = pwd.getpwuid(os.getuid())[0]
ssh_cm_procs = []
for proc in psutil.process_iter():
try:
pname = proc.name()
pcmdline = proc.cmdline()
pusername = proc.username()
except psutil.NoSuchProcess:
continue
if pusername != username:
continue
if pname != 'ssh':
continue
for cp_file in cp_files:
if pcmdline and cp_file in pcmdline[0]:
ssh_cm_procs.append(proc)
break
# Terminate then kill control master processes. Workaround older
# version of psutil that may not have wait_procs implemented.
for proc in ssh_cm_procs:
proc.terminate()
procs_gone, procs_alive = psutil.wait_procs(ssh_cm_procs, timeout=5)
for proc in procs_alive:
proc.kill()
class JobCallbackModule(BaseCallbackModule):
'''
Callback module for logging ansible-playbook job events via the REST API.
'''
# These events should never have an associated play.
EVENTS_WITHOUT_PLAY = [
'playbook_on_start',
'playbook_on_stats',
]
# These events should never have an associated task.
EVENTS_WITHOUT_TASK = EVENTS_WITHOUT_PLAY + [
'playbook_on_setup',
'playbook_on_notify',
'playbook_on_import_for_host',
'playbook_on_not_import_for_host',
'playbook_on_no_hosts_matched',
'playbook_on_no_hosts_remaining',
]
def __init__(self):
self.job_id = int(os.getenv('JOB_ID', '0'))
self.rest_api_path = '/api/v1/jobs/%d/job_events/' % self.job_id
super(JobCallbackModule, self).__init__()
def _log_event(self, event, **event_data):
play = getattr(self, 'play', None)
play_name = getattr(play, 'name', '')
if play_name and event not in self.EVENTS_WITHOUT_PLAY:
event_data['play'] = play_name
task = event_data.pop('task', None) or getattr(self, 'task', None)
task_name = None
role_name = None
if task:
if hasattr(task, 'get_name'):
# in v2, the get_name() method creates the name
task_name = task.get_name()
else:
# v1 datastructure
task_name = getattr(task, 'name', '')
if hasattr(task, '_role') and task._role:
# v2 datastructure
role_name = task._role._role_name
else:
# v1 datastructure
role_name = getattr(task, 'role_name', '')
if task_name and event not in self.EVENTS_WITHOUT_TASK:
event_data['task'] = task_name
if role_name and event not in self.EVENTS_WITHOUT_TASK:
event_data['role'] = role_name
self.artifact_data = None
if 'res' in event_data and 'artifact_data' in event_data['res']:
self.artifact_data = event_data['res']['artifact_data']
super(JobCallbackModule, self)._log_event(event, **event_data)
def playbook_on_start(self):
self._log_event('playbook_on_start')
def v2_playbook_on_start(self, playbook):
# NOTE: the playbook parameter was added late in Ansible 2.0 development
# so we don't currently utilize but could later.
# NOTE: Ansible doesn't generate a UUID for playbook_on_start so we'll do it for them
self.active_playbook = str(uuid4())
self._log_event('playbook_on_start', uuid=self.active_playbook)
def playbook_on_notify(self, host, handler):
self._log_event('playbook_on_notify', host=host, handler=handler)
def v2_playbook_on_notify(self, result, handler):
self._log_event('playbook_on_notify', host=result._host.name,
task=result._task, handler=handler)
def playbook_on_no_hosts_matched(self):
self._log_event('playbook_on_no_hosts_matched')
def v2_playbook_on_no_hosts_matched(self):
# since there is no task/play info, this is currently identical
# to the v1 callback which does the same thing
self.playbook_on_no_hosts_matched()
def playbook_on_no_hosts_remaining(self):
self._log_event('playbook_on_no_hosts_remaining')
def v2_playbook_on_no_hosts_remaining(self):
# since there is no task/play info, this is currently identical
# to the v1 callback which does the same thing
self.playbook_on_no_hosts_remaining()
def playbook_on_task_start(self, name, is_conditional):
self._log_event('playbook_on_task_start', name=name,
is_conditional=is_conditional)
def v2_playbook_on_task_start(self, task, is_conditional):
self.active_task = task._uuid
self._log_event('playbook_on_task_start', task=task, uuid=str(task._uuid),
name=task.get_name(), is_conditional=is_conditional)
def v2_playbook_on_cleanup_task_start(self, task):
# re-using playbook_on_task_start event here for this v2-specific
# event, though we may consider any changes necessary to distinguish
# this from a normal task
self.active_task = task._uuid
self._log_event('playbook_on_task_start', task=task, uuid=str(task._uuid),
name=task.get_name())
def playbook_on_vars_prompt(self, varname, private=True, prompt=None,
encrypt=None, confirm=False, salt_size=None,
salt=None, default=None):
self._log_event('playbook_on_vars_prompt', varname=varname,
private=private, prompt=prompt, encrypt=encrypt,
confirm=confirm, salt_size=salt_size, salt=salt,
default=default)
def v2_playbook_on_vars_prompt(self, varname, private=True, prompt=None,
encrypt=None, confirm=False, salt_size=None,
salt=None, default=None):
pass # not currently used in v2 (yet)
def playbook_on_setup(self):
self._log_event('playbook_on_setup')
def v2_playbook_on_setup(self):
pass # not currently used in v2 (yet)
def playbook_on_import_for_host(self, host, imported_file):
# don't care about recording this one
# self._log_event('playbook_on_import_for_host', host=host,
# imported_file=imported_file)
pass
def v2_playbook_on_import_for_host(self, result, imported_file):
pass # not currently used in v2 (yet)
def playbook_on_not_import_for_host(self, host, missing_file):
# don't care about recording this one
#self._log_event('playbook_on_not_import_for_host', host=host,
# missing_file=missing_file)
pass
def v2_playbook_on_not_import_for_host(self, result, missing_file):
pass # not currently used in v2 (yet)
def playbook_on_play_start(self, name):
# Only play name is passed via callback, get host pattern from the play.
pattern = getattr(getattr(self, 'play', None), 'hosts', name)
self._log_event('playbook_on_play_start', name=name, pattern=pattern)
def v2_playbook_on_play_start(self, play):
setattr(self, 'play', play)
# Ansible 2.0.0.2 doesn't default .name to hosts like it did in 1.9.4,
# though that default will likely return in a future version of Ansible.
if (not hasattr(play, 'name') or not play.name) and hasattr(play, 'hosts'):
if isinstance(play.hosts, list):
play.name = ','.join(play.hosts)
else:
play.name = play.hosts
self.active_play = play._uuid
self._log_event('playbook_on_play_start', name=play.name, uuid=str(play._uuid),
pattern=play.hosts)
def playbook_on_stats(self, stats):
d = {}
for attr in ('changed', 'dark', 'failures', 'ok', 'processed', 'skipped'):
d[attr] = getattr(stats, attr)
self._log_event('playbook_on_stats', **d)
self.terminate_ssh_control_masters()
def v2_playbook_on_stats(self, stats):
self.playbook_on_stats(stats)
def v2_playbook_on_include(self, included_file):
self._log_event('playbook_on_include', included_file=included_file)
class AdHocCommandCallbackModule(BaseCallbackModule):
'''
Callback module for logging ansible ad hoc events via ZMQ or the REST API.
'''
def __init__(self):
self.ad_hoc_command_id = int(os.getenv('AD_HOC_COMMAND_ID', '0'))
self.rest_api_path = '/api/v1/ad_hoc_commands/%d/events/' % self.ad_hoc_command_id
self.skipped_hosts = set()
super(AdHocCommandCallbackModule, self).__init__()
def _log_event(self, event, **event_data):
# Ignore task for ad hoc commands (with v2).
event_data.pop('task', None)
super(AdHocCommandCallbackModule, self)._log_event(event, **event_data)
def runner_on_file_diff(self, host, diff):
pass # Ignore file diff for ad hoc commands.
def runner_on_ok(self, host, res):
# When running in check mode using a module that does not support check
# mode, Ansible v1.9 will call runner_on_skipped followed by
# runner_on_ok for the same host; only capture the skipped event and
# ignore the ok event.
if host not in self.skipped_hosts:
super(AdHocCommandCallbackModule, self).runner_on_ok(host, res)
def runner_on_skipped(self, host, item=None):
super(AdHocCommandCallbackModule, self).runner_on_skipped(host, item)
self.skipped_hosts.add(host)
if os.getenv('JOB_ID', ''):
CallbackModule = JobCallbackModule
elif os.getenv('AD_HOC_COMMAND_ID', ''):
CallbackModule = AdHocCommandCallbackModule

View File

@@ -0,0 +1,30 @@
# Copyright (c) 2016 Ansible by Red Hat, Inc.
#
# This file is part of Ansible Tower, but depends on code imported from Ansible.
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division, print_function)
# Python
import os
import sys
# Add awx/lib to sys.path.
awx_lib_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'lib'))
if awx_lib_path not in sys.path:
sys.path.insert(0, awx_lib_path)
# Tower Display Callback
from tower_display_callback import TowerMinimalCallbackModule as CallbackModule

View File

@@ -0,0 +1,30 @@
# Copyright (c) 2016 Ansible by Red Hat, Inc.
#
# This file is part of Ansible Tower, but depends on code imported from Ansible.
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division, print_function)
# Python
import os
import sys
# Add awx/lib to sys.path.
awx_lib_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'lib'))
if awx_lib_path not in sys.path:
sys.path.insert(0, awx_lib_path)
# Tower Display Callback
from tower_display_callback import TowerDefaultCallbackModule as CallbackModule