New awx.main.utils directory, distributed task to invalidate settings

This commit is contained in:
AlanCoding 2016-12-02 14:36:04 -05:00
parent db21178b14
commit 7848198b9f
10 changed files with 50 additions and 71 deletions

View File

@ -433,7 +433,7 @@ celeryd:
@if [ "$(VENV_BASE)" ]; then \
. $(VENV_BASE)/tower/bin/activate; \
fi; \
$(PYTHON) manage.py celeryd -l DEBUG -B --autoreload --autoscale=20,3 --schedule=$(CELERY_SCHEDULE_FILE) -Q projects,jobs,default,scheduler,$(COMPOSE_HOST)
$(PYTHON) manage.py celeryd -l DEBUG -B --autoreload --autoscale=20,3 --schedule=$(CELERY_SCHEDULE_FILE) -Q projects,jobs,default,scheduler,broadcast_all,$(COMPOSE_HOST)
#$(PYTHON) manage.py celery multi show projects jobs default -l DEBUG -Q:projects projects -Q:jobs jobs -Q:default default -c:projects 1 -c:jobs 3 -c:default 3 -Ofair -B --schedule=$(CELERY_SCHEDULE_FILE)
# Run to start the zeromq callback receiver

View File

@ -3,17 +3,16 @@ import logging
# Django
from django.conf import settings
from django.core.cache import cache
from django.core.signals import setting_changed
from django.db.models.signals import post_save, pre_delete, post_delete
from django.dispatch import receiver
from django.utils.log import configure_logging
# Tower
import awx.main.signals
from awx.conf import settings_registry
from awx.conf.models import Setting
from awx.conf.serializers import SettingSerializer
from awx.main.tasks import clear_cache_keys
logger = logging.getLogger('awx.conf.signals')
@ -26,12 +25,16 @@ def handle_setting_change(key, for_delete=False):
# When a setting changes or is deleted, remove its value from cache along
# with any other settings that depend on it.
setting_keys = [key]
setting_key_dict = {}
setting_key_dict[key] = key
for dependent_key in settings_registry.get_dependent_settings(key):
# Note: Doesn't handle multiple levels of dependencies!
setting_keys.append(dependent_key)
setting_key_dict[dependent_key] = dependent_key
cache_keys = set([Setting.get_cache_key(k) for k in setting_keys])
logger.debug('cache delete_many(%r)', cache_keys)
cache.delete_many(cache_keys)
logger.debug('sending signals to delete cache keys(%r)', cache_keys)
# cache.delete_many(cache_keys)
clear_cache_keys.delay(setting_key_dict)
# Send setting_changed signal with new value for each setting.
for setting_key in setting_keys:
@ -41,11 +44,6 @@ def handle_setting_change(key, for_delete=False):
value=getattr(settings, setting_key, None),
enter=not bool(for_delete),
)
# TODO: Move logic to task to run on all cluster nodes
if setting_key.startswith('LOG_AGGREGATOR_'):
configure_logging(settings.LOGGING_CONFIG, settings.LOGGING)
# settings.LOGGING_CONFIG = None
# logging.config.dictConfig(settings.LOGGING)
@receiver(post_save, sender=Setting)

View File

@ -1,2 +0,0 @@
# Copyright (c) 2017 Ansible by Red Hat
# All Rights Reserved.

View File

@ -1,57 +0,0 @@
import os
import yaml
def parse_config_file():
    """
    Find the .splunk_logger config file in the current directory, or in the
    user's home, and parse it. The one in the current directory has
    precedence.

    :return: A tuple with:
                 - project_id
                 - access_token
                 - api_domain
             Each element is None when no config file provides all three
             values.
    """
    for filename in ('.splunk_logger', os.path.expanduser('~/.splunk_logger')):
        project_id, access_token, api_domain = _parse_config_file_impl(filename)
        # Only accept a file that yields the complete credential triple;
        # otherwise fall through and try the next candidate location.
        if (project_id is not None and
                access_token is not None and
                api_domain is not None):
            return project_id, access_token, api_domain
    # No candidate file yielded a complete set of credentials.
    return None, None, None
def _parse_config_file_impl(filename):
    """
    Parse a single .splunk_logger YAML config file.

    Expected format:

        credentials:
            project_id: ...
            access_token: ...
            api_domain: ...

    :param filename: The filename to parse
    :return: A tuple with:
                 - project_id
                 - access_token
                 - api_domain
             (None, None, None) when the file is missing, unreadable, or
             malformed.
    """
    try:
        # open() in a context manager replaces the Python 2-only file()
        # builtin and guarantees the handle is closed. safe_load avoids
        # executing arbitrary YAML tags from an untrusted config file.
        with open(filename) as fh:
            doc = yaml.safe_load(fh.read())
        return (doc["credentials"]["project_id"],
                doc["credentials"]["access_token"],
                doc["credentials"]["api_domain"])
    except (EnvironmentError, KeyError, TypeError, ValueError, yaml.YAMLError):
        # Narrowed from a bare except: treat any missing/corrupt config as
        # "no credentials here" so the caller can try the next location,
        # without swallowing SystemExit/KeyboardInterrupt.
        return None, None, None
def get_config_from_env():
    """
    Read Splunk credentials from the environment.

    :return: A tuple with:
                 - project_id   (from SPLUNK_PROJECT_ID)
                 - access_token (from SPLUNK_ACCESS_TOKEN)
                 - api_domain   (from SPLUNK_API_DOMAIN)
             Each element is None when its variable is unset.
    """
    env = os.environ
    project_id = env.get('SPLUNK_PROJECT_ID')
    access_token = env.get('SPLUNK_ACCESS_TOKEN')
    api_domain = env.get('SPLUNK_API_DOMAIN')
    return project_id, access_token, api_domain

View File

@ -42,6 +42,8 @@ from django.utils.encoding import smart_str
from django.core.mail import send_mail
from django.contrib.auth.models import User
from django.utils.translation import ugettext_lazy as _
from django.core.cache import cache
from django.utils.log import configure_logging
# AWX
from awx.main.constants import CLOUD_PROVIDERS
@ -83,6 +85,17 @@ def celery_startup(conf=None, **kwargs):
logger.error("Failed to rebuild schedule {}: {}".format(sch, e))
@task(queue='broadcast_all')
def clear_cache_keys(cache_keys):
    # Broadcast task: 'broadcast_all' is a kombu Broadcast queue (see
    # CELERY_QUEUES), so every cluster node runs this and drops the given
    # setting keys from its own local cache.
    # cache_keys is iterated directly, so a dict (as the signals caller
    # passes) contributes its keys; set(cache_keys) would be equivalent.
    set_of_keys = set([key for key in cache_keys])
    logger.debug('cache delete_many(%r)', set_of_keys)
    # NOTE(review): these look like raw setting names, not the prefixed
    # values from Setting.get_cache_key() that the signals module computes
    # for its own delete_many -- confirm the caller passes keys in the form
    # the cache actually stores.
    cache.delete_many(set_of_keys)
    for setting_key in set_of_keys:
        # If any logging-related setting changed, rebuild the logging
        # configuration once on this node, then stop scanning.
        if setting_key.startswith('LOG_AGGREGATOR_'):
            configure_logging(settings.LOGGING_CONFIG, settings.LOGGING)
            break
@task(queue='default')
def send_notifications(notification_list, job_id=None):
if not isinstance(notification_list, list):

View File

@ -0,0 +1,25 @@
# Copyright (c) 2017 Ansible Tower by Red Hat
# All Rights Reserved.
# AWX
from awx.main.utils.common import * # noqa
# Fields that didn't get included in __all__
# TODO: once this file move is merged to devel, these names can be added
# to common.py __all__ and removed from this explicit import list
from awx.main.utils.common import ( # noqa
RequireDebugTrueOrTest,
encrypt_field,
parse_yaml_or_json,
decrypt_field,
build_url,
timestamp_apiformat,
model_instance_diff,
model_to_dict,
check_proot_installed,
build_proot_temp_dir,
wrap_args_with_proot,
get_system_task_capacity,
decrypt_field_value
)

View File

@ -9,6 +9,7 @@ import djcelery
from datetime import timedelta
from kombu import Queue, Exchange
from kombu.common import Broadcast
# global settings
from django.conf import global_settings
@ -373,6 +374,7 @@ CELERY_QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('jobs', Exchange('jobs'), routing_key='jobs'),
Queue('scheduler', Exchange('scheduler', type='topic'), routing_key='scheduler.job.#', durable=False),
Broadcast('broadcast_all')
# Projects use a fanout queue; fanout queues aren't well supported here
)
CELERY_ROUTES = {'awx.main.tasks.run_job': {'queue': 'jobs',
@ -843,7 +845,7 @@ LOGGING = {
'format': '%(asctime)s %(levelname)-8s %(name)s %(message)s',
},
'json': {
'()': 'awx.main.log_utils.formatters.LogstashFormatter'
'()': 'awx.main.utils.formatters.LogstashFormatter'
}
},
'handlers': {
@ -867,7 +869,7 @@ LOGGING = {
'formatter': 'simple',
},
'http_receiver': {
'class': 'awx.main.log_utils.handlers.HTTPSHandler',
'class': 'awx.main.utils.handlers.HTTPSHandler',
'level': 'INFO',
'formatter': 'json',
'host': '',