From cc4aa49556cf493ca30427997f36c2b5b55e4376 Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Wed, 14 Dec 2016 10:13:39 -0500 Subject: [PATCH 1/3] pool restart with thread-based external log config for CTiT enablement of celery task Tower logs --- Makefile | 2 +- awx/conf/apps.py | 6 ++--- awx/conf/signals.py | 4 +-- awx/main/conf.py | 2 +- awx/main/scheduler/tasks.py | 2 ++ awx/main/tasks.py | 50 ++++++++++++++++++++++++++++++++----- awx/main/utils/handlers.py | 23 ++++++++++++++--- awx/settings/defaults.py | 17 +++---------- 8 files changed, 75 insertions(+), 31 deletions(-) diff --git a/Makefile b/Makefile index 0679a11911..c0c8f62d75 100644 --- a/Makefile +++ b/Makefile @@ -428,7 +428,7 @@ celeryd: @if [ "$(VENV_BASE)" ]; then \ . $(VENV_BASE)/tower/bin/activate; \ fi; \ - $(PYTHON) manage.py celeryd -l DEBUG -B --autoreload --autoscale=20,3 --schedule=$(CELERY_SCHEDULE_FILE) -Q projects,jobs,default,scheduler,broadcast_all,$(COMPOSE_HOST) + $(PYTHON) manage.py celeryd -l DEBUG -B --autoreload --autoscale=20,3 --schedule=$(CELERY_SCHEDULE_FILE) -Q projects,jobs,default,scheduler,broadcast_all,$(COMPOSE_HOST) -n celery@$(COMPOSE_HOST) #$(PYTHON) manage.py celery multi show projects jobs default -l DEBUG -Q:projects projects -Q:jobs jobs -Q:default default -c:projects 1 -c:jobs 3 -c:default 3 -Ofair -B --schedule=$(CELERY_SCHEDULE_FILE) # Run to start the zeromq callback receiver diff --git a/awx/conf/apps.py b/awx/conf/apps.py index 62ad0085df..4f76e13d40 100644 --- a/awx/conf/apps.py +++ b/awx/conf/apps.py @@ -16,7 +16,7 @@ class ConfConfig(AppConfig): from .settings import SettingsWrapper SettingsWrapper.initialize() if settings.LOG_AGGREGATOR_ENABLED: - LOGGING = settings.LOGGING - LOGGING['handlers']['http_receiver']['class'] = 'awx.main.utils.handlers.HTTPSHandler' - configure_logging(settings.LOGGING_CONFIG, LOGGING) + LOGGING_DICT = settings.LOGGING + LOGGING_DICT['handlers']['http_receiver']['class'] = 'awx.main.utils.handlers.HTTPSHandler' + configure_logging(settings.LOGGING_CONFIG, LOGGING_DICT) # checks.register(SettingsWrapper._check_settings) diff --git a/awx/conf/signals.py b/awx/conf/signals.py index 78411b1435..9d1813843e 100644 --- a/awx/conf/signals.py +++ b/awx/conf/signals.py @@ -13,7 +13,7 @@ import awx.main.signals from awx.conf import settings_registry from awx.conf.models import Setting from awx.conf.serializers import SettingSerializer -from awx.main.tasks import clear_cache_keys +from awx.main.tasks import process_cache_changes logger = logging.getLogger('awx.conf.signals') @@ -32,7 +32,7 @@ def handle_setting_change(key, for_delete=False): cache_keys = set([Setting.get_cache_key(k) for k in setting_keys]) logger.debug('sending signals to delete cache keys(%r)', cache_keys) cache.delete_many(cache_keys) - clear_cache_keys.delay(list(cache_keys)) + process_cache_changes.delay(list(cache_keys)) # Send setting_changed signal with new value for each setting. for setting_key in setting_keys: diff --git a/awx/main/conf.py b/awx/main/conf.py index c51205eaa7..1af962cd87 100644 --- a/awx/main/conf.py +++ b/awx/main/conf.py @@ -274,7 +274,7 @@ register( register( 'LOG_AGGREGATOR_LOGGERS', field_class=fields.StringListField, - default=['awx', 'activity_stream', 'job_events', 'system_tracking'], + default=['activity_stream', 'job_events', 'system_tracking'], label=_('Loggers to send data to the log aggregator from'), help_text=_('List of loggers that will send HTTP logs to the collector, these can ' 'include any or all of: \n' diff --git a/awx/main/scheduler/tasks.py b/awx/main/scheduler/tasks.py index 5c4c821606..6e169224b7 100644 --- a/awx/main/scheduler/tasks.py +++ b/awx/main/scheduler/tasks.py @@ -34,11 +34,13 @@ def run_job_complete(job_id): @task def run_task_manager(): + logger.debug("Running Tower task manager.") TaskManager().schedule() @task def run_fail_inconsistent_running_jobs(): + logger.debug("Running task to fail inconsistent running jobs.") with transaction.atomic(): # Lock try: diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 02af9239d9..5b7724ea31 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -32,7 +32,8 @@ import pexpect # Celery from celery import Task, task -from celery.signals import celeryd_init +from celery.signals import celeryd_init, worker_process_init +from celery import current_app # Django from django.conf import settings @@ -75,7 +76,8 @@ logger = logging.getLogger('awx.main.tasks') def celery_startup(conf=None, **kwargs): # Re-init all schedules # NOTE: Rework this during the Rampart work - logger.info("Syncing Tower Schedules") + startup_logger = logging.getLogger('awx.main.tasks') + startup_logger.info("Syncing Tower Schedules") for sch in Schedule.objects.all(): try: sch.update_computed_fields() @@ -84,7 +86,25 @@ def celery_startup(conf=None, **kwargs): logger.error("Failed to rebuild schedule {}: {}".format(sch, e)) -def uwsgi_reload(): +def _setup_tower_logger(): + global logger + from django.utils.log import configure_logging + LOGGING_DICT = settings.LOGGING + if settings.LOG_AGGREGATOR_ENABLED: + LOGGING_DICT['handlers']['http_receiver']['class'] = 'awx.main.utils.handlers.HTTPSHandler' + LOGGING_DICT['handlers']['http_receiver']['async'] = False + configure_logging(settings.LOGGING_CONFIG, LOGGING_DICT) + logger = logging.getLogger('awx.main.tasks') + + +@worker_process_init.connect +def task_set_logger_pre_run(*args, **kwargs): + if settings.LOG_AGGREGATOR_ENABLED: + _setup_tower_logger() + logger.debug('Custom Tower logger configured for worker process.') + + +def _uwsgi_reload(): # http://uwsgi-docs.readthedocs.io/en/latest/MasterFIFO.html#available-commands logger.warn('Initiating uWSGI chain reload of server') TRIGGER_CHAIN_RELOAD = 'c' @@ -92,14 +112,29 @@ def uwsgi_reload(): awxfifo.write(TRIGGER_CHAIN_RELOAD) -@task(queue='broadcast_all') -def clear_cache_keys(cache_keys): +def _reset_celery_logging(): + # Worker logger reloaded, now send signal to restart pool + app = current_app._get_current_object() + app.control.broadcast('pool_restart', arguments={'reload': True}, + destination=['celery@{}'.format(settings.CLUSTER_HOST_ID)], reply=False) + + +def _clear_cache_keys(cache_keys): set_of_keys = set([key for key in cache_keys]) logger.debug('cache delete_many(%r)', set_of_keys) cache.delete_many(set_of_keys) + + +@task(queue='broadcast_all') +def process_cache_changes(cache_keys): + logger.warn('Processing cache changes, task args: {0.args!r} kwargs: {0.kwargs!r}'.format( + _clear_cache_keys.request)) + clear_cache_keys(cache_keys) + set_of_keys = set([key for key in cache_keys]) for setting_key in set_of_keys: if setting_key.startswith('LOG_AGGREGATOR_'): - uwsgi_reload() + _uwsgi_reload() + _reset_celery_logging() break @@ -129,6 +164,7 @@ def send_notifications(notification_list, job_id=None): @task(bind=True, queue='default') def run_administrative_checks(self): + logger.warn("Running administrative checks.") if not settings.TOWER_ADMIN_ALERTS: return validation_info = TaskEnhancer().validate_enhancements() @@ -150,11 +186,13 @@ def run_administrative_checks(self): @task(bind=True, queue='default') def cleanup_authtokens(self): + logger.warn("Cleaning up expired authtokens.") AuthToken.objects.filter(expires__lt=now()).delete() @task(bind=True) def cluster_node_heartbeat(self): + logger.debug("Cluster node heartbeat task.") inst = Instance.objects.filter(hostname=settings.CLUSTER_HOST_ID) if inst.exists(): inst = inst[0] diff --git a/awx/main/utils/handlers.py b/awx/main/utils/handlers.py index 28b7c26af7..71176cbb1a 100644 --- a/awx/main/utils/handlers.py +++ b/awx/main/utils/handlers.py @@ -30,6 +30,7 @@ PARAM_NAMES = { 'password': 'LOG_AGGREGATOR_PASSWORD', 'enabled_loggers': 'LOG_AGGREGATOR_LOGGERS', 'indv_facts': 'LOG_AGGREGATOR_INDIVIDUAL_FACTS', + 'enabled_flag': 'LOG_AGGREGATOR_ENABLED', } @@ -48,6 +49,7 @@ class HTTPSHandler(logging.Handler): def __init__(self, fqdn=False, **kwargs): super(HTTPSHandler, self).__init__() self.fqdn = fqdn + self.async = kwargs.get('async', True) for fd in PARAM_NAMES: # settings values take precedence over the input params settings_name = PARAM_NAMES[fd] @@ -100,11 +102,21 @@ class HTTPSHandler(logging.Handler): payload_str = json.dumps(payload_input) else: payload_str = payload_input - return dict(data=payload_str, background_callback=unused_callback) + if self.async: + return dict(data=payload_str, background_callback=unused_callback) + else: + return dict(data=payload_str) + + def skip_log(self, logger_name): + if self.host == '' or (not self.enabled_flag): + return True + if not logger_name.startswith('awx.analytics'): + # Tower log emission is only turned off by enablement setting + return False + return self.enabled_loggers is None or logger_name.split('.')[-1] not in self.enabled_loggers def emit(self, record): - if (self.host == '' or self.enabled_loggers is None or - record.name.split('.')[-1] not in self.enabled_loggers): + if self.skip_log(record.name): return try: payload = self.format(record) @@ -123,7 +135,10 @@ class HTTPSHandler(logging.Handler): self.session.post(host, **self.get_post_kwargs(fact_payload)) return - self.session.post(host, **self.get_post_kwargs(payload)) + if self.async: + self.session.post(host, **self.get_post_kwargs(payload)) + else: + requests.post(host, auth=requests.auth.HTTPBasicAuth(self.username, self.password), **self.get_post_kwargs(payload)) except (KeyboardInterrupt, SystemExit): raise except: diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 439c23783b..16b1484d54 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -372,6 +372,7 @@ CELERY_ACCEPT_CONTENT = ['json'] CELERY_TRACK_STARTED = True CELERYD_TASK_TIME_LIMIT = None CELERYD_TASK_SOFT_TIME_LIMIT = None +CELERYD_POOL_RESTARTS = True CELERYBEAT_SCHEDULER = 'celery.beat.PersistentScheduler' CELERYBEAT_MAX_LOOP_INTERVAL = 60 CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend' @@ -874,7 +875,7 @@ LOGGING = { }, 'http_receiver': { 'class': 'awx.main.utils.handlers.HTTPSNullHandler', - 'level': 'INFO', + 'level': 'DEBUG', 'formatter': 'json', 'host': '', }, @@ -973,7 +974,7 @@ LOGGING = { 'handlers': ['callback_receiver'], }, 'awx.main.tasks': { - 'handlers': ['task_system'] + 'handlers': ['task_system'], }, 'awx.main.scheduler': { 'handlers': ['task_system'], @@ -1001,18 +1002,6 @@ LOGGING = { 'level': 'INFO', 'propagate': False }, - 'awx.analytics.job_events': { - 'handlers': ['null'], - 'level': 'INFO' - }, - 'awx.analytics.activity_stream': { - 'handlers': ['null'], - 'level': 'INFO' - }, - 'awx.analytics.system_tracking': { - 'handlers': ['null'], - 'level': 'INFO' - }, 'django_auth_ldap': { 'handlers': ['console', 'file', 'tower_warnings'], 'level': 'DEBUG', From f76680b625e6e4f34a5832df0a1f551ced191302 Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Thu, 5 Jan 2017 10:28:23 -0500 Subject: [PATCH 2/3] add back in the Tower logger, fix bug --- awx/conf/apps.py | 3 +++ awx/main/conf.py | 2 +- awx/main/tasks.py | 7 +++++-- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/awx/conf/apps.py b/awx/conf/apps.py index 4f76e13d40..6e09545236 100644 --- a/awx/conf/apps.py +++ b/awx/conf/apps.py @@ -18,5 +18,8 @@ class ConfConfig(AppConfig): if settings.LOG_AGGREGATOR_ENABLED: LOGGING_DICT = settings.LOGGING LOGGING_DICT['handlers']['http_receiver']['class'] = 'awx.main.utils.handlers.HTTPSHandler' + if 'awx' in settings.LOG_AGGREGATOR_LOGGERS: + if 'http_receiver' not in LOGGING_DICT['loggers']['awx']['handlers']: + LOGGING_DICT['loggers']['awx']['handlers'] += ['http_receiver'] configure_logging(settings.LOGGING_CONFIG, LOGGING_DICT) # checks.register(SettingsWrapper._check_settings) diff --git a/awx/main/conf.py b/awx/main/conf.py index 1af962cd87..c51205eaa7 100644 --- a/awx/main/conf.py +++ b/awx/main/conf.py @@ -274,7 +274,7 @@ register( register( 'LOG_AGGREGATOR_LOGGERS', field_class=fields.StringListField, - default=['activity_stream', 'job_events', 'system_tracking'], + default=['awx', 'activity_stream', 'job_events', 'system_tracking'], label=_('Loggers to send data to the log aggregator from'), help_text=_('List of loggers that will send HTTP logs to the collector, these can ' 'include any or all of: \n' diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 5b7724ea31..f991037a3e 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -93,6 +93,9 @@ def _setup_tower_logger(): if settings.LOG_AGGREGATOR_ENABLED: LOGGING_DICT['handlers']['http_receiver']['class'] = 'awx.main.utils.handlers.HTTPSHandler' LOGGING_DICT['handlers']['http_receiver']['async'] = False + if 'awx' in settings.LOG_AGGREGATOR_LOGGERS: + if 'http_receiver' not in LOGGING_DICT['loggers']['awx']['handlers']: + LOGGING_DICT['loggers']['awx']['handlers'] += ['http_receiver'] configure_logging(settings.LOGGING_CONFIG, LOGGING_DICT) logger = logging.getLogger('awx.main.tasks') @@ -128,8 +131,8 @@ def _clear_cache_keys(cache_keys): @task(queue='broadcast_all') def process_cache_changes(cache_keys): logger.warn('Processing cache changes, task args: {0.args!r} kwargs: {0.kwargs!r}'.format( - _clear_cache_keys.request)) - clear_cache_keys(cache_keys) + process_cache_changes.request)) + _clear_cache_keys(cache_keys) set_of_keys = set([key for key in cache_keys]) for setting_key in set_of_keys: if setting_key.startswith('LOG_AGGREGATOR_'): From df903285318e3359d7dd9ca18526d60774da4a21 Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Thu, 5 Jan 2017 15:08:52 -0500 Subject: [PATCH 3/3] remove duplicate conversion of keys into set --- awx/main/tasks.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index f991037a3e..1ff6b3216e 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -122,8 +122,7 @@ def _reset_celery_logging(): destination=['celery@{}'.format(settings.CLUSTER_HOST_ID)], reply=False) -def _clear_cache_keys(cache_keys): - set_of_keys = set([key for key in cache_keys]) +def _clear_cache_keys(set_of_keys): logger.debug('cache delete_many(%r)', set_of_keys) cache.delete_many(set_of_keys) @@ -132,8 +131,8 @@ def _clear_cache_keys(cache_keys): def process_cache_changes(cache_keys): logger.warn('Processing cache changes, task args: {0.args!r} kwargs: {0.kwargs!r}'.format( process_cache_changes.request)) - _clear_cache_keys(cache_keys) set_of_keys = set([key for key in cache_keys]) + _clear_cache_keys(set_of_keys) for setting_key in set_of_keys: if setting_key.startswith('LOG_AGGREGATOR_'): _uwsgi_reload()