diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 1172027653..4259b3cc96 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -74,6 +74,12 @@ Try upgrading OpenSSH or providing your private key in an different format. \ logger = logging.getLogger('awx.main.tasks') +class LogErrorsTask(Task): + def on_failure(self, exc, task_id, args, kwargs, einfo): + logger.exception('Task {} encountered exception.'.format(self.name), exc_info=exc) + super(LogErrorsTask, self).on_failure(exc, task_id, args, kwargs, einfo) + + @celeryd_init.connect def celery_startup(conf=None, **kwargs): # Re-init all schedules @@ -86,8 +92,8 @@ def celery_startup(conf=None, **kwargs): from awx.main.signals import disable_activity_stream with disable_activity_stream(): sch.save() - except Exception as e: - logger.error("Failed to rebuild schedule {}: {}".format(sch, e)) + except: + logger.exception("Failed to rebuild schedule {}.".format(sch)) @worker_process_init.connect @@ -96,7 +102,7 @@ def task_set_logger_pre_run(*args, **kwargs): configure_external_logger(settings, is_startup=False) -@task(queue='tower_broadcast_all', bind=True) +@task(queue='tower_broadcast_all', bind=True, base=LogErrorsTask) def handle_setting_changes(self, setting_keys): orig_len = len(setting_keys) for i in range(orig_len): @@ -113,7 +119,7 @@ def handle_setting_changes(self, setting_keys): break -@task(queue='tower') +@task(queue='tower', base=LogErrorsTask) def send_notifications(notification_list, job_id=None): if not isinstance(notification_list, list): raise TypeError("notification_list should be of type list") @@ -137,7 +143,7 @@ def send_notifications(notification_list, job_id=None): notification.save() -@task(bind=True, queue='tower') +@task(bind=True, queue='tower', base=LogErrorsTask) def run_administrative_checks(self): logger.warn("Running administrative checks.") if not settings.TOWER_ADMIN_ALERTS: @@ -159,13 +165,13 @@ def run_administrative_checks(self): fail_silently=True) -@task(bind=True, queue='tower') +@task(bind=True, queue='tower', base=LogErrorsTask) def cleanup_authtokens(self): logger.warn("Cleaning up expired authtokens.") AuthToken.objects.filter(expires__lt=now()).delete() -@task(bind=True) +@task(bind=True, base=LogErrorsTask) def purge_old_stdout_files(self): nowtime = time.time() for f in os.listdir(settings.JOBOUTPUT_ROOT): @@ -174,7 +180,7 @@ def purge_old_stdout_files(self): logger.info("Removing {}".format(os.path.join(settings.JOBOUTPUT_ROOT,f))) -@task(bind=True) +@task(bind=True, base=LogErrorsTask) def cluster_node_heartbeat(self): logger.debug("Cluster node heartbeat task.") nowtime = now() @@ -206,7 +212,7 @@ def cluster_node_heartbeat(self): raise RuntimeError("Shutting down.") -@task(bind=True) +@task(bind=True, base=LogErrorsTask) def tower_isolated_heartbeat(self): local_hostname = settings.CLUSTER_HOST_ID logger.debug("Controlling node checking for any isolated management tasks.") @@ -230,7 +236,7 @@ def tower_isolated_heartbeat(self): isolated_manager.IsolatedManager.health_check(isolated_instance_qs) -@task(bind=True, queue='tower') +@task(bind=True, queue='tower', base=LogErrorsTask) def tower_periodic_scheduler(self): run_now = now() state = TowerScheduleState.get_solo() @@ -280,7 +286,7 @@ def _send_notification_templates(instance, status_str): job_id=instance.id) -@task(bind=True, queue='tower') +@task(bind=True, queue='tower', base=LogErrorsTask) def handle_work_success(self, result, task_actual): try: instance = UnifiedJob.get_instance_by_type(task_actual['type'], task_actual['id']) @@ -296,7 +302,7 @@ def handle_work_success(self, result, task_actual): run_job_complete.delay(instance.id) -@task(bind=True, queue='tower') +@task(bind=True, queue='tower', base=LogErrorsTask) def handle_work_error(self, task_id, subtasks=None): print('Executing error task id %s, subtasks: %s' % (str(self.request.id), str(subtasks))) @@ -336,7 +342,7 @@ def handle_work_error(self, task_id, subtasks=None): pass -@task(queue='tower') +@task(queue='tower', base=LogErrorsTask) def update_inventory_computed_fields(inventory_id, should_update_hosts=True): ''' Signal handler and wrapper around inventory.update_computed_fields to @@ -350,7 +356,7 @@ def update_inventory_computed_fields(inventory_id, should_update_hosts=True): i.update_computed_fields(update_hosts=should_update_hosts) -@task(queue='tower') +@task(queue='tower', base=LogErrorsTask) def update_host_smart_inventory_memberships(): try: with transaction.atomic(): @@ -366,7 +372,7 @@ def update_host_smart_inventory_memberships(): return -@task(queue='tower') +@task(queue='tower', base=LogErrorsTask) def delete_inventory(inventory_id): with ignore_inventory_computed_fields(), \ ignore_inventory_group_removal(): @@ -401,7 +407,7 @@ def with_path_cleanup(f): return _wrapped -class BaseTask(Task): +class BaseTask(LogErrorsTask): name = None model = None abstract = True