Merge pull request #1252 from ryanpetrello/celery-failure-handler

implement celery failure logging using CELERY_ANNOTATIONS
This commit is contained in:
Ryan Petrello
2018-04-06 13:40:34 -04:00
committed by GitHub
3 changed files with 38 additions and 31 deletions

View File

@@ -3,7 +3,7 @@
import logging import logging
# Celery # Celery
from celery import Task, shared_task from celery import shared_task
# AWX # AWX
from awx.main.scheduler import TaskManager from awx.main.scheduler import TaskManager
@@ -15,23 +15,17 @@ logger = logging.getLogger('awx.main.scheduler')
# updated model, the call to schedule() may get stale data. # updated model, the call to schedule() may get stale data.
class LogErrorsTask(Task): @shared_task()
def on_failure(self, exc, task_id, args, kwargs, einfo):
logger.exception('Task {} encountered exception.'.format(self.name), exc_info=exc)
super(LogErrorsTask, self).on_failure(exc, task_id, args, kwargs, einfo)
@shared_task(base=LogErrorsTask)
def run_job_launch(job_id): def run_job_launch(job_id):
TaskManager().schedule() TaskManager().schedule()
@shared_task(base=LogErrorsTask) @shared_task()
def run_job_complete(job_id): def run_job_complete(job_id):
TaskManager().schedule() TaskManager().schedule()
@shared_task(base=LogErrorsTask) @shared_task()
def run_task_manager(): def run_task_manager():
logger.debug("Running Tower task manager.") logger.debug("Running Tower task manager.")
TaskManager().schedule() TaskManager().schedule()

View File

@@ -80,8 +80,8 @@ Try upgrading OpenSSH or providing your private key in an different format. \
logger = logging.getLogger('awx.main.tasks') logger = logging.getLogger('awx.main.tasks')
class LogErrorsTask(Task): def log_celery_failure(self, exc, task_id, args, kwargs, einfo):
def on_failure(self, exc, task_id, args, kwargs, einfo): try:
if getattr(exc, 'is_awx_task_error', False): if getattr(exc, 'is_awx_task_error', False):
# Error caused by user / tracked in job output # Error caused by user / tracked in job output
logger.warning(six.text_type("{}").format(exc)) logger.warning(six.text_type("{}").format(exc))
@@ -91,7 +91,12 @@ class LogErrorsTask(Task):
.format(get_type_for_model(self.model), args[0])) .format(get_type_for_model(self.model), args[0]))
else: else:
logger.exception(six.text_type('Task {} encountered exception.').format(self.name), exc_info=exc) logger.exception(six.text_type('Task {} encountered exception.').format(self.name), exc_info=exc)
super(LogErrorsTask, self).on_failure(exc, task_id, args, kwargs, einfo) except Exception:
# It's fairly critical that this code _not_ raise exceptions on logging
# If you configure external logging in a way that _it_ fails, there's
# not a lot we can do here; sys.stderr.write is a final hail mary
_, _, tb = sys.exc_info()
traceback.print_tb(tb)
@celeryd_init.connect @celeryd_init.connect
@@ -116,7 +121,6 @@ def task_set_logger_pre_run(*args, **kwargs):
cache.close() cache.close()
configure_external_logger(settings, is_startup=False) configure_external_logger(settings, is_startup=False)
except Exception: except Exception:
# General exception because LogErrorsTask not used with celery signals
logger.exception('Encountered error on initial log configuration.') logger.exception('Encountered error on initial log configuration.')
@@ -129,11 +133,10 @@ def inform_cluster_of_shutdown(*args, **kwargs):
logger.warning(six.text_type('Normal shutdown signal for instance {}, ' logger.warning(six.text_type('Normal shutdown signal for instance {}, '
'removed self from capacity pool.').format(this_inst.hostname)) 'removed self from capacity pool.').format(this_inst.hostname))
except Exception: except Exception:
# General exception because LogErrorsTask not used with celery signals
logger.exception('Encountered problem with normal shutdown signal.') logger.exception('Encountered problem with normal shutdown signal.')
@shared_task(bind=True, queue='tower_instance_router', base=LogErrorsTask) @shared_task(bind=True, queue='tower_instance_router')
def apply_cluster_membership_policies(self): def apply_cluster_membership_policies(self):
with advisory_lock('cluster_policy_lock', wait=True): with advisory_lock('cluster_policy_lock', wait=True):
considered_instances = Instance.objects.all().order_by('id') considered_instances = Instance.objects.all().order_by('id')
@@ -195,7 +198,7 @@ def apply_cluster_membership_policies(self):
handle_ha_toplogy_changes.apply([]) handle_ha_toplogy_changes.apply([])
@shared_task(queue='tower_broadcast_all', bind=True, base=LogErrorsTask) @shared_task(queue='tower_broadcast_all', bind=True)
def handle_setting_changes(self, setting_keys): def handle_setting_changes(self, setting_keys):
orig_len = len(setting_keys) orig_len = len(setting_keys)
for i in range(orig_len): for i in range(orig_len):
@@ -214,7 +217,7 @@ def handle_setting_changes(self, setting_keys):
restart_local_services(['uwsgi']) restart_local_services(['uwsgi'])
@shared_task(bind=True, queue='tower_broadcast_all', base=LogErrorsTask) @shared_task(bind=True, queue='tower_broadcast_all')
def handle_ha_toplogy_changes(self): def handle_ha_toplogy_changes(self):
(changed, instance) = Instance.objects.get_or_register() (changed, instance) = Instance.objects.get_or_register()
if changed: if changed:
@@ -265,7 +268,7 @@ def handle_update_celery_hostname(sender, instance, **kwargs):
logger.warn(six.text_type("Set hostname to {}").format(instance.hostname)) logger.warn(six.text_type("Set hostname to {}").format(instance.hostname))
@shared_task(queue='tower', base=LogErrorsTask) @shared_task(queue='tower')
def send_notifications(notification_list, job_id=None): def send_notifications(notification_list, job_id=None):
if not isinstance(notification_list, list): if not isinstance(notification_list, list):
raise TypeError("notification_list should be of type list") raise TypeError("notification_list should be of type list")
@@ -289,7 +292,7 @@ def send_notifications(notification_list, job_id=None):
notification.save() notification.save()
@shared_task(bind=True, queue='tower', base=LogErrorsTask) @shared_task(bind=True, queue='tower')
def run_administrative_checks(self): def run_administrative_checks(self):
logger.warn("Running administrative checks.") logger.warn("Running administrative checks.")
if not settings.TOWER_ADMIN_ALERTS: if not settings.TOWER_ADMIN_ALERTS:
@@ -311,7 +314,7 @@ def run_administrative_checks(self):
fail_silently=True) fail_silently=True)
@shared_task(bind=True, base=LogErrorsTask) @shared_task(bind=True)
def purge_old_stdout_files(self): def purge_old_stdout_files(self):
nowtime = time.time() nowtime = time.time()
for f in os.listdir(settings.JOBOUTPUT_ROOT): for f in os.listdir(settings.JOBOUTPUT_ROOT):
@@ -320,7 +323,7 @@ def purge_old_stdout_files(self):
logger.info(six.text_type("Removing {}").format(os.path.join(settings.JOBOUTPUT_ROOT,f))) logger.info(six.text_type("Removing {}").format(os.path.join(settings.JOBOUTPUT_ROOT,f)))
@shared_task(bind=True, base=LogErrorsTask) @shared_task(bind=True)
def cluster_node_heartbeat(self): def cluster_node_heartbeat(self):
logger.debug("Cluster node heartbeat task.") logger.debug("Cluster node heartbeat task.")
nowtime = now() nowtime = now()
@@ -393,7 +396,7 @@ def cluster_node_heartbeat(self):
logger.exception(six.text_type('Error marking {} as lost').format(other_inst.hostname)) logger.exception(six.text_type('Error marking {} as lost').format(other_inst.hostname))
@shared_task(bind=True, base=LogErrorsTask) @shared_task(bind=True)
def awx_isolated_heartbeat(self): def awx_isolated_heartbeat(self):
local_hostname = settings.CLUSTER_HOST_ID local_hostname = settings.CLUSTER_HOST_ID
logger.debug("Controlling node checking for any isolated management tasks.") logger.debug("Controlling node checking for any isolated management tasks.")
@@ -417,7 +420,7 @@ def awx_isolated_heartbeat(self):
isolated_manager.IsolatedManager.health_check(isolated_instance_qs, awx_application_version) isolated_manager.IsolatedManager.health_check(isolated_instance_qs, awx_application_version)
@shared_task(bind=True, queue='tower', base=LogErrorsTask) @shared_task(bind=True, queue='tower')
def awx_periodic_scheduler(self): def awx_periodic_scheduler(self):
run_now = now() run_now = now()
state = TowerScheduleState.get_solo() state = TowerScheduleState.get_solo()
@@ -452,7 +455,7 @@ def awx_periodic_scheduler(self):
state.save() state.save()
@shared_task(bind=True, queue='tower', base=LogErrorsTask) @shared_task(bind=True, queue='tower')
def handle_work_success(self, result, task_actual): def handle_work_success(self, result, task_actual):
try: try:
instance = UnifiedJob.get_instance_by_type(task_actual['type'], task_actual['id']) instance = UnifiedJob.get_instance_by_type(task_actual['type'], task_actual['id'])
@@ -466,7 +469,7 @@ def handle_work_success(self, result, task_actual):
run_job_complete.delay(instance.id) run_job_complete.delay(instance.id)
@shared_task(queue='tower', base=LogErrorsTask) @shared_task(queue='tower')
def handle_work_error(task_id, *args, **kwargs): def handle_work_error(task_id, *args, **kwargs):
subtasks = kwargs.get('subtasks', None) subtasks = kwargs.get('subtasks', None)
logger.debug('Executing error task id %s, subtasks: %s' % (task_id, str(subtasks))) logger.debug('Executing error task id %s, subtasks: %s' % (task_id, str(subtasks)))
@@ -507,7 +510,7 @@ def handle_work_error(task_id, *args, **kwargs):
pass pass
@shared_task(queue='tower', base=LogErrorsTask) @shared_task(queue='tower')
def update_inventory_computed_fields(inventory_id, should_update_hosts=True): def update_inventory_computed_fields(inventory_id, should_update_hosts=True):
''' '''
Signal handler and wrapper around inventory.update_computed_fields to Signal handler and wrapper around inventory.update_computed_fields to
@@ -527,7 +530,7 @@ def update_inventory_computed_fields(inventory_id, should_update_hosts=True):
raise raise
@shared_task(queue='tower', base=LogErrorsTask) @shared_task(queue='tower')
def update_host_smart_inventory_memberships(): def update_host_smart_inventory_memberships():
try: try:
with transaction.atomic(): with transaction.atomic():
@@ -552,7 +555,7 @@ def update_host_smart_inventory_memberships():
smart_inventory.update_computed_fields(update_groups=False, update_hosts=False) smart_inventory.update_computed_fields(update_groups=False, update_hosts=False)
@shared_task(bind=True, queue='tower', base=LogErrorsTask, max_retries=5) @shared_task(bind=True, queue='tower', max_retries=5)
def delete_inventory(self, inventory_id, user_id): def delete_inventory(self, inventory_id, user_id):
# Delete inventory as user # Delete inventory as user
if user_id is None: if user_id is None:
@@ -597,7 +600,7 @@ def with_path_cleanup(f):
return _wrapped return _wrapped
class BaseTask(LogErrorsTask): class BaseTask(Task):
name = None name = None
model = None model = None
event_model = None event_model = None
@@ -2329,7 +2332,7 @@ def _reconstruct_relationships(copy_mapping):
new_obj.save() new_obj.save()
@shared_task(bind=True, queue='tower', base=LogErrorsTask) @shared_task(bind=True, queue='tower')
def deep_copy_model_obj( def deep_copy_model_obj(
self, model_module, model_name, obj_pk, new_obj_pk, self, model_module, model_name, obj_pk, new_obj_pk,
user_pk, sub_obj_list, permission_check_func=None user_pk, sub_obj_list, permission_check_func=None

View File

@@ -468,6 +468,16 @@ CELERY_QUEUES = (
) )
CELERY_ROUTES = {} CELERY_ROUTES = {}
def log_celery_failure(*args):
# Import annotations lazily to avoid polluting the `awx.settings` namespace
# and causing circular imports
from awx.main.tasks import log_celery_failure
return log_celery_failure(*args)
CELERY_ANNOTATIONS = {'*': {'on_failure': log_celery_failure}}
CELERYBEAT_SCHEDULER = 'celery.beat.PersistentScheduler' CELERYBEAT_SCHEDULER = 'celery.beat.PersistentScheduler'
CELERYBEAT_MAX_LOOP_INTERVAL = 60 CELERYBEAT_MAX_LOOP_INTERVAL = 60
CELERYBEAT_SCHEDULE = { CELERYBEAT_SCHEDULE = {