Protect cluster nodes after an upgrade

* Modify instance model to container a version number for the node
* Update that version number during the heartbeat
* If during a heartbeat any of the nodes are of a newer version then
  shutdown the current node.

The idea behind this is that if all nodes were upgraded at the same
time then at the moment of the healthcheck they should all be at the
newer version. Otherwise we put the system in a state where it can
receive the upgrade but stay down until that happens. During setup
playbook run the services will be fully restarted.
This commit is contained in:
Matthew Jones 2017-04-10 13:08:34 -04:00
parent f0102ef0ff
commit ea8b78ca49
5 changed files with 52 additions and 11 deletions

View File

@ -222,7 +222,8 @@ class ApiV1PingView(APIView):
response['instances'] = []
for instance in Instance.objects.all():
response['instances'].append(dict(node=instance.hostname, heartbeat=instance.modified, capacity=instance.capacity))
response['instances'].append(dict(node=instance.hostname, heartbeat=instance.modified,
capacity=instance.capacity, version=instance.version))
response['instances'].sort()
return Response(response)

View File

@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('main', '0036_v311_insights'),
]
operations = [
migrations.AddField(
model_name='instance',
name='version',
field=models.CharField(max_length=24, blank=True),
),
]

View File

@ -26,6 +26,7 @@ class Instance(models.Model):
hostname = models.CharField(max_length=250, unique=True)
created = models.DateTimeField(auto_now_add=True)
modified = models.DateTimeField(auto_now=True)
version = models.CharField(max_length=24, blank=True)
capacity = models.PositiveIntegerField(
default=100,
editable=False,

View File

@ -21,6 +21,7 @@ import traceback
import urlparse
import uuid
from distutils.version import LooseVersion as Version
from datetime import timedelta
import yaml
try:
import psutil
@ -45,6 +46,7 @@ from django.utils.translation import ugettext_lazy as _
from django.core.cache import cache
# AWX
from awx import __version__ as tower_application_version
from awx.main.constants import CLOUD_PROVIDERS
from awx.main.models import * # noqa
from awx.main.models import UnifiedJob
@ -53,7 +55,7 @@ from awx.main.task_engine import TaskEnhancer
from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url,
check_proot_installed, build_proot_temp_dir, wrap_args_with_proot,
get_system_task_capacity, OutputEventFilter, parse_yaml_or_json)
from awx.main.utils.reload import restart_local_services
from awx.main.utils.reload import restart_local_services, stop_local_services
from awx.main.utils.handlers import configure_external_logger
from awx.main.consumers import emit_channel_notification
@ -174,13 +176,27 @@ def purge_old_stdout_files(self):
@task(bind=True)
def cluster_node_heartbeat(self):
logger.debug("Cluster node heartbeat task.")
nowtime = now()
inst = Instance.objects.filter(hostname=settings.CLUSTER_HOST_ID)
if inst.exists():
inst = inst[0]
inst.capacity = get_system_task_capacity()
inst.version = tower_application_version
inst.save()
return
raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID))
else:
raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID))
recent_inst = Instance.objects.filter(modified__gt=nowtime - timedelta(seconds=70)).exclude(hostname=settings.CLUSTER_HOST_ID)
# IFF any node has a greater version than we do, then we'll shutdown services
for other_inst in recent_inst:
if other_inst.version == "":
continue
if Version(other_inst.version) > Version(tower_application_version):
logger.error("Host {} reports Tower version {}, but this node {} is at {}, shutting down".format(other_inst.hostname,
other_inst.version,
inst.hostname,
inst.version))
stop_local_services(['uwsgi', 'celery', 'beat', 'callback', 'fact'])
@task(bind=True, queue='default')

View File

@ -14,12 +14,12 @@ from celery import current_app
logger = logging.getLogger('awx.main.utils.reload')
def _uwsgi_reload():
def _uwsgi_fifo_command(uwsgi_command):
# http://uwsgi-docs.readthedocs.io/en/latest/MasterFIFO.html#available-commands
logger.warn('Initiating uWSGI chain reload of server')
TRIGGER_CHAIN_RELOAD = 'c'
TRIGGER_COMMAND = uwsgi_command
with open(settings.UWSGI_FIFO_LOCATION, 'w') as awxfifo:
awxfifo.write(TRIGGER_CHAIN_RELOAD)
awxfifo.write(TRIGGER_COMMAND)
def _reset_celery_thread_pool():
@ -29,7 +29,7 @@ def _reset_celery_thread_pool():
destination=['celery@{}'.format(settings.CLUSTER_HOST_ID)], reply=False)
def _supervisor_service_restart(service_internal_names):
def _supervisor_service_command(service_internal_names, command):
'''
Service internal name options:
- beat - celery - callback - channels - uwsgi - daphne
@ -46,7 +46,7 @@ def _supervisor_service_restart(service_internal_names):
for n in service_internal_names:
if n in name_translation_dict:
programs.append('{}:{}'.format(group_name, name_translation_dict[n]))
args.extend(['restart'])
args.extend([command])
args.extend(programs)
logger.debug('Issuing command to restart services, args={}'.format(args))
subprocess.Popen(args)
@ -55,14 +55,18 @@ def _supervisor_service_restart(service_internal_names):
def restart_local_services(service_internal_names):
logger.warn('Restarting services {} on this node in response to user action'.format(service_internal_names))
if 'uwsgi' in service_internal_names:
_uwsgi_reload()
_uwsgi_fifo_command(uwsgi_command='c')
service_internal_names.remove('uwsgi')
restart_celery = False
if 'celery' in service_internal_names:
restart_celery = True
service_internal_names.remove('celery')
_supervisor_service_restart(service_internal_names)
_supervisor_service_command(service_internal_names, command='restart')
if restart_celery:
# Celery restarted last because this probably includes current process
_reset_celery_thread_pool()
def stop_local_services(service_internal_names):
logger.warn('Stopping services {} on this node in response to user action'.format(service_internal_names))
_supervisor_service_command(service_internal_names, command='stop')