From ea8b78ca49c851f288553ee1127b40e16d6ca668 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Mon, 10 Apr 2017 13:08:34 -0400 Subject: [PATCH] Protect cluster nodes after an upgrade * Modify instance model to container a version number for the node * Update that version number during the heartbeat * If during a heartbeat any of the nodes are of a newer version then shutdown the current node. The idea behind this is that if all nodes were upgraded at the same time then at the moment of the healthcheck they should all be at the newer version. Otherwise we put the system in a state where it can receive the upgrade but stay down until that happens. During setup playbook run the services will be fully restarted. --- awx/api/views.py | 3 ++- .../migrations/0037_v313_instance_version.py | 19 ++++++++++++++++ awx/main/models/ha.py | 1 + awx/main/tasks.py | 22 ++++++++++++++++--- awx/main/utils/reload.py | 18 +++++++++------ 5 files changed, 52 insertions(+), 11 deletions(-) create mode 100644 awx/main/migrations/0037_v313_instance_version.py diff --git a/awx/api/views.py b/awx/api/views.py index 33ef14826a..f159dc2e43 100644 --- a/awx/api/views.py +++ b/awx/api/views.py @@ -222,7 +222,8 @@ class ApiV1PingView(APIView): response['instances'] = [] for instance in Instance.objects.all(): - response['instances'].append(dict(node=instance.hostname, heartbeat=instance.modified, capacity=instance.capacity)) + response['instances'].append(dict(node=instance.hostname, heartbeat=instance.modified, + capacity=instance.capacity, version=instance.version)) response['instances'].sort() return Response(response) diff --git a/awx/main/migrations/0037_v313_instance_version.py b/awx/main/migrations/0037_v313_instance_version.py new file mode 100644 index 0000000000..64c520ea85 --- /dev/null +++ b/awx/main/migrations/0037_v313_instance_version.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0036_v311_insights'), + ] + + operations = [ + migrations.AddField( + model_name='instance', + name='version', + field=models.CharField(max_length=24, blank=True), + ), + ] diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index cb01d03722..2a75f1440a 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -26,6 +26,7 @@ class Instance(models.Model): hostname = models.CharField(max_length=250, unique=True) created = models.DateTimeField(auto_now_add=True) modified = models.DateTimeField(auto_now=True) + version = models.CharField(max_length=24, blank=True) capacity = models.PositiveIntegerField( default=100, editable=False, diff --git a/awx/main/tasks.py b/awx/main/tasks.py index d60d936531..aa5b9a0f0f 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -21,6 +21,7 @@ import traceback import urlparse import uuid from distutils.version import LooseVersion as Version +from datetime import timedelta import yaml try: import psutil @@ -45,6 +46,7 @@ from django.utils.translation import ugettext_lazy as _ from django.core.cache import cache # AWX +from awx import __version__ as tower_application_version from awx.main.constants import CLOUD_PROVIDERS from awx.main.models import * # noqa from awx.main.models import UnifiedJob @@ -53,7 +55,7 @@ from awx.main.task_engine import TaskEnhancer from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url, check_proot_installed, build_proot_temp_dir, wrap_args_with_proot, get_system_task_capacity, OutputEventFilter, parse_yaml_or_json) -from awx.main.utils.reload import restart_local_services +from awx.main.utils.reload import restart_local_services, stop_local_services from awx.main.utils.handlers import configure_external_logger from awx.main.consumers import emit_channel_notification @@ -174,13 +176,27 @@ def purge_old_stdout_files(self): @task(bind=True) def cluster_node_heartbeat(self): logger.debug("Cluster node heartbeat task.") + nowtime = now() inst = Instance.objects.filter(hostname=settings.CLUSTER_HOST_ID) if inst.exists(): inst = inst[0] inst.capacity = get_system_task_capacity() + inst.version = tower_application_version inst.save() - return - raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID)) + else: + raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID)) + recent_inst = Instance.objects.filter(modified__gt=nowtime - timedelta(seconds=70)).exclude(hostname=settings.CLUSTER_HOST_ID) + # IFF any node has a greater version than we do, then we'll shutdown services + for other_inst in recent_inst: + if other_inst.version == "": + continue + if Version(other_inst.version) > Version(tower_application_version): + logger.error("Host {} reports Tower version {}, but this node {} is at {}, shutting down".format(other_inst.hostname, + other_inst.version, + inst.hostname, + inst.version)) + stop_local_services(['uwsgi', 'celery', 'beat', 'callback', 'fact']) + @task(bind=True, queue='default') diff --git a/awx/main/utils/reload.py b/awx/main/utils/reload.py index 729a33a703..6a88062ab5 100644 --- a/awx/main/utils/reload.py +++ b/awx/main/utils/reload.py @@ -14,12 +14,12 @@ from celery import current_app logger = logging.getLogger('awx.main.utils.reload') -def _uwsgi_reload(): +def _uwsgi_fifo_command(uwsgi_command): # http://uwsgi-docs.readthedocs.io/en/latest/MasterFIFO.html#available-commands logger.warn('Initiating uWSGI chain reload of server') - TRIGGER_CHAIN_RELOAD = 'c' + TRIGGER_COMMAND = uwsgi_command with open(settings.UWSGI_FIFO_LOCATION, 'w') as awxfifo: - awxfifo.write(TRIGGER_CHAIN_RELOAD) + awxfifo.write(TRIGGER_COMMAND) def _reset_celery_thread_pool(): @@ -29,7 +29,7 @@ def _reset_celery_thread_pool(): destination=['celery@{}'.format(settings.CLUSTER_HOST_ID)], reply=False) -def _supervisor_service_restart(service_internal_names): +def _supervisor_service_command(service_internal_names, command): ''' Service internal name options: - beat - celery - callback - channels - uwsgi - daphne @@ -46,7 +46,7 @@ def _supervisor_service_restart(service_internal_names): for n in service_internal_names: if n in name_translation_dict: programs.append('{}:{}'.format(group_name, name_translation_dict[n])) - args.extend(['restart']) + args.extend([command]) args.extend(programs) logger.debug('Issuing command to restart services, args={}'.format(args)) subprocess.Popen(args) @@ -55,14 +55,18 @@ def _supervisor_service_restart(service_internal_names): def restart_local_services(service_internal_names): logger.warn('Restarting services {} on this node in response to user action'.format(service_internal_names)) if 'uwsgi' in service_internal_names: - _uwsgi_reload() + _uwsgi_fifo_command(uwsgi_command='c') service_internal_names.remove('uwsgi') restart_celery = False if 'celery' in service_internal_names: restart_celery = True service_internal_names.remove('celery') - _supervisor_service_restart(service_internal_names) + _supervisor_service_command(service_internal_names, command='restart') if restart_celery: # Celery restarted last because this probably includes current process _reset_celery_thread_pool() + +def stop_local_services(service_internal_names): + logger.warn('Stopping services {} on this node in response to user action'.format(service_internal_names)) + _supervisor_service_command(service_internal_names, command='stop')