mirror of
https://github.com/ansible/awx.git
synced 2026-03-19 01:47:31 -02:30
added cache-clear service. update dispatcher queues
Signed-off-by: Jessica Mack <jmack@redhat.com>
This commit is contained in:
6
Makefile
6
Makefile
@@ -70,6 +70,12 @@ I18N_FLAG_FILE = .i18n_built
|
|||||||
VERSION PYTHON_VERSION docker-compose-sources \
|
VERSION PYTHON_VERSION docker-compose-sources \
|
||||||
.git/hooks/pre-commit github_ci_setup github_ci_runner
|
.git/hooks/pre-commit github_ci_setup github_ci_runner
|
||||||
|
|
||||||
|
cache-clear:
|
||||||
|
@if [ "$(VENV_BASE)" ]; then \
|
||||||
|
. $(VENV_BASE)/awx/bin/activate; \
|
||||||
|
fi; \
|
||||||
|
$(PYTHON) manage.py run_cache_clear
|
||||||
|
|
||||||
clean-tmp:
|
clean-tmp:
|
||||||
rm -rf tmp/
|
rm -rf tmp/
|
||||||
|
|
||||||
|
|||||||
39
awx/main/management/commands/run_cache_clear.py
Normal file
39
awx/main/management/commands/run_cache_clear.py
Normal file
@@ -0,0 +1,39 @@
|
|||||||
|
import logging
|
||||||
|
|
||||||
|
from django.core.management.base import BaseCommand
|
||||||
|
from django.conf import settings
|
||||||
|
from django.core.cache import cache
|
||||||
|
from awx.main.dispatch import pg_bus_conn
|
||||||
|
from awx.conf import settings_registry
|
||||||
|
|
||||||
|
logger = logging.getLogger('awx.main.cache_clear')
|
||||||
|
|
||||||
|
|
||||||
|
class Command(BaseCommand):
|
||||||
|
"""
|
||||||
|
Cache Clear
|
||||||
|
Runs as a management command and starts a daemon that listens for a pg_notify message to clear the cache.
|
||||||
|
"""
|
||||||
|
|
||||||
|
help = 'Launch the cache clear daemon'
|
||||||
|
|
||||||
|
def handle(self, *arg, **options):
|
||||||
|
try:
|
||||||
|
with pg_bus_conn(new_connection=True) as conn:
|
||||||
|
conn.listen("tower_settings_change")
|
||||||
|
for e in conn.events(yield_timeouts=True):
|
||||||
|
if e is not None:
|
||||||
|
logger.info("Cache clear request received. Clearing now")
|
||||||
|
# clear the cache of the keys in the payload
|
||||||
|
setting_keys = e.payload
|
||||||
|
orig_len = len(setting_keys)
|
||||||
|
for i in range(orig_len):
|
||||||
|
for dependent_key in settings_registry.get_dependent_settings(setting_keys[i]):
|
||||||
|
setting_keys.append(dependent_key)
|
||||||
|
cache_keys = set(setting_keys)
|
||||||
|
logger.info('cache delete_many(%r)', cache_keys)
|
||||||
|
cache.delete_many(cache_keys)
|
||||||
|
except Exception:
|
||||||
|
# Log unanticipated exception in addition to writing to stderr to get timestamps and other metadata
|
||||||
|
logger.exception('Encountered unhandled error in cache clear main loop')
|
||||||
|
raise
|
||||||
@@ -76,7 +76,7 @@ class Command(BaseCommand):
|
|||||||
consumer = None
|
consumer = None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
queues = ['tower_broadcast_all', get_local_queuename()]
|
queues = ['tower_broadcast_all', 'tower_settings_change', 'rsyslog_configurer', get_local_queuename()]
|
||||||
consumer = AWXConsumerPG('dispatcher', TaskWorker(), queues, AutoscalePool(min_workers=4))
|
consumer = AWXConsumerPG('dispatcher', TaskWorker(), queues, AutoscalePool(min_workers=4))
|
||||||
consumer.run()
|
consumer.run()
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
|
|||||||
@@ -61,10 +61,10 @@ from awx.main.utils.common import (
|
|||||||
|
|
||||||
from awx.main.utils.reload import stop_local_services
|
from awx.main.utils.reload import stop_local_services
|
||||||
from awx.main.utils.pglock import advisory_lock
|
from awx.main.utils.pglock import advisory_lock
|
||||||
|
from awx.main.utils.external_logging import send_pg_notify
|
||||||
from awx.main.tasks.receptor import get_receptor_ctl, worker_info, worker_cleanup, administrative_workunit_reaper, write_receptor_config
|
from awx.main.tasks.receptor import get_receptor_ctl, worker_info, worker_cleanup, administrative_workunit_reaper, write_receptor_config
|
||||||
from awx.main.consumers import emit_channel_notification
|
from awx.main.consumers import emit_channel_notification
|
||||||
from awx.main import analytics
|
from awx.main import analytics
|
||||||
from awx.conf import settings_registry
|
|
||||||
from awx.main.analytics.subsystem_metrics import Metrics
|
from awx.main.analytics.subsystem_metrics import Metrics
|
||||||
|
|
||||||
from rest_framework.exceptions import PermissionDenied
|
from rest_framework.exceptions import PermissionDenied
|
||||||
@@ -240,15 +240,10 @@ def apply_cluster_membership_policies():
|
|||||||
logger.debug('Cluster policy computation finished in {} seconds'.format(time.time() - started_compute))
|
logger.debug('Cluster policy computation finished in {} seconds'.format(time.time() - started_compute))
|
||||||
|
|
||||||
|
|
||||||
@task(queue='tower_broadcast_all')
|
@task(queue='tower_settings_change')
|
||||||
def clear_setting_cache(setting_keys):
|
def clear_setting_cache(setting_keys):
|
||||||
orig_len = len(setting_keys)
|
# notify the service to clear the cache
|
||||||
for i in range(orig_len):
|
send_pg_notify('tower_settings_change', setting_keys)
|
||||||
for dependent_key in settings_registry.get_dependent_settings(setting_keys[i]):
|
|
||||||
setting_keys.append(dependent_key)
|
|
||||||
cache_keys = set(setting_keys)
|
|
||||||
logger.debug('cache delete_many(%r)', cache_keys)
|
|
||||||
cache.delete_many(cache_keys)
|
|
||||||
|
|
||||||
|
|
||||||
@task(queue='tower_broadcast_all')
|
@task(queue='tower_broadcast_all')
|
||||||
|
|||||||
@@ -868,6 +868,7 @@ LOGGING = {
|
|||||||
'awx.main.consumers': {'handlers': ['console', 'file', 'tower_warnings'], 'level': 'INFO'},
|
'awx.main.consumers': {'handlers': ['console', 'file', 'tower_warnings'], 'level': 'INFO'},
|
||||||
'awx.main.wsbroadcast': {'handlers': ['wsbroadcast']},
|
'awx.main.wsbroadcast': {'handlers': ['wsbroadcast']},
|
||||||
'awx.main.rsyslog_configurer': {'handlers': ['rsyslog_configurer']},
|
'awx.main.rsyslog_configurer': {'handlers': ['rsyslog_configurer']},
|
||||||
|
'awx.main.cache_clear': {'handlers': ['cache_clear']},
|
||||||
'awx.main.commands.inventory_import': {'handlers': ['inventory_import'], 'propagate': False},
|
'awx.main.commands.inventory_import': {'handlers': ['inventory_import'], 'propagate': False},
|
||||||
'awx.main.tasks': {'handlers': ['task_system', 'external_logger'], 'propagate': False},
|
'awx.main.tasks': {'handlers': ['task_system', 'external_logger'], 'propagate': False},
|
||||||
'awx.main.analytics': {'handlers': ['task_system', 'external_logger'], 'level': 'INFO', 'propagate': False},
|
'awx.main.analytics': {'handlers': ['task_system', 'external_logger'], 'level': 'INFO', 'propagate': False},
|
||||||
@@ -899,6 +900,7 @@ handler_config = {
|
|||||||
'rbac_migrations': {'filename': 'tower_rbac_migrations.log'},
|
'rbac_migrations': {'filename': 'tower_rbac_migrations.log'},
|
||||||
'job_lifecycle': {'filename': 'job_lifecycle.log', 'formatter': 'job_lifecycle'},
|
'job_lifecycle': {'filename': 'job_lifecycle.log', 'formatter': 'job_lifecycle'},
|
||||||
'rsyslog_configurer': {'filename': 'rsyslog_configurer.log'},
|
'rsyslog_configurer': {'filename': 'rsyslog_configurer.log'},
|
||||||
|
'cache_clear': {'filename': 'cache_clear.log'},
|
||||||
}
|
}
|
||||||
|
|
||||||
# If running on a VM, we log to files. When running in a container, we log to stdout.
|
# If running on a VM, we log to files. When running in a container, we log to stdout.
|
||||||
|
|||||||
@@ -44,6 +44,16 @@ stdout_logfile_maxbytes=0
|
|||||||
stderr_logfile=/dev/stderr
|
stderr_logfile=/dev/stderr
|
||||||
stderr_logfile_maxbytes=0
|
stderr_logfile_maxbytes=0
|
||||||
|
|
||||||
|
[program:awx-cache-clear]
|
||||||
|
command = make cache-clear
|
||||||
|
autorestart = true
|
||||||
|
stopasgroup=true
|
||||||
|
killasgroup=true
|
||||||
|
stdout_logfile=/dev/stdout
|
||||||
|
stdout_logfile_maxbytes=0
|
||||||
|
stderr_logfile=/dev/stderr
|
||||||
|
stderr_logfile_maxbytes=0
|
||||||
|
|
||||||
[program:awx-uwsgi]
|
[program:awx-uwsgi]
|
||||||
command = make uwsgi
|
command = make uwsgi
|
||||||
autorestart = true
|
autorestart = true
|
||||||
|
|||||||
Reference in New Issue
Block a user