Track emitted events on model

AlanCoding 2018-03-12 14:39:28 -04:00
parent 0db584e23e
commit b803a6e557
6 changed files with 82 additions and 22 deletions

View File

@@ -685,13 +685,18 @@ class UnifiedJobTemplateSerializer(BaseSerializer):
class UnifiedJobSerializer(BaseSerializer):
show_capabilities = ['start', 'delete']
+events_processed = serializers.BooleanField(
+    help_text=_('Indicates whether all of the events generated by this '
+                'unified job have been saved to the database.'),
+    read_only=True
+)
class Meta:
model = UnifiedJob
fields = ('*', 'unified_job_template', 'launch_type', 'status',
'failed', 'started', 'finished', 'elapsed', 'job_args',
'job_cwd', 'job_env', 'job_explanation', 'execution_node',
-'result_traceback')
+'result_traceback', 'events_processed')
extra_kwargs = {
'unified_job_template': {
'source': 'unified_job_template_id',
@@ -781,13 +786,13 @@ class UnifiedJobSerializer(BaseSerializer):
class UnifiedJobListSerializer(UnifiedJobSerializer):
class Meta:
-fields = ('*', '-job_args', '-job_cwd', '-job_env', '-result_traceback')
+fields = ('*', '-job_args', '-job_cwd', '-job_env', '-result_traceback', '-events_processed')
def get_field_names(self, declared_fields, info):
field_names = super(UnifiedJobListSerializer, self).get_field_names(declared_fields, info)
# Meta multiple inheritance and -field_name options don't seem to be
# taking effect above, so remove the undesired fields here.
-return tuple(x for x in field_names if x not in ('job_args', 'job_cwd', 'job_env', 'result_traceback'))
+return tuple(x for x in field_names if x not in ('job_args', 'job_cwd', 'job_env', 'result_traceback', 'events_processed'))
def get_types(self):
if type(self) is UnifiedJobListSerializer:
@@ -3503,7 +3508,8 @@ class WorkflowJobSerializer(LabelsListMixin, UnifiedJobSerializer):
class Meta:
model = WorkflowJob
-fields = ('*', 'workflow_job_template', 'extra_vars', 'allow_simultaneous', '-execution_node',)
+fields = ('*', 'workflow_job_template', 'extra_vars', 'allow_simultaneous',
+          '-execution_node', '-events_processed',)
def get_related(self, obj):
res = super(WorkflowJobSerializer, self).get_related(obj)
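
For context on the serializer change above: a DRF field with the same name as a model attribute picks its value up automatically, so the read-only BooleanField can be backed by a model property. A minimal sketch of that mechanism, with hypothetical class names (not the actual AWX serializers; run inside a Django shell with DRF installed):

    from rest_framework import serializers

    class JobSketchSerializer(serializers.Serializer):
        # source defaults to the field name, so this reads obj.events_processed
        events_processed = serializers.BooleanField(read_only=True)

    class JobSketch(object):
        emitted_events = 3
        saved_events = 3

        @property
        def events_processed(self):
            return self.emitted_events == self.saved_events

    print(JobSketchSerializer(JobSketch()).data)  # {'events_processed': True}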

View File

@@ -145,6 +145,16 @@ class UnifiedJobDeletionMixin(object):
# Still allow deletion of new status, because these can be manually created
if obj.status in ACTIVE_STATES and obj.status != 'new':
raise PermissionDenied(detail=_("Cannot delete running job resource."))
+elif not obj.events_processed:
+    # Prohibit deletion if job events are still coming in
+    if obj.finished and now() < obj.finished + dateutil.relativedelta.relativedelta(minutes=1):
+        # less than 1 minute has passed since job finished and events are not in
+        return Response({"error": _("Job has not finished processing events.")},
+                        status=status.HTTP_400_BAD_REQUEST)
+    else:
+        # if it has been > 1 minute, events are probably lost
+        logger.warning('Allowing deletion of {} through the API without all events '
+                       'processed.'.format(obj.log_format))
obj.delete()
return Response(status=status.HTTP_204_NO_CONTENT)
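
The view change above enforces a one-minute grace period before a job with unsaved events can be deleted. A plain-Python sketch of the same decision logic, as a hypothetical standalone helper rather than the AWX mixin:

    from datetime import datetime, timedelta, timezone

    GRACE = timedelta(minutes=1)

    def may_delete(status, events_processed, finished):
        # mirror of the guard above: running jobs can never be deleted
        if status == 'running':  # stand-in for the ACTIVE_STATES check
            return False, 'Cannot delete running job resource.'
        if not events_processed:
            if finished and datetime.now(timezone.utc) < finished + GRACE:
                # events may still be arriving; refuse for now
                return False, 'Job has not finished processing events.'
            # past the grace period: events are probably lost, allow deletion
        return True, None

    finished = datetime.now(timezone.utc) - timedelta(seconds=30)
    print(may_delete('successful', False, finished))
    # (False, 'Job has not finished processing events.')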

View File

@@ -0,0 +1,20 @@
+# -*- coding: utf-8 -*-
+# Generated by Django 1.11.11 on 2018-03-12 17:47
+from __future__ import unicode_literals
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('main', '0025_v330_delete_authtoken'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='unifiedjob',
+            name='emitted_events',
+            field=models.PositiveIntegerField(default=0, editable=False),
+        ),
+    ]
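
A note on this migration: default=0 backfills the new column for all existing unified job rows, and editable=False keeps the counter out of forms and the admin. The generated SQL can be previewed with Django's sqlmigrate command; on PostgreSQL it is typically along these lines (illustrative, not verified output):

    # python manage.py sqlmigrate main NNNN_name   (substitute the new migration's name)
    #
    # ALTER TABLE "main_unifiedjob"
    #     ADD COLUMN "emitted_events" integer DEFAULT 0 NOT NULL;
    # ALTER TABLE "main_unifiedjob"
    #     ALTER COLUMN "emitted_events" DROP DEFAULT;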

View File

@@ -570,6 +570,12 @@ class BaseCommandEvent(CreatedModifiedModel):
return self.objects.create(**kwargs)
+
+def get_event_display(self):
+    '''
+    Needed for __unicode__
+    '''
+    return self.event
class AdHocCommandEvent(BaseCommandEvent):
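
Presumably the manual get_event_display is needed because Django only auto-generates get_FOO_display() for fields declared with choices, and the shared __unicode__ of these event models formats itself through that method. A pure-Python sketch of the pattern (hypothetical classes, not the AWX models):

    class BaseEventSketch(object):
        def __str__(self):  # stands in for the shared __unicode__
            return 'event: %s' % self.get_event_display()

    class CommandEventSketch(BaseEventSketch):
        event = 'runner_on_ok'

        def get_event_display(self):
            # manual fallback: no choices for Django to map through
            return self.event

    print(CommandEventSketch())  # event: runner_on_ok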

View File

@@ -547,6 +547,10 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
default=None,
editable=False,
)
+emitted_events = models.PositiveIntegerField(
+    default=0,
+    editable=False,
+)
unified_job_template = models.ForeignKey(
'UnifiedJobTemplate',
null=True, # Some jobs can be run without a template.
@@ -905,6 +909,29 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
related.result_stdout_text = value
related.save()
+
+@property
+def event_parent_key(self):
+    tablename = self._meta.db_table
+    return {
+        'main_job': 'job_id',
+        'main_adhoccommand': 'ad_hoc_command_id',
+        'main_projectupdate': 'project_update_id',
+        'main_inventoryupdate': 'inventory_update_id',
+        'main_systemjob': 'system_job_id',
+    }[tablename]
+
+def get_event_queryset(self):
+    return self.event_class.objects.filter(**{self.event_parent_key: self.id})
+
+@property
+def events_processed(self):
+    '''
+    Returns True / False, whether all events from job have been saved
+    '''
+    if self.status in ACTIVE_STATES:
+        return False  # tally of events is only available at end of run
+    return self.emitted_events == self.get_event_queryset().count()
def result_stdout_raw_handle(self, enforce_max_bytes=True):
"""
This method returns a file-like object ready to be read which contains
@@ -960,20 +987,12 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
# (`stdout`) directly to a file
with connection.cursor() as cursor:
tablename = self._meta.db_table
-related_name = {
-    'main_job': 'job_id',
-    'main_adhoccommand': 'ad_hoc_command_id',
-    'main_projectupdate': 'project_update_id',
-    'main_inventoryupdate': 'inventory_update_id',
-    'main_systemjob': 'system_job_id',
-}[tablename]
if enforce_max_bytes:
# detect the length of all stdout for this UnifiedJob, and
# if it exceeds settings.STDOUT_MAX_BYTES_DISPLAY bytes,
# don't bother actually fetching the data
-total = self.event_class.objects.filter(**{related_name: self.id}).aggregate(
+total = self.get_event_queryset().aggregate(
total=models.Sum(models.Func(models.F('stdout'), function='LENGTH'))
)['total']
if total > max_supported:
@@ -981,8 +1000,8 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
cursor.copy_expert(
"copy (select stdout from {} where {}={} order by start_line) to stdout".format(
-    tablename + 'event',
-    related_name,
+    self._meta.db_table + 'event',
+    self.event_parent_key,
self.id
),
fd
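
The model changes above boil down to simple bookkeeping: a job's events are fully processed once the number of saved event rows equals the count recorded at the end of the run. A standalone sketch with fake stand-ins for the ORM pieces (the real lookup goes through event_class and the event_parent_key column mapping shown above):

    class FakeQuerySet(list):
        def count(self):
            return len(self)

    class FakeJob(object):
        status = 'successful'
        emitted_events = 3                   # written by the task runner
        _saved = FakeQuerySet(['e1', 'e2'])  # rows the event callback has saved

        def get_event_queryset(self):
            return self._saved

        @property
        def events_processed(self):
            if self.status in ('pending', 'waiting', 'running'):
                return False  # the tally is only final once the run ends
            return self.emitted_events == self.get_event_queryset().count()

    job = FakeJob()
    print(job.events_processed)   # False: only 2 of 3 events saved so far
    job._saved.append('e3')
    print(job.events_processed)   # True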

View File

@@ -883,6 +883,7 @@ class BaseTask(LogErrorsTask):
status, rc, tb = 'error', None, ''
output_replacements = []
extra_update_fields = {}
+event_ct = 0
try:
kwargs['isolated'] = isolated_host is not None
self.pre_run_hook(instance, **kwargs)
@@ -1001,14 +1002,11 @@
try:
stdout_handle.flush()
stdout_handle.close()
-# If stdout_handle was wrapped with event filter, log data
-if hasattr(stdout_handle, '_event_ct'):
-    logger.info('%s finished running, producing %s events.',
-                instance.log_format, stdout_handle._event_ct)
-else:
-    logger.info('%s finished running', instance.log_format)
+event_ct = getattr(stdout_handle, '_event_ct', 0)
+logger.info('%s finished running, producing %s events.',
+            instance.log_format, event_ct)
except Exception:
-    pass
+    logger.exception('Error flushing job stdout and saving event count.')
try:
self.post_run_hook(instance, status, **kwargs)
@@ -1020,6 +1018,7 @@
instance = self.update_model(pk, status=status, result_traceback=tb,
output_replacements=output_replacements,
+emitted_events=event_ct,
**extra_update_fields)
try:
self.final_run_hook(instance, status, **kwargs)
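
The task change reads an _event_ct attribute off the stdout handle; per the removed comment above, that attribute exists when the handle was wrapped with the event filter. A hypothetical sketch of such a counting wrapper (the real AWX filter parses structured event markers; here one newline-terminated record stands in for one event):

    import io

    class CountingStdout(object):
        def __init__(self, fileobj):
            self._f = fileobj
            self._event_ct = 0

        def write(self, data):
            # sketch: treat each newline-terminated record as one event
            self._event_ct += data.count('\n')
            return self._f.write(data)

        def flush(self):
            self._f.flush()

        def close(self):
            self._f.close()

    handle = CountingStdout(io.StringIO())
    handle.write('event one\nevent two\n')
    print(handle._event_ct)  # 2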