change stdout composition to generate from job events on the fly

this approach removes the reading and writing of stdout files on the
local file system at settings.JOBOUTPUT_ROOT when jobs run; stdout
content is now only written to disk on demand, when it's fetched via
the deprecated `stdout` endpoint

see: https://github.com/ansible/awx/issues/200
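The mechanism, in brief: job stdout is written into a filter object rather than a file under JOBOUTPUT_ROOT; the filter recognizes event data embedded in the output stream and dispatches it to the callback queue, and a flat stdout file is only materialized later, on demand. A minimal sketch of that idea in Python (the NUL-delimited marker below is illustrative; AWX's real encoding wraps base64 payloads in ANSI escape sequences, see EVENT_DATA_RE in the last file of this diff):

import base64
import json
import re

# Hypothetical marker for this sketch only; the real pattern is EVENT_DATA_RE.
EVENT_MARKER = re.compile(r'\x00([A-Za-z0-9+/=]+)\x00')


class MiniEventFilter(object):
    """File-like sink: the job runner write()s stdout here; events are parsed out."""

    def __init__(self, event_callback):
        self._event_callback = event_callback
        self._buffer = ''

    def write(self, data):
        self._buffer += data
        while True:
            match = EVENT_MARKER.search(self._buffer)
            if not match:
                break
            # decode the embedded payload and hand it off; nothing is
            # written to the local filesystem
            self._event_callback(json.loads(base64.b64decode(match.group(1))))
            self._buffer = self._buffer[match.end():]

    def flush(self):
        pass  # no backing file to flush; events are emitted as they are found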
Ryan Petrello
2017-12-14 14:40:13 -05:00
parent fc94b3a943
commit 0b30e7907b
12 changed files with 179 additions and 290 deletions

View File

@@ -614,14 +614,12 @@ class UnifiedJobTemplateSerializer(BaseSerializer):
 
 class UnifiedJobSerializer(BaseSerializer):
     show_capabilities = ['start', 'delete']
-    result_stdout = serializers.SerializerMethodField()
 
     class Meta:
         model = UnifiedJob
         fields = ('*', 'unified_job_template', 'launch_type', 'status',
                   'failed', 'started', 'finished', 'elapsed', 'job_args',
-                  'job_cwd', 'job_env', 'job_explanation', 'result_stdout',
-                  'execution_node', 'result_traceback')
+                  'job_cwd', 'job_env', 'job_explanation', 'execution_node',
+                  'result_traceback')
         extra_kwargs = {
             'unified_job_template': {
                 'source': 'unified_job_template_id',
@@ -702,25 +700,17 @@ class UnifiedJobSerializer(BaseSerializer):
 
         return ret
 
-    def get_result_stdout(self, obj):
-        obj_size = obj.result_stdout_size
-        if obj_size > settings.STDOUT_MAX_BYTES_DISPLAY:
-            return _("Standard Output too large to display (%(text_size)d bytes), "
-                     "only download supported for sizes over %(supported_size)d bytes") % {
-                'text_size': obj_size, 'supported_size': settings.STDOUT_MAX_BYTES_DISPLAY}
-        return obj.result_stdout
-
 
 class UnifiedJobListSerializer(UnifiedJobSerializer):
 
     class Meta:
-        fields = ('*', '-job_args', '-job_cwd', '-job_env', '-result_traceback', '-result_stdout')
+        fields = ('*', '-job_args', '-job_cwd', '-job_env', '-result_traceback')
 
     def get_field_names(self, declared_fields, info):
         field_names = super(UnifiedJobListSerializer, self).get_field_names(declared_fields, info)
         # Meta multiple inheritance and -field_name options don't seem to be
         # taking effect above, so remove the undesired fields here.
-        return tuple(x for x in field_names if x not in ('job_args', 'job_cwd', 'job_env', 'result_traceback', 'result_stdout'))
+        return tuple(x for x in field_names if x not in ('job_args', 'job_cwd', 'job_env', 'result_traceback'))
 
     def get_types(self):
         if type(self) is UnifiedJobListSerializer:
@@ -760,14 +750,6 @@ class UnifiedJobStdoutSerializer(UnifiedJobSerializer):
 
     class Meta:
         fields = ('result_stdout',)
 
-    def get_result_stdout(self, obj):
-        obj_size = obj.result_stdout_size
-        if obj_size > settings.STDOUT_MAX_BYTES_DISPLAY:
-            return _("Standard Output too large to display (%(text_size)d bytes), "
-                     "only download supported for sizes over %(supported_size)d bytes") % {
-                'text_size': obj_size, 'supported_size': settings.STDOUT_MAX_BYTES_DISPLAY}
-        return obj.result_stdout
-
     def get_types(self):
         if type(self) is UnifiedJobStdoutSerializer:
             return ['project_update', 'inventory_update', 'job', 'ad_hoc_command', 'system_job']
@@ -2966,9 +2948,11 @@ class SystemJobTemplateSerializer(UnifiedJobTemplateSerializer):
 
 class SystemJobSerializer(UnifiedJobSerializer):
 
+    result_stdout = serializers.SerializerMethodField()
+
     class Meta:
         model = SystemJob
-        fields = ('*', 'system_job_template', 'job_type', 'extra_vars')
+        fields = ('*', 'system_job_template', 'job_type', 'extra_vars', 'result_stdout')
 
     def get_related(self, obj):
         res = super(SystemJobSerializer, self).get_related(obj)
@@ -2980,6 +2964,9 @@ class SystemJobSerializer(UnifiedJobSerializer):
             res['cancel'] = self.reverse('api:system_job_cancel', kwargs={'pk': obj.pk})
         return res
 
+    def get_result_stdout(self, obj):
+        return obj.result_stdout
+
 
 class SystemJobCancelSerializer(SystemJobSerializer):

View File

@@ -2,13 +2,11 @@
 # All Rights Reserved.
 
 # Python
-import os
 import re
 import cgi
 import dateutil
 import time
 import socket
-import subprocess
 import sys
 import logging
 import requests
@@ -20,7 +18,7 @@ import six
 from django.conf import settings
 from django.core.exceptions import FieldError
 from django.db.models import Q, Count, F
-from django.db import IntegrityError, transaction, connection
+from django.db import IntegrityError, transaction
 from django.shortcuts import get_object_or_404
 from django.utils.encoding import smart_text, force_text
 from django.utils.safestring import mark_safe
@@ -4498,7 +4496,7 @@ class StdoutANSIFilter(object):
 
     def __init__(self, fileobj):
         self.fileobj = fileobj
         self.extra_data = ''
-        if hasattr(fileobj,'close'):
+        if hasattr(fileobj, 'close'):
             self.close = fileobj.close
 
     def read(self, size=-1):
@@ -4529,93 +4527,73 @@ class UnifiedJobStdout(RetrieveAPIView):
                         renderers.JSONRenderer, DownloadTextRenderer, AnsiDownloadRenderer]
     filter_backends = ()
     new_in_148 = True
+    deprecated = True
 
     def retrieve(self, request, *args, **kwargs):
         unified_job = self.get_object()
-        obj_size = unified_job.result_stdout_size
-        if request.accepted_renderer.format not in {'txt_download', 'ansi_download'} and obj_size > settings.STDOUT_MAX_BYTES_DISPLAY:
-            response_message = _("Standard Output too large to display (%(text_size)d bytes), "
-                                 "only download supported for sizes over %(supported_size)d bytes") % {
-                'text_size': obj_size, 'supported_size': settings.STDOUT_MAX_BYTES_DISPLAY}
+        try:
+            target_format = request.accepted_renderer.format
+            if target_format in ('html', 'api', 'json'):
+                content_format = request.query_params.get('content_format', 'html')
+                content_encoding = request.query_params.get('content_encoding', None)
+                start_line = request.query_params.get('start_line', 0)
+                end_line = request.query_params.get('end_line', None)
+                dark_val = request.query_params.get('dark', '')
+                dark = bool(dark_val and dark_val[0].lower() in ('1', 't', 'y'))
+                content_only = bool(target_format in ('api', 'json'))
+                dark_bg = (content_only and dark) or (not content_only and (dark or not dark_val))
+                content, start, end, absolute_end = unified_job.result_stdout_raw_limited(start_line, end_line)
+
+                # Remove any ANSI escape sequences containing job event data.
+                content = re.sub(r'\x1b\[K(?:[A-Za-z0-9+/=]+\x1b\[\d+D)+\x1b\[K', '', content)
+
+                body = ansiconv.to_html(cgi.escape(content))
+
+                context = {
+                    'title': get_view_name(self.__class__),
+                    'body': mark_safe(body),
+                    'dark': dark_bg,
+                    'content_only': content_only,
+                }
+                data = render_to_string('api/stdout.html', context).strip()
+
+                if target_format == 'api':
+                    return Response(mark_safe(data))
+                if target_format == 'json':
+                    if content_encoding == 'base64' and content_format == 'ansi':
+                        return Response({'range': {'start': start, 'end': end, 'absolute_end': absolute_end}, 'content': b64encode(content)})
+                    elif content_format == 'html':
+                        return Response({'range': {'start': start, 'end': end, 'absolute_end': absolute_end}, 'content': body})
+                return Response(data)
+            elif target_format == 'txt':
+                return Response(unified_job.result_stdout)
+            elif target_format == 'ansi':
+                return Response(unified_job.result_stdout_raw)
+            elif target_format in {'txt_download', 'ansi_download'}:
+                filename = '{type}_{pk}{suffix}.txt'.format(
+                    type=camelcase_to_underscore(unified_job.__class__.__name__),
+                    pk=unified_job.id,
+                    suffix='.ansi' if target_format == 'ansi_download' else ''
+                )
+                content_fd = unified_job.result_stdout_raw_handle(enforce_max_bytes=False)
+                if target_format == 'txt_download':
+                    content_fd = StdoutANSIFilter(content_fd)
+                response = HttpResponse(FileWrapper(content_fd), content_type='text/plain')
+                response["Content-Disposition"] = 'attachment; filename="{}"'.format(filename)
+                return response
+            else:
+                return super(UnifiedJobStdout, self).retrieve(request, *args, **kwargs)
+        except StdoutMaxBytesExceeded as e:
+            response_message = _(
+                "Standard Output too large to display ({text_size} bytes), "
+                "only download supported for sizes over {supported_size} bytes").format(
+                text_size=e.total, supported_size=e.supported
+            )
             if request.accepted_renderer.format == 'json':
                 return Response({'range': {'start': 0, 'end': 1, 'absolute_end': 1}, 'content': response_message})
             else:
                 return Response(response_message)
-        if request.accepted_renderer.format in ('html', 'api', 'json'):
-            content_format = request.query_params.get('content_format', 'html')
-            content_encoding = request.query_params.get('content_encoding', None)
-            start_line = request.query_params.get('start_line', 0)
-            end_line = request.query_params.get('end_line', None)
-            dark_val = request.query_params.get('dark', '')
-            dark = bool(dark_val and dark_val[0].lower() in ('1', 't', 'y'))
-            content_only = bool(request.accepted_renderer.format in ('api', 'json'))
-            dark_bg = (content_only and dark) or (not content_only and (dark or not dark_val))
-            content, start, end, absolute_end = unified_job.result_stdout_raw_limited(start_line, end_line)
-
-            # Remove any ANSI escape sequences containing job event data.
-            content = re.sub(r'\x1b\[K(?:[A-Za-z0-9+/=]+\x1b\[\d+D)+\x1b\[K', '', content)
-
-            body = ansiconv.to_html(cgi.escape(content))
-
-            context = {
-                'title': get_view_name(self.__class__),
-                'body': mark_safe(body),
-                'dark': dark_bg,
-                'content_only': content_only,
-            }
-            data = render_to_string('api/stdout.html', context).strip()
-
-            if request.accepted_renderer.format == 'api':
-                return Response(mark_safe(data))
-            if request.accepted_renderer.format == 'json':
-                if content_encoding == 'base64' and content_format == 'ansi':
-                    return Response({'range': {'start': start, 'end': end, 'absolute_end': absolute_end}, 'content': b64encode(content)})
-                elif content_format == 'html':
-                    return Response({'range': {'start': start, 'end': end, 'absolute_end': absolute_end}, 'content': body})
-            return Response(data)
-        elif request.accepted_renderer.format == 'txt':
-            return Response(unified_job.result_stdout)
-        elif request.accepted_renderer.format == 'ansi':
-            return Response(unified_job.result_stdout_raw)
-        elif request.accepted_renderer.format in {'txt_download', 'ansi_download'}:
-            if not os.path.exists(unified_job.result_stdout_file):
-                write_fd = open(unified_job.result_stdout_file, 'w')
-                with connection.cursor() as cursor:
-                    try:
-                        tablename, related_name = {
-                            Job: ('main_jobevent', 'job_id'),
-                            AdHocCommand: ('main_adhoccommandevent', 'ad_hoc_command_id'),
-                        }.get(unified_job.__class__, ('main_genericcommandevent', 'unified_job_id'))
-                        cursor.copy_expert(
-                            "copy (select stdout from {} where {}={} order by start_line) to stdout".format(
-                                tablename,
-                                related_name,
-                                unified_job.id
-                            ),
-                            write_fd
-                        )
-                        write_fd.close()
-                        subprocess.Popen("sed -i 's/\\\\r\\\\n/\\n/g' {}".format(unified_job.result_stdout_file),
-                                         shell=True).wait()
-                    except Exception as e:
-                        return Response({"error": _("Error generating stdout download file: {}".format(e))})
-            try:
-                content_fd = open(unified_job.result_stdout_file, 'r')
-                if request.accepted_renderer.format == 'txt_download':
-                    # For txt downloads, filter out ANSI escape sequences.
-                    content_fd = StdoutANSIFilter(content_fd)
-                    suffix = ''
-                else:
-                    suffix = '_ansi'
-                response = HttpResponse(FileWrapper(content_fd), content_type='text/plain')
-                response["Content-Disposition"] = 'attachment; filename="job_%s%s.txt"' % (str(unified_job.id), suffix)
-                return response
-            except Exception as e:
-                return Response({"error": _("Error generating stdout download file: %s") % str(e)}, status=status.HTTP_400_BAD_REQUEST)
-        else:
-            return super(UnifiedJobStdout, self).retrieve(request, *args, **kwargs)
 
 
 class ProjectUpdateStdout(UnifiedJobStdout):

View File

@@ -445,7 +445,7 @@ class IsolatedManager(object):
                 instance.hostname, instance.modified))
 
     @staticmethod
-    def wrap_stdout_handle(instance, private_data_dir, stdout_handle, event_data_key='job_id'):
+    def get_stdout_handle(instance, private_data_dir, event_data_key='job_id'):
         dispatcher = CallbackQueueDispatcher()
 
         def job_event_callback(event_data):
@@ -463,7 +463,7 @@ class IsolatedManager(object):
                 event_data.get('event', ''), event_data['uuid'], instance.id, event_data))
             dispatcher.dispatch(event_data)
 
-        return OutputEventFilter(stdout_handle, job_event_callback)
+        return OutputEventFilter(job_event_callback)
 
     def run(self, instance, host, private_data_dir, proot_temp_dir):
         """

View File

@@ -99,7 +99,6 @@ def run_pexpect(args, cwd, env, logfile,
     password_patterns = expect_passwords.keys()
     password_values = expect_passwords.values()
 
-    logfile_pos = logfile.tell()
     child = pexpect.spawn(
         args[0], args[1:], cwd=cwd, env=env, ignore_sighup=True,
         encoding='utf-8', echo=False,
@@ -116,8 +115,6 @@ def run_pexpect(args, cwd, env, logfile,
             password = password_values[result_id]
             if password is not None:
                 child.sendline(password)
-        if logfile_pos != logfile.tell():
-            logfile_pos = logfile.tell()
         last_stdout_update = time.time()
         if cancelled_callback:
             try:

View File

@@ -78,4 +78,8 @@ class Migration(migrations.Migration):
                 'ordering': ('-pk',),
             },
         ),
+        migrations.RemoveField(
+            model_name='unifiedjob',
+            name='result_stdout_file',
+        ),
     ]

View File

@@ -326,6 +326,44 @@ class BasePlaybookEvent(CreatedModifiedModel):
     def job_verbosity(self):
         return 0
 
+    def save(self, *args, **kwargs):
+        # If update_fields has been specified, add our field names to it,
+        # if it hasn't been specified, then we're just doing a normal save.
+        update_fields = kwargs.get('update_fields', [])
+        # Update model fields and related objects unless we're only updating
+        # failed/changed flags triggered from a child event.
+        from_parent_update = kwargs.pop('from_parent_update', False)
+        if not from_parent_update:
+            # Update model fields from event data.
+            updated_fields = self._update_from_event_data()
+            for field in updated_fields:
+                if field not in update_fields:
+                    update_fields.append(field)
+            # Update host related field from host_name.
+            if hasattr(self, 'job') and not self.host_id and self.host_name:
+                host_qs = self.job.inventory.hosts.filter(name=self.host_name)
+                host_id = host_qs.only('id').values_list('id', flat=True).first()
+                if host_id != self.host_id:
+                    self.host_id = host_id
+                    if 'host_id' not in update_fields:
+                        update_fields.append('host_id')
+        super(BasePlaybookEvent, self).save(*args, **kwargs)
+        # Update related objects after this event is saved.
+        if hasattr(self, 'job') and not from_parent_update:
+            if getattr(settings, 'CAPTURE_JOB_EVENT_HOSTS', False):
+                self._update_hosts()
+            if self.event == 'playbook_on_stats':
+                self._update_parents_failed_and_changed()
+                hostnames = self._hostnames()
+                self._update_host_summary_from_stats(hostnames)
+                self.job.inventory.update_computed_fields()
+                emit_channel_notification('jobs-summary', dict(group_name='jobs', unified_job_id=self.job.id))
+
 
 class JobEvent(BasePlaybookEvent):
     '''
@@ -465,41 +503,6 @@ class JobEvent(BasePlaybookEvent):
         if update_fields:
             host_summary.save(update_fields=update_fields)
 
-    def save(self, *args, **kwargs):
-        # If update_fields has been specified, add our field names to it,
-        # if it hasn't been specified, then we're just doing a normal save.
-        update_fields = kwargs.get('update_fields', [])
-        # Update model fields and related objects unless we're only updating
-        # failed/changed flags triggered from a child event.
-        from_parent_update = kwargs.pop('from_parent_update', False)
-        if not from_parent_update:
-            # Update model fields from event data.
-            updated_fields = self._update_from_event_data()
-            for field in updated_fields:
-                if field not in update_fields:
-                    update_fields.append(field)
-            # Update host related field from host_name.
-            if not self.host_id and self.host_name:
-                host_qs = self.job.inventory.hosts.filter(name=self.host_name)
-                host_id = host_qs.only('id').values_list('id', flat=True).first()
-                if host_id != self.host_id:
-                    self.host_id = host_id
-                    if 'host_id' not in update_fields:
-                        update_fields.append('host_id')
-        super(JobEvent, self).save(*args, **kwargs)
-        # Update related objects after this event is saved.
-        if not from_parent_update:
-            if getattr(settings, 'CAPTURE_JOB_EVENT_HOSTS', False):
-                self._update_hosts()
-            if self.event == 'playbook_on_stats':
-                self._update_parents_failed_and_changed()
-                hostnames = self._hostnames()
-                self._update_host_summary_from_stats(hostnames)
-                self.job.inventory.update_computed_fields()
-                emit_channel_notification('jobs-summary', dict(group_name='jobs', unified_job_id=self.job.id))
-
     @property
     def job_verbosity(self):
         return self.job.verbosity
@@ -602,7 +605,7 @@ class BaseCommandEvent(CreatedModifiedModel):
 
 class AdHocCommandEvent(BaseCommandEvent):
 
-    VALID_KEYS = BaseCommandEvent.VALID_KEYS + ['ad_hoc_command', 'event']
+    VALID_KEYS = BaseCommandEvent.VALID_KEYS + ['ad_hoc_command_id', 'event']
 
     class Meta:
         app_label = 'main'

View File

@@ -2,14 +2,13 @@
 # All Rights Reserved.
 
 # Python
-import codecs
 import json
 import logging
-import re
 import os
-import os.path
+import re
+import subprocess
+import tempfile
 from collections import OrderedDict
-from StringIO import StringIO
 
 # Django
 from django.conf import settings
@@ -42,7 +41,7 @@ from awx.main.redact import UriCleaner, REPLACE_STR
 from awx.main.consumers import emit_channel_notification
 from awx.main.fields import JSONField, AskForField
 
-__all__ = ['UnifiedJobTemplate', 'UnifiedJob']
+__all__ = ['UnifiedJobTemplate', 'UnifiedJob', 'StdoutMaxBytesExceeded']
 
 logger = logging.getLogger('awx.main.models.unified_jobs')
@@ -514,6 +513,13 @@ class UnifiedJobDeprecatedStdout(models.Model):
     )
 
 
+class StdoutMaxBytesExceeded(Exception):
+
+    def __init__(self, total, supported):
+        self.total = total
+        self.supported = supported
+
+
 class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique, UnifiedJobTypeStringMixin, TaskManagerUnifiedJobMixin):
     '''
     Concrete base class for unified job run by the task engine.
@@ -642,11 +648,6 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
         default='',
         editable=False,
     ))
-    result_stdout_file = models.TextField( # FilePathfield?
-        blank=True,
-        default='',
-        editable=False,
-    )
     result_traceback = models.TextField(
         blank=True,
         default='',
@@ -822,14 +823,6 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
         # Done.
         return result
 
-    def delete(self):
-        if self.result_stdout_file != "":
-            try:
-                os.remove(self.result_stdout_file)
-            except Exception:
-                pass
-        super(UnifiedJob, self).delete()
-
     def copy_unified_job(self, limit=None):
         '''
         Returns saved object, including related fields.
@@ -912,36 +905,48 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
             related.result_stdout_text = value
             related.save()
 
-    def result_stdout_raw_handle(self, attempt=0):
+    def result_stdout_raw_handle(self, enforce_max_bytes=True):
         """Return a file-like object containing the standard out of the
         job's result.
         """
-        msg = {
-            'pending': 'Waiting for results...',
-            'missing': 'stdout capture is missing',
-        }
-        if self.result_stdout_text:
-            return StringIO(self.result_stdout_text)
+        if not os.path.exists(settings.JOBOUTPUT_ROOT):
+            os.makedirs(settings.JOBOUTPUT_ROOT)
+        fd = tempfile.NamedTemporaryFile(
+            prefix='{}-{}-'.format(self.model_to_str(), self.pk),
+            suffix='.out',
+            dir=settings.JOBOUTPUT_ROOT
+        )
+
+        legacy_stdout_text = self.result_stdout_text
+        if legacy_stdout_text:
+            fd.write(legacy_stdout_text)
+            fd.flush()
         else:
-            if not os.path.exists(self.result_stdout_file) or os.stat(self.result_stdout_file).st_size < 1:
-                return StringIO(msg['missing' if self.finished else 'pending'])
-
-            # There is a potential timing issue here, because another
-            # process may be deleting the stdout file after it is written
-            # to the database.
-            #
-            # Therefore, if we get an IOError (which generally means the
-            # file does not exist), reload info from the database and
-            # try again.
-            try:
-                return codecs.open(self.result_stdout_file, "r",
-                                   encoding='utf-8')
-            except IOError:
-                if attempt < 3:
-                    self.result_stdout_text = type(self).objects.get(id=self.id).result_stdout_text
-                    return self.result_stdout_raw_handle(attempt=attempt + 1)
-                else:
-                    return StringIO(msg['missing' if self.finished else 'pending'])
+            with connection.cursor() as cursor:
+                tablename = self._meta.db_table
+                related_name = {
+                    'main_job': 'job_id',
+                    'main_adhoccommand': 'ad_hoc_command_id',
+                    'main_projectupdate': 'project_update_id',
+                    'main_inventoryupdate': 'inventory_update_id',
+                    'main_systemjob': 'system_job_id',
+                }[tablename]
+                cursor.copy_expert(
+                    "copy (select stdout from {} where {}={} order by start_line) to stdout".format(
+                        tablename + 'event',
+                        related_name,
+                        self.id
+                    ),
+                    fd
+                )
+                fd.flush()
+                subprocess.Popen("sed -i 's/\\\\r\\\\n/\\n/g' {}".format(fd.name), shell=True).wait()
+
+        if enforce_max_bytes:
+            total_size = os.stat(fd.name).st_size
+            max_supported = settings.STDOUT_MAX_BYTES_DISPLAY
+            if total_size > max_supported:
+                raise StdoutMaxBytesExceeded(total_size, max_supported)
+
+        return open(fd.name, 'r')
 
     def _escape_ascii(self, content):
         # Remove ANSI escape sequences used to embed event data.
@@ -966,13 +971,6 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
     def result_stdout(self):
         return self._result_stdout_raw(escape_ascii=True)
 
-    @property
-    def result_stdout_size(self):
-        try:
-            return os.stat(self.result_stdout_file).st_size
-        except Exception:
-            return len(self.result_stdout)
-
     def _result_stdout_raw_limited(self, start_line=0, end_line=None, redact_sensitive=True, escape_ascii=False):
         return_buffer = u""
         if end_line is not None:
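For reference, the COPY-based composition introduced above can be exercised standalone. A sketch under assumptions: a raw psycopg2 connection `conn` and an integer `job_id` stand in for Django's `connection` and the polymorphic table lookup, and the legacy-text and max-bytes branches are omitted:

import tempfile
import psycopg2  # assumed driver; the model method goes through Django's connection

def compose_stdout(conn, job_id):
    # Stream the stdout column of every saved job event, ordered by
    # start_line, into a temp file via PostgreSQL's COPY protocol.
    fd = tempfile.NamedTemporaryFile(suffix='.out')
    with conn.cursor() as cursor:
        cursor.copy_expert(
            'copy (select stdout from main_jobevent where job_id={} '
            'order by start_line) to stdout'.format(int(job_id)),
            fd
        )
    fd.flush()
    return open(fd.name, 'r')

# e.g. compose_stdout(psycopg2.connect(dbname='awx'), 42).read()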

View File

@@ -2,7 +2,6 @@
 # All Rights Reserved.
 
 # Python
-import codecs
 from collections import OrderedDict
 import ConfigParser
 import cStringIO
@@ -17,7 +16,6 @@ import tempfile
 import time
 import traceback
 import urlparse
-import uuid
 from distutils.version import LooseVersion as Version
 import yaml
 import fcntl
@@ -735,17 +733,8 @@ class BaseTask(LogErrorsTask):
 
     def get_stdout_handle(self, instance):
         '''
-        Return an open file object for capturing stdout.
+        Return a virtual file object for capturing stdout and events.
         '''
-        if not os.path.exists(settings.JOBOUTPUT_ROOT):
-            os.makedirs(settings.JOBOUTPUT_ROOT)
-        stdout_filename = os.path.join(
-            settings.JOBOUTPUT_ROOT,
-            "%d-%s.out" % (instance.pk, str(uuid.uuid1()))
-        )
-        stdout_handle = codecs.open(stdout_filename, 'w', encoding='utf-8')
-        assert stdout_handle.name == stdout_filename
-
         dispatcher = CallbackQueueDispatcher()
 
         def event_callback(event_data):
@@ -756,7 +745,7 @@ class BaseTask(LogErrorsTask):
             event_data.update(cache_event)
             dispatcher.dispatch(event_data)
 
-        return OutputEventFilter(stdout_handle, event_callback)
+        return OutputEventFilter(event_callback)
 
     def pre_run_hook(self, instance, **kwargs):
         '''
@@ -838,10 +827,8 @@ class BaseTask(LogErrorsTask):
         if isolated_host is None:
             stdout_handle = self.get_stdout_handle(instance)
         else:
-            base_handle = super(self.__class__, self).get_stdout_handle(instance)
-            stdout_handle = isolated_manager.IsolatedManager.wrap_stdout_handle(
-                instance, kwargs['private_data_dir'], base_handle,
-                event_data_key=self.event_data_key)
+            stdout_handle = isolated_manager.IsolatedManager.get_stdout_handle(
+                instance, kwargs['private_data_dir'], event_data_key=self.event_data_key)
         if self.should_use_proot(instance, **kwargs):
             if not check_proot_installed():
                 raise RuntimeError('bubblewrap is not installed')
@@ -858,7 +845,7 @@ class BaseTask(LogErrorsTask):
             args = run.wrap_args_with_ssh_agent(args, ssh_key_path, ssh_auth_sock)
             safe_args = run.wrap_args_with_ssh_agent(safe_args, ssh_key_path, ssh_auth_sock)
         instance = self.update_model(pk, job_args=json.dumps(safe_args),
-                                     job_cwd=cwd, job_env=safe_env, result_stdout_file=stdout_handle.name)
+                                     job_cwd=cwd, job_env=safe_env)
 
         expect_passwords = {}
         for k, v in self.get_password_prompts(**kwargs).items():

View File

@@ -31,6 +31,7 @@ from awx.main.models import (
 )
 
 from awx.main import tasks
+from awx.main.queue import CallbackQueueDispatcher
 from awx.main.utils import encrypt_field, encrypt_value
@@ -199,6 +200,7 @@ class TestJobExecution:
         self.run_pexpect.return_value = ['successful', 0]
 
         self.patches = [
+            mock.patch.object(CallbackQueueDispatcher, 'dispatch', lambda obj: None),
            mock.patch.object(Project, 'get_project_path', lambda *a, **kw: self.project_path),
            # don't emit websocket statuses; they use the DB and complicate testing
            mock.patch.object(UnifiedJob, 'websocket_emit_status', mock.Mock()),

View File

@@ -1,52 +0,0 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
# Python
import mock
from mock import Mock
from StringIO import StringIO
from django.utils.timezone import now
# AWX
from awx.main import models
# stdout file present
@mock.patch('os.path.exists', return_value=True)
@mock.patch('codecs.open', return_value='my_file_handler')
@mock.patch.object(models.UnifiedJob, 'result_stdout_text', '')
def test_result_stdout_raw_handle_file__found(exists, open):
unified_job = models.UnifiedJob()
with mock.patch('os.stat', return_value=Mock(st_size=1)):
result = unified_job.result_stdout_raw_handle()
assert result == 'my_file_handler'
# stdout file missing, job finished
@mock.patch('os.path.exists', return_value=False)
@mock.patch.object(models.UnifiedJob, 'result_stdout_text', '')
def test_result_stdout_raw_handle__missing(exists):
unified_job = models.UnifiedJob()
unified_job.result_stdout_file = 'dummy'
unified_job.finished = now()
result = unified_job.result_stdout_raw_handle()
assert isinstance(result, StringIO)
assert result.read() == 'stdout capture is missing'
# stdout file missing, job not finished
@mock.patch('os.path.exists', return_value=False)
@mock.patch.object(models.UnifiedJob, 'result_stdout_text', '')
def test_result_stdout_raw_handle__pending(exists):
unified_job = models.UnifiedJob()
unified_job.result_stdout_file = 'dummy'
unified_job.finished = None
result = unified_job.result_stdout_raw_handle()
assert isinstance(result, StringIO)
assert result.read() == 'Waiting for results...'

View File

@@ -1,4 +1,3 @@
-import cStringIO
 import pytest
 import base64
 import json
@@ -33,8 +32,7 @@ def fake_cache():
 @pytest.fixture
 def wrapped_handle(job_event_callback):
     # Preliminary creation of resources usually done in tasks.py
-    stdout_handle = cStringIO.StringIO()
-    return OutputEventFilter(stdout_handle, job_event_callback)
+    return OutputEventFilter(job_event_callback)
 
 
 @pytest.fixture
@@ -80,15 +78,6 @@ def test_separate_verbose_events(fake_callback, wrapped_handle):
     assert event_data['event'] == 'verbose'
 
 
-def test_verbose_event_no_markings(fake_callback, wrapped_handle):
-    '''
-    This occurs with jobs that do not have events but still generate
-    and output stream, like system jobs
-    '''
-    wrapped_handle.write('Running tower-manage command \n')
-    assert wrapped_handle._fileobj.getvalue() == 'Running tower-manage command \n'
-
-
 def test_large_data_payload(fake_callback, fake_cache, wrapped_handle):
     # Pretend that this is done by the Ansible callback module
     fake_cache[':1:ev-{}'.format(EXAMPLE_UUID)] = {'event': 'foo'}

View File

@@ -845,25 +845,23 @@ class OutputEventFilter(object):
 
     EVENT_DATA_RE = re.compile(r'\x1b\[K((?:[A-Za-z0-9+/=]+\x1b\[\d+D)+)\x1b\[K')
 
-    def __init__(self, fileobj=None, event_callback=None, raw_callback=None):
-        self._fileobj = fileobj
+    def __init__(self, event_callback):
         self._event_callback = event_callback
        self._event_ct = 0
-        self._raw_callback = raw_callback
         self._counter = 1
         self._start_line = 0
         self._buffer = ''
         self._current_event_data = None
 
-    def __getattr__(self, attr):
-        return getattr(self._fileobj, attr)
+    def flush(self):
+        # pexpect wants to flush the file it writes to, but we're not
+        # actually capturing stdout to a raw file; we're just
+        # implementing a custom `write` method to discover and emit events from
+        # the stdout stream
+        pass
 
     def write(self, data):
-        if self._fileobj:
-            self._fileobj.write(data)
         self._buffer += data
-        if self._raw_callback:
-            self._raw_callback(data)
 
         while True:
             match = self.EVENT_DATA_RE.search(self._buffer)
             if not match:
@@ -877,8 +875,6 @@ class OutputEventFilter(object):
             self._buffer = self._buffer[match.end():]
 
     def close(self):
-        if self._fileobj:
-            self._fileobj.close()
         if self._buffer:
             self._emit_event(self._buffer)
             self._buffer = ''