diff --git a/Makefile b/Makefile
index 0679a11911..c0c8f62d75 100644
--- a/Makefile
+++ b/Makefile
@@ -428,7 +428,7 @@ celeryd:
 	@if [ "$(VENV_BASE)" ]; then \
 		. $(VENV_BASE)/tower/bin/activate; \
 	fi; \
-	$(PYTHON) manage.py celeryd -l DEBUG -B --autoreload --autoscale=20,3 --schedule=$(CELERY_SCHEDULE_FILE) -Q projects,jobs,default,scheduler,broadcast_all,$(COMPOSE_HOST)
+	$(PYTHON) manage.py celeryd -l DEBUG -B --autoreload --autoscale=20,3 --schedule=$(CELERY_SCHEDULE_FILE) -Q projects,jobs,default,scheduler,broadcast_all,$(COMPOSE_HOST) -n celery@$(COMPOSE_HOST)
 	#$(PYTHON) manage.py celery multi show projects jobs default -l DEBUG -Q:projects projects -Q:jobs jobs -Q:default default -c:projects 1 -c:jobs 3 -c:default 3 -Ofair -B --schedule=$(CELERY_SCHEDULE_FILE)
 
 # Run to start the zeromq callback receiver
diff --git a/awx/conf/apps.py b/awx/conf/apps.py
index 62ad0085df..6e09545236 100644
--- a/awx/conf/apps.py
+++ b/awx/conf/apps.py
@@ -16,7 +16,10 @@ class ConfConfig(AppConfig):
         from .settings import SettingsWrapper
         SettingsWrapper.initialize()
         if settings.LOG_AGGREGATOR_ENABLED:
-            LOGGING = settings.LOGGING
-            LOGGING['handlers']['http_receiver']['class'] = 'awx.main.utils.handlers.HTTPSHandler'
-            configure_logging(settings.LOGGING_CONFIG, LOGGING)
+            LOGGING_DICT = settings.LOGGING
+            LOGGING_DICT['handlers']['http_receiver']['class'] = 'awx.main.utils.handlers.HTTPSHandler'
+            if 'awx' in settings.LOG_AGGREGATOR_LOGGERS:
+                if 'http_receiver' not in LOGGING_DICT['loggers']['awx']['handlers']:
+                    LOGGING_DICT['loggers']['awx']['handlers'] += ['http_receiver']
+            configure_logging(settings.LOGGING_CONFIG, LOGGING_DICT)
         # checks.register(SettingsWrapper._check_settings)
diff --git a/awx/conf/signals.py b/awx/conf/signals.py
index 78411b1435..9d1813843e 100644
--- a/awx/conf/signals.py
+++ b/awx/conf/signals.py
@@ -13,7 +13,7 @@ import awx.main.signals
 from awx.conf import settings_registry
 from awx.conf.models import Setting
 from awx.conf.serializers import SettingSerializer
-from awx.main.tasks import clear_cache_keys
+from awx.main.tasks import process_cache_changes
 
 logger = logging.getLogger('awx.conf.signals')
 
@@ -32,7 +32,7 @@ def handle_setting_change(key, for_delete=False):
     cache_keys = set([Setting.get_cache_key(k) for k in setting_keys])
     logger.debug('sending signals to delete cache keys(%r)', cache_keys)
     cache.delete_many(cache_keys)
-    clear_cache_keys.delay(list(cache_keys))
+    process_cache_changes.delay(list(cache_keys))
 
     # Send setting_changed signal with new value for each setting.
     for setting_key in setting_keys:
diff --git a/awx/main/scheduler/tasks.py b/awx/main/scheduler/tasks.py
index 5c4c821606..6e169224b7 100644
--- a/awx/main/scheduler/tasks.py
+++ b/awx/main/scheduler/tasks.py
@@ -34,11 +34,13 @@ def run_job_complete(job_id):
 
 @task
 def run_task_manager():
+    logger.debug("Running Tower task manager.")
     TaskManager().schedule()
 
 
 @task
 def run_fail_inconsistent_running_jobs():
+    logger.debug("Running task to fail inconsistent running jobs.")
     with transaction.atomic():
         # Lock
         try:
diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index 0f47d080bb..c9737ec375 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -32,7 +32,8 @@ import pexpect
 
 # Celery
 from celery import Task, task
-from celery.signals import celeryd_init
+from celery.signals import celeryd_init, worker_process_init
+from celery import current_app
 
 # Django
 from django.conf import settings
@@ -75,7 +76,8 @@ logger = logging.getLogger('awx.main.tasks')
 def celery_startup(conf=None, **kwargs):
     # Re-init all schedules
     # NOTE: Rework this during the Rampart work
-    logger.info("Syncing Tower Schedules")
+    startup_logger = logging.getLogger('awx.main.tasks')
+    startup_logger.info("Syncing Tower Schedules")
     for sch in Schedule.objects.all():
         try:
             sch.update_computed_fields()
@@ -84,7 +86,28 @@ def celery_startup(conf=None, **kwargs):
             logger.error("Failed to rebuild schedule {}: {}".format(sch, e))
 
 
-def uwsgi_reload():
+def _setup_tower_logger():
+    global logger
+    from django.utils.log import configure_logging
+    LOGGING_DICT = settings.LOGGING
+    if settings.LOG_AGGREGATOR_ENABLED:
+        LOGGING_DICT['handlers']['http_receiver']['class'] = 'awx.main.utils.handlers.HTTPSHandler'
+        LOGGING_DICT['handlers']['http_receiver']['async'] = False
+        if 'awx' in settings.LOG_AGGREGATOR_LOGGERS:
+            if 'http_receiver' not in LOGGING_DICT['loggers']['awx']['handlers']:
+                LOGGING_DICT['loggers']['awx']['handlers'] += ['http_receiver']
+    configure_logging(settings.LOGGING_CONFIG, LOGGING_DICT)
+    logger = logging.getLogger('awx.main.tasks')
+
+
+@worker_process_init.connect
+def task_set_logger_pre_run(*args, **kwargs):
+    if settings.LOG_AGGREGATOR_ENABLED:
+        _setup_tower_logger()
+        logger.debug('Custom Tower logger configured for worker process.')
+
+
+def _uwsgi_reload():
     # http://uwsgi-docs.readthedocs.io/en/latest/MasterFIFO.html#available-commands
     logger.warn('Initiating uWSGI chain reload of server')
     TRIGGER_CHAIN_RELOAD = 'c'
@@ -92,14 +115,28 @@ def uwsgi_reload():
         awxfifo.write(TRIGGER_CHAIN_RELOAD)
 
 
-@task(queue='broadcast_all')
-def clear_cache_keys(cache_keys):
-    set_of_keys = set([key for key in cache_keys])
+def _reset_celery_logging():
+    # Worker logger reloaded, now send signal to restart pool
+    app = current_app._get_current_object()
+    app.control.broadcast('pool_restart', arguments={'reload': True},
+                          destination=['celery@{}'.format(settings.CLUSTER_HOST_ID)], reply=False)
+
+
+def _clear_cache_keys(set_of_keys):
     logger.debug('cache delete_many(%r)', set_of_keys)
     cache.delete_many(set_of_keys)
+
+
+@task(queue='broadcast_all')
+def process_cache_changes(cache_keys):
+    logger.warn('Processing cache changes, task args: {0.args!r} kwargs: {0.kwargs!r}'.format(
+        process_cache_changes.request))
+    set_of_keys = set([key for key in cache_keys])
+    _clear_cache_keys(set_of_keys)
     for setting_key in set_of_keys:
         if setting_key.startswith('LOG_AGGREGATOR_'):
-            uwsgi_reload()
+            _uwsgi_reload()
+            _reset_celery_logging()
             break
 
 
@@ -129,6 +166,7 @@ def send_notifications(notification_list, job_id=None):
 
 @task(bind=True, queue='default')
 def run_administrative_checks(self):
+    logger.warn("Running administrative checks.")
     if not settings.TOWER_ADMIN_ALERTS:
         return
     validation_info = TaskEnhancer().validate_enhancements()
@@ -150,11 +188,13 @@ def run_administrative_checks(self):
 
 @task(bind=True, queue='default')
 def cleanup_authtokens(self):
+    logger.warn("Cleaning up expired authtokens.")
     AuthToken.objects.filter(expires__lt=now()).delete()
 
 
 @task(bind=True)
 def cluster_node_heartbeat(self):
+    logger.debug("Cluster node heartbeat task.")
     inst = Instance.objects.filter(hostname=settings.CLUSTER_HOST_ID)
     if inst.exists():
         inst = inst[0]
diff --git a/awx/main/utils/handlers.py b/awx/main/utils/handlers.py
index 28b7c26af7..71176cbb1a 100644
--- a/awx/main/utils/handlers.py
+++ b/awx/main/utils/handlers.py
@@ -30,6 +30,7 @@ PARAM_NAMES = {
     'password': 'LOG_AGGREGATOR_PASSWORD',
     'enabled_loggers': 'LOG_AGGREGATOR_LOGGERS',
     'indv_facts': 'LOG_AGGREGATOR_INDIVIDUAL_FACTS',
+    'enabled_flag': 'LOG_AGGREGATOR_ENABLED',
 }
 
 
@@ -48,6 +49,7 @@ class HTTPSHandler(logging.Handler):
     def __init__(self, fqdn=False, **kwargs):
         super(HTTPSHandler, self).__init__()
         self.fqdn = fqdn
+        self.async = kwargs.get('async', True)
         for fd in PARAM_NAMES:
             # settings values take precedence over the input params
             settings_name = PARAM_NAMES[fd]
@@ -100,11 +102,21 @@ class HTTPSHandler(logging.Handler):
             payload_str = json.dumps(payload_input)
         else:
             payload_str = payload_input
-        return dict(data=payload_str, background_callback=unused_callback)
+        if self.async:
+            return dict(data=payload_str, background_callback=unused_callback)
+        else:
+            return dict(data=payload_str)
+
+    def skip_log(self, logger_name):
+        if self.host == '' or (not self.enabled_flag):
+            return True
+        if not logger_name.startswith('awx.analytics'):
+            # Tower log emission is only turned off by enablement setting
+            return False
+        return self.enabled_loggers is None or logger_name.split('.')[-1] not in self.enabled_loggers
 
     def emit(self, record):
-        if (self.host == '' or self.enabled_loggers is None or
-                record.name.split('.')[-1] not in self.enabled_loggers):
+        if self.skip_log(record.name):
             return
         try:
             payload = self.format(record)
@@ -123,7 +135,10 @@ class HTTPSHandler(logging.Handler):
                     self.session.post(host, **self.get_post_kwargs(fact_payload))
                 return
 
-            self.session.post(host, **self.get_post_kwargs(payload))
+            if self.async:
+                self.session.post(host, **self.get_post_kwargs(payload))
+            else:
+                requests.post(host, auth=requests.auth.HTTPBasicAuth(self.username, self.password), **self.get_post_kwargs(payload))
         except (KeyboardInterrupt, SystemExit):
             raise
         except:
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index 48338e09d8..8e67e9a025 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -376,6 +376,7 @@ CELERY_ACCEPT_CONTENT = ['json']
 CELERY_TRACK_STARTED = True
 CELERYD_TASK_TIME_LIMIT = None
 CELERYD_TASK_SOFT_TIME_LIMIT = None
+CELERYD_POOL_RESTARTS = True
 CELERYBEAT_SCHEDULER = 'celery.beat.PersistentScheduler'
 CELERYBEAT_MAX_LOOP_INTERVAL = 60
 CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
@@ -878,7 +879,7 @@ LOGGING = {
         },
         'http_receiver': {
            'class': 'awx.main.utils.handlers.HTTPSNullHandler',
-            'level': 'INFO',
+            'level': 'DEBUG',
             'formatter': 'json',
             'host': '',
         },
@@ -977,7 +978,7 @@ LOGGING = {
             'handlers': ['callback_receiver'],
         },
         'awx.main.tasks': {
-            'handlers': ['task_system']
+            'handlers': ['task_system'],
         },
         'awx.main.scheduler': {
             'handlers': ['task_system'],
@@ -1005,18 +1006,6 @@ LOGGING = {
             'level': 'INFO',
             'propagate': False
         },
-        'awx.analytics.job_events': {
-            'handlers': ['null'],
-            'level': 'INFO'
-        },
-        'awx.analytics.activity_stream': {
-            'handlers': ['null'],
-            'level': 'INFO'
-        },
-        'awx.analytics.system_tracking': {
-            'handlers': ['null'],
-            'level': 'INFO'
-        },
         'django_auth_ldap': {
             'handlers': ['console', 'file', 'tower_warnings'],
             'level': 'DEBUG',
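
Illustrative sketch (not part of the patch above): the skip_log() gating added to HTTPSHandler, restated as a standalone function with the handler attributes passed in explicitly. The parameter names mirror the PARAM_NAMES keys (host, enabled_flag, enabled_loggers); the host value and the assertions are hypothetical examples.

def should_skip(logger_name, host, enabled_flag, enabled_loggers):
    # Aggregator disabled or no host configured: drop every record.
    if host == '' or not enabled_flag:
        return True
    # Plain Tower logs (awx.*) are gated only by the enablement flag.
    if not logger_name.startswith('awx.analytics'):
        return False
    # Analytics loggers are additionally filtered by the per-logger allow list.
    return enabled_loggers is None or logger_name.split('.')[-1] not in enabled_loggers


assert should_skip('awx.main.tasks', 'logstash.example.com', True, ['awx']) is False
assert should_skip('awx.analytics.job_events', 'logstash.example.com', True, ['awx']) is True
assert should_skip('awx.analytics.job_events', 'logstash.example.com', True, ['job_events']) is False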
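
Sketch of the round trip the patch wires between process_cache_changes and the worker pool, under assumed names: a Celery app called 'tower' with a placeholder broker URL, and reconfigure_logging() standing in for _setup_tower_logger(). The worker_process_init hook and the 'pool_restart' broadcast (which needs the CELERYD_POOL_RESTARTS = True setting added above, plus the -n celery@<hostname> worker name added to the Makefile) are the pieces taken from the patch.

from celery import Celery
from celery.signals import worker_process_init

app = Celery('tower', broker='amqp://localhost//')  # hypothetical app/broker


@worker_process_init.connect
def reconfigure_logging(*args, **kwargs):
    # Runs in every freshly forked pool process; in the patch this is where
    # _setup_tower_logger() swaps the external HTTPSHandler into the config.
    pass


def reset_worker_logging(cluster_host_id):
    # Ask only this node's worker to recycle its pool; the replacement child
    # processes then re-run the worker_process_init hook above and pick up the
    # new LOG_AGGREGATOR_* settings.
    app.control.broadcast(
        'pool_restart',
        arguments={'reload': True},
        destination=['celery@{}'.format(cluster_host_id)],
        reply=False,
    )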
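
A toy dictConfig version (a simplified stand-in, not the real awx settings tree) of the handler wiring that ConfConfig.ready() and _setup_tower_logger() perform: swap the placeholder handler class and attach 'http_receiver' to the 'awx' logger exactly once, then apply the config.

import copy
import logging.config

BASE_LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'handlers': {
        'console': {'class': 'logging.StreamHandler'},
        # Stand-in for HTTPSNullHandler; the patch swaps this class for
        # awx.main.utils.handlers.HTTPSHandler when aggregation is enabled.
        'http_receiver': {'class': 'logging.NullHandler', 'level': 'DEBUG'},
    },
    'loggers': {'awx': {'handlers': ['console'], 'level': 'DEBUG'}},
}


def enable_external_logging(logging_dict, aggregator_loggers):
    cfg = copy.deepcopy(logging_dict)
    cfg['handlers']['http_receiver']['class'] = 'logging.StreamHandler'
    if 'awx' in aggregator_loggers:
        # Same idempotency guard as the patch: never attach the handler twice.
        if 'http_receiver' not in cfg['loggers']['awx']['handlers']:
            cfg['loggers']['awx']['handlers'] += ['http_receiver']
    return cfg


logging.config.dictConfig(enable_external_logging(BASE_LOGGING, ['awx', 'job_events']))
assert len(logging.getLogger('awx').handlers) == 2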