From 7ce8907b7b49df5fe9375a81de166bc9f0ce45b2 Mon Sep 17 00:00:00 2001 From: chris meyers Date: Tue, 27 Mar 2018 13:51:35 -0400 Subject: [PATCH] reregister node when they come back online * Nodes are marked offline, then deleted; given enough time. Nodes can come back for various reasions (i.e. netsplit). When they come back, have them recreate the node Instance if AWX_AUTO_DEPROVISION_INSTANCES is True. Otherwise, do nothing. The do nothing case will show up in the logs as celery job tracebacks as they fail to be self aware. --- .../management/commands/provision_instance.py | 16 ++++++---------- awx/main/managers.py | 15 +++++++++++++++ awx/main/tasks.py | 16 +++++++++++++--- 3 files changed, 34 insertions(+), 13 deletions(-) diff --git a/awx/main/management/commands/provision_instance.py b/awx/main/management/commands/provision_instance.py index 4b2ef8f220..e9458ac120 100644 --- a/awx/main/management/commands/provision_instance.py +++ b/awx/main/management/commands/provision_instance.py @@ -2,7 +2,6 @@ # All Rights Reserved from awx.main.models import Instance -from awx.main.utils.pglock import advisory_lock from django.conf import settings from django.db import transaction @@ -27,15 +26,12 @@ class Command(BaseCommand): def _register_hostname(self, hostname): if not hostname: return - with advisory_lock('instance_registration_%s' % hostname): - instance = Instance.objects.filter(hostname=hostname) - if instance.exists(): - print("Instance already registered {}".format(instance[0].hostname)) - return - instance = Instance(uuid=self.uuid, hostname=hostname) - instance.save() - print('Successfully registered instance {}'.format(hostname)) - self.changed = True + (changed, instance) = Instance.objects.register(uuid=self.uuid, hostname=hostname) + if changed: + print('Successfully registered instance {}'.format(hostname)) + else: + print("Instance already registered {}".format(instance.hostname)) + self.changed = changed @transaction.atomic def handle(self, **options): diff --git a/awx/main/managers.py b/awx/main/managers.py index 70c402f672..1683b38c4e 100644 --- a/awx/main/managers.py +++ b/awx/main/managers.py @@ -8,6 +8,7 @@ from django.db import models from django.conf import settings from awx.main.utils.filters import SmartFilter +from awx.main.utils.pglock import advisory_lock ___all__ = ['HostManager', 'InstanceManager', 'InstanceGroupManager'] @@ -86,6 +87,20 @@ class InstanceManager(models.Manager): return node[0] raise RuntimeError("No instance found with the current cluster host id") + def register(self, uuid=settings.SYSTEM_UUID, hostname=settings.CLUSTER_HOST_ID): + with advisory_lock('instance_registration_%s' % hostname): + instance = self.filter(hostname=hostname) + if instance.exists(): + return (False, instance[0]) + instance = self.create(uuid=uuid, hostname=hostname) + return (True, instance) + + def get_or_register(self): + if settings.AWX_AUTO_DEPROVISION_INSTANCES: + return self.register() + else: + return (False, self.me()) + def active_count(self): """Return count of active Tower nodes for licensing.""" return self.all().count() diff --git a/awx/main/tasks.py b/awx/main/tasks.py index c47101b988..ccef351626 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -204,7 +204,9 @@ def handle_setting_changes(self, setting_keys): @shared_task(bind=True, queue='tower_broadcast_all', base=LogErrorsTask) def handle_ha_toplogy_changes(self): - instance = Instance.objects.me() + (changed, instance) = Instance.objects.get_or_register() + if changed: + logger.info(six.text_type("Registered tower node '{}'").format(instance.hostname)) logger.debug(six.text_type("Reconfigure celeryd queues task on host {}").format(self.request.hostname)) awx_app = Celery('awx') awx_app.config_from_object('django.conf:settings') @@ -234,7 +236,9 @@ def handle_ha_toplogy_worker_ready(sender, **kwargs): def handle_update_celery_routes(sender=None, conf=None, **kwargs): conf = conf if conf else sender.app.conf logger.debug(six.text_type("Registering celery routes for {}").format(sender)) - instance = Instance.objects.me() + (changed, instance) = Instance.objects.get_or_register() + if changed: + logger.info(six.text_type("Registered tower node '{}'").format(instance.hostname)) added_routes = update_celery_worker_routes(instance, conf) logger.info(six.text_type("Workers on tower node '{}' added routes {} all routes are now {}") .format(instance.hostname, added_routes, conf.CELERY_ROUTES)) @@ -242,7 +246,9 @@ def handle_update_celery_routes(sender=None, conf=None, **kwargs): @celeryd_after_setup.connect def handle_update_celery_hostname(sender, instance, **kwargs): - tower_instance = Instance.objects.me() + (changed, tower_instance) = Instance.objects.get_or_register() + if changed: + logger.info(six.text_type("Registered tower node '{}'").format(tower_instance.hostname)) instance.hostname = 'celery@{}'.format(tower_instance.hostname) logger.warn(six.text_type("Set hostname to {}").format(instance.hostname)) @@ -310,6 +316,10 @@ def cluster_node_heartbeat(self): this_inst = None lost_instances = [] + (changed, instance) = Instance.objects.get_or_register() + if changed: + logger.info(six.text_type("Registered tower node '{}'").format(instance.hostname)) + for inst in list(instance_list): if inst.hostname == settings.CLUSTER_HOST_ID: this_inst = inst