Merge pull request #4538 from AlanCoding/celery_log_reset

Celery log reset on CTiT change / task start
This commit is contained in:
Alan Rominger 2017-01-06 15:36:45 -05:00 committed by GitHub
commit 6921cbfdd0
7 changed files with 80 additions and 31 deletions

View File

@ -428,7 +428,7 @@ celeryd:
@if [ "$(VENV_BASE)" ]; then \
. $(VENV_BASE)/tower/bin/activate; \
fi; \
$(PYTHON) manage.py celeryd -l DEBUG -B --autoreload --autoscale=20,3 --schedule=$(CELERY_SCHEDULE_FILE) -Q projects,jobs,default,scheduler,broadcast_all,$(COMPOSE_HOST)
$(PYTHON) manage.py celeryd -l DEBUG -B --autoreload --autoscale=20,3 --schedule=$(CELERY_SCHEDULE_FILE) -Q projects,jobs,default,scheduler,broadcast_all,$(COMPOSE_HOST) -n celery@$(COMPOSE_HOST)
#$(PYTHON) manage.py celery multi show projects jobs default -l DEBUG -Q:projects projects -Q:jobs jobs -Q:default default -c:projects 1 -c:jobs 3 -c:default 3 -Ofair -B --schedule=$(CELERY_SCHEDULE_FILE)
# Run to start the zeromq callback receiver

View File

@ -16,7 +16,10 @@ class ConfConfig(AppConfig):
from .settings import SettingsWrapper
SettingsWrapper.initialize()
if settings.LOG_AGGREGATOR_ENABLED:
LOGGING = settings.LOGGING
LOGGING['handlers']['http_receiver']['class'] = 'awx.main.utils.handlers.HTTPSHandler'
configure_logging(settings.LOGGING_CONFIG, LOGGING)
LOGGING_DICT = settings.LOGGING
LOGGING_DICT['handlers']['http_receiver']['class'] = 'awx.main.utils.handlers.HTTPSHandler'
if 'awx' in settings.LOG_AGGREGATOR_LOGGERS:
if 'http_receiver' not in LOGGING_DICT['loggers']['awx']['handlers']:
LOGGING_DICT['loggers']['awx']['handlers'] += ['http_receiver']
configure_logging(settings.LOGGING_CONFIG, LOGGING_DICT)
# checks.register(SettingsWrapper._check_settings)

View File

@ -13,7 +13,7 @@ import awx.main.signals
from awx.conf import settings_registry
from awx.conf.models import Setting
from awx.conf.serializers import SettingSerializer
from awx.main.tasks import clear_cache_keys
from awx.main.tasks import process_cache_changes
logger = logging.getLogger('awx.conf.signals')
@ -32,7 +32,7 @@ def handle_setting_change(key, for_delete=False):
cache_keys = set([Setting.get_cache_key(k) for k in setting_keys])
logger.debug('sending signals to delete cache keys(%r)', cache_keys)
cache.delete_many(cache_keys)
clear_cache_keys.delay(list(cache_keys))
process_cache_changes.delay(list(cache_keys))
# Send setting_changed signal with new value for each setting.
for setting_key in setting_keys:

View File

@ -34,11 +34,13 @@ def run_job_complete(job_id):
@task
def run_task_manager():
logger.debug("Running Tower task manager.")
TaskManager().schedule()
@task
def run_fail_inconsistent_running_jobs():
logger.debug("Running task to fail inconsistent running jobs.")
with transaction.atomic():
# Lock
try:

View File

@ -32,7 +32,8 @@ import pexpect
# Celery
from celery import Task, task
from celery.signals import celeryd_init
from celery.signals import celeryd_init, worker_process_init
from celery import current_app
# Django
from django.conf import settings
@ -75,7 +76,8 @@ logger = logging.getLogger('awx.main.tasks')
def celery_startup(conf=None, **kwargs):
# Re-init all schedules
# NOTE: Rework this during the Rampart work
logger.info("Syncing Tower Schedules")
startup_logger = logging.getLogger('awx.main.tasks')
startup_logger.info("Syncing Tower Schedules")
for sch in Schedule.objects.all():
try:
sch.update_computed_fields()
@ -84,7 +86,28 @@ def celery_startup(conf=None, **kwargs):
logger.error("Failed to rebuild schedule {}: {}".format(sch, e))
def uwsgi_reload():
def _setup_tower_logger():
global logger
from django.utils.log import configure_logging
LOGGING_DICT = settings.LOGGING
if settings.LOG_AGGREGATOR_ENABLED:
LOGGING_DICT['handlers']['http_receiver']['class'] = 'awx.main.utils.handlers.HTTPSHandler'
LOGGING_DICT['handlers']['http_receiver']['async'] = False
if 'awx' in settings.LOG_AGGREGATOR_LOGGERS:
if 'http_receiver' not in LOGGING_DICT['loggers']['awx']['handlers']:
LOGGING_DICT['loggers']['awx']['handlers'] += ['http_receiver']
configure_logging(settings.LOGGING_CONFIG, LOGGING_DICT)
logger = logging.getLogger('awx.main.tasks')
@worker_process_init.connect
def task_set_logger_pre_run(*args, **kwargs):
if settings.LOG_AGGREGATOR_ENABLED:
_setup_tower_logger()
logger.debug('Custom Tower logger configured for worker process.')
def _uwsgi_reload():
# http://uwsgi-docs.readthedocs.io/en/latest/MasterFIFO.html#available-commands
logger.warn('Initiating uWSGI chain reload of server')
TRIGGER_CHAIN_RELOAD = 'c'
@ -92,14 +115,28 @@ def uwsgi_reload():
awxfifo.write(TRIGGER_CHAIN_RELOAD)
@task(queue='broadcast_all')
def clear_cache_keys(cache_keys):
set_of_keys = set([key for key in cache_keys])
def _reset_celery_logging():
# Worker logger reloaded, now send signal to restart pool
app = current_app._get_current_object()
app.control.broadcast('pool_restart', arguments={'reload': True},
destination=['celery@{}'.format(settings.CLUSTER_HOST_ID)], reply=False)
def _clear_cache_keys(set_of_keys):
logger.debug('cache delete_many(%r)', set_of_keys)
cache.delete_many(set_of_keys)
@task(queue='broadcast_all')
def process_cache_changes(cache_keys):
logger.warn('Processing cache changes, task args: {0.args!r} kwargs: {0.kwargs!r}'.format(
process_cache_changes.request))
set_of_keys = set([key for key in cache_keys])
_clear_cache_keys(set_of_keys)
for setting_key in set_of_keys:
if setting_key.startswith('LOG_AGGREGATOR_'):
uwsgi_reload()
_uwsgi_reload()
_reset_celery_logging()
break
@ -129,6 +166,7 @@ def send_notifications(notification_list, job_id=None):
@task(bind=True, queue='default')
def run_administrative_checks(self):
logger.warn("Running administrative checks.")
if not settings.TOWER_ADMIN_ALERTS:
return
validation_info = TaskEnhancer().validate_enhancements()
@ -150,11 +188,13 @@ def run_administrative_checks(self):
@task(bind=True, queue='default')
def cleanup_authtokens(self):
logger.warn("Cleaning up expired authtokens.")
AuthToken.objects.filter(expires__lt=now()).delete()
@task(bind=True)
def cluster_node_heartbeat(self):
logger.debug("Cluster node heartbeat task.")
inst = Instance.objects.filter(hostname=settings.CLUSTER_HOST_ID)
if inst.exists():
inst = inst[0]

View File

@ -30,6 +30,7 @@ PARAM_NAMES = {
'password': 'LOG_AGGREGATOR_PASSWORD',
'enabled_loggers': 'LOG_AGGREGATOR_LOGGERS',
'indv_facts': 'LOG_AGGREGATOR_INDIVIDUAL_FACTS',
'enabled_flag': 'LOG_AGGREGATOR_ENABLED',
}
@ -48,6 +49,7 @@ class HTTPSHandler(logging.Handler):
def __init__(self, fqdn=False, **kwargs):
super(HTTPSHandler, self).__init__()
self.fqdn = fqdn
self.async = kwargs.get('async', True)
for fd in PARAM_NAMES:
# settings values take precedence over the input params
settings_name = PARAM_NAMES[fd]
@ -100,11 +102,21 @@ class HTTPSHandler(logging.Handler):
payload_str = json.dumps(payload_input)
else:
payload_str = payload_input
return dict(data=payload_str, background_callback=unused_callback)
if self.async:
return dict(data=payload_str, background_callback=unused_callback)
else:
return dict(data=payload_str)
def skip_log(self, logger_name):
if self.host == '' or (not self.enabled_flag):
return True
if not logger_name.startswith('awx.analytics'):
# Tower log emission is only turned off by enablement setting
return False
return self.enabled_loggers is None or logger_name.split('.')[-1] not in self.enabled_loggers
def emit(self, record):
if (self.host == '' or self.enabled_loggers is None or
record.name.split('.')[-1] not in self.enabled_loggers):
if self.skip_log(record.name):
return
try:
payload = self.format(record)
@ -123,7 +135,10 @@ class HTTPSHandler(logging.Handler):
self.session.post(host, **self.get_post_kwargs(fact_payload))
return
self.session.post(host, **self.get_post_kwargs(payload))
if self.async:
self.session.post(host, **self.get_post_kwargs(payload))
else:
requests.post(host, auth=requests.auth.HTTPBasicAuth(self.username, self.password), **self.get_post_kwargs(payload))
except (KeyboardInterrupt, SystemExit):
raise
except:

View File

@ -376,6 +376,7 @@ CELERY_ACCEPT_CONTENT = ['json']
CELERY_TRACK_STARTED = True
CELERYD_TASK_TIME_LIMIT = None
CELERYD_TASK_SOFT_TIME_LIMIT = None
CELERYD_POOL_RESTARTS = True
CELERYBEAT_SCHEDULER = 'celery.beat.PersistentScheduler'
CELERYBEAT_MAX_LOOP_INTERVAL = 60
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
@ -878,7 +879,7 @@ LOGGING = {
},
'http_receiver': {
'class': 'awx.main.utils.handlers.HTTPSNullHandler',
'level': 'INFO',
'level': 'DEBUG',
'formatter': 'json',
'host': '',
},
@ -977,7 +978,7 @@ LOGGING = {
'handlers': ['callback_receiver'],
},
'awx.main.tasks': {
'handlers': ['task_system']
'handlers': ['task_system'],
},
'awx.main.scheduler': {
'handlers': ['task_system'],
@ -1005,18 +1006,6 @@ LOGGING = {
'level': 'INFO',
'propagate': False
},
'awx.analytics.job_events': {
'handlers': ['null'],
'level': 'INFO'
},
'awx.analytics.activity_stream': {
'handlers': ['null'],
'level': 'INFO'
},
'awx.analytics.system_tracking': {
'handlers': ['null'],
'level': 'INFO'
},
'django_auth_ldap': {
'handlers': ['console', 'file', 'tower_warnings'],
'level': 'DEBUG',