diff --git a/awx/api/views.py b/awx/api/views.py
index 33ef14826a..f159dc2e43 100644
--- a/awx/api/views.py
+++ b/awx/api/views.py
@@ -222,7 +222,8 @@ class ApiV1PingView(APIView):
 
         response['instances'] = []
         for instance in Instance.objects.all():
-            response['instances'].append(dict(node=instance.hostname, heartbeat=instance.modified, capacity=instance.capacity))
+            response['instances'].append(dict(node=instance.hostname, heartbeat=instance.modified,
+                                              capacity=instance.capacity, version=instance.version))
             response['instances'].sort()
         return Response(response)
 
diff --git a/awx/main/migrations/0037_v313_instance_version.py b/awx/main/migrations/0037_v313_instance_version.py
new file mode 100644
index 0000000000..64c520ea85
--- /dev/null
+++ b/awx/main/migrations/0037_v313_instance_version.py
@@ -0,0 +1,19 @@
+# -*- coding: utf-8 -*-
+from __future__ import unicode_literals
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('main', '0036_v311_insights'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='instance',
+            name='version',
+            field=models.CharField(max_length=24, blank=True),
+        ),
+    ]
diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py
index cb01d03722..2a75f1440a 100644
--- a/awx/main/models/ha.py
+++ b/awx/main/models/ha.py
@@ -26,6 +26,7 @@ class Instance(models.Model):
     hostname = models.CharField(max_length=250, unique=True)
     created = models.DateTimeField(auto_now_add=True)
     modified = models.DateTimeField(auto_now=True)
+    version = models.CharField(max_length=24, blank=True)
     capacity = models.PositiveIntegerField(
         default=100,
         editable=False,
diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index d60d936531..aa5b9a0f0f 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -21,6 +21,7 @@ import traceback
 import urlparse
 import uuid
 from distutils.version import LooseVersion as Version
+from datetime import timedelta
 import yaml
 try:
     import psutil
@@ -45,6 +46,7 @@ from django.utils.translation import ugettext_lazy as _
 from django.core.cache import cache
 
 # AWX
+from awx import __version__ as tower_application_version
 from awx.main.constants import CLOUD_PROVIDERS
 from awx.main.models import * # noqa
 from awx.main.models import UnifiedJob
@@ -53,7 +55,7 @@ from awx.main.task_engine import TaskEnhancer
 from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url,
                             check_proot_installed, build_proot_temp_dir, wrap_args_with_proot,
                             get_system_task_capacity, OutputEventFilter, parse_yaml_or_json)
-from awx.main.utils.reload import restart_local_services
+from awx.main.utils.reload import restart_local_services, stop_local_services
 from awx.main.utils.handlers import configure_external_logger
 from awx.main.consumers import emit_channel_notification
 
@@ -174,13 +176,29 @@ def purge_old_stdout_files(self):
 @task(bind=True)
 def cluster_node_heartbeat(self):
     logger.debug("Cluster node heartbeat task.")
+    nowtime = now()
     inst = Instance.objects.filter(hostname=settings.CLUSTER_HOST_ID)
     if inst.exists():
         inst = inst[0]
         inst.capacity = get_system_task_capacity()
+        inst.version = tower_application_version
         inst.save()
-        return
-    raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID))
+    else:
+        raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID))
+    recent_inst = Instance.objects.filter(modified__gt=nowtime - timedelta(seconds=70)).exclude(hostname=settings.CLUSTER_HOST_ID)
+    # IFF any node has a greater version than we do, then we'll shutdown services
+    for other_inst in recent_inst:
+        if other_inst.version == "":
+            continue
+        if Version(other_inst.version) > Version(tower_application_version):
+            logger.error("Host {} reports Tower version {}, but this node {} is at {}, shutting down".format(other_inst.hostname,
+                                                                                                             other_inst.version,
+                                                                                                             inst.hostname,
+                                                                                                             inst.version))
+            stop_local_services(['uwsgi', 'celery', 'beat', 'callback', 'fact'])
+            # One newer node is enough to shut down; do not re-issue the stop for every other newer node
+            break
+
 
 
 @task(bind=True, queue='default')
diff --git a/awx/main/utils/reload.py b/awx/main/utils/reload.py
index 729a33a703..6a88062ab5 100644
--- a/awx/main/utils/reload.py
+++ b/awx/main/utils/reload.py
@@ -14,12 +14,12 @@ from celery import current_app
 logger = logging.getLogger('awx.main.utils.reload')
 
 
-def _uwsgi_reload():
+def _uwsgi_fifo_command(uwsgi_command):
     # http://uwsgi-docs.readthedocs.io/en/latest/MasterFIFO.html#available-commands
     logger.warn('Initiating uWSGI chain reload of server')
-    TRIGGER_CHAIN_RELOAD = 'c'
+    TRIGGER_COMMAND = uwsgi_command
     with open(settings.UWSGI_FIFO_LOCATION, 'w') as awxfifo:
-        awxfifo.write(TRIGGER_CHAIN_RELOAD)
+        awxfifo.write(TRIGGER_COMMAND)
 
 
 def _reset_celery_thread_pool():
@@ -29,7 +29,7 @@ def _reset_celery_thread_pool():
                           destination=['celery@{}'.format(settings.CLUSTER_HOST_ID)], reply=False)
 
 
-def _supervisor_service_restart(service_internal_names):
+def _supervisor_service_command(service_internal_names, command):
     '''
     Service internal name options:
      - beat - celery - callback - channels - uwsgi - daphne
@@ -46,7 +46,7 @@ def _supervisor_service_restart(service_internal_names):
     for n in service_internal_names:
         if n in name_translation_dict:
             programs.append('{}:{}'.format(group_name, name_translation_dict[n]))
-    args.extend(['restart'])
+    args.extend([command])
     args.extend(programs)
-    logger.debug('Issuing command to restart services, args={}'.format(args))
+    logger.debug('Issuing command to {} services, args={}'.format(command, args))
     subprocess.Popen(args)
@@ -55,14 +55,19 @@ def _supervisor_service_restart(service_internal_names):
 def restart_local_services(service_internal_names):
     logger.warn('Restarting services {} on this node in response to user action'.format(service_internal_names))
     if 'uwsgi' in service_internal_names:
-        _uwsgi_reload()
+        _uwsgi_fifo_command(uwsgi_command='c')
         service_internal_names.remove('uwsgi')
     restart_celery = False
     if 'celery' in service_internal_names:
         restart_celery = True
         service_internal_names.remove('celery')
-    _supervisor_service_restart(service_internal_names)
+    _supervisor_service_command(service_internal_names, command='restart')
     if restart_celery:
         # Celery restarted last because this probably includes current process
         _reset_celery_thread_pool()
 
+
+def stop_local_services(service_internal_names):
+    '''Log the request, then issue a 'stop' command for the named services on this node.'''
+    logger.warn('Stopping services {} on this node'.format(service_internal_names))
+    _supervisor_service_command(service_internal_names, command='stop')