From b803a6e55783686ac0c9748af769cb90d7b445c3 Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Mon, 12 Mar 2018 14:39:28 -0400 Subject: [PATCH] Track emitted events on model --- awx/api/serializers.py | 14 +++++-- awx/api/views.py | 10 +++++ .../migrations/0026_v330_emitted_events.py | 20 +++++++++ awx/main/models/events.py | 6 +++ awx/main/models/unified_jobs.py | 41 ++++++++++++++----- awx/main/tasks.py | 13 +++--- 6 files changed, 82 insertions(+), 22 deletions(-) create mode 100644 awx/main/migrations/0026_v330_emitted_events.py diff --git a/awx/api/serializers.py b/awx/api/serializers.py index 2a98884d52..4bb51b6bb0 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -685,13 +685,18 @@ class UnifiedJobTemplateSerializer(BaseSerializer): class UnifiedJobSerializer(BaseSerializer): show_capabilities = ['start', 'delete'] + events_processed = serializers.BooleanField( + help_text=_('Indicates whether all of the events generated by this ' + 'unified job have been saved to the database.'), + read_only=True + ) class Meta: model = UnifiedJob fields = ('*', 'unified_job_template', 'launch_type', 'status', 'failed', 'started', 'finished', 'elapsed', 'job_args', 'job_cwd', 'job_env', 'job_explanation', 'execution_node', - 'result_traceback') + 'result_traceback', 'events_processed') extra_kwargs = { 'unified_job_template': { 'source': 'unified_job_template_id', @@ -781,13 +786,13 @@ class UnifiedJobSerializer(BaseSerializer): class UnifiedJobListSerializer(UnifiedJobSerializer): class Meta: - fields = ('*', '-job_args', '-job_cwd', '-job_env', '-result_traceback') + fields = ('*', '-job_args', '-job_cwd', '-job_env', '-result_traceback', '-events_processed') def get_field_names(self, declared_fields, info): field_names = super(UnifiedJobListSerializer, self).get_field_names(declared_fields, info) # Meta multiple inheritance and -field_name options don't seem to be # taking effect above, so remove the undesired fields here. - return tuple(x for x in field_names if x not in ('job_args', 'job_cwd', 'job_env', 'result_traceback')) + return tuple(x for x in field_names if x not in ('job_args', 'job_cwd', 'job_env', 'result_traceback', 'events_processed')) def get_types(self): if type(self) is UnifiedJobListSerializer: @@ -3503,7 +3508,8 @@ class WorkflowJobSerializer(LabelsListMixin, UnifiedJobSerializer): class Meta: model = WorkflowJob - fields = ('*', 'workflow_job_template', 'extra_vars', 'allow_simultaneous', '-execution_node',) + fields = ('*', 'workflow_job_template', 'extra_vars', 'allow_simultaneous', + '-execution_node', '-events_processed',) def get_related(self, obj): res = super(WorkflowJobSerializer, self).get_related(obj) diff --git a/awx/api/views.py b/awx/api/views.py index 7515e9ce94..9d60056b58 100644 --- a/awx/api/views.py +++ b/awx/api/views.py @@ -145,6 +145,16 @@ class UnifiedJobDeletionMixin(object): # Still allow deletion of new status, because these can be manually created if obj.status in ACTIVE_STATES and obj.status != 'new': raise PermissionDenied(detail=_("Cannot delete running job resource.")) + elif not obj.events_processed: + # Prohibit deletion if job events are still coming in + if obj.finished and now() < obj.finished + dateutil.relativedelta.relativedelta(minutes=1): + # less than 1 minute has passed since job finished and events are not in + return Response({"error": _("Job has not finished processing events.")}, + status=status.HTTP_400_BAD_REQUEST) + else: + # if it has been > 1 minute, events are probably lost + logger.warning('Allowing deletion of {} through the API without all events ' + 'processed.'.format(obj.log_format)) obj.delete() return Response(status=status.HTTP_204_NO_CONTENT) diff --git a/awx/main/migrations/0026_v330_emitted_events.py b/awx/main/migrations/0026_v330_emitted_events.py new file mode 100644 index 0000000000..cfd995c751 --- /dev/null +++ b/awx/main/migrations/0026_v330_emitted_events.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.11.11 on 2018-03-12 17:47 +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0025_v330_delete_authtoken'), + ] + + operations = [ + migrations.AddField( + model_name='unifiedjob', + name='emitted_events', + field=models.PositiveIntegerField(default=0, editable=False), + ), + ] diff --git a/awx/main/models/events.py b/awx/main/models/events.py index 2a1e7ebc7f..09da2ffb20 100644 --- a/awx/main/models/events.py +++ b/awx/main/models/events.py @@ -570,6 +570,12 @@ class BaseCommandEvent(CreatedModifiedModel): return self.objects.create(**kwargs) + def get_event_display(self): + ''' + Needed for __unicode__ + ''' + return self.event + class AdHocCommandEvent(BaseCommandEvent): diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 0dca683ee2..d17d392434 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -547,6 +547,10 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique default=None, editable=False, ) + emitted_events = models.PositiveIntegerField( + default=0, + editable=False, + ) unified_job_template = models.ForeignKey( 'UnifiedJobTemplate', null=True, # Some jobs can be run without a template. @@ -905,6 +909,29 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique related.result_stdout_text = value related.save() + @property + def event_parent_key(self): + tablename = self._meta.db_table + return { + 'main_job': 'job_id', + 'main_adhoccommand': 'ad_hoc_command_id', + 'main_projectupdate': 'project_update_id', + 'main_inventoryupdate': 'inventory_update_id', + 'main_systemjob': 'system_job_id', + }[tablename] + + def get_event_queryset(self): + return self.event_class.objects.filter(**{self.event_parent_key: self.id}) + + @property + def events_processed(self): + ''' + Returns True / False, whether all events from job have been saved + ''' + if self.status in ACTIVE_STATES: + return False # tally of events is only available at end of run + return self.emitted_events == self.get_event_queryset().count() + def result_stdout_raw_handle(self, enforce_max_bytes=True): """ This method returns a file-like object ready to be read which contains @@ -960,20 +987,12 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique # (`stdout`) directly to a file with connection.cursor() as cursor: - tablename = self._meta.db_table - related_name = { - 'main_job': 'job_id', - 'main_adhoccommand': 'ad_hoc_command_id', - 'main_projectupdate': 'project_update_id', - 'main_inventoryupdate': 'inventory_update_id', - 'main_systemjob': 'system_job_id', - }[tablename] if enforce_max_bytes: # detect the length of all stdout for this UnifiedJob, and # if it exceeds settings.STDOUT_MAX_BYTES_DISPLAY bytes, # don't bother actually fetching the data - total = self.event_class.objects.filter(**{related_name: self.id}).aggregate( + total = self.get_event_queryset().aggregate( total=models.Sum(models.Func(models.F('stdout'), function='LENGTH')) )['total'] if total > max_supported: @@ -981,8 +1000,8 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique cursor.copy_expert( "copy (select stdout from {} where {}={} order by start_line) to stdout".format( - tablename + 'event', - related_name, + self._meta.db_table + 'event', + self.event_parent_key, self.id ), fd diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 1f5a966def..3afb479a25 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -883,6 +883,7 @@ class BaseTask(LogErrorsTask): status, rc, tb = 'error', None, '' output_replacements = [] extra_update_fields = {} + event_ct = 0 try: kwargs['isolated'] = isolated_host is not None self.pre_run_hook(instance, **kwargs) @@ -1001,14 +1002,11 @@ class BaseTask(LogErrorsTask): try: stdout_handle.flush() stdout_handle.close() - # If stdout_handle was wrapped with event filter, log data - if hasattr(stdout_handle, '_event_ct'): - logger.info('%s finished running, producing %s events.', - instance.log_format, stdout_handle._event_ct) - else: - logger.info('%s finished running', instance.log_format) + event_ct = getattr(stdout_handle, '_event_ct', 0) + logger.info('%s finished running, producing %s events.', + instance.log_format, event_ct) except Exception: - pass + logger.exception('Error flushing job stdout and saving event count.') try: self.post_run_hook(instance, status, **kwargs) @@ -1020,6 +1018,7 @@ class BaseTask(LogErrorsTask): instance = self.update_model(pk, status=status, result_traceback=tb, output_replacements=output_replacements, + emitted_events=event_ct, **extra_update_fields) try: self.final_run_hook(instance, status, **kwargs)