Merge pull request #5987 from ansible/version_protection_check

Protect cluster nodes after an upgrade
This commit is contained in:
Matthew Jones
2017-04-11 10:37:29 -04:00
committed by GitHub
6 changed files with 61 additions and 20 deletions

View File

@@ -222,7 +222,8 @@ class ApiV1PingView(APIView):
response['instances'] = [] response['instances'] = []
for instance in Instance.objects.all(): for instance in Instance.objects.all():
response['instances'].append(dict(node=instance.hostname, heartbeat=instance.modified, capacity=instance.capacity)) response['instances'].append(dict(node=instance.hostname, heartbeat=instance.modified,
capacity=instance.capacity, version=instance.version))
response['instances'].sort() response['instances'].sort()
return Response(response) return Response(response)

View File

@@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
    """Schema migration that adds a ``version`` field to the ``instance`` model."""

    # Applied after migration 0036_v311_insights of the ``main`` app.
    dependencies = [('main', '0036_v311_insights')]

    operations = [
        # String field capped at 24 characters; blank values are allowed.
        migrations.AddField(
            model_name='instance',
            name='version',
            field=models.CharField(blank=True, max_length=24),
        ),
    ]

View File

@@ -26,6 +26,7 @@ class Instance(models.Model):
hostname = models.CharField(max_length=250, unique=True) hostname = models.CharField(max_length=250, unique=True)
created = models.DateTimeField(auto_now_add=True) created = models.DateTimeField(auto_now_add=True)
modified = models.DateTimeField(auto_now=True) modified = models.DateTimeField(auto_now=True)
version = models.CharField(max_length=24, blank=True)
capacity = models.PositiveIntegerField( capacity = models.PositiveIntegerField(
default=100, default=100,
editable=False, editable=False,

View File

@@ -21,6 +21,7 @@ import traceback
import urlparse import urlparse
import uuid import uuid
from distutils.version import LooseVersion as Version from distutils.version import LooseVersion as Version
from datetime import timedelta
import yaml import yaml
try: try:
import psutil import psutil
@@ -45,6 +46,7 @@ from django.utils.translation import ugettext_lazy as _
from django.core.cache import cache from django.core.cache import cache
# AWX # AWX
from awx import __version__ as tower_application_version
from awx.main.constants import CLOUD_PROVIDERS from awx.main.constants import CLOUD_PROVIDERS
from awx.main.models import * # noqa from awx.main.models import * # noqa
from awx.main.models import UnifiedJob from awx.main.models import UnifiedJob
@@ -53,7 +55,7 @@ from awx.main.task_engine import TaskEnhancer
from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url, from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url,
check_proot_installed, build_proot_temp_dir, wrap_args_with_proot, check_proot_installed, build_proot_temp_dir, wrap_args_with_proot,
get_system_task_capacity, OutputEventFilter, parse_yaml_or_json) get_system_task_capacity, OutputEventFilter, parse_yaml_or_json)
from awx.main.utils.reload import restart_local_services from awx.main.utils.reload import restart_local_services, stop_local_services
from awx.main.utils.handlers import configure_external_logger from awx.main.utils.handlers import configure_external_logger
from awx.main.consumers import emit_channel_notification from awx.main.consumers import emit_channel_notification
@@ -174,13 +176,27 @@ def purge_old_stdout_files(self):
@task(bind=True)
def cluster_node_heartbeat(self):
    """Record this node's heartbeat and stop local services if a peer is newer.

    Updates the capacity and version stored on the Instance row whose
    hostname equals ``settings.CLUSTER_HOST_ID``, then compares this node's
    version against every other Instance modified within the last 70 seconds.
    If any of them reports a greater version, the local services are stopped.

    Raises:
        RuntimeError: if no Instance row matches ``settings.CLUSTER_HOST_ID``.
    """
    logger.debug("Cluster node heartbeat task.")
    nowtime = now()
    # Single query instead of exists() followed by an index lookup.
    inst = Instance.objects.filter(hostname=settings.CLUSTER_HOST_ID).first()
    if inst is None:
        raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID))
    inst.capacity = get_system_task_capacity()
    inst.version = tower_application_version
    inst.save()

    # Only peers whose row was modified in the last 70 seconds are compared.
    recent_inst = Instance.objects.filter(
        modified__gt=nowtime - timedelta(seconds=70)
    ).exclude(hostname=settings.CLUSTER_HOST_ID)
    # Parse the local version once rather than on every loop iteration.
    local_version = Version(tower_application_version)
    # If any node has a greater version than we do, then we'll shut down services
    for other_inst in recent_inst:
        # A blank version means the peer has not reported one; skip it.
        if other_inst.version == "":
            continue
        if Version(other_inst.version) > local_version:
            logger.error("Host {} reports Tower version {}, but this node {} is at {}, shutting down".format(other_inst.hostname,
                                                                                                       other_inst.version,
                                                                                                       inst.hostname,
                                                                                                       inst.version))
            stop_local_services(['uwsgi', 'celery', 'beat', 'callback', 'fact'])
            # One stop request is enough; do not reissue it for every newer peer.
            break
@task(bind=True, queue='default') @task(bind=True, queue='default')

View File

@@ -4,7 +4,7 @@ from awx.main.utils import reload
def test_produce_supervisor_command(mocker):
    """Check the argument list handed to Popen for a supervisor 'restart' command."""
    expected_args = ['supervisorctl', 'restart', 'tower-processes:receiver', 'tower-processes:factcacher']
    with mocker.patch.object(reload.subprocess, 'Popen'):
        reload._supervisor_service_command(['beat', 'callback', 'fact'], "restart")
        reload.subprocess.Popen.assert_called_once_with(expected_args)
@@ -14,13 +14,13 @@ def test_routing_of_service_restarts_works(mocker):
This tests that the parent restart method will call the appropriate This tests that the parent restart method will call the appropriate
service restart methods, depending on which services are given in args service restart methods, depending on which services are given in args
''' '''
with mocker.patch.object(reload, '_uwsgi_reload'),\ with mocker.patch.object(reload, '_uwsgi_fifo_command'),\
mocker.patch.object(reload, '_reset_celery_thread_pool'),\ mocker.patch.object(reload, '_reset_celery_thread_pool'),\
mocker.patch.object(reload, '_supervisor_service_restart'): mocker.patch.object(reload, '_supervisor_service_command'):
reload.restart_local_services(['uwsgi', 'celery', 'flower', 'daphne']) reload.restart_local_services(['uwsgi', 'celery', 'flower', 'daphne'])
reload._uwsgi_reload.assert_called_once_with() reload._uwsgi_fifo_command.assert_called_once_with(uwsgi_command="c")
reload._reset_celery_thread_pool.assert_called_once_with() reload._reset_celery_thread_pool.assert_called_once_with()
reload._supervisor_service_restart.assert_called_once_with(['flower', 'daphne']) reload._supervisor_service_command.assert_called_once_with(['flower', 'daphne'], command="restart")
@@ -28,11 +28,11 @@ def test_routing_of_service_restarts_diables(mocker):
''' '''
Test that methods are not called if not in the args Test that methods are not called if not in the args
''' '''
with mocker.patch.object(reload, '_uwsgi_reload'),\ with mocker.patch.object(reload, '_uwsgi_fifo_command'),\
mocker.patch.object(reload, '_reset_celery_thread_pool'),\ mocker.patch.object(reload, '_reset_celery_thread_pool'),\
mocker.patch.object(reload, '_supervisor_service_restart'): mocker.patch.object(reload, '_supervisor_service_command'):
reload.restart_local_services(['flower']) reload.restart_local_services(['flower'])
reload._uwsgi_reload.assert_not_called() reload._uwsgi_fifo_command.assert_not_called()
reload._reset_celery_thread_pool.assert_not_called() reload._reset_celery_thread_pool.assert_not_called()
reload._supervisor_service_restart.assert_called_once_with(['flower']) reload._supervisor_service_command.assert_called_once_with(['flower'], command="restart")

View File

@@ -14,12 +14,12 @@ from celery import current_app
logger = logging.getLogger('awx.main.utils.reload') logger = logging.getLogger('awx.main.utils.reload')
def _uwsgi_fifo_command(uwsgi_command):
    """Write a command to the uWSGI master FIFO.

    ``uwsgi_command`` is written verbatim to the file at
    ``settings.UWSGI_FIFO_LOCATION``. Available commands are listed at
    http://uwsgi-docs.readthedocs.io/en/latest/MasterFIFO.html#available-commands
    (for example ``'c'`` triggers a chain reload).
    """
    # Log the actual command: this helper is no longer limited to chain reloads.
    logger.warn('Issuing uWSGI master FIFO command "{}"'.format(uwsgi_command))
    with open(settings.UWSGI_FIFO_LOCATION, 'w') as awxfifo:
        awxfifo.write(uwsgi_command)
def _reset_celery_thread_pool(): def _reset_celery_thread_pool():
@@ -29,7 +29,7 @@ def _reset_celery_thread_pool():
destination=['celery@{}'.format(settings.CLUSTER_HOST_ID)], reply=False) destination=['celery@{}'.format(settings.CLUSTER_HOST_ID)], reply=False)
def _supervisor_service_restart(service_internal_names): def _supervisor_service_command(service_internal_names, command):
''' '''
Service internal name options: Service internal name options:
- beat - celery - callback - channels - uwsgi - daphne - beat - celery - callback - channels - uwsgi - daphne
@@ -46,7 +46,7 @@ def _supervisor_service_restart(service_internal_names):
for n in service_internal_names: for n in service_internal_names:
if n in name_translation_dict: if n in name_translation_dict:
programs.append('{}:{}'.format(group_name, name_translation_dict[n])) programs.append('{}:{}'.format(group_name, name_translation_dict[n]))
args.extend(['restart']) args.extend([command])
args.extend(programs) args.extend(programs)
logger.debug('Issuing command to restart services, args={}'.format(args)) logger.debug('Issuing command to restart services, args={}'.format(args))
subprocess.Popen(args) subprocess.Popen(args)
@@ -55,14 +55,18 @@ def _supervisor_service_restart(service_internal_names):
def restart_local_services(service_internal_names):
    """Restart the named services on this node.

    ``'uwsgi'`` is handled by sending the ``'c'`` command through the uWSGI
    FIFO and ``'celery'`` by resetting the celery thread pool; every other
    name is passed to the supervisor 'restart' command. The caller's
    sequence is not modified.
    """
    logger.warn('Restarting services {} on this node in response to user action'.format(service_internal_names))
    if 'uwsgi' in service_internal_names:
        _uwsgi_fifo_command(uwsgi_command='c')
    restart_celery = 'celery' in service_internal_names
    # Build a filtered copy instead of calling .remove() on the caller's list.
    supervisor_names = [name for name in service_internal_names if name not in ('uwsgi', 'celery')]
    _supervisor_service_command(supervisor_names, command='restart')
    if restart_celery:
        # Celery restarted last because this probably includes current process
        _reset_celery_thread_pool()
def stop_local_services(service_internal_names):
    """Log a warning, then issue the supervisor 'stop' command for the given services."""
    message = 'Stopping services {} on this node in response to user action'.format(service_internal_names)
    logger.warn(message)
    _supervisor_service_command(service_internal_names, command='stop')