Beat and celery clustering fixes

* use embedded beat rather than standalone
* dynamically set celeryd hostname at runtime
* add embeded beat flag to celery startup
* Embedded beat mode routes will piggyback off of celery worker setup
signal
This commit is contained in:
Chris Meyers
2017-10-10 12:34:49 -04:00
committed by Matthew Jones
parent 624289bed7
commit 0e97dc4b84
4 changed files with 11 additions and 17 deletions

View File

@@ -26,7 +26,7 @@ except Exception:
# Celery
from celery import Task, shared_task
from celery.signals import celeryd_init, worker_process_init, worker_shutdown, worker_ready, beat_init
from celery.signals import celeryd_init, worker_process_init, worker_shutdown, worker_ready, celeryd_after_setup
# Django
from django.conf import settings
@@ -168,7 +168,6 @@ def handle_ha_toplogy_worker_ready(sender, **kwargs):
.format(instance.hostname, removed_queues, added_queues))
@beat_init.connect
@celeryd_init.connect
def handle_update_celery_routes(sender=None, conf=None, **kwargs):
conf = conf if conf else sender.app.conf
@@ -179,6 +178,13 @@ def handle_update_celery_routes(sender=None, conf=None, **kwargs):
.format(instance.hostname, added_routes, conf.CELERY_ROUTES))
@celeryd_after_setup.connect
def handle_update_celery_hostname(sender, instance, **kwargs):
tower_instance = Instance.objects.me()
instance.hostname = 'celery@{}'.format(tower_instance.hostname)
logger.warn("Set hostname to {}".format(instance.hostname))
@shared_task(queue='tower', base=LogErrorsTask)
def send_notifications(notification_list, job_id=None):
if not isinstance(notification_list, list):