diff --git a/awx/api/permissions.py b/awx/api/permissions.py
index ecd725bc6e..cda66ff2ec 100644
--- a/awx/api/permissions.py
+++ b/awx/api/permissions.py
@@ -189,8 +189,6 @@ class TaskPermission(ModelAccessPermission):
# token.
if view.model == Inventory and request.method.lower() in ('head', 'get'):
return bool(not obj or obj.pk == unified_job.inventory_id)
- elif view.model in (JobEvent, AdHocCommandEvent) and request.method.lower() == 'post':
- return bool(not obj or obj.pk == unified_job.pk)
else:
return False
diff --git a/awx/api/serializers.py b/awx/api/serializers.py
index eaa764c345..3677f1a028 100644
--- a/awx/api/serializers.py
+++ b/awx/api/serializers.py
@@ -2417,7 +2417,9 @@ class JobEventSerializer(BaseSerializer):
model = JobEvent
fields = ('*', '-name', '-description', 'job', 'event', 'counter',
'event_display', 'event_data', 'event_level', 'failed',
- 'changed', 'host', 'host_name', 'parent', 'play', 'task', 'role')
+ 'changed', 'uuid', 'host', 'host_name', 'parent', 'playbook',
+ 'play', 'task', 'role', 'stdout', 'start_line', 'end_line',
+ 'verbosity')
def get_related(self, obj):
res = super(JobEventSerializer, self).get_related(obj)
@@ -2453,16 +2455,8 @@ class AdHocCommandEventSerializer(BaseSerializer):
model = AdHocCommandEvent
fields = ('*', '-name', '-description', 'ad_hoc_command', 'event',
'counter', 'event_display', 'event_data', 'failed',
- 'changed', 'host', 'host_name')
-
- def to_internal_value(self, data):
- ret = super(AdHocCommandEventSerializer, self).to_internal_value(data)
- # AdHocCommandAdHocCommandEventsList should be the only view creating
- # AdHocCommandEvent instances, so keep the ad_hoc_command it sets, even
- # though ad_hoc_command is a read-only field.
- if 'ad_hoc_command' in data:
- ret['ad_hoc_command'] = data['ad_hoc_command']
- return ret
+ 'changed', 'uuid', 'host', 'host_name', 'stdout',
+ 'start_line', 'end_line', 'verbosity')
def get_related(self, obj):
res = super(AdHocCommandEventSerializer, self).get_related(obj)
diff --git a/awx/api/views.py b/awx/api/views.py
index 66980e141d..3fb9826519 100644
--- a/awx/api/views.py
+++ b/awx/api/views.py
@@ -3050,21 +3050,6 @@ class GroupJobEventsList(BaseJobEventsList):
class JobJobEventsList(BaseJobEventsList):
parent_model = Job
- authentication_classes = [TaskAuthentication] + api_settings.DEFAULT_AUTHENTICATION_CLASSES
- permission_classes = (TaskPermission,)
-
- # Post allowed for job event callback only.
- def post(self, request, *args, **kwargs):
- parent_obj = get_object_or_404(self.parent_model, pk=self.kwargs['pk'])
- data = request.data.copy()
- data['job'] = parent_obj.pk
- serializer = self.get_serializer(data=data)
- if serializer.is_valid():
- self.instance = serializer.save()
- headers = {'Location': serializer.data['url']}
- return Response(serializer.data, status=status.HTTP_201_CREATED,
- headers=headers)
- return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class JobJobPlaysList(BaseJobEventsList):
@@ -3455,25 +3440,8 @@ class HostAdHocCommandEventsList(BaseAdHocCommandEventsList):
class AdHocCommandAdHocCommandEventsList(BaseAdHocCommandEventsList):
parent_model = AdHocCommand
- authentication_classes = [TaskAuthentication] + api_settings.DEFAULT_AUTHENTICATION_CLASSES
- permission_classes = (TaskPermission,)
new_in_220 = True
- # Post allowed for ad hoc event callback only.
- def post(self, request, *args, **kwargs):
- if request.user:
- raise PermissionDenied()
- parent_obj = get_object_or_404(self.parent_model, pk=self.kwargs['pk'])
- data = request.data.copy()
- data['ad_hoc_command'] = parent_obj
- serializer = self.get_serializer(data=data)
- if serializer.is_valid():
- self.instance = serializer.save()
- headers = {'Location': serializer.data['url']}
- return Response(serializer.data, status=status.HTTP_201_CREATED,
- headers=headers)
- return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
-
class AdHocCommandActivityStreamList(SubListAPIView):
@@ -3583,7 +3551,11 @@ class UnifiedJobStdout(RetrieveAPIView):
dark_bg = (content_only and dark) or (not content_only and (dark or not dark_val))
content, start, end, absolute_end = unified_job.result_stdout_raw_limited(start_line, end_line)
+ # Remove any ANSI escape sequences containing job event data.
+ content = re.sub(r'\x1b\[K(?:[A-Za-z0-9+/=]+\x1b\[\d+D)+\x1b\[K', '', content)
+
body = ansiconv.to_html(cgi.escape(content))
+
context = {
'title': get_view_name(self.__class__),
'body': mark_safe(body),
diff --git a/awx/lib/sitecustomize.py b/awx/lib/sitecustomize.py
new file mode 100644
index 0000000000..02ac2eba55
--- /dev/null
+++ b/awx/lib/sitecustomize.py
@@ -0,0 +1,22 @@
+# Python
+import os
+import sys
+
+# Based on http://stackoverflow.com/a/6879344/131141 -- Initialize tower display
+# callback as early as possible to wrap ansible.display.Display methods.
+
+def argv_ready(argv):
+ if argv and os.path.basename(argv[0]) in {'ansible', 'ansible-playbook'}:
+ import tower_display_callback # noqa
+
+
+class argv_placeholder(object):
+
+ def __del__(self):
+ argv_ready(sys.argv)
+
+
+if hasattr(sys, 'argv'):
+ argv_ready(sys.argv)
+else:
+ sys.argv = argv_placeholder()
diff --git a/awx/lib/tower_display_callback/__init__.py b/awx/lib/tower_display_callback/__init__.py
new file mode 100644
index 0000000000..d984956c7f
--- /dev/null
+++ b/awx/lib/tower_display_callback/__init__.py
@@ -0,0 +1,25 @@
+# Copyright (c) 2016 Ansible by Red Hat, Inc.
+#
+# This file is part of Ansible Tower, but depends on code imported from Ansible.
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see .
+
+from __future__ import (absolute_import, division, print_function)
+
+# Tower Display Callback
+from . import cleanup # noqa (registers control persistent cleanup)
+from . import display # noqa (wraps ansible.display.Display methods)
+from .module import TowerDefaultCallbackModule, TowerMinimalCallbackModule
+
+__all__ = ['TowerDefaultCallbackModule', 'TowerMinimalCallbackModule']
diff --git a/awx/lib/tower_display_callback/cleanup.py b/awx/lib/tower_display_callback/cleanup.py
new file mode 100644
index 0000000000..7a0387cddf
--- /dev/null
+++ b/awx/lib/tower_display_callback/cleanup.py
@@ -0,0 +1,72 @@
+# Copyright (c) 2016 Ansible by Red Hat, Inc.
+#
+# This file is part of Ansible Tower, but depends on code imported from Ansible.
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see .
+
+from __future__ import (absolute_import, division, print_function)
+
+# Python
+import atexit
+import glob
+import os
+import pwd
+
+# PSUtil
+import psutil
+
+__all__ = []
+
+
+@atexit.register
+def terminate_ssh_control_masters():
+ # Determine if control persist is being used and if any open sockets
+ # exist after running the playbook.
+ cp_path = os.environ.get('ANSIBLE_SSH_CONTROL_PATH', '')
+ if not cp_path:
+ return
+ cp_dir = os.path.dirname(cp_path)
+ if not os.path.exists(cp_dir):
+ return
+ cp_pattern = os.path.join(cp_dir, 'ansible-ssh-*')
+ cp_files = glob.glob(cp_pattern)
+ if not cp_files:
+ return
+
+ # Attempt to find any running control master processes.
+ username = pwd.getpwuid(os.getuid())[0]
+ ssh_cm_procs = []
+ for proc in psutil.process_iter():
+ try:
+ pname = proc.name()
+ pcmdline = proc.cmdline()
+ pusername = proc.username()
+ except psutil.NoSuchProcess:
+ continue
+ if pusername != username:
+ continue
+ if pname != 'ssh':
+ continue
+ for cp_file in cp_files:
+ if pcmdline and cp_file in pcmdline[0]:
+ ssh_cm_procs.append(proc)
+ break
+
+ # Terminate then kill control master processes. Workaround older
+ # version of psutil that may not have wait_procs implemented.
+ for proc in ssh_cm_procs:
+ proc.terminate()
+ procs_gone, procs_alive = psutil.wait_procs(ssh_cm_procs, timeout=5)
+ for proc in procs_alive:
+ proc.kill()
diff --git a/awx/lib/tower_display_callback/display.py b/awx/lib/tower_display_callback/display.py
new file mode 100644
index 0000000000..5b1265201e
--- /dev/null
+++ b/awx/lib/tower_display_callback/display.py
@@ -0,0 +1,92 @@
+# Copyright (c) 2016 Ansible by Red Hat, Inc.
+#
+# This file is part of Ansible Tower, but depends on code imported from Ansible.
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see .
+
+from __future__ import (absolute_import, division, print_function)
+
+# Python
+import functools
+import sys
+import uuid
+
+# Ansible
+from ansible.utils.display import Display
+
+# Tower Display Callback
+from tower_display_callback.events import event_context
+
+__all__ = []
+
+
+def with_context(**context):
+ global event_context
+
+ def wrap(f):
+ @functools.wraps(f)
+ def wrapper(*args, **kwargs):
+ with event_context.set_local(**context):
+ return f(*args, **kwargs)
+ return wrapper
+ return wrap
+
+
+for attr in dir(Display):
+ if attr.startswith('_') or 'cow' in attr or 'prompt' in attr:
+ continue
+ if attr in ('display', 'v', 'vv', 'vvv', 'vvvv', 'vvvvv', 'vvvvvv', 'verbose'):
+ continue
+ if not callable(getattr(Display, attr)):
+ continue
+ setattr(Display, attr, with_context(**{attr: True})(getattr(Display, attr)))
+
+
+def with_verbosity(f):
+ global event_context
+
+ @functools.wraps(f)
+ def wrapper(*args, **kwargs):
+ host = args[2] if len(args) >= 3 else kwargs.get('host', None)
+ caplevel = args[3] if len(args) >= 4 else kwargs.get('caplevel', 2)
+ context = dict(verbose=True, verbosity=(caplevel + 1))
+ if host is not None:
+ context['remote_addr'] = host
+ with event_context.set_local(**context):
+ return f(*args, **kwargs)
+ return wrapper
+
+Display.verbose = with_verbosity(Display.verbose)
+
+
+def display_with_context(f):
+
+ @functools.wraps(f)
+ def wrapper(*args, **kwargs):
+ log_only = args[5] if len(args) >= 6 else kwargs.get('log_only', False)
+ stderr = args[3] if len(args) >= 4 else kwargs.get('stderr', False)
+ fileobj = sys.stderr if stderr else sys.stdout
+ event_uuid = event_context.get().get('uuid', None)
+ try:
+ if not log_only and not event_uuid:
+ event_context.add_local(uuid=str(uuid.uuid4()))
+ event_context.dump_begin(fileobj)
+ return f(*args, **kwargs)
+ finally:
+ if not log_only and not event_uuid:
+ event_context.dump_end(fileobj)
+ event_context.remove_local(uuid=None)
+ return wrapper
+
+Display.display = display_with_context(Display.display)
diff --git a/awx/lib/tower_display_callback/events.py b/awx/lib/tower_display_callback/events.py
new file mode 100644
index 0000000000..ad7eb6418e
--- /dev/null
+++ b/awx/lib/tower_display_callback/events.py
@@ -0,0 +1,137 @@
+# Copyright (c) 2016 Ansible by Red Hat, Inc.
+#
+# This file is part of Ansible Tower, but depends on code imported from Ansible.
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see .
+
+from __future__ import (absolute_import, division, print_function)
+
+# Python
+import base64
+import contextlib
+import datetime
+import json
+import os
+import threading
+import uuid
+
+__all__ = ['event_context']
+
+
+class EventContext(object):
+ '''
+ Store global and local (per thread/process) data associated with callback
+ events and other display output methods.
+ '''
+
+ def add_local(self, **kwargs):
+ if not hasattr(self, '_local'):
+ self._local = threading.local()
+ self._local._ctx = {}
+ self._local._ctx.update(kwargs)
+
+ def remove_local(self, **kwargs):
+ if hasattr(self, '_local'):
+ for key in kwargs.keys():
+ self._local._ctx.pop(key, None)
+
+ @contextlib.contextmanager
+ def set_local(self, **kwargs):
+ try:
+ self.add_local(**kwargs)
+ yield
+ finally:
+ self.remove_local(**kwargs)
+
+ def get_local(self):
+ return getattr(getattr(self, '_local', None), '_ctx', {})
+
+ def add_global(self, **kwargs):
+ if not hasattr(self, '_global_ctx'):
+ self._global_ctx = {}
+ self._global_ctx.update(kwargs)
+
+ def remove_global(self, **kwargs):
+ if hasattr(self, '_global_ctx'):
+ for key in kwargs.keys():
+ self._global_ctx.pop(key, None)
+
+ @contextlib.contextmanager
+ def set_global(self, **kwargs):
+ try:
+ self.add_global(**kwargs)
+ yield
+ finally:
+ self.remove_global(**kwargs)
+
+ def get_global(self):
+ return getattr(self, '_global_ctx', {})
+
+ def get(self):
+ ctx = {}
+ ctx.update(self.get_global())
+ ctx.update(self.get_local())
+ return ctx
+
+ def get_begin_dict(self):
+ event_data = self.get()
+ if os.getenv('JOB_ID', ''):
+ event_data['job_id'] = int(os.getenv('JOB_ID', '0'))
+ if os.getenv('AD_HOC_COMMAND_ID', ''):
+ event_data['ad_hoc_command_id'] = int(os.getenv('AD_HOC_COMMAND_ID', '0'))
+ event_data.setdefault('pid', os.getpid())
+ event_data.setdefault('uuid', str(uuid.uuid4()))
+ event_data.setdefault('created', datetime.datetime.utcnow().isoformat())
+ if not event_data.get('parent_uuid', None) and event_data.get('job_id', None):
+ for key in ('task_uuid', 'play_uuid', 'playbook_uuid'):
+ parent_uuid = event_data.get(key, None)
+ if parent_uuid and parent_uuid != event_data.get('uuid', None):
+ event_data['parent_uuid'] = parent_uuid
+ break
+
+ event = event_data.pop('event', None)
+ if not event:
+ event = 'verbose'
+ for key in ('debug', 'verbose', 'deprecated', 'warning', 'system_warning', 'error'):
+ if event_data.get(key, False):
+ event = key
+ break
+
+ event_dict = dict(event=event, event_data=event_data)
+ for key in event_data.keys():
+ if key in ('job_id', 'ad_hoc_command_id', 'uuid', 'parent_uuid', 'created', 'artifact_data'):
+ event_dict[key] = event_data.pop(key)
+ elif key in ('verbosity', 'pid'):
+ event_dict[key] = event_data[key]
+ return event_dict
+
+ def get_end_dict(self):
+ return {}
+
+ def dump(self, fileobj, data, max_width=78):
+ b64data = base64.b64encode(json.dumps(data))
+ fileobj.write(u'\x1b[K')
+ for offset in xrange(0, len(b64data), max_width):
+ chunk = b64data[offset:offset + max_width]
+ escaped_chunk = u'{}\x1b[{}D'.format(chunk, len(chunk))
+ fileobj.write(escaped_chunk)
+ fileobj.write(u'\x1b[K')
+
+ def dump_begin(self, fileobj):
+ self.dump(fileobj, self.get_begin_dict())
+
+ def dump_end(self, fileobj):
+ self.dump(fileobj, self.get_end_dict())
+
+event_context = EventContext()
diff --git a/awx/lib/tower_display_callback/minimal.py b/awx/lib/tower_display_callback/minimal.py
new file mode 100644
index 0000000000..de7694213e
--- /dev/null
+++ b/awx/lib/tower_display_callback/minimal.py
@@ -0,0 +1,28 @@
+# Copyright (c) 2016 Ansible by Red Hat, Inc.
+#
+# This file is part of Ansible Tower, but depends on code imported from Ansible.
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see .
+
+from __future__ import (absolute_import, division, print_function)
+
+# Python
+import os
+
+# Ansible
+import ansible
+
+# Because of the way Ansible loads plugins, it's not possible to import
+# ansible.plugins.callback.minimal when being loaded as the minimal plugin. Ugh.
+execfile(os.path.join(os.path.dirname(ansible.__file__), 'plugins', 'callback', 'minimal.py'))
diff --git a/awx/lib/tower_display_callback/module.py b/awx/lib/tower_display_callback/module.py
new file mode 100644
index 0000000000..02b30ef2bc
--- /dev/null
+++ b/awx/lib/tower_display_callback/module.py
@@ -0,0 +1,488 @@
+# Copyright (c) 2016 Ansible by Red Hat, Inc.
+#
+# This file is part of Ansible Tower, but depends on code imported from Ansible.
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see .
+
+from __future__ import (absolute_import, division, print_function)
+
+# Python
+import contextlib
+import copy
+import re
+import sys
+import uuid
+
+# Ansible
+from ansible.plugins.callback import CallbackBase
+from ansible.plugins.callback.default import CallbackModule as DefaultCallbackModule
+
+# Tower Display Callback
+from tower_display_callback.events import event_context
+from tower_display_callback.minimal import CallbackModule as MinimalCallbackModule
+
+
+class BaseCallbackModule(CallbackBase):
+ '''
+ Callback module for logging ansible/ansible-playbook events.
+ '''
+
+ CALLBACK_VERSION = 2.0
+ CALLBACK_TYPE = 'stdout'
+
+ # These events should never have an associated play.
+ EVENTS_WITHOUT_PLAY = [
+ 'playbook_on_start',
+ 'playbook_on_stats',
+ ]
+
+ # These events should never have an associated task.
+ EVENTS_WITHOUT_TASK = EVENTS_WITHOUT_PLAY + [
+ 'playbook_on_setup',
+ 'playbook_on_notify',
+ 'playbook_on_import_for_host',
+ 'playbook_on_not_import_for_host',
+ 'playbook_on_no_hosts_matched',
+ 'playbook_on_no_hosts_remaining',
+ ]
+
+ CENSOR_FIELD_WHITELIST = [
+ 'msg',
+ 'failed',
+ 'changed',
+ 'results',
+ 'start',
+ 'end',
+ 'delta',
+ 'cmd',
+ '_ansible_no_log',
+ 'rc',
+ 'failed_when_result',
+ 'skipped',
+ 'skip_reason',
+ ]
+
+ def __init__(self):
+ super(BaseCallbackModule, self).__init__()
+ self.task_uuids = set()
+
+ def censor_result(self, res, no_log=False):
+ if not isinstance(res, dict):
+ if no_log:
+ return "the output has been hidden due to the fact that 'no_log: true' was specified for this result"
+ return res
+ if res.get('_ansible_no_log', no_log):
+ new_res = {}
+ for k in self.CENSOR_FIELD_WHITELIST:
+ if k in res:
+ new_res[k] = res[k]
+ if k == 'cmd' and k in res:
+ if isinstance(res['cmd'], list):
+ res['cmd'] = ' '.join(res['cmd'])
+ if re.search(r'\s', res['cmd']):
+ new_res['cmd'] = re.sub(r'^(([^\s\\]|\\\s)+).*$',
+ r'\1 ',
+ res['cmd'])
+ new_res['censored'] = "the output has been hidden due to the fact that 'no_log: true' was specified for this result"
+ res = new_res
+ if 'results' in res:
+ if isinstance(res['results'], list):
+ for i in xrange(len(res['results'])):
+ res['results'][i] = self.censor_result(res['results'][i], res.get('_ansible_no_log', no_log))
+ elif res.get('_ansible_no_log', False):
+ res['results'] = "the output has been hidden due to the fact that 'no_log: true' was specified for this result"
+ return res
+
+ @contextlib.contextmanager
+ def capture_event_data(self, event, **event_data):
+
+ event_data.setdefault('uuid', str(uuid.uuid4()))
+
+ if 'res' in event_data:
+ event_data['res'] = self.censor_result(copy.deepcopy(event_data['res']))
+ res = event_data.get('res', None)
+ if res and isinstance(res, dict):
+ if 'artifact_data' in res:
+ event_data['artifact_data'] = res['artifact_data']
+
+ if event not in self.EVENTS_WITHOUT_TASK:
+ task = event_data.pop('task', None)
+ else:
+ task = None
+
+ try:
+ event_context.add_local(event=event, **event_data)
+ if task:
+ self.set_task(task, local=True)
+ event_context.dump_begin(sys.stdout)
+ yield
+ finally:
+ event_context.dump_end(sys.stdout)
+ if task:
+ self.clear_task(local=True)
+ event_context.remove_local(event=None, **event_data)
+
+ def set_playbook(self, playbook):
+ # NOTE: Ansible doesn't generate a UUID for playbook_on_start so do it for them.
+ self.playbook_uuid = str(uuid.uuid4())
+ file_name = getattr(playbook, '_file_name', '???')
+ event_context.add_global(playbook=file_name, playbook_uuid=self.playbook_uuid)
+ self.clear_play()
+
+ def set_play(self, play):
+ if hasattr(play, 'hosts'):
+ if isinstance(play.hosts, list):
+ pattern = ','.join(play.hosts)
+ else:
+ pattern = play.hosts
+ else:
+ pattern = ''
+ name = play.get_name().strip() or pattern
+ event_context.add_global(play=name, play_uuid=str(play._uuid), play_pattern=pattern)
+ self.clear_task()
+
+ def clear_play(self):
+ event_context.remove_global(play=None, play_uuid=None, play_pattern=None)
+ self.clear_task()
+
+ def set_task(self, task, local=False):
+ # FIXME: Task is "global" unless using free strategy!
+ task_ctx = dict(
+ task=(task.name or task.action),
+ task_path=task.get_path(),
+ task_uuid=str(task._uuid),
+ task_action=task.action,
+ )
+ if not task.no_log:
+ task_args = ', '.join(('%s=%s' % a for a in task.args.items()))
+ task_ctx['task_args'] = task_args
+ if getattr(task, '_role', None):
+ task_role = task._role._role_name
+ else:
+ task_role = getattr(task, 'role_name', '')
+ if task_role:
+ task_ctx['role'] = task_role
+ if local:
+ event_context.add_local(**task_ctx)
+ else:
+ event_context.add_global(**task_ctx)
+
+ def clear_task(self, local=False):
+ task_ctx = dict(task=None, task_path=None, task_uuid=None, task_action=None, task_args=None, role=None)
+ if local:
+ event_context.remove_local(**task_ctx)
+ else:
+ event_context.remove_global(**task_ctx)
+
+ def v2_playbook_on_start(self, playbook):
+ self.set_playbook(playbook)
+ event_data = dict(
+ uuid=self.playbook_uuid,
+ )
+ with self.capture_event_data('playbook_on_start', **event_data):
+ super(BaseCallbackModule, self).v2_playbook_on_start(playbook)
+
+ def v2_playbook_on_vars_prompt(self, varname, private=True, prompt=None,
+ encrypt=None, confirm=False, salt_size=None,
+ salt=None, default=None):
+ event_data = dict(
+ varname=varname,
+ private=private,
+ prompt=prompt,
+ encrypt=encrypt,
+ confirm=confirm,
+ salt_size=salt_size,
+ salt=salt,
+ default=default,
+ )
+ with self.capture_event_data('playbook_on_vars_prompt', **event_data):
+ super(BaseCallbackModule, self).v2_playbook_on_vars_prompt(
+ varname, private, prompt, encrypt, confirm, salt_size, salt,
+ default,
+ )
+
+ def v2_playbook_on_include(self, included_file):
+ event_data = dict(
+ included_file=included_file,
+ )
+ with self.capture_event_data('playbook_on_include', **event_data):
+ super(BaseCallbackModule, self).v2_playbook_on_include(included_file)
+
+ def v2_playbook_on_play_start(self, play):
+ self.set_play(play)
+ if hasattr(play, 'hosts'):
+ if isinstance(play.hosts, list):
+ pattern = ','.join(play.hosts)
+ else:
+ pattern = play.hosts
+ else:
+ pattern = ''
+ name = play.get_name().strip() or pattern
+ event_data = dict(
+ name=name,
+ pattern=pattern,
+ uuid=str(play._uuid),
+ )
+ with self.capture_event_data('playbook_on_play_start', **event_data):
+ super(BaseCallbackModule, self).v2_playbook_on_play_start(play)
+
+ def v2_playbook_on_import_for_host(self, result, imported_file):
+ # NOTE: Not used by Ansible 2.x.
+ with self.capture_event_data('playbook_on_import_for_host'):
+ super(BaseCallbackModule, self).v2_playbook_on_import_for_host(result, imported_file)
+
+ def v2_playbook_on_not_import_for_host(self, result, missing_file):
+ # NOTE: Not used by Ansible 2.x.
+ with self.capture_event_data('playbook_on_not_import_for_host'):
+ super(BaseCallbackModule, self).v2_playbook_on_not_import_for_host(result, missing_file)
+
+ def v2_playbook_on_setup(self):
+ # NOTE: Not used by Ansible 2.x.
+ with self.capture_event_data('playbook_on_setup'):
+ super(BaseCallbackModule, self).v2_playbook_on_setup()
+
+ def v2_playbook_on_task_start(self, task, is_conditional):
+ # FIXME: Flag task path output as vv.
+ task_uuid = str(task._uuid)
+ if task_uuid in self.task_uuids:
+ # FIXME: When this task UUID repeats, it means the play is using the
+ # free strategy, so different hosts may be running different tasks
+ # within a play.
+ return
+ self.task_uuids.add(task_uuid)
+ self.set_task(task)
+ event_data = dict(
+ task=task,
+ name=task.get_name(),
+ is_conditional=is_conditional,
+ uuid=task_uuid,
+ )
+ with self.capture_event_data('playbook_on_task_start', **event_data):
+ super(BaseCallbackModule, self).v2_playbook_on_task_start(task, is_conditional)
+
+ def v2_playbook_on_cleanup_task_start(self, task):
+ # NOTE: Not used by Ansible 2.x.
+ self.set_task(task)
+ event_data = dict(
+ task=task,
+ name=task.get_name(),
+ uuid=str(task._uuid),
+ is_conditional=True,
+ )
+ with self.capture_event_data('playbook_on_task_start', **event_data):
+ super(BaseCallbackModule, self).v2_playbook_on_cleanup_task_start(task)
+
+ def v2_playbook_on_handler_task_start(self, task):
+ # NOTE: Re-using playbook_on_task_start event for this v2-specific
+ # event, but setting is_conditional=True, which is how v1 identified a
+ # task run as a handler.
+ self.set_task(task)
+ event_data = dict(
+ task=task,
+ name=task.get_name(),
+ uuid=str(task._uuid),
+ is_conditional=True,
+ )
+ with self.capture_event_data('playbook_on_task_start', **event_data):
+ super(BaseCallbackModule, self).v2_playbook_on_handler_task_start(task)
+
+ def v2_playbook_on_no_hosts_matched(self):
+ with self.capture_event_data('playbook_on_no_hosts_matched'):
+ super(BaseCallbackModule, self).v2_playbook_on_no_hosts_matched()
+
+ def v2_playbook_on_no_hosts_remaining(self):
+ with self.capture_event_data('playbook_on_no_hosts_remaining'):
+ super(BaseCallbackModule, self).v2_playbook_on_no_hosts_remaining()
+
+ def v2_playbook_on_notify(self, result, handler):
+ # NOTE: Not used by Ansible 2.x.
+ event_data = dict(
+ host=result._host.get_name(),
+ task=result._task,
+ handler=handler,
+ )
+ with self.capture_event_data('playbook_on_notify', **event_data):
+ super(BaseCallbackModule, self).v2_playbook_on_notify(result, handler)
+
+ def v2_playbook_on_stats(self, stats):
+ self.clear_play()
+ # FIXME: Add count of plays/tasks.
+ event_data = dict(
+ changed=stats.changed,
+ dark=stats.dark,
+ failures=stats.failures,
+ ok=stats.ok,
+ processed=stats.processed,
+ skipped=stats.skipped,
+ )
+ with self.capture_event_data('playbook_on_stats', **event_data):
+ super(BaseCallbackModule, self).v2_playbook_on_stats(stats)
+
+ def v2_runner_on_ok(self, result):
+ # FIXME: Display detailed results or not based on verbosity.
+ event_data = dict(
+ host=result._host.get_name(),
+ remote_addr=result._host.address,
+ task=result._task,
+ res=result._result,
+ event_loop=result._task.loop if hasattr(result._task, 'loop') else None,
+ )
+ with self.capture_event_data('runner_on_ok', **event_data):
+ super(BaseCallbackModule, self).v2_runner_on_ok(result)
+
+ def v2_runner_on_failed(self, result, ignore_errors=False):
+ # FIXME: Add verbosity for exception/results output.
+ event_data = dict(
+ host=result._host.get_name(),
+ remote_addr=result._host.address,
+ res=result._result,
+ task=result._task,
+ ignore_errors=ignore_errors,
+ event_loop=result._task.loop if hasattr(result._task, 'loop') else None,
+ )
+ with self.capture_event_data('runner_on_failed', **event_data):
+ super(BaseCallbackModule, self).v2_runner_on_failed(result, ignore_errors)
+
+ def v2_runner_on_skipped(self, result):
+ event_data = dict(
+ host=result._host.get_name(),
+ remote_addr=result._host.address,
+ task=result._task,
+ event_loop=result._task.loop if hasattr(result._task, 'loop') else None,
+ )
+ with self.capture_event_data('runner_on_skipped', **event_data):
+ super(BaseCallbackModule, self).v2_runner_on_skipped(result)
+
+ def v2_runner_on_unreachable(self, result):
+ event_data = dict(
+ host=result._host.get_name(),
+ remote_addr=result._host.address,
+ task=result._task,
+ res=result._result,
+ )
+ with self.capture_event_data('runner_on_unreachable', **event_data):
+ super(BaseCallbackModule, self).v2_runner_on_unreachable(result)
+
+ def v2_runner_on_no_hosts(self, task):
+ # NOTE: Not used by Ansible 2.x.
+ event_data = dict(
+ task=task,
+ )
+ with self.capture_event_data('runner_on_no_hosts', **event_data):
+ super(BaseCallbackModule, self).v2_runner_on_no_hosts(task)
+
+ def v2_runner_on_async_poll(self, result):
+ # NOTE: Not used by Ansible 2.x.
+ event_data = dict(
+ host=result._host.get_name(),
+ task=result._task,
+ res=result._result,
+ jid=result._result.get('ansible_job_id'),
+ )
+ with self.capture_event_data('runner_on_async_poll', **event_data):
+ super(BaseCallbackModule, self).v2_runner_on_async_poll(result)
+
+ def v2_runner_on_async_ok(self, result):
+ # NOTE: Not used by Ansible 2.x.
+ event_data = dict(
+ host=result._host.get_name(),
+ task=result._task,
+ res=result._result,
+ jid=result._result.get('ansible_job_id'),
+ )
+ with self.capture_event_data('runner_on_async_ok', **event_data):
+ super(BaseCallbackModule, self).v2_runner_on_async_ok(result)
+
+ def v2_runner_on_async_failed(self, result):
+ # NOTE: Not used by Ansible 2.x.
+ event_data = dict(
+ host=result._host.get_name(),
+ task=result._task,
+ res=result._result,
+ jid=result._result.get('ansible_job_id'),
+ )
+ with self.capture_event_data('runner_on_async_failed', **event_data):
+ super(BaseCallbackModule, self).v2_runner_on_async_failed(result)
+
+ def v2_runner_on_file_diff(self, result, diff):
+ # NOTE: Not used by Ansible 2.x.
+ event_data = dict(
+ host=result._host.get_name(),
+ task=result._task,
+ diff=diff,
+ )
+ with self.capture_event_data('runner_on_file_diff', **event_data):
+ super(BaseCallbackModule, self).v2_runner_on_file_diff(result, diff)
+
+ def v2_on_file_diff(self, result):
+ # NOTE: Logged as runner_on_file_diff.
+ event_data = dict(
+ host=result._host.get_name(),
+ task=result._task,
+ diff=result._result.get('diff'),
+ )
+ with self.capture_event_data('runner_on_file_diff', **event_data):
+ super(BaseCallbackModule, self).v2_on_file_diff(result)
+
+ def v2_runner_item_on_ok(self, result):
+ event_data = dict(
+ host=result._host.get_name(),
+ task=result._task,
+ res=result._result,
+ )
+ with self.capture_event_data('runner_item_on_ok', **event_data):
+ super(BaseCallbackModule, self).v2_runner_item_on_ok(result)
+
+ def v2_runner_item_on_failed(self, result):
+ event_data = dict(
+ host=result._host.get_name(),
+ task=result._task,
+ res=result._result,
+ )
+ with self.capture_event_data('runner_item_on_failed', **event_data):
+ super(BaseCallbackModule, self).v2_runner_item_on_failed(result)
+
+ def v2_runner_item_on_skipped(self, result):
+ event_data = dict(
+ host=result._host.get_name(),
+ task=result._task,
+ res=result._result,
+ )
+ with self.capture_event_data('runner_item_on_skipped', **event_data):
+ super(BaseCallbackModule, self).v2_runner_item_on_skipped(result)
+
+ def v2_runner_retry(self, result):
+ event_data = dict(
+ host=result._host.get_name(),
+ task=result._task,
+ res=result._result,
+ )
+ with self.capture_event_data('runner_retry', **event_data):
+ super(BaseCallbackModule, self).v2_runner_retry(result)
+
+
+class TowerDefaultCallbackModule(BaseCallbackModule, DefaultCallbackModule):
+
+ CALLBACK_NAME = 'tower_display'
+
+
+class TowerMinimalCallbackModule(BaseCallbackModule, MinimalCallbackModule):
+
+ CALLBACK_NAME = 'minimal'
+
+ def v2_playbook_on_play_start(self, play):
+ pass
+
+ def v2_playbook_on_task_start(self, task, is_conditional):
+ self.set_task(task)
diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py
index 4f777cd40e..6381660948 100644
--- a/awx/main/management/commands/run_callback_receiver.py
+++ b/awx/main/management/commands/run_callback_receiver.py
@@ -2,9 +2,7 @@
# All Rights Reserved.
# Python
-import datetime
import logging
-import json
from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin
@@ -12,10 +10,7 @@ from kombu.mixins import ConsumerMixin
# Django
from django.conf import settings
from django.core.management.base import NoArgsCommand
-from django.core.cache import cache
from django.db import DatabaseError
-from django.utils.dateparse import parse_datetime
-from django.utils.timezone import FixedOffset
# AWX
from awx.main.models import * # noqa
@@ -36,112 +31,26 @@ class CallbackBrokerWorker(ConsumerMixin):
def process_task(self, body, message):
try:
- if "event" not in body:
- raise Exception("Payload does not have an event")
- if "job_id" not in body:
- raise Exception("Payload does not have a job_id")
+ if 'event' not in body:
+ raise Exception('Payload does not have an event')
+ if 'job_id' not in body and 'ad_hoc_command_id' not in body:
+ raise Exception('Payload does not have a job_id or ad_hoc_command_id')
if settings.DEBUG:
- logger.info("Body: {}".format(body))
- logger.info("Message: {}".format(message))
- self.process_job_event(body)
+ logger.info('Body: {}'.format(body))
+ logger.info('Message: {}'.format(message))
+ try:
+ if 'job_id' in body:
+ JobEvent.create_from_data(**body)
+ elif 'ad_hoc_command_id' in body:
+ AdHocCommandEvent.create_from_data(**body)
+ except DatabaseError as e:
+ logger.error('Database Error Saving Job Event: {}'.format(e))
except Exception as exc:
import traceback
traceback.print_exc()
logger.error('Callback Task Processor Raised Exception: %r', exc)
message.ack()
- def process_job_event(self, payload):
- # Get the correct "verbose" value from the job.
- # If for any reason there's a problem, just use 0.
- if 'ad_hoc_command_id' in payload:
- event_type_key = 'ad_hoc_command_id'
- event_object_type = AdHocCommand
- else:
- event_type_key = 'job_id'
- event_object_type = Job
-
- try:
- verbose = event_object_type.objects.get(id=payload[event_type_key]).verbosity
- except Exception as e:
- verbose=0
- # TODO: cache
-
- # Convert the datetime for the job event's creation appropriately,
- # and include a time zone for it.
- #
- # In the event of any issue, throw it out, and Django will just save
- # the current time.
- try:
- if not isinstance(payload['created'], datetime.datetime):
- payload['created'] = parse_datetime(payload['created'])
- if not payload['created'].tzinfo:
- payload['created'] = payload['created'].replace(tzinfo=FixedOffset(0))
- except (KeyError, ValueError):
- payload.pop('created', None)
-
- event_uuid = payload.get("uuid", '')
- parent_event_uuid = payload.get("parent_uuid", '')
- artifact_data = payload.get("artifact_data", None)
-
- # Sanity check: Don't honor keys that we don't recognize.
- for key in payload.keys():
- if key not in (event_type_key, 'event', 'event_data',
- 'created', 'counter', 'uuid'):
- payload.pop(key)
-
- try:
- # If we're not in verbose mode, wipe out any module
- # arguments.
- res = payload['event_data'].get('res', {})
- if isinstance(res, dict):
- i = res.get('invocation', {})
- if verbose == 0 and 'module_args' in i:
- i['module_args'] = ''
-
- if 'ad_hoc_command_id' in payload:
- AdHocCommandEvent.objects.create(**data)
- return
-
- j = JobEvent(**payload)
- if payload['event'] == 'playbook_on_start':
- j.save()
- cache.set("{}_{}".format(payload['job_id'], event_uuid), j.id, 300)
- return
- else:
- if parent_event_uuid:
- parent_id = cache.get("{}_{}".format(payload['job_id'], parent_event_uuid), None)
- if parent_id is None:
- parent_id_obj = JobEvent.objects.filter(uuid=parent_event_uuid, job_id=payload['job_id'])
- if parent_id_obj.exists(): # Problematic if not there, means the parent hasn't been written yet... TODO
- j.parent_id = parent_id_obj[0].id
- print("Settings cache: {}_{} with value {}".format(payload['job_id'], parent_event_uuid, j.parent_id))
- cache.set("{}_{}".format(payload['job_id'], parent_event_uuid), j.parent_id, 300)
- else:
- print("Cache hit")
- j.parent_id = parent_id
- j.save(post_process=True)
- if event_uuid:
- cache.set("{}_{}".format(payload['job_id'], event_uuid), j.id, 300)
- except DatabaseError as e:
- logger.error("Database Error Saving Job Event: {}".format(e))
-
- if artifact_data:
- try:
- self.process_artifacts(artifact_data, res, payload)
- except DatabaseError as e:
- logger.error("Database Error Saving Job Artifacts: {}".format(e))
-
- def process_artifacts(self, artifact_data, res, payload):
- artifact_dict = json.loads(artifact_data)
- if res and isinstance(res, dict):
- if res.get('_ansible_no_log', False):
- artifact_dict['_ansible_no_log'] = True
- if artifact_data is not None:
- parent_job = Job.objects.filter(pk=payload['job_id']).first()
- if parent_job is not None and parent_job.artifacts != artifact_dict:
- parent_job.artifacts = artifact_dict
- parent_job.save(update_fields=['artifacts'])
-
class Command(NoArgsCommand):
'''
@@ -158,4 +67,3 @@ class Command(NoArgsCommand):
worker.run()
except KeyboardInterrupt:
print('Terminating Callback Receiver')
-
diff --git a/awx/main/migrations/0045_v310_job_event_stdout.py b/awx/main/migrations/0045_v310_job_event_stdout.py
new file mode 100644
index 0000000000..e3325ddb6b
--- /dev/null
+++ b/awx/main/migrations/0045_v310_job_event_stdout.py
@@ -0,0 +1,96 @@
+# -*- coding: utf-8 -*-
+from __future__ import unicode_literals
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ('main', '0044_v310_project_playbook_files'),
+ ]
+
+ operations = [
+ migrations.AddField(
+ model_name='adhoccommandevent',
+ name='end_line',
+ field=models.PositiveIntegerField(default=0, editable=False),
+ ),
+ migrations.AddField(
+ model_name='adhoccommandevent',
+ name='start_line',
+ field=models.PositiveIntegerField(default=0, editable=False),
+ ),
+ migrations.AddField(
+ model_name='adhoccommandevent',
+ name='stdout',
+ field=models.TextField(default=b'', editable=False),
+ ),
+ migrations.AddField(
+ model_name='adhoccommandevent',
+ name='uuid',
+ field=models.CharField(default=b'', max_length=1024, editable=False),
+ ),
+ migrations.AddField(
+ model_name='adhoccommandevent',
+ name='verbosity',
+ field=models.PositiveIntegerField(default=0, editable=False),
+ ),
+ migrations.AddField(
+ model_name='jobevent',
+ name='end_line',
+ field=models.PositiveIntegerField(default=0, editable=False),
+ ),
+ migrations.AddField(
+ model_name='jobevent',
+ name='playbook',
+ field=models.CharField(default=b'', max_length=1024, editable=False),
+ ),
+ migrations.AddField(
+ model_name='jobevent',
+ name='start_line',
+ field=models.PositiveIntegerField(default=0, editable=False),
+ ),
+ migrations.AddField(
+ model_name='jobevent',
+ name='stdout',
+ field=models.TextField(default=b'', editable=False),
+ ),
+ migrations.AddField(
+ model_name='jobevent',
+ name='verbosity',
+ field=models.PositiveIntegerField(default=0, editable=False),
+ ),
+ migrations.AlterField(
+ model_name='adhoccommandevent',
+ name='counter',
+ field=models.PositiveIntegerField(default=0, editable=False),
+ ),
+ migrations.AlterField(
+ model_name='adhoccommandevent',
+ name='event',
+ field=models.CharField(max_length=100, choices=[(b'runner_on_failed', 'Host Failed'), (b'runner_on_ok', 'Host OK'), (b'runner_on_unreachable', 'Host Unreachable'), (b'runner_on_skipped', 'Host Skipped'), (b'debug', 'Debug'), (b'verbose', 'Verbose'), (b'deprecated', 'Deprecated'), (b'warning', 'Warning'), (b'system_warning', 'System Warning'), (b'error', 'Error')]),
+ ),
+ migrations.AlterField(
+ model_name='jobevent',
+ name='counter',
+ field=models.PositiveIntegerField(default=0, editable=False),
+ ),
+ migrations.AlterField(
+ model_name='jobevent',
+ name='event',
+ field=models.CharField(max_length=100, choices=[(b'runner_on_failed', 'Host Failed'), (b'runner_on_ok', 'Host OK'), (b'runner_on_error', 'Host Failure'), (b'runner_on_skipped', 'Host Skipped'), (b'runner_on_unreachable', 'Host Unreachable'), (b'runner_on_no_hosts', 'No Hosts Remaining'), (b'runner_on_async_poll', 'Host Polling'), (b'runner_on_async_ok', 'Host Async OK'), (b'runner_on_async_failed', 'Host Async Failure'), (b'runner_item_on_ok', 'Item OK'), (b'runner_item_on_failed', 'Item Failed'), (b'runner_item_on_skipped', 'Item Skipped'), (b'runner_retry', 'Host Retry'), (b'runner_on_file_diff', 'File Difference'), (b'playbook_on_start', 'Playbook Started'), (b'playbook_on_notify', 'Running Handlers'), (b'playbook_on_include', 'Including File'), (b'playbook_on_no_hosts_matched', 'No Hosts Matched'), (b'playbook_on_no_hosts_remaining', 'No Hosts Remaining'), (b'playbook_on_task_start', 'Task Started'), (b'playbook_on_vars_prompt', 'Variables Prompted'), (b'playbook_on_setup', 'Gathering Facts'), (b'playbook_on_import_for_host', 'internal: on Import for Host'), (b'playbook_on_not_import_for_host', 'internal: on Not Import for Host'), (b'playbook_on_play_start', 'Play Started'), (b'playbook_on_stats', 'Playbook Complete'), (b'debug', 'Debug'), (b'verbose', 'Verbose'), (b'deprecated', 'Deprecated'), (b'warning', 'Warning'), (b'system_warning', 'System Warning'), (b'error', 'Error')]),
+ ),
+ migrations.AlterUniqueTogether(
+ name='adhoccommandevent',
+ unique_together=set([]),
+ ),
+ migrations.AlterIndexTogether(
+ name='adhoccommandevent',
+ index_together=set([('ad_hoc_command', 'event'), ('ad_hoc_command', 'uuid'), ('ad_hoc_command', 'end_line'), ('ad_hoc_command', 'start_line')]),
+ ),
+ migrations.AlterIndexTogether(
+ name='jobevent',
+ index_together=set([('job', 'event'), ('job', 'parent'), ('job', 'start_line'), ('job', 'uuid'), ('job', 'end_line')]),
+ ),
+ ]
diff --git a/awx/main/models/ad_hoc_commands.py b/awx/main/models/ad_hoc_commands.py
index b03be56452..65f40427b0 100644
--- a/awx/main/models/ad_hoc_commands.py
+++ b/awx/main/models/ad_hoc_commands.py
@@ -2,6 +2,7 @@
# All Rights Reserved.
# Python
+import datetime
import hmac
import json
import logging
@@ -10,7 +11,9 @@ from urlparse import urljoin
# Django
from django.conf import settings
from django.db import models
+from django.utils.dateparse import parse_datetime
from django.utils.text import Truncator
+from django.utils.timezone import utc
from django.utils.translation import ugettext_lazy as _
from django.core.exceptions import ValidationError
from django.core.urlresolvers import reverse
@@ -257,24 +260,38 @@ class AdHocCommandEvent(CreatedModifiedModel):
('runner_on_ok', _('Host OK'), False),
('runner_on_unreachable', _('Host Unreachable'), True),
# Tower won't see no_hosts (check is done earlier without callback).
- #('runner_on_no_hosts', _('No Hosts Matched'), False),
+ # ('runner_on_no_hosts', _('No Hosts Matched'), False),
# Tower will see skipped (when running in check mode for a module that
# does not support check mode).
('runner_on_skipped', _('Host Skipped'), False),
- # Tower does not support async for ad hoc commands.
- #('runner_on_async_poll', _('Host Polling'), False),
- #('runner_on_async_ok', _('Host Async OK'), False),
- #('runner_on_async_failed', _('Host Async Failure'), True),
- # Tower does not yet support --diff mode
- #('runner_on_file_diff', _('File Difference'), False),
+ # Tower does not support async for ad hoc commands (not used in v2).
+ # ('runner_on_async_poll', _('Host Polling'), False),
+ # ('runner_on_async_ok', _('Host Async OK'), False),
+ # ('runner_on_async_failed', _('Host Async Failure'), True),
+ # Tower does not yet support --diff mode.
+ # ('runner_on_file_diff', _('File Difference'), False),
+
+ # Additional event types for captured stdout not directly related to
+ # runner events.
+ ('debug', _('Debug'), False),
+ ('verbose', _('Verbose'), False),
+ ('deprecated', _('Deprecated'), False),
+ ('warning', _('Warning'), False),
+ ('system_warning', _('System Warning'), False),
+ ('error', _('Error'), False),
]
FAILED_EVENTS = [x[0] for x in EVENT_TYPES if x[2]]
EVENT_CHOICES = [(x[0], x[1]) for x in EVENT_TYPES]
class Meta:
app_label = 'main'
- unique_together = [('ad_hoc_command', 'host_name')]
ordering = ('-pk',)
+ index_together = [
+ ('ad_hoc_command', 'event'),
+ ('ad_hoc_command', 'uuid'),
+ ('ad_hoc_command', 'start_line'),
+ ('ad_hoc_command', 'end_line'),
+ ]
ad_hoc_command = models.ForeignKey(
'AdHocCommand',
@@ -311,8 +328,30 @@ class AdHocCommandEvent(CreatedModifiedModel):
default=False,
editable=False,
)
+ uuid = models.CharField(
+ max_length=1024,
+ default='',
+ editable=False,
+ )
counter = models.PositiveIntegerField(
default=0,
+ editable=False,
+ )
+ stdout = models.TextField(
+ default='',
+ editable=False,
+ )
+ verbosity = models.PositiveIntegerField(
+ default=0,
+ editable=False,
+ )
+ start_line = models.PositiveIntegerField(
+ default=0,
+ editable=False,
+ )
+ end_line = models.PositiveIntegerField(
+ default=0,
+ editable=False,
)
def get_absolute_url(self):
@@ -350,3 +389,28 @@ class AdHocCommandEvent(CreatedModifiedModel):
except (IndexError, AttributeError):
pass
super(AdHocCommandEvent, self).save(*args, **kwargs)
+
+ @classmethod
+ def create_from_data(self, **kwargs):
+ # Convert the datetime for the ad hoc command event's creation
+ # appropriately, and include a time zone for it.
+ #
+ # In the event of any issue, throw it out, and Django will just save
+ # the current time.
+ try:
+ if not isinstance(kwargs['created'], datetime.datetime):
+ kwargs['created'] = parse_datetime(kwargs['created'])
+ if not kwargs['created'].tzinfo:
+ kwargs['created'] = kwargs['created'].replace(tzinfo=utc)
+ except (KeyError, ValueError):
+ kwargs.pop('created', None)
+
+ # Sanity check: Don't honor keys that we don't recognize.
+ valid_keys = {'ad_hoc_command_id', 'event', 'event_data', 'created',
+ 'counter', 'uuid', 'stdout', 'start_line', 'end_line',
+ 'verbosity'}
+ for key in kwargs.keys():
+ if key not in valid_keys:
+ kwargs.pop(key)
+
+ return AdHocCommandEvent.objects.create(**kwargs)
diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py
index 1f11e5dee9..a15e291d78 100644
--- a/awx/main/models/jobs.py
+++ b/awx/main/models/jobs.py
@@ -2,6 +2,7 @@
# All Rights Reserved.
# Python
+import datetime
import hmac
import json
import yaml
@@ -11,8 +12,12 @@ from urlparse import urljoin
# Django
from django.conf import settings
+from django.core.cache import cache
from django.db import models
from django.db.models import Q, Count
+from django.utils.dateparse import parse_datetime
+from django.utils.encoding import force_text
+from django.utils.timezone import utc
from django.utils.translation import ugettext_lazy as _
from django.core.exceptions import ValidationError
from django.core.urlresolvers import reverse
@@ -931,24 +936,29 @@ class JobEvent(CreatedModifiedModel):
# - playbook_on_vars_prompt (for each play, but before play starts, we
# currently don't handle responding to these prompts)
# - playbook_on_play_start (once for each play)
- # - playbook_on_import_for_host
- # - playbook_on_not_import_for_host
+ # - playbook_on_import_for_host (not logged, not used for v2)
+ # - playbook_on_not_import_for_host (not logged, not used for v2)
# - playbook_on_no_hosts_matched
# - playbook_on_no_hosts_remaining
- # - playbook_on_setup
+ # - playbook_on_include (only v2 - only used for handlers?)
+ # - playbook_on_setup (not used for v2)
# - runner_on*
# - playbook_on_task_start (once for each task within a play)
# - runner_on_failed
# - runner_on_ok
- # - runner_on_error
+ # - runner_on_error (not used for v2)
# - runner_on_skipped
# - runner_on_unreachable
- # - runner_on_no_hosts
- # - runner_on_async_poll
- # - runner_on_async_ok
- # - runner_on_async_failed
- # - runner_on_file_diff
- # - playbook_on_notify (once for each notification from the play)
+ # - runner_on_no_hosts (not used for v2)
+ # - runner_on_async_poll (not used for v2)
+ # - runner_on_async_ok (not used for v2)
+ # - runner_on_async_failed (not used for v2)
+ # - runner_on_file_diff (v2 event is v2_on_file_diff)
+ # - runner_item_on_ok (v2 only)
+ # - runner_item_on_failed (v2 only)
+ # - runner_item_on_skipped (v2 only)
+ # - runner_retry (v2 only)
+ # - playbook_on_notify (once for each notification from the play, not used for v2)
# - playbook_on_stats
EVENT_TYPES = [
@@ -962,22 +972,34 @@ class JobEvent(CreatedModifiedModel):
(3, 'runner_on_async_poll', _('Host Polling'), False),
(3, 'runner_on_async_ok', _('Host Async OK'), False),
(3, 'runner_on_async_failed', _('Host Async Failure'), True),
- # AWX does not yet support --diff mode
+ (3, 'runner_item_on_ok', _('Item OK'), False),
+ (3, 'runner_item_on_failed', _('Item Failed'), True),
+ (3, 'runner_item_on_skipped', _('Item Skipped'), False),
+ (3, 'runner_retry', _('Host Retry'), False),
+ # Tower does not yet support --diff mode.
(3, 'runner_on_file_diff', _('File Difference'), False),
(0, 'playbook_on_start', _('Playbook Started'), False),
(2, 'playbook_on_notify', _('Running Handlers'), False),
+ (2, 'playbook_on_include', _('Including File'), False),
(2, 'playbook_on_no_hosts_matched', _('No Hosts Matched'), False),
(2, 'playbook_on_no_hosts_remaining', _('No Hosts Remaining'), False),
(2, 'playbook_on_task_start', _('Task Started'), False),
- # AWX does not yet support vars_prompt (and will probably hang :)
+ # Tower does not yet support vars_prompt (and will probably hang :)
(1, 'playbook_on_vars_prompt', _('Variables Prompted'), False),
(2, 'playbook_on_setup', _('Gathering Facts'), False),
- # callback will not record this
(2, 'playbook_on_import_for_host', _('internal: on Import for Host'), False),
- # callback will not record this
(2, 'playbook_on_not_import_for_host', _('internal: on Not Import for Host'), False),
(1, 'playbook_on_play_start', _('Play Started'), False),
(1, 'playbook_on_stats', _('Playbook Complete'), False),
+
+ # Additional event types for captured stdout not directly related to
+ # playbook or runner events.
+ (0, 'debug', _('Debug'), False),
+ (0, 'verbose', _('Verbose'), False),
+ (0, 'deprecated', _('Deprecated'), False),
+ (0, 'warning', _('Warning'), False),
+ (0, 'system_warning', _('System Warning'), False),
+ (0, 'error', _('Error'), True),
]
FAILED_EVENTS = [x[1] for x in EVENT_TYPES if x[3]]
EVENT_CHOICES = [(x[1], x[2]) for x in EVENT_TYPES]
@@ -986,6 +1008,13 @@ class JobEvent(CreatedModifiedModel):
class Meta:
app_label = 'main'
ordering = ('pk',)
+ index_together = [
+ ('job', 'event'),
+ ('job', 'uuid'),
+ ('job', 'start_line'),
+ ('job', 'end_line'),
+ ('job', 'parent'),
+ ]
job = models.ForeignKey(
'Job',
@@ -1032,12 +1061,17 @@ class JobEvent(CreatedModifiedModel):
related_name='job_events',
editable=False,
)
+ playbook = models.CharField(
+ max_length=1024,
+ default='',
+ editable=False,
+ )
play = models.CharField(
max_length=1024,
default='',
editable=False,
)
- role = models.CharField( # FIXME: Determine from callback or task name.
+ role = models.CharField(
max_length=1024,
default='',
editable=False,
@@ -1057,8 +1091,24 @@ class JobEvent(CreatedModifiedModel):
)
counter = models.PositiveIntegerField(
default=0,
+ editable=False,
+ )
+ stdout = models.TextField(
+ default='',
+ editable=False,
+ )
+ verbosity = models.PositiveIntegerField(
+ default=0,
+ editable=False,
+ )
+ start_line = models.PositiveIntegerField(
+ default=0,
+ editable=False,
+ )
+ end_line = models.PositiveIntegerField(
+ default=0,
+ editable=False,
)
-
def get_absolute_url(self):
return reverse('api:job_event_detail', args=(self.pk,))
@@ -1119,7 +1169,8 @@ class JobEvent(CreatedModifiedModel):
pass
return msg
- def _find_parent(self):
+ def _find_parent_id(self):
+ # Find the (most likely) parent event for this event.
parent_events = set()
if self.event in ('playbook_on_play_start', 'playbook_on_stats',
'playbook_on_vars_prompt'):
@@ -1135,101 +1186,55 @@ class JobEvent(CreatedModifiedModel):
parent_events.add('playbook_on_setup')
parent_events.add('playbook_on_task_start')
if parent_events:
- try:
- qs = JobEvent.objects.filter(job_id=self.job_id)
- if self.pk:
- qs = qs.filter(pk__lt=self.pk, event__in=parent_events)
- else:
- qs = qs.filter(event__in=parent_events)
- return qs.order_by('-pk')[0]
- except IndexError:
- pass
- return None
+ qs = JobEvent.objects.filter(job_id=self.job_id, event__in=parent_events).order_by('-pk')
+ if self.pk:
+ qs = qs.filter(pk__lt=self.pk)
+ return qs.only('id').values_list('id', flat=True).first()
- def save(self, *args, **kwargs):
- from awx.main.models.inventory import Host
- # If update_fields has been specified, add our field names to it,
- # if it hasn't been specified, then we're just doing a normal save.
- update_fields = kwargs.get('update_fields', [])
- # Skip normal checks on save if we're only updating failed/changed
- # flags triggered from a child event.
- from_parent_update = kwargs.pop('from_parent_update', False)
- if not from_parent_update:
- res = self.event_data.get('res', None)
- # Workaround for Ansible 1.2, where the runner_on_async_ok event is
- # created even when the async task failed. Change the event to be
- # correct.
- if self.event == 'runner_on_async_ok':
- try:
- if res.get('failed', False) or res.get('rc', 0) != 0:
- self.event = 'runner_on_async_failed'
- except (AttributeError, TypeError):
- pass
- if self.event in self.FAILED_EVENTS:
- if not self.event_data.get('ignore_errors', False):
- self.failed = True
- if 'failed' not in update_fields:
- update_fields.append('failed')
- if isinstance(res, dict) and res.get('changed', False):
+ def _update_from_event_data(self):
+ # Update job event model fields from event data.
+ updated_fields = set()
+ job = self.job
+ verbosity = job.verbosity
+ event_data = self.event_data
+ res = event_data.get('res', None)
+ if self.event in self.FAILED_EVENTS and not event_data.get('ignore_errors', False):
+ self.failed = True
+ updated_fields.add('failed')
+ if isinstance(res, dict):
+ if res.get('changed', False):
self.changed = True
- if 'changed' not in update_fields:
- update_fields.append('changed')
- if self.event == 'playbook_on_stats':
- try:
- failures_dict = self.event_data.get('failures', {})
- dark_dict = self.event_data.get('dark', {})
- self.failed = bool(sum(failures_dict.values()) +
- sum(dark_dict.values()))
- if 'failed' not in update_fields:
- update_fields.append('failed')
- changed_dict = self.event_data.get('changed', {})
- self.changed = bool(sum(changed_dict.values()))
- if 'changed' not in update_fields:
- update_fields.append('changed')
- except (AttributeError, TypeError):
- pass
- self.play = self.event_data.get('play', '').strip()
- if 'play' not in update_fields:
- update_fields.append('play')
- self.task = self.event_data.get('task', '').strip()
- if 'task' not in update_fields:
- update_fields.append('task')
- self.role = self.event_data.get('role', '').strip()
- if 'role' not in update_fields:
- update_fields.append('role')
- self.host_name = self.event_data.get('host', '').strip()
- if 'host_name' not in update_fields:
- update_fields.append('host_name')
- # Only update job event hierarchy and related models during post
- # processing (after running job).
- post_process = kwargs.pop('post_process', False)
- if post_process:
+ updated_fields.add('changed')
+ # If we're not in verbose mode, wipe out any module arguments.
+ invocation = res.get('invocation', None)
+ if isinstance(invocation, dict) and verbosity == 0 and 'module_args' in invocation:
+ event_data['res']['invocation']['module_args'] = ''
+ self.event_data = event_data
+ update_fields.add('event_data')
+ if self.event == 'playbook_on_stats':
try:
- if not self.host_id and self.host_name:
- host_qs = Host.objects.filter(inventory__jobs__id=self.job_id, name=self.host_name)
- host_id = host_qs.only('id').values_list('id', flat=True)
- if host_id.exists():
- self.host_id = host_id[0]
- if 'host_id' not in update_fields:
- update_fields.append('host_id')
- except (IndexError, AttributeError):
+ failures_dict = event_data.get('failures', {})
+ dark_dict = event_data.get('dark', {})
+ self.failed = bool(sum(failures_dict.values()) +
+ sum(dark_dict.values()))
+ updated_fields.add('failed')
+ changed_dict = event_data.get('changed', {})
+ self.changed = bool(sum(changed_dict.values()))
+ updated_fields.add('changed')
+ except (AttributeError, TypeError):
pass
- if self.parent is None:
- self.parent = self._find_parent()
- if 'parent' not in update_fields:
- update_fields.append('parent')
- super(JobEvent, self).save(*args, **kwargs)
- if post_process and not from_parent_update:
- self.update_parent_failed_and_changed()
- # FIXME: The update_hosts() call (and its queries) are the current
- # performance bottleneck....
- if getattr(settings, 'CAPTURE_JOB_EVENT_HOSTS', False):
- self.update_hosts()
- self.update_host_summary_from_stats()
+ for field in ('playbook', 'play', 'task', 'role', 'host'):
+ value = force_text(event_data.get(field, '')).strip()
+ if field == 'host':
+ field = 'host_name'
+ if value != getattr(self, field):
+ setattr(self, field, value)
+ updated_fields.add(field)
+ return updated_fields
- def update_parent_failed_and_changed(self):
- # Propagage failed and changed flags to parent events.
- if self.parent:
+ def _update_parent_failed_and_changed(self):
+ # Propagate failed and changed flags to parent events.
+ if self.parent_id:
parent = self.parent
update_fields = []
if self.failed and not parent.failed:
@@ -1240,9 +1245,10 @@ class JobEvent(CreatedModifiedModel):
update_fields.append('changed')
if update_fields:
parent.save(update_fields=update_fields, from_parent_update=True)
- parent.update_parent_failed_and_changed()
+ parent._update_parent_failed_and_changed()
- def update_hosts(self, extra_host_pks=None):
+ def _update_hosts(self, extra_host_pks=None):
+ # Update job event hosts m2m from host_name, propagate to parent events.
from awx.main.models.inventory import Host
extra_host_pks = set(extra_host_pks or [])
hostnames = set()
@@ -1256,16 +1262,14 @@ class JobEvent(CreatedModifiedModel):
pass
qs = Host.objects.filter(inventory__jobs__id=self.job_id)
qs = qs.filter(Q(name__in=hostnames) | Q(pk__in=extra_host_pks))
- qs = qs.exclude(job_events__pk=self.id)
- for host in qs.only('id'):
+ qs = qs.exclude(job_events__pk=self.id).only('id')
+ for host in qs:
self.hosts.add(host)
- if self.parent:
- self.parent.update_hosts(self.hosts.only('id').values_list('id', flat=True))
+ if self.parent_id:
+ self.parent._update_hosts(qs.values_list('id', flat=True))
- def update_host_summary_from_stats(self):
+ def _update_host_summary_from_stats(self):
from awx.main.models.inventory import Host
- if self.event != 'playbook_on_stats':
- return
hostnames = set()
try:
for v in self.event_data.values():
@@ -1276,7 +1280,6 @@ class JobEvent(CreatedModifiedModel):
qs = Host.objects.filter(inventory__jobs__id=self.job_id,
name__in=hostnames)
job = self.job
- #for host in qs.only('id', 'name'):
for host in hostnames:
host_stats = {}
for stat in ('changed', 'dark', 'failures', 'ok', 'processed', 'skipped'):
@@ -1300,6 +1303,112 @@ class JobEvent(CreatedModifiedModel):
job.inventory.update_computed_fields()
emit_channel_notification('jobs-summary', dict(group_name='jobs', unified_job_id=job.id))
+ def save(self, *args, **kwargs):
+ from awx.main.models.inventory import Host
+ # If update_fields has been specified, add our field names to it,
+ # if it hasn't been specified, then we're just doing a normal save.
+ update_fields = kwargs.get('update_fields', [])
+ # Update model fields and related objects unless we're only updating
+ # failed/changed flags triggered from a child event.
+ from_parent_update = kwargs.pop('from_parent_update', False)
+ if not from_parent_update:
+ # Update model fields from event data.
+ updated_fields = self._update_from_event_data()
+ for field in updated_fields:
+ if field not in update_fields:
+ update_fields.append(field)
+ # Update host related field from host_name.
+ if not self.host_id and self.host_name:
+ host_qs = Host.objects.filter(inventory__jobs__id=self.job_id, name=self.host_name)
+ host_id = host_qs.only('id').values_list('id', flat=True).first()
+ if host_id != self.host_id:
+ self.host_id = host_id
+ if 'host_id' not in update_fields:
+ update_fields.append('host_id')
+ # Update parent related field if not set.
+ if self.parent_id is None:
+ self.parent_id = self._find_parent_id()
+ if self.parent_id and 'parent_id' not in update_fields:
+ update_fields.append('parent_id')
+ super(JobEvent, self).save(*args, **kwargs)
+ # Update related objects after this event is saved.
+ if not from_parent_update:
+ if self.parent_id:
+ self._update_parent_failed_and_changed()
+ # FIXME: The update_hosts() call (and its queries) are the current
+ # performance bottleneck....
+ if getattr(settings, 'CAPTURE_JOB_EVENT_HOSTS', False):
+ self._update_hosts()
+ if self.event == 'playbook_on_stats':
+ self._update_host_summary_from_stats()
+
+ @classmethod
+ def create_from_data(self, **kwargs):
+ # Must have a job_id specified.
+ if not kwargs.get('job_id', None):
+ return
+
+ # Convert the datetime for the job event's creation appropriately,
+ # and include a time zone for it.
+ #
+ # In the event of any issue, throw it out, and Django will just save
+ # the current time.
+ try:
+ if not isinstance(kwargs['created'], datetime.datetime):
+ kwargs['created'] = parse_datetime(kwargs['created'])
+ if not kwargs['created'].tzinfo:
+ kwargs['created'] = kwargs['created'].replace(tzinfo=utc)
+ except (KeyError, ValueError):
+ kwargs.pop('created', None)
+
+ # Save UUID and parent UUID for determining parent-child relationship.
+ job_event_uuid = kwargs.get('uuid', None)
+ parent_event_uuid = kwargs.get('parent_uuid', None)
+ artifact_data = kwargs.get('artifact_data', None)
+
+ # Sanity check: Don't honor keys that we don't recognize.
+ valid_keys = {'job_id', 'event', 'event_data', 'playbook', 'play',
+ 'role', 'task', 'created', 'counter', 'uuid', 'stdout',
+ 'start_line', 'end_line', 'verbosity'}
+ for key in kwargs.keys():
+ if key not in valid_keys:
+ kwargs.pop(key)
+
+ # Try to find a parent event based on UUID.
+ if parent_event_uuid:
+ cache_key = '{}_{}'.format(kwargs['job_id'], parent_event_uuid)
+ parent_id = cache.get(cache_key)
+ if parent_id is None:
+ parent_id = JobEvent.objects.filter(job_id=kwargs['job_id'], uuid=parent_event_uuid).only('id').values_list('id', flat=True).first()
+ if parent_id:
+ print("Settings cache: {} with value {}".format(cache_key, parent_id))
+ cache.set(cache_key, parent_id, 300)
+ if parent_id:
+ kwargs['parent_id'] = parent_id
+
+ job_event = JobEvent.objects.create(**kwargs)
+
+ # Cache this job event ID vs. UUID for future parent lookups.
+ if job_event_uuid:
+ cache_key = '{}_{}'.format(kwargs['job_id'], job_event_uuid)
+ cache.set(cache_key, job_event.id, 300)
+
+ # Save artifact data to parent job (if provided).
+ if artifact_data:
+ artifact_dict = json.loads(artifact_data)
+ event_data = kwargs.get('event_data', None)
+ if event_data and isinstance(event_data, dict):
+ res = event_data.get('res', None)
+ if res and isinstance(res, dict):
+ if res.get('_ansible_no_log', False):
+ artifact_dict['_ansible_no_log'] = True
+ parent_job = Job.objects.filter(pk=kwargs['job_id']).first()
+ if parent_job and parent_job.artifacts != artifact_dict:
+ parent_job.artifacts = artifact_dict
+ parent_job.save(update_fields=['artifacts'])
+
+ return job_event
+
@classmethod
def get_startevent_queryset(cls, parent_task, starting_events, ordering=None):
'''
diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py
index 677be8136d..674bedbffe 100644
--- a/awx/main/models/unified_jobs.py
+++ b/awx/main/models/unified_jobs.py
@@ -696,8 +696,11 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
return StringIO(msg['missing' if self.finished else 'pending'])
def _escape_ascii(self, content):
- ansi_escape = re.compile(r'\x1b[^m]*m')
- return ansi_escape.sub('', content)
+ # Remove ANSI escape sequences used to embed event data.
+ content = re.sub(r'\x1b\[K(?:[A-Za-z0-9+/=]+\x1b\[\d+D)+\x1b\[K', '', content)
+ # Remove ANSI color escape sequences.
+ content = re.sub(r'\x1b[^m]*m', '', content)
+ return content
def _result_stdout_raw(self, redact_sensitive=False, escape_ascii=False):
content = self.result_stdout_raw_handle().read()
diff --git a/awx/main/queue.py b/awx/main/queue.py
index b0b8d0374e..bfb487441f 100644
--- a/awx/main/queue.py
+++ b/awx/main/queue.py
@@ -1,9 +1,19 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
+# Python
import json
+import logging
+import os
+
+# Django
+from django.conf import settings
+
+# Kombu
+from kombu import Connection, Exchange, Producer
+
+__all__ = ['FifoQueue', 'CallbackQueueDispatcher']
-__all__ = ['FifoQueue']
# TODO: Figure out wtf to do with this class
class FifoQueue(object):
@@ -33,3 +43,39 @@ class FifoQueue(object):
answer = None
if answer:
return json.loads(answer)
+
+
+class CallbackQueueDispatcher(object):
+
+ def __init__(self):
+ self.callback_connection = getattr(settings, 'CALLBACK_CONNECTION', None)
+ self.connection_queue = getattr(settings, 'CALLBACK_QUEUE', '')
+ self.connection = None
+ self.exchange = None
+ self.logger = logging.getLogger('awx.main.queue.CallbackQueueDispatcher')
+
+ def dispatch(self, obj):
+ if not self.callback_connection or not self.connection_queue:
+ return
+ active_pid = os.getpid()
+ for retry_count in xrange(4):
+ try:
+ if not hasattr(self, 'connection_pid'):
+ self.connection_pid = active_pid
+ if self.connection_pid != active_pid:
+ self.connection = None
+ if self.connection is None:
+ self.connection = Connection(self.callback_connection)
+ self.exchange = Exchange(self.connection_queue, type='direct')
+
+ producer = Producer(self.connection)
+ producer.publish(obj,
+ serializer='json',
+ compression='bzip2',
+ exchange=self.exchange,
+ declare=[self.exchange],
+ routing_key=self.connection_queue)
+ return
+ except Exception, e:
+ self.logger.info('Publish Job Event Exception: %r, retry=%d', e,
+ retry_count, exc_info=True)
diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index dcb2bc5258..35858577a7 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -47,9 +47,11 @@ from django.contrib.auth.models import User
from awx.main.constants import CLOUD_PROVIDERS
from awx.main.models import * # noqa
from awx.main.models import UnifiedJob
+from awx.main.queue import CallbackQueueDispatcher
from awx.main.task_engine import TaskEnhancer
from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url,
- check_proot_installed, build_proot_temp_dir, wrap_args_with_proot)
+ check_proot_installed, build_proot_temp_dir, wrap_args_with_proot,
+ OutputEventFilter)
from awx.main.consumers import emit_channel_notification
__all__ = ['RunJob', 'RunSystemJob', 'RunProjectUpdate', 'RunInventoryUpdate',
@@ -397,6 +399,8 @@ class BaseTask(Task):
if os.path.isdir(os.path.join(venv_libdir, python_ver)):
env['PYTHONPATH'] = os.path.join(venv_libdir, python_ver, "site-packages") + ":"
break
+ # Add awx/lib to PYTHONPATH.
+ env['PYTHONPATH'] = ':'.join(filter(None, [self.get_path_to('..', 'lib'), env.get('PYTHONPATH', '')]))
return env
def add_tower_venv(self, env):
@@ -494,6 +498,17 @@ class BaseTask(Task):
'''
return OrderedDict()
+ def get_stdout_handle(self, instance):
+ '''
+ Return an open file object for capturing stdout.
+ '''
+ if not os.path.exists(settings.JOBOUTPUT_ROOT):
+ os.makedirs(settings.JOBOUTPUT_ROOT)
+ stdout_filename = os.path.join(settings.JOBOUTPUT_ROOT, "%d-%s.out" % (instance.pk, str(uuid.uuid1())))
+ stdout_handle = codecs.open(stdout_filename, 'w', encoding='utf-8')
+ assert stdout_handle.name == stdout_filename
+ return stdout_handle
+
def run_pexpect(self, instance, args, cwd, env, passwords, stdout_handle,
output_replacements=None, extra_update_fields=None):
'''
@@ -643,10 +658,7 @@ class BaseTask(Task):
cwd = self.build_cwd(instance, **kwargs)
env = self.build_env(instance, **kwargs)
safe_env = self.build_safe_env(instance, **kwargs)
- if not os.path.exists(settings.JOBOUTPUT_ROOT):
- os.makedirs(settings.JOBOUTPUT_ROOT)
- stdout_filename = os.path.join(settings.JOBOUTPUT_ROOT, "%d-%s.out" % (pk, str(uuid.uuid1())))
- stdout_handle = codecs.open(stdout_filename, 'w', encoding='utf-8')
+ stdout_handle = self.get_stdout_handle(instance)
if self.should_use_proot(instance, **kwargs):
if not check_proot_installed():
raise RuntimeError('proot is not installed')
@@ -660,7 +672,7 @@ class BaseTask(Task):
args = self.wrap_args_with_ssh_agent(args, ssh_key_path, ssh_auth_sock)
safe_args = self.wrap_args_with_ssh_agent(safe_args, ssh_key_path, ssh_auth_sock)
instance = self.update_model(pk, job_args=json.dumps(safe_args),
- job_cwd=cwd, job_env=safe_env, result_stdout_file=stdout_filename)
+ job_cwd=cwd, job_env=safe_env, result_stdout_file=stdout_handle.name)
status, rc = self.run_pexpect(instance, args, cwd, env, kwargs['passwords'], stdout_handle,
extra_update_fields=extra_update_fields)
except Exception:
@@ -779,6 +791,7 @@ class RunJob(BaseTask):
if job.project:
env['PROJECT_REVISION'] = job.project.scm_revision
env['ANSIBLE_CALLBACK_PLUGINS'] = plugin_path
+ env['ANSIBLE_STDOUT_CALLBACK'] = 'tower_display'
env['REST_API_URL'] = settings.INTERNAL_API_URL
env['REST_API_TOKEN'] = job.task_auth_token or ''
env['CALLBACK_QUEUE'] = settings.CALLBACK_QUEUE
@@ -974,6 +987,25 @@ class RunJob(BaseTask):
d[re.compile(r'^Vault password:\s*?$', re.M)] = 'vault_password'
return d
+ def get_stdout_handle(self, instance):
+ '''
+ Wrap stdout file object to capture events.
+ '''
+ stdout_handle = super(RunJob, self).get_stdout_handle(instance)
+
+ if getattr(settings, 'USE_CALLBACK_QUEUE', False):
+ dispatcher = CallbackQueueDispatcher()
+
+ def job_event_callback(event_data):
+ event_data.setdefault('job_id', instance.id)
+ dispatcher.dispatch(event_data)
+ else:
+ def job_event_callback(event_data):
+ event_data.setdefault('job_id', instance.id)
+ JobEvent.create_from_data(**event_data)
+
+ return OutputEventFilter(stdout_handle, job_event_callback)
+
def get_ssh_key_path(self, instance, **kwargs):
'''
If using an SSH key, return the path for use by ssh-agent.
@@ -1019,11 +1051,6 @@ class RunJob(BaseTask):
pass
else:
update_inventory_computed_fields.delay(inventory.id, True)
- # Update job event fields after job has completed (only when using REST
- # API callback).
- if not getattr(settings, 'CALLBACK_CONSUMER_PORT', None) and not getattr(settings, 'CALLBACK_QUEUE', None):
- for job_event in job.job_events.order_by('pk'):
- job_event.save(post_process=True)
class RunProjectUpdate(BaseTask):
@@ -1597,6 +1624,7 @@ class RunAdHocCommand(BaseTask):
env['INVENTORY_HOSTVARS'] = str(True)
env['ANSIBLE_CALLBACK_PLUGINS'] = plugin_dir
env['ANSIBLE_LOAD_CALLBACK_PLUGINS'] = '1'
+ env['ANSIBLE_STDOUT_CALLBACK'] = 'minimal' # Hardcoded by Ansible for ad-hoc commands (either minimal or oneline).
env['REST_API_URL'] = settings.INTERNAL_API_URL
env['REST_API_TOKEN'] = ad_hoc_command.task_auth_token or ''
env['CALLBACK_QUEUE'] = settings.CALLBACK_QUEUE
@@ -1693,6 +1721,25 @@ class RunAdHocCommand(BaseTask):
d[re.compile(r'^Password:\s*?$', re.M)] = 'ssh_password'
return d
+ def get_stdout_handle(self, instance):
+ '''
+ Wrap stdout file object to capture events.
+ '''
+ stdout_handle = super(RunAdHocCommand, self).get_stdout_handle(instance)
+
+ if getattr(settings, 'USE_CALLBACK_QUEUE', False):
+ dispatcher = CallbackQueueDispatcher()
+
+ def ad_hoc_command_event_callback(event_data):
+ event_data.setdefault('ad_hoc_command_id', instance.id)
+ dispatcher.dispatch(event_data)
+ else:
+ def ad_hoc_command_event_callback(event_data):
+ event_data.setdefault('ad_hoc_command_id', instance.id)
+ AdHocCommandEvent.create_from_data(**event_data)
+
+ return OutputEventFilter(stdout_handle, ad_hoc_command_event_callback)
+
def get_ssh_key_path(self, instance, **kwargs):
'''
If using an SSH key, return the path for use by ssh-agent.
diff --git a/awx/main/utils.py b/awx/main/utils.py
index c652f8e166..76f28f090a 100644
--- a/awx/main/utils.py
+++ b/awx/main/utils.py
@@ -4,6 +4,7 @@
# Python
import base64
import hashlib
+import json
import logging
import os
import re
@@ -36,7 +37,7 @@ __all__ = ['get_object_or_400', 'get_object_or_403', 'camelcase_to_underscore',
'get_type_for_model', 'get_model_for_type', 'cache_list_capabilities', 'to_python_boolean',
'ignore_inventory_computed_fields', 'ignore_inventory_group_removal',
'_inventory_updates', 'get_pk_from_dict', 'getattrd', 'NoDefaultProvided',
- 'get_current_apps', 'set_current_apps']
+ 'get_current_apps', 'set_current_apps', 'OutputEventFilter']
def get_object_or_400(klass, *args, **kwargs):
@@ -640,3 +641,71 @@ def set_current_apps(apps):
def get_current_apps():
global current_apps
return current_apps
+
+
+class OutputEventFilter(object):
+ '''
+ File-like object that looks for encoded job events in stdout data.
+ '''
+
+ EVENT_DATA_RE = re.compile(r'\x1b\[K((?:[A-Za-z0-9+/=]+\x1b\[\d+D)+)\x1b\[K')
+
+ def __init__(self, fileobj=None, event_callback=None):
+ self._fileobj = fileobj
+ self._event_callback = event_callback
+ self._counter = 1
+ self._start_line = 0
+ self._buffer = ''
+ self._current_event_data = None
+
+ def __getattr__(self, attr):
+ return getattr(self._fileobj, attr)
+
+ def write(self, data):
+ if self._fileobj:
+ self._fileobj.write(data)
+ self._buffer += data
+ while True:
+ match = self.EVENT_DATA_RE.search(self._buffer)
+ if not match:
+ break
+ try:
+ base64_data = re.sub(r'\x1b\[\d+D', '', match.group(1))
+ event_data = json.loads(base64.b64decode(base64_data))
+ except ValueError:
+ event_data = {}
+ self._emit_event(self._buffer[:match.start()], event_data)
+ self._buffer = self._buffer[match.end():]
+
+ def close(self):
+ if self._fileobj:
+ self._fileobj.close()
+ if self._buffer:
+ self._emit_event(self._buffer)
+ self._buffer = ''
+
+ def _emit_event(self, buffered_stdout, next_event_data=None):
+ if self._current_event_data:
+ event_data = self._current_event_data
+ stdout_chunks = [buffered_stdout]
+ elif buffered_stdout:
+ event_data = dict(event='verbose')
+ stdout_chunks = buffered_stdout.splitlines(True)
+ else:
+ stdout_chunks = []
+
+ for stdout_chunk in stdout_chunks:
+ event_data['counter'] = self._counter
+ self._counter += 1
+ event_data['stdout'] = stdout_chunk
+ n_lines = stdout_chunk.count('\n')
+ event_data['start_line'] = self._start_line
+ event_data['end_line'] = self._start_line + n_lines
+ self._start_line += n_lines
+ if self._event_callback:
+ self._event_callback(event_data)
+
+ if next_event_data.get('uuid', None):
+ self._current_event_data = next_event_data
+ else:
+ self._current_event_data = None
diff --git a/awx/plugins/callback/job_event_callback.py b/awx/plugins/callback/job_event_callback.py
deleted file mode 100644
index 67f36612f6..0000000000
--- a/awx/plugins/callback/job_event_callback.py
+++ /dev/null
@@ -1,579 +0,0 @@
-# Copyright (c) 2015 Ansible, Inc.
-# This file is a utility Ansible plugin that is not part of the AWX or Ansible
-# packages. It does not import any code from either package, nor does its
-# license apply to Ansible or AWX.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are met:
-#
-# Redistributions of source code must retain the above copyright notice, this
-# list of conditions and the following disclaimer.
-#
-# Redistributions in binary form must reproduce the above copyright notice,
-# this list of conditions and the following disclaimer in the documentation
-# and/or other materials provided with the distribution.
-#
-# Neither the name of the nor the names of its contributors
-# may be used to endorse or promote products derived from this software
-# without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
-# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-# POSSIBILITY OF SUCH DAMAGE.
-
-# Python
-import datetime
-import glob
-import json
-import logging
-import os
-import pwd
-import urlparse
-import re
-from copy import deepcopy
-from uuid import uuid4
-
-# Kombu
-from kombu import Connection, Exchange, Producer
-
-# Requests
-import requests
-
-import psutil
-
-CENSOR_FIELD_WHITELIST = [
- 'msg',
- 'failed',
- 'changed',
- 'results',
- 'start',
- 'end',
- 'delta',
- 'cmd',
- '_ansible_no_log',
- 'rc',
- 'failed_when_result',
- 'skipped',
- 'skip_reason',
-]
-
-def censor(obj, no_log=False):
- if not isinstance(obj, dict):
- if no_log:
- return "the output has been hidden due to the fact that 'no_log: true' was specified for this result"
- return obj
- if obj.get('_ansible_no_log', no_log):
- new_obj = {}
- for k in CENSOR_FIELD_WHITELIST:
- if k in obj:
- new_obj[k] = obj[k]
- if k == 'cmd' and k in obj:
- if isinstance(obj['cmd'], list):
- obj['cmd'] = ' '.join(obj['cmd'])
- if re.search(r'\s', obj['cmd']):
- new_obj['cmd'] = re.sub(r'^(([^\s\\]|\\\s)+).*$',
- r'\1 ',
- obj['cmd'])
- new_obj['censored'] = "the output has been hidden due to the fact that 'no_log: true' was specified for this result"
- obj = new_obj
- if 'results' in obj:
- if isinstance(obj['results'], list):
- for i in xrange(len(obj['results'])):
- obj['results'][i] = censor(obj['results'][i], obj.get('_ansible_no_log', no_log))
- elif obj.get('_ansible_no_log', False):
- obj['results'] = "the output has been hidden due to the fact that 'no_log: true' was specified for this result"
- return obj
-
-
-class TokenAuth(requests.auth.AuthBase):
-
- def __init__(self, token):
- self.token = token
-
- def __call__(self, request):
- request.headers['Authorization'] = 'Token %s' % self.token
- return request
-
-
-# TODO: non v2_ events are deprecated and should be purge/refactored out
-class BaseCallbackModule(object):
- '''
- Callback module for logging ansible-playbook job events via the REST API.
- '''
-
- def __init__(self):
- self.base_url = os.getenv('REST_API_URL', '')
- self.auth_token = os.getenv('REST_API_TOKEN', '')
- self.callback_connection = os.getenv('CALLBACK_CONNECTION', None)
- self.connection_queue = os.getenv('CALLBACK_QUEUE', '')
- self.connection = None
- self.exchange = None
- self._init_logging()
- self._init_connection()
- self.counter = 0
- self.active_playbook = None
- self.active_play = None
- self.active_task = None
-
- def _init_logging(self):
- try:
- self.job_callback_debug = int(os.getenv('JOB_CALLBACK_DEBUG', '0'))
- except ValueError:
- self.job_callback_debug = 0
- self.logger = logging.getLogger('awx.plugins.callback.job_event_callback')
- if self.job_callback_debug >= 2:
- self.logger.setLevel(logging.DEBUG)
- elif self.job_callback_debug >= 1:
- self.logger.setLevel(logging.INFO)
- else:
- self.logger.setLevel(logging.WARNING)
- handler = logging.StreamHandler()
- formatter = logging.Formatter('%(levelname)-8s %(process)-8d %(message)s')
- handler.setFormatter(formatter)
- self.logger.addHandler(handler)
- self.logger.propagate = False
-
- def _init_connection(self):
- self.connection = None
-
- def _start_connection(self):
- self.connection = Connection(self.callback_connection)
- self.exchange = Exchange(self.connection_queue, type='direct')
-
- def _post_job_event_queue_msg(self, event, event_data):
- self.counter += 1
- msg = {
- 'event': event,
- 'event_data': event_data,
- 'counter': self.counter,
- 'created': datetime.datetime.utcnow().isoformat(),
- }
- if event in ('playbook_on_play_start',
- 'playbook_on_stats',
- 'playbook_on_vars_prompt'):
- msg['parent_uuid'] = str(self.active_playbook)
- elif event in ('playbook_on_notify',
- 'playbook_on_setup',
- 'playbook_on_task_start',
- 'playbook_on_no_hosts_matched',
- 'playbook_on_no_hosts_remaining',
- 'playbook_on_include',
- 'playbook_on_import_for_host',
- 'playbook_on_not_import_for_host'):
- msg['parent_uuid'] = str(self.active_play)
- elif event.startswith('runner_on_') or event.startswith('runner_item_on_'):
- msg['parent_uuid'] = str(self.active_task)
- else:
- msg['parent_uuid'] = ''
-
- if "uuid" in event_data:
- msg['uuid'] = str(event_data['uuid'])
- else:
- msg['uuid'] = ''
-
- if getattr(self, 'job_id', None):
- msg['job_id'] = self.job_id
- if getattr(self, 'ad_hoc_command_id', None):
- msg['ad_hoc_command_id'] = self.ad_hoc_command_id
-
- if getattr(self, 'artifact_data', None):
- msg['artifact_data'] = self.artifact_data
-
- active_pid = os.getpid()
- if self.job_callback_debug:
- msg.update({
- 'pid': active_pid,
- })
- for retry_count in xrange(4):
- try:
- if not hasattr(self, 'connection_pid'):
- self.connection_pid = active_pid
- if self.connection_pid != active_pid:
- self._init_connection()
- if self.connection is None:
- self._start_connection()
-
- producer = Producer(self.connection)
- producer.publish(msg,
- serializer='json',
- compression='bzip2',
- exchange=self.exchange,
- declare=[self.exchange],
- routing_key=self.connection_queue)
- return
- except Exception, e:
- self.logger.info('Publish Job Event Exception: %r, retry=%d', e,
- retry_count, exc_info=True)
- retry_count += 1
- if retry_count >= 3:
- break
-
- def _post_rest_api_event(self, event, event_data):
- data = json.dumps({
- 'event': event,
- 'event_data': event_data,
- })
- parts = urlparse.urlsplit(self.base_url)
- if parts.username and parts.password:
- auth = (parts.username, parts.password)
- elif self.auth_token:
- auth = TokenAuth(self.auth_token)
- else:
- auth = None
- port = parts.port or (443 if parts.scheme == 'https' else 80)
- url = urlparse.urlunsplit([parts.scheme,
- '%s:%d' % (parts.hostname, port),
- parts.path, parts.query, parts.fragment])
- url = urlparse.urljoin(url, self.rest_api_path)
- headers = {'content-type': 'application/json'}
- response = requests.post(url, data=data, headers=headers, auth=auth)
- response.raise_for_status()
-
- def _log_event(self, event, **event_data):
- if 'res' in event_data:
- event_data['res'] = censor(deepcopy(event_data['res']))
-
- if self.callback_connection:
- self._post_job_event_queue_msg(event, event_data)
- else:
- self._post_rest_api_event(event, event_data)
-
- def on_any(self, *args, **kwargs):
- pass
-
- def runner_on_failed(self, host, res, ignore_errors=False):
- self._log_event('runner_on_failed', host=host, res=res,
- ignore_errors=ignore_errors)
-
- def v2_runner_on_failed(self, result, ignore_errors=False):
- event_is_loop = result._task.loop if hasattr(result._task, 'loop') else None
- self._log_event('runner_on_failed', host=result._host.name,
- res=result._result, task=result._task,
- ignore_errors=ignore_errors, event_loop=event_is_loop)
-
- def runner_on_ok(self, host, res):
- self._log_event('runner_on_ok', host=host, res=res)
-
- def v2_runner_on_ok(self, result):
- event_is_loop = result._task.loop if hasattr(result._task, 'loop') else None
- self._log_event('runner_on_ok', host=result._host.name,
- task=result._task, res=result._result,
- event_loop=event_is_loop)
-
- def runner_on_error(self, host, msg):
- self._log_event('runner_on_error', host=host, msg=msg)
-
- def v2_runner_on_error(self, result):
- pass # Currently not implemented in v2
-
- def runner_on_skipped(self, host, item=None):
- self._log_event('runner_on_skipped', host=host, item=item)
-
- def v2_runner_on_skipped(self, result):
- event_is_loop = result._task.loop if hasattr(result._task, 'loop') else None
- self._log_event('runner_on_skipped', host=result._host.name,
- task=result._task, event_loop=event_is_loop)
-
- def runner_on_unreachable(self, host, res):
- self._log_event('runner_on_unreachable', host=host, res=res)
-
- def v2_runner_on_unreachable(self, result):
- self._log_event('runner_on_unreachable', host=result._host.name,
- task=result._task, res=result._result)
-
- def runner_on_no_hosts(self):
- self._log_event('runner_on_no_hosts')
-
- def v2_runner_on_no_hosts(self, task):
- self._log_event('runner_on_no_hosts', task=task)
-
- # V2 does not use the _on_async callbacks (yet).
-
- def runner_on_async_poll(self, host, res, jid, clock):
- self._log_event('runner_on_async_poll', host=host, res=res, jid=jid,
- clock=clock)
-
- def runner_on_async_ok(self, host, res, jid):
- self._log_event('runner_on_async_ok', host=host, res=res, jid=jid)
-
- def runner_on_async_failed(self, host, res, jid):
- self._log_event('runner_on_async_failed', host=host, res=res, jid=jid)
-
- def runner_on_file_diff(self, host, diff):
- self._log_event('runner_on_file_diff', host=host, diff=diff)
-
- def v2_runner_on_file_diff(self, result, diff):
- self._log_event('runner_on_file_diff', host=result._host.name,
- task=result._task, diff=diff)
-
- def v2_runner_item_on_ok(self, result):
- self._log_event('runner_item_on_ok', res=result._result, host=result._host.name,
- task=result._task)
-
- def v2_runner_item_on_failed(self, result):
- self._log_event('runner_item_on_failed', res=result._result, host=result._host.name,
- task=result._task)
-
- def v2_runner_item_on_skipped(self, result):
- self._log_event('runner_item_on_skipped', res=result._result, host=result._host.name,
- task=result._task)
-
- @staticmethod
- def terminate_ssh_control_masters():
- # Determine if control persist is being used and if any open sockets
- # exist after running the playbook.
- cp_path = os.environ.get('ANSIBLE_SSH_CONTROL_PATH', '')
- if not cp_path:
- return
- cp_dir = os.path.dirname(cp_path)
- if not os.path.exists(cp_dir):
- return
- cp_pattern = os.path.join(cp_dir, 'ansible-ssh-*')
- cp_files = glob.glob(cp_pattern)
- if not cp_files:
- return
-
- # Attempt to find any running control master processes.
- username = pwd.getpwuid(os.getuid())[0]
- ssh_cm_procs = []
- for proc in psutil.process_iter():
- try:
- pname = proc.name()
- pcmdline = proc.cmdline()
- pusername = proc.username()
- except psutil.NoSuchProcess:
- continue
- if pusername != username:
- continue
- if pname != 'ssh':
- continue
- for cp_file in cp_files:
- if pcmdline and cp_file in pcmdline[0]:
- ssh_cm_procs.append(proc)
- break
-
- # Terminate then kill control master processes. Workaround older
- # version of psutil that may not have wait_procs implemented.
- for proc in ssh_cm_procs:
- proc.terminate()
- procs_gone, procs_alive = psutil.wait_procs(ssh_cm_procs, timeout=5)
- for proc in procs_alive:
- proc.kill()
-
-
-class JobCallbackModule(BaseCallbackModule):
- '''
- Callback module for logging ansible-playbook job events via the REST API.
- '''
-
- # These events should never have an associated play.
- EVENTS_WITHOUT_PLAY = [
- 'playbook_on_start',
- 'playbook_on_stats',
- ]
- # These events should never have an associated task.
- EVENTS_WITHOUT_TASK = EVENTS_WITHOUT_PLAY + [
- 'playbook_on_setup',
- 'playbook_on_notify',
- 'playbook_on_import_for_host',
- 'playbook_on_not_import_for_host',
- 'playbook_on_no_hosts_matched',
- 'playbook_on_no_hosts_remaining',
- ]
-
- def __init__(self):
- self.job_id = int(os.getenv('JOB_ID', '0'))
- self.rest_api_path = '/api/v1/jobs/%d/job_events/' % self.job_id
- super(JobCallbackModule, self).__init__()
-
- def _log_event(self, event, **event_data):
- play = getattr(self, 'play', None)
- play_name = getattr(play, 'name', '')
- if play_name and event not in self.EVENTS_WITHOUT_PLAY:
- event_data['play'] = play_name
- task = event_data.pop('task', None) or getattr(self, 'task', None)
- task_name = None
- role_name = None
- if task:
- if hasattr(task, 'get_name'):
- # in v2, the get_name() method creates the name
- task_name = task.get_name()
- else:
- # v1 datastructure
- task_name = getattr(task, 'name', '')
- if hasattr(task, '_role') and task._role:
- # v2 datastructure
- role_name = task._role._role_name
- else:
- # v1 datastructure
- role_name = getattr(task, 'role_name', '')
- if task_name and event not in self.EVENTS_WITHOUT_TASK:
- event_data['task'] = task_name
- if role_name and event not in self.EVENTS_WITHOUT_TASK:
- event_data['role'] = role_name
- self.artifact_data = None
- if 'res' in event_data and 'artifact_data' in event_data['res']:
- self.artifact_data = event_data['res']['artifact_data']
- super(JobCallbackModule, self)._log_event(event, **event_data)
-
- def playbook_on_start(self):
- self._log_event('playbook_on_start')
-
- def v2_playbook_on_start(self, playbook):
- # NOTE: the playbook parameter was added late in Ansible 2.0 development
- # so we don't currently utilize but could later.
- # NOTE: Ansible doesn't generate a UUID for playbook_on_start so we'll do it for them
- self.active_playbook = str(uuid4())
- self._log_event('playbook_on_start', uuid=self.active_playbook)
-
- def playbook_on_notify(self, host, handler):
- self._log_event('playbook_on_notify', host=host, handler=handler)
-
- def v2_playbook_on_notify(self, result, handler):
- self._log_event('playbook_on_notify', host=result._host.name,
- task=result._task, handler=handler)
-
- def playbook_on_no_hosts_matched(self):
- self._log_event('playbook_on_no_hosts_matched')
-
- def v2_playbook_on_no_hosts_matched(self):
- # since there is no task/play info, this is currently identical
- # to the v1 callback which does the same thing
- self.playbook_on_no_hosts_matched()
-
- def playbook_on_no_hosts_remaining(self):
- self._log_event('playbook_on_no_hosts_remaining')
-
- def v2_playbook_on_no_hosts_remaining(self):
- # since there is no task/play info, this is currently identical
- # to the v1 callback which does the same thing
- self.playbook_on_no_hosts_remaining()
-
- def playbook_on_task_start(self, name, is_conditional):
- self._log_event('playbook_on_task_start', name=name,
- is_conditional=is_conditional)
-
- def v2_playbook_on_task_start(self, task, is_conditional):
- self.active_task = task._uuid
- self._log_event('playbook_on_task_start', task=task, uuid=str(task._uuid),
- name=task.get_name(), is_conditional=is_conditional)
-
- def v2_playbook_on_cleanup_task_start(self, task):
- # re-using playbook_on_task_start event here for this v2-specific
- # event, though we may consider any changes necessary to distinguish
- # this from a normal task
- self.active_task = task._uuid
- self._log_event('playbook_on_task_start', task=task, uuid=str(task._uuid),
- name=task.get_name())
-
- def playbook_on_vars_prompt(self, varname, private=True, prompt=None,
- encrypt=None, confirm=False, salt_size=None,
- salt=None, default=None):
- self._log_event('playbook_on_vars_prompt', varname=varname,
- private=private, prompt=prompt, encrypt=encrypt,
- confirm=confirm, salt_size=salt_size, salt=salt,
- default=default)
-
- def v2_playbook_on_vars_prompt(self, varname, private=True, prompt=None,
- encrypt=None, confirm=False, salt_size=None,
- salt=None, default=None):
- pass # not currently used in v2 (yet)
-
- def playbook_on_setup(self):
- self._log_event('playbook_on_setup')
-
- def v2_playbook_on_setup(self):
- pass # not currently used in v2 (yet)
-
- def playbook_on_import_for_host(self, host, imported_file):
- # don't care about recording this one
- # self._log_event('playbook_on_import_for_host', host=host,
- # imported_file=imported_file)
- pass
-
- def v2_playbook_on_import_for_host(self, result, imported_file):
- pass # not currently used in v2 (yet)
-
- def playbook_on_not_import_for_host(self, host, missing_file):
- # don't care about recording this one
- #self._log_event('playbook_on_not_import_for_host', host=host,
- # missing_file=missing_file)
- pass
-
- def v2_playbook_on_not_import_for_host(self, result, missing_file):
- pass # not currently used in v2 (yet)
-
- def playbook_on_play_start(self, name):
- # Only play name is passed via callback, get host pattern from the play.
- pattern = getattr(getattr(self, 'play', None), 'hosts', name)
- self._log_event('playbook_on_play_start', name=name, pattern=pattern)
-
- def v2_playbook_on_play_start(self, play):
- setattr(self, 'play', play)
- # Ansible 2.0.0.2 doesn't default .name to hosts like it did in 1.9.4,
- # though that default will likely return in a future version of Ansible.
- if (not hasattr(play, 'name') or not play.name) and hasattr(play, 'hosts'):
- if isinstance(play.hosts, list):
- play.name = ','.join(play.hosts)
- else:
- play.name = play.hosts
- self.active_play = play._uuid
- self._log_event('playbook_on_play_start', name=play.name, uuid=str(play._uuid),
- pattern=play.hosts)
-
- def playbook_on_stats(self, stats):
- d = {}
- for attr in ('changed', 'dark', 'failures', 'ok', 'processed', 'skipped'):
- d[attr] = getattr(stats, attr)
- self._log_event('playbook_on_stats', **d)
- self.terminate_ssh_control_masters()
-
- def v2_playbook_on_stats(self, stats):
- self.playbook_on_stats(stats)
-
- def v2_playbook_on_include(self, included_file):
- self._log_event('playbook_on_include', included_file=included_file)
-
-class AdHocCommandCallbackModule(BaseCallbackModule):
- '''
- Callback module for logging ansible ad hoc events via ZMQ or the REST API.
- '''
-
- def __init__(self):
- self.ad_hoc_command_id = int(os.getenv('AD_HOC_COMMAND_ID', '0'))
- self.rest_api_path = '/api/v1/ad_hoc_commands/%d/events/' % self.ad_hoc_command_id
- self.skipped_hosts = set()
- super(AdHocCommandCallbackModule, self).__init__()
-
- def _log_event(self, event, **event_data):
- # Ignore task for ad hoc commands (with v2).
- event_data.pop('task', None)
- super(AdHocCommandCallbackModule, self)._log_event(event, **event_data)
-
- def runner_on_file_diff(self, host, diff):
- pass # Ignore file diff for ad hoc commands.
-
- def runner_on_ok(self, host, res):
- # When running in check mode using a module that does not support check
- # mode, Ansible v1.9 will call runner_on_skipped followed by
- # runner_on_ok for the same host; only capture the skipped event and
- # ignore the ok event.
- if host not in self.skipped_hosts:
- super(AdHocCommandCallbackModule, self).runner_on_ok(host, res)
-
- def runner_on_skipped(self, host, item=None):
- super(AdHocCommandCallbackModule, self).runner_on_skipped(host, item)
- self.skipped_hosts.add(host)
-
-if os.getenv('JOB_ID', ''):
- CallbackModule = JobCallbackModule
-elif os.getenv('AD_HOC_COMMAND_ID', ''):
- CallbackModule = AdHocCommandCallbackModule
diff --git a/awx/plugins/callback/minimal.py b/awx/plugins/callback/minimal.py
new file mode 100644
index 0000000000..fcbaa76d55
--- /dev/null
+++ b/awx/plugins/callback/minimal.py
@@ -0,0 +1,30 @@
+# Copyright (c) 2016 Ansible by Red Hat, Inc.
+#
+# This file is part of Ansible Tower, but depends on code imported from Ansible.
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see .
+
+from __future__ import (absolute_import, division, print_function)
+
+# Python
+import os
+import sys
+
+# Add awx/lib to sys.path.
+awx_lib_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'lib'))
+if awx_lib_path not in sys.path:
+ sys.path.insert(0, awx_lib_path)
+
+# Tower Display Callback
+from tower_display_callback import TowerMinimalCallbackModule as CallbackModule # noqa
diff --git a/awx/plugins/callback/tower_display.py b/awx/plugins/callback/tower_display.py
new file mode 100644
index 0000000000..725232dfe4
--- /dev/null
+++ b/awx/plugins/callback/tower_display.py
@@ -0,0 +1,30 @@
+# Copyright (c) 2016 Ansible by Red Hat, Inc.
+#
+# This file is part of Ansible Tower, but depends on code imported from Ansible.
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see .
+
+from __future__ import (absolute_import, division, print_function)
+
+# Python
+import os
+import sys
+
+# Add awx/lib to sys.path.
+awx_lib_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'lib'))
+if awx_lib_path not in sys.path:
+ sys.path.insert(0, awx_lib_path)
+
+# Tower Display Callback
+from tower_display_callback import TowerDefaultCallbackModule as CallbackModule # noqa