From 4c0096a5240f7e6ce8eb8e0a58bd58590cf8670d Mon Sep 17 00:00:00 2001 From: Ryan Petrello Date: Thu, 5 Apr 2018 10:43:19 -0400 Subject: [PATCH] implement celery failure logging using CELERY_ANNOTATIONS see: https://github.com/ansible/awx/issues/1720 see: https://github.com/ansible/tower/issues/1190 --- awx/main/scheduler/tasks.py | 14 ++++-------- awx/main/tasks.py | 45 ++++++++++++++++++++----------------- awx/settings/defaults.py | 10 +++++++++ 3 files changed, 38 insertions(+), 31 deletions(-) diff --git a/awx/main/scheduler/tasks.py b/awx/main/scheduler/tasks.py index 89e36f6a93..194b188146 100644 --- a/awx/main/scheduler/tasks.py +++ b/awx/main/scheduler/tasks.py @@ -3,7 +3,7 @@ import logging # Celery -from celery import Task, shared_task +from celery import shared_task # AWX from awx.main.scheduler import TaskManager @@ -15,23 +15,17 @@ logger = logging.getLogger('awx.main.scheduler') # updated model, the call to schedule() may get stale data. -class LogErrorsTask(Task): - def on_failure(self, exc, task_id, args, kwargs, einfo): - logger.exception('Task {} encountered exception.'.format(self.name), exc_info=exc) - super(LogErrorsTask, self).on_failure(exc, task_id, args, kwargs, einfo) - - -@shared_task(base=LogErrorsTask) +@shared_task() def run_job_launch(job_id): TaskManager().schedule() -@shared_task(base=LogErrorsTask) +@shared_task() def run_job_complete(job_id): TaskManager().schedule() -@shared_task(base=LogErrorsTask) +@shared_task() def run_task_manager(): logger.debug("Running Tower task manager.") TaskManager().schedule() diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 6f7d855ab6..36fc1fd57e 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -80,8 +80,8 @@ Try upgrading OpenSSH or providing your private key in an different format. \ logger = logging.getLogger('awx.main.tasks') -class LogErrorsTask(Task): - def on_failure(self, exc, task_id, args, kwargs, einfo): +def log_celery_failure(self, exc, task_id, args, kwargs, einfo): + try: if getattr(exc, 'is_awx_task_error', False): # Error caused by user / tracked in job output logger.warning(six.text_type("{}").format(exc)) @@ -91,7 +91,12 @@ class LogErrorsTask(Task): .format(get_type_for_model(self.model), args[0])) else: logger.exception(six.text_type('Task {} encountered exception.').format(self.name), exc_info=exc) - super(LogErrorsTask, self).on_failure(exc, task_id, args, kwargs, einfo) + except Exception: + # It's fairly critical that this code _not_ raise exceptions on logging + # If you configure external logging in a way that _it_ fails, there's + # not a lot we can do here; sys.stderr.write is a final hail mary + _, _, tb = sys.exc_info() + traceback.print_tb(tb) @celeryd_init.connect @@ -116,7 +121,6 @@ def task_set_logger_pre_run(*args, **kwargs): cache.close() configure_external_logger(settings, is_startup=False) except Exception: - # General exception because LogErrorsTask not used with celery signals logger.exception('Encountered error on initial log configuration.') @@ -129,11 +133,10 @@ def inform_cluster_of_shutdown(*args, **kwargs): logger.warning(six.text_type('Normal shutdown signal for instance {}, ' 'removed self from capacity pool.').format(this_inst.hostname)) except Exception: - # General exception because LogErrorsTask not used with celery signals logger.exception('Encountered problem with normal shutdown signal.') -@shared_task(bind=True, queue='tower_instance_router', base=LogErrorsTask) +@shared_task(bind=True, queue='tower_instance_router') def apply_cluster_membership_policies(self): with advisory_lock('cluster_policy_lock', wait=True): considered_instances = Instance.objects.all().order_by('id') @@ -195,7 +198,7 @@ def apply_cluster_membership_policies(self): handle_ha_toplogy_changes.apply([]) -@shared_task(queue='tower_broadcast_all', bind=True, base=LogErrorsTask) +@shared_task(queue='tower_broadcast_all', bind=True) def handle_setting_changes(self, setting_keys): orig_len = len(setting_keys) for i in range(orig_len): @@ -214,7 +217,7 @@ def handle_setting_changes(self, setting_keys): restart_local_services(['uwsgi']) -@shared_task(bind=True, queue='tower_broadcast_all', base=LogErrorsTask) +@shared_task(bind=True, queue='tower_broadcast_all') def handle_ha_toplogy_changes(self): (changed, instance) = Instance.objects.get_or_register() if changed: @@ -265,7 +268,7 @@ def handle_update_celery_hostname(sender, instance, **kwargs): logger.warn(six.text_type("Set hostname to {}").format(instance.hostname)) -@shared_task(queue='tower', base=LogErrorsTask) +@shared_task(queue='tower') def send_notifications(notification_list, job_id=None): if not isinstance(notification_list, list): raise TypeError("notification_list should be of type list") @@ -289,7 +292,7 @@ def send_notifications(notification_list, job_id=None): notification.save() -@shared_task(bind=True, queue='tower', base=LogErrorsTask) +@shared_task(bind=True, queue='tower') def run_administrative_checks(self): logger.warn("Running administrative checks.") if not settings.TOWER_ADMIN_ALERTS: @@ -311,7 +314,7 @@ def run_administrative_checks(self): fail_silently=True) -@shared_task(bind=True, base=LogErrorsTask) +@shared_task(bind=True) def purge_old_stdout_files(self): nowtime = time.time() for f in os.listdir(settings.JOBOUTPUT_ROOT): @@ -320,7 +323,7 @@ def purge_old_stdout_files(self): logger.info(six.text_type("Removing {}").format(os.path.join(settings.JOBOUTPUT_ROOT,f))) -@shared_task(bind=True, base=LogErrorsTask) +@shared_task(bind=True) def cluster_node_heartbeat(self): logger.debug("Cluster node heartbeat task.") nowtime = now() @@ -393,7 +396,7 @@ def cluster_node_heartbeat(self): logger.exception(six.text_type('Error marking {} as lost').format(other_inst.hostname)) -@shared_task(bind=True, base=LogErrorsTask) +@shared_task(bind=True) def awx_isolated_heartbeat(self): local_hostname = settings.CLUSTER_HOST_ID logger.debug("Controlling node checking for any isolated management tasks.") @@ -417,7 +420,7 @@ def awx_isolated_heartbeat(self): isolated_manager.IsolatedManager.health_check(isolated_instance_qs, awx_application_version) -@shared_task(bind=True, queue='tower', base=LogErrorsTask) +@shared_task(bind=True, queue='tower') def awx_periodic_scheduler(self): run_now = now() state = TowerScheduleState.get_solo() @@ -452,7 +455,7 @@ def awx_periodic_scheduler(self): state.save() -@shared_task(bind=True, queue='tower', base=LogErrorsTask) +@shared_task(bind=True, queue='tower') def handle_work_success(self, result, task_actual): try: instance = UnifiedJob.get_instance_by_type(task_actual['type'], task_actual['id']) @@ -466,7 +469,7 @@ def handle_work_success(self, result, task_actual): run_job_complete.delay(instance.id) -@shared_task(queue='tower', base=LogErrorsTask) +@shared_task(queue='tower') def handle_work_error(task_id, *args, **kwargs): subtasks = kwargs.get('subtasks', None) logger.debug('Executing error task id %s, subtasks: %s' % (task_id, str(subtasks))) @@ -507,7 +510,7 @@ def handle_work_error(task_id, *args, **kwargs): pass -@shared_task(queue='tower', base=LogErrorsTask) +@shared_task(queue='tower') def update_inventory_computed_fields(inventory_id, should_update_hosts=True): ''' Signal handler and wrapper around inventory.update_computed_fields to @@ -527,7 +530,7 @@ def update_inventory_computed_fields(inventory_id, should_update_hosts=True): raise -@shared_task(queue='tower', base=LogErrorsTask) +@shared_task(queue='tower') def update_host_smart_inventory_memberships(): try: with transaction.atomic(): @@ -552,7 +555,7 @@ def update_host_smart_inventory_memberships(): smart_inventory.update_computed_fields(update_groups=False, update_hosts=False) -@shared_task(bind=True, queue='tower', base=LogErrorsTask, max_retries=5) +@shared_task(bind=True, queue='tower', max_retries=5) def delete_inventory(self, inventory_id, user_id): # Delete inventory as user if user_id is None: @@ -597,7 +600,7 @@ def with_path_cleanup(f): return _wrapped -class BaseTask(LogErrorsTask): +class BaseTask(Task): name = None model = None event_model = None @@ -2329,7 +2332,7 @@ def _reconstruct_relationships(copy_mapping): new_obj.save() -@shared_task(bind=True, queue='tower', base=LogErrorsTask) +@shared_task(bind=True, queue='tower') def deep_copy_model_obj( self, model_module, model_name, obj_pk, new_obj_pk, user_pk, sub_obj_list, permission_check_func=None diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 645947eb60..0b179c8515 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -469,6 +469,16 @@ CELERY_QUEUES = ( ) CELERY_ROUTES = {} + +def log_celery_failure(*args): + # Import annotations lazily to avoid polluting the `awx.settings` namespace + # and causing circular imports + from awx.main.tasks import log_celery_failure + return log_celery_failure(*args) + + +CELERY_ANNOTATIONS = {'*': {'on_failure': log_celery_failure}} + CELERYBEAT_SCHEDULER = 'celery.beat.PersistentScheduler' CELERYBEAT_MAX_LOOP_INTERVAL = 60 CELERYBEAT_SCHEDULE = {