From 5dfe0b205f7830291aa438c8ed3ef0e3ac154e0a Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Fri, 11 Aug 2017 14:11:28 -0400 Subject: [PATCH] pass over job-run exceptions & log ID --- awx/main/models/unified_jobs.py | 17 +++++++----- awx/main/tasks.py | 46 ++++++++++++++++++++------------- 2 files changed, 38 insertions(+), 25 deletions(-) diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index b6fb28a27b..5fb5e762b4 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -912,14 +912,17 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique return websocket_data def _websocket_emit_status(self, status): - status_data = dict(unified_job_id=self.id, status=status) - status_data.update(self.websocket_emit_data()) - status_data['group_name'] = 'jobs' - emit_channel_notification('jobs-status_changed', status_data) + try: + status_data = dict(unified_job_id=self.id, status=status) + status_data.update(self.websocket_emit_data()) + status_data['group_name'] = 'jobs' + emit_channel_notification('jobs-status_changed', status_data) - if self.spawned_by_workflow: - status_data['group_name'] = "workflow_events" - emit_channel_notification('workflow_events-' + str(self.workflow_job_id), status_data) + if self.spawned_by_workflow: + status_data['group_name'] = "workflow_events" + emit_channel_notification('workflow_events-' + str(self.workflow_job_id), status_data) + except IOError: # includes socket errors + logger.exception('%s failed to emit channel msg about status change', self.log_format) def websocket_emit_status(self, status): connection.on_commit(lambda: self._websocket_emit_status(status)) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 4cec08997d..3c07f21131 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -55,7 +55,8 @@ from awx.main.isolated import run, isolated_manager from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url, check_proot_installed, build_proot_temp_dir, get_licenser, wrap_args_with_proot, get_system_task_capacity, OutputEventFilter, - parse_yaml_or_json, ignore_inventory_computed_fields, ignore_inventory_group_removal) + parse_yaml_or_json, ignore_inventory_computed_fields, ignore_inventory_group_removal, + get_type_for_model) from awx.main.utils.reload import restart_local_services, stop_local_services from awx.main.utils.handlers import configure_external_logger from awx.main.consumers import emit_channel_notification @@ -79,7 +80,12 @@ logger = logging.getLogger('awx.main.tasks') class LogErrorsTask(Task): def on_failure(self, exc, task_id, args, kwargs, einfo): - logger.exception('Task {} encountered exception.'.format(self.name), exc_info=exc) + if isinstance(self, BaseTask): + logger.exception( + '%s %s execution encountered exception.', + get_type_for_model(self.model), args[0]) + else: + logger.exception('Task {} encountered exception.'.format(self.name), exc_info=exc) super(LogErrorsTask, self).on_failure(exc, task_id, args, kwargs, einfo) @@ -352,12 +358,12 @@ def handle_work_error(self, task_id, subtasks=None): for each_task in subtasks: try: instance = UnifiedJob.get_instance_by_type(each_task['type'], each_task['id']) + if not instance: + # Unknown task type + logger.warn("Unknown task type: {}".format(each_task['type'])) + continue except ObjectDoesNotExist: - logger.warning('Missing {} `{}` in success callback.'.format(each_task['type'], each_task['id'])) - instance = None - if not instance: - # Unknown task type - logger.warn("Unknown task type: {}".format(each_task['type'])) + logger.warning('Missing {} `{}` in error callback.'.format(each_task['type'], each_task['id'])) continue if first_instance is None: @@ -866,7 +872,7 @@ class BaseTask(LogErrorsTask): if status != 'canceled': tb = traceback.format_exc() if settings.DEBUG: - logger.exception('exception occurred while running task') + logger.exception('%s Exception occurred while running task', instance.log_format) finally: try: stdout_handle.flush() @@ -877,7 +883,7 @@ class BaseTask(LogErrorsTask): try: self.post_run_hook(instance, status, **kwargs) except Exception: - logger.exception('Post run hook of unified job {} errored.'.format(instance.pk)) + logger.exception('{} Post run hook errored.'.format(instance.log_format)) instance = self.update_model(pk) if instance.cancel_flag: status = 'canceled' @@ -885,16 +891,19 @@ class BaseTask(LogErrorsTask): instance = self.update_model(pk, status=status, result_traceback=tb, output_replacements=output_replacements, **extra_update_fields) - self.final_run_hook(instance, status, **kwargs) + try: + self.final_run_hook(instance, status, **kwargs) + except: + logger.exception('%s Final run hook errored.', instance.log_format) instance.websocket_emit_status(status) if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'): # Raising an exception will mark the job as 'failed' in celery # and will stop a task chain from continuing to execute if status == 'canceled': - raise Exception("Task %s(pk:%s) was canceled (rc=%s)" % (str(self.model.__class__), str(pk), str(rc))) + raise Exception("%s was canceled (rc=%s)" % (instance.log_format, str(rc))) else: - raise Exception("Task %s(pk:%s) encountered an error (rc=%s), please see task stdout for details." % - (str(self.model.__class__), str(pk), str(rc))) + raise Exception("%s encountered an error (rc=%s), please see task stdout for details." % + (instance.log_format, str(rc))) if not hasattr(settings, 'CELERY_UNIT_TEST'): self.signal_finished(pk) @@ -1503,7 +1512,8 @@ class RunProjectUpdate(BaseTask): task_instance.request.id = project_request_id task_instance.run(local_inv_update.id) except Exception: - logger.exception('Encountered unhandled exception updating dependent SCM inventory sources.') + logger.exception('%s Unhandled exception updating dependent SCM inventory sources.', + project_update.log_format) try: project_update.refresh_from_db() @@ -1513,7 +1523,7 @@ class RunProjectUpdate(BaseTask): try: local_inv_update.refresh_from_db() except InventoryUpdate.DoesNotExist: - logger.warning('Inventory update deleted during execution.') + logger.warning('%s Dependent inventory update deleted during execution.', project_update.log_format) continue if project_update.cancel_flag or local_inv_update.cancel_flag: if not project_update.cancel_flag: @@ -1573,7 +1583,7 @@ class RunProjectUpdate(BaseTask): if lines: p.scm_revision = lines[0].strip() else: - logger.info("Could not find scm revision in check") + logger.info("%s Could not find scm revision in check", instance.log_format) p.playbook_files = p.playbooks p.inventory_files = p.inventories p.save() @@ -2196,8 +2206,8 @@ class RunSystemJob(BaseTask): args.extend(['--older_than', str(json_vars['older_than'])]) if 'granularity' in json_vars: args.extend(['--granularity', str(json_vars['granularity'])]) - except Exception as e: - logger.error("Failed to parse system job: " + str(e)) + except Exception: + logger.exception("%s Failed to parse system job", instance.log_format) return args def get_stdout_handle(self, instance):