mirror of
https://github.com/ansible/awx.git
synced 2026-05-07 01:17:37 -02:30
consolidate celery init signals to avoid an instance registration race
This commit is contained in:
@@ -32,7 +32,7 @@ except Exception:
|
|||||||
from kombu import Queue, Exchange
|
from kombu import Queue, Exchange
|
||||||
from kombu.common import Broadcast
|
from kombu.common import Broadcast
|
||||||
from celery import Task, shared_task
|
from celery import Task, shared_task
|
||||||
from celery.signals import celeryd_init, worker_shutdown, celeryd_after_setup
|
from celery.signals import celeryd_init, worker_shutdown
|
||||||
|
|
||||||
# Django
|
# Django
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
@@ -108,6 +108,31 @@ def log_celery_failure(self, exc, task_id, args, kwargs, einfo):
|
|||||||
|
|
||||||
@celeryd_init.connect
|
@celeryd_init.connect
|
||||||
def celery_startup(conf=None, **kwargs):
|
def celery_startup(conf=None, **kwargs):
|
||||||
|
#
|
||||||
|
# When celeryd starts, if the instance cannot be found in the database,
|
||||||
|
# automatically register it. This is mostly useful for openshift-based
|
||||||
|
# deployments where:
|
||||||
|
#
|
||||||
|
# 2 Instances come online
|
||||||
|
# Instance B encounters a network blip, Instance A notices, and
|
||||||
|
# deprovisions it
|
||||||
|
# Instance B's connectivity is restored, celeryd starts, and it
|
||||||
|
# re-registers itself
|
||||||
|
#
|
||||||
|
# In traditional container-less deployments, instances don't get
|
||||||
|
# deprovisioned when they miss their heartbeat, so this code is mostly a
|
||||||
|
# no-op.
|
||||||
|
#
|
||||||
|
if kwargs['instance'].hostname != 'celery@{}'.format(settings.CLUSTER_HOST_ID):
|
||||||
|
error = six.text_type('celery -n {} does not match settings.CLUSTER_HOST_ID={}').format(
|
||||||
|
instance.hostname, settings.CLUSTER_HOST_ID
|
||||||
|
)
|
||||||
|
logger.error(error)
|
||||||
|
raise RuntimeError(error)
|
||||||
|
(changed, tower_instance) = Instance.objects.get_or_register()
|
||||||
|
if changed:
|
||||||
|
logger.info(six.text_type("Registered tower node '{}'").format(tower_instance.hostname))
|
||||||
|
|
||||||
startup_logger = logging.getLogger('awx.main.tasks')
|
startup_logger = logging.getLogger('awx.main.tasks')
|
||||||
startup_logger.info("Syncing Schedules")
|
startup_logger.info("Syncing Schedules")
|
||||||
for sch in Schedule.objects.all():
|
for sch in Schedule.objects.all():
|
||||||
@@ -268,34 +293,6 @@ def handle_setting_changes(self, setting_keys):
|
|||||||
cache.delete_many(cache_keys)
|
cache.delete_many(cache_keys)
|
||||||
|
|
||||||
|
|
||||||
@celeryd_after_setup.connect
|
|
||||||
def auto_register_ha_instance(sender, instance, **kwargs):
|
|
||||||
#
|
|
||||||
# When celeryd starts, if the instance cannot be found in the database,
|
|
||||||
# automatically register it. This is mostly useful for openshift-based
|
|
||||||
# deployments where:
|
|
||||||
#
|
|
||||||
# 2 Instances come online
|
|
||||||
# Instance B encounters a network blip, Instance A notices, and
|
|
||||||
# deprovisions it
|
|
||||||
# Instance B's connectivity is restored, celeryd starts, and it
|
|
||||||
# re-registers itself
|
|
||||||
#
|
|
||||||
# In traditional container-less deployments, instances don't get
|
|
||||||
# deprovisioned when they miss their heartbeat, so this code is mostly a
|
|
||||||
# no-op.
|
|
||||||
#
|
|
||||||
if instance.hostname != 'celery@{}'.format(settings.CLUSTER_HOST_ID):
|
|
||||||
error = six.text_type('celery -n {} does not match settings.CLUSTER_HOST_ID={}').format(
|
|
||||||
instance.hostname, settings.CLUSTER_HOST_ID
|
|
||||||
)
|
|
||||||
logger.error(error)
|
|
||||||
raise RuntimeError(error)
|
|
||||||
(changed, tower_instance) = Instance.objects.get_or_register()
|
|
||||||
if changed:
|
|
||||||
logger.info(six.text_type("Registered tower node '{}'").format(tower_instance.hostname))
|
|
||||||
|
|
||||||
|
|
||||||
@shared_task(queue=settings.CELERY_DEFAULT_QUEUE)
|
@shared_task(queue=settings.CELERY_DEFAULT_QUEUE)
|
||||||
def send_notifications(notification_list, job_id=None):
|
def send_notifications(notification_list, job_id=None):
|
||||||
if not isinstance(notification_list, list):
|
if not isinstance(notification_list, list):
|
||||||
|
|||||||
Reference in New Issue
Block a user