mirror of
https://github.com/ansible/awx.git
synced 2026-02-03 02:28:12 -03:30
Merge branch 'release_3.1.0' into multi_worker_callbacks
This commit is contained in:
@@ -32,7 +32,8 @@ import pexpect
|
||||
|
||||
# Celery
|
||||
from celery import Task, task
|
||||
from celery.signals import celeryd_init
|
||||
from celery.signals import celeryd_init, worker_process_init
|
||||
from celery import current_app
|
||||
|
||||
# Django
|
||||
from django.conf import settings
|
||||
@@ -75,7 +76,8 @@ logger = logging.getLogger('awx.main.tasks')
|
||||
def celery_startup(conf=None, **kwargs):
|
||||
# Re-init all schedules
|
||||
# NOTE: Rework this during the Rampart work
|
||||
logger.info("Syncing Tower Schedules")
|
||||
startup_logger = logging.getLogger('awx.main.tasks')
|
||||
startup_logger.info("Syncing Tower Schedules")
|
||||
for sch in Schedule.objects.all():
|
||||
try:
|
||||
sch.update_computed_fields()
|
||||
@@ -84,7 +86,28 @@ def celery_startup(conf=None, **kwargs):
|
||||
logger.error("Failed to rebuild schedule {}: {}".format(sch, e))
|
||||
|
||||
|
||||
def uwsgi_reload():
|
||||
def _setup_tower_logger():
|
||||
global logger
|
||||
from django.utils.log import configure_logging
|
||||
LOGGING_DICT = settings.LOGGING
|
||||
if settings.LOG_AGGREGATOR_ENABLED:
|
||||
LOGGING_DICT['handlers']['http_receiver']['class'] = 'awx.main.utils.handlers.HTTPSHandler'
|
||||
LOGGING_DICT['handlers']['http_receiver']['async'] = False
|
||||
if 'awx' in settings.LOG_AGGREGATOR_LOGGERS:
|
||||
if 'http_receiver' not in LOGGING_DICT['loggers']['awx']['handlers']:
|
||||
LOGGING_DICT['loggers']['awx']['handlers'] += ['http_receiver']
|
||||
configure_logging(settings.LOGGING_CONFIG, LOGGING_DICT)
|
||||
logger = logging.getLogger('awx.main.tasks')
|
||||
|
||||
|
||||
@worker_process_init.connect
|
||||
def task_set_logger_pre_run(*args, **kwargs):
|
||||
if settings.LOG_AGGREGATOR_ENABLED:
|
||||
_setup_tower_logger()
|
||||
logger.debug('Custom Tower logger configured for worker process.')
|
||||
|
||||
|
||||
def _uwsgi_reload():
|
||||
# http://uwsgi-docs.readthedocs.io/en/latest/MasterFIFO.html#available-commands
|
||||
logger.warn('Initiating uWSGI chain reload of server')
|
||||
TRIGGER_CHAIN_RELOAD = 'c'
|
||||
@@ -92,14 +115,28 @@ def uwsgi_reload():
|
||||
awxfifo.write(TRIGGER_CHAIN_RELOAD)
|
||||
|
||||
|
||||
@task(queue='broadcast_all')
|
||||
def clear_cache_keys(cache_keys):
|
||||
set_of_keys = set([key for key in cache_keys])
|
||||
def _reset_celery_logging():
|
||||
# Worker logger reloaded, now send signal to restart pool
|
||||
app = current_app._get_current_object()
|
||||
app.control.broadcast('pool_restart', arguments={'reload': True},
|
||||
destination=['celery@{}'.format(settings.CLUSTER_HOST_ID)], reply=False)
|
||||
|
||||
|
||||
def _clear_cache_keys(set_of_keys):
|
||||
logger.debug('cache delete_many(%r)', set_of_keys)
|
||||
cache.delete_many(set_of_keys)
|
||||
|
||||
|
||||
@task(queue='broadcast_all')
|
||||
def process_cache_changes(cache_keys):
|
||||
logger.warn('Processing cache changes, task args: {0.args!r} kwargs: {0.kwargs!r}'.format(
|
||||
process_cache_changes.request))
|
||||
set_of_keys = set([key for key in cache_keys])
|
||||
_clear_cache_keys(set_of_keys)
|
||||
for setting_key in set_of_keys:
|
||||
if setting_key.startswith('LOG_AGGREGATOR_'):
|
||||
uwsgi_reload()
|
||||
_uwsgi_reload()
|
||||
_reset_celery_logging()
|
||||
break
|
||||
|
||||
|
||||
@@ -129,6 +166,7 @@ def send_notifications(notification_list, job_id=None):
|
||||
|
||||
@task(bind=True, queue='default')
|
||||
def run_administrative_checks(self):
|
||||
logger.warn("Running administrative checks.")
|
||||
if not settings.TOWER_ADMIN_ALERTS:
|
||||
return
|
||||
validation_info = TaskEnhancer().validate_enhancements()
|
||||
@@ -150,11 +188,13 @@ def run_administrative_checks(self):
|
||||
|
||||
@task(bind=True, queue='default')
|
||||
def cleanup_authtokens(self):
|
||||
logger.warn("Cleaning up expired authtokens.")
|
||||
AuthToken.objects.filter(expires__lt=now()).delete()
|
||||
|
||||
|
||||
@task(bind=True)
|
||||
def cluster_node_heartbeat(self):
|
||||
logger.debug("Cluster node heartbeat task.")
|
||||
inst = Instance.objects.filter(hostname=settings.CLUSTER_HOST_ID)
|
||||
if inst.exists():
|
||||
inst = inst[0]
|
||||
@@ -1832,7 +1872,7 @@ class RunSystemJob(BaseTask):
|
||||
if 'days' in json_vars and system_job.job_type != 'cleanup_facts':
|
||||
args.extend(['--days', str(json_vars.get('days', 60))])
|
||||
if system_job.job_type == 'cleanup_jobs':
|
||||
args.extend(['--jobs', '--project-updates', '--inventory-updates', '--management-jobs', '--ad-hoc-commands'])
|
||||
args.extend(['--jobs', '--project-updates', '--inventory-updates', '--management-jobs', '--ad-hoc-commands', '--workflow-jobs'])
|
||||
if system_job.job_type == 'cleanup_facts':
|
||||
if 'older_than' in json_vars:
|
||||
args.extend(['--older_than', str(json_vars['older_than'])])
|
||||
|
||||
Reference in New Issue
Block a user