mirror of
https://github.com/ansible/awx.git
synced 2026-03-09 05:29:26 -02:30
Merge pull request #1243 from matburt/fix_clustering_isolated
Fix isolated instance clustering implementation
This commit is contained in:
@@ -14,7 +14,7 @@ from django.conf import settings
|
|||||||
|
|
||||||
import awx
|
import awx
|
||||||
from awx.main.expect import run
|
from awx.main.expect import run
|
||||||
from awx.main.utils import OutputEventFilter
|
from awx.main.utils import OutputEventFilter, get_system_task_capacity
|
||||||
from awx.main.queue import CallbackQueueDispatcher
|
from awx.main.queue import CallbackQueueDispatcher
|
||||||
|
|
||||||
logger = logging.getLogger('awx.isolated.manager')
|
logger = logging.getLogger('awx.isolated.manager')
|
||||||
@@ -381,10 +381,14 @@ class IsolatedManager(object):
|
|||||||
logger.error(err_template.format(instance.hostname, instance.version, awx_application_version))
|
logger.error(err_template.format(instance.hostname, instance.version, awx_application_version))
|
||||||
instance.capacity = 0
|
instance.capacity = 0
|
||||||
else:
|
else:
|
||||||
if instance.capacity == 0 and task_result['capacity']:
|
if instance.capacity == 0 and task_result['capacity_cpu']:
|
||||||
logger.warning('Isolated instance {} has re-joined.'.format(instance.hostname))
|
logger.warning('Isolated instance {} has re-joined.'.format(instance.hostname))
|
||||||
instance.capacity = int(task_result['capacity'])
|
instance.cpu_capacity = int(task_result['capacity_cpu'])
|
||||||
instance.save(update_fields=['capacity', 'version', 'modified'])
|
instance.mem_capacity = int(task_result['capacity_mem'])
|
||||||
|
instance.capacity = get_system_task_capacity(scale=instance.capacity_adjustment,
|
||||||
|
cpu_capacity=int(task_result['capacity_cpu']),
|
||||||
|
mem_capacity=int(task_result['capacity_mem']))
|
||||||
|
instance.save(update_fields=['cpu_capacity', 'mem_capacity', 'capacity', 'version', 'modified'])
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def health_check(cls, instance_qs, awx_application_version):
|
def health_check(cls, instance_qs, awx_application_version):
|
||||||
@@ -428,7 +432,7 @@ class IsolatedManager(object):
|
|||||||
task_result = result['plays'][0]['tasks'][0]['hosts'][instance.hostname]
|
task_result = result['plays'][0]['tasks'][0]['hosts'][instance.hostname]
|
||||||
except (KeyError, IndexError):
|
except (KeyError, IndexError):
|
||||||
task_result = {}
|
task_result = {}
|
||||||
if 'capacity' in task_result:
|
if 'capacity_cpu' in task_result and 'capacity_mem' in task_result:
|
||||||
cls.update_capacity(instance, task_result, awx_application_version)
|
cls.update_capacity(instance, task_result, awx_application_version)
|
||||||
elif instance.capacity == 0:
|
elif instance.capacity == 0:
|
||||||
logger.debug('Isolated instance {} previously marked as lost, could not re-join.'.format(
|
logger.debug('Isolated instance {} previously marked as lost, could not re-join.'.format(
|
||||||
|
|||||||
@@ -208,20 +208,22 @@ def handle_ha_toplogy_changes(self):
|
|||||||
logger.debug("Reconfigure celeryd queues task on host {}".format(self.request.hostname))
|
logger.debug("Reconfigure celeryd queues task on host {}".format(self.request.hostname))
|
||||||
awx_app = Celery('awx')
|
awx_app = Celery('awx')
|
||||||
awx_app.config_from_object('django.conf:settings', namespace='CELERY')
|
awx_app.config_from_object('django.conf:settings', namespace='CELERY')
|
||||||
(instance, removed_queues, added_queues) = register_celery_worker_queues(awx_app, self.request.hostname)
|
instances, removed_queues, added_queues = register_celery_worker_queues(awx_app, self.request.hostname)
|
||||||
logger.info("Workers on tower node '{}' removed from queues {} and added to queues {}"
|
for instance in instances:
|
||||||
.format(instance.hostname, removed_queues, added_queues))
|
logger.info("Workers on tower node '{}' removed from queues {} and added to queues {}"
|
||||||
updated_routes = update_celery_worker_routes(instance, settings)
|
.format(instance.hostname, removed_queues, added_queues))
|
||||||
logger.info("Worker on tower node '{}' updated celery routes {} all routes are now {}"
|
updated_routes = update_celery_worker_routes(instance, settings)
|
||||||
.format(instance.hostname, updated_routes, self.app.conf.CELERY_TASK_ROUTES))
|
logger.info("Worker on tower node '{}' updated celery routes {} all routes are now {}"
|
||||||
|
.format(instance.hostname, updated_routes, self.app.conf.CELERY_TASK_ROUTES))
|
||||||
|
|
||||||
|
|
||||||
@worker_ready.connect
|
@worker_ready.connect
|
||||||
def handle_ha_toplogy_worker_ready(sender, **kwargs):
|
def handle_ha_toplogy_worker_ready(sender, **kwargs):
|
||||||
logger.debug("Configure celeryd queues task on host {}".format(sender.hostname))
|
logger.debug("Configure celeryd queues task on host {}".format(sender.hostname))
|
||||||
(instance, removed_queues, added_queues) = register_celery_worker_queues(sender.app, sender.hostname)
|
instances, removed_queues, added_queues = register_celery_worker_queues(sender.app, sender.hostname)
|
||||||
logger.info("Workers on tower node '{}' unsubscribed from queues {} and subscribed to queues {}"
|
for instance in instances:
|
||||||
.format(instance.hostname, removed_queues, added_queues))
|
logger.info("Workers on tower node '{}' unsubscribed from queues {} and subscribed to queues {}"
|
||||||
|
.format(instance.hostname, removed_queues, added_queues))
|
||||||
|
|
||||||
|
|
||||||
@celeryd_init.connect
|
@celeryd_init.connect
|
||||||
|
|||||||
@@ -136,7 +136,8 @@ class TestIsolatedManagementTask:
|
|||||||
|
|
||||||
# Upgrade was completed, health check playbook now reports matching
|
# Upgrade was completed, health check playbook now reports matching
|
||||||
# version, make sure capacity is set.
|
# version, make sure capacity is set.
|
||||||
update_capacity(old_version, {'version': '5.0.0-things', 'capacity':103}, '5.0.0-stuff')
|
update_capacity(old_version, {'version': '5.0.0-things',
|
||||||
|
'capacity_cpu':103, 'capacity_mem':103}, '5.0.0-stuff')
|
||||||
assert old_version.capacity == 103
|
assert old_version.capacity == 103
|
||||||
|
|
||||||
def test_takes_action(self, control_instance, needs_updating):
|
def test_takes_action(self, control_instance, needs_updating):
|
||||||
|
|||||||
@@ -64,7 +64,7 @@ class TestAddRemoveCeleryWorkerQueues():
|
|||||||
instance = instance_generator(groups=groups, hostname=hostname)
|
instance = instance_generator(groups=groups, hostname=hostname)
|
||||||
worker_queues = worker_queues_generator(_worker_queues)
|
worker_queues = worker_queues_generator(_worker_queues)
|
||||||
with mock.patch('awx.main.utils.ha.settings.AWX_CELERY_QUEUES_STATIC', static_queues):
|
with mock.patch('awx.main.utils.ha.settings.AWX_CELERY_QUEUES_STATIC', static_queues):
|
||||||
(added_queues, removed_queues) = _add_remove_celery_worker_queues(mock_app, instance, worker_queues, hostname)
|
(added_queues, removed_queues) = _add_remove_celery_worker_queues(mock_app, [instance], worker_queues, hostname)
|
||||||
assert set(added_queues) == set(added_expected)
|
assert set(added_queues) == set(added_expected)
|
||||||
assert set(removed_queues) == set(removed_expected)
|
assert set(removed_queues) == set(removed_expected)
|
||||||
|
|
||||||
|
|||||||
@@ -665,7 +665,7 @@ def get_mem_capacity():
|
|||||||
return (mem, max(1, ((mem / 1024 / 1024) - 2048) / forkmem))
|
return (mem, max(1, ((mem / 1024 / 1024) - 2048) / forkmem))
|
||||||
|
|
||||||
|
|
||||||
def get_system_task_capacity(scale=Decimal(1.0)):
|
def get_system_task_capacity(scale=Decimal(1.0), cpu_capacity=None, mem_capacity=None):
|
||||||
'''
|
'''
|
||||||
Measure system memory and use it as a baseline for determining the system's capacity
|
Measure system memory and use it as a baseline for determining the system's capacity
|
||||||
'''
|
'''
|
||||||
@@ -678,8 +678,14 @@ def get_system_task_capacity(scale=Decimal(1.0)):
|
|||||||
elif settings_forks:
|
elif settings_forks:
|
||||||
return int(settings_forks)
|
return int(settings_forks)
|
||||||
|
|
||||||
_, cpu_cap = get_cpu_capacity()
|
if cpu_capacity is None:
|
||||||
_, mem_cap = get_mem_capacity()
|
_, cpu_cap = get_cpu_capacity()
|
||||||
|
else:
|
||||||
|
cpu_cap = cpu_capacity
|
||||||
|
if mem_capacity is None:
|
||||||
|
_, mem_cap = get_mem_capacity()
|
||||||
|
else:
|
||||||
|
mem_cap = mem_capacity
|
||||||
return min(mem_cap, cpu_cap) + ((max(mem_cap, cpu_cap) - min(mem_cap, cpu_cap)) * scale)
|
return min(mem_cap, cpu_cap) + ((max(mem_cap, cpu_cap) - min(mem_cap, cpu_cap)) * scale)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -10,26 +10,27 @@ from django.conf import settings
|
|||||||
from awx.main.models import Instance
|
from awx.main.models import Instance
|
||||||
|
|
||||||
|
|
||||||
def _add_remove_celery_worker_queues(app, instance, worker_queues, worker_name):
|
def _add_remove_celery_worker_queues(app, controlled_instances, worker_queues, worker_name):
|
||||||
removed_queues = []
|
removed_queues = []
|
||||||
added_queues = []
|
added_queues = []
|
||||||
ig_names = set(instance.rampart_groups.values_list('name', flat=True))
|
ig_names = set(['tower_instance_router'])
|
||||||
ig_names.add("tower_instance_router")
|
hostnames = set([instance.hostname for instance in controlled_instances])
|
||||||
|
for instance in controlled_instances:
|
||||||
|
ig_names.update(instance.rampart_groups.values_list('name', flat=True))
|
||||||
worker_queue_names = set([q['name'] for q in worker_queues])
|
worker_queue_names = set([q['name'] for q in worker_queues])
|
||||||
|
|
||||||
|
|
||||||
# Remove queues that aren't in the instance group
|
# Remove queues that aren't in the instance group
|
||||||
for queue in worker_queues:
|
for queue in worker_queues:
|
||||||
if queue['name'] in settings.AWX_CELERY_QUEUES_STATIC or \
|
if queue['name'] in settings.AWX_CELERY_QUEUES_STATIC or \
|
||||||
queue['alias'] in settings.AWX_CELERY_QUEUES_STATIC:
|
queue['alias'] in settings.AWX_CELERY_QUEUES_STATIC:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if queue['name'] not in ig_names | set([instance.hostname]) or not instance.enabled:
|
if queue['name'] not in ig_names | hostnames or not instance.enabled:
|
||||||
app.control.cancel_consumer(queue['name'], reply=True, destination=[worker_name])
|
app.control.cancel_consumer(queue['name'], reply=True, destination=[worker_name])
|
||||||
removed_queues.append(queue['name'])
|
removed_queues.append(queue['name'])
|
||||||
|
|
||||||
# Add queues for instance and instance groups
|
# Add queues for instance and instance groups
|
||||||
for queue_name in ig_names | set([instance.hostname]):
|
for queue_name in ig_names | hostnames:
|
||||||
if queue_name not in worker_queue_names:
|
if queue_name not in worker_queue_names:
|
||||||
app.control.add_consumer(queue_name, reply=True, destination=[worker_name])
|
app.control.add_consumer(queue_name, reply=True, destination=[worker_name])
|
||||||
added_queues.append(queue_name)
|
added_queues.append(queue_name)
|
||||||
@@ -59,13 +60,19 @@ def update_celery_worker_routes(instance, conf):
|
|||||||
|
|
||||||
def register_celery_worker_queues(app, celery_worker_name):
|
def register_celery_worker_queues(app, celery_worker_name):
|
||||||
instance = Instance.objects.me()
|
instance = Instance.objects.me()
|
||||||
|
controlled_instances = [instance]
|
||||||
|
if instance.is_controller():
|
||||||
|
controlled_instances.extend(Instance.objects.filter(
|
||||||
|
rampart_groups__controller__instances__hostname=instance.hostname
|
||||||
|
))
|
||||||
added_queues = []
|
added_queues = []
|
||||||
removed_queues = []
|
removed_queues = []
|
||||||
|
|
||||||
celery_host_queues = app.control.inspect([celery_worker_name]).active_queues()
|
celery_host_queues = app.control.inspect([celery_worker_name]).active_queues()
|
||||||
|
|
||||||
celery_worker_queues = celery_host_queues[celery_worker_name] if celery_host_queues else []
|
celery_worker_queues = celery_host_queues[celery_worker_name] if celery_host_queues else []
|
||||||
(added_queues, removed_queues) = _add_remove_celery_worker_queues(app, instance, celery_worker_queues, celery_worker_name)
|
(added_queues, removed_queues) = _add_remove_celery_worker_queues(app, controlled_instances,
|
||||||
|
celery_worker_queues, celery_worker_name)
|
||||||
|
|
||||||
return (instance, removed_queues, added_queues)
|
return (controlled_instances, removed_queues, added_queues)
|
||||||
|
|
||||||
|
|||||||
@@ -18,6 +18,30 @@
|
|||||||
from ansible.module_utils.basic import AnsibleModule
|
from ansible.module_utils.basic import AnsibleModule
|
||||||
|
|
||||||
import subprocess
|
import subprocess
|
||||||
|
import os
|
||||||
|
import psutil
|
||||||
|
|
||||||
|
|
||||||
|
def get_cpu_capacity():
|
||||||
|
env_forkcpu = os.getenv('SYSTEM_TASK_FORKS_CPU', None)
|
||||||
|
cpu = psutil.cpu_count()
|
||||||
|
|
||||||
|
if env_forkcpu:
|
||||||
|
forkcpu = int(env_forkcpu)
|
||||||
|
else:
|
||||||
|
forkcpu = 4
|
||||||
|
return (cpu, cpu * forkcpu)
|
||||||
|
|
||||||
|
|
||||||
|
def get_mem_capacity():
|
||||||
|
env_forkmem = os.getenv('SYSTEM_TASK_FORKS_MEM', None)
|
||||||
|
if env_forkmem:
|
||||||
|
forkmem = int(env_forkmem)
|
||||||
|
else:
|
||||||
|
forkmem = 100
|
||||||
|
|
||||||
|
mem = psutil.virtual_memory().total
|
||||||
|
return (mem, max(1, ((mem / 1024 / 1024) - 2048) / forkmem))
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
@@ -32,20 +56,13 @@ def main():
|
|||||||
except subprocess.CalledProcessError as e:
|
except subprocess.CalledProcessError as e:
|
||||||
module.fail_json(msg=str(e))
|
module.fail_json(msg=str(e))
|
||||||
return
|
return
|
||||||
# Duplicated with awx.main.utils.common.get_system_task_capacity
|
# NOTE: Duplicated with awx.main.utils.common capacity utilities
|
||||||
try:
|
_, capacity_cpu = get_cpu_capacity()
|
||||||
out = subprocess.check_output(['free', '-m'])
|
_, capacity_mem = get_mem_capacity()
|
||||||
except subprocess.CalledProcessError as e:
|
|
||||||
module.fail_json(msg=str(e))
|
|
||||||
return
|
|
||||||
total_mem_value = out.split()[7]
|
|
||||||
if int(total_mem_value) <= 2048:
|
|
||||||
cap = 50
|
|
||||||
else:
|
|
||||||
cap = 50 + ((int(total_mem_value) / 1024) - 2) * 75
|
|
||||||
|
|
||||||
# Module never results in a change
|
# Module never results in a change
|
||||||
module.exit_json(changed=False, capacity=cap, version=version)
|
module.exit_json(changed=False, capacity_cpu=capacity_cpu,
|
||||||
|
capacity_mem=capacity_mem, version=version)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ requirements/requirements_dev_uninstall.txt \
|
|||||||
RUN yum -y update && yum -y install curl epel-release && yum -y install https://centos7.iuscommunity.org/ius-release.rpm
|
RUN yum -y update && yum -y install curl epel-release && yum -y install https://centos7.iuscommunity.org/ius-release.rpm
|
||||||
RUN curl --silent --location https://rpm.nodesource.com/setup_6.x | bash -
|
RUN curl --silent --location https://rpm.nodesource.com/setup_6.x | bash -
|
||||||
RUN yum -y localinstall http://download.postgresql.org/pub/repos/yum/9.4/redhat/rhel-6-x86_64/pgdg-centos94-9.4-3.noarch.rpm
|
RUN yum -y localinstall http://download.postgresql.org/pub/repos/yum/9.4/redhat/rhel-6-x86_64/pgdg-centos94-9.4-3.noarch.rpm
|
||||||
RUN yum -y update && yum -y install openssh-server ansible mg vim tmux git2u-core mercurial subversion python-devel python-psycopg2 make postgresql postgresql-devel nginx nodejs python-psutil libxml2-devel libxslt-devel libstdc++.so.6 gcc cyrus-sasl-devel cyrus-sasl openldap-devel libffi-devel zeromq-devel python-pip xmlsec1-devel swig krb5-devel xmlsec1-openssl xmlsec1 xmlsec1-openssl-devel libtool-ltdl-devel rabbitmq-server bubblewrap zanata-python-client gettext gcc-c++ libcurl-devel python-pycurl bzip2 python-crypto
|
RUN yum -y update && yum -y install openssh-server ansible mg vim tmux git2u-core mercurial subversion python-devel python-psycopg2 make postgresql postgresql-devel nginx nodejs python-psutil libxml2-devel libxslt-devel libstdc++.so.6 gcc cyrus-sasl-devel cyrus-sasl openldap-devel libffi-devel zeromq-devel python-pip xmlsec1-devel swig krb5-devel xmlsec1-openssl xmlsec1 xmlsec1-openssl-devel libtool-ltdl-devel rabbitmq-server bubblewrap zanata-python-client gettext gcc-c++ libcurl-devel python-pycurl bzip2 python-crypto rsync
|
||||||
RUN pip install virtualenv
|
RUN pip install virtualenv
|
||||||
RUN /usr/bin/ssh-keygen -q -t rsa -N "" -f /root/.ssh/id_rsa
|
RUN /usr/bin/ssh-keygen -q -t rsa -N "" -f /root/.ssh/id_rsa
|
||||||
RUN mkdir -p /data/db
|
RUN mkdir -p /data/db
|
||||||
|
|||||||
@@ -5,8 +5,6 @@ ADD Makefile /tmp/Makefile
|
|||||||
RUN mkdir /tmp/requirements
|
RUN mkdir /tmp/requirements
|
||||||
ADD requirements/requirements_ansible.txt requirements/requirements_ansible_git.txt requirements/requirements_ansible_uninstall.txt requirements/requirements_isolated.txt /tmp/requirements/
|
ADD requirements/requirements_ansible.txt requirements/requirements_ansible_git.txt requirements/requirements_ansible_uninstall.txt requirements/requirements_isolated.txt /tmp/requirements/
|
||||||
RUN yum -y update && yum -y install curl epel-release
|
RUN yum -y update && yum -y install curl epel-release
|
||||||
RUN curl --silent --location https://rpm.nodesource.com/setup_6.x | bash -
|
|
||||||
RUN yum -y localinstall http://download.postgresql.org/pub/repos/yum/9.4/redhat/rhel-6-x86_64/pgdg-centos94-9.4-3.noarch.rpm
|
|
||||||
RUN yum -y update && yum -y install openssh-server ansible mg vim tmux git python-devel python-psycopg2 make python-psutil libxml2-devel libxslt-devel libstdc++.so.6 gcc cyrus-sasl-devel cyrus-sasl openldap-devel libffi-devel zeromq-devel python-pip xmlsec1-devel swig krb5-devel xmlsec1-openssl xmlsec1 xmlsec1-openssl-devel libtool-ltdl-devel bubblewrap zanata-python-client gettext gcc-c++ libcurl-devel python-pycurl bzip2
|
RUN yum -y update && yum -y install openssh-server ansible mg vim tmux git python-devel python-psycopg2 make python-psutil libxml2-devel libxslt-devel libstdc++.so.6 gcc cyrus-sasl-devel cyrus-sasl openldap-devel libffi-devel zeromq-devel python-pip xmlsec1-devel swig krb5-devel xmlsec1-openssl xmlsec1 xmlsec1-openssl-devel libtool-ltdl-devel bubblewrap zanata-python-client gettext gcc-c++ libcurl-devel python-pycurl bzip2
|
||||||
RUN pip install virtualenv
|
RUN pip install virtualenv
|
||||||
WORKDIR /tmp
|
WORKDIR /tmp
|
||||||
|
|||||||
Reference in New Issue
Block a user