Merge pull request #833 from ryanpetrello/stdout-events

generalize stdout event processing to emit events for all job types
This commit is contained in:
Ryan Petrello 2018-01-04 11:28:52 -05:00 committed by GitHub
commit 6d413bd412
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 1768 additions and 1023 deletions

View File

@ -614,14 +614,12 @@ class UnifiedJobTemplateSerializer(BaseSerializer):
class UnifiedJobSerializer(BaseSerializer):
show_capabilities = ['start', 'delete']
result_stdout = serializers.SerializerMethodField()
class Meta:
model = UnifiedJob
fields = ('*', 'unified_job_template', 'launch_type', 'status',
'failed', 'started', 'finished', 'elapsed', 'job_args',
'job_cwd', 'job_env', 'job_explanation', 'result_stdout',
'execution_node', 'result_traceback')
'job_cwd', 'job_env', 'job_explanation', 'execution_node',
'result_traceback')
extra_kwargs = {
'unified_job_template': {
'source': 'unified_job_template_id',
@ -702,25 +700,17 @@ class UnifiedJobSerializer(BaseSerializer):
return ret
def get_result_stdout(self, obj):
obj_size = obj.result_stdout_size
if obj_size > settings.STDOUT_MAX_BYTES_DISPLAY:
return _("Standard Output too large to display (%(text_size)d bytes), "
"only download supported for sizes over %(supported_size)d bytes") % {
'text_size': obj_size, 'supported_size': settings.STDOUT_MAX_BYTES_DISPLAY}
return obj.result_stdout
class UnifiedJobListSerializer(UnifiedJobSerializer):
class Meta:
fields = ('*', '-job_args', '-job_cwd', '-job_env', '-result_traceback', '-result_stdout')
fields = ('*', '-job_args', '-job_cwd', '-job_env', '-result_traceback')
def get_field_names(self, declared_fields, info):
field_names = super(UnifiedJobListSerializer, self).get_field_names(declared_fields, info)
# Meta multiple inheritance and -field_name options don't seem to be
# taking effect above, so remove the undesired fields here.
return tuple(x for x in field_names if x not in ('job_args', 'job_cwd', 'job_env', 'result_traceback', 'result_stdout'))
return tuple(x for x in field_names if x not in ('job_args', 'job_cwd', 'job_env', 'result_traceback'))
def get_types(self):
if type(self) is UnifiedJobListSerializer:
@ -760,14 +750,6 @@ class UnifiedJobStdoutSerializer(UnifiedJobSerializer):
class Meta:
fields = ('result_stdout',)
def get_result_stdout(self, obj):
obj_size = obj.result_stdout_size
if obj_size > settings.STDOUT_MAX_BYTES_DISPLAY:
return _("Standard Output too large to display (%(text_size)d bytes), "
"only download supported for sizes over %(supported_size)d bytes") % {
'text_size': obj_size, 'supported_size': settings.STDOUT_MAX_BYTES_DISPLAY}
return obj.result_stdout
def get_types(self):
if type(self) is UnifiedJobStdoutSerializer:
return ['project_update', 'inventory_update', 'job', 'ad_hoc_command', 'system_job']
@ -1123,6 +1105,7 @@ class ProjectUpdateSerializer(UnifiedJobSerializer, ProjectOptionsSerializer):
cancel = self.reverse('api:project_update_cancel', kwargs={'pk': obj.pk}),
scm_inventory_updates = self.reverse('api:project_update_scm_inventory_updates', kwargs={'pk': obj.pk}),
notifications = self.reverse('api:project_update_notifications_list', kwargs={'pk': obj.pk}),
events = self.reverse('api:project_update_events_list', kwargs={'pk': obj.pk}),
))
return res
@ -1744,6 +1727,7 @@ class InventoryUpdateSerializer(UnifiedJobSerializer, InventorySourceOptionsSeri
res.update(dict(
cancel = self.reverse('api:inventory_update_cancel', kwargs={'pk': obj.pk}),
notifications = self.reverse('api:inventory_update_notifications_list', kwargs={'pk': obj.pk}),
events = self.reverse('api:inventory_update_events_list', kwargs={'pk': obj.pk}),
))
if obj.source_project_update_id:
res['source_project_update'] = self.reverse('api:project_update_detail',
@ -2966,9 +2950,11 @@ class SystemJobTemplateSerializer(UnifiedJobTemplateSerializer):
class SystemJobSerializer(UnifiedJobSerializer):
result_stdout = serializers.SerializerMethodField()
class Meta:
model = SystemJob
fields = ('*', 'system_job_template', 'job_type', 'extra_vars')
fields = ('*', 'system_job_template', 'job_type', 'extra_vars', 'result_stdout')
def get_related(self, obj):
res = super(SystemJobSerializer, self).get_related(obj)
@ -2978,8 +2964,12 @@ class SystemJobSerializer(UnifiedJobSerializer):
res['notifications'] = self.reverse('api:system_job_notifications_list', kwargs={'pk': obj.pk})
if obj.can_cancel or True:
res['cancel'] = self.reverse('api:system_job_cancel', kwargs={'pk': obj.pk})
res['events'] = self.reverse('api:system_job_events_list', kwargs={'pk': obj.pk})
return res
def get_result_stdout(self, obj):
return obj.result_stdout
class SystemJobCancelSerializer(SystemJobSerializer):
@ -3428,6 +3418,41 @@ class JobEventWebSocketSerializer(JobEventSerializer):
return 'job_events'
class ProjectUpdateEventSerializer(JobEventSerializer):
class Meta:
model = ProjectUpdateEvent
fields = ('*', '-name', '-description', '-job', '-job_id',
'-parent_uuid', '-parent', '-host', 'project_update')
def get_related(self, obj):
res = super(JobEventSerializer, self).get_related(obj)
res['project_update'] = self.reverse(
'api:project_update_detail', kwargs={'pk': obj.project_update_id}
)
return res
class ProjectUpdateEventWebSocketSerializer(ProjectUpdateEventSerializer):
created = serializers.SerializerMethodField()
modified = serializers.SerializerMethodField()
event_name = serializers.CharField(source='event')
group_name = serializers.SerializerMethodField()
class Meta:
model = ProjectUpdateEvent
fields = ('*', 'event_name', 'group_name',)
def get_created(self, obj):
return obj.created.isoformat()
def get_modified(self, obj):
return obj.modified.isoformat()
def get_group_name(self, obj):
return 'project_update_events'
class AdHocCommandEventSerializer(BaseSerializer):
event_display = serializers.CharField(source='get_event_display', read_only=True)
@ -3487,6 +3512,76 @@ class AdHocCommandEventWebSocketSerializer(AdHocCommandEventSerializer):
return 'ad_hoc_command_events'
class InventoryUpdateEventSerializer(AdHocCommandEventSerializer):
class Meta:
model = InventoryUpdateEvent
fields = ('*', '-name', '-description', '-ad_hoc_command', '-host',
'-host_name', 'inventory_update')
def get_related(self, obj):
res = super(AdHocCommandEventSerializer, self).get_related(obj)
res['inventory_update'] = self.reverse(
'api:inventory_update_detail', kwargs={'pk': obj.inventory_update_id}
)
return res
class InventoryUpdateEventWebSocketSerializer(InventoryUpdateEventSerializer):
created = serializers.SerializerMethodField()
modified = serializers.SerializerMethodField()
event_name = serializers.CharField(source='event')
group_name = serializers.SerializerMethodField()
class Meta:
model = InventoryUpdateEvent
fields = ('*', 'event_name', 'group_name',)
def get_created(self, obj):
return obj.created.isoformat()
def get_modified(self, obj):
return obj.modified.isoformat()
def get_group_name(self, obj):
return 'inventory_update_events'
class SystemJobEventSerializer(AdHocCommandEventSerializer):
class Meta:
model = SystemJobEvent
fields = ('*', '-name', '-description', '-ad_hoc_command', '-host',
'-host_name', 'system_job')
def get_related(self, obj):
res = super(AdHocCommandEventSerializer, self).get_related(obj)
res['system_job'] = self.reverse(
'api:system_job_detail', kwargs={'pk': obj.system_job_id}
)
return res
class SystemJobEventWebSocketSerializer(SystemJobEventSerializer):
created = serializers.SerializerMethodField()
modified = serializers.SerializerMethodField()
event_name = serializers.CharField(source='event')
group_name = serializers.SerializerMethodField()
class Meta:
model = SystemJobEvent
fields = ('*', 'event_name', 'group_name',)
def get_created(self, obj):
return obj.created.isoformat()
def get_modified(self, obj):
return obj.modified.isoformat()
def get_group_name(self, obj):
return 'system_job_events'
class JobLaunchSerializer(BaseSerializer):
# Representational fields

View File

@ -9,6 +9,7 @@ from awx.api.views import (
InventoryUpdateCancel,
InventoryUpdateStdout,
InventoryUpdateNotificationsList,
InventoryUpdateEventsList,
)
@ -18,6 +19,7 @@ urls = [
url(r'^(?P<pk>[0-9]+)/cancel/$', InventoryUpdateCancel.as_view(), name='inventory_update_cancel'),
url(r'^(?P<pk>[0-9]+)/stdout/$', InventoryUpdateStdout.as_view(), name='inventory_update_stdout'),
url(r'^(?P<pk>[0-9]+)/notifications/$', InventoryUpdateNotificationsList.as_view(), name='inventory_update_notifications_list'),
url(r'^(?P<pk>[0-9]+)/events/$', InventoryUpdateEventsList.as_view(), name='inventory_update_events_list'),
]
__all__ = ['urls']

View File

@ -10,6 +10,7 @@ from awx.api.views import (
ProjectUpdateStdout,
ProjectUpdateScmInventoryUpdates,
ProjectUpdateNotificationsList,
ProjectUpdateEventsList,
)
@ -20,6 +21,7 @@ urls = [
url(r'^(?P<pk>[0-9]+)/stdout/$', ProjectUpdateStdout.as_view(), name='project_update_stdout'),
url(r'^(?P<pk>[0-9]+)/scm_inventory_updates/$', ProjectUpdateScmInventoryUpdates.as_view(), name='project_update_scm_inventory_updates'),
url(r'^(?P<pk>[0-9]+)/notifications/$', ProjectUpdateNotificationsList.as_view(), name='project_update_notifications_list'),
url(r'^(?P<pk>[0-9]+)/events/$', ProjectUpdateEventsList.as_view(), name='project_update_events_list'),
]
__all__ = ['urls']

View File

@ -8,6 +8,7 @@ from awx.api.views import (
SystemJobDetail,
SystemJobCancel,
SystemJobNotificationsList,
SystemJobEventsList
)
@ -16,6 +17,7 @@ urls = [
url(r'^(?P<pk>[0-9]+)/$', SystemJobDetail.as_view(), name='system_job_detail'),
url(r'^(?P<pk>[0-9]+)/cancel/$', SystemJobCancel.as_view(), name='system_job_cancel'),
url(r'^(?P<pk>[0-9]+)/notifications/$', SystemJobNotificationsList.as_view(), name='system_job_notifications_list'),
url(r'^(?P<pk>[0-9]+)/events/$', SystemJobEventsList.as_view(), name='system_job_events_list'),
]
__all__ = ['urls']

View File

@ -2,13 +2,11 @@
# All Rights Reserved.
# Python
import os
import re
import cgi
import dateutil
import time
import socket
import subprocess
import sys
import logging
import requests
@ -20,7 +18,7 @@ import six
from django.conf import settings
from django.core.exceptions import FieldError
from django.db.models import Q, Count, F
from django.db import IntegrityError, transaction, connection
from django.db import IntegrityError, transaction
from django.shortcuts import get_object_or_404
from django.utils.encoding import smart_text, force_text
from django.utils.safestring import mark_safe
@ -1368,6 +1366,45 @@ class ProjectUpdateDetail(UnifiedJobDeletionMixin, RetrieveDestroyAPIView):
new_in_13 = True
class ProjectUpdateEventsList(SubListAPIView):
model = ProjectUpdateEvent
serializer_class = ProjectUpdateEventSerializer
parent_model = ProjectUpdate
relationship = 'project_update_events'
view_name = _('Project Update Events List')
def finalize_response(self, request, response, *args, **kwargs):
response['X-UI-Max-Events'] = settings.MAX_UI_JOB_EVENTS
return super(ProjectUpdateEventsList, self).finalize_response(request, response, *args, **kwargs)
class SystemJobEventsList(SubListAPIView):
model = SystemJobEvent
serializer_class = SystemJobEventSerializer
parent_model = SystemJob
relationship = 'system_job_events'
view_name = _('System Job Events List')
def finalize_response(self, request, response, *args, **kwargs):
response['X-UI-Max-Events'] = settings.MAX_UI_JOB_EVENTS
return super(SystemJobEventsList, self).finalize_response(request, response, *args, **kwargs)
class InventoryUpdateEventsList(SubListAPIView):
model = InventoryUpdateEvent
serializer_class = InventoryUpdateEventSerializer
parent_model = InventoryUpdate
relationship = 'inventory_update_events'
view_name = _('Inventory Update Events List')
def finalize_response(self, request, response, *args, **kwargs):
response['X-UI-Max-Events'] = settings.MAX_UI_JOB_EVENTS
return super(InventoryUpdateEventsList, self).finalize_response(request, response, *args, **kwargs)
class ProjectUpdateCancel(RetrieveAPIView):
model = ProjectUpdate
@ -4498,7 +4535,7 @@ class StdoutANSIFilter(object):
def __init__(self, fileobj):
self.fileobj = fileobj
self.extra_data = ''
if hasattr(fileobj,'close'):
if hasattr(fileobj, 'close'):
self.close = fileobj.close
def read(self, size=-1):
@ -4532,97 +4569,69 @@ class UnifiedJobStdout(RetrieveAPIView):
def retrieve(self, request, *args, **kwargs):
unified_job = self.get_object()
obj_size = unified_job.result_stdout_size
if request.accepted_renderer.format not in {'txt_download', 'ansi_download'} and obj_size > settings.STDOUT_MAX_BYTES_DISPLAY:
response_message = _("Standard Output too large to display (%(text_size)d bytes), "
"only download supported for sizes over %(supported_size)d bytes") % {
'text_size': obj_size, 'supported_size': settings.STDOUT_MAX_BYTES_DISPLAY}
try:
target_format = request.accepted_renderer.format
if target_format in ('html', 'api', 'json'):
content_format = request.query_params.get('content_format', 'html')
content_encoding = request.query_params.get('content_encoding', None)
start_line = request.query_params.get('start_line', 0)
end_line = request.query_params.get('end_line', None)
dark_val = request.query_params.get('dark', '')
dark = bool(dark_val and dark_val[0].lower() in ('1', 't', 'y'))
content_only = bool(target_format in ('api', 'json'))
dark_bg = (content_only and dark) or (not content_only and (dark or not dark_val))
content, start, end, absolute_end = unified_job.result_stdout_raw_limited(start_line, end_line)
# Remove any ANSI escape sequences containing job event data.
content = re.sub(r'\x1b\[K(?:[A-Za-z0-9+/=]+\x1b\[\d+D)+\x1b\[K', '', content)
body = ansiconv.to_html(cgi.escape(content))
context = {
'title': get_view_name(self.__class__),
'body': mark_safe(body),
'dark': dark_bg,
'content_only': content_only,
}
data = render_to_string('api/stdout.html', context).strip()
if target_format == 'api':
return Response(mark_safe(data))
if target_format == 'json':
if content_encoding == 'base64' and content_format == 'ansi':
return Response({'range': {'start': start, 'end': end, 'absolute_end': absolute_end}, 'content': b64encode(content)})
elif content_format == 'html':
return Response({'range': {'start': start, 'end': end, 'absolute_end': absolute_end}, 'content': body})
return Response(data)
elif target_format == 'txt':
return Response(unified_job.result_stdout)
elif target_format == 'ansi':
return Response(unified_job.result_stdout_raw)
elif target_format in {'txt_download', 'ansi_download'}:
filename = '{type}_{pk}{suffix}.txt'.format(
type=camelcase_to_underscore(unified_job.__class__.__name__),
pk=unified_job.id,
suffix='.ansi' if target_format == 'ansi_download' else ''
)
content_fd = unified_job.result_stdout_raw_handle(enforce_max_bytes=False)
if target_format == 'txt_download':
content_fd = StdoutANSIFilter(content_fd)
response = HttpResponse(FileWrapper(content_fd), content_type='text/plain')
response["Content-Disposition"] = 'attachment; filename="{}"'.format(filename)
return response
else:
return super(UnifiedJobStdout, self).retrieve(request, *args, **kwargs)
except StdoutMaxBytesExceeded as e:
response_message = _(
"Standard Output too large to display ({text_size} bytes), "
"only download supported for sizes over {supported_size} bytes").format(
text_size=e.total, supported_size=e.supported
)
if request.accepted_renderer.format == 'json':
return Response({'range': {'start': 0, 'end': 1, 'absolute_end': 1}, 'content': response_message})
else:
return Response(response_message)
if request.accepted_renderer.format in ('html', 'api', 'json'):
content_format = request.query_params.get('content_format', 'html')
content_encoding = request.query_params.get('content_encoding', None)
start_line = request.query_params.get('start_line', 0)
end_line = request.query_params.get('end_line', None)
dark_val = request.query_params.get('dark', '')
dark = bool(dark_val and dark_val[0].lower() in ('1', 't', 'y'))
content_only = bool(request.accepted_renderer.format in ('api', 'json'))
dark_bg = (content_only and dark) or (not content_only and (dark or not dark_val))
content, start, end, absolute_end = unified_job.result_stdout_raw_limited(start_line, end_line)
# Remove any ANSI escape sequences containing job event data.
content = re.sub(r'\x1b\[K(?:[A-Za-z0-9+/=]+\x1b\[\d+D)+\x1b\[K', '', content)
body = ansiconv.to_html(cgi.escape(content))
context = {
'title': get_view_name(self.__class__),
'body': mark_safe(body),
'dark': dark_bg,
'content_only': content_only,
}
data = render_to_string('api/stdout.html', context).strip()
if request.accepted_renderer.format == 'api':
return Response(mark_safe(data))
if request.accepted_renderer.format == 'json':
if content_encoding == 'base64' and content_format == 'ansi':
return Response({'range': {'start': start, 'end': end, 'absolute_end': absolute_end}, 'content': b64encode(content)})
elif content_format == 'html':
return Response({'range': {'start': start, 'end': end, 'absolute_end': absolute_end}, 'content': body})
return Response(data)
elif request.accepted_renderer.format == 'txt':
return Response(unified_job.result_stdout)
elif request.accepted_renderer.format == 'ansi':
return Response(unified_job.result_stdout_raw)
elif request.accepted_renderer.format in {'txt_download', 'ansi_download'}:
if not os.path.exists(unified_job.result_stdout_file):
write_fd = open(unified_job.result_stdout_file, 'w')
with connection.cursor() as cursor:
try:
tablename, related_name = {
Job: ('main_jobevent', 'job_id'),
AdHocCommand: ('main_adhoccommandevent', 'ad_hoc_command_id'),
}.get(unified_job.__class__, (None, None))
if tablename is None:
# stdout job event reconstruction isn't supported
# for certain job types (such as inventory syncs),
# so just grab the raw stdout from the DB
write_fd.write(unified_job.result_stdout_text)
write_fd.close()
else:
cursor.copy_expert(
"copy (select stdout from {} where {}={} order by start_line) to stdout".format(
tablename,
related_name,
unified_job.id
),
write_fd
)
write_fd.close()
subprocess.Popen("sed -i 's/\\\\r\\\\n/\\n/g' {}".format(unified_job.result_stdout_file),
shell=True).wait()
except Exception as e:
return Response({"error": _("Error generating stdout download file: {}".format(e))})
try:
content_fd = open(unified_job.result_stdout_file, 'r')
if request.accepted_renderer.format == 'txt_download':
# For txt downloads, filter out ANSI escape sequences.
content_fd = StdoutANSIFilter(content_fd)
suffix = ''
else:
suffix = '_ansi'
response = HttpResponse(FileWrapper(content_fd), content_type='text/plain')
response["Content-Disposition"] = 'attachment; filename="job_%s%s.txt"' % (str(unified_job.id), suffix)
return response
except Exception as e:
return Response({"error": _("Error generating stdout download file: %s") % str(e)}, status=status.HTTP_400_BAD_REQUEST)
else:
return super(UnifiedJobStdout, self).retrieve(request, *args, **kwargs)
class ProjectUpdateStdout(UnifiedJobStdout):

View File

@ -123,6 +123,8 @@ class EventContext(object):
event_data['job_id'] = int(os.getenv('JOB_ID', '0'))
if os.getenv('AD_HOC_COMMAND_ID', ''):
event_data['ad_hoc_command_id'] = int(os.getenv('AD_HOC_COMMAND_ID', '0'))
if os.getenv('PROJECT_UPDATE_ID', ''):
event_data['project_update_id'] = int(os.getenv('PROJECT_UPDATE_ID', '0'))
event_data.setdefault('pid', os.getpid())
event_data.setdefault('uuid', str(uuid.uuid4()))
event_data.setdefault('created', datetime.datetime.utcnow().isoformat())
@ -145,7 +147,7 @@ class EventContext(object):
event_data['res'] = {}
event_dict = dict(event=event, event_data=event_data)
for key in event_data.keys():
if key in ('job_id', 'ad_hoc_command_id', 'uuid', 'parent_uuid', 'created',):
if key in ('job_id', 'ad_hoc_command_id', 'project_update_id', 'uuid', 'parent_uuid', 'created',):
event_dict[key] = event_data.pop(key)
elif key in ('verbosity', 'pid'):
event_dict[key] = event_data[key]

View File

@ -1987,6 +1987,64 @@ class JobEventAccess(BaseAccess):
return False
class ProjectUpdateEventAccess(BaseAccess):
'''
I can see project update event records whenever I can access the project update
'''
model = ProjectUpdateEvent
def filtered_queryset(self):
return self.model.objects.filter(
Q(project_update__in=ProjectUpdate.accessible_pk_qs(self.user, 'read_role')))
def can_add(self, data):
return False
def can_change(self, obj, data):
return False
def can_delete(self, obj):
return False
class InventoryUpdateEventAccess(BaseAccess):
'''
I can see inventory update event records whenever I can access the inventory update
'''
model = InventoryUpdateEvent
def filtered_queryset(self):
return self.model.objects.filter(
Q(inventory_update__in=InventoryUpdate.accessible_pk_qs(self.user, 'read_role')))
def can_add(self, data):
return False
def can_change(self, obj, data):
return False
def can_delete(self, obj):
return False
class SystemJobEventAccess(BaseAccess):
'''
I can only see manage System Jobs events if I'm a super user
'''
model = SystemJobEvent
def can_add(self, data):
return False
def can_change(self, obj, data):
return False
def can_delete(self, obj):
return False
class UnifiedJobTemplateAccess(BaseAccess):
'''
I can see a unified job template whenever I can see the same project,

View File

@ -445,7 +445,7 @@ class IsolatedManager(object):
instance.hostname, instance.modified))
@staticmethod
def wrap_stdout_handle(instance, private_data_dir, stdout_handle, event_data_key='job_id'):
def get_stdout_handle(instance, private_data_dir, event_data_key='job_id'):
dispatcher = CallbackQueueDispatcher()
def job_event_callback(event_data):
@ -463,7 +463,7 @@ class IsolatedManager(object):
event_data.get('event', ''), event_data['uuid'], instance.id, event_data))
dispatcher.dispatch(event_data)
return OutputEventFilter(stdout_handle, job_event_callback)
return OutputEventFilter(job_event_callback)
def run(self, instance, host, private_data_dir, proot_temp_dir):
"""

View File

@ -99,7 +99,6 @@ def run_pexpect(args, cwd, env, logfile,
password_patterns = expect_passwords.keys()
password_values = expect_passwords.values()
logfile_pos = logfile.tell()
child = pexpect.spawn(
args[0], args[1:], cwd=cwd, env=env, ignore_sighup=True,
encoding='utf-8', echo=False,
@ -116,8 +115,6 @@ def run_pexpect(args, cwd, env, logfile,
password = password_values[result_id]
if password is not None:
child.sendline(password)
if logfile_pos != logfile.tell():
logfile_pos = logfile.tell()
last_stdout_update = time.time()
if cancelled_callback:
try:

View File

@ -12,11 +12,17 @@ from awx.main.models import (
UnifiedJob,
Job,
AdHocCommand,
ProjectUpdate,
InventoryUpdate,
SystemJob
)
from awx.main.consumers import emit_channel_notification
from awx.api.serializers import (
JobEventWebSocketSerializer,
AdHocCommandEventWebSocketSerializer,
ProjectUpdateEventWebSocketSerializer,
InventoryUpdateEventWebSocketSerializer,
SystemJobEventWebSocketSerializer
)
@ -60,7 +66,16 @@ class ReplayJobEvents():
return self.replay_elapsed().total_seconds() - (self.recording_elapsed(created).total_seconds() * (1.0 / speed))
def get_job_events(self, job):
job_events = job.job_events.order_by('created')
if type(job) is Job:
job_events = job.job_events.order_by('created')
elif type(job) is AdHocCommand:
job_events = job.ad_hoc_command_events.order_by('created')
elif type(job) is ProjectUpdate:
job_events = job.project_update_events.order_by('created')
elif type(job) is InventoryUpdate:
job_events = job.inventory_update_events.order_by('created')
elif type(job) is SystemJob:
job_events = job.system_job_events.order_by('created')
if job_events.count() == 0:
raise RuntimeError("No events for job id {}".format(job.id))
return job_events
@ -70,6 +85,12 @@ class ReplayJobEvents():
return JobEventWebSocketSerializer
elif type(job) is AdHocCommand:
return AdHocCommandEventWebSocketSerializer
elif type(job) is ProjectUpdate:
return ProjectUpdateEventWebSocketSerializer
elif type(job) is InventoryUpdate:
return InventoryUpdateEventWebSocketSerializer
elif type(job) is SystemJob:
return SystemJobEventWebSocketSerializer
else:
raise RuntimeError("Job is of type {} and replay is not yet supported.".format(type(job)))
sys.exit(1)

View File

@ -25,6 +25,7 @@ from django.core.cache import cache as django_cache
# AWX
from awx.main.models import * # noqa
from awx.main.consumers import emit_channel_notification
logger = logging.getLogger('awx.main.commands.run_callback_receiver')
@ -128,8 +129,17 @@ class CallbackBrokerWorker(ConsumerMixin):
logger.error("Exception on worker thread, restarting: " + str(e))
continue
try:
if 'job_id' not in body and 'ad_hoc_command_id' not in body:
raise Exception('Payload does not have a job_id or ad_hoc_command_id')
event_map = {
'job_id': JobEvent,
'ad_hoc_command_id': AdHocCommandEvent,
'project_update_id': ProjectUpdateEvent,
'inventory_update_id': InventoryUpdateEvent,
'system_job_id': SystemJobEvent,
}
if not any([key in body for key in event_map]):
raise Exception('Payload does not have a job identifier')
if settings.DEBUG:
from pygments import highlight
from pygments.lexers import PythonLexer
@ -140,16 +150,26 @@ class CallbackBrokerWorker(ConsumerMixin):
))
def _save_event_data():
if 'job_id' in body:
JobEvent.create_from_data(**body)
elif 'ad_hoc_command_id' in body:
AdHocCommandEvent.create_from_data(**body)
for key, cls in event_map.items():
if key in body:
cls.create_from_data(**body)
job_identifier = 'unknown job'
if 'job_id' in body:
job_identifier = body['job_id']
elif 'ad_hoc_command_id' in body:
job_identifier = body['ad_hoc_command_id']
for key in event_map.keys():
if key in body:
job_identifier = body[key]
break
if body.get('event') == 'EOF':
# EOF events are sent when stdout for the running task is
# closed. don't actually persist them to the database; we
# just use them to report `summary` websocket events as an
# approximation for when a job is "done"
emit_channel_notification(
'jobs-summary',
dict(group_name='jobs', unified_job_id=job_identifier)
)
continue
retries = 0
while retries <= self.MAX_RETRIES:

View File

@ -0,0 +1,85 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.7 on 2017-12-14 15:13
from __future__ import unicode_literals
import awx.main.fields
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('main', '0017_v330_move_deprecated_stdout'),
]
operations = [
migrations.CreateModel(
name='InventoryUpdateEvent',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created', models.DateTimeField(default=None, editable=False)),
('modified', models.DateTimeField(default=None, editable=False)),
('event_data', awx.main.fields.JSONField(blank=True, default={})),
('uuid', models.CharField(default=b'', editable=False, max_length=1024)),
('counter', models.PositiveIntegerField(default=0, editable=False)),
('stdout', models.TextField(default=b'', editable=False)),
('verbosity', models.PositiveIntegerField(default=0, editable=False)),
('start_line', models.PositiveIntegerField(default=0, editable=False)),
('end_line', models.PositiveIntegerField(default=0, editable=False)),
('inventory_update', models.ForeignKey(editable=False, on_delete=django.db.models.deletion.CASCADE, related_name='generic_command_events', to='main.InventoryUpdate')),
],
options={
'ordering': ('-pk',),
},
),
migrations.CreateModel(
name='ProjectUpdateEvent',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created', models.DateTimeField(default=None, editable=False)),
('modified', models.DateTimeField(default=None, editable=False)),
('event', models.CharField(choices=[(b'runner_on_failed', 'Host Failed'), (b'runner_on_ok', 'Host OK'), (b'runner_on_error', 'Host Failure'), (b'runner_on_skipped', 'Host Skipped'), (b'runner_on_unreachable', 'Host Unreachable'), (b'runner_on_no_hosts', 'No Hosts Remaining'), (b'runner_on_async_poll', 'Host Polling'), (b'runner_on_async_ok', 'Host Async OK'), (b'runner_on_async_failed', 'Host Async Failure'), (b'runner_item_on_ok', 'Item OK'), (b'runner_item_on_failed', 'Item Failed'), (b'runner_item_on_skipped', 'Item Skipped'), (b'runner_retry', 'Host Retry'), (b'runner_on_file_diff', 'File Difference'), (b'playbook_on_start', 'Playbook Started'), (b'playbook_on_notify', 'Running Handlers'), (b'playbook_on_include', 'Including File'), (b'playbook_on_no_hosts_matched', 'No Hosts Matched'), (b'playbook_on_no_hosts_remaining', 'No Hosts Remaining'), (b'playbook_on_task_start', 'Task Started'), (b'playbook_on_vars_prompt', 'Variables Prompted'), (b'playbook_on_setup', 'Gathering Facts'), (b'playbook_on_import_for_host', 'internal: on Import for Host'), (b'playbook_on_not_import_for_host', 'internal: on Not Import for Host'), (b'playbook_on_play_start', 'Play Started'), (b'playbook_on_stats', 'Playbook Complete'), (b'debug', 'Debug'), (b'verbose', 'Verbose'), (b'deprecated', 'Deprecated'), (b'warning', 'Warning'), (b'system_warning', 'System Warning'), (b'error', 'Error')], max_length=100)),
('event_data', awx.main.fields.JSONField(blank=True, default={})),
('failed', models.BooleanField(default=False, editable=False)),
('changed', models.BooleanField(default=False, editable=False)),
('uuid', models.CharField(default=b'', editable=False, max_length=1024)),
('playbook', models.CharField(default=b'', editable=False, max_length=1024)),
('play', models.CharField(default=b'', editable=False, max_length=1024)),
('role', models.CharField(default=b'', editable=False, max_length=1024)),
('task', models.CharField(default=b'', editable=False, max_length=1024)),
('counter', models.PositiveIntegerField(default=0, editable=False)),
('stdout', models.TextField(default=b'', editable=False)),
('verbosity', models.PositiveIntegerField(default=0, editable=False)),
('start_line', models.PositiveIntegerField(default=0, editable=False)),
('end_line', models.PositiveIntegerField(default=0, editable=False)),
('project_update', models.ForeignKey(editable=False, on_delete=django.db.models.deletion.CASCADE, related_name='generic_command_events', to='main.ProjectUpdate')),
],
options={
'ordering': ('pk',),
},
),
migrations.CreateModel(
name='SystemJobEvent',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created', models.DateTimeField(default=None, editable=False)),
('modified', models.DateTimeField(default=None, editable=False)),
('event_data', awx.main.fields.JSONField(blank=True, default={})),
('uuid', models.CharField(default=b'', editable=False, max_length=1024)),
('counter', models.PositiveIntegerField(default=0, editable=False)),
('stdout', models.TextField(default=b'', editable=False)),
('verbosity', models.PositiveIntegerField(default=0, editable=False)),
('start_line', models.PositiveIntegerField(default=0, editable=False)),
('end_line', models.PositiveIntegerField(default=0, editable=False)),
('system_job', models.ForeignKey(editable=False, on_delete=django.db.models.deletion.CASCADE, related_name='generic_command_events', to='main.SystemJob')),
],
options={
'ordering': ('-pk',),
},
),
migrations.RemoveField(
model_name='unifiedjob',
name='result_stdout_file',
),
]

View File

@ -12,6 +12,7 @@ from awx.main.models.credential import * # noqa
from awx.main.models.projects import * # noqa
from awx.main.models.inventory import * # noqa
from awx.main.models.jobs import * # noqa
from awx.main.models.events import * # noqa
from awx.main.models.ad_hoc_commands import * # noqa
from awx.main.models.schedules import * # noqa
from awx.main.models.activity_stream import * # noqa

View File

@ -2,29 +2,26 @@
# All Rights Reserved.
# Python
import datetime
import logging
from urlparse import urljoin
# Django
from django.conf import settings
from django.db import models
from django.utils.dateparse import parse_datetime
from django.utils.text import Truncator
from django.utils.timezone import utc
from django.utils.translation import ugettext_lazy as _
from django.core.exceptions import ValidationError
# AWX
from awx.api.versioning import reverse
from awx.main.models.base import * # noqa
from awx.main.models.events import AdHocCommandEvent
from awx.main.models.unified_jobs import * # noqa
from awx.main.models.notifications import JobNotificationMixin, NotificationTemplate
from awx.main.fields import JSONField
logger = logging.getLogger('awx.main.models.ad_hoc_commands')
__all__ = ['AdHocCommand', 'AdHocCommandEvent']
__all__ = ['AdHocCommand']
class AdHocCommand(UnifiedJob, JobNotificationMixin):
@ -127,6 +124,10 @@ class AdHocCommand(UnifiedJob, JobNotificationMixin):
raise ValidationError(_('No argument passed to %s module.') % self.module_name)
return module_args
@property
def event_class(self):
return AdHocCommandEvent
@property
def passwords_needed_to_start(self):
'''Return list of password field names needed to start the job.'''
@ -224,169 +225,3 @@ class AdHocCommand(UnifiedJob, JobNotificationMixin):
def get_notification_friendly_name(self):
return "AdHoc Command"
class AdHocCommandEvent(CreatedModifiedModel):
'''
An event/message logged from the ad hoc event callback for each host.
'''
EVENT_TYPES = [
# (event, verbose name, failed)
('runner_on_failed', _('Host Failed'), True),
('runner_on_ok', _('Host OK'), False),
('runner_on_unreachable', _('Host Unreachable'), True),
# Tower won't see no_hosts (check is done earlier without callback).
# ('runner_on_no_hosts', _('No Hosts Matched'), False),
# Tower will see skipped (when running in check mode for a module that
# does not support check mode).
('runner_on_skipped', _('Host Skipped'), False),
# Tower does not support async for ad hoc commands (not used in v2).
# ('runner_on_async_poll', _('Host Polling'), False),
# ('runner_on_async_ok', _('Host Async OK'), False),
# ('runner_on_async_failed', _('Host Async Failure'), True),
# Tower does not yet support --diff mode.
# ('runner_on_file_diff', _('File Difference'), False),
# Additional event types for captured stdout not directly related to
# runner events.
('debug', _('Debug'), False),
('verbose', _('Verbose'), False),
('deprecated', _('Deprecated'), False),
('warning', _('Warning'), False),
('system_warning', _('System Warning'), False),
('error', _('Error'), False),
]
FAILED_EVENTS = [x[0] for x in EVENT_TYPES if x[2]]
EVENT_CHOICES = [(x[0], x[1]) for x in EVENT_TYPES]
class Meta:
app_label = 'main'
ordering = ('-pk',)
index_together = [
('ad_hoc_command', 'event'),
('ad_hoc_command', 'uuid'),
('ad_hoc_command', 'start_line'),
('ad_hoc_command', 'end_line'),
]
ad_hoc_command = models.ForeignKey(
'AdHocCommand',
related_name='ad_hoc_command_events',
on_delete=models.CASCADE,
editable=False,
)
host = models.ForeignKey(
'Host',
related_name='ad_hoc_command_events',
null=True,
default=None,
on_delete=models.SET_NULL,
editable=False,
)
host_name = models.CharField(
max_length=1024,
default='',
editable=False,
)
event = models.CharField(
max_length=100,
choices=EVENT_CHOICES,
)
event_data = JSONField(
blank=True,
default={},
)
failed = models.BooleanField(
default=False,
editable=False,
)
changed = models.BooleanField(
default=False,
editable=False,
)
uuid = models.CharField(
max_length=1024,
default='',
editable=False,
)
counter = models.PositiveIntegerField(
default=0,
editable=False,
)
stdout = models.TextField(
default='',
editable=False,
)
verbosity = models.PositiveIntegerField(
default=0,
editable=False,
)
start_line = models.PositiveIntegerField(
default=0,
editable=False,
)
end_line = models.PositiveIntegerField(
default=0,
editable=False,
)
def get_absolute_url(self, request=None):
return reverse('api:ad_hoc_command_event_detail', kwargs={'pk': self.pk}, request=request)
def __unicode__(self):
return u'%s @ %s' % (self.get_event_display(), self.created.isoformat())
def save(self, *args, **kwargs):
# If update_fields has been specified, add our field names to it,
# if it hasn't been specified, then we're just doing a normal save.
update_fields = kwargs.get('update_fields', [])
res = self.event_data.get('res', None)
if self.event in self.FAILED_EVENTS:
if not self.event_data.get('ignore_errors', False):
self.failed = True
if 'failed' not in update_fields:
update_fields.append('failed')
if isinstance(res, dict) and res.get('changed', False):
self.changed = True
if 'changed' not in update_fields:
update_fields.append('changed')
self.host_name = self.event_data.get('host', '').strip()
if 'host_name' not in update_fields:
update_fields.append('host_name')
if not self.host_id and self.host_name:
host_qs = self.ad_hoc_command.inventory.hosts.filter(name=self.host_name)
try:
host_id = host_qs.only('id').values_list('id', flat=True)
if host_id.exists():
self.host_id = host_id[0]
if 'host_id' not in update_fields:
update_fields.append('host_id')
except (IndexError, AttributeError):
pass
super(AdHocCommandEvent, self).save(*args, **kwargs)
@classmethod
def create_from_data(self, **kwargs):
# Convert the datetime for the ad hoc command event's creation
# appropriately, and include a time zone for it.
#
# In the event of any issue, throw it out, and Django will just save
# the current time.
try:
if not isinstance(kwargs['created'], datetime.datetime):
kwargs['created'] = parse_datetime(kwargs['created'])
if not kwargs['created'].tzinfo:
kwargs['created'] = kwargs['created'].replace(tzinfo=utc)
except (KeyError, ValueError):
kwargs.pop('created', None)
# Sanity check: Don't honor keys that we don't recognize.
valid_keys = {'ad_hoc_command_id', 'event', 'event_data', 'created',
'counter', 'uuid', 'stdout', 'start_line', 'end_line',
'verbosity'}
for key in kwargs.keys():
if key not in valid_keys:
kwargs.pop(key)
return AdHocCommandEvent.objects.create(**kwargs)

774
awx/main/models/events.py Normal file
View File

@ -0,0 +1,774 @@
import datetime
import logging
from django.conf import settings
from django.db import models
from django.utils.dateparse import parse_datetime
from django.utils.timezone import utc
from django.utils.translation import ugettext_lazy as _
from django.utils.encoding import force_text
from awx.api.versioning import reverse
from awx.main.fields import JSONField
from awx.main.models.base import CreatedModifiedModel
from awx.main.utils import ignore_inventory_computed_fields
analytics_logger = logging.getLogger('awx.analytics.job_events')
__all__ = ['JobEvent', 'ProjectUpdateEvent', 'AdHocCommandEvent',
'InventoryUpdateEvent', 'SystemJobEvent']
class BasePlaybookEvent(CreatedModifiedModel):
'''
An event/message logged from a playbook callback for each host.
'''
VALID_KEYS = [
'event', 'event_data', 'playbook', 'play', 'role', 'task', 'created',
'counter', 'uuid', 'stdout', 'parent_uuid', 'start_line', 'end_line',
'verbosity'
]
class Meta:
abstract = True
# Playbook events will be structured to form the following hierarchy:
# - playbook_on_start (once for each playbook file)
# - playbook_on_vars_prompt (for each play, but before play starts, we
# currently don't handle responding to these prompts)
# - playbook_on_play_start (once for each play)
# - playbook_on_import_for_host (not logged, not used for v2)
# - playbook_on_not_import_for_host (not logged, not used for v2)
# - playbook_on_no_hosts_matched
# - playbook_on_no_hosts_remaining
# - playbook_on_include (only v2 - only used for handlers?)
# - playbook_on_setup (not used for v2)
# - runner_on*
# - playbook_on_task_start (once for each task within a play)
# - runner_on_failed
# - runner_on_ok
# - runner_on_error (not used for v2)
# - runner_on_skipped
# - runner_on_unreachable
# - runner_on_no_hosts (not used for v2)
# - runner_on_async_poll (not used for v2)
# - runner_on_async_ok (not used for v2)
# - runner_on_async_failed (not used for v2)
# - runner_on_file_diff (v2 event is v2_on_file_diff)
# - runner_item_on_ok (v2 only)
# - runner_item_on_failed (v2 only)
# - runner_item_on_skipped (v2 only)
# - runner_retry (v2 only)
# - playbook_on_notify (once for each notification from the play, not used for v2)
# - playbook_on_stats
EVENT_TYPES = [
# (level, event, verbose name, failed)
(3, 'runner_on_failed', _('Host Failed'), True),
(3, 'runner_on_ok', _('Host OK'), False),
(3, 'runner_on_error', _('Host Failure'), True),
(3, 'runner_on_skipped', _('Host Skipped'), False),
(3, 'runner_on_unreachable', _('Host Unreachable'), True),
(3, 'runner_on_no_hosts', _('No Hosts Remaining'), False),
(3, 'runner_on_async_poll', _('Host Polling'), False),
(3, 'runner_on_async_ok', _('Host Async OK'), False),
(3, 'runner_on_async_failed', _('Host Async Failure'), True),
(3, 'runner_item_on_ok', _('Item OK'), False),
(3, 'runner_item_on_failed', _('Item Failed'), True),
(3, 'runner_item_on_skipped', _('Item Skipped'), False),
(3, 'runner_retry', _('Host Retry'), False),
# Tower does not yet support --diff mode.
(3, 'runner_on_file_diff', _('File Difference'), False),
(0, 'playbook_on_start', _('Playbook Started'), False),
(2, 'playbook_on_notify', _('Running Handlers'), False),
(2, 'playbook_on_include', _('Including File'), False),
(2, 'playbook_on_no_hosts_matched', _('No Hosts Matched'), False),
(2, 'playbook_on_no_hosts_remaining', _('No Hosts Remaining'), False),
(2, 'playbook_on_task_start', _('Task Started'), False),
# Tower does not yet support vars_prompt (and will probably hang :)
(1, 'playbook_on_vars_prompt', _('Variables Prompted'), False),
(2, 'playbook_on_setup', _('Gathering Facts'), False),
(2, 'playbook_on_import_for_host', _('internal: on Import for Host'), False),
(2, 'playbook_on_not_import_for_host', _('internal: on Not Import for Host'), False),
(1, 'playbook_on_play_start', _('Play Started'), False),
(1, 'playbook_on_stats', _('Playbook Complete'), False),
# Additional event types for captured stdout not directly related to
# playbook or runner events.
(0, 'debug', _('Debug'), False),
(0, 'verbose', _('Verbose'), False),
(0, 'deprecated', _('Deprecated'), False),
(0, 'warning', _('Warning'), False),
(0, 'system_warning', _('System Warning'), False),
(0, 'error', _('Error'), True),
]
FAILED_EVENTS = [x[1] for x in EVENT_TYPES if x[3]]
EVENT_CHOICES = [(x[1], x[2]) for x in EVENT_TYPES]
LEVEL_FOR_EVENT = dict([(x[1], x[0]) for x in EVENT_TYPES])
event = models.CharField(
max_length=100,
choices=EVENT_CHOICES,
)
event_data = JSONField(
blank=True,
default={},
)
failed = models.BooleanField(
default=False,
editable=False,
)
changed = models.BooleanField(
default=False,
editable=False,
)
uuid = models.CharField(
max_length=1024,
default='',
editable=False,
)
playbook = models.CharField(
max_length=1024,
default='',
editable=False,
)
play = models.CharField(
max_length=1024,
default='',
editable=False,
)
role = models.CharField(
max_length=1024,
default='',
editable=False,
)
task = models.CharField(
max_length=1024,
default='',
editable=False,
)
counter = models.PositiveIntegerField(
default=0,
editable=False,
)
stdout = models.TextField(
default='',
editable=False,
)
verbosity = models.PositiveIntegerField(
default=0,
editable=False,
)
start_line = models.PositiveIntegerField(
default=0,
editable=False,
)
end_line = models.PositiveIntegerField(
default=0,
editable=False,
)
@property
def event_level(self):
return self.LEVEL_FOR_EVENT.get(self.event, 0)
def get_event_display2(self):
msg = self.get_event_display()
if self.event == 'playbook_on_play_start':
if self.play:
msg = "%s (%s)" % (msg, self.play)
elif self.event == 'playbook_on_task_start':
if self.task:
if self.event_data.get('is_conditional', False):
msg = 'Handler Notified'
if self.role:
msg = '%s (%s | %s)' % (msg, self.role, self.task)
else:
msg = "%s (%s)" % (msg, self.task)
# Change display for runner events triggered by async polling. Some of
# these events may not show in most cases, due to filterting them out
# of the job event queryset returned to the user.
res = self.event_data.get('res', {})
# Fix for existing records before we had added the workaround on save
# to change async_ok to async_failed.
if self.event == 'runner_on_async_ok':
try:
if res.get('failed', False) or res.get('rc', 0) != 0:
msg = 'Host Async Failed'
except (AttributeError, TypeError):
pass
# Runner events with ansible_job_id are part of async starting/polling.
if self.event in ('runner_on_ok', 'runner_on_failed'):
try:
module_name = res['invocation']['module_name']
job_id = res['ansible_job_id']
except (TypeError, KeyError, AttributeError):
module_name = None
job_id = None
if module_name and job_id:
if module_name == 'async_status':
msg = 'Host Async Checking'
else:
msg = 'Host Async Started'
# Handle both 1.2 on_failed and 1.3+ on_async_failed events when an
# async task times out.
if self.event in ('runner_on_failed', 'runner_on_async_failed'):
try:
if res['msg'] == 'timed out':
msg = 'Host Async Timeout'
except (TypeError, KeyError, AttributeError):
pass
return msg
def _update_from_event_data(self):
# Update event model fields from event data.
updated_fields = set()
event_data = self.event_data
res = event_data.get('res', None)
if self.event in self.FAILED_EVENTS and not event_data.get('ignore_errors', False):
self.failed = True
updated_fields.add('failed')
if isinstance(res, dict):
if res.get('changed', False):
self.changed = True
updated_fields.add('changed')
# If we're not in verbose mode, wipe out any module arguments.
invocation = res.get('invocation', None)
if isinstance(invocation, dict) and self.job_verbosity == 0 and 'module_args' in invocation:
event_data['res']['invocation']['module_args'] = ''
self.event_data = event_data
updated_fields.add('event_data')
if self.event == 'playbook_on_stats':
try:
failures_dict = event_data.get('failures', {})
dark_dict = event_data.get('dark', {})
self.failed = bool(sum(failures_dict.values()) +
sum(dark_dict.values()))
updated_fields.add('failed')
changed_dict = event_data.get('changed', {})
self.changed = bool(sum(changed_dict.values()))
updated_fields.add('changed')
except (AttributeError, TypeError):
pass
for field in ('playbook', 'play', 'task', 'role'):
value = force_text(event_data.get(field, '')).strip()
if value != getattr(self, field):
setattr(self, field, value)
updated_fields.add(field)
return updated_fields
@classmethod
def create_from_data(self, **kwargs):
pk = None
for key in ('job_id', 'project_update_id'):
if key in kwargs:
pk = key
if pk is None:
# payload must contain either a job_id or a project_update_id
return
# Convert the datetime for the job event's creation appropriately,
# and include a time zone for it.
#
# In the event of any issue, throw it out, and Django will just save
# the current time.
try:
if not isinstance(kwargs['created'], datetime.datetime):
kwargs['created'] = parse_datetime(kwargs['created'])
if not kwargs['created'].tzinfo:
kwargs['created'] = kwargs['created'].replace(tzinfo=utc)
except (KeyError, ValueError):
kwargs.pop('created', None)
# Sanity check: Don't honor keys that we don't recognize.
for key in kwargs.keys():
if key not in self.VALID_KEYS:
kwargs.pop(key)
event_data = kwargs.get('event_data', None)
artifact_dict = None
if event_data:
artifact_dict = event_data.pop('artifact_data', None)
job_event = self.objects.create(**kwargs)
analytics_logger.info('Event data saved.', extra=dict(python_objects=dict(job_event=job_event)))
# Save artifact data to parent job (if provided).
if artifact_dict:
if event_data and isinstance(event_data, dict):
# Note: Core has not added support for marking artifacts as
# sensitive yet. Going forward, core will not use
# _ansible_no_log to denote sensitive set_stats calls.
# Instead, they plan to add a flag outside of the traditional
# no_log mechanism. no_log will not work for this feature,
# in core, because sensitive data is scrubbed before sending
# data to the callback. The playbook_on_stats is the callback
# in which the set_stats data is used.
# Again, the sensitive artifact feature has not yet landed in
# core. The below is how we mark artifacts payload as
# senstive
# artifact_dict['_ansible_no_log'] = True
#
parent_job = self.objects.filter(pk=pk).first()
if hasattr(parent_job, 'artifacts') and parent_job.artifacts != artifact_dict:
parent_job.artifacts = artifact_dict
parent_job.save(update_fields=['artifacts'])
return job_event
@property
def job_verbosity(self):
return 0
def save(self, *args, **kwargs):
# If update_fields has been specified, add our field names to it,
# if it hasn't been specified, then we're just doing a normal save.
update_fields = kwargs.get('update_fields', [])
# Update model fields and related objects unless we're only updating
# failed/changed flags triggered from a child event.
from_parent_update = kwargs.pop('from_parent_update', False)
if not from_parent_update:
# Update model fields from event data.
updated_fields = self._update_from_event_data()
for field in updated_fields:
if field not in update_fields:
update_fields.append(field)
# Update host related field from host_name.
if hasattr(self, 'job') and not self.host_id and self.host_name:
host_qs = self.job.inventory.hosts.filter(name=self.host_name)
host_id = host_qs.only('id').values_list('id', flat=True).first()
if host_id != self.host_id:
self.host_id = host_id
if 'host_id' not in update_fields:
update_fields.append('host_id')
super(BasePlaybookEvent, self).save(*args, **kwargs)
# Update related objects after this event is saved.
if hasattr(self, 'job') and not from_parent_update:
if getattr(settings, 'CAPTURE_JOB_EVENT_HOSTS', False):
self._update_hosts()
if self.event == 'playbook_on_stats':
self._update_parents_failed_and_changed()
hostnames = self._hostnames()
self._update_host_summary_from_stats(hostnames)
self.job.inventory.update_computed_fields()
class JobEvent(BasePlaybookEvent):
'''
An event/message logged from the callback when running a job.
'''
VALID_KEYS = BasePlaybookEvent.VALID_KEYS + ['job_id']
class Meta:
app_label = 'main'
ordering = ('pk',)
index_together = [
('job', 'event'),
('job', 'uuid'),
('job', 'start_line'),
('job', 'end_line'),
('job', 'parent_uuid'),
]
job = models.ForeignKey(
'Job',
related_name='job_events',
on_delete=models.CASCADE,
editable=False,
)
host = models.ForeignKey(
'Host',
related_name='job_events_as_primary_host',
null=True,
default=None,
on_delete=models.SET_NULL,
editable=False,
)
host_name = models.CharField(
max_length=1024,
default='',
editable=False,
)
hosts = models.ManyToManyField(
'Host',
related_name='job_events',
editable=False,
)
parent = models.ForeignKey(
'self',
related_name='children',
null=True,
default=None,
on_delete=models.SET_NULL,
editable=False,
)
parent_uuid = models.CharField(
max_length=1024,
default='',
editable=False,
)
def get_absolute_url(self, request=None):
return reverse('api:job_event_detail', kwargs={'pk': self.pk}, request=request)
def __unicode__(self):
return u'%s @ %s' % (self.get_event_display2(), self.created.isoformat())
def _update_from_event_data(self):
# Update job event hostname
updated_fields = super(JobEvent, self)._update_from_event_data()
value = force_text(self.event_data.get('host', '')).strip()
if value != getattr(self, 'host_name'):
setattr(self, 'host_name', value)
updated_fields.add('host_name')
return updated_fields
def _update_parents_failed_and_changed(self):
# Update parent events to reflect failed, changed
runner_events = JobEvent.objects.filter(job=self.job,
event__startswith='runner_on')
changed_events = runner_events.filter(changed=True)
failed_events = runner_events.filter(failed=True)
JobEvent.objects.filter(uuid__in=changed_events.values_list('parent_uuid', flat=True)).update(changed=True)
JobEvent.objects.filter(uuid__in=failed_events.values_list('parent_uuid', flat=True)).update(failed=True)
def _update_hosts(self, extra_host_pks=None):
# Update job event hosts m2m from host_name, propagate to parent events.
extra_host_pks = set(extra_host_pks or [])
hostnames = set()
if self.host_name:
hostnames.add(self.host_name)
if self.event == 'playbook_on_stats':
try:
for v in self.event_data.values():
hostnames.update(v.keys())
except AttributeError: # In case event_data or v isn't a dict.
pass
qs = self.job.inventory.hosts.all()
qs = qs.filter(models.Q(name__in=hostnames) | models.Q(pk__in=extra_host_pks))
qs = qs.exclude(job_events__pk=self.id).only('id')
for host in qs:
self.hosts.add(host)
if self.parent_uuid:
parent = JobEvent.objects.filter(uuid=self.parent_uuid)
if parent.exists():
parent = parent[0]
parent._update_hosts(qs.values_list('id', flat=True))
def _hostnames(self):
hostnames = set()
try:
for stat in ('changed', 'dark', 'failures', 'ok', 'processed', 'skipped'):
hostnames.update(self.event_data.get(stat, {}).keys())
except AttributeError: # In case event_data or v isn't a dict.
pass
return hostnames
def _update_host_summary_from_stats(self, hostnames):
with ignore_inventory_computed_fields():
qs = self.job.inventory.hosts.filter(name__in=hostnames)
job = self.job
for host in hostnames:
host_stats = {}
for stat in ('changed', 'dark', 'failures', 'ok', 'processed', 'skipped'):
try:
host_stats[stat] = self.event_data.get(stat, {}).get(host, 0)
except AttributeError: # in case event_data[stat] isn't a dict.
pass
if qs.filter(name=host).exists():
host_actual = qs.get(name=host)
host_summary, created = job.job_host_summaries.get_or_create(host=host_actual, host_name=host_actual.name, defaults=host_stats)
else:
host_summary, created = job.job_host_summaries.get_or_create(host_name=host, defaults=host_stats)
if not created:
update_fields = []
for stat, value in host_stats.items():
if getattr(host_summary, stat) != value:
setattr(host_summary, stat, value)
update_fields.append(stat)
if update_fields:
host_summary.save(update_fields=update_fields)
@property
def job_verbosity(self):
return self.job.verbosity
class ProjectUpdateEvent(BasePlaybookEvent):
VALID_KEYS = BasePlaybookEvent.VALID_KEYS + ['project_update_id']
class Meta:
app_label = 'main'
ordering = ('pk',)
index_together = [
('project_update', 'event'),
('project_update', 'uuid'),
('project_update', 'start_line'),
('project_update', 'end_line'),
]
project_update = models.ForeignKey(
'ProjectUpdate',
related_name='project_update_events',
on_delete=models.CASCADE,
editable=False,
)
@property
def host_name(self):
return 'localhost'
class BaseCommandEvent(CreatedModifiedModel):
'''
An event/message logged from a command for each host.
'''
VALID_KEYS = [
'event_data', 'created', 'counter', 'uuid', 'stdout', 'start_line',
'end_line', 'verbosity'
]
class Meta:
abstract = True
event_data = JSONField(
blank=True,
default={},
)
uuid = models.CharField(
max_length=1024,
default='',
editable=False,
)
counter = models.PositiveIntegerField(
default=0,
editable=False,
)
stdout = models.TextField(
default='',
editable=False,
)
verbosity = models.PositiveIntegerField(
default=0,
editable=False,
)
start_line = models.PositiveIntegerField(
default=0,
editable=False,
)
end_line = models.PositiveIntegerField(
default=0,
editable=False,
)
def __unicode__(self):
return u'%s @ %s' % (self.get_event_display(), self.created.isoformat())
@classmethod
def create_from_data(self, **kwargs):
# Convert the datetime for the event's creation
# appropriately, and include a time zone for it.
#
# In the event of any issue, throw it out, and Django will just save
# the current time.
try:
if not isinstance(kwargs['created'], datetime.datetime):
kwargs['created'] = parse_datetime(kwargs['created'])
if not kwargs['created'].tzinfo:
kwargs['created'] = kwargs['created'].replace(tzinfo=utc)
except (KeyError, ValueError):
kwargs.pop('created', None)
# Sanity check: Don't honor keys that we don't recognize.
for key in kwargs.keys():
if key not in self.VALID_KEYS:
kwargs.pop(key)
return self.objects.create(**kwargs)
class AdHocCommandEvent(BaseCommandEvent):
VALID_KEYS = BaseCommandEvent.VALID_KEYS + ['ad_hoc_command_id', 'event']
class Meta:
app_label = 'main'
ordering = ('-pk',)
index_together = [
('ad_hoc_command', 'event'),
('ad_hoc_command', 'uuid'),
('ad_hoc_command', 'start_line'),
('ad_hoc_command', 'end_line'),
]
EVENT_TYPES = [
# (event, verbose name, failed)
('runner_on_failed', _('Host Failed'), True),
('runner_on_ok', _('Host OK'), False),
('runner_on_unreachable', _('Host Unreachable'), True),
# Tower won't see no_hosts (check is done earlier without callback).
# ('runner_on_no_hosts', _('No Hosts Matched'), False),
# Tower will see skipped (when running in check mode for a module that
# does not support check mode).
('runner_on_skipped', _('Host Skipped'), False),
# Tower does not support async for ad hoc commands (not used in v2).
# ('runner_on_async_poll', _('Host Polling'), False),
# ('runner_on_async_ok', _('Host Async OK'), False),
# ('runner_on_async_failed', _('Host Async Failure'), True),
# Tower does not yet support --diff mode.
# ('runner_on_file_diff', _('File Difference'), False),
# Additional event types for captured stdout not directly related to
# runner events.
('debug', _('Debug'), False),
('verbose', _('Verbose'), False),
('deprecated', _('Deprecated'), False),
('warning', _('Warning'), False),
('system_warning', _('System Warning'), False),
('error', _('Error'), False),
]
FAILED_EVENTS = [x[0] for x in EVENT_TYPES if x[2]]
EVENT_CHOICES = [(x[0], x[1]) for x in EVENT_TYPES]
event = models.CharField(
max_length=100,
choices=EVENT_CHOICES,
)
failed = models.BooleanField(
default=False,
editable=False,
)
changed = models.BooleanField(
default=False,
editable=False,
)
ad_hoc_command = models.ForeignKey(
'AdHocCommand',
related_name='ad_hoc_command_events',
on_delete=models.CASCADE,
editable=False,
)
host = models.ForeignKey(
'Host',
related_name='ad_hoc_command_events',
null=True,
default=None,
on_delete=models.SET_NULL,
editable=False,
)
host_name = models.CharField(
max_length=1024,
default='',
editable=False,
)
def get_absolute_url(self, request=None):
return reverse('api:ad_hoc_command_event_detail', kwargs={'pk': self.pk}, request=request)
def save(self, *args, **kwargs):
# If update_fields has been specified, add our field names to it,
# if it hasn't been specified, then we're just doing a normal save.
update_fields = kwargs.get('update_fields', [])
res = self.event_data.get('res', None)
if self.event in self.FAILED_EVENTS:
if not self.event_data.get('ignore_errors', False):
self.failed = True
if 'failed' not in update_fields:
update_fields.append('failed')
if isinstance(res, dict) and res.get('changed', False):
self.changed = True
if 'changed' not in update_fields:
update_fields.append('changed')
self.host_name = self.event_data.get('host', '').strip()
if 'host_name' not in update_fields:
update_fields.append('host_name')
if not self.host_id and self.host_name:
host_qs = self.ad_hoc_command.inventory.hosts.filter(name=self.host_name)
try:
host_id = host_qs.only('id').values_list('id', flat=True)
if host_id.exists():
self.host_id = host_id[0]
if 'host_id' not in update_fields:
update_fields.append('host_id')
except (IndexError, AttributeError):
pass
super(AdHocCommandEvent, self).save(*args, **kwargs)
class InventoryUpdateEvent(BaseCommandEvent):
VALID_KEYS = BaseCommandEvent.VALID_KEYS + ['inventory_update_id']
class Meta:
app_label = 'main'
ordering = ('-pk',)
index_together = [
('inventory_update', 'uuid'),
('inventory_update', 'start_line'),
('inventory_update', 'end_line'),
]
inventory_update = models.ForeignKey(
'InventoryUpdate',
related_name='inventory_update_events',
on_delete=models.CASCADE,
editable=False,
)
@property
def event(self):
return 'verbose'
@property
def failed(self):
return False
@property
def changed(self):
return False
class SystemJobEvent(BaseCommandEvent):
VALID_KEYS = BaseCommandEvent.VALID_KEYS + ['system_job_id']
class Meta:
app_label = 'main'
ordering = ('-pk',)
index_together = [
('system_job', 'uuid'),
('system_job', 'start_line'),
('system_job', 'end_line'),
]
system_job = models.ForeignKey(
'SystemJob',
related_name='system_job_events',
on_delete=models.CASCADE,
editable=False,
)
@property
def event(self):
return 'verbose'
@property
def failed(self):
return False
@property
def changed(self):
return False

View File

@ -29,6 +29,7 @@ from awx.main.fields import (
)
from awx.main.managers import HostManager
from awx.main.models.base import * # noqa
from awx.main.models.events import InventoryUpdateEvent
from awx.main.models.unified_jobs import * # noqa
from awx.main.models.mixins import ResourceMixin, TaskManagerInventoryUpdateMixin
from awx.main.models.notifications import (
@ -1590,6 +1591,10 @@ class InventoryUpdate(UnifiedJob, InventorySourceOptions, JobNotificationMixin,
self.inventory_source.source_project.get_project_path(check_if_exists=False),
self.source_path)
@property
def event_class(self):
return InventoryUpdateEvent
@property
def task_impact(self):
return 50

View File

@ -14,12 +14,9 @@ from django.conf import settings
from django.db import models
#from django.core.cache import cache
import memcache
from django.db.models import Q, Count
from django.utils.dateparse import parse_datetime
from dateutil import parser
from dateutil.tz import tzutc
from django.utils.encoding import force_text, smart_str
from django.utils.timezone import utc
from django.utils.encoding import smart_str
from django.utils.translation import ugettext_lazy as _
from django.core.exceptions import ValidationError, FieldDoesNotExist
@ -29,27 +26,23 @@ from rest_framework.exceptions import ParseError
# AWX
from awx.api.versioning import reverse
from awx.main.models.base import * # noqa
from awx.main.models.events import JobEvent, SystemJobEvent
from awx.main.models.unified_jobs import * # noqa
from awx.main.models.notifications import (
NotificationTemplate,
JobNotificationMixin,
)
from awx.main.utils import (
ignore_inventory_computed_fields,
parse_yaml_or_json,
)
from awx.main.utils import parse_yaml_or_json
from awx.main.fields import ImplicitRoleField
from awx.main.models.mixins import ResourceMixin, SurveyJobTemplateMixin, SurveyJobMixin, TaskManagerJobMixin
from awx.main.fields import JSONField, AskForField
from awx.main.consumers import emit_channel_notification
logger = logging.getLogger('awx.main.models.jobs')
analytics_logger = logging.getLogger('awx.analytics.job_events')
system_tracking_logger = logging.getLogger('awx.analytics.system_tracking')
__all__ = ['JobTemplate', 'JobLaunchConfig', 'Job', 'JobHostSummary', 'JobEvent', 'SystemJobTemplate', 'SystemJob']
__all__ = ['JobTemplate', 'JobLaunchConfig', 'Job', 'JobHostSummary', 'SystemJobTemplate', 'SystemJob']
class JobOptions(BaseModel):
@ -520,6 +513,10 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskMana
def get_ui_url(self):
return urljoin(settings.TOWER_URL_BASE, "/#/jobs/{}".format(self.pk))
@property
def event_class(self):
return JobEvent
@property
def ask_diff_mode_on_launch(self):
if self.job_template is not None:
@ -1032,477 +1029,6 @@ class JobHostSummary(CreatedModifiedModel):
#self.host.update_computed_fields()
class JobEvent(CreatedModifiedModel):
'''
An event/message logged from the callback when running a job.
'''
# Playbook events will be structured to form the following hierarchy:
# - playbook_on_start (once for each playbook file)
# - playbook_on_vars_prompt (for each play, but before play starts, we
# currently don't handle responding to these prompts)
# - playbook_on_play_start (once for each play)
# - playbook_on_import_for_host (not logged, not used for v2)
# - playbook_on_not_import_for_host (not logged, not used for v2)
# - playbook_on_no_hosts_matched
# - playbook_on_no_hosts_remaining
# - playbook_on_include (only v2 - only used for handlers?)
# - playbook_on_setup (not used for v2)
# - runner_on*
# - playbook_on_task_start (once for each task within a play)
# - runner_on_failed
# - runner_on_ok
# - runner_on_error (not used for v2)
# - runner_on_skipped
# - runner_on_unreachable
# - runner_on_no_hosts (not used for v2)
# - runner_on_async_poll (not used for v2)
# - runner_on_async_ok (not used for v2)
# - runner_on_async_failed (not used for v2)
# - runner_on_file_diff (v2 event is v2_on_file_diff)
# - runner_item_on_ok (v2 only)
# - runner_item_on_failed (v2 only)
# - runner_item_on_skipped (v2 only)
# - runner_retry (v2 only)
# - playbook_on_notify (once for each notification from the play, not used for v2)
# - playbook_on_stats
EVENT_TYPES = [
# (level, event, verbose name, failed)
(3, 'runner_on_failed', _('Host Failed'), True),
(3, 'runner_on_ok', _('Host OK'), False),
(3, 'runner_on_error', _('Host Failure'), True),
(3, 'runner_on_skipped', _('Host Skipped'), False),
(3, 'runner_on_unreachable', _('Host Unreachable'), True),
(3, 'runner_on_no_hosts', _('No Hosts Remaining'), False),
(3, 'runner_on_async_poll', _('Host Polling'), False),
(3, 'runner_on_async_ok', _('Host Async OK'), False),
(3, 'runner_on_async_failed', _('Host Async Failure'), True),
(3, 'runner_item_on_ok', _('Item OK'), False),
(3, 'runner_item_on_failed', _('Item Failed'), True),
(3, 'runner_item_on_skipped', _('Item Skipped'), False),
(3, 'runner_retry', _('Host Retry'), False),
# Tower does not yet support --diff mode.
(3, 'runner_on_file_diff', _('File Difference'), False),
(0, 'playbook_on_start', _('Playbook Started'), False),
(2, 'playbook_on_notify', _('Running Handlers'), False),
(2, 'playbook_on_include', _('Including File'), False),
(2, 'playbook_on_no_hosts_matched', _('No Hosts Matched'), False),
(2, 'playbook_on_no_hosts_remaining', _('No Hosts Remaining'), False),
(2, 'playbook_on_task_start', _('Task Started'), False),
# Tower does not yet support vars_prompt (and will probably hang :)
(1, 'playbook_on_vars_prompt', _('Variables Prompted'), False),
(2, 'playbook_on_setup', _('Gathering Facts'), False),
(2, 'playbook_on_import_for_host', _('internal: on Import for Host'), False),
(2, 'playbook_on_not_import_for_host', _('internal: on Not Import for Host'), False),
(1, 'playbook_on_play_start', _('Play Started'), False),
(1, 'playbook_on_stats', _('Playbook Complete'), False),
# Additional event types for captured stdout not directly related to
# playbook or runner events.
(0, 'debug', _('Debug'), False),
(0, 'verbose', _('Verbose'), False),
(0, 'deprecated', _('Deprecated'), False),
(0, 'warning', _('Warning'), False),
(0, 'system_warning', _('System Warning'), False),
(0, 'error', _('Error'), True),
]
FAILED_EVENTS = [x[1] for x in EVENT_TYPES if x[3]]
EVENT_CHOICES = [(x[1], x[2]) for x in EVENT_TYPES]
LEVEL_FOR_EVENT = dict([(x[1], x[0]) for x in EVENT_TYPES])
class Meta:
app_label = 'main'
ordering = ('pk',)
index_together = [
('job', 'event'),
('job', 'uuid'),
('job', 'start_line'),
('job', 'end_line'),
('job', 'parent_uuid'),
]
job = models.ForeignKey(
'Job',
related_name='job_events',
on_delete=models.CASCADE,
editable=False,
)
event = models.CharField(
max_length=100,
choices=EVENT_CHOICES,
)
event_data = JSONField(
blank=True,
default={},
)
failed = models.BooleanField(
default=False,
editable=False,
)
changed = models.BooleanField(
default=False,
editable=False,
)
uuid = models.CharField(
max_length=1024,
default='',
editable=False,
)
host = models.ForeignKey(
'Host',
related_name='job_events_as_primary_host',
null=True,
default=None,
on_delete=models.SET_NULL,
editable=False,
)
host_name = models.CharField(
max_length=1024,
default='',
editable=False,
)
hosts = models.ManyToManyField(
'Host',
related_name='job_events',
editable=False,
)
playbook = models.CharField(
max_length=1024,
default='',
editable=False,
)
play = models.CharField(
max_length=1024,
default='',
editable=False,
)
role = models.CharField(
max_length=1024,
default='',
editable=False,
)
task = models.CharField(
max_length=1024,
default='',
editable=False,
)
parent = models.ForeignKey(
'self',
related_name='children',
null=True,
default=None,
on_delete=models.SET_NULL,
editable=False,
)
parent_uuid = models.CharField(
max_length=1024,
default='',
editable=False,
)
counter = models.PositiveIntegerField(
default=0,
editable=False,
)
stdout = models.TextField(
default='',
editable=False,
)
verbosity = models.PositiveIntegerField(
default=0,
editable=False,
)
start_line = models.PositiveIntegerField(
default=0,
editable=False,
)
end_line = models.PositiveIntegerField(
default=0,
editable=False,
)
def get_absolute_url(self, request=None):
return reverse('api:job_event_detail', kwargs={'pk': self.pk}, request=request)
def __unicode__(self):
return u'%s @ %s' % (self.get_event_display2(), self.created.isoformat())
@property
def event_level(self):
return self.LEVEL_FOR_EVENT.get(self.event, 0)
def get_event_display2(self):
msg = self.get_event_display()
if self.event == 'playbook_on_play_start':
if self.play:
msg = "%s (%s)" % (msg, self.play)
elif self.event == 'playbook_on_task_start':
if self.task:
if self.event_data.get('is_conditional', False):
msg = 'Handler Notified'
if self.role:
msg = '%s (%s | %s)' % (msg, self.role, self.task)
else:
msg = "%s (%s)" % (msg, self.task)
# Change display for runner events trigged by async polling. Some of
# these events may not show in most cases, due to filterting them out
# of the job event queryset returned to the user.
res = self.event_data.get('res', {})
# Fix for existing records before we had added the workaround on save
# to change async_ok to async_failed.
if self.event == 'runner_on_async_ok':
try:
if res.get('failed', False) or res.get('rc', 0) != 0:
msg = 'Host Async Failed'
except (AttributeError, TypeError):
pass
# Runner events with ansible_job_id are part of async starting/polling.
if self.event in ('runner_on_ok', 'runner_on_failed'):
try:
module_name = res['invocation']['module_name']
job_id = res['ansible_job_id']
except (TypeError, KeyError, AttributeError):
module_name = None
job_id = None
if module_name and job_id:
if module_name == 'async_status':
msg = 'Host Async Checking'
else:
msg = 'Host Async Started'
# Handle both 1.2 on_failed and 1.3+ on_async_failed events when an
# async task times out.
if self.event in ('runner_on_failed', 'runner_on_async_failed'):
try:
if res['msg'] == 'timed out':
msg = 'Host Async Timeout'
except (TypeError, KeyError, AttributeError):
pass
return msg
def _update_from_event_data(self):
# Update job event model fields from event data.
updated_fields = set()
job = self.job
verbosity = job.verbosity
event_data = self.event_data
res = event_data.get('res', None)
if self.event in self.FAILED_EVENTS and not event_data.get('ignore_errors', False):
self.failed = True
updated_fields.add('failed')
if isinstance(res, dict):
if res.get('changed', False):
self.changed = True
updated_fields.add('changed')
# If we're not in verbose mode, wipe out any module arguments.
invocation = res.get('invocation', None)
if isinstance(invocation, dict) and verbosity == 0 and 'module_args' in invocation:
event_data['res']['invocation']['module_args'] = ''
self.event_data = event_data
updated_fields.add('event_data')
if self.event == 'playbook_on_stats':
try:
failures_dict = event_data.get('failures', {})
dark_dict = event_data.get('dark', {})
self.failed = bool(sum(failures_dict.values()) +
sum(dark_dict.values()))
updated_fields.add('failed')
changed_dict = event_data.get('changed', {})
self.changed = bool(sum(changed_dict.values()))
updated_fields.add('changed')
except (AttributeError, TypeError):
pass
for field in ('playbook', 'play', 'task', 'role', 'host'):
value = force_text(event_data.get(field, '')).strip()
if field == 'host':
field = 'host_name'
if value != getattr(self, field):
setattr(self, field, value)
updated_fields.add(field)
return updated_fields
def _update_parents_failed_and_changed(self):
# Update parent events to reflect failed, changed
runner_events = JobEvent.objects.filter(job=self.job,
event__startswith='runner_on')
changed_events = runner_events.filter(changed=True)
failed_events = runner_events.filter(failed=True)
JobEvent.objects.filter(uuid__in=changed_events.values_list('parent_uuid', flat=True)).update(changed=True)
JobEvent.objects.filter(uuid__in=failed_events.values_list('parent_uuid', flat=True)).update(failed=True)
def _update_hosts(self, extra_host_pks=None):
# Update job event hosts m2m from host_name, propagate to parent events.
extra_host_pks = set(extra_host_pks or [])
hostnames = set()
if self.host_name:
hostnames.add(self.host_name)
if self.event == 'playbook_on_stats':
try:
for v in self.event_data.values():
hostnames.update(v.keys())
except AttributeError: # In case event_data or v isn't a dict.
pass
qs = self.job.inventory.hosts.all()
qs = qs.filter(Q(name__in=hostnames) | Q(pk__in=extra_host_pks))
qs = qs.exclude(job_events__pk=self.id).only('id')
for host in qs:
self.hosts.add(host)
if self.parent_uuid:
parent = JobEvent.objects.filter(uuid=self.parent_uuid)
if parent.exists():
parent = parent[0]
parent._update_hosts(qs.values_list('id', flat=True))
def _hostnames(self):
hostnames = set()
try:
for stat in ('changed', 'dark', 'failures', 'ok', 'processed', 'skipped'):
hostnames.update(self.event_data.get(stat, {}).keys())
except AttributeError: # In case event_data or v isn't a dict.
pass
return hostnames
def _update_host_summary_from_stats(self, hostnames):
with ignore_inventory_computed_fields():
qs = self.job.inventory.hosts.filter(name__in=hostnames)
job = self.job
for host in hostnames:
host_stats = {}
for stat in ('changed', 'dark', 'failures', 'ok', 'processed', 'skipped'):
try:
host_stats[stat] = self.event_data.get(stat, {}).get(host, 0)
except AttributeError: # in case event_data[stat] isn't a dict.
pass
if qs.filter(name=host).exists():
host_actual = qs.get(name=host)
host_summary, created = job.job_host_summaries.get_or_create(host=host_actual, host_name=host_actual.name, defaults=host_stats)
else:
host_summary, created = job.job_host_summaries.get_or_create(host_name=host, defaults=host_stats)
if not created:
update_fields = []
for stat, value in host_stats.items():
if getattr(host_summary, stat) != value:
setattr(host_summary, stat, value)
update_fields.append(stat)
if update_fields:
host_summary.save(update_fields=update_fields)
def save(self, *args, **kwargs):
# If update_fields has been specified, add our field names to it,
# if it hasn't been specified, then we're just doing a normal save.
update_fields = kwargs.get('update_fields', [])
# Update model fields and related objects unless we're only updating
# failed/changed flags triggered from a child event.
from_parent_update = kwargs.pop('from_parent_update', False)
if not from_parent_update:
# Update model fields from event data.
updated_fields = self._update_from_event_data()
for field in updated_fields:
if field not in update_fields:
update_fields.append(field)
# Update host related field from host_name.
if not self.host_id and self.host_name:
host_qs = self.job.inventory.hosts.filter(name=self.host_name)
host_id = host_qs.only('id').values_list('id', flat=True).first()
if host_id != self.host_id:
self.host_id = host_id
if 'host_id' not in update_fields:
update_fields.append('host_id')
super(JobEvent, self).save(*args, **kwargs)
# Update related objects after this event is saved.
if not from_parent_update:
if getattr(settings, 'CAPTURE_JOB_EVENT_HOSTS', False):
self._update_hosts()
if self.event == 'playbook_on_stats':
self._update_parents_failed_and_changed()
hostnames = self._hostnames()
self._update_host_summary_from_stats(hostnames)
self.job.inventory.update_computed_fields()
emit_channel_notification('jobs-summary', dict(group_name='jobs', unified_job_id=self.job.id))
@classmethod
def create_from_data(self, **kwargs):
# Must have a job_id specified.
if not kwargs.get('job_id', None):
return
# Convert the datetime for the job event's creation appropriately,
# and include a time zone for it.
#
# In the event of any issue, throw it out, and Django will just save
# the current time.
try:
if not isinstance(kwargs['created'], datetime.datetime):
kwargs['created'] = parse_datetime(kwargs['created'])
if not kwargs['created'].tzinfo:
kwargs['created'] = kwargs['created'].replace(tzinfo=utc)
except (KeyError, ValueError):
kwargs.pop('created', None)
# Sanity check: Don't honor keys that we don't recognize.
valid_keys = {'job_id', 'event', 'event_data', 'playbook', 'play',
'role', 'task', 'created', 'counter', 'uuid', 'stdout',
'parent_uuid', 'start_line', 'end_line', 'verbosity'}
for key in kwargs.keys():
if key not in valid_keys:
kwargs.pop(key)
event_data = kwargs.get('event_data', None)
artifact_dict = None
if event_data:
artifact_dict = event_data.pop('artifact_data', None)
job_event = JobEvent.objects.create(**kwargs)
analytics_logger.info('Job event data saved.', extra=dict(python_objects=dict(job_event=job_event)))
# Save artifact data to parent job (if provided).
if artifact_dict:
if event_data and isinstance(event_data, dict):
# Note: Core has not added support for marking artifacts as
# sensitive yet. Going forward, core will not use
# _ansible_no_log to denote sensitive set_stats calls.
# Instead, they plan to add a flag outside of the traditional
# no_log mechanism. no_log will not work for this feature,
# in core, because sensitive data is scrubbed before sending
# data to the callback. The playbook_on_stats is the callback
# in which the set_stats data is used.
# Again, the sensitive artifact feature has not yet landed in
# core. The below is how we mark artifacts payload as
# senstive
# artifact_dict['_ansible_no_log'] = True
#
parent_job = Job.objects.filter(pk=kwargs['job_id']).first()
if parent_job and parent_job.artifacts != artifact_dict:
parent_job.artifacts = artifact_dict
parent_job.save(update_fields=['artifacts'])
return job_event
@classmethod
def get_startevent_queryset(cls, parent_task, starting_events, ordering=None):
'''
We need to pull information about each start event.
This is super tricky, because this table has a one-to-many
relationship with itself (parent-child), and we're getting
information for an arbitrary number of children. This means we
need stats on grandchildren, sorted by child.
'''
qs = (JobEvent.objects.filter(parent__parent=parent_task,
parent__event__in=starting_events)
.values('parent__id', 'event', 'changed')
.annotate(num=Count('event'))
.order_by('parent__id'))
if ordering is not None:
qs = qs.order_by(ordering)
return qs
class SystemJobOptions(BaseModel):
'''
Common fields for SystemJobTemplate and SystemJob.
@ -1644,6 +1170,10 @@ class SystemJob(UnifiedJob, SystemJobOptions, JobNotificationMixin):
def get_ui_url(self):
return urljoin(settings.TOWER_URL_BASE, "/#/management_jobs/{}".format(self.pk))
@property
def event_class(self):
return SystemJobEvent
@property
def task_impact(self):
return 150

View File

@ -18,6 +18,7 @@ from django.utils.timezone import now, make_aware, get_default_timezone
# AWX
from awx.api.versioning import reverse
from awx.main.models.base import * # noqa
from awx.main.models.events import ProjectUpdateEvent
from awx.main.models.notifications import (
NotificationTemplate,
JobNotificationMixin,
@ -485,6 +486,10 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin, TaskManage
websocket_data.update(dict(project_id=self.project.id))
return websocket_data
@property
def event_class(self):
return ProjectUpdateEvent
@property
def task_impact(self):
return 0 if self.job_type == 'run' else 20

View File

@ -2,14 +2,14 @@
# All Rights Reserved.
# Python
import codecs
from cStringIO import StringIO
import json
import logging
import re
import os
import os.path
import re
import subprocess
import tempfile
from collections import OrderedDict
from StringIO import StringIO
# Django
from django.conf import settings
@ -42,7 +42,7 @@ from awx.main.redact import UriCleaner, REPLACE_STR
from awx.main.consumers import emit_channel_notification
from awx.main.fields import JSONField, AskForField
__all__ = ['UnifiedJobTemplate', 'UnifiedJob']
__all__ = ['UnifiedJobTemplate', 'UnifiedJob', 'StdoutMaxBytesExceeded']
logger = logging.getLogger('awx.main.models.unified_jobs')
@ -514,6 +514,13 @@ class UnifiedJobDeprecatedStdout(models.Model):
)
class StdoutMaxBytesExceeded(Exception):
def __init__(self, total, supported):
self.total = total
self.supported = supported
class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique, UnifiedJobTypeStringMixin, TaskManagerUnifiedJobMixin):
'''
Concrete base class for unified job run by the task engine.
@ -642,11 +649,6 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
default='',
editable=False,
))
result_stdout_file = models.TextField( # FilePathfield?
blank=True,
default='',
editable=False,
)
result_traceback = models.TextField(
blank=True,
default='',
@ -822,14 +824,6 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
# Done.
return result
def delete(self):
if self.result_stdout_file != "":
try:
os.remove(self.result_stdout_file)
except Exception:
pass
super(UnifiedJob, self).delete()
def copy_unified_job(self, limit=None):
'''
Returns saved object, including related fields.
@ -899,6 +893,10 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
config.credentials.add(*job_creds)
return config
@property
def event_class(self):
raise NotImplementedError()
@property
def result_stdout_text(self):
related = UnifiedJobDeprecatedStdout.objects.get(pk=self.pk)
@ -912,36 +910,100 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
related.result_stdout_text = value
related.save()
def result_stdout_raw_handle(self, attempt=0):
"""Return a file-like object containing the standard out of the
job's result.
def result_stdout_raw_handle(self, enforce_max_bytes=True):
"""
msg = {
'pending': 'Waiting for results...',
'missing': 'stdout capture is missing',
}
if self.result_stdout_text:
return StringIO(self.result_stdout_text)
else:
if not os.path.exists(self.result_stdout_file) or os.stat(self.result_stdout_file).st_size < 1:
return StringIO(msg['missing' if self.finished else 'pending'])
This method returns a file-like object ready to be read which contains
all stdout for the UnifiedJob.
# There is a potential timing issue here, because another
# process may be deleting the stdout file after it is written
# to the database.
#
# Therefore, if we get an IOError (which generally means the
# file does not exist), reload info from the database and
# try again.
try:
return codecs.open(self.result_stdout_file, "r",
encoding='utf-8')
except IOError:
if attempt < 3:
self.result_stdout_text = type(self).objects.get(id=self.id).result_stdout_text
return self.result_stdout_raw_handle(attempt=attempt + 1)
If the size of the file is greater than
`settings.STDOUT_MAX_BYTES_DISPLAY`, a StdoutMaxBytesExceeded exception
will be raised.
"""
max_supported = settings.STDOUT_MAX_BYTES_DISPLAY
if enforce_max_bytes:
# If enforce_max_bytes is True, we're not grabbing the whole file,
# just the first <settings.STDOUT_MAX_BYTES_DISPLAY> bytes;
# in this scenario, it's probably safe to use a StringIO.
fd = StringIO()
else:
# If enforce_max_bytes = False, that means they're downloading
# the entire file. To avoid ballooning memory, let's write the
# stdout content to a temporary disk location
if not os.path.exists(settings.JOBOUTPUT_ROOT):
os.makedirs(settings.JOBOUTPUT_ROOT)
fd = tempfile.NamedTemporaryFile(
prefix='{}-{}-'.format(self.model_to_str(), self.pk),
suffix='.out',
dir=settings.JOBOUTPUT_ROOT
)
# Before the addition of event-based stdout, older versions of
# awx stored stdout as raw text blobs in a certain database column
# (`main_unifiedjob.result_stdout_text`)
# For older installs, this data still exists in the database; check for
# it and use if it exists
legacy_stdout_text = self.result_stdout_text
if legacy_stdout_text:
if enforce_max_bytes and len(legacy_stdout_text) > max_supported:
raise StdoutMaxBytesExceeded(len(legacy_stdout_text), max_supported)
fd.write(legacy_stdout_text)
if hasattr(fd, 'name'):
fd.flush()
return open(fd.name, 'r')
else:
# we just wrote to this StringIO, so rewind it
fd.seek(0)
return fd
else:
# Note: the code in this block _intentionally_ does not use the
# Django ORM because of the potential size (many MB+) of
# `main_jobevent.stdout`; we *do not* want to generate queries
# here that construct model objects by fetching large gobs of
# data (and potentially ballooning memory usage); instead, we
# just want to write concatenated values of a certain column
# (`stdout`) directly to a file
with connection.cursor() as cursor:
tablename = self._meta.db_table
related_name = {
'main_job': 'job_id',
'main_adhoccommand': 'ad_hoc_command_id',
'main_projectupdate': 'project_update_id',
'main_inventoryupdate': 'inventory_update_id',
'main_systemjob': 'system_job_id',
}[tablename]
if enforce_max_bytes:
# detect the length of all stdout for this UnifiedJob, and
# if it exceeds settings.STDOUT_MAX_BYTES_DISPLAY bytes,
# don't bother actually fetching the data
total = self.event_class.objects.filter(**{related_name: self.id}).aggregate(
total=models.Sum(models.Func(models.F('stdout'), function='LENGTH'))
)['total']
if total > max_supported:
raise StdoutMaxBytesExceeded(total, max_supported)
cursor.copy_expert(
"copy (select stdout from {} where {}={} order by start_line) to stdout".format(
tablename + 'event',
related_name,
self.id
),
fd
)
if hasattr(fd, 'name'):
# If we're dealing with a physical file, use `sed` to clean
# up escaped line sequences
fd.flush()
subprocess.Popen("sed -i 's/\\\\r\\\\n/\\n/g' {}".format(fd.name), shell=True).wait()
return open(fd.name, 'r')
else:
return StringIO(msg['missing' if self.finished else 'pending'])
# If we're dealing with an in-memory string buffer, use
# string.replace()
fd = StringIO(fd.getvalue().replace('\\r\\n', '\n'))
return fd
def _escape_ascii(self, content):
# Remove ANSI escape sequences used to embed event data.
@ -966,13 +1028,6 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
def result_stdout(self):
return self._result_stdout_raw(escape_ascii=True)
@property
def result_stdout_size(self):
try:
return os.stat(self.result_stdout_file).st_size
except Exception:
return len(self.result_stdout)
def _result_stdout_raw_limited(self, start_line=0, end_line=None, redact_sensitive=True, escape_ascii=False):
return_buffer = u""
if end_line is not None:

View File

@ -24,7 +24,7 @@ from awx.main.utils import ignore_inventory_computed_fields, ignore_inventory_gr
from awx.main.tasks import update_inventory_computed_fields
from awx.main.fields import is_implicit_parent
from awx.main.consumers import emit_channel_notification
from awx.main import consumers
from awx.conf.utils import conf_to_dict
@ -48,7 +48,7 @@ def emit_job_event_detail(sender, **kwargs):
created = kwargs['created']
if created:
event_serialized = JobEventWebSocketSerializer(instance).data
emit_channel_notification('job_events-' + str(instance.job.id), event_serialized)
consumers.emit_channel_notification('job_events-' + str(instance.job.id), event_serialized)
def emit_ad_hoc_command_event_detail(sender, **kwargs):
@ -56,7 +56,31 @@ def emit_ad_hoc_command_event_detail(sender, **kwargs):
created = kwargs['created']
if created:
event_serialized = AdHocCommandEventWebSocketSerializer(instance).data
emit_channel_notification('ad_hoc_command_events-' + str(instance.ad_hoc_command_id), event_serialized)
consumers.emit_channel_notification('ad_hoc_command_events-' + str(instance.ad_hoc_command_id), event_serialized)
def emit_project_update_event_detail(sender, **kwargs):
instance = kwargs['instance']
created = kwargs['created']
if created:
event_serialized = ProjectUpdateEventWebSocketSerializer(instance).data
consumers.emit_channel_notification('project_update_events-' + str(instance.project_update_id), event_serialized)
def emit_inventory_update_event_detail(sender, **kwargs):
instance = kwargs['instance']
created = kwargs['created']
if created:
event_serialized = InventoryUpdateEventWebSocketSerializer(instance).data
consumers.emit_channel_notification('inventory_update_events-' + str(instance.inventory_update_id), event_serialized)
def emit_system_job_event_detail(sender, **kwargs):
instance = kwargs['instance']
created = kwargs['created']
if created:
event_serialized = SystemJobEventWebSocketSerializer(instance).data
consumers.emit_channel_notification('system_job_events-' + str(instance.system_job_id), event_serialized)
def emit_update_inventory_computed_fields(sender, **kwargs):
@ -222,6 +246,9 @@ connect_computed_field_signals()
post_save.connect(emit_job_event_detail, sender=JobEvent)
post_save.connect(emit_ad_hoc_command_event_detail, sender=AdHocCommandEvent)
post_save.connect(emit_project_update_event_detail, sender=ProjectUpdateEvent)
post_save.connect(emit_inventory_update_event_detail, sender=InventoryUpdateEvent)
post_save.connect(emit_system_job_event_detail, sender=SystemJobEvent)
m2m_changed.connect(rebuild_role_ancestor_list, Role.parents.through)
m2m_changed.connect(org_admin_edit_members, Role.members.through)
m2m_changed.connect(rbac_activity_stream, Role.members.through)

View File

@ -2,7 +2,6 @@
# All Rights Reserved.
# Python
import codecs
from collections import OrderedDict
import ConfigParser
import cStringIO
@ -17,7 +16,6 @@ import tempfile
import time
import traceback
import urlparse
import uuid
from distutils.version import LooseVersion as Version
import yaml
import fcntl
@ -498,6 +496,7 @@ def with_path_cleanup(f):
class BaseTask(LogErrorsTask):
name = None
model = None
event_model = None
abstract = True
cleanup_paths = []
proot_show_paths = []
@ -518,17 +517,13 @@ class BaseTask(LogErrorsTask):
if updates:
update_fields = ['modified']
for field, value in updates.items():
if field in ('result_stdout', 'result_traceback'):
if field in ('result_traceback'):
for srch, repl in output_replacements:
value = value.replace(srch, repl)
setattr(instance, field, value)
update_fields.append(field)
if field == 'status':
update_fields.append('failed')
if 'result_stdout_text' in update_fields:
# result_stdout_text is now deprecated, and is no longer
# an actual Django field (it's a property)
update_fields.remove('result_stdout_text')
instance.save(update_fields=update_fields)
return instance
except DatabaseError as e:
@ -738,14 +733,19 @@ class BaseTask(LogErrorsTask):
def get_stdout_handle(self, instance):
'''
Return an open file object for capturing stdout.
Return an virtual file object for capturing stdout and events.
'''
if not os.path.exists(settings.JOBOUTPUT_ROOT):
os.makedirs(settings.JOBOUTPUT_ROOT)
stdout_filename = os.path.join(settings.JOBOUTPUT_ROOT, "%d-%s.out" % (instance.pk, str(uuid.uuid1())))
stdout_handle = codecs.open(stdout_filename, 'w', encoding='utf-8')
assert stdout_handle.name == stdout_filename
return stdout_handle
dispatcher = CallbackQueueDispatcher()
def event_callback(event_data):
event_data.setdefault(self.event_data_key, instance.id)
if 'uuid' in event_data:
cache_event = cache.get('ev-{}'.format(event_data['uuid']), None)
if cache_event is not None:
event_data.update(cache_event)
dispatcher.dispatch(event_data)
return OutputEventFilter(event_callback)
def pre_run_hook(self, instance, **kwargs):
'''
@ -827,10 +827,8 @@ class BaseTask(LogErrorsTask):
if isolated_host is None:
stdout_handle = self.get_stdout_handle(instance)
else:
base_handle = super(self.__class__, self).get_stdout_handle(instance)
stdout_handle = isolated_manager.IsolatedManager.wrap_stdout_handle(
instance, kwargs['private_data_dir'], base_handle,
event_data_key=self.event_data_key)
stdout_handle = isolated_manager.IsolatedManager.get_stdout_handle(
instance, kwargs['private_data_dir'], event_data_key=self.event_data_key)
if self.should_use_proot(instance, **kwargs):
if not check_proot_installed():
raise RuntimeError('bubblewrap is not installed')
@ -847,7 +845,7 @@ class BaseTask(LogErrorsTask):
args = run.wrap_args_with_ssh_agent(args, ssh_key_path, ssh_auth_sock)
safe_args = run.wrap_args_with_ssh_agent(safe_args, ssh_key_path, ssh_auth_sock)
instance = self.update_model(pk, job_args=json.dumps(safe_args),
job_cwd=cwd, job_env=safe_env, result_stdout_file=stdout_handle.name)
job_cwd=cwd, job_env=safe_env)
expect_passwords = {}
for k, v in self.get_password_prompts(**kwargs).items():
@ -940,7 +938,8 @@ class RunJob(BaseTask):
name = 'awx.main.tasks.run_job'
model = Job
event_data_key= 'job_id'
event_model = JobEvent
event_data_key = 'job_id'
def build_private_data(self, job, **kwargs):
'''
@ -1207,29 +1206,6 @@ class RunJob(BaseTask):
d[re.compile(r'Vault password \({}\):\s*?$'.format(vault_id), re.M)] = k
return d
def get_stdout_handle(self, instance):
'''
Wrap stdout file object to capture events.
'''
stdout_handle = super(RunJob, self).get_stdout_handle(instance)
if getattr(settings, 'USE_CALLBACK_QUEUE', False):
dispatcher = CallbackQueueDispatcher()
def job_event_callback(event_data):
event_data.setdefault(self.event_data_key, instance.id)
if 'uuid' in event_data:
cache_event = cache.get('ev-{}'.format(event_data['uuid']), None)
if cache_event is not None:
event_data.update(cache_event)
dispatcher.dispatch(event_data)
else:
def job_event_callback(event_data):
event_data.setdefault(self.event_data_key, instance.id)
JobEvent.create_from_data(**event_data)
return OutputEventFilter(stdout_handle, job_event_callback)
def should_use_proot(self, instance, **kwargs):
'''
Return whether this task should use proot.
@ -1290,6 +1266,8 @@ class RunProjectUpdate(BaseTask):
name = 'awx.main.tasks.run_project_update'
model = ProjectUpdate
event_model = ProjectUpdateEvent
event_data_key = 'project_update_id'
@property
def proot_show_paths(self):
@ -1343,6 +1321,10 @@ class RunProjectUpdate(BaseTask):
# give ansible a hint about the intended tmpdir to work around issues
# like https://github.com/ansible/ansible/issues/30064
env['TMP'] = settings.AWX_PROOT_BASE_PATH
env['CACHE'] = settings.CACHES['default']['LOCATION'] if 'LOCATION' in settings.CACHES['default'] else ''
env['PROJECT_UPDATE_ID'] = str(project_update.pk)
env['ANSIBLE_CALLBACK_PLUGINS'] = self.get_path_to('..', 'plugins', 'callback')
env['ANSIBLE_STDOUT_CALLBACK'] = 'awx_display'
return env
def _build_scm_url_extra_vars(self, project_update, **kwargs):
@ -1480,16 +1462,6 @@ class RunProjectUpdate(BaseTask):
def get_idle_timeout(self):
return getattr(settings, 'PROJECT_UPDATE_IDLE_TIMEOUT', None)
def get_stdout_handle(self, instance):
stdout_handle = super(RunProjectUpdate, self).get_stdout_handle(instance)
pk = instance.pk
def raw_callback(data):
instance_actual = self.update_model(pk)
result_stdout_text = instance_actual.result_stdout_text + data
self.update_model(pk, result_stdout_text=result_stdout_text)
return OutputEventFilter(stdout_handle, raw_callback=raw_callback)
def _update_dependent_inventories(self, project_update, dependent_inventory_sources):
project_request_id = '' if self.request.id is None else self.request.id
scm_revision = project_update.project.scm_revision
@ -1615,6 +1587,8 @@ class RunInventoryUpdate(BaseTask):
name = 'awx.main.tasks.run_inventory_update'
model = InventoryUpdate
event_model = InventoryUpdateEvent
event_data_key = 'inventory_update_id'
def build_private_data(self, inventory_update, **kwargs):
"""
@ -1986,16 +1960,6 @@ class RunInventoryUpdate(BaseTask):
args.append('--traceback')
return args
def get_stdout_handle(self, instance):
stdout_handle = super(RunInventoryUpdate, self).get_stdout_handle(instance)
pk = instance.pk
def raw_callback(data):
instance_actual = self.update_model(pk)
result_stdout_text = instance_actual.result_stdout_text + data
self.update_model(pk, result_stdout_text=result_stdout_text)
return OutputEventFilter(stdout_handle, raw_callback=raw_callback)
def build_cwd(self, inventory_update, **kwargs):
return self.get_path_to('..', 'plugins', 'inventory')
@ -2042,6 +2006,7 @@ class RunAdHocCommand(BaseTask):
name = 'awx.main.tasks.run_ad_hoc_command'
model = AdHocCommand
event_model = AdHocCommandEvent
event_data_key = 'ad_hoc_command_id'
def build_private_data(self, ad_hoc_command, **kwargs):
@ -2199,29 +2164,6 @@ class RunAdHocCommand(BaseTask):
d[re.compile(r'Password:\s*?$', re.M)] = 'ssh_password'
return d
def get_stdout_handle(self, instance):
'''
Wrap stdout file object to capture events.
'''
stdout_handle = super(RunAdHocCommand, self).get_stdout_handle(instance)
if getattr(settings, 'USE_CALLBACK_QUEUE', False):
dispatcher = CallbackQueueDispatcher()
def ad_hoc_command_event_callback(event_data):
event_data.setdefault(self.event_data_key, instance.id)
if 'uuid' in event_data:
cache_event = cache.get('ev-{}'.format(event_data['uuid']), None)
if cache_event is not None:
event_data.update(cache_event)
dispatcher.dispatch(event_data)
else:
def ad_hoc_command_event_callback(event_data):
event_data.setdefault(self.event_data_key, instance.id)
AdHocCommandEvent.create_from_data(**event_data)
return OutputEventFilter(stdout_handle, ad_hoc_command_event_callback)
def should_use_proot(self, instance, **kwargs):
'''
Return whether this task should use proot.
@ -2233,6 +2175,8 @@ class RunSystemJob(BaseTask):
name = 'awx.main.tasks.run_system_job'
model = SystemJob
event_model = SystemJobEvent
event_data_key = 'system_job_id'
def build_args(self, system_job, **kwargs):
args = ['awx-manage', system_job.job_type]
@ -2259,16 +2203,6 @@ class RunSystemJob(BaseTask):
logger.exception("%s Failed to parse system job", system_job.log_format)
return args
def get_stdout_handle(self, instance):
stdout_handle = super(RunSystemJob, self).get_stdout_handle(instance)
pk = instance.pk
def raw_callback(data):
instance_actual = self.update_model(pk)
result_stdout_text = instance_actual.result_stdout_text + data
self.update_model(pk, result_stdout_text=result_stdout_text)
return OutputEventFilter(stdout_handle, raw_callback=raw_callback)
def build_env(self, instance, **kwargs):
env = super(RunSystemJob, self).build_env(instance,
**kwargs)

View File

@ -0,0 +1,231 @@
import re
import shutil
import tempfile
from django.conf import settings
from django.db.backends.sqlite3.base import SQLiteCursorWrapper
import pytest
from awx.api.versioning import reverse
from awx.main.models import (Job, JobEvent, AdHocCommand, AdHocCommandEvent,
Project, ProjectUpdate, ProjectUpdateEvent,
InventoryUpdate, InventorySource,
InventoryUpdateEvent, SystemJob, SystemJobEvent)
def _mk_project_update():
project = Project()
project.save()
return ProjectUpdate(project=project)
def _mk_inventory_update():
source = InventorySource()
source.save()
iu = InventoryUpdate(inventory_source=source)
return iu
@pytest.fixture(scope='function')
def sqlite_copy_expert(request):
# copy_expert is postgres-specific, and SQLite doesn't support it; mock its
# behavior to test that it writes a file that contains stdout from events
path = tempfile.mkdtemp(prefix='job-event-stdout')
def write_stdout(self, sql, fd):
# simulate postgres copy_expert support with ORM code
parts = sql.split(' ')
tablename = parts[parts.index('from') + 1]
for cls in (JobEvent, AdHocCommandEvent, ProjectUpdateEvent,
InventoryUpdateEvent, SystemJobEvent):
if cls._meta.db_table == tablename:
for event in cls.objects.order_by('start_line').all():
fd.write(event.stdout)
setattr(SQLiteCursorWrapper, 'copy_expert', write_stdout)
request.addfinalizer(lambda: shutil.rmtree(path))
request.addfinalizer(lambda: delattr(SQLiteCursorWrapper, 'copy_expert'))
return path
@pytest.mark.django_db
@pytest.mark.parametrize('Parent, Child, relation, view', [
[Job, JobEvent, 'job', 'api:job_stdout'],
[AdHocCommand, AdHocCommandEvent, 'ad_hoc_command', 'api:ad_hoc_command_stdout'],
[_mk_project_update, ProjectUpdateEvent, 'project_update', 'api:project_update_stdout'],
[_mk_inventory_update, InventoryUpdateEvent, 'inventory_update', 'api:inventory_update_stdout'],
])
def test_text_stdout(sqlite_copy_expert, Parent, Child, relation, view, get, admin):
job = Parent()
job.save()
for i in range(3):
Child(**{relation: job, 'stdout': 'Testing {}\n'.format(i), 'start_line': i}).save()
url = reverse(view, kwargs={'pk': job.pk}) + '?format=txt'
response = get(url, user=admin, expect=200)
assert response.content.splitlines() == ['Testing %d' % i for i in range(3)]
@pytest.mark.django_db
@pytest.mark.parametrize('Parent, Child, relation, view', [
[Job, JobEvent, 'job', 'api:job_stdout'],
[AdHocCommand, AdHocCommandEvent, 'ad_hoc_command', 'api:ad_hoc_command_stdout'],
[_mk_project_update, ProjectUpdateEvent, 'project_update', 'api:project_update_stdout'],
[_mk_inventory_update, InventoryUpdateEvent, 'inventory_update', 'api:inventory_update_stdout'],
])
@pytest.mark.parametrize('download', [True, False])
def test_ansi_stdout_filtering(sqlite_copy_expert, Parent, Child, relation,
view, download, get, admin):
job = Parent()
job.save()
for i in range(3):
Child(**{
relation: job,
'stdout': '\x1B[0;36mTesting {}\x1B[0m\n'.format(i),
'start_line': i
}).save()
url = reverse(view, kwargs={'pk': job.pk})
# ansi codes in ?format=txt should get filtered
fmt = "?format={}".format("txt_download" if download else "txt")
response = get(url + fmt, user=admin, expect=200)
assert response.content.splitlines() == ['Testing %d' % i for i in range(3)]
has_download_header = response.has_header('Content-Disposition')
assert has_download_header if download else not has_download_header
# ask for ansi and you'll get it
fmt = "?format={}".format("ansi_download" if download else "ansi")
response = get(url + fmt, user=admin, expect=200)
assert response.content.splitlines() == ['\x1B[0;36mTesting %d\x1B[0m' % i for i in range(3)]
has_download_header = response.has_header('Content-Disposition')
assert has_download_header if download else not has_download_header
@pytest.mark.django_db
@pytest.mark.parametrize('Parent, Child, relation, view', [
[Job, JobEvent, 'job', 'api:job_stdout'],
[AdHocCommand, AdHocCommandEvent, 'ad_hoc_command', 'api:ad_hoc_command_stdout'],
[_mk_project_update, ProjectUpdateEvent, 'project_update', 'api:project_update_stdout'],
[_mk_inventory_update, InventoryUpdateEvent, 'inventory_update', 'api:inventory_update_stdout'],
])
def test_colorized_html_stdout(sqlite_copy_expert, Parent, Child, relation, view, get, admin):
job = Parent()
job.save()
for i in range(3):
Child(**{
relation: job,
'stdout': '\x1B[0;36mTesting {}\x1B[0m\n'.format(i),
'start_line': i
}).save()
url = reverse(view, kwargs={'pk': job.pk}) + '?format=html'
response = get(url, user=admin, expect=200)
assert '.ansi36 { color: #2dbaba; }' in response.content
for i in range(3):
assert '<span class="ansi36">Testing {}</span>'.format(i) in response.content
@pytest.mark.django_db
@pytest.mark.parametrize('Parent, Child, relation, view', [
[Job, JobEvent, 'job', 'api:job_stdout'],
[AdHocCommand, AdHocCommandEvent, 'ad_hoc_command', 'api:ad_hoc_command_stdout'],
[_mk_project_update, ProjectUpdateEvent, 'project_update', 'api:project_update_stdout'],
[_mk_inventory_update, InventoryUpdateEvent, 'inventory_update', 'api:inventory_update_stdout'],
])
def test_stdout_line_range(sqlite_copy_expert, Parent, Child, relation, view, get, admin):
job = Parent()
job.save()
for i in range(20):
Child(**{relation: job, 'stdout': 'Testing {}\n'.format(i), 'start_line': i}).save()
url = reverse(view, kwargs={'pk': job.pk}) + '?format=html&start_line=5&end_line=10'
response = get(url, user=admin, expect=200)
assert re.findall('Testing [0-9]+', response.content) == ['Testing %d' % i for i in range(5, 10)]
@pytest.mark.django_db
def test_text_stdout_from_system_job_events(sqlite_copy_expert, get, admin):
job = SystemJob()
job.save()
for i in range(3):
SystemJobEvent(system_job=job, stdout='Testing {}\n'.format(i), start_line=i).save()
url = reverse('api:system_job_detail', kwargs={'pk': job.pk})
response = get(url, user=admin, expect=200)
assert response.data['result_stdout'].splitlines() == ['Testing %d' % i for i in range(3)]
@pytest.mark.django_db
@pytest.mark.parametrize('Parent, Child, relation, view', [
[Job, JobEvent, 'job', 'api:job_stdout'],
[AdHocCommand, AdHocCommandEvent, 'ad_hoc_command', 'api:ad_hoc_command_stdout'],
[_mk_project_update, ProjectUpdateEvent, 'project_update', 'api:project_update_stdout'],
[_mk_inventory_update, InventoryUpdateEvent, 'inventory_update', 'api:inventory_update_stdout'],
])
@pytest.mark.parametrize('fmt', ['txt', 'ansi'])
def test_max_bytes_display(sqlite_copy_expert, Parent, Child, relation, view, fmt, get, admin):
job = Parent()
job.save()
total_bytes = settings.STDOUT_MAX_BYTES_DISPLAY + 1
large_stdout = 'X' * total_bytes
Child(**{relation: job, 'stdout': large_stdout, 'start_line': 0}).save()
url = reverse(view, kwargs={'pk': job.pk})
response = get(url + '?format={}'.format(fmt), user=admin, expect=200)
assert response.content == (
'Standard Output too large to display ({actual} bytes), only download '
'supported for sizes over {max} bytes'.format(
actual=total_bytes,
max=settings.STDOUT_MAX_BYTES_DISPLAY
)
)
response = get(url + '?format={}_download'.format(fmt), user=admin, expect=200)
assert response.content == large_stdout
@pytest.mark.django_db
@pytest.mark.parametrize('Cls, view', [
[_mk_project_update, 'api:project_update_stdout'],
[_mk_inventory_update, 'api:inventory_update_stdout']
])
@pytest.mark.parametrize('fmt', ['txt', 'ansi', 'txt_download', 'ansi_download'])
def test_legacy_result_stdout_text_fallback(Cls, view, fmt, get, admin):
# older versions of stored raw stdout in a raw text blob at
# main_unifiedjob.result_stdout_text; this test ensures that fallback
# works properly if no job events exist
job = Cls()
job.save()
job.result_stdout_text = 'LEGACY STDOUT!'
job.save()
url = reverse(view, kwargs={'pk': job.pk})
response = get(url + '?format={}'.format(fmt), user=admin, expect=200)
assert response.content == 'LEGACY STDOUT!'
@pytest.mark.django_db
@pytest.mark.parametrize('Cls, view', [
[_mk_project_update, 'api:project_update_stdout'],
[_mk_inventory_update, 'api:inventory_update_stdout']
])
@pytest.mark.parametrize('fmt', ['txt', 'ansi'])
def test_legacy_result_stdout_with_max_bytes(Cls, view, fmt, get, admin):
job = Cls()
job.save()
total_bytes = settings.STDOUT_MAX_BYTES_DISPLAY + 1
large_stdout = 'X' * total_bytes
job.result_stdout_text = large_stdout
job.save()
url = reverse(view, kwargs={'pk': job.pk})
response = get(url + '?format={}'.format(fmt), user=admin, expect=200)
assert response.content == (
'Standard Output too large to display ({actual} bytes), only download '
'supported for sizes over {max} bytes'.format(
actual=total_bytes,
max=settings.STDOUT_MAX_BYTES_DISPLAY
)
)
response = get(url + '?format={}'.format(fmt + '_download'), user=admin, expect=200)
assert response.content == large_stdout

View File

@ -544,7 +544,8 @@ def _request(verb):
response.data = data_copy
print(response.data)
assert response.status_code == expect
response.render()
if hasattr(response, 'render'):
response.render()
return response
return rf

View File

@ -0,0 +1,69 @@
import mock
import pytest
from awx.main.models import (Job, JobEvent, ProjectUpdate, ProjectUpdateEvent,
AdHocCommand, AdHocCommandEvent, InventoryUpdate,
InventorySource, InventoryUpdateEvent, SystemJob,
SystemJobEvent)
@pytest.mark.django_db
@mock.patch('awx.main.consumers.emit_channel_notification')
def test_job_event_websocket_notifications(emit):
j = Job(id=123)
j.save()
JobEvent.create_from_data(job_id=j.pk)
assert len(emit.call_args_list) == 1
topic, payload = emit.call_args_list[0][0]
assert topic == 'job_events-123'
assert payload['job'] == 123
@pytest.mark.django_db
@mock.patch('awx.main.consumers.emit_channel_notification')
def test_ad_hoc_event_websocket_notifications(emit):
ahc = AdHocCommand(id=123)
ahc.save()
AdHocCommandEvent.create_from_data(ad_hoc_command_id=ahc.pk)
assert len(emit.call_args_list) == 1
topic, payload = emit.call_args_list[0][0]
assert topic == 'ad_hoc_command_events-123'
assert payload['ad_hoc_command'] == 123
@pytest.mark.django_db
@mock.patch('awx.main.consumers.emit_channel_notification')
def test_project_update_event_websocket_notifications(emit, project):
pu = ProjectUpdate(id=123, project=project)
pu.save()
ProjectUpdateEvent.create_from_data(project_update_id=pu.pk)
assert len(emit.call_args_list) == 1
topic, payload = emit.call_args_list[0][0]
assert topic == 'project_update_events-123'
assert payload['project_update'] == 123
@pytest.mark.django_db
@mock.patch('awx.main.consumers.emit_channel_notification')
def test_inventory_update_event_websocket_notifications(emit, inventory):
source = InventorySource()
source.save()
iu = InventoryUpdate(id=123, inventory_source=source)
iu.save()
InventoryUpdateEvent.create_from_data(inventory_update_id=iu.pk)
assert len(emit.call_args_list) == 1
topic, payload = emit.call_args_list[0][0]
assert topic == 'inventory_update_events-123'
assert payload['inventory_update'] == 123
@pytest.mark.django_db
@mock.patch('awx.main.consumers.emit_channel_notification')
def test_system_job_event_websocket_notifications(emit, inventory):
j = SystemJob(id=123)
j.save()
SystemJobEvent.create_from_data(system_job_id=j.pk)
assert len(emit.call_args_list) == 1
topic, payload = emit.call_args_list[0][0]
assert topic == 'system_job_events-123'
assert payload['system_job'] == 123

View File

@ -0,0 +1,46 @@
from datetime import datetime
from django.utils.timezone import utc
import mock
import pytest
from awx.main.models import (JobEvent, ProjectUpdateEvent, AdHocCommandEvent,
InventoryUpdateEvent, SystemJobEvent)
@pytest.mark.parametrize('job_identifier, cls', [
['job_id', JobEvent],
['project_update_id', ProjectUpdateEvent],
['ad_hoc_command_id', AdHocCommandEvent],
['inventory_update_id', InventoryUpdateEvent],
['system_job_id', SystemJobEvent],
])
@pytest.mark.parametrize('created', [
datetime(2018, 1, 1).isoformat(), datetime(2018, 1, 1)
])
def test_event_parse_created(job_identifier, cls, created):
with mock.patch.object(cls, 'objects') as manager:
cls.create_from_data(**{
job_identifier: 123,
'created': created
})
expected_created = datetime(2018, 1, 1).replace(tzinfo=utc)
manager.create.assert_called_with(**{
job_identifier: 123,
'created': expected_created
})
@pytest.mark.parametrize('job_identifier, cls', [
['job_id', JobEvent],
['project_update_id', ProjectUpdateEvent],
['ad_hoc_command_id', AdHocCommandEvent],
['inventory_update_id', InventoryUpdateEvent],
['system_job_id', SystemJobEvent],
])
def test_playbook_event_strip_invalid_keys(job_identifier, cls):
with mock.patch.object(cls, 'objects') as manager:
cls.create_from_data(**{
job_identifier: 123,
'extra_key': 'extra_value'
})
manager.create.assert_called_with(**{job_identifier: 123})

View File

@ -31,6 +31,7 @@ from awx.main.models import (
)
from awx.main import tasks
from awx.main.queue import CallbackQueueDispatcher
from awx.main.utils import encrypt_field, encrypt_value
@ -199,6 +200,7 @@ class TestJobExecution:
self.run_pexpect.return_value = ['successful', 0]
self.patches = [
mock.patch.object(CallbackQueueDispatcher, 'dispatch', lambda obj: None),
mock.patch.object(Project, 'get_project_path', lambda *a, **kw: self.project_path),
# don't emit websocket statuses; they use the DB and complicate testing
mock.patch.object(UnifiedJob, 'websocket_emit_status', mock.Mock()),

View File

@ -1,52 +0,0 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
# Python
import mock
from mock import Mock
from StringIO import StringIO
from django.utils.timezone import now
# AWX
from awx.main import models
# stdout file present
@mock.patch('os.path.exists', return_value=True)
@mock.patch('codecs.open', return_value='my_file_handler')
@mock.patch.object(models.UnifiedJob, 'result_stdout_text', '')
def test_result_stdout_raw_handle_file__found(exists, open):
unified_job = models.UnifiedJob()
with mock.patch('os.stat', return_value=Mock(st_size=1)):
result = unified_job.result_stdout_raw_handle()
assert result == 'my_file_handler'
# stdout file missing, job finished
@mock.patch('os.path.exists', return_value=False)
@mock.patch.object(models.UnifiedJob, 'result_stdout_text', '')
def test_result_stdout_raw_handle__missing(exists):
unified_job = models.UnifiedJob()
unified_job.result_stdout_file = 'dummy'
unified_job.finished = now()
result = unified_job.result_stdout_raw_handle()
assert isinstance(result, StringIO)
assert result.read() == 'stdout capture is missing'
# stdout file missing, job not finished
@mock.patch('os.path.exists', return_value=False)
@mock.patch.object(models.UnifiedJob, 'result_stdout_text', '')
def test_result_stdout_raw_handle__pending(exists):
unified_job = models.UnifiedJob()
unified_job.result_stdout_file = 'dummy'
unified_job.finished = None
result = unified_job.result_stdout_raw_handle()
assert isinstance(result, StringIO)
assert result.read() == 'Waiting for results...'

View File

@ -1,4 +1,3 @@
import cStringIO
import pytest
import base64
import json
@ -33,8 +32,7 @@ def fake_cache():
@pytest.fixture
def wrapped_handle(job_event_callback):
# Preliminary creation of resources usually done in tasks.py
stdout_handle = cStringIO.StringIO()
return OutputEventFilter(stdout_handle, job_event_callback)
return OutputEventFilter(job_event_callback)
@pytest.fixture
@ -80,15 +78,6 @@ def test_separate_verbose_events(fake_callback, wrapped_handle):
assert event_data['event'] == 'verbose'
def test_verbose_event_no_markings(fake_callback, wrapped_handle):
'''
This occurs with jobs that do not have events but still generate
and output stream, like system jobs
'''
wrapped_handle.write('Running tower-manage command \n')
assert wrapped_handle._fileobj.getvalue() == 'Running tower-manage command \n'
def test_large_data_payload(fake_callback, fake_cache, wrapped_handle):
# Pretend that this is done by the Ansible callback module
fake_cache[':1:ev-{}'.format(EXAMPLE_UUID)] = {'event': 'foo'}

View File

@ -845,25 +845,23 @@ class OutputEventFilter(object):
EVENT_DATA_RE = re.compile(r'\x1b\[K((?:[A-Za-z0-9+/=]+\x1b\[\d+D)+)\x1b\[K')
def __init__(self, fileobj=None, event_callback=None, raw_callback=None):
self._fileobj = fileobj
def __init__(self, event_callback):
self._event_callback = event_callback
self._event_ct = 0
self._raw_callback = raw_callback
self._counter = 1
self._start_line = 0
self._buffer = ''
self._current_event_data = None
def __getattr__(self, attr):
return getattr(self._fileobj, attr)
def flush(self):
# pexpect wants to flush the file it writes to, but we're not
# actually capturing stdout to a raw file; we're just
# implementing a custom `write` method to discover and emit events from
# the stdout stream
pass
def write(self, data):
if self._fileobj:
self._fileobj.write(data)
self._buffer += data
if self._raw_callback:
self._raw_callback(data)
while True:
match = self.EVENT_DATA_RE.search(self._buffer)
if not match:
@ -877,13 +875,13 @@ class OutputEventFilter(object):
self._buffer = self._buffer[match.end():]
def close(self):
if self._fileobj:
self._fileobj.close()
if self._buffer:
self._emit_event(self._buffer)
self._buffer = ''
self._event_callback(dict(event='EOF'))
def _emit_event(self, buffered_stdout, next_event_data=None):
next_event_data = next_event_data or {}
if self._current_event_data:
event_data = self._current_event_data
stdout_chunks = [buffered_stdout]

View File

@ -216,15 +216,14 @@ export default
self.unsubscribe(state);
}
else{
if(state.data && state.data.socket && state.data.socket.groups.hasOwnProperty( "job_events")){
state.data.socket.groups.job_events = [id];
}
if(state.data && state.data.socket && state.data.socket.groups.hasOwnProperty( "ad_hoc_command_events")){
state.data.socket.groups.ad_hoc_command_events = [id];
}
if(state.data && state.data.socket && state.data.socket.groups.hasOwnProperty( "workflow_events")){
state.data.socket.groups.workflow_events = [id];
}
["job_events", "ad_hoc_command_events", "workflow_events",
"project_update_events", "inventory_update_events",
"system_job_events"
].forEach(function(group) {
if(state.data && state.data.socket && state.data.socket.groups.hasOwnProperty(group)){
state.data.socket.groups[group] = [id];
}
});
self.subscribe(state);
}
return true;

View File

@ -19,7 +19,7 @@ export default {
jobType: 'ad_hoc_commands',
socket: {
"groups": {
"jobs": ["status_changed"],
"jobs": ["status_changed", "summary"],
"ad_hoc_command_events": []
}
}

View File

@ -20,7 +20,8 @@ export default {
data: {
socket: {
"groups":{
"jobs": ["status_changed"]
"jobs": ["status_changed", "summary"],
"inventory_update_events": [],
}
},
jobType: 'inventory_updates'

View File

@ -19,7 +19,8 @@ export default {
jobType: 'system_jobs',
socket: {
"groups": {
"jobs": ["status_changed"]
"jobs": ["status_changed", "summary"],
"system_job_events": [],
}
}
}

View File

@ -21,7 +21,8 @@ export default {
jobType: 'project_updates',
socket: {
"groups": {
"jobs": ["status_changed"]
"jobs": ["status_changed", "summary"],
"project_update_events": [],
}
},
}