From c18b6c13521ce19c153707c7ae34a537553af47d Mon Sep 17 00:00:00 2001 From: Chris Church Date: Sat, 22 Oct 2016 00:15:49 -0400 Subject: [PATCH 1/5] Add support for capturing stdout associated with job events and ad hoc command events. * New event types for stdout lines not associated with a callback event. * New stdout, start_line, end_line and verbosity fields for job/ahc events. * Callback plugins to wrap Ansible default/minimal stdout callbacks and embed callback event data using ANSI escape sequences. * Callback plugin library to wrap ansible.display.Display class methods. * Output filter to extract event data from stdout and create job/ahc events. * Update stdout formats to strip new ANSI escape sequences. --- awx/api/permissions.py | 2 - awx/api/serializers.py | 16 +- awx/api/views.py | 39 +- awx/lib/sitecustomize.py | 22 + awx/lib/tower_display_callback/__init__.py | 25 + awx/lib/tower_display_callback/cleanup.py | 72 +++ awx/lib/tower_display_callback/display.py | 92 +++ awx/lib/tower_display_callback/events.py | 138 +++++ awx/lib/tower_display_callback/minimal.py | 28 + awx/lib/tower_display_callback/module.py | 443 ++++++++++++++ .../commands/run_callback_receiver.py | 117 +--- .../migrations/0044_v310_job_event_stdout.py | 96 +++ awx/main/models/ad_hoc_commands.py | 67 +- awx/main/models/jobs.py | 313 ++++++---- awx/main/models/unified_jobs.py | 7 +- awx/main/tasks.py | 50 +- awx/main/utils.py | 71 ++- awx/plugins/callback/job_event_callback.py | 579 ------------------ awx/plugins/callback/minimal.py | 30 + awx/plugins/callback/tower_display.py | 30 + 20 files changed, 1387 insertions(+), 850 deletions(-) create mode 100644 awx/lib/sitecustomize.py create mode 100644 awx/lib/tower_display_callback/__init__.py create mode 100644 awx/lib/tower_display_callback/cleanup.py create mode 100644 awx/lib/tower_display_callback/display.py create mode 100644 awx/lib/tower_display_callback/events.py create mode 100644 awx/lib/tower_display_callback/minimal.py create mode 100644 awx/lib/tower_display_callback/module.py create mode 100644 awx/main/migrations/0044_v310_job_event_stdout.py delete mode 100644 awx/plugins/callback/job_event_callback.py create mode 100644 awx/plugins/callback/minimal.py create mode 100644 awx/plugins/callback/tower_display.py diff --git a/awx/api/permissions.py b/awx/api/permissions.py index ecd725bc6e..cda66ff2ec 100644 --- a/awx/api/permissions.py +++ b/awx/api/permissions.py @@ -189,8 +189,6 @@ class TaskPermission(ModelAccessPermission): # token. if view.model == Inventory and request.method.lower() in ('head', 'get'): return bool(not obj or obj.pk == unified_job.inventory_id) - elif view.model in (JobEvent, AdHocCommandEvent) and request.method.lower() == 'post': - return bool(not obj or obj.pk == unified_job.pk) else: return False diff --git a/awx/api/serializers.py b/awx/api/serializers.py index eaa764c345..3677f1a028 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -2417,7 +2417,9 @@ class JobEventSerializer(BaseSerializer): model = JobEvent fields = ('*', '-name', '-description', 'job', 'event', 'counter', 'event_display', 'event_data', 'event_level', 'failed', - 'changed', 'host', 'host_name', 'parent', 'play', 'task', 'role') + 'changed', 'uuid', 'host', 'host_name', 'parent', 'playbook', + 'play', 'task', 'role', 'stdout', 'start_line', 'end_line', + 'verbosity') def get_related(self, obj): res = super(JobEventSerializer, self).get_related(obj) @@ -2453,16 +2455,8 @@ class AdHocCommandEventSerializer(BaseSerializer): model = AdHocCommandEvent fields = ('*', '-name', '-description', 'ad_hoc_command', 'event', 'counter', 'event_display', 'event_data', 'failed', - 'changed', 'host', 'host_name') - - def to_internal_value(self, data): - ret = super(AdHocCommandEventSerializer, self).to_internal_value(data) - # AdHocCommandAdHocCommandEventsList should be the only view creating - # AdHocCommandEvent instances, so keep the ad_hoc_command it sets, even - # though ad_hoc_command is a read-only field. - if 'ad_hoc_command' in data: - ret['ad_hoc_command'] = data['ad_hoc_command'] - return ret + 'changed', 'uuid', 'host', 'host_name', 'stdout', + 'start_line', 'end_line', 'verbosity') def get_related(self, obj): res = super(AdHocCommandEventSerializer, self).get_related(obj) diff --git a/awx/api/views.py b/awx/api/views.py index 66980e141d..c95cebcb9e 100644 --- a/awx/api/views.py +++ b/awx/api/views.py @@ -10,8 +10,9 @@ import time import socket import sys import logging -from base64 import b64encode +from base64 import b64encode, b64decode from collections import OrderedDict +from HTMLParser import HTMLParser # Django from django.conf import settings @@ -3050,21 +3051,6 @@ class GroupJobEventsList(BaseJobEventsList): class JobJobEventsList(BaseJobEventsList): parent_model = Job - authentication_classes = [TaskAuthentication] + api_settings.DEFAULT_AUTHENTICATION_CLASSES - permission_classes = (TaskPermission,) - - # Post allowed for job event callback only. - def post(self, request, *args, **kwargs): - parent_obj = get_object_or_404(self.parent_model, pk=self.kwargs['pk']) - data = request.data.copy() - data['job'] = parent_obj.pk - serializer = self.get_serializer(data=data) - if serializer.is_valid(): - self.instance = serializer.save() - headers = {'Location': serializer.data['url']} - return Response(serializer.data, status=status.HTTP_201_CREATED, - headers=headers) - return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) class JobJobPlaysList(BaseJobEventsList): @@ -3455,25 +3441,8 @@ class HostAdHocCommandEventsList(BaseAdHocCommandEventsList): class AdHocCommandAdHocCommandEventsList(BaseAdHocCommandEventsList): parent_model = AdHocCommand - authentication_classes = [TaskAuthentication] + api_settings.DEFAULT_AUTHENTICATION_CLASSES - permission_classes = (TaskPermission,) new_in_220 = True - # Post allowed for ad hoc event callback only. - def post(self, request, *args, **kwargs): - if request.user: - raise PermissionDenied() - parent_obj = get_object_or_404(self.parent_model, pk=self.kwargs['pk']) - data = request.data.copy() - data['ad_hoc_command'] = parent_obj - serializer = self.get_serializer(data=data) - if serializer.is_valid(): - self.instance = serializer.save() - headers = {'Location': serializer.data['url']} - return Response(serializer.data, status=status.HTTP_201_CREATED, - headers=headers) - return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) - class AdHocCommandActivityStreamList(SubListAPIView): @@ -3583,7 +3552,11 @@ class UnifiedJobStdout(RetrieveAPIView): dark_bg = (content_only and dark) or (not content_only and (dark or not dark_val)) content, start, end, absolute_end = unified_job.result_stdout_raw_limited(start_line, end_line) + # Remove any ANSI escape sequences containing job event data. + content = re.sub(r'\x1b\[K(?:[A-Za-z0-9+/=]+\x1b\[\d+D)+\x1b\[K', '', content) + body = ansiconv.to_html(cgi.escape(content)) + context = { 'title': get_view_name(self.__class__), 'body': mark_safe(body), diff --git a/awx/lib/sitecustomize.py b/awx/lib/sitecustomize.py new file mode 100644 index 0000000000..cf6cab211e --- /dev/null +++ b/awx/lib/sitecustomize.py @@ -0,0 +1,22 @@ +# Python +import os +import sys + +# Based on http://stackoverflow.com/a/6879344/131141 -- Initialize tower display +# callback as early as possible to wrap ansible.display.Display methods. + +def argv_ready(argv): + if argv and os.path.basename(argv[0]) in {'ansible', 'ansible-playbook'}: + import tower_display_callback + + +class argv_placeholder(object): + + def __del__(self): + argv_ready(sys.argv) + + +if hasattr(sys, 'argv'): + argv_ready(sys.argv) +else: + sys.argv = argv_placeholder() diff --git a/awx/lib/tower_display_callback/__init__.py b/awx/lib/tower_display_callback/__init__.py new file mode 100644 index 0000000000..313a1d50c6 --- /dev/null +++ b/awx/lib/tower_display_callback/__init__.py @@ -0,0 +1,25 @@ +# Copyright (c) 2016 Ansible by Red Hat, Inc. +# +# This file is part of Ansible Tower, but depends on code imported from Ansible. +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +from __future__ import (absolute_import, division, print_function) + +# Tower Display Callback +from . import cleanup # to register control persistent cleanup. +from . import display # to wrap ansible.display.Display methods. +from .module import TowerDefaultCallbackModule, TowerMinimalCallbackModule + +__all__ = ['TowerDefaultCallbackModule', 'TowerMinimalCallbackModule'] diff --git a/awx/lib/tower_display_callback/cleanup.py b/awx/lib/tower_display_callback/cleanup.py new file mode 100644 index 0000000000..1bc276f742 --- /dev/null +++ b/awx/lib/tower_display_callback/cleanup.py @@ -0,0 +1,72 @@ +# Copyright (c) 2016 Ansible by Red Hat, Inc. +# +# This file is part of Ansible Tower, but depends on code imported from Ansible. +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +from __future__ import (absolute_import, division, print_function) + +# Python +import atexit +import glob +import os +import pwd + +# PSUtil +import psutil + +__all__ = [] + + +@atexit.register +def terminate_ssh_control_masters(): + # Determine if control persist is being used and if any open sockets + # exist after running the playbook. + cp_path = os.environ.get('ANSIBLE_SSH_CONTROL_PATH', '') + if not cp_path: + return + cp_dir = os.path.dirname(cp_path) + if not os.path.exists(cp_dir): + return + cp_pattern = os.path.join(cp_dir, 'ansible-ssh-*') + cp_files = glob.glob(cp_pattern) + if not cp_files: + return + + # Attempt to find any running control master processes. + username = pwd.getpwuid(os.getuid())[0] + ssh_cm_procs = [] + for proc in psutil.process_iter(): + try: + pname = proc.name() + pcmdline = proc.cmdline() + pusername = proc.username() + except psutil.NoSuchProcess: + continue + if pusername != username: + continue + if pname != 'ssh': + continue + for cp_file in cp_files: + if pcmdline and cp_file in pcmdline[0]: + ssh_cm_procs.append(proc) + break + + # Terminate then kill control master processes. Workaround older + # version of psutil that may not have wait_procs implemented. + for proc in ssh_cm_procs: + proc.terminate() + procs_gone, procs_alive = psutil.wait_procs(ssh_cm_procs, timeout=5) + for proc in procs_alive: + proc.kill() \ No newline at end of file diff --git a/awx/lib/tower_display_callback/display.py b/awx/lib/tower_display_callback/display.py new file mode 100644 index 0000000000..ec32fec334 --- /dev/null +++ b/awx/lib/tower_display_callback/display.py @@ -0,0 +1,92 @@ +# Copyright (c) 2016 Ansible by Red Hat, Inc. +# +# This file is part of Ansible Tower, but depends on code imported from Ansible. +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +from __future__ import (absolute_import, division, print_function) + +# Python +import cgi +import contextlib +import functools +import json +import sys +import uuid + +# Ansible +from ansible.utils.display import Display + +# Tower Display Callback +from tower_display_callback.events import event_context + +__all__ = [] + + +def with_context(**context): + global event_context + def wrap(f): + @functools.wraps(f) + def wrapper(*args, **kwargs): + with event_context.set_local(**context): + return f(*args, **kwargs) + return wrapper + return wrap + + +for attr in dir(Display): + if attr.startswith('_') or 'cow' in attr or 'prompt' in attr: + continue + if attr in ('display', 'v', 'vv', 'vvv', 'vvvv', 'vvvvv', 'vvvvvv', 'verbose'): + continue + if not callable(getattr(Display, attr)): + continue + setattr(Display, attr, with_context(**{attr: True})(getattr(Display, attr))) + + +def with_verbosity(f): + global event_context + @functools.wraps(f) + def wrapper(*args, **kwargs): + host = args[2] if len(args) >= 3 else kwargs.get('host', None) + caplevel = args[3] if len(args) >= 4 else kwargs.get('caplevel', 2) + context = dict(verbose=True, verbosity=(caplevel + 1)) + if host is not None: + context['remote_addr'] = host + with event_context.set_local(**context): + return f(*args, **kwargs) + return wrapper + +Display.verbose = with_verbosity(Display.verbose) + + +def display_with_context(f): + @functools.wraps(f) + def wrapper(*args, **kwargs): + log_only = args[5] if len(args) >= 6 else kwargs.get('log_only', False) + stderr = args[3] if len(args) >= 4 else kwargs.get('stderr', False) + fileobj = sys.stderr if stderr else sys.stdout + event_uuid = event_context.get().get('uuid', None) + try: + if not log_only and not event_uuid: + event_context.add_local(uuid=str(uuid.uuid4())) + event_context.dump_begin(fileobj) + return f(*args, **kwargs) + finally: + if not log_only and not event_uuid: + event_context.dump_end(fileobj) + event_context.remove_local(uuid=None) + return wrapper + +Display.display = display_with_context(Display.display) diff --git a/awx/lib/tower_display_callback/events.py b/awx/lib/tower_display_callback/events.py new file mode 100644 index 0000000000..fa664e5856 --- /dev/null +++ b/awx/lib/tower_display_callback/events.py @@ -0,0 +1,138 @@ +# Copyright (c) 2016 Ansible by Red Hat, Inc. +# +# This file is part of Ansible Tower, but depends on code imported from Ansible. +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +from __future__ import (absolute_import, division, print_function) + +# Python +import base64 +import cgi +import contextlib +import datetime +import json +import os +import threading +import uuid + +__all__ = ['event_context'] + + +class EventContext(object): + ''' + Store global and local (per thread/process) data associated with callback + events and other display output methods. + ''' + + def add_local(self, **kwargs): + if not hasattr(self, '_local'): + self._local = threading.local() + self._local._ctx = {} + self._local._ctx.update(kwargs) + + def remove_local(self, **kwargs): + if hasattr(self, '_local'): + for key in kwargs.keys(): + self._local._ctx.pop(key, None) + + @contextlib.contextmanager + def set_local(self, **kwargs): + try: + self.add_local(**kwargs) + yield + finally: + self.remove_local(**kwargs) + + def get_local(self): + return getattr(getattr(self, '_local', None), '_ctx', {}) + + def add_global(self, **kwargs): + if not hasattr(self, '_global_ctx'): + self._global_ctx = {} + self._global_ctx.update(kwargs) + + def remove_global(self, **kwargs): + if hasattr(self, '_global_ctx'): + for key in kwargs.keys(): + self._global_ctx.pop(key, None) + + @contextlib.contextmanager + def set_global(self, **kwargs): + try: + self.add_global(**kwargs) + yield + finally: + self.remove_global(**kwargs) + + def get_global(self): + return getattr(self, '_global_ctx', {}) + + def get(self): + ctx = {} + ctx.update(self.get_global()) + ctx.update(self.get_local()) + return ctx + + def get_begin_dict(self): + event_data = self.get() + if os.getenv('JOB_ID', ''): + event_data['job_id'] = int(os.getenv('JOB_ID', '0')) + if os.getenv('AD_HOC_COMMAND_ID', ''): + event_data['ad_hoc_command_id'] = int(os.getenv('AD_HOC_COMMAND_ID', '0')) + event_data.setdefault('pid', os.getpid()) + event_data.setdefault('uuid', str(uuid.uuid4())) + event_data.setdefault('created', datetime.datetime.utcnow().isoformat()) + if not event_data.get('parent_uuid', None) and event_data.get('job_id', None): + for key in ('task_uuid', 'play_uuid', 'playbook_uuid'): + parent_uuid = event_data.get(key, None) + if parent_uuid and parent_uuid != event_data.get('uuid', None): + event_data['parent_uuid'] = parent_uuid + break + + event = event_data.pop('event', None) + if not event: + event = 'verbose' + for key in ('debug', 'verbose', 'deprecated', 'warning', 'system_warning', 'error'): + if event_data.get(key, False): + event = key + break + + event_dict = dict(event=event, event_data=event_data) + for key in event_data.keys(): + if key in ('job_id', 'ad_hoc_command_id', 'uuid', 'parent_uuid', 'created', 'artifact_data'): + event_dict[key] = event_data.pop(key) + elif key in ('verbosity', 'pid'): + event_dict[key] = event_data[key] + return event_dict + + def get_end_dict(self): + return {} + + def dump(self, fileobj, data, max_width=78): + b64data = base64.b64encode(json.dumps(data)) + fileobj.write(u'\x1b[K') + for offset in xrange(0, len(b64data), max_width): + chunk = b64data[offset:offset + max_width] + escaped_chunk = u'{}\x1b[{}D'.format(chunk, len(chunk)) + fileobj.write(escaped_chunk) + fileobj.write(u'\x1b[K') + + def dump_begin(self, fileobj): + self.dump(fileobj, self.get_begin_dict()) + + def dump_end(self, fileobj): + self.dump(fileobj, self.get_end_dict()) + +event_context = EventContext() diff --git a/awx/lib/tower_display_callback/minimal.py b/awx/lib/tower_display_callback/minimal.py new file mode 100644 index 0000000000..de7694213e --- /dev/null +++ b/awx/lib/tower_display_callback/minimal.py @@ -0,0 +1,28 @@ +# Copyright (c) 2016 Ansible by Red Hat, Inc. +# +# This file is part of Ansible Tower, but depends on code imported from Ansible. +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +from __future__ import (absolute_import, division, print_function) + +# Python +import os + +# Ansible +import ansible + +# Because of the way Ansible loads plugins, it's not possible to import +# ansible.plugins.callback.minimal when being loaded as the minimal plugin. Ugh. +execfile(os.path.join(os.path.dirname(ansible.__file__), 'plugins', 'callback', 'minimal.py')) diff --git a/awx/lib/tower_display_callback/module.py b/awx/lib/tower_display_callback/module.py new file mode 100644 index 0000000000..7be4835aa7 --- /dev/null +++ b/awx/lib/tower_display_callback/module.py @@ -0,0 +1,443 @@ +# Copyright (c) 2016 Ansible by Red Hat, Inc. +# +# This file is part of Ansible Tower, but depends on code imported from Ansible. +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +from __future__ import (absolute_import, division, print_function) + +# Python +import contextlib +import copy +import os +import re +import sys +import uuid + +# Ansible +from ansible.plugins.callback import CallbackBase +from ansible.plugins.callback.default import CallbackModule as DefaultCallbackModule + +# Tower Display Callback +from tower_display_callback.events import event_context +from tower_display_callback.minimal import CallbackModule as MinimalCallbackModule + + +class BaseCallbackModule(CallbackBase): + ''' + Callback module for logging ansible/ansible-playbook events. + ''' + + CALLBACK_VERSION = 2.0 + CALLBACK_TYPE = 'stdout' + + # These events should never have an associated play. + EVENTS_WITHOUT_PLAY = [ + 'playbook_on_start', + 'playbook_on_stats', + ] + + # These events should never have an associated task. + EVENTS_WITHOUT_TASK = EVENTS_WITHOUT_PLAY + [ + 'playbook_on_setup', + 'playbook_on_notify', + 'playbook_on_import_for_host', + 'playbook_on_not_import_for_host', + 'playbook_on_no_hosts_matched', + 'playbook_on_no_hosts_remaining', + ] + + CENSOR_FIELD_WHITELIST = [ + 'msg', + 'failed', + 'changed', + 'results', + 'start', + 'end', + 'delta', + 'cmd', + '_ansible_no_log', + 'rc', + 'failed_when_result', + 'skipped', + 'skip_reason', + ] + + def __init__(self): + super(BaseCallbackModule, self).__init__() + self.task_uuids = set() + + def censor_result(self, res, no_log=False): + if not isinstance(res, dict): + if no_log: + return "the output has been hidden due to the fact that 'no_log: true' was specified for this result" + return res + if res.get('_ansible_no_log', no_log): + new_res = {} + for k in self.CENSOR_FIELD_WHITELIST: + if k in res: + new_res[k] = res[k] + if k == 'cmd' and k in res: + if isinstance(res['cmd'], list): + res['cmd'] = ' '.join(res['cmd']) + if re.search(r'\s', res['cmd']): + new_res['cmd'] = re.sub(r'^(([^\s\\]|\\\s)+).*$', + r'\1 ', + res['cmd']) + new_res['censored'] = "the output has been hidden due to the fact that 'no_log: true' was specified for this result" + res = new_res + if 'results' in res: + if isinstance(res['results'], list): + for i in xrange(len(res['results'])): + res['results'][i] = self.censor_result(res['results'][i], res.get('_ansible_no_log', no_log)) + elif res.get('_ansible_no_log', False): + res['results'] = "the output has been hidden due to the fact that 'no_log: true' was specified for this result" + return res + + @contextlib.contextmanager + def capture_event_data(self, event, **event_data): + + event_data.setdefault('uuid', str(uuid.uuid4())) + + if 'res' in event_data: + event_data['res'] = self.censor_result(copy.deepcopy(event_data['res'])) + res = event_data.get('res', None) + if res and isinstance(res, dict): + if 'artifact_data' in res: + event_data['artifact_data'] = res['artifact_data'] + + if event not in self.EVENTS_WITHOUT_TASK: + task = event_data.pop('task', None) + else: + task = None + + try: + event_context.add_local(event=event, **event_data) + if task: + self.set_task(task, local=True) + event_context.dump_begin(sys.stdout) + yield + finally: + event_context.dump_end(sys.stdout) + if task: + self.clear_task(local=True) + event_context.remove_local(event=None, **event_data) + + def set_playbook(self, playbook): + # NOTE: Ansible doesn't generate a UUID for playbook_on_start so do it for them. + self.playbook_uuid = str(uuid.uuid4()) + file_name = getattr(playbook, '_file_name', '???') + event_context.add_global(playbook=file_name, playbook_uuid=self.playbook_uuid) + self.clear_play() + + def set_play(self, play): + if hasattr(play, 'hosts'): + if isinstance(play.hosts, list): + pattern = ','.join(play.hosts) + else: + pattern = play.hosts + else: + pattern = '' + name = play.get_name().strip() or pattern + event_context.add_global(play=name, play_uuid=str(play._uuid), play_pattern=pattern) + self.clear_task() + + def clear_play(self): + event_context.remove_global(play=None, play_uuid=None, play_pattern=None) + self.clear_task() + + def set_task(self, task, local=False): + # FIXME: Task is "global" unless using free strategy! + task_ctx = dict( + task=(task.name or task.action), + task_path=task.get_path(), + task_uuid=str(task._uuid), + task_action=task.action, + ) + if not task.no_log: + task_args = ', '.join(('%s=%s' % a for a in task.args.items())) + task_ctx['task_args'] = task_args + if getattr(task, '_role', None): + task_role = task._role._role_name + else: + task_role = getattr(task, 'role_name', '') + if task_role: + task_ctx['role'] = task_role + if local: + event_context.add_local(**task_ctx) + else: + event_context.add_global(**task_ctx) + + def clear_task(self, local=False): + task_ctx = dict(task=None, task_path=None, task_uuid=None, task_action=None, task_args=None, role=None) + if local: + event_context.remove_local(**task_ctx) + else: + event_context.remove_global(**task_ctx) + + def v2_playbook_on_start(self, playbook): + self.set_playbook(playbook) + event_data = dict( + uuid=self.playbook_uuid, + ) + with self.capture_event_data('playbook_on_start', **event_data): + super(BaseCallbackModule, self).v2_playbook_on_start(playbook) + + def v2_playbook_on_vars_prompt(self, varname, private=True, prompt=None, + encrypt=None, confirm=False, salt_size=None, + salt=None, default=None): + return # not currently used in v2 (yet) - FIXME: Confirm this is still the case? + event_data = dict( + varname=varname, + private=private, + prompt=prompt, + encrypt=encrypt, + confirm=confirm, + salt_size=salt_size, + salt=salt, + default=default, + ) + with self.capture_event_data('playbook_on_vars_prompt', **event_data): + super(BaseCallbackModule, self).v2_playbook_on_vars_prompt(varname, + private, prompt, encrypt, confirm, salt_size, salt, default) + + def v2_playbook_on_include(self, included_file): + event_data = dict( + included_file=included_file, + ) + with self.capture_event_data('playbook_on_include', **event_data): + super(BaseCallbackModule, self).v2_playbook_on_include(included_file) + + def v2_playbook_on_play_start(self, play): + self.set_play(play) + if hasattr(play, 'hosts'): + if isinstance(play.hosts, list): + pattern = ','.join(play.hosts) + else: + pattern = play.hosts + else: + pattern = '' + name = play.get_name().strip() or pattern + event_data = dict( + name=name, + pattern=pattern, + uuid=str(play._uuid), + ) + with self.capture_event_data('playbook_on_play_start', **event_data): + super(BaseCallbackModule, self).v2_playbook_on_play_start(play) + + def v2_playbook_on_import_for_host(self, result, imported_file): + return # not currently used in v2 (yet) / don't care about recording this one + with self.capture_event_data('playbook_on_import_for_host'): + super(BaseCallbackModule, self).v2_playbook_on_import_for_host(result, imported_file) + + def v2_playbook_on_not_import_for_host(self, result, missing_file): + return # not currently used in v2 (yet) / don't care about recording this one + with self.capture_event_data('playbook_on_not_import_for_host'): + super(BaseCallbackModule, self).v2_playbook_on_not_import_for_host(result, missing_file) + + def v2_playbook_on_setup(self): + return # not currently used in v2 (yet) + with self.capture_event_data('playbook_on_setup'): + super(BaseCallbackModule, self).v2_playbook_on_setup() + + def v2_playbook_on_task_start(self, task, is_conditional): + # FIXME: Flag task path output as vv. + task_uuid = str(task._uuid) + if task_uuid in self.task_uuids: + return + self.task_uuids.add(task_uuid) + self.set_task(task) + event_data = dict( + task=task, + name=task.get_name(), + is_conditional=is_conditional, + uuid=task_uuid, + ) + with self.capture_event_data('playbook_on_task_start', **event_data): + super(BaseCallbackModule, self).v2_playbook_on_task_start(task, is_conditional) + + def v2_playbook_on_cleanup_task_start(self, task): + # re-using playbook_on_task_start event here for this v2-specific + # event, though we may consider any changes necessary to distinguish + # this from a normal task FIXME! + self.set_task(task) + event_data = dict( + task=task, + name=task.get_name(), + uuid=str(task._uuid), + ) + with self.capture_event_data('playbook_on_task_start', **event_data): + super(BaseCallbackModule, self).v2_playbook_on_cleanup_task_start(task) + + def v2_playbook_on_handler_task_start(self, task): + # re-using playbook_on_task_start event here for this v2-specific + # event, though we may consider any changes necessary to distinguish + # this from a normal task FIXME! + self.set_task(task) + event_data = dict( + task=task, + name=task.get_name(), + uuid=str(task._uuid), + ) + with self.capture_event_data('playbook_on_task_start', **event_data): + super(BaseCallbackModule, self).v2_playbook_on_handler_task_start(task) + + def v2_playbook_on_no_hosts_matched(self): + with self.capture_event_data('playbook_on_no_hosts_matched'): + super(BaseCallbackModule, self).v2_playbook_on_no_hosts_matched() + + def v2_playbook_on_no_hosts_remaining(self): + with self.capture_event_data('playbook_on_no_hosts_remaining'): + super(BaseCallbackModule, self).v2_playbook_on_no_hosts_remaining() + + def v2_playbook_on_notify(self, result, handler): + event_data = dict( + host=result._host.name, + task=result._task, + handler=handler, + ) + with self.capture_event_data('playbook_on_notify', **event_data): + super(BaseCallbackModule, self).v2_playbook_on_notify(result, handler) + + def v2_playbook_on_stats(self, stats): + self.clear_play() + # FIXME: Add count of plays/tasks. + event_data = dict( + changed=stats.changed, + dark=stats.dark, + failures=stats.failures, + ok=stats.ok, + processed=stats.processed, + skipped=stats.skipped, + ) + with self.capture_event_data('playbook_on_stats', **event_data): + super(BaseCallbackModule, self).v2_playbook_on_stats(stats) + + def v2_runner_on_ok(self, result): + # FIXME: Display detailed results or not based on verbosity. + event_data = dict( + host=result._host.name, + remote_addr=result._host.address, + task=result._task, + res=result._result, + event_loop=result._task.loop if hasattr(result._task, 'loop') else None, + ) + with self.capture_event_data('runner_on_ok', **event_data): + super(BaseCallbackModule, self).v2_runner_on_ok(result) + + def v2_runner_on_failed(self, result, ignore_errors=False): + # FIXME: Add verbosity for exception/results output. + event_data = dict( + host=result._host.name, + res=result._result, + task=result._task, + ignore_errors=ignore_errors, + event_loop=result._task.loop if hasattr(result._task, 'loop') else None, + ) + with self.capture_event_data('runner_on_failed', **event_data): + super(BaseCallbackModule, self).v2_runner_on_failed(result, ignore_errors) + + def v2_runner_on_error(self, result): + pass # Not implemented in v2. + + def v2_runner_on_skipped(self, result): + event_data = dict( + host=result._host.name, + task=result._task, + event_loop=result._task.loop if hasattr(result._task, 'loop') else None, + ) + with self.capture_event_data('runner_on_skipped', **event_data): + super(BaseCallbackModule, self).v2_runner_on_skipped(result) + + def v2_runner_on_unreachable(self, result): + event_data = dict( + host=result._host.name, + task=result._task, + res=result._result, + ) + with self.capture_event_data('runner_on_unreachable', **event_data): + super(BaseCallbackModule, self).v2_runner_on_unreachable(result) + + def v2_runner_on_no_hosts(self, task): + event_data = dict( + task=task, + ) + with self.capture_event_data('runner_on_no_hosts', **event_data): + super(BaseCallbackModule, self).v2_runner_on_no_hosts(task) + + def v2_runner_on_file_diff(self, result, diff): + # FIXME: Ignore file diff for ad hoc commands? + event_data = dict( + host=result._host.name, + task=result._task, + diff=diff, + ) + with self.capture_event_data('runner_on_file_diff', **event_data): + super(BaseCallbackModule, self).v2_runner_on_file_diff(result, diff) + + def v2_runner_item_on_ok(self, result): + event_data = dict( + host=result._host.name, + task=result._task, + res=result._result, + ) + with self.capture_event_data('runner_item_on_ok', **event_data): + super(BaseCallbackModule, self).v2_runner_item_on_ok(result) + + def v2_runner_item_on_failed(self, result): + event_data = dict( + host=result._host.name, + task=result._task, + res=result._result, + ) + with self.capture_event_data('runner_item_on_failed', **event_data): + super(BaseCallbackModule, self).v2_runner_item_on_failed(result) + + def v2_runner_item_on_skipped(self, result): + event_data = dict( + host=result._host.name, + task=result._task, + res=result._result, + ) + with self.capture_event_data('runner_item_on_skipped', **event_data): + super(BaseCallbackModule, self).v2_runner_item_on_skipped(result) + + # V2 does not use the _on_async callbacks (yet). + + def runner_on_async_poll(self, host, res, jid, clock): + self._log_event('runner_on_async_poll', host=host, res=res, jid=jid, + clock=clock) + + def runner_on_async_ok(self, host, res, jid): + self._log_event('runner_on_async_ok', host=host, res=res, jid=jid) + + def runner_on_async_failed(self, host, res, jid): + self._log_event('runner_on_async_failed', host=host, res=res, jid=jid) + + +class TowerDefaultCallbackModule(BaseCallbackModule, DefaultCallbackModule): + + CALLBACK_NAME = 'tower_display' + + +class TowerMinimalCallbackModule(BaseCallbackModule, MinimalCallbackModule): + + CALLBACK_NAME = 'minimal' + + def v2_playbook_on_play_start(self, play): + pass + + def v2_playbook_on_task_start(self, task, is_conditional): + self.set_task(task) diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 4f777cd40e..e99a34aa4f 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -2,7 +2,6 @@ # All Rights Reserved. # Python -import datetime import logging import json @@ -12,10 +11,7 @@ from kombu.mixins import ConsumerMixin # Django from django.conf import settings from django.core.management.base import NoArgsCommand -from django.core.cache import cache from django.db import DatabaseError -from django.utils.dateparse import parse_datetime -from django.utils.timezone import FixedOffset # AWX from awx.main.models import * # noqa @@ -36,112 +32,26 @@ class CallbackBrokerWorker(ConsumerMixin): def process_task(self, body, message): try: - if "event" not in body: - raise Exception("Payload does not have an event") - if "job_id" not in body: - raise Exception("Payload does not have a job_id") + if 'event' not in body: + raise Exception('Payload does not have an event') + if 'job_id' not in body and 'ad_hoc_command_id' not in body: + raise Exception('Payload does not have a job_id or ad_hoc_command_id') if settings.DEBUG: - logger.info("Body: {}".format(body)) - logger.info("Message: {}".format(message)) - self.process_job_event(body) + logger.info('Body: {}'.format(body)) + logger.info('Message: {}'.format(message)) + try: + if 'job_id' in body: + JobEvent.create_from_data(**body) + elif 'ad_hoc_command_id' in body: + AdHocCommandEvent.create_from_data(**body) + except DatabaseError as e: + logger.error('Database Error Saving Job Event: {}'.format(e)) except Exception as exc: import traceback traceback.print_exc() logger.error('Callback Task Processor Raised Exception: %r', exc) message.ack() - def process_job_event(self, payload): - # Get the correct "verbose" value from the job. - # If for any reason there's a problem, just use 0. - if 'ad_hoc_command_id' in payload: - event_type_key = 'ad_hoc_command_id' - event_object_type = AdHocCommand - else: - event_type_key = 'job_id' - event_object_type = Job - - try: - verbose = event_object_type.objects.get(id=payload[event_type_key]).verbosity - except Exception as e: - verbose=0 - # TODO: cache - - # Convert the datetime for the job event's creation appropriately, - # and include a time zone for it. - # - # In the event of any issue, throw it out, and Django will just save - # the current time. - try: - if not isinstance(payload['created'], datetime.datetime): - payload['created'] = parse_datetime(payload['created']) - if not payload['created'].tzinfo: - payload['created'] = payload['created'].replace(tzinfo=FixedOffset(0)) - except (KeyError, ValueError): - payload.pop('created', None) - - event_uuid = payload.get("uuid", '') - parent_event_uuid = payload.get("parent_uuid", '') - artifact_data = payload.get("artifact_data", None) - - # Sanity check: Don't honor keys that we don't recognize. - for key in payload.keys(): - if key not in (event_type_key, 'event', 'event_data', - 'created', 'counter', 'uuid'): - payload.pop(key) - - try: - # If we're not in verbose mode, wipe out any module - # arguments. - res = payload['event_data'].get('res', {}) - if isinstance(res, dict): - i = res.get('invocation', {}) - if verbose == 0 and 'module_args' in i: - i['module_args'] = '' - - if 'ad_hoc_command_id' in payload: - AdHocCommandEvent.objects.create(**data) - return - - j = JobEvent(**payload) - if payload['event'] == 'playbook_on_start': - j.save() - cache.set("{}_{}".format(payload['job_id'], event_uuid), j.id, 300) - return - else: - if parent_event_uuid: - parent_id = cache.get("{}_{}".format(payload['job_id'], parent_event_uuid), None) - if parent_id is None: - parent_id_obj = JobEvent.objects.filter(uuid=parent_event_uuid, job_id=payload['job_id']) - if parent_id_obj.exists(): # Problematic if not there, means the parent hasn't been written yet... TODO - j.parent_id = parent_id_obj[0].id - print("Settings cache: {}_{} with value {}".format(payload['job_id'], parent_event_uuid, j.parent_id)) - cache.set("{}_{}".format(payload['job_id'], parent_event_uuid), j.parent_id, 300) - else: - print("Cache hit") - j.parent_id = parent_id - j.save(post_process=True) - if event_uuid: - cache.set("{}_{}".format(payload['job_id'], event_uuid), j.id, 300) - except DatabaseError as e: - logger.error("Database Error Saving Job Event: {}".format(e)) - - if artifact_data: - try: - self.process_artifacts(artifact_data, res, payload) - except DatabaseError as e: - logger.error("Database Error Saving Job Artifacts: {}".format(e)) - - def process_artifacts(self, artifact_data, res, payload): - artifact_dict = json.loads(artifact_data) - if res and isinstance(res, dict): - if res.get('_ansible_no_log', False): - artifact_dict['_ansible_no_log'] = True - if artifact_data is not None: - parent_job = Job.objects.filter(pk=payload['job_id']).first() - if parent_job is not None and parent_job.artifacts != artifact_dict: - parent_job.artifacts = artifact_dict - parent_job.save(update_fields=['artifacts']) - class Command(NoArgsCommand): ''' @@ -158,4 +68,3 @@ class Command(NoArgsCommand): worker.run() except KeyboardInterrupt: print('Terminating Callback Receiver') - diff --git a/awx/main/migrations/0044_v310_job_event_stdout.py b/awx/main/migrations/0044_v310_job_event_stdout.py new file mode 100644 index 0000000000..8a66aa9f94 --- /dev/null +++ b/awx/main/migrations/0044_v310_job_event_stdout.py @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0043_v310_scm_revision'), + ] + + operations = [ + migrations.AddField( + model_name='adhoccommandevent', + name='end_line', + field=models.PositiveIntegerField(default=0, editable=False), + ), + migrations.AddField( + model_name='adhoccommandevent', + name='start_line', + field=models.PositiveIntegerField(default=0, editable=False), + ), + migrations.AddField( + model_name='adhoccommandevent', + name='stdout', + field=models.TextField(default=b'', editable=False), + ), + migrations.AddField( + model_name='adhoccommandevent', + name='uuid', + field=models.CharField(default=b'', max_length=1024, editable=False), + ), + migrations.AddField( + model_name='adhoccommandevent', + name='verbosity', + field=models.PositiveIntegerField(default=0, editable=False), + ), + migrations.AddField( + model_name='jobevent', + name='end_line', + field=models.PositiveIntegerField(default=0, editable=False), + ), + migrations.AddField( + model_name='jobevent', + name='playbook', + field=models.CharField(default=b'', max_length=1024, editable=False), + ), + migrations.AddField( + model_name='jobevent', + name='start_line', + field=models.PositiveIntegerField(default=0, editable=False), + ), + migrations.AddField( + model_name='jobevent', + name='stdout', + field=models.TextField(default=b'', editable=False), + ), + migrations.AddField( + model_name='jobevent', + name='verbosity', + field=models.PositiveIntegerField(default=0, editable=False), + ), + migrations.AlterField( + model_name='adhoccommandevent', + name='counter', + field=models.PositiveIntegerField(default=0, editable=False), + ), + migrations.AlterField( + model_name='adhoccommandevent', + name='event', + field=models.CharField(max_length=100, choices=[(b'runner_on_failed', 'Host Failed'), (b'runner_on_ok', 'Host OK'), (b'runner_on_unreachable', 'Host Unreachable'), (b'runner_on_skipped', 'Host Skipped'), (b'debug', 'Debug'), (b'verbose', 'Verbose'), (b'deprecated', 'Deprecated'), (b'warning', 'Warning'), (b'system_warning', 'System Warning'), (b'error', 'Error')]), + ), + migrations.AlterField( + model_name='jobevent', + name='counter', + field=models.PositiveIntegerField(default=0, editable=False), + ), + migrations.AlterField( + model_name='jobevent', + name='event', + field=models.CharField(max_length=100, choices=[(b'runner_on_failed', 'Host Failed'), (b'runner_on_ok', 'Host OK'), (b'runner_on_error', 'Host Failure'), (b'runner_on_skipped', 'Host Skipped'), (b'runner_on_unreachable', 'Host Unreachable'), (b'runner_on_no_hosts', 'No Hosts Remaining'), (b'runner_on_async_poll', 'Host Polling'), (b'runner_on_async_ok', 'Host Async OK'), (b'runner_on_async_failed', 'Host Async Failure'), (b'runner_on_file_diff', 'File Difference'), (b'playbook_on_start', 'Playbook Started'), (b'playbook_on_notify', 'Running Handlers'), (b'playbook_on_no_hosts_matched', 'No Hosts Matched'), (b'playbook_on_no_hosts_remaining', 'No Hosts Remaining'), (b'playbook_on_task_start', 'Task Started'), (b'playbook_on_vars_prompt', 'Variables Prompted'), (b'playbook_on_setup', 'Gathering Facts'), (b'playbook_on_import_for_host', 'internal: on Import for Host'), (b'playbook_on_not_import_for_host', 'internal: on Not Import for Host'), (b'playbook_on_play_start', 'Play Started'), (b'playbook_on_stats', 'Playbook Complete'), (b'debug', 'Debug'), (b'verbose', 'Verbose'), (b'deprecated', 'Deprecated'), (b'warning', 'Warning'), (b'system_warning', 'System Warning'), (b'error', 'Error')]), + ), + migrations.AlterUniqueTogether( + name='adhoccommandevent', + unique_together=set([]), + ), + migrations.AlterIndexTogether( + name='adhoccommandevent', + index_together=set([('ad_hoc_command', 'event'), ('ad_hoc_command', 'uuid'), ('ad_hoc_command', 'end_line'), ('ad_hoc_command', 'start_line')]), + ), + migrations.AlterIndexTogether( + name='jobevent', + index_together=set([('job', 'event'), ('job', 'parent'), ('job', 'start_line'), ('job', 'uuid'), ('job', 'end_line')]), + ), + ] diff --git a/awx/main/models/ad_hoc_commands.py b/awx/main/models/ad_hoc_commands.py index b03be56452..fc56802160 100644 --- a/awx/main/models/ad_hoc_commands.py +++ b/awx/main/models/ad_hoc_commands.py @@ -2,6 +2,7 @@ # All Rights Reserved. # Python +import datetime import hmac import json import logging @@ -9,8 +10,11 @@ from urlparse import urljoin # Django from django.conf import settings +from django.core.cache import cache from django.db import models +from django.utils.dateparse import parse_datetime from django.utils.text import Truncator +from django.utils.timezone import utc from django.utils.translation import ugettext_lazy as _ from django.core.exceptions import ValidationError from django.core.urlresolvers import reverse @@ -267,14 +271,28 @@ class AdHocCommandEvent(CreatedModifiedModel): #('runner_on_async_failed', _('Host Async Failure'), True), # Tower does not yet support --diff mode #('runner_on_file_diff', _('File Difference'), False), + + # Additional event types for captured stdout not directly related to + # runner events. + ('debug', _('Debug'), False), + ('verbose', _('Verbose'), False), + ('deprecated', _('Deprecated'), False), + ('warning', _('Warning'), False), + ('system_warning', _('System Warning'), False), + ('error', _('Error'), False), ] FAILED_EVENTS = [x[0] for x in EVENT_TYPES if x[2]] EVENT_CHOICES = [(x[0], x[1]) for x in EVENT_TYPES] class Meta: app_label = 'main' - unique_together = [('ad_hoc_command', 'host_name')] ordering = ('-pk',) + index_together = [ + ('ad_hoc_command', 'event'), + ('ad_hoc_command', 'uuid'), + ('ad_hoc_command', 'start_line'), + ('ad_hoc_command', 'end_line'), + ] ad_hoc_command = models.ForeignKey( 'AdHocCommand', @@ -311,8 +329,30 @@ class AdHocCommandEvent(CreatedModifiedModel): default=False, editable=False, ) + uuid = models.CharField( + max_length=1024, + default='', + editable=False, + ) counter = models.PositiveIntegerField( default=0, + editable=False, + ) + stdout = models.TextField( + default='', + editable=False, + ) + verbosity = models.PositiveIntegerField( + default=0, + editable=False, + ) + start_line = models.PositiveIntegerField( + default=0, + editable=False, + ) + end_line = models.PositiveIntegerField( + default=0, + editable=False, ) def get_absolute_url(self): @@ -350,3 +390,28 @@ class AdHocCommandEvent(CreatedModifiedModel): except (IndexError, AttributeError): pass super(AdHocCommandEvent, self).save(*args, **kwargs) + + @classmethod + def create_from_data(self, **kwargs): + # Convert the datetime for the ad hoc command event's creation + # appropriately, and include a time zone for it. + # + # In the event of any issue, throw it out, and Django will just save + # the current time. + try: + if not isinstance(kwargs['created'], datetime.datetime): + kwargs['created'] = parse_datetime(kwargs['created']) + if not kwargs['created'].tzinfo: + kwargs['created'] = kwargs['created'].replace(tzinfo=utc) + except (KeyError, ValueError): + kwargs.pop('created', None) + + # Sanity check: Don't honor keys that we don't recognize. + valid_keys = {'ad_hoc_command_id', 'event', 'event_data', 'created', + 'counter', 'uuid', 'stdout', 'start_line', 'end_line', + 'verbosity'} + for key in kwargs.keys(): + if key not in valid_keys: + kwargs.pop(key) + + return AdHocCommandEvent.objects.create(**kwargs) diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index 1f11e5dee9..74a8395a2a 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -2,6 +2,7 @@ # All Rights Reserved. # Python +import datetime import hmac import json import yaml @@ -11,8 +12,12 @@ from urlparse import urljoin # Django from django.conf import settings +from django.core.cache import cache from django.db import models from django.db.models import Q, Count +from django.utils.dateparse import parse_datetime +from django.utils.encoding import force_text +from django.utils.timezone import utc from django.utils.translation import ugettext_lazy as _ from django.core.exceptions import ValidationError from django.core.urlresolvers import reverse @@ -940,7 +945,7 @@ class JobEvent(CreatedModifiedModel): # - playbook_on_task_start (once for each task within a play) # - runner_on_failed # - runner_on_ok - # - runner_on_error + # - runner_on_error (not used for v2) # - runner_on_skipped # - runner_on_unreachable # - runner_on_no_hosts @@ -962,14 +967,14 @@ class JobEvent(CreatedModifiedModel): (3, 'runner_on_async_poll', _('Host Polling'), False), (3, 'runner_on_async_ok', _('Host Async OK'), False), (3, 'runner_on_async_failed', _('Host Async Failure'), True), - # AWX does not yet support --diff mode + # Tower does not yet support --diff mode (3, 'runner_on_file_diff', _('File Difference'), False), (0, 'playbook_on_start', _('Playbook Started'), False), (2, 'playbook_on_notify', _('Running Handlers'), False), (2, 'playbook_on_no_hosts_matched', _('No Hosts Matched'), False), (2, 'playbook_on_no_hosts_remaining', _('No Hosts Remaining'), False), (2, 'playbook_on_task_start', _('Task Started'), False), - # AWX does not yet support vars_prompt (and will probably hang :) + # Tower does not yet support vars_prompt (and will probably hang :) (1, 'playbook_on_vars_prompt', _('Variables Prompted'), False), (2, 'playbook_on_setup', _('Gathering Facts'), False), # callback will not record this @@ -978,6 +983,15 @@ class JobEvent(CreatedModifiedModel): (2, 'playbook_on_not_import_for_host', _('internal: on Not Import for Host'), False), (1, 'playbook_on_play_start', _('Play Started'), False), (1, 'playbook_on_stats', _('Playbook Complete'), False), + + # Additional event types for captured stdout not directly related to + # playbook or runner events. + (0, 'debug', _('Debug'), False), + (0, 'verbose', _('Verbose'), False), + (0, 'deprecated', _('Deprecated'), False), + (0, 'warning', _('Warning'), False), + (0, 'system_warning', _('System Warning'), False), + (0, 'error', _('Error'), True), ] FAILED_EVENTS = [x[1] for x in EVENT_TYPES if x[3]] EVENT_CHOICES = [(x[1], x[2]) for x in EVENT_TYPES] @@ -986,6 +1000,13 @@ class JobEvent(CreatedModifiedModel): class Meta: app_label = 'main' ordering = ('pk',) + index_together = [ + ('job', 'event'), + ('job', 'uuid'), + ('job', 'start_line'), + ('job', 'end_line'), + ('job', 'parent'), + ] job = models.ForeignKey( 'Job', @@ -1032,12 +1053,17 @@ class JobEvent(CreatedModifiedModel): related_name='job_events', editable=False, ) + playbook = models.CharField( + max_length=1024, + default='', + editable=False, + ) play = models.CharField( max_length=1024, default='', editable=False, ) - role = models.CharField( # FIXME: Determine from callback or task name. + role = models.CharField( max_length=1024, default='', editable=False, @@ -1057,8 +1083,24 @@ class JobEvent(CreatedModifiedModel): ) counter = models.PositiveIntegerField( default=0, + editable=False, + ) + stdout = models.TextField( + default='', + editable=False, + ) + verbosity = models.PositiveIntegerField( + default=0, + editable=False, + ) + start_line = models.PositiveIntegerField( + default=0, + editable=False, + ) + end_line = models.PositiveIntegerField( + default=0, + editable=False, ) - def get_absolute_url(self): return reverse('api:job_event_detail', args=(self.pk,)) @@ -1119,7 +1161,8 @@ class JobEvent(CreatedModifiedModel): pass return msg - def _find_parent(self): + def _find_parent_id(self): + # Find the (most likely) parent event for this event. parent_events = set() if self.event in ('playbook_on_play_start', 'playbook_on_stats', 'playbook_on_vars_prompt'): @@ -1135,101 +1178,55 @@ class JobEvent(CreatedModifiedModel): parent_events.add('playbook_on_setup') parent_events.add('playbook_on_task_start') if parent_events: - try: - qs = JobEvent.objects.filter(job_id=self.job_id) - if self.pk: - qs = qs.filter(pk__lt=self.pk, event__in=parent_events) - else: - qs = qs.filter(event__in=parent_events) - return qs.order_by('-pk')[0] - except IndexError: - pass - return None + qs = JobEvent.objects.filter(job_id=self.job_id, event__in=parent_events).order_by('-pk') + if self.pk: + qs = qs.filter(pk__lt=self.pk) + return qs.only('id').values_list('id', flat=True).first() - def save(self, *args, **kwargs): - from awx.main.models.inventory import Host - # If update_fields has been specified, add our field names to it, - # if it hasn't been specified, then we're just doing a normal save. - update_fields = kwargs.get('update_fields', []) - # Skip normal checks on save if we're only updating failed/changed - # flags triggered from a child event. - from_parent_update = kwargs.pop('from_parent_update', False) - if not from_parent_update: - res = self.event_data.get('res', None) - # Workaround for Ansible 1.2, where the runner_on_async_ok event is - # created even when the async task failed. Change the event to be - # correct. - if self.event == 'runner_on_async_ok': - try: - if res.get('failed', False) or res.get('rc', 0) != 0: - self.event = 'runner_on_async_failed' - except (AttributeError, TypeError): - pass - if self.event in self.FAILED_EVENTS: - if not self.event_data.get('ignore_errors', False): - self.failed = True - if 'failed' not in update_fields: - update_fields.append('failed') - if isinstance(res, dict) and res.get('changed', False): + def _update_from_event_data(self): + # Update job event model fields from event data. + updated_fields = set() + job = self.job + verbosity = job.verbosity + event_data = self.event_data + res = event_data.get('res', None) + if self.event in self.FAILED_EVENTS and not event_data.get('ignore_errors', False): + self.failed = True + updated_fields.add('failed') + if isinstance(res, dict): + if res.get('changed', False): self.changed = True - if 'changed' not in update_fields: - update_fields.append('changed') - if self.event == 'playbook_on_stats': - try: - failures_dict = self.event_data.get('failures', {}) - dark_dict = self.event_data.get('dark', {}) - self.failed = bool(sum(failures_dict.values()) + - sum(dark_dict.values())) - if 'failed' not in update_fields: - update_fields.append('failed') - changed_dict = self.event_data.get('changed', {}) - self.changed = bool(sum(changed_dict.values())) - if 'changed' not in update_fields: - update_fields.append('changed') - except (AttributeError, TypeError): - pass - self.play = self.event_data.get('play', '').strip() - if 'play' not in update_fields: - update_fields.append('play') - self.task = self.event_data.get('task', '').strip() - if 'task' not in update_fields: - update_fields.append('task') - self.role = self.event_data.get('role', '').strip() - if 'role' not in update_fields: - update_fields.append('role') - self.host_name = self.event_data.get('host', '').strip() - if 'host_name' not in update_fields: - update_fields.append('host_name') - # Only update job event hierarchy and related models during post - # processing (after running job). - post_process = kwargs.pop('post_process', False) - if post_process: + updated_fields.add('changed') + # If we're not in verbose mode, wipe out any module arguments. + invocation = res.get('invocation', None) + if isinstance(invocation, dict) and verbosity == 0 and 'module_args' in invocation: + event_data['res']['invocation']['module_args'] = '' + self.event_data = event_data + update_fields.add('event_data') + if self.event == 'playbook_on_stats': try: - if not self.host_id and self.host_name: - host_qs = Host.objects.filter(inventory__jobs__id=self.job_id, name=self.host_name) - host_id = host_qs.only('id').values_list('id', flat=True) - if host_id.exists(): - self.host_id = host_id[0] - if 'host_id' not in update_fields: - update_fields.append('host_id') - except (IndexError, AttributeError): + failures_dict = event_data.get('failures', {}) + dark_dict = event_data.get('dark', {}) + self.failed = bool(sum(failures_dict.values()) + + sum(dark_dict.values())) + updated_fields.add('failed') + changed_dict = event_data.get('changed', {}) + self.changed = bool(sum(changed_dict.values())) + updated_fields.add('changed') + except (AttributeError, TypeError): pass - if self.parent is None: - self.parent = self._find_parent() - if 'parent' not in update_fields: - update_fields.append('parent') - super(JobEvent, self).save(*args, **kwargs) - if post_process and not from_parent_update: - self.update_parent_failed_and_changed() - # FIXME: The update_hosts() call (and its queries) are the current - # performance bottleneck.... - if getattr(settings, 'CAPTURE_JOB_EVENT_HOSTS', False): - self.update_hosts() - self.update_host_summary_from_stats() + for field in ('playbook', 'play', 'task', 'role', 'host'): + value = force_text(event_data.get(field, '')).strip() + if field == 'host': + field = 'host_name' + if value != getattr(self, field): + setattr(self, field, value) + updated_fields.add(field) + return updated_fields - def update_parent_failed_and_changed(self): - # Propagage failed and changed flags to parent events. - if self.parent: + def _update_parent_failed_and_changed(self): + # Propagate failed and changed flags to parent events. + if self.parent_id: parent = self.parent update_fields = [] if self.failed and not parent.failed: @@ -1240,9 +1237,10 @@ class JobEvent(CreatedModifiedModel): update_fields.append('changed') if update_fields: parent.save(update_fields=update_fields, from_parent_update=True) - parent.update_parent_failed_and_changed() + parent._update_parent_failed_and_changed() - def update_hosts(self, extra_host_pks=None): + def _update_hosts(self, extra_host_pks=None): + # Update job event hosts m2m from host_name, propagate to parent events. from awx.main.models.inventory import Host extra_host_pks = set(extra_host_pks or []) hostnames = set() @@ -1256,16 +1254,14 @@ class JobEvent(CreatedModifiedModel): pass qs = Host.objects.filter(inventory__jobs__id=self.job_id) qs = qs.filter(Q(name__in=hostnames) | Q(pk__in=extra_host_pks)) - qs = qs.exclude(job_events__pk=self.id) - for host in qs.only('id'): + qs = qs.exclude(job_events__pk=self.id).only('id') + for host in qs: self.hosts.add(host) - if self.parent: - self.parent.update_hosts(self.hosts.only('id').values_list('id', flat=True)) + if self.parent_id: + self.parent._update_hosts(qs.values_list('id', flat=True)) - def update_host_summary_from_stats(self): + def _update_host_summary_from_stats(self): from awx.main.models.inventory import Host - if self.event != 'playbook_on_stats': - return hostnames = set() try: for v in self.event_data.values(): @@ -1276,7 +1272,6 @@ class JobEvent(CreatedModifiedModel): qs = Host.objects.filter(inventory__jobs__id=self.job_id, name__in=hostnames) job = self.job - #for host in qs.only('id', 'name'): for host in hostnames: host_stats = {} for stat in ('changed', 'dark', 'failures', 'ok', 'processed', 'skipped'): @@ -1300,6 +1295,112 @@ class JobEvent(CreatedModifiedModel): job.inventory.update_computed_fields() emit_channel_notification('jobs-summary', dict(group_name='jobs', unified_job_id=job.id)) + def save(self, *args, **kwargs): + from awx.main.models.inventory import Host + # If update_fields has been specified, add our field names to it, + # if it hasn't been specified, then we're just doing a normal save. + update_fields = kwargs.get('update_fields', []) + # Update model fields and related objects unless we're only updating + # failed/changed flags triggered from a child event. + from_parent_update = kwargs.pop('from_parent_update', False) + if not from_parent_update: + # Update model fields from event data. + updated_fields = self._update_from_event_data() + for field in updated_fields: + if field not in update_fields: + update_fields.append(field) + # Update host related field from host_name. + if not self.host_id and self.host_name: + host_qs = Host.objects.filter(inventory__jobs__id=self.job_id, name=self.host_name) + host_id = host_qs.only('id').values_list('id', flat=True).first() + if host_id != self.host_id: + self.host_id = host_id + if 'host_id' not in update_fields: + update_fields.append('host_id') + # Update parent related field if not set. + if self.parent_id is None: + self.parent_id = self._find_parent_id() + if self.parent_id and 'parent_id' not in update_fields: + update_fields.append('parent_id') + super(JobEvent, self).save(*args, **kwargs) + # Update related objects after this event is saved. + if not from_parent_update: + if self.parent_id: + self._update_parent_failed_and_changed() + # FIXME: The update_hosts() call (and its queries) are the current + # performance bottleneck.... + if getattr(settings, 'CAPTURE_JOB_EVENT_HOSTS', False): + self._update_hosts() + if self.event == 'playbook_on_stats': + self._update_host_summary_from_stats() + + @classmethod + def create_from_data(self, **kwargs): + # Must have a job_id specified. + if not kwargs.get('job_id', None): + return + + # Convert the datetime for the job event's creation appropriately, + # and include a time zone for it. + # + # In the event of any issue, throw it out, and Django will just save + # the current time. + try: + if not isinstance(kwargs['created'], datetime.datetime): + kwargs['created'] = parse_datetime(kwargs['created']) + if not kwargs['created'].tzinfo: + kwargs['created'] = kwargs['created'].replace(tzinfo=utc) + except (KeyError, ValueError): + kwargs.pop('created', None) + + # Save UUID and parent UUID for determining parent-child relationship. + job_event_uuid = kwargs.get('uuid', None) + parent_event_uuid = kwargs.get('parent_uuid', None) + artifact_data = kwargs.get('artifact_data', None) + + # Sanity check: Don't honor keys that we don't recognize. + valid_keys = {'job_id', 'event', 'event_data', 'playbook', 'play', + 'role', 'task', 'created', 'counter', 'uuid', 'stdout', + 'start_line', 'end_line', 'verbosity'} + for key in kwargs.keys(): + if key not in valid_keys: + kwargs.pop(key) + + # Try to find a parent event based on UUID. + if parent_event_uuid: + cache_key = '{}_{}'.format(kwargs['job_id'], parent_event_uuid) + parent_id = cache.get(cache_key) + if parent_id is None: + parent_id = JobEvent.objects.filter(job_id=kwargs['job_id'], uuid=parent_event_uuid).only('id').values_list('id', flat=True).first() + if parent_id: + print("Settings cache: {} with value {}".format(cache_key, parent_id)) + cache.set(cache_key, parent_id, 300) + if parent_id: + kwargs['parent_id'] = parent_id + + job_event = JobEvent.objects.create(**kwargs) + + # Cache this job event ID vs. UUID for future parent lookups. + if job_event_uuid: + cache_key = '{}_{}'.format(kwargs['job_id'], job_event_uuid) + cache.set(cache_key, job_event.id, 300) + + # Save artifact data to parent job (if provided). + if artifact_data: + artifact_dict = json.loads(artifact_data) + event_data = kwargs.get('event_data', None) + if event_data and isinstance(event_data, dict): + res = event_data.get('res', None) + if res and isinstance(res, dict): + if res.get('_ansible_no_log', False): + artifact_dict['_ansible_no_log'] = True + parent_job = Job.objects.filter(pk=kwargs['job_id']).first() + if parent_job and parent_job.artifacts != artifact_dict: + parent_job.artifacts = artifact_dict + parent_job.save(update_fields=['artifacts']) + + return job_event + @classmethod def get_startevent_queryset(cls, parent_task, starting_events, ordering=None): ''' diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 677be8136d..674bedbffe 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -696,8 +696,11 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique return StringIO(msg['missing' if self.finished else 'pending']) def _escape_ascii(self, content): - ansi_escape = re.compile(r'\x1b[^m]*m') - return ansi_escape.sub('', content) + # Remove ANSI escape sequences used to embed event data. + content = re.sub(r'\x1b\[K(?:[A-Za-z0-9+/=]+\x1b\[\d+D)+\x1b\[K', '', content) + # Remove ANSI color escape sequences. + content = re.sub(r'\x1b[^m]*m', '', content) + return content def _result_stdout_raw(self, redact_sensitive=False, escape_ascii=False): content = self.result_stdout_raw_handle().read() diff --git a/awx/main/tasks.py b/awx/main/tasks.py index dcb2bc5258..503509890b 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -49,7 +49,8 @@ from awx.main.models import * # noqa from awx.main.models import UnifiedJob from awx.main.task_engine import TaskEnhancer from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url, - check_proot_installed, build_proot_temp_dir, wrap_args_with_proot) + check_proot_installed, build_proot_temp_dir, wrap_args_with_proot, + OutputEventFilter) from awx.main.consumers import emit_channel_notification __all__ = ['RunJob', 'RunSystemJob', 'RunProjectUpdate', 'RunInventoryUpdate', @@ -397,6 +398,8 @@ class BaseTask(Task): if os.path.isdir(os.path.join(venv_libdir, python_ver)): env['PYTHONPATH'] = os.path.join(venv_libdir, python_ver, "site-packages") + ":" break + # Add awx/lib to PYTHONPATH. + env['PYTHONPATH'] = ':'.join(filter(None, [self.get_path_to('..', 'lib'), env.get('PYTHONPATH', '')])) return env def add_tower_venv(self, env): @@ -494,6 +497,17 @@ class BaseTask(Task): ''' return OrderedDict() + def get_stdout_handle(self, instance): + ''' + Return an open file object for capturing stdout. + ''' + if not os.path.exists(settings.JOBOUTPUT_ROOT): + os.makedirs(settings.JOBOUTPUT_ROOT) + stdout_filename = os.path.join(settings.JOBOUTPUT_ROOT, "%d-%s.out" % (instance.pk, str(uuid.uuid1()))) + stdout_handle = codecs.open(stdout_filename, 'w', encoding='utf-8') + assert stdout_handle.name == stdout_filename + return stdout_handle + def run_pexpect(self, instance, args, cwd, env, passwords, stdout_handle, output_replacements=None, extra_update_fields=None): ''' @@ -643,10 +657,7 @@ class BaseTask(Task): cwd = self.build_cwd(instance, **kwargs) env = self.build_env(instance, **kwargs) safe_env = self.build_safe_env(instance, **kwargs) - if not os.path.exists(settings.JOBOUTPUT_ROOT): - os.makedirs(settings.JOBOUTPUT_ROOT) - stdout_filename = os.path.join(settings.JOBOUTPUT_ROOT, "%d-%s.out" % (pk, str(uuid.uuid1()))) - stdout_handle = codecs.open(stdout_filename, 'w', encoding='utf-8') + stdout_handle = self.get_stdout_handle(instance) if self.should_use_proot(instance, **kwargs): if not check_proot_installed(): raise RuntimeError('proot is not installed') @@ -660,7 +671,7 @@ class BaseTask(Task): args = self.wrap_args_with_ssh_agent(args, ssh_key_path, ssh_auth_sock) safe_args = self.wrap_args_with_ssh_agent(safe_args, ssh_key_path, ssh_auth_sock) instance = self.update_model(pk, job_args=json.dumps(safe_args), - job_cwd=cwd, job_env=safe_env, result_stdout_file=stdout_filename) + job_cwd=cwd, job_env=safe_env, result_stdout_file=stdout_handle.name) status, rc = self.run_pexpect(instance, args, cwd, env, kwargs['passwords'], stdout_handle, extra_update_fields=extra_update_fields) except Exception: @@ -779,6 +790,7 @@ class RunJob(BaseTask): if job.project: env['PROJECT_REVISION'] = job.project.scm_revision env['ANSIBLE_CALLBACK_PLUGINS'] = plugin_path + env['ANSIBLE_STDOUT_CALLBACK'] = 'tower_display' env['REST_API_URL'] = settings.INTERNAL_API_URL env['REST_API_TOKEN'] = job.task_auth_token or '' env['CALLBACK_QUEUE'] = settings.CALLBACK_QUEUE @@ -974,6 +986,16 @@ class RunJob(BaseTask): d[re.compile(r'^Vault password:\s*?$', re.M)] = 'vault_password' return d + def get_stdout_handle(self, instance): + ''' + Wrap stdout file object to capture events. + ''' + stdout_handle = super(RunJob, self).get_stdout_handle(instance) + def job_event_callback(event_data): + event_data.setdefault('job_id', instance.id) + JobEvent.create_from_data(**event_data) + return OutputEventFilter(stdout_handle, job_event_callback) + def get_ssh_key_path(self, instance, **kwargs): ''' If using an SSH key, return the path for use by ssh-agent. @@ -1019,11 +1041,6 @@ class RunJob(BaseTask): pass else: update_inventory_computed_fields.delay(inventory.id, True) - # Update job event fields after job has completed (only when using REST - # API callback). - if not getattr(settings, 'CALLBACK_CONSUMER_PORT', None) and not getattr(settings, 'CALLBACK_QUEUE', None): - for job_event in job.job_events.order_by('pk'): - job_event.save(post_process=True) class RunProjectUpdate(BaseTask): @@ -1597,6 +1614,7 @@ class RunAdHocCommand(BaseTask): env['INVENTORY_HOSTVARS'] = str(True) env['ANSIBLE_CALLBACK_PLUGINS'] = plugin_dir env['ANSIBLE_LOAD_CALLBACK_PLUGINS'] = '1' + env['ANSIBLE_STDOUT_CALLBACK'] = 'minimal' # Hardcoded by Ansible for ad-hoc commands (either minimal or oneline). env['REST_API_URL'] = settings.INTERNAL_API_URL env['REST_API_TOKEN'] = ad_hoc_command.task_auth_token or '' env['CALLBACK_QUEUE'] = settings.CALLBACK_QUEUE @@ -1693,6 +1711,16 @@ class RunAdHocCommand(BaseTask): d[re.compile(r'^Password:\s*?$', re.M)] = 'ssh_password' return d + def get_stdout_handle(self, instance): + ''' + Wrap stdout file object to capture events. + ''' + stdout_handle = super(RunAdHocCommand, self).get_stdout_handle(instance) + def ad_hoc_command_event_callback(event_data): + event_data.setdefault('ad_hoc_command_id', instance.id) + AdHocCommandEvent.create_from_data(**event_data) + return OutputEventFilter(stdout_handle, ad_hoc_command_event_callback) + def get_ssh_key_path(self, instance, **kwargs): ''' If using an SSH key, return the path for use by ssh-agent. diff --git a/awx/main/utils.py b/awx/main/utils.py index c652f8e166..76f28f090a 100644 --- a/awx/main/utils.py +++ b/awx/main/utils.py @@ -4,6 +4,7 @@ # Python import base64 import hashlib +import json import logging import os import re @@ -36,7 +37,7 @@ __all__ = ['get_object_or_400', 'get_object_or_403', 'camelcase_to_underscore', 'get_type_for_model', 'get_model_for_type', 'cache_list_capabilities', 'to_python_boolean', 'ignore_inventory_computed_fields', 'ignore_inventory_group_removal', '_inventory_updates', 'get_pk_from_dict', 'getattrd', 'NoDefaultProvided', - 'get_current_apps', 'set_current_apps'] + 'get_current_apps', 'set_current_apps', 'OutputEventFilter'] def get_object_or_400(klass, *args, **kwargs): @@ -640,3 +641,71 @@ def set_current_apps(apps): def get_current_apps(): global current_apps return current_apps + + +class OutputEventFilter(object): + ''' + File-like object that looks for encoded job events in stdout data. + ''' + + EVENT_DATA_RE = re.compile(r'\x1b\[K((?:[A-Za-z0-9+/=]+\x1b\[\d+D)+)\x1b\[K') + + def __init__(self, fileobj=None, event_callback=None): + self._fileobj = fileobj + self._event_callback = event_callback + self._counter = 1 + self._start_line = 0 + self._buffer = '' + self._current_event_data = None + + def __getattr__(self, attr): + return getattr(self._fileobj, attr) + + def write(self, data): + if self._fileobj: + self._fileobj.write(data) + self._buffer += data + while True: + match = self.EVENT_DATA_RE.search(self._buffer) + if not match: + break + try: + base64_data = re.sub(r'\x1b\[\d+D', '', match.group(1)) + event_data = json.loads(base64.b64decode(base64_data)) + except ValueError: + event_data = {} + self._emit_event(self._buffer[:match.start()], event_data) + self._buffer = self._buffer[match.end():] + + def close(self): + if self._fileobj: + self._fileobj.close() + if self._buffer: + self._emit_event(self._buffer) + self._buffer = '' + + def _emit_event(self, buffered_stdout, next_event_data=None): + if self._current_event_data: + event_data = self._current_event_data + stdout_chunks = [buffered_stdout] + elif buffered_stdout: + event_data = dict(event='verbose') + stdout_chunks = buffered_stdout.splitlines(True) + else: + stdout_chunks = [] + + for stdout_chunk in stdout_chunks: + event_data['counter'] = self._counter + self._counter += 1 + event_data['stdout'] = stdout_chunk + n_lines = stdout_chunk.count('\n') + event_data['start_line'] = self._start_line + event_data['end_line'] = self._start_line + n_lines + self._start_line += n_lines + if self._event_callback: + self._event_callback(event_data) + + if next_event_data.get('uuid', None): + self._current_event_data = next_event_data + else: + self._current_event_data = None diff --git a/awx/plugins/callback/job_event_callback.py b/awx/plugins/callback/job_event_callback.py deleted file mode 100644 index 67f36612f6..0000000000 --- a/awx/plugins/callback/job_event_callback.py +++ /dev/null @@ -1,579 +0,0 @@ -# Copyright (c) 2015 Ansible, Inc. -# This file is a utility Ansible plugin that is not part of the AWX or Ansible -# packages. It does not import any code from either package, nor does its -# license apply to Ansible or AWX. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: -# -# Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation -# and/or other materials provided with the distribution. -# -# Neither the name of the nor the names of its contributors -# may be used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE -# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -# POSSIBILITY OF SUCH DAMAGE. - -# Python -import datetime -import glob -import json -import logging -import os -import pwd -import urlparse -import re -from copy import deepcopy -from uuid import uuid4 - -# Kombu -from kombu import Connection, Exchange, Producer - -# Requests -import requests - -import psutil - -CENSOR_FIELD_WHITELIST = [ - 'msg', - 'failed', - 'changed', - 'results', - 'start', - 'end', - 'delta', - 'cmd', - '_ansible_no_log', - 'rc', - 'failed_when_result', - 'skipped', - 'skip_reason', -] - -def censor(obj, no_log=False): - if not isinstance(obj, dict): - if no_log: - return "the output has been hidden due to the fact that 'no_log: true' was specified for this result" - return obj - if obj.get('_ansible_no_log', no_log): - new_obj = {} - for k in CENSOR_FIELD_WHITELIST: - if k in obj: - new_obj[k] = obj[k] - if k == 'cmd' and k in obj: - if isinstance(obj['cmd'], list): - obj['cmd'] = ' '.join(obj['cmd']) - if re.search(r'\s', obj['cmd']): - new_obj['cmd'] = re.sub(r'^(([^\s\\]|\\\s)+).*$', - r'\1 ', - obj['cmd']) - new_obj['censored'] = "the output has been hidden due to the fact that 'no_log: true' was specified for this result" - obj = new_obj - if 'results' in obj: - if isinstance(obj['results'], list): - for i in xrange(len(obj['results'])): - obj['results'][i] = censor(obj['results'][i], obj.get('_ansible_no_log', no_log)) - elif obj.get('_ansible_no_log', False): - obj['results'] = "the output has been hidden due to the fact that 'no_log: true' was specified for this result" - return obj - - -class TokenAuth(requests.auth.AuthBase): - - def __init__(self, token): - self.token = token - - def __call__(self, request): - request.headers['Authorization'] = 'Token %s' % self.token - return request - - -# TODO: non v2_ events are deprecated and should be purge/refactored out -class BaseCallbackModule(object): - ''' - Callback module for logging ansible-playbook job events via the REST API. - ''' - - def __init__(self): - self.base_url = os.getenv('REST_API_URL', '') - self.auth_token = os.getenv('REST_API_TOKEN', '') - self.callback_connection = os.getenv('CALLBACK_CONNECTION', None) - self.connection_queue = os.getenv('CALLBACK_QUEUE', '') - self.connection = None - self.exchange = None - self._init_logging() - self._init_connection() - self.counter = 0 - self.active_playbook = None - self.active_play = None - self.active_task = None - - def _init_logging(self): - try: - self.job_callback_debug = int(os.getenv('JOB_CALLBACK_DEBUG', '0')) - except ValueError: - self.job_callback_debug = 0 - self.logger = logging.getLogger('awx.plugins.callback.job_event_callback') - if self.job_callback_debug >= 2: - self.logger.setLevel(logging.DEBUG) - elif self.job_callback_debug >= 1: - self.logger.setLevel(logging.INFO) - else: - self.logger.setLevel(logging.WARNING) - handler = logging.StreamHandler() - formatter = logging.Formatter('%(levelname)-8s %(process)-8d %(message)s') - handler.setFormatter(formatter) - self.logger.addHandler(handler) - self.logger.propagate = False - - def _init_connection(self): - self.connection = None - - def _start_connection(self): - self.connection = Connection(self.callback_connection) - self.exchange = Exchange(self.connection_queue, type='direct') - - def _post_job_event_queue_msg(self, event, event_data): - self.counter += 1 - msg = { - 'event': event, - 'event_data': event_data, - 'counter': self.counter, - 'created': datetime.datetime.utcnow().isoformat(), - } - if event in ('playbook_on_play_start', - 'playbook_on_stats', - 'playbook_on_vars_prompt'): - msg['parent_uuid'] = str(self.active_playbook) - elif event in ('playbook_on_notify', - 'playbook_on_setup', - 'playbook_on_task_start', - 'playbook_on_no_hosts_matched', - 'playbook_on_no_hosts_remaining', - 'playbook_on_include', - 'playbook_on_import_for_host', - 'playbook_on_not_import_for_host'): - msg['parent_uuid'] = str(self.active_play) - elif event.startswith('runner_on_') or event.startswith('runner_item_on_'): - msg['parent_uuid'] = str(self.active_task) - else: - msg['parent_uuid'] = '' - - if "uuid" in event_data: - msg['uuid'] = str(event_data['uuid']) - else: - msg['uuid'] = '' - - if getattr(self, 'job_id', None): - msg['job_id'] = self.job_id - if getattr(self, 'ad_hoc_command_id', None): - msg['ad_hoc_command_id'] = self.ad_hoc_command_id - - if getattr(self, 'artifact_data', None): - msg['artifact_data'] = self.artifact_data - - active_pid = os.getpid() - if self.job_callback_debug: - msg.update({ - 'pid': active_pid, - }) - for retry_count in xrange(4): - try: - if not hasattr(self, 'connection_pid'): - self.connection_pid = active_pid - if self.connection_pid != active_pid: - self._init_connection() - if self.connection is None: - self._start_connection() - - producer = Producer(self.connection) - producer.publish(msg, - serializer='json', - compression='bzip2', - exchange=self.exchange, - declare=[self.exchange], - routing_key=self.connection_queue) - return - except Exception, e: - self.logger.info('Publish Job Event Exception: %r, retry=%d', e, - retry_count, exc_info=True) - retry_count += 1 - if retry_count >= 3: - break - - def _post_rest_api_event(self, event, event_data): - data = json.dumps({ - 'event': event, - 'event_data': event_data, - }) - parts = urlparse.urlsplit(self.base_url) - if parts.username and parts.password: - auth = (parts.username, parts.password) - elif self.auth_token: - auth = TokenAuth(self.auth_token) - else: - auth = None - port = parts.port or (443 if parts.scheme == 'https' else 80) - url = urlparse.urlunsplit([parts.scheme, - '%s:%d' % (parts.hostname, port), - parts.path, parts.query, parts.fragment]) - url = urlparse.urljoin(url, self.rest_api_path) - headers = {'content-type': 'application/json'} - response = requests.post(url, data=data, headers=headers, auth=auth) - response.raise_for_status() - - def _log_event(self, event, **event_data): - if 'res' in event_data: - event_data['res'] = censor(deepcopy(event_data['res'])) - - if self.callback_connection: - self._post_job_event_queue_msg(event, event_data) - else: - self._post_rest_api_event(event, event_data) - - def on_any(self, *args, **kwargs): - pass - - def runner_on_failed(self, host, res, ignore_errors=False): - self._log_event('runner_on_failed', host=host, res=res, - ignore_errors=ignore_errors) - - def v2_runner_on_failed(self, result, ignore_errors=False): - event_is_loop = result._task.loop if hasattr(result._task, 'loop') else None - self._log_event('runner_on_failed', host=result._host.name, - res=result._result, task=result._task, - ignore_errors=ignore_errors, event_loop=event_is_loop) - - def runner_on_ok(self, host, res): - self._log_event('runner_on_ok', host=host, res=res) - - def v2_runner_on_ok(self, result): - event_is_loop = result._task.loop if hasattr(result._task, 'loop') else None - self._log_event('runner_on_ok', host=result._host.name, - task=result._task, res=result._result, - event_loop=event_is_loop) - - def runner_on_error(self, host, msg): - self._log_event('runner_on_error', host=host, msg=msg) - - def v2_runner_on_error(self, result): - pass # Currently not implemented in v2 - - def runner_on_skipped(self, host, item=None): - self._log_event('runner_on_skipped', host=host, item=item) - - def v2_runner_on_skipped(self, result): - event_is_loop = result._task.loop if hasattr(result._task, 'loop') else None - self._log_event('runner_on_skipped', host=result._host.name, - task=result._task, event_loop=event_is_loop) - - def runner_on_unreachable(self, host, res): - self._log_event('runner_on_unreachable', host=host, res=res) - - def v2_runner_on_unreachable(self, result): - self._log_event('runner_on_unreachable', host=result._host.name, - task=result._task, res=result._result) - - def runner_on_no_hosts(self): - self._log_event('runner_on_no_hosts') - - def v2_runner_on_no_hosts(self, task): - self._log_event('runner_on_no_hosts', task=task) - - # V2 does not use the _on_async callbacks (yet). - - def runner_on_async_poll(self, host, res, jid, clock): - self._log_event('runner_on_async_poll', host=host, res=res, jid=jid, - clock=clock) - - def runner_on_async_ok(self, host, res, jid): - self._log_event('runner_on_async_ok', host=host, res=res, jid=jid) - - def runner_on_async_failed(self, host, res, jid): - self._log_event('runner_on_async_failed', host=host, res=res, jid=jid) - - def runner_on_file_diff(self, host, diff): - self._log_event('runner_on_file_diff', host=host, diff=diff) - - def v2_runner_on_file_diff(self, result, diff): - self._log_event('runner_on_file_diff', host=result._host.name, - task=result._task, diff=diff) - - def v2_runner_item_on_ok(self, result): - self._log_event('runner_item_on_ok', res=result._result, host=result._host.name, - task=result._task) - - def v2_runner_item_on_failed(self, result): - self._log_event('runner_item_on_failed', res=result._result, host=result._host.name, - task=result._task) - - def v2_runner_item_on_skipped(self, result): - self._log_event('runner_item_on_skipped', res=result._result, host=result._host.name, - task=result._task) - - @staticmethod - def terminate_ssh_control_masters(): - # Determine if control persist is being used and if any open sockets - # exist after running the playbook. - cp_path = os.environ.get('ANSIBLE_SSH_CONTROL_PATH', '') - if not cp_path: - return - cp_dir = os.path.dirname(cp_path) - if not os.path.exists(cp_dir): - return - cp_pattern = os.path.join(cp_dir, 'ansible-ssh-*') - cp_files = glob.glob(cp_pattern) - if not cp_files: - return - - # Attempt to find any running control master processes. - username = pwd.getpwuid(os.getuid())[0] - ssh_cm_procs = [] - for proc in psutil.process_iter(): - try: - pname = proc.name() - pcmdline = proc.cmdline() - pusername = proc.username() - except psutil.NoSuchProcess: - continue - if pusername != username: - continue - if pname != 'ssh': - continue - for cp_file in cp_files: - if pcmdline and cp_file in pcmdline[0]: - ssh_cm_procs.append(proc) - break - - # Terminate then kill control master processes. Workaround older - # version of psutil that may not have wait_procs implemented. - for proc in ssh_cm_procs: - proc.terminate() - procs_gone, procs_alive = psutil.wait_procs(ssh_cm_procs, timeout=5) - for proc in procs_alive: - proc.kill() - - -class JobCallbackModule(BaseCallbackModule): - ''' - Callback module for logging ansible-playbook job events via the REST API. - ''' - - # These events should never have an associated play. - EVENTS_WITHOUT_PLAY = [ - 'playbook_on_start', - 'playbook_on_stats', - ] - # These events should never have an associated task. - EVENTS_WITHOUT_TASK = EVENTS_WITHOUT_PLAY + [ - 'playbook_on_setup', - 'playbook_on_notify', - 'playbook_on_import_for_host', - 'playbook_on_not_import_for_host', - 'playbook_on_no_hosts_matched', - 'playbook_on_no_hosts_remaining', - ] - - def __init__(self): - self.job_id = int(os.getenv('JOB_ID', '0')) - self.rest_api_path = '/api/v1/jobs/%d/job_events/' % self.job_id - super(JobCallbackModule, self).__init__() - - def _log_event(self, event, **event_data): - play = getattr(self, 'play', None) - play_name = getattr(play, 'name', '') - if play_name and event not in self.EVENTS_WITHOUT_PLAY: - event_data['play'] = play_name - task = event_data.pop('task', None) or getattr(self, 'task', None) - task_name = None - role_name = None - if task: - if hasattr(task, 'get_name'): - # in v2, the get_name() method creates the name - task_name = task.get_name() - else: - # v1 datastructure - task_name = getattr(task, 'name', '') - if hasattr(task, '_role') and task._role: - # v2 datastructure - role_name = task._role._role_name - else: - # v1 datastructure - role_name = getattr(task, 'role_name', '') - if task_name and event not in self.EVENTS_WITHOUT_TASK: - event_data['task'] = task_name - if role_name and event not in self.EVENTS_WITHOUT_TASK: - event_data['role'] = role_name - self.artifact_data = None - if 'res' in event_data and 'artifact_data' in event_data['res']: - self.artifact_data = event_data['res']['artifact_data'] - super(JobCallbackModule, self)._log_event(event, **event_data) - - def playbook_on_start(self): - self._log_event('playbook_on_start') - - def v2_playbook_on_start(self, playbook): - # NOTE: the playbook parameter was added late in Ansible 2.0 development - # so we don't currently utilize but could later. - # NOTE: Ansible doesn't generate a UUID for playbook_on_start so we'll do it for them - self.active_playbook = str(uuid4()) - self._log_event('playbook_on_start', uuid=self.active_playbook) - - def playbook_on_notify(self, host, handler): - self._log_event('playbook_on_notify', host=host, handler=handler) - - def v2_playbook_on_notify(self, result, handler): - self._log_event('playbook_on_notify', host=result._host.name, - task=result._task, handler=handler) - - def playbook_on_no_hosts_matched(self): - self._log_event('playbook_on_no_hosts_matched') - - def v2_playbook_on_no_hosts_matched(self): - # since there is no task/play info, this is currently identical - # to the v1 callback which does the same thing - self.playbook_on_no_hosts_matched() - - def playbook_on_no_hosts_remaining(self): - self._log_event('playbook_on_no_hosts_remaining') - - def v2_playbook_on_no_hosts_remaining(self): - # since there is no task/play info, this is currently identical - # to the v1 callback which does the same thing - self.playbook_on_no_hosts_remaining() - - def playbook_on_task_start(self, name, is_conditional): - self._log_event('playbook_on_task_start', name=name, - is_conditional=is_conditional) - - def v2_playbook_on_task_start(self, task, is_conditional): - self.active_task = task._uuid - self._log_event('playbook_on_task_start', task=task, uuid=str(task._uuid), - name=task.get_name(), is_conditional=is_conditional) - - def v2_playbook_on_cleanup_task_start(self, task): - # re-using playbook_on_task_start event here for this v2-specific - # event, though we may consider any changes necessary to distinguish - # this from a normal task - self.active_task = task._uuid - self._log_event('playbook_on_task_start', task=task, uuid=str(task._uuid), - name=task.get_name()) - - def playbook_on_vars_prompt(self, varname, private=True, prompt=None, - encrypt=None, confirm=False, salt_size=None, - salt=None, default=None): - self._log_event('playbook_on_vars_prompt', varname=varname, - private=private, prompt=prompt, encrypt=encrypt, - confirm=confirm, salt_size=salt_size, salt=salt, - default=default) - - def v2_playbook_on_vars_prompt(self, varname, private=True, prompt=None, - encrypt=None, confirm=False, salt_size=None, - salt=None, default=None): - pass # not currently used in v2 (yet) - - def playbook_on_setup(self): - self._log_event('playbook_on_setup') - - def v2_playbook_on_setup(self): - pass # not currently used in v2 (yet) - - def playbook_on_import_for_host(self, host, imported_file): - # don't care about recording this one - # self._log_event('playbook_on_import_for_host', host=host, - # imported_file=imported_file) - pass - - def v2_playbook_on_import_for_host(self, result, imported_file): - pass # not currently used in v2 (yet) - - def playbook_on_not_import_for_host(self, host, missing_file): - # don't care about recording this one - #self._log_event('playbook_on_not_import_for_host', host=host, - # missing_file=missing_file) - pass - - def v2_playbook_on_not_import_for_host(self, result, missing_file): - pass # not currently used in v2 (yet) - - def playbook_on_play_start(self, name): - # Only play name is passed via callback, get host pattern from the play. - pattern = getattr(getattr(self, 'play', None), 'hosts', name) - self._log_event('playbook_on_play_start', name=name, pattern=pattern) - - def v2_playbook_on_play_start(self, play): - setattr(self, 'play', play) - # Ansible 2.0.0.2 doesn't default .name to hosts like it did in 1.9.4, - # though that default will likely return in a future version of Ansible. - if (not hasattr(play, 'name') or not play.name) and hasattr(play, 'hosts'): - if isinstance(play.hosts, list): - play.name = ','.join(play.hosts) - else: - play.name = play.hosts - self.active_play = play._uuid - self._log_event('playbook_on_play_start', name=play.name, uuid=str(play._uuid), - pattern=play.hosts) - - def playbook_on_stats(self, stats): - d = {} - for attr in ('changed', 'dark', 'failures', 'ok', 'processed', 'skipped'): - d[attr] = getattr(stats, attr) - self._log_event('playbook_on_stats', **d) - self.terminate_ssh_control_masters() - - def v2_playbook_on_stats(self, stats): - self.playbook_on_stats(stats) - - def v2_playbook_on_include(self, included_file): - self._log_event('playbook_on_include', included_file=included_file) - -class AdHocCommandCallbackModule(BaseCallbackModule): - ''' - Callback module for logging ansible ad hoc events via ZMQ or the REST API. - ''' - - def __init__(self): - self.ad_hoc_command_id = int(os.getenv('AD_HOC_COMMAND_ID', '0')) - self.rest_api_path = '/api/v1/ad_hoc_commands/%d/events/' % self.ad_hoc_command_id - self.skipped_hosts = set() - super(AdHocCommandCallbackModule, self).__init__() - - def _log_event(self, event, **event_data): - # Ignore task for ad hoc commands (with v2). - event_data.pop('task', None) - super(AdHocCommandCallbackModule, self)._log_event(event, **event_data) - - def runner_on_file_diff(self, host, diff): - pass # Ignore file diff for ad hoc commands. - - def runner_on_ok(self, host, res): - # When running in check mode using a module that does not support check - # mode, Ansible v1.9 will call runner_on_skipped followed by - # runner_on_ok for the same host; only capture the skipped event and - # ignore the ok event. - if host not in self.skipped_hosts: - super(AdHocCommandCallbackModule, self).runner_on_ok(host, res) - - def runner_on_skipped(self, host, item=None): - super(AdHocCommandCallbackModule, self).runner_on_skipped(host, item) - self.skipped_hosts.add(host) - -if os.getenv('JOB_ID', ''): - CallbackModule = JobCallbackModule -elif os.getenv('AD_HOC_COMMAND_ID', ''): - CallbackModule = AdHocCommandCallbackModule diff --git a/awx/plugins/callback/minimal.py b/awx/plugins/callback/minimal.py new file mode 100644 index 0000000000..e34ebdd78a --- /dev/null +++ b/awx/plugins/callback/minimal.py @@ -0,0 +1,30 @@ +# Copyright (c) 2016 Ansible by Red Hat, Inc. +# +# This file is part of Ansible Tower, but depends on code imported from Ansible. +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +from __future__ import (absolute_import, division, print_function) + +# Python +import os +import sys + +# Add awx/lib to sys.path. +awx_lib_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'lib')) +if awx_lib_path not in sys.path: + sys.path.insert(0, awx_lib_path) + +# Tower Display Callback +from tower_display_callback import TowerMinimalCallbackModule as CallbackModule diff --git a/awx/plugins/callback/tower_display.py b/awx/plugins/callback/tower_display.py new file mode 100644 index 0000000000..36f6a29537 --- /dev/null +++ b/awx/plugins/callback/tower_display.py @@ -0,0 +1,30 @@ +# Copyright (c) 2016 Ansible by Red Hat, Inc. +# +# This file is part of Ansible Tower, but depends on code imported from Ansible. +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +from __future__ import (absolute_import, division, print_function) + +# Python +import os +import sys + +# Add awx/lib to sys.path. +awx_lib_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'lib')) +if awx_lib_path not in sys.path: + sys.path.insert(0, awx_lib_path) + +# Tower Display Callback +from tower_display_callback import TowerDefaultCallbackModule as CallbackModule From 440f0539b08d1b52c27cf2cd0c9f162d0a196e4a Mon Sep 17 00:00:00 2001 From: Chris Church Date: Sat, 22 Oct 2016 00:26:06 -0400 Subject: [PATCH 2/5] Flake8 fixes. --- awx/api/views.py | 3 +-- awx/lib/sitecustomize.py | 2 +- awx/lib/tower_display_callback/__init__.py | 4 ++-- awx/lib/tower_display_callback/cleanup.py | 2 +- awx/lib/tower_display_callback/display.py | 6 +++--- awx/lib/tower_display_callback/events.py | 1 - awx/lib/tower_display_callback/module.py | 7 ++++--- awx/main/management/commands/run_callback_receiver.py | 1 - awx/main/models/ad_hoc_commands.py | 1 - awx/main/tasks.py | 4 ++++ awx/plugins/callback/minimal.py | 2 +- awx/plugins/callback/tower_display.py | 2 +- 12 files changed, 18 insertions(+), 17 deletions(-) diff --git a/awx/api/views.py b/awx/api/views.py index c95cebcb9e..3fb9826519 100644 --- a/awx/api/views.py +++ b/awx/api/views.py @@ -10,9 +10,8 @@ import time import socket import sys import logging -from base64 import b64encode, b64decode +from base64 import b64encode from collections import OrderedDict -from HTMLParser import HTMLParser # Django from django.conf import settings diff --git a/awx/lib/sitecustomize.py b/awx/lib/sitecustomize.py index cf6cab211e..02ac2eba55 100644 --- a/awx/lib/sitecustomize.py +++ b/awx/lib/sitecustomize.py @@ -7,7 +7,7 @@ import sys def argv_ready(argv): if argv and os.path.basename(argv[0]) in {'ansible', 'ansible-playbook'}: - import tower_display_callback + import tower_display_callback # noqa class argv_placeholder(object): diff --git a/awx/lib/tower_display_callback/__init__.py b/awx/lib/tower_display_callback/__init__.py index 313a1d50c6..d984956c7f 100644 --- a/awx/lib/tower_display_callback/__init__.py +++ b/awx/lib/tower_display_callback/__init__.py @@ -18,8 +18,8 @@ from __future__ import (absolute_import, division, print_function) # Tower Display Callback -from . import cleanup # to register control persistent cleanup. -from . import display # to wrap ansible.display.Display methods. +from . import cleanup # noqa (registers control persistent cleanup) +from . import display # noqa (wraps ansible.display.Display methods) from .module import TowerDefaultCallbackModule, TowerMinimalCallbackModule __all__ = ['TowerDefaultCallbackModule', 'TowerMinimalCallbackModule'] diff --git a/awx/lib/tower_display_callback/cleanup.py b/awx/lib/tower_display_callback/cleanup.py index 1bc276f742..7a0387cddf 100644 --- a/awx/lib/tower_display_callback/cleanup.py +++ b/awx/lib/tower_display_callback/cleanup.py @@ -69,4 +69,4 @@ def terminate_ssh_control_masters(): proc.terminate() procs_gone, procs_alive = psutil.wait_procs(ssh_cm_procs, timeout=5) for proc in procs_alive: - proc.kill() \ No newline at end of file + proc.kill() diff --git a/awx/lib/tower_display_callback/display.py b/awx/lib/tower_display_callback/display.py index ec32fec334..5b1265201e 100644 --- a/awx/lib/tower_display_callback/display.py +++ b/awx/lib/tower_display_callback/display.py @@ -18,10 +18,7 @@ from __future__ import (absolute_import, division, print_function) # Python -import cgi -import contextlib import functools -import json import sys import uuid @@ -36,6 +33,7 @@ __all__ = [] def with_context(**context): global event_context + def wrap(f): @functools.wraps(f) def wrapper(*args, **kwargs): @@ -57,6 +55,7 @@ for attr in dir(Display): def with_verbosity(f): global event_context + @functools.wraps(f) def wrapper(*args, **kwargs): host = args[2] if len(args) >= 3 else kwargs.get('host', None) @@ -72,6 +71,7 @@ Display.verbose = with_verbosity(Display.verbose) def display_with_context(f): + @functools.wraps(f) def wrapper(*args, **kwargs): log_only = args[5] if len(args) >= 6 else kwargs.get('log_only', False) diff --git a/awx/lib/tower_display_callback/events.py b/awx/lib/tower_display_callback/events.py index fa664e5856..ad7eb6418e 100644 --- a/awx/lib/tower_display_callback/events.py +++ b/awx/lib/tower_display_callback/events.py @@ -19,7 +19,6 @@ from __future__ import (absolute_import, division, print_function) # Python import base64 -import cgi import contextlib import datetime import json diff --git a/awx/lib/tower_display_callback/module.py b/awx/lib/tower_display_callback/module.py index 7be4835aa7..e5b4e21713 100644 --- a/awx/lib/tower_display_callback/module.py +++ b/awx/lib/tower_display_callback/module.py @@ -20,7 +20,6 @@ from __future__ import (absolute_import, division, print_function) # Python import contextlib import copy -import os import re import sys import uuid @@ -209,8 +208,10 @@ class BaseCallbackModule(CallbackBase): default=default, ) with self.capture_event_data('playbook_on_vars_prompt', **event_data): - super(BaseCallbackModule, self).v2_playbook_on_vars_prompt(varname, - private, prompt, encrypt, confirm, salt_size, salt, default) + super(BaseCallbackModule, self).v2_playbook_on_vars_prompt( + varname, private, prompt, encrypt, confirm, salt_size, salt, + default, + ) def v2_playbook_on_include(self, included_file): event_data = dict( diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index e99a34aa4f..6381660948 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -3,7 +3,6 @@ # Python import logging -import json from kombu import Connection, Exchange, Queue from kombu.mixins import ConsumerMixin diff --git a/awx/main/models/ad_hoc_commands.py b/awx/main/models/ad_hoc_commands.py index fc56802160..c81531d22c 100644 --- a/awx/main/models/ad_hoc_commands.py +++ b/awx/main/models/ad_hoc_commands.py @@ -10,7 +10,6 @@ from urlparse import urljoin # Django from django.conf import settings -from django.core.cache import cache from django.db import models from django.utils.dateparse import parse_datetime from django.utils.text import Truncator diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 503509890b..cd785ef96a 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -991,9 +991,11 @@ class RunJob(BaseTask): Wrap stdout file object to capture events. ''' stdout_handle = super(RunJob, self).get_stdout_handle(instance) + def job_event_callback(event_data): event_data.setdefault('job_id', instance.id) JobEvent.create_from_data(**event_data) + return OutputEventFilter(stdout_handle, job_event_callback) def get_ssh_key_path(self, instance, **kwargs): @@ -1716,9 +1718,11 @@ class RunAdHocCommand(BaseTask): Wrap stdout file object to capture events. ''' stdout_handle = super(RunAdHocCommand, self).get_stdout_handle(instance) + def ad_hoc_command_event_callback(event_data): event_data.setdefault('ad_hoc_command_id', instance.id) AdHocCommandEvent.create_from_data(**event_data) + return OutputEventFilter(stdout_handle, ad_hoc_command_event_callback) def get_ssh_key_path(self, instance, **kwargs): diff --git a/awx/plugins/callback/minimal.py b/awx/plugins/callback/minimal.py index e34ebdd78a..fcbaa76d55 100644 --- a/awx/plugins/callback/minimal.py +++ b/awx/plugins/callback/minimal.py @@ -27,4 +27,4 @@ if awx_lib_path not in sys.path: sys.path.insert(0, awx_lib_path) # Tower Display Callback -from tower_display_callback import TowerMinimalCallbackModule as CallbackModule +from tower_display_callback import TowerMinimalCallbackModule as CallbackModule # noqa diff --git a/awx/plugins/callback/tower_display.py b/awx/plugins/callback/tower_display.py index 36f6a29537..725232dfe4 100644 --- a/awx/plugins/callback/tower_display.py +++ b/awx/plugins/callback/tower_display.py @@ -27,4 +27,4 @@ if awx_lib_path not in sys.path: sys.path.insert(0, awx_lib_path) # Tower Display Callback -from tower_display_callback import TowerDefaultCallbackModule as CallbackModule +from tower_display_callback import TowerDefaultCallbackModule as CallbackModule # noqa From 908eef13f65008dd7081b26375f741fd79011141 Mon Sep 17 00:00:00 2001 From: Chris Church Date: Fri, 28 Oct 2016 11:37:30 -0400 Subject: [PATCH 3/5] Renamed job event migration. --- ...4_v310_job_event_stdout.py => 0045_v310_job_event_stdout.py} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename awx/main/migrations/{0044_v310_job_event_stdout.py => 0045_v310_job_event_stdout.py} (98%) diff --git a/awx/main/migrations/0044_v310_job_event_stdout.py b/awx/main/migrations/0045_v310_job_event_stdout.py similarity index 98% rename from awx/main/migrations/0044_v310_job_event_stdout.py rename to awx/main/migrations/0045_v310_job_event_stdout.py index 8a66aa9f94..27bce05632 100644 --- a/awx/main/migrations/0044_v310_job_event_stdout.py +++ b/awx/main/migrations/0045_v310_job_event_stdout.py @@ -7,7 +7,7 @@ from django.db import migrations, models class Migration(migrations.Migration): dependencies = [ - ('main', '0043_v310_scm_revision'), + ('main', '0044_v310_project_playbook_files'), ] operations = [ From c43334f8f44549eabdb86a4e1afd7566bba9ead0 Mon Sep 17 00:00:00 2001 From: Chris Church Date: Fri, 28 Oct 2016 21:58:03 -0400 Subject: [PATCH 4/5] Update job events based on how they are used in Ansible 2.x. --- awx/lib/tower_display_callback/module.py | 114 ++++++++++++------ .../migrations/0045_v310_job_event_stdout.py | 2 +- awx/main/models/ad_hoc_commands.py | 14 +-- awx/main/models/jobs.py | 32 +++-- 4 files changed, 107 insertions(+), 55 deletions(-) diff --git a/awx/lib/tower_display_callback/module.py b/awx/lib/tower_display_callback/module.py index e5b4e21713..02b30ef2bc 100644 --- a/awx/lib/tower_display_callback/module.py +++ b/awx/lib/tower_display_callback/module.py @@ -196,7 +196,6 @@ class BaseCallbackModule(CallbackBase): def v2_playbook_on_vars_prompt(self, varname, private=True, prompt=None, encrypt=None, confirm=False, salt_size=None, salt=None, default=None): - return # not currently used in v2 (yet) - FIXME: Confirm this is still the case? event_data = dict( varname=varname, private=private, @@ -239,17 +238,17 @@ class BaseCallbackModule(CallbackBase): super(BaseCallbackModule, self).v2_playbook_on_play_start(play) def v2_playbook_on_import_for_host(self, result, imported_file): - return # not currently used in v2 (yet) / don't care about recording this one + # NOTE: Not used by Ansible 2.x. with self.capture_event_data('playbook_on_import_for_host'): super(BaseCallbackModule, self).v2_playbook_on_import_for_host(result, imported_file) def v2_playbook_on_not_import_for_host(self, result, missing_file): - return # not currently used in v2 (yet) / don't care about recording this one + # NOTE: Not used by Ansible 2.x. with self.capture_event_data('playbook_on_not_import_for_host'): super(BaseCallbackModule, self).v2_playbook_on_not_import_for_host(result, missing_file) def v2_playbook_on_setup(self): - return # not currently used in v2 (yet) + # NOTE: Not used by Ansible 2.x. with self.capture_event_data('playbook_on_setup'): super(BaseCallbackModule, self).v2_playbook_on_setup() @@ -257,6 +256,9 @@ class BaseCallbackModule(CallbackBase): # FIXME: Flag task path output as vv. task_uuid = str(task._uuid) if task_uuid in self.task_uuids: + # FIXME: When this task UUID repeats, it means the play is using the + # free strategy, so different hosts may be running different tasks + # within a play. return self.task_uuids.add(task_uuid) self.set_task(task) @@ -270,27 +272,27 @@ class BaseCallbackModule(CallbackBase): super(BaseCallbackModule, self).v2_playbook_on_task_start(task, is_conditional) def v2_playbook_on_cleanup_task_start(self, task): - # re-using playbook_on_task_start event here for this v2-specific - # event, though we may consider any changes necessary to distinguish - # this from a normal task FIXME! + # NOTE: Not used by Ansible 2.x. self.set_task(task) event_data = dict( task=task, name=task.get_name(), uuid=str(task._uuid), + is_conditional=True, ) with self.capture_event_data('playbook_on_task_start', **event_data): super(BaseCallbackModule, self).v2_playbook_on_cleanup_task_start(task) def v2_playbook_on_handler_task_start(self, task): - # re-using playbook_on_task_start event here for this v2-specific - # event, though we may consider any changes necessary to distinguish - # this from a normal task FIXME! + # NOTE: Re-using playbook_on_task_start event for this v2-specific + # event, but setting is_conditional=True, which is how v1 identified a + # task run as a handler. self.set_task(task) event_data = dict( task=task, name=task.get_name(), uuid=str(task._uuid), + is_conditional=True, ) with self.capture_event_data('playbook_on_task_start', **event_data): super(BaseCallbackModule, self).v2_playbook_on_handler_task_start(task) @@ -304,8 +306,9 @@ class BaseCallbackModule(CallbackBase): super(BaseCallbackModule, self).v2_playbook_on_no_hosts_remaining() def v2_playbook_on_notify(self, result, handler): + # NOTE: Not used by Ansible 2.x. event_data = dict( - host=result._host.name, + host=result._host.get_name(), task=result._task, handler=handler, ) @@ -329,7 +332,7 @@ class BaseCallbackModule(CallbackBase): def v2_runner_on_ok(self, result): # FIXME: Display detailed results or not based on verbosity. event_data = dict( - host=result._host.name, + host=result._host.get_name(), remote_addr=result._host.address, task=result._task, res=result._result, @@ -341,7 +344,8 @@ class BaseCallbackModule(CallbackBase): def v2_runner_on_failed(self, result, ignore_errors=False): # FIXME: Add verbosity for exception/results output. event_data = dict( - host=result._host.name, + host=result._host.get_name(), + remote_addr=result._host.address, res=result._result, task=result._task, ignore_errors=ignore_errors, @@ -350,12 +354,10 @@ class BaseCallbackModule(CallbackBase): with self.capture_event_data('runner_on_failed', **event_data): super(BaseCallbackModule, self).v2_runner_on_failed(result, ignore_errors) - def v2_runner_on_error(self, result): - pass # Not implemented in v2. - def v2_runner_on_skipped(self, result): event_data = dict( - host=result._host.name, + host=result._host.get_name(), + remote_addr=result._host.address, task=result._task, event_loop=result._task.loop if hasattr(result._task, 'loop') else None, ) @@ -364,7 +366,8 @@ class BaseCallbackModule(CallbackBase): def v2_runner_on_unreachable(self, result): event_data = dict( - host=result._host.name, + host=result._host.get_name(), + remote_addr=result._host.address, task=result._task, res=result._result, ) @@ -372,25 +375,69 @@ class BaseCallbackModule(CallbackBase): super(BaseCallbackModule, self).v2_runner_on_unreachable(result) def v2_runner_on_no_hosts(self, task): + # NOTE: Not used by Ansible 2.x. event_data = dict( task=task, ) with self.capture_event_data('runner_on_no_hosts', **event_data): super(BaseCallbackModule, self).v2_runner_on_no_hosts(task) - def v2_runner_on_file_diff(self, result, diff): - # FIXME: Ignore file diff for ad hoc commands? + def v2_runner_on_async_poll(self, result): + # NOTE: Not used by Ansible 2.x. event_data = dict( - host=result._host.name, + host=result._host.get_name(), + task=result._task, + res=result._result, + jid=result._result.get('ansible_job_id'), + ) + with self.capture_event_data('runner_on_async_poll', **event_data): + super(BaseCallbackModule, self).v2_runner_on_async_poll(result) + + def v2_runner_on_async_ok(self, result): + # NOTE: Not used by Ansible 2.x. + event_data = dict( + host=result._host.get_name(), + task=result._task, + res=result._result, + jid=result._result.get('ansible_job_id'), + ) + with self.capture_event_data('runner_on_async_ok', **event_data): + super(BaseCallbackModule, self).v2_runner_on_async_ok(result) + + def v2_runner_on_async_failed(self, result): + # NOTE: Not used by Ansible 2.x. + event_data = dict( + host=result._host.get_name(), + task=result._task, + res=result._result, + jid=result._result.get('ansible_job_id'), + ) + with self.capture_event_data('runner_on_async_failed', **event_data): + super(BaseCallbackModule, self).v2_runner_on_async_failed(result) + + def v2_runner_on_file_diff(self, result, diff): + # NOTE: Not used by Ansible 2.x. + event_data = dict( + host=result._host.get_name(), task=result._task, diff=diff, ) with self.capture_event_data('runner_on_file_diff', **event_data): super(BaseCallbackModule, self).v2_runner_on_file_diff(result, diff) + def v2_on_file_diff(self, result): + # NOTE: Logged as runner_on_file_diff. + event_data = dict( + host=result._host.get_name(), + task=result._task, + diff=result._result.get('diff'), + ) + with self.capture_event_data('runner_on_file_diff', **event_data): + super(BaseCallbackModule, self).v2_on_file_diff(result) + def v2_runner_item_on_ok(self, result): event_data = dict( - host=result._host.name, + host=result._host.get_name(), task=result._task, res=result._result, ) @@ -399,7 +446,7 @@ class BaseCallbackModule(CallbackBase): def v2_runner_item_on_failed(self, result): event_data = dict( - host=result._host.name, + host=result._host.get_name(), task=result._task, res=result._result, ) @@ -408,24 +455,21 @@ class BaseCallbackModule(CallbackBase): def v2_runner_item_on_skipped(self, result): event_data = dict( - host=result._host.name, + host=result._host.get_name(), task=result._task, res=result._result, ) with self.capture_event_data('runner_item_on_skipped', **event_data): super(BaseCallbackModule, self).v2_runner_item_on_skipped(result) - # V2 does not use the _on_async callbacks (yet). - - def runner_on_async_poll(self, host, res, jid, clock): - self._log_event('runner_on_async_poll', host=host, res=res, jid=jid, - clock=clock) - - def runner_on_async_ok(self, host, res, jid): - self._log_event('runner_on_async_ok', host=host, res=res, jid=jid) - - def runner_on_async_failed(self, host, res, jid): - self._log_event('runner_on_async_failed', host=host, res=res, jid=jid) + def v2_runner_retry(self, result): + event_data = dict( + host=result._host.get_name(), + task=result._task, + res=result._result, + ) + with self.capture_event_data('runner_retry', **event_data): + super(BaseCallbackModule, self).v2_runner_retry(result) class TowerDefaultCallbackModule(BaseCallbackModule, DefaultCallbackModule): diff --git a/awx/main/migrations/0045_v310_job_event_stdout.py b/awx/main/migrations/0045_v310_job_event_stdout.py index 27bce05632..e3325ddb6b 100644 --- a/awx/main/migrations/0045_v310_job_event_stdout.py +++ b/awx/main/migrations/0045_v310_job_event_stdout.py @@ -79,7 +79,7 @@ class Migration(migrations.Migration): migrations.AlterField( model_name='jobevent', name='event', - field=models.CharField(max_length=100, choices=[(b'runner_on_failed', 'Host Failed'), (b'runner_on_ok', 'Host OK'), (b'runner_on_error', 'Host Failure'), (b'runner_on_skipped', 'Host Skipped'), (b'runner_on_unreachable', 'Host Unreachable'), (b'runner_on_no_hosts', 'No Hosts Remaining'), (b'runner_on_async_poll', 'Host Polling'), (b'runner_on_async_ok', 'Host Async OK'), (b'runner_on_async_failed', 'Host Async Failure'), (b'runner_on_file_diff', 'File Difference'), (b'playbook_on_start', 'Playbook Started'), (b'playbook_on_notify', 'Running Handlers'), (b'playbook_on_no_hosts_matched', 'No Hosts Matched'), (b'playbook_on_no_hosts_remaining', 'No Hosts Remaining'), (b'playbook_on_task_start', 'Task Started'), (b'playbook_on_vars_prompt', 'Variables Prompted'), (b'playbook_on_setup', 'Gathering Facts'), (b'playbook_on_import_for_host', 'internal: on Import for Host'), (b'playbook_on_not_import_for_host', 'internal: on Not Import for Host'), (b'playbook_on_play_start', 'Play Started'), (b'playbook_on_stats', 'Playbook Complete'), (b'debug', 'Debug'), (b'verbose', 'Verbose'), (b'deprecated', 'Deprecated'), (b'warning', 'Warning'), (b'system_warning', 'System Warning'), (b'error', 'Error')]), + field=models.CharField(max_length=100, choices=[(b'runner_on_failed', 'Host Failed'), (b'runner_on_ok', 'Host OK'), (b'runner_on_error', 'Host Failure'), (b'runner_on_skipped', 'Host Skipped'), (b'runner_on_unreachable', 'Host Unreachable'), (b'runner_on_no_hosts', 'No Hosts Remaining'), (b'runner_on_async_poll', 'Host Polling'), (b'runner_on_async_ok', 'Host Async OK'), (b'runner_on_async_failed', 'Host Async Failure'), (b'runner_item_on_ok', 'Item OK'), (b'runner_item_on_failed', 'Item Failed'), (b'runner_item_on_skipped', 'Item Skipped'), (b'runner_retry', 'Host Retry'), (b'runner_on_file_diff', 'File Difference'), (b'playbook_on_start', 'Playbook Started'), (b'playbook_on_notify', 'Running Handlers'), (b'playbook_on_include', 'Including File'), (b'playbook_on_no_hosts_matched', 'No Hosts Matched'), (b'playbook_on_no_hosts_remaining', 'No Hosts Remaining'), (b'playbook_on_task_start', 'Task Started'), (b'playbook_on_vars_prompt', 'Variables Prompted'), (b'playbook_on_setup', 'Gathering Facts'), (b'playbook_on_import_for_host', 'internal: on Import for Host'), (b'playbook_on_not_import_for_host', 'internal: on Not Import for Host'), (b'playbook_on_play_start', 'Play Started'), (b'playbook_on_stats', 'Playbook Complete'), (b'debug', 'Debug'), (b'verbose', 'Verbose'), (b'deprecated', 'Deprecated'), (b'warning', 'Warning'), (b'system_warning', 'System Warning'), (b'error', 'Error')]), ), migrations.AlterUniqueTogether( name='adhoccommandevent', diff --git a/awx/main/models/ad_hoc_commands.py b/awx/main/models/ad_hoc_commands.py index c81531d22c..65f40427b0 100644 --- a/awx/main/models/ad_hoc_commands.py +++ b/awx/main/models/ad_hoc_commands.py @@ -260,16 +260,16 @@ class AdHocCommandEvent(CreatedModifiedModel): ('runner_on_ok', _('Host OK'), False), ('runner_on_unreachable', _('Host Unreachable'), True), # Tower won't see no_hosts (check is done earlier without callback). - #('runner_on_no_hosts', _('No Hosts Matched'), False), + # ('runner_on_no_hosts', _('No Hosts Matched'), False), # Tower will see skipped (when running in check mode for a module that # does not support check mode). ('runner_on_skipped', _('Host Skipped'), False), - # Tower does not support async for ad hoc commands. - #('runner_on_async_poll', _('Host Polling'), False), - #('runner_on_async_ok', _('Host Async OK'), False), - #('runner_on_async_failed', _('Host Async Failure'), True), - # Tower does not yet support --diff mode - #('runner_on_file_diff', _('File Difference'), False), + # Tower does not support async for ad hoc commands (not used in v2). + # ('runner_on_async_poll', _('Host Polling'), False), + # ('runner_on_async_ok', _('Host Async OK'), False), + # ('runner_on_async_failed', _('Host Async Failure'), True), + # Tower does not yet support --diff mode. + # ('runner_on_file_diff', _('File Difference'), False), # Additional event types for captured stdout not directly related to # runner events. diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index 74a8395a2a..a15e291d78 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -936,11 +936,12 @@ class JobEvent(CreatedModifiedModel): # - playbook_on_vars_prompt (for each play, but before play starts, we # currently don't handle responding to these prompts) # - playbook_on_play_start (once for each play) - # - playbook_on_import_for_host - # - playbook_on_not_import_for_host + # - playbook_on_import_for_host (not logged, not used for v2) + # - playbook_on_not_import_for_host (not logged, not used for v2) # - playbook_on_no_hosts_matched # - playbook_on_no_hosts_remaining - # - playbook_on_setup + # - playbook_on_include (only v2 - only used for handlers?) + # - playbook_on_setup (not used for v2) # - runner_on* # - playbook_on_task_start (once for each task within a play) # - runner_on_failed @@ -948,12 +949,16 @@ class JobEvent(CreatedModifiedModel): # - runner_on_error (not used for v2) # - runner_on_skipped # - runner_on_unreachable - # - runner_on_no_hosts - # - runner_on_async_poll - # - runner_on_async_ok - # - runner_on_async_failed - # - runner_on_file_diff - # - playbook_on_notify (once for each notification from the play) + # - runner_on_no_hosts (not used for v2) + # - runner_on_async_poll (not used for v2) + # - runner_on_async_ok (not used for v2) + # - runner_on_async_failed (not used for v2) + # - runner_on_file_diff (v2 event is v2_on_file_diff) + # - runner_item_on_ok (v2 only) + # - runner_item_on_failed (v2 only) + # - runner_item_on_skipped (v2 only) + # - runner_retry (v2 only) + # - playbook_on_notify (once for each notification from the play, not used for v2) # - playbook_on_stats EVENT_TYPES = [ @@ -967,19 +972,22 @@ class JobEvent(CreatedModifiedModel): (3, 'runner_on_async_poll', _('Host Polling'), False), (3, 'runner_on_async_ok', _('Host Async OK'), False), (3, 'runner_on_async_failed', _('Host Async Failure'), True), - # Tower does not yet support --diff mode + (3, 'runner_item_on_ok', _('Item OK'), False), + (3, 'runner_item_on_failed', _('Item Failed'), True), + (3, 'runner_item_on_skipped', _('Item Skipped'), False), + (3, 'runner_retry', _('Host Retry'), False), + # Tower does not yet support --diff mode. (3, 'runner_on_file_diff', _('File Difference'), False), (0, 'playbook_on_start', _('Playbook Started'), False), (2, 'playbook_on_notify', _('Running Handlers'), False), + (2, 'playbook_on_include', _('Including File'), False), (2, 'playbook_on_no_hosts_matched', _('No Hosts Matched'), False), (2, 'playbook_on_no_hosts_remaining', _('No Hosts Remaining'), False), (2, 'playbook_on_task_start', _('Task Started'), False), # Tower does not yet support vars_prompt (and will probably hang :) (1, 'playbook_on_vars_prompt', _('Variables Prompted'), False), (2, 'playbook_on_setup', _('Gathering Facts'), False), - # callback will not record this (2, 'playbook_on_import_for_host', _('internal: on Import for Host'), False), - # callback will not record this (2, 'playbook_on_not_import_for_host', _('internal: on Not Import for Host'), False), (1, 'playbook_on_play_start', _('Play Started'), False), (1, 'playbook_on_stats', _('Playbook Complete'), False), From 31faca2b4f3de37502c1025200cb2b9e6404e61b Mon Sep 17 00:00:00 2001 From: Chris Church Date: Fri, 28 Oct 2016 22:32:49 -0400 Subject: [PATCH 5/5] Add option to use callback queue for job events. --- awx/main/queue.py | 48 ++++++++++++++++++++++++++++++++++++++++++++++- awx/main/tasks.py | 27 ++++++++++++++++++++------ 2 files changed, 68 insertions(+), 7 deletions(-) diff --git a/awx/main/queue.py b/awx/main/queue.py index b0b8d0374e..bfb487441f 100644 --- a/awx/main/queue.py +++ b/awx/main/queue.py @@ -1,9 +1,19 @@ # Copyright (c) 2015 Ansible, Inc. # All Rights Reserved. +# Python import json +import logging +import os + +# Django +from django.conf import settings + +# Kombu +from kombu import Connection, Exchange, Producer + +__all__ = ['FifoQueue', 'CallbackQueueDispatcher'] -__all__ = ['FifoQueue'] # TODO: Figure out wtf to do with this class class FifoQueue(object): @@ -33,3 +43,39 @@ class FifoQueue(object): answer = None if answer: return json.loads(answer) + + +class CallbackQueueDispatcher(object): + + def __init__(self): + self.callback_connection = getattr(settings, 'CALLBACK_CONNECTION', None) + self.connection_queue = getattr(settings, 'CALLBACK_QUEUE', '') + self.connection = None + self.exchange = None + self.logger = logging.getLogger('awx.main.queue.CallbackQueueDispatcher') + + def dispatch(self, obj): + if not self.callback_connection or not self.connection_queue: + return + active_pid = os.getpid() + for retry_count in xrange(4): + try: + if not hasattr(self, 'connection_pid'): + self.connection_pid = active_pid + if self.connection_pid != active_pid: + self.connection = None + if self.connection is None: + self.connection = Connection(self.callback_connection) + self.exchange = Exchange(self.connection_queue, type='direct') + + producer = Producer(self.connection) + producer.publish(obj, + serializer='json', + compression='bzip2', + exchange=self.exchange, + declare=[self.exchange], + routing_key=self.connection_queue) + return + except Exception, e: + self.logger.info('Publish Job Event Exception: %r, retry=%d', e, + retry_count, exc_info=True) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index cd785ef96a..35858577a7 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -47,6 +47,7 @@ from django.contrib.auth.models import User from awx.main.constants import CLOUD_PROVIDERS from awx.main.models import * # noqa from awx.main.models import UnifiedJob +from awx.main.queue import CallbackQueueDispatcher from awx.main.task_engine import TaskEnhancer from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url, check_proot_installed, build_proot_temp_dir, wrap_args_with_proot, @@ -991,10 +992,17 @@ class RunJob(BaseTask): Wrap stdout file object to capture events. ''' stdout_handle = super(RunJob, self).get_stdout_handle(instance) + + if getattr(settings, 'USE_CALLBACK_QUEUE', False): + dispatcher = CallbackQueueDispatcher() - def job_event_callback(event_data): - event_data.setdefault('job_id', instance.id) - JobEvent.create_from_data(**event_data) + def job_event_callback(event_data): + event_data.setdefault('job_id', instance.id) + dispatcher.dispatch(event_data) + else: + def job_event_callback(event_data): + event_data.setdefault('job_id', instance.id) + JobEvent.create_from_data(**event_data) return OutputEventFilter(stdout_handle, job_event_callback) @@ -1719,9 +1727,16 @@ class RunAdHocCommand(BaseTask): ''' stdout_handle = super(RunAdHocCommand, self).get_stdout_handle(instance) - def ad_hoc_command_event_callback(event_data): - event_data.setdefault('ad_hoc_command_id', instance.id) - AdHocCommandEvent.create_from_data(**event_data) + if getattr(settings, 'USE_CALLBACK_QUEUE', False): + dispatcher = CallbackQueueDispatcher() + + def ad_hoc_command_event_callback(event_data): + event_data.setdefault('ad_hoc_command_id', instance.id) + dispatcher.dispatch(event_data) + else: + def ad_hoc_command_event_callback(event_data): + event_data.setdefault('ad_hoc_command_id', instance.id) + AdHocCommandEvent.create_from_data(**event_data) return OutputEventFilter(stdout_handle, ad_hoc_command_event_callback)