Implement cluster health checks

* Add a local node queue to execute targeted jobs
* Add a setting for active cluster node id (per-node)
* Base the heartbeat time on the `modified` time on the Instance table
* Add periodic task that calls save() on the instance to update the
  heartbeat time if services are up
* Purge/update any ha/instance management commands
* Fix up CELERY_ROUTES settings data structure
This commit is contained in:
Matthew Jones
2016-10-06 16:05:39 -04:00
parent aabbd48d17
commit babe29ebfa
11 changed files with 43 additions and 243 deletions

View File

@@ -400,7 +400,7 @@ celeryd:
@if [ "$(VENV_BASE)" ]; then \ @if [ "$(VENV_BASE)" ]; then \
. $(VENV_BASE)/tower/bin/activate; \ . $(VENV_BASE)/tower/bin/activate; \
fi; \ fi; \
$(PYTHON) manage.py celeryd -l DEBUG -B --autoscale=20,3 --schedule=$(CELERY_SCHEDULE_FILE) -Q projects,jobs,default,scheduler $(PYTHON) manage.py celeryd -l DEBUG -B --autoscale=20,3 --schedule=$(CELERY_SCHEDULE_FILE) -Q projects,jobs,default,scheduler,$(COMPOSE_HOST)
#$(PYTHON) manage.py celery multi show projects jobs default -l DEBUG -Q:projects projects -Q:jobs jobs -Q:default default -c:projects 1 -c:jobs 3 -c:default 3 -Ofair -B --schedule=$(CELERY_SCHEDULE_FILE) #$(PYTHON) manage.py celery multi show projects jobs default -l DEBUG -Q:projects projects -Q:jobs jobs -Q:default default -c:projects 1 -c:jobs 3 -c:default 3 -Ofair -B --schedule=$(CELERY_SCHEDULE_FILE)
# Run to start the zeromq callback receiver # Run to start the zeromq callback receiver

View File

@@ -169,11 +169,12 @@ class ApiV1PingView(APIView):
response = { response = {
'ha': is_ha_environment(), 'ha': is_ha_environment(),
'version': get_awx_version(), 'version': get_awx_version(),
'active_node': settings.CLUSTER_HOST_ID,
} }
response['instances'] = [] response['instances'] = []
for instance in Instance.objects.all(): for instance in Instance.objects.all():
response['instances'].append(instance.hostname) response['instances'].append(dict(node=instance.hostname, heartbeat=instance.modified))
response['instances'].sort() response['instances'].sort()
return Response(response) return Response(response)

View File

@@ -1,112 +0,0 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
import socket
from optparse import make_option
from django.core.management.base import BaseCommand, CommandError
from django.conf import settings
class OptionEnforceError(Exception):
    """Raised when incompatible option-enforcement policies are combined."""

    def __init__(self, value):
        # Keep the offending value for callers that want to inspect it.
        self.value = value

    def __str__(self):
        return '%r' % (self.value,)
class BaseCommandInstance(BaseCommand):
    """Base class for HA instance management commands.

    Subclasses opt in to exactly one of two mutually exclusive option
    policies:

    * ``include_option_hostname_set()`` -- ``--hostname`` is required and
      names the instance being created/assigned.
    * ``include_option_hostname_uuid_find()`` -- ``--hostname`` and/or
      ``--uuid`` locate an existing instance; when neither is supplied the
      local system UUID is used as the lookup key.

    ``handle()`` enforces the selected policy; subclasses must call
    ``super().handle()`` before doing their own work.
    """

    def __init__(self):
        super(BaseCommandInstance, self).__init__()

        self.enforce_hostname_set = False
        self.enforce_unique_find = False

        self.option_hostname = None
        self.option_uuid = None

        self.UUID = settings.SYSTEM_UUID

        # Populated by handle() under the enforce_unique_find policy;
        # used as ORM lookup kwargs by subclasses.
        self.unique_fields = {}

    @staticmethod
    def generate_option_hostname():
        return make_option('--hostname',
                           dest='hostname',
                           default=socket.gethostname(),
                           help='Find instance by specified hostname.')

    @staticmethod
    def generate_option_hostname_set():
        return make_option('--hostname',
                           dest='hostname',
                           default=socket.gethostname(),
                           help='Hostname to assign to the new instance.')

    @staticmethod
    def generate_option_uuid():
        # TODO: Likely deprecated, maybe uuid becomes the cluster ident?
        return make_option('--uuid',
                           dest='uuid',
                           default='',
                           help='Find instance by specified uuid.')

    def include_option_hostname_set(self):
        # Extend only this command's option list. Mutating
        # BaseCommand.option_list (as a previous revision did) leaks the
        # option into every other management command and accumulates
        # duplicate entries each time a command is instantiated.
        self.option_list = self.option_list + (
            BaseCommandInstance.generate_option_hostname_set(),)
        self.enforce_hostname_set = True

    def include_option_hostname_uuid_find(self):
        self.option_list = self.option_list + (
            BaseCommandInstance.generate_option_hostname(),
            BaseCommandInstance.generate_option_uuid(),)
        self.enforce_unique_find = True

    def get_option_hostname(self):
        return self.option_hostname

    def get_option_uuid(self):
        return self.option_uuid

    def get_UUID(self):
        return self.UUID

    def get_unique_fields(self):
        """Return the lookup kwargs assembled by handle() for the
        enforce_unique_find policy."""
        return self.unique_fields

    @property
    def usage_error(self):
        # Only meaningful under the hostname-set policy; returns None
        # otherwise (callers only raise it when that policy is active).
        if self.enforce_hostname_set:
            return CommandError('--hostname is required.')

    def handle(self, *args, **options):
        """Validate option policies and record the parsed option values.

        Raises OptionEnforceError if both policies were enabled, and the
        usage CommandError when a required --hostname is missing.
        """
        if self.enforce_hostname_set and self.enforce_unique_find:
            raise OptionEnforceError('Can not enforce --hostname as a setter and --hostname as a getter')

        if self.enforce_hostname_set:
            if options['hostname']:
                self.option_hostname = options['hostname']
            else:
                raise self.usage_error

        if self.enforce_unique_find:
            if options['hostname']:
                self.unique_fields['hostname'] = self.option_hostname = options['hostname']
            if options['uuid']:
                self.unique_fields['uuid'] = self.option_uuid = options['uuid']
            if len(self.unique_fields) == 0:
                # Neither option given: fall back to this machine's UUID.
                self.unique_fields['uuid'] = self.get_UUID()

    @staticmethod
    def __instance_str(instance, fields):
        # Render as (field="value",field="value") — same format as the
        # original concatenation loop, without the manual trailing-comma
        # trimming.
        return '(%s)' % ','.join(
            '%s="%s"' % (field, getattr(instance, field)) for field in fields)

    @staticmethod
    def instance_str(instance):
        """Human-readable identifier for an Instance row."""
        return BaseCommandInstance.__instance_str(instance, ('uuid', 'hostname'))

View File

@@ -1,12 +1,10 @@
# Copyright (c) 2015 Ansible, Inc. # Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved # All Rights Reserved
from awx.main.management.commands._base_instance import BaseCommandInstance
from awx.main.models import Instance from awx.main.models import Instance
from django.core.management.base import NoArgsCommand
instance_str = BaseCommandInstance.instance_str class Command(NoArgsCommand):
class Command(BaseCommandInstance):
"""List instances from the Tower database """List instances from the Tower database
""" """
@@ -14,5 +12,4 @@ class Command(BaseCommandInstance):
super(Command, self).__init__() super(Command, self).__init__()
for instance in Instance.objects.all(): for instance in Instance.objects.all():
print("uuid: %s; hostname: %s; primary: %s; created: %s; modified: %s" % print("hostname: {}; created: {}; heartbeat: {}".format(instance.hostname, instance.created, instance.modified))
(instance.uuid, instance.hostname, instance.primary, instance.created, instance.modified))

View File

@@ -1,31 +1,30 @@
# Copyright (c) 2015 Ansible, Inc. # Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved # All Rights Reserved
from awx.main.management.commands._base_instance import BaseCommandInstance
from awx.main.models import Instance from awx.main.models import Instance
from django.conf import settings
instance_str = BaseCommandInstance.instance_str from django.core.management.base import CommandError, NoArgsCommand
class Command(BaseCommandInstance): class Command(NoArgsCommand):
""" """
Internal tower command. Internal tower command.
Register this instance with the database for HA tracking. Register this instance with the database for HA tracking.
This command is idempotent.
""" """
def __init__(self):
super(Command, self).__init__() option_list = NoArgsCommand.option_list + (
self.include_option_hostname_set() make_option('--hostname', dest='hostname', type='string',
help='Hostname used during provisioning')
)
def handle(self, *args, **options): def handle(self, *args, **options):
super(Command, self).handle(*args, **options) super(Command, self).handle(**options)
uuid = settings.SYSTEM_UUID
uuid = self.get_UUID() instance = Instance.objects.filter(hostname=options.get('hostname'))
instance = Instance.objects.filter(hostname=self.get_option_hostname())
if instance.exists(): if instance.exists():
print("Instance already registered %s" % instance_str(instance[0])) print("Instance already registered %s" % instance_str(instance[0]))
return return
instance = Instance(uuid=uuid, hostname=self.get_option_hostname()) instance = Instance(uuid=uuid, hostname=options.get('hostname'))
instance.save() instance.save()
print('Successfully registered instance %s.' % instance_str(instance)) print('Successfully registered instance %s.' % instance_str(instance))

View File

@@ -1,43 +0,0 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
from django.core.management.base import CommandError
from awx.main.management.commands._base_instance import BaseCommandInstance
from awx.main.models import Instance
instance_str = BaseCommandInstance.instance_str
class Command(BaseCommandInstance):
    """Internal tower command.

    Remove an existing instance from the HA instance table. Idempotent: a
    missing instance is reported, not treated as an error. Removing a
    primary instance is refused.
    """

    def __init__(self):
        super(Command, self).__init__()
        self.include_option_hostname_uuid_find()

    def handle(self, *args, **options):
        super(Command, self).handle(*args, **options)

        # Narrow try body: only the lookup can raise DoesNotExist.
        try:
            instance = Instance.objects.get(**self.get_unique_fields())
        except Instance.DoesNotExist:
            print('No matching instance found to remove.')
            return

        # Sanity check: Do not remove the primary instance.
        if instance.primary:
            raise CommandError('Cannot remove primary instance %s. Another instance must be promoted to primary first.' % instance_str(instance))

        instance.delete()
        print('Successfully removed instance %s.' % instance_str(instance))

View File

@@ -1,65 +0,0 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
from django.core.management.base import CommandError
from django.db import transaction
from awx.main.management.commands._base_instance import BaseCommandInstance
from awx.conf.license import feature_enabled
from awx.main.models import Instance
instance_str = BaseCommandInstance.instance_str
class Command(BaseCommandInstance):
    """Set an already registered instance to primary or secondary for HA
    tracking.

    Idempotent: promoting a new primary first demotes any other instance
    currently marked primary.

    Errors out when:
      * the license does not include the 'ha' feature,
      * no matching instance is found.
    """

    def __init__(self):
        super(Command, self).__init__()
        self.include_option_primary_role()
        self.include_option_hostname_uuid_find()

    @transaction.atomic
    def handle(self, *args, **options):
        super(Command, self).handle(*args, **options)

        # Promotion/demotion is gated on the HA license feature.
        if not feature_enabled('ha'):
            raise CommandError('Your Tower license does not permit promoting a secondary instance')

        try:
            instance = Instance.objects.get(**self.get_unique_fields())
        except Instance.DoesNotExist:
            raise CommandError('No matching instance found to update.')

        make_primary = self.is_option_primary()

        # Demote every other primary before promoting this instance.
        if make_primary:
            others = Instance.objects.exclude(**self.get_unique_fields())
            for old_primary in others.filter(primary=True):
                old_primary.primary = False
                old_primary.save()

        instance.primary = make_primary
        instance.save()

        # A newly promoted primary must refresh project state.
        if make_primary:
            self.update_projects(instance)

        print('Successfully updated instance role %s' % instance_str(instance))

View File

@@ -125,6 +125,15 @@ def run_administrative_checks(self):
def cleanup_authtokens(self): def cleanup_authtokens(self):
AuthToken.objects.filter(expires__lt=now()).delete() AuthToken.objects.filter(expires__lt=now()).delete()
@task(bind=True)
def cluster_node_heartbeat(self):
inst = Instance.objects.filter(hostname=settings.CLUSTER_HOST_ID)
if inst.exists():
inst = inst[0]
inst.save()
return
raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID))
@task(bind=True, queue='default') @task(bind=True, queue='default')
def tower_periodic_scheduler(self): def tower_periodic_scheduler(self):
def get_last_run(): def get_last_run():
@@ -154,6 +163,7 @@ def tower_periodic_scheduler(self):
# Sanity check: If this is a secondary machine, there is nothing # Sanity check: If this is a secondary machine, there is nothing
# on the schedule. # on the schedule.
# TODO: Fix for clustering/ha
if Instance.objects.my_role() == 'secondary': if Instance.objects.my_role() == 'secondary':
return return

View File

@@ -359,7 +359,7 @@ CELERY_QUEUES = (
# Projects use a fanout queue, this isn't super well supported # Projects use a fanout queue, this isn't super well supported
Broadcast('projects'), Broadcast('projects'),
) )
CELERY_ROUTES = ({'awx.main.tasks.run_job': {'queue': 'jobs', CELERY_ROUTES = {'awx.main.tasks.run_job': {'queue': 'jobs',
'routing_key': 'jobs'}, 'routing_key': 'jobs'},
'awx.main.tasks.run_project_update': {'queue': 'projects'}, 'awx.main.tasks.run_project_update': {'queue': 'projects'},
'awx.main.tasks.run_inventory_update': {'queue': 'jobs', 'awx.main.tasks.run_inventory_update': {'queue': 'jobs',
@@ -371,7 +371,10 @@ CELERY_ROUTES = ({'awx.main.tasks.run_job': {'queue': 'jobs',
'awx.main.scheduler.tasks.run_job_launch': {'queue': 'scheduler', 'awx.main.scheduler.tasks.run_job_launch': {'queue': 'scheduler',
'routing_key': 'scheduler.job.launch'}, 'routing_key': 'scheduler.job.launch'},
'awx.main.scheduler.tasks.run_job_complete': {'queue': 'scheduler', 'awx.main.scheduler.tasks.run_job_complete': {'queue': 'scheduler',
'routing_key': 'scheduler.job.complete'},}) 'routing_key': 'scheduler.job.complete'},
'awx.main.tasks.cluster_node_heartbeat': {'queue': 'default',
'routing_key': 'cluster.heartbeat'},
}
CELERYBEAT_SCHEDULE = { CELERYBEAT_SCHEDULE = {
'tower_scheduler': { 'tower_scheduler': {
@@ -386,6 +389,10 @@ CELERYBEAT_SCHEDULE = {
'task': 'awx.main.tasks.cleanup_authtokens', 'task': 'awx.main.tasks.cleanup_authtokens',
'schedule': timedelta(days=30) 'schedule': timedelta(days=30)
}, },
'cluster_heartbeat': {
'task': 'awx.main.tasks.cluster_node_heartbeat',
'schedule': timedelta(seconds=60)
},
} }
# Django Caching Configuration # Django Caching Configuration

View File

@@ -4,6 +4,7 @@
# Development settings for AWX project. # Development settings for AWX project.
# Python # Python
import socket
import copy import copy
import sys import sys
import traceback import traceback
@@ -106,3 +107,7 @@ try:
except ImportError: except ImportError:
traceback.print_exc() traceback.print_exc()
sys.exit(1) sys.exit(1)
CLUSTER_HOST_ID = socket.gethostname()
CELERY_ROUTES['awx.main.tasks.cluster_node_heartbeat'] = {'queue': CLUSTER_HOST_ID, 'routing_key': CLUSTER_HOST_ID}

View File

@@ -10,6 +10,7 @@ ansible -i "127.0.0.1," -c local -v -m wait_for -a "host=${RABBITMQ_HOST} port=5
# TODO: FIX # TODO: FIX
#/etc/init.d/ssh start #/etc/init.d/ssh start
ansible -i "127.0.0.1," -c local -v -m postgresql_user -U postgres -a "name=awx-dev password=AWXsome1 login_user=postgres login_host=postgres" all ansible -i "127.0.0.1," -c local -v -m postgresql_user -U postgres -a "name=awx-dev password=AWXsome1 login_user=postgres login_host=postgres" all
ansible -i "127.0.0.1," -c local -v -m postgresql_db -U postgres -a "name=awx-dev owner=awx-dev login_user=postgres login_host=postgres" all ansible -i "127.0.0.1," -c local -v -m postgresql_db -U postgres -a "name=awx-dev owner=awx-dev login_user=postgres login_host=postgres" all