diff --git a/awx/api/serializers.py b/awx/api/serializers.py index 3923e5c9ef..6c5e4711f6 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -614,14 +614,12 @@ class UnifiedJobTemplateSerializer(BaseSerializer): class UnifiedJobSerializer(BaseSerializer): show_capabilities = ['start', 'delete'] - result_stdout = serializers.SerializerMethodField() - class Meta: model = UnifiedJob fields = ('*', 'unified_job_template', 'launch_type', 'status', 'failed', 'started', 'finished', 'elapsed', 'job_args', - 'job_cwd', 'job_env', 'job_explanation', 'result_stdout', - 'execution_node', 'result_traceback') + 'job_cwd', 'job_env', 'job_explanation', 'execution_node', + 'result_traceback') extra_kwargs = { 'unified_job_template': { 'source': 'unified_job_template_id', @@ -702,25 +700,17 @@ class UnifiedJobSerializer(BaseSerializer): return ret - def get_result_stdout(self, obj): - obj_size = obj.result_stdout_size - if obj_size > settings.STDOUT_MAX_BYTES_DISPLAY: - return _("Standard Output too large to display (%(text_size)d bytes), " - "only download supported for sizes over %(supported_size)d bytes") % { - 'text_size': obj_size, 'supported_size': settings.STDOUT_MAX_BYTES_DISPLAY} - return obj.result_stdout - class UnifiedJobListSerializer(UnifiedJobSerializer): class Meta: - fields = ('*', '-job_args', '-job_cwd', '-job_env', '-result_traceback', '-result_stdout') + fields = ('*', '-job_args', '-job_cwd', '-job_env', '-result_traceback') def get_field_names(self, declared_fields, info): field_names = super(UnifiedJobListSerializer, self).get_field_names(declared_fields, info) # Meta multiple inheritance and -field_name options don't seem to be # taking effect above, so remove the undesired fields here. - return tuple(x for x in field_names if x not in ('job_args', 'job_cwd', 'job_env', 'result_traceback', 'result_stdout')) + return tuple(x for x in field_names if x not in ('job_args', 'job_cwd', 'job_env', 'result_traceback')) def get_types(self): if type(self) is UnifiedJobListSerializer: @@ -760,14 +750,6 @@ class UnifiedJobStdoutSerializer(UnifiedJobSerializer): class Meta: fields = ('result_stdout',) - def get_result_stdout(self, obj): - obj_size = obj.result_stdout_size - if obj_size > settings.STDOUT_MAX_BYTES_DISPLAY: - return _("Standard Output too large to display (%(text_size)d bytes), " - "only download supported for sizes over %(supported_size)d bytes") % { - 'text_size': obj_size, 'supported_size': settings.STDOUT_MAX_BYTES_DISPLAY} - return obj.result_stdout - def get_types(self): if type(self) is UnifiedJobStdoutSerializer: return ['project_update', 'inventory_update', 'job', 'ad_hoc_command', 'system_job'] @@ -2966,9 +2948,11 @@ class SystemJobTemplateSerializer(UnifiedJobTemplateSerializer): class SystemJobSerializer(UnifiedJobSerializer): + result_stdout = serializers.SerializerMethodField() + class Meta: model = SystemJob - fields = ('*', 'system_job_template', 'job_type', 'extra_vars') + fields = ('*', 'system_job_template', 'job_type', 'extra_vars', 'result_stdout') def get_related(self, obj): res = super(SystemJobSerializer, self).get_related(obj) @@ -2980,6 +2964,9 @@ class SystemJobSerializer(UnifiedJobSerializer): res['cancel'] = self.reverse('api:system_job_cancel', kwargs={'pk': obj.pk}) return res + def get_result_stdout(self, obj): + return obj.result_stdout + class SystemJobCancelSerializer(SystemJobSerializer): diff --git a/awx/api/views.py b/awx/api/views.py index a41ac8a484..08af3c8658 100644 --- a/awx/api/views.py +++ b/awx/api/views.py @@ -2,13 +2,11 @@ # All Rights Reserved. # Python -import os import re import cgi import dateutil import time import socket -import subprocess import sys import logging import requests @@ -20,7 +18,7 @@ import six from django.conf import settings from django.core.exceptions import FieldError from django.db.models import Q, Count, F -from django.db import IntegrityError, transaction, connection +from django.db import IntegrityError, transaction from django.shortcuts import get_object_or_404 from django.utils.encoding import smart_text, force_text from django.utils.safestring import mark_safe @@ -4498,7 +4496,7 @@ class StdoutANSIFilter(object): def __init__(self, fileobj): self.fileobj = fileobj self.extra_data = '' - if hasattr(fileobj,'close'): + if hasattr(fileobj, 'close'): self.close = fileobj.close def read(self, size=-1): @@ -4529,93 +4527,73 @@ class UnifiedJobStdout(RetrieveAPIView): renderers.JSONRenderer, DownloadTextRenderer, AnsiDownloadRenderer] filter_backends = () new_in_148 = True + deprecated = True def retrieve(self, request, *args, **kwargs): unified_job = self.get_object() - obj_size = unified_job.result_stdout_size - if request.accepted_renderer.format not in {'txt_download', 'ansi_download'} and obj_size > settings.STDOUT_MAX_BYTES_DISPLAY: - response_message = _("Standard Output too large to display (%(text_size)d bytes), " - "only download supported for sizes over %(supported_size)d bytes") % { - 'text_size': obj_size, 'supported_size': settings.STDOUT_MAX_BYTES_DISPLAY} + try: + target_format = request.accepted_renderer.format + if target_format in ('html', 'api', 'json'): + content_format = request.query_params.get('content_format', 'html') + content_encoding = request.query_params.get('content_encoding', None) + start_line = request.query_params.get('start_line', 0) + end_line = request.query_params.get('end_line', None) + dark_val = request.query_params.get('dark', '') + dark = bool(dark_val and dark_val[0].lower() in ('1', 't', 'y')) + content_only = bool(target_format in ('api', 'json')) + dark_bg = (content_only and dark) or (not content_only and (dark or not dark_val)) + content, start, end, absolute_end = unified_job.result_stdout_raw_limited(start_line, end_line) + + # Remove any ANSI escape sequences containing job event data. + content = re.sub(r'\x1b\[K(?:[A-Za-z0-9+/=]+\x1b\[\d+D)+\x1b\[K', '', content) + + body = ansiconv.to_html(cgi.escape(content)) + + context = { + 'title': get_view_name(self.__class__), + 'body': mark_safe(body), + 'dark': dark_bg, + 'content_only': content_only, + } + data = render_to_string('api/stdout.html', context).strip() + + if target_format == 'api': + return Response(mark_safe(data)) + if target_format == 'json': + if content_encoding == 'base64' and content_format == 'ansi': + return Response({'range': {'start': start, 'end': end, 'absolute_end': absolute_end}, 'content': b64encode(content)}) + elif content_format == 'html': + return Response({'range': {'start': start, 'end': end, 'absolute_end': absolute_end}, 'content': body}) + return Response(data) + elif target_format == 'txt': + return Response(unified_job.result_stdout) + elif target_format == 'ansi': + return Response(unified_job.result_stdout_raw) + elif target_format in {'txt_download', 'ansi_download'}: + filename = '{type}_{pk}{suffix}.txt'.format( + type=camelcase_to_underscore(unified_job.__class__.__name__), + pk=unified_job.id, + suffix='.ansi' if target_format == 'ansi_download' else '' + ) + content_fd = unified_job.result_stdout_raw_handle(enforce_max_bytes=False) + if target_format == 'txt_download': + content_fd = StdoutANSIFilter(content_fd) + response = HttpResponse(FileWrapper(content_fd), content_type='text/plain') + response["Content-Disposition"] = 'attachment; filename="{}"'.format(filename) + return response + else: + return super(UnifiedJobStdout, self).retrieve(request, *args, **kwargs) + except StdoutMaxBytesExceeded as e: + response_message = _( + "Standard Output too large to display {text_size} bytes), " + "only download supported for sizes over {supported_size} bytes").format( + text_size=e.total, supported_size=e.supported + ) if request.accepted_renderer.format == 'json': return Response({'range': {'start': 0, 'end': 1, 'absolute_end': 1}, 'content': response_message}) else: return Response(response_message) - if request.accepted_renderer.format in ('html', 'api', 'json'): - content_format = request.query_params.get('content_format', 'html') - content_encoding = request.query_params.get('content_encoding', None) - start_line = request.query_params.get('start_line', 0) - end_line = request.query_params.get('end_line', None) - dark_val = request.query_params.get('dark', '') - dark = bool(dark_val and dark_val[0].lower() in ('1', 't', 'y')) - content_only = bool(request.accepted_renderer.format in ('api', 'json')) - dark_bg = (content_only and dark) or (not content_only and (dark or not dark_val)) - content, start, end, absolute_end = unified_job.result_stdout_raw_limited(start_line, end_line) - - # Remove any ANSI escape sequences containing job event data. - content = re.sub(r'\x1b\[K(?:[A-Za-z0-9+/=]+\x1b\[\d+D)+\x1b\[K', '', content) - - body = ansiconv.to_html(cgi.escape(content)) - - context = { - 'title': get_view_name(self.__class__), - 'body': mark_safe(body), - 'dark': dark_bg, - 'content_only': content_only, - } - data = render_to_string('api/stdout.html', context).strip() - - if request.accepted_renderer.format == 'api': - return Response(mark_safe(data)) - if request.accepted_renderer.format == 'json': - if content_encoding == 'base64' and content_format == 'ansi': - return Response({'range': {'start': start, 'end': end, 'absolute_end': absolute_end}, 'content': b64encode(content)}) - elif content_format == 'html': - return Response({'range': {'start': start, 'end': end, 'absolute_end': absolute_end}, 'content': body}) - return Response(data) - elif request.accepted_renderer.format == 'txt': - return Response(unified_job.result_stdout) - elif request.accepted_renderer.format == 'ansi': - return Response(unified_job.result_stdout_raw) - elif request.accepted_renderer.format in {'txt_download', 'ansi_download'}: - if not os.path.exists(unified_job.result_stdout_file): - write_fd = open(unified_job.result_stdout_file, 'w') - with connection.cursor() as cursor: - try: - tablename, related_name = { - Job: ('main_jobevent', 'job_id'), - AdHocCommand: ('main_adhoccommandevent', 'ad_hoc_command_id'), - }.get(unified_job.__class__, ('main_genericcommandevent', 'unified_job_id')) - cursor.copy_expert( - "copy (select stdout from {} where {}={} order by start_line) to stdout".format( - tablename, - related_name, - unified_job.id - ), - write_fd - ) - write_fd.close() - subprocess.Popen("sed -i 's/\\\\r\\\\n/\\n/g' {}".format(unified_job.result_stdout_file), - shell=True).wait() - except Exception as e: - return Response({"error": _("Error generating stdout download file: {}".format(e))}) - try: - content_fd = open(unified_job.result_stdout_file, 'r') - if request.accepted_renderer.format == 'txt_download': - # For txt downloads, filter out ANSI escape sequences. - content_fd = StdoutANSIFilter(content_fd) - suffix = '' - else: - suffix = '_ansi' - response = HttpResponse(FileWrapper(content_fd), content_type='text/plain') - response["Content-Disposition"] = 'attachment; filename="job_%s%s.txt"' % (str(unified_job.id), suffix) - return response - except Exception as e: - return Response({"error": _("Error generating stdout download file: %s") % str(e)}, status=status.HTTP_400_BAD_REQUEST) - else: - return super(UnifiedJobStdout, self).retrieve(request, *args, **kwargs) - class ProjectUpdateStdout(UnifiedJobStdout): diff --git a/awx/main/expect/isolated_manager.py b/awx/main/expect/isolated_manager.py index 265531443f..c4bd95efa7 100644 --- a/awx/main/expect/isolated_manager.py +++ b/awx/main/expect/isolated_manager.py @@ -445,7 +445,7 @@ class IsolatedManager(object): instance.hostname, instance.modified)) @staticmethod - def wrap_stdout_handle(instance, private_data_dir, stdout_handle, event_data_key='job_id'): + def get_stdout_handle(instance, private_data_dir, event_data_key='job_id'): dispatcher = CallbackQueueDispatcher() def job_event_callback(event_data): @@ -463,7 +463,7 @@ class IsolatedManager(object): event_data.get('event', ''), event_data['uuid'], instance.id, event_data)) dispatcher.dispatch(event_data) - return OutputEventFilter(stdout_handle, job_event_callback) + return OutputEventFilter(job_event_callback) def run(self, instance, host, private_data_dir, proot_temp_dir): """ diff --git a/awx/main/expect/run.py b/awx/main/expect/run.py index db269f2d09..496c7583e0 100755 --- a/awx/main/expect/run.py +++ b/awx/main/expect/run.py @@ -99,7 +99,6 @@ def run_pexpect(args, cwd, env, logfile, password_patterns = expect_passwords.keys() password_values = expect_passwords.values() - logfile_pos = logfile.tell() child = pexpect.spawn( args[0], args[1:], cwd=cwd, env=env, ignore_sighup=True, encoding='utf-8', echo=False, @@ -116,8 +115,6 @@ def run_pexpect(args, cwd, env, logfile, password = password_values[result_id] if password is not None: child.sendline(password) - if logfile_pos != logfile.tell(): - logfile_pos = logfile.tell() last_stdout_update = time.time() if cancelled_callback: try: diff --git a/awx/main/migrations/0018_add_additional_stdout_events.py b/awx/main/migrations/0018_add_additional_stdout_events.py index 39f39edf57..fd29c2676b 100644 --- a/awx/main/migrations/0018_add_additional_stdout_events.py +++ b/awx/main/migrations/0018_add_additional_stdout_events.py @@ -78,4 +78,8 @@ class Migration(migrations.Migration): 'ordering': ('-pk',), }, ), + migrations.RemoveField( + model_name='unifiedjob', + name='result_stdout_file', + ), ] diff --git a/awx/main/models/events.py b/awx/main/models/events.py index dedd0a8335..e864c10caf 100644 --- a/awx/main/models/events.py +++ b/awx/main/models/events.py @@ -326,6 +326,44 @@ class BasePlaybookEvent(CreatedModifiedModel): def job_verbosity(self): return 0 + def save(self, *args, **kwargs): + # If update_fields has been specified, add our field names to it, + # if it hasn't been specified, then we're just doing a normal save. + update_fields = kwargs.get('update_fields', []) + # Update model fields and related objects unless we're only updating + # failed/changed flags triggered from a child event. + from_parent_update = kwargs.pop('from_parent_update', False) + if not from_parent_update: + # Update model fields from event data. + updated_fields = self._update_from_event_data() + for field in updated_fields: + if field not in update_fields: + update_fields.append(field) + + # Update host related field from host_name. + if hasattr(self, 'job') and not self.host_id and self.host_name: + host_qs = self.job.inventory.hosts.filter(name=self.host_name) + host_id = host_qs.only('id').values_list('id', flat=True).first() + if host_id != self.host_id: + self.host_id = host_id + if 'host_id' not in update_fields: + update_fields.append('host_id') + super(BasePlaybookEvent, self).save(*args, **kwargs) + + # Update related objects after this event is saved. + if hasattr(self, 'job') and not from_parent_update: + if getattr(settings, 'CAPTURE_JOB_EVENT_HOSTS', False): + self._update_hosts() + if self.event == 'playbook_on_stats': + self._update_parents_failed_and_changed() + + hostnames = self._hostnames() + self._update_host_summary_from_stats(hostnames) + self.job.inventory.update_computed_fields() + + emit_channel_notification('jobs-summary', dict(group_name='jobs', unified_job_id=self.job.id)) + + class JobEvent(BasePlaybookEvent): ''' @@ -465,41 +503,6 @@ class JobEvent(BasePlaybookEvent): if update_fields: host_summary.save(update_fields=update_fields) - def save(self, *args, **kwargs): - # If update_fields has been specified, add our field names to it, - # if it hasn't been specified, then we're just doing a normal save. - update_fields = kwargs.get('update_fields', []) - # Update model fields and related objects unless we're only updating - # failed/changed flags triggered from a child event. - from_parent_update = kwargs.pop('from_parent_update', False) - if not from_parent_update: - # Update model fields from event data. - updated_fields = self._update_from_event_data() - for field in updated_fields: - if field not in update_fields: - update_fields.append(field) - # Update host related field from host_name. - if not self.host_id and self.host_name: - host_qs = self.job.inventory.hosts.filter(name=self.host_name) - host_id = host_qs.only('id').values_list('id', flat=True).first() - if host_id != self.host_id: - self.host_id = host_id - if 'host_id' not in update_fields: - update_fields.append('host_id') - super(JobEvent, self).save(*args, **kwargs) - # Update related objects after this event is saved. - if not from_parent_update: - if getattr(settings, 'CAPTURE_JOB_EVENT_HOSTS', False): - self._update_hosts() - if self.event == 'playbook_on_stats': - self._update_parents_failed_and_changed() - - hostnames = self._hostnames() - self._update_host_summary_from_stats(hostnames) - self.job.inventory.update_computed_fields() - - emit_channel_notification('jobs-summary', dict(group_name='jobs', unified_job_id=self.job.id)) - @property def job_verbosity(self): return self.job.verbosity @@ -602,7 +605,7 @@ class BaseCommandEvent(CreatedModifiedModel): class AdHocCommandEvent(BaseCommandEvent): - VALID_KEYS = BaseCommandEvent.VALID_KEYS + ['ad_hoc_command', 'event'] + VALID_KEYS = BaseCommandEvent.VALID_KEYS + ['ad_hoc_command_id', 'event'] class Meta: app_label = 'main' diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 75e5e8dd05..b9a97e0ab2 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -2,14 +2,13 @@ # All Rights Reserved. # Python -import codecs import json import logging -import re import os -import os.path +import re +import subprocess +import tempfile from collections import OrderedDict -from StringIO import StringIO # Django from django.conf import settings @@ -42,7 +41,7 @@ from awx.main.redact import UriCleaner, REPLACE_STR from awx.main.consumers import emit_channel_notification from awx.main.fields import JSONField, AskForField -__all__ = ['UnifiedJobTemplate', 'UnifiedJob'] +__all__ = ['UnifiedJobTemplate', 'UnifiedJob', 'StdoutMaxBytesExceeded'] logger = logging.getLogger('awx.main.models.unified_jobs') @@ -514,6 +513,13 @@ class UnifiedJobDeprecatedStdout(models.Model): ) +class StdoutMaxBytesExceeded(Exception): + + def __init__(self, total, supported): + self.total = total + self.supported = supported + + class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique, UnifiedJobTypeStringMixin, TaskManagerUnifiedJobMixin): ''' Concrete base class for unified job run by the task engine. @@ -642,11 +648,6 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique default='', editable=False, )) - result_stdout_file = models.TextField( # FilePathfield? - blank=True, - default='', - editable=False, - ) result_traceback = models.TextField( blank=True, default='', @@ -822,14 +823,6 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique # Done. return result - def delete(self): - if self.result_stdout_file != "": - try: - os.remove(self.result_stdout_file) - except Exception: - pass - super(UnifiedJob, self).delete() - def copy_unified_job(self, limit=None): ''' Returns saved object, including related fields. @@ -912,36 +905,48 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique related.result_stdout_text = value related.save() - def result_stdout_raw_handle(self, attempt=0): + def result_stdout_raw_handle(self, enforce_max_bytes=True): """Return a file-like object containing the standard out of the job's result. """ - msg = { - 'pending': 'Waiting for results...', - 'missing': 'stdout capture is missing', - } - if self.result_stdout_text: - return StringIO(self.result_stdout_text) + if not os.path.exists(settings.JOBOUTPUT_ROOT): + os.makedirs(settings.JOBOUTPUT_ROOT) + fd = tempfile.NamedTemporaryFile( + prefix='{}-{}-'.format(self.model_to_str(), self.pk), + suffix='.out', + dir=settings.JOBOUTPUT_ROOT + ) + legacy_stdout_text = self.result_stdout_text + if legacy_stdout_text: + fd.write(legacy_stdout_text) + fd.flush() else: - if not os.path.exists(self.result_stdout_file) or os.stat(self.result_stdout_file).st_size < 1: - return StringIO(msg['missing' if self.finished else 'pending']) + with connection.cursor() as cursor: + tablename = self._meta.db_table + related_name = { + 'main_job': 'job_id', + 'main_adhoccommand': 'ad_hoc_command_id', + 'main_projectupdate': 'project_update_id', + 'main_inventoryupdate': 'inventory_update_id', + 'main_systemjob': 'system_job_id', + }[tablename] + cursor.copy_expert( + "copy (select stdout from {} where {}={} order by start_line) to stdout".format( + tablename + 'event', + related_name, + self.id + ), + fd + ) + fd.flush() + subprocess.Popen("sed -i 's/\\\\r\\\\n/\\n/g' {}".format(fd.name), shell=True).wait() - # There is a potential timing issue here, because another - # process may be deleting the stdout file after it is written - # to the database. - # - # Therefore, if we get an IOError (which generally means the - # file does not exist), reload info from the database and - # try again. - try: - return codecs.open(self.result_stdout_file, "r", - encoding='utf-8') - except IOError: - if attempt < 3: - self.result_stdout_text = type(self).objects.get(id=self.id).result_stdout_text - return self.result_stdout_raw_handle(attempt=attempt + 1) - else: - return StringIO(msg['missing' if self.finished else 'pending']) + if enforce_max_bytes: + total_size = os.stat(fd.name).st_size + max_supported = settings.STDOUT_MAX_BYTES_DISPLAY + if total_size > max_supported: + raise StdoutMaxBytesExceeded(total_size, max_supported) + return open(fd.name, 'r') def _escape_ascii(self, content): # Remove ANSI escape sequences used to embed event data. @@ -966,13 +971,6 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique def result_stdout(self): return self._result_stdout_raw(escape_ascii=True) - @property - def result_stdout_size(self): - try: - return os.stat(self.result_stdout_file).st_size - except Exception: - return len(self.result_stdout) - def _result_stdout_raw_limited(self, start_line=0, end_line=None, redact_sensitive=True, escape_ascii=False): return_buffer = u"" if end_line is not None: diff --git a/awx/main/tasks.py b/awx/main/tasks.py index c9bc0d1c4d..d744fc7cae 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -2,7 +2,6 @@ # All Rights Reserved. # Python -import codecs from collections import OrderedDict import ConfigParser import cStringIO @@ -17,7 +16,6 @@ import tempfile import time import traceback import urlparse -import uuid from distutils.version import LooseVersion as Version import yaml import fcntl @@ -735,17 +733,8 @@ class BaseTask(LogErrorsTask): def get_stdout_handle(self, instance): ''' - Return an open file object for capturing stdout. + Return an virtual file object for capturing stdout and events. ''' - if not os.path.exists(settings.JOBOUTPUT_ROOT): - os.makedirs(settings.JOBOUTPUT_ROOT) - stdout_filename = os.path.join( - settings.JOBOUTPUT_ROOT, - "%d-%s.out" % (instance.pk, str(uuid.uuid1())) - ) - stdout_handle = codecs.open(stdout_filename, 'w', encoding='utf-8') - assert stdout_handle.name == stdout_filename - dispatcher = CallbackQueueDispatcher() def event_callback(event_data): @@ -756,7 +745,7 @@ class BaseTask(LogErrorsTask): event_data.update(cache_event) dispatcher.dispatch(event_data) - return OutputEventFilter(stdout_handle, event_callback) + return OutputEventFilter(event_callback) def pre_run_hook(self, instance, **kwargs): ''' @@ -838,10 +827,8 @@ class BaseTask(LogErrorsTask): if isolated_host is None: stdout_handle = self.get_stdout_handle(instance) else: - base_handle = super(self.__class__, self).get_stdout_handle(instance) - stdout_handle = isolated_manager.IsolatedManager.wrap_stdout_handle( - instance, kwargs['private_data_dir'], base_handle, - event_data_key=self.event_data_key) + stdout_handle = isolated_manager.IsolatedManager.get_stdout_handle( + instance, kwargs['private_data_dir'], event_data_key=self.event_data_key) if self.should_use_proot(instance, **kwargs): if not check_proot_installed(): raise RuntimeError('bubblewrap is not installed') @@ -858,7 +845,7 @@ class BaseTask(LogErrorsTask): args = run.wrap_args_with_ssh_agent(args, ssh_key_path, ssh_auth_sock) safe_args = run.wrap_args_with_ssh_agent(safe_args, ssh_key_path, ssh_auth_sock) instance = self.update_model(pk, job_args=json.dumps(safe_args), - job_cwd=cwd, job_env=safe_env, result_stdout_file=stdout_handle.name) + job_cwd=cwd, job_env=safe_env) expect_passwords = {} for k, v in self.get_password_prompts(**kwargs).items(): diff --git a/awx/main/tests/unit/test_tasks.py b/awx/main/tests/unit/test_tasks.py index 9749c3fc65..2bd36349d7 100644 --- a/awx/main/tests/unit/test_tasks.py +++ b/awx/main/tests/unit/test_tasks.py @@ -31,6 +31,7 @@ from awx.main.models import ( ) from awx.main import tasks +from awx.main.queue import CallbackQueueDispatcher from awx.main.utils import encrypt_field, encrypt_value @@ -199,6 +200,7 @@ class TestJobExecution: self.run_pexpect.return_value = ['successful', 0] self.patches = [ + mock.patch.object(CallbackQueueDispatcher, 'dispatch', lambda obj: None), mock.patch.object(Project, 'get_project_path', lambda *a, **kw: self.project_path), # don't emit websocket statuses; they use the DB and complicate testing mock.patch.object(UnifiedJob, 'websocket_emit_status', mock.Mock()), diff --git a/awx/main/tests/unit/test_unified_jobs.py b/awx/main/tests/unit/test_unified_jobs.py deleted file mode 100644 index 659eed63a5..0000000000 --- a/awx/main/tests/unit/test_unified_jobs.py +++ /dev/null @@ -1,52 +0,0 @@ -# Copyright (c) 2015 Ansible, Inc. -# All Rights Reserved - -# Python -import mock -from mock import Mock -from StringIO import StringIO -from django.utils.timezone import now - -# AWX -from awx.main import models - - -# stdout file present -@mock.patch('os.path.exists', return_value=True) -@mock.patch('codecs.open', return_value='my_file_handler') -@mock.patch.object(models.UnifiedJob, 'result_stdout_text', '') -def test_result_stdout_raw_handle_file__found(exists, open): - unified_job = models.UnifiedJob() - - with mock.patch('os.stat', return_value=Mock(st_size=1)): - result = unified_job.result_stdout_raw_handle() - - assert result == 'my_file_handler' - - -# stdout file missing, job finished -@mock.patch('os.path.exists', return_value=False) -@mock.patch.object(models.UnifiedJob, 'result_stdout_text', '') -def test_result_stdout_raw_handle__missing(exists): - unified_job = models.UnifiedJob() - unified_job.result_stdout_file = 'dummy' - unified_job.finished = now() - - result = unified_job.result_stdout_raw_handle() - - assert isinstance(result, StringIO) - assert result.read() == 'stdout capture is missing' - - -# stdout file missing, job not finished -@mock.patch('os.path.exists', return_value=False) -@mock.patch.object(models.UnifiedJob, 'result_stdout_text', '') -def test_result_stdout_raw_handle__pending(exists): - unified_job = models.UnifiedJob() - unified_job.result_stdout_file = 'dummy' - unified_job.finished = None - - result = unified_job.result_stdout_raw_handle() - - assert isinstance(result, StringIO) - assert result.read() == 'Waiting for results...' diff --git a/awx/main/tests/unit/utils/test_event_filter.py b/awx/main/tests/unit/utils/test_event_filter.py index d0b2b890aa..aa5a210fcb 100644 --- a/awx/main/tests/unit/utils/test_event_filter.py +++ b/awx/main/tests/unit/utils/test_event_filter.py @@ -1,4 +1,3 @@ -import cStringIO import pytest import base64 import json @@ -33,8 +32,7 @@ def fake_cache(): @pytest.fixture def wrapped_handle(job_event_callback): # Preliminary creation of resources usually done in tasks.py - stdout_handle = cStringIO.StringIO() - return OutputEventFilter(stdout_handle, job_event_callback) + return OutputEventFilter(job_event_callback) @pytest.fixture @@ -80,15 +78,6 @@ def test_separate_verbose_events(fake_callback, wrapped_handle): assert event_data['event'] == 'verbose' -def test_verbose_event_no_markings(fake_callback, wrapped_handle): - ''' - This occurs with jobs that do not have events but still generate - and output stream, like system jobs - ''' - wrapped_handle.write('Running tower-manage command \n') - assert wrapped_handle._fileobj.getvalue() == 'Running tower-manage command \n' - - def test_large_data_payload(fake_callback, fake_cache, wrapped_handle): # Pretend that this is done by the Ansible callback module fake_cache[':1:ev-{}'.format(EXAMPLE_UUID)] = {'event': 'foo'} diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py index b4cea8d4b7..3c38ec4b0f 100644 --- a/awx/main/utils/common.py +++ b/awx/main/utils/common.py @@ -845,25 +845,23 @@ class OutputEventFilter(object): EVENT_DATA_RE = re.compile(r'\x1b\[K((?:[A-Za-z0-9+/=]+\x1b\[\d+D)+)\x1b\[K') - def __init__(self, fileobj=None, event_callback=None, raw_callback=None): - self._fileobj = fileobj + def __init__(self, event_callback): self._event_callback = event_callback self._event_ct = 0 - self._raw_callback = raw_callback self._counter = 1 self._start_line = 0 self._buffer = '' self._current_event_data = None - def __getattr__(self, attr): - return getattr(self._fileobj, attr) + def flush(self): + # pexpect wants to flush the file it writes to, but we're not + # actually capturing stdout to a raw file; we're just + # implementing a custom `write` method to discover and emit events from + # the stdout stream + pass def write(self, data): - if self._fileobj: - self._fileobj.write(data) self._buffer += data - if self._raw_callback: - self._raw_callback(data) while True: match = self.EVENT_DATA_RE.search(self._buffer) if not match: @@ -877,8 +875,6 @@ class OutputEventFilter(object): self._buffer = self._buffer[match.end():] def close(self): - if self._fileobj: - self._fileobj.close() if self._buffer: self._emit_event(self._buffer) self._buffer = ''