mirror of
https://github.com/ansible/awx.git
synced 2026-03-06 11:11:07 -03:30
Re-register nodes when they come back online
* Nodes are marked offline and then, given enough time, deleted. Nodes can come back for various reasons (e.g. a netsplit). When they come back, have them recreate the node Instance if AWX_AUTO_DEPROVISION_INSTANCES is True. Otherwise, do nothing. The do-nothing case will show up in the logs as celery job tracebacks, because the nodes fail to be self-aware (they cannot find their own Instance record).
This commit is contained in:
@@ -2,7 +2,6 @@
|
|||||||
# All Rights Reserved
|
# All Rights Reserved
|
||||||
|
|
||||||
from awx.main.models import Instance
|
from awx.main.models import Instance
|
||||||
from awx.main.utils.pglock import advisory_lock
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
|
|
||||||
from django.db import transaction
|
from django.db import transaction
|
||||||
@@ -27,15 +26,12 @@ class Command(BaseCommand):
|
|||||||
def _register_hostname(self, hostname):
|
def _register_hostname(self, hostname):
|
||||||
if not hostname:
|
if not hostname:
|
||||||
return
|
return
|
||||||
with advisory_lock('instance_registration_%s' % hostname):
|
(changed, instance) = Instance.objects.register(uuid=self.uuid, hostname=hostname)
|
||||||
instance = Instance.objects.filter(hostname=hostname)
|
if changed:
|
||||||
if instance.exists():
|
print('Successfully registered instance {}'.format(hostname))
|
||||||
print("Instance already registered {}".format(instance[0].hostname))
|
else:
|
||||||
return
|
print("Instance already registered {}".format(instance.hostname))
|
||||||
instance = Instance(uuid=self.uuid, hostname=hostname)
|
self.changed = changed
|
||||||
instance.save()
|
|
||||||
print('Successfully registered instance {}'.format(hostname))
|
|
||||||
self.changed = True
|
|
||||||
|
|
||||||
@transaction.atomic
|
@transaction.atomic
|
||||||
def handle(self, **options):
|
def handle(self, **options):
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ from django.db import models
|
|||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
|
|
||||||
from awx.main.utils.filters import SmartFilter
|
from awx.main.utils.filters import SmartFilter
|
||||||
|
from awx.main.utils.pglock import advisory_lock
|
||||||
|
|
||||||
___all__ = ['HostManager', 'InstanceManager', 'InstanceGroupManager']
|
___all__ = ['HostManager', 'InstanceManager', 'InstanceGroupManager']
|
||||||
|
|
||||||
@@ -86,6 +87,20 @@ class InstanceManager(models.Manager):
|
|||||||
return node[0]
|
return node[0]
|
||||||
raise RuntimeError("No instance found with the current cluster host id")
|
raise RuntimeError("No instance found with the current cluster host id")
|
||||||
|
|
||||||
|
def register(self, uuid=settings.SYSTEM_UUID, hostname=settings.CLUSTER_HOST_ID):
|
||||||
|
with advisory_lock('instance_registration_%s' % hostname):
|
||||||
|
instance = self.filter(hostname=hostname)
|
||||||
|
if instance.exists():
|
||||||
|
return (False, instance[0])
|
||||||
|
instance = self.create(uuid=uuid, hostname=hostname)
|
||||||
|
return (True, instance)
|
||||||
|
|
||||||
|
def get_or_register(self):
|
||||||
|
if settings.AWX_AUTO_DEPROVISION_INSTANCES:
|
||||||
|
return self.register()
|
||||||
|
else:
|
||||||
|
return (False, self.me())
|
||||||
|
|
||||||
def active_count(self):
|
def active_count(self):
|
||||||
"""Return count of active Tower nodes for licensing."""
|
"""Return count of active Tower nodes for licensing."""
|
||||||
return self.all().count()
|
return self.all().count()
|
||||||
|
|||||||
@@ -204,7 +204,9 @@ def handle_setting_changes(self, setting_keys):
|
|||||||
|
|
||||||
@shared_task(bind=True, queue='tower_broadcast_all', base=LogErrorsTask)
|
@shared_task(bind=True, queue='tower_broadcast_all', base=LogErrorsTask)
|
||||||
def handle_ha_toplogy_changes(self):
|
def handle_ha_toplogy_changes(self):
|
||||||
instance = Instance.objects.me()
|
(changed, instance) = Instance.objects.get_or_register()
|
||||||
|
if changed:
|
||||||
|
logger.info(six.text_type("Registered tower node '{}'").format(instance.hostname))
|
||||||
logger.debug(six.text_type("Reconfigure celeryd queues task on host {}").format(self.request.hostname))
|
logger.debug(six.text_type("Reconfigure celeryd queues task on host {}").format(self.request.hostname))
|
||||||
awx_app = Celery('awx')
|
awx_app = Celery('awx')
|
||||||
awx_app.config_from_object('django.conf:settings')
|
awx_app.config_from_object('django.conf:settings')
|
||||||
@@ -234,7 +236,9 @@ def handle_ha_toplogy_worker_ready(sender, **kwargs):
|
|||||||
def handle_update_celery_routes(sender=None, conf=None, **kwargs):
|
def handle_update_celery_routes(sender=None, conf=None, **kwargs):
|
||||||
conf = conf if conf else sender.app.conf
|
conf = conf if conf else sender.app.conf
|
||||||
logger.debug(six.text_type("Registering celery routes for {}").format(sender))
|
logger.debug(six.text_type("Registering celery routes for {}").format(sender))
|
||||||
instance = Instance.objects.me()
|
(changed, instance) = Instance.objects.get_or_register()
|
||||||
|
if changed:
|
||||||
|
logger.info(six.text_type("Registered tower node '{}'").format(instance.hostname))
|
||||||
added_routes = update_celery_worker_routes(instance, conf)
|
added_routes = update_celery_worker_routes(instance, conf)
|
||||||
logger.info(six.text_type("Workers on tower node '{}' added routes {} all routes are now {}")
|
logger.info(six.text_type("Workers on tower node '{}' added routes {} all routes are now {}")
|
||||||
.format(instance.hostname, added_routes, conf.CELERY_ROUTES))
|
.format(instance.hostname, added_routes, conf.CELERY_ROUTES))
|
||||||
@@ -242,7 +246,9 @@ def handle_update_celery_routes(sender=None, conf=None, **kwargs):
|
|||||||
|
|
||||||
@celeryd_after_setup.connect
|
@celeryd_after_setup.connect
|
||||||
def handle_update_celery_hostname(sender, instance, **kwargs):
|
def handle_update_celery_hostname(sender, instance, **kwargs):
|
||||||
tower_instance = Instance.objects.me()
|
(changed, tower_instance) = Instance.objects.get_or_register()
|
||||||
|
if changed:
|
||||||
|
logger.info(six.text_type("Registered tower node '{}'").format(tower_instance.hostname))
|
||||||
instance.hostname = 'celery@{}'.format(tower_instance.hostname)
|
instance.hostname = 'celery@{}'.format(tower_instance.hostname)
|
||||||
logger.warn(six.text_type("Set hostname to {}").format(instance.hostname))
|
logger.warn(six.text_type("Set hostname to {}").format(instance.hostname))
|
||||||
|
|
||||||
@@ -310,6 +316,10 @@ def cluster_node_heartbeat(self):
|
|||||||
this_inst = None
|
this_inst = None
|
||||||
lost_instances = []
|
lost_instances = []
|
||||||
|
|
||||||
|
(changed, instance) = Instance.objects.get_or_register()
|
||||||
|
if changed:
|
||||||
|
logger.info(six.text_type("Registered tower node '{}'").format(instance.hostname))
|
||||||
|
|
||||||
for inst in list(instance_list):
|
for inst in list(instance_list):
|
||||||
if inst.hostname == settings.CLUSTER_HOST_ID:
|
if inst.hostname == settings.CLUSTER_HOST_ID:
|
||||||
this_inst = inst
|
this_inst = inst
|
||||||
|
|||||||
Reference in New Issue
Block a user