diff --git a/awx/main/expect/isolated_manager.py b/awx/main/expect/isolated_manager.py index 8ed03e9e44..71ce262fef 100644 --- a/awx/main/expect/isolated_manager.py +++ b/awx/main/expect/isolated_manager.py @@ -14,7 +14,7 @@ from django.conf import settings import awx from awx.main.expect import run -from awx.main.utils import OutputEventFilter +from awx.main.utils import OutputEventFilter, get_system_task_capacity from awx.main.queue import CallbackQueueDispatcher logger = logging.getLogger('awx.isolated.manager') @@ -381,10 +381,14 @@ class IsolatedManager(object): logger.error(err_template.format(instance.hostname, instance.version, awx_application_version)) instance.capacity = 0 else: - if instance.capacity == 0 and task_result['capacity']: + if instance.capacity == 0 and task_result['capacity_cpu']: logger.warning('Isolated instance {} has re-joined.'.format(instance.hostname)) - instance.capacity = int(task_result['capacity']) - instance.save(update_fields=['capacity', 'version', 'modified']) + instance.cpu_capacity = int(task_result['capacity_cpu']) + instance.mem_capacity = int(task_result['capacity_mem']) + instance.capacity = get_system_task_capacity(scale=instance.capacity_adjustment, + cpu_capacity=int(task_result['capacity_cpu']), + mem_capacity=int(task_result['capacity_mem'])) + instance.save(update_fields=['cpu_capacity', 'mem_capacity', 'capacity', 'version', 'modified']) @classmethod def health_check(cls, instance_qs, awx_application_version): @@ -428,7 +432,7 @@ class IsolatedManager(object): task_result = result['plays'][0]['tasks'][0]['hosts'][instance.hostname] except (KeyError, IndexError): task_result = {} - if 'capacity' in task_result: + if 'capacity_cpu' in task_result and 'capacity_mem' in task_result: cls.update_capacity(instance, task_result, awx_application_version) elif instance.capacity == 0: logger.debug('Isolated instance {} previously marked as lost, could not re-join.'.format( diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 02da90efc4..e8c4001b67 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -208,20 +208,22 @@ def handle_ha_toplogy_changes(self): logger.debug("Reconfigure celeryd queues task on host {}".format(self.request.hostname)) awx_app = Celery('awx') awx_app.config_from_object('django.conf:settings', namespace='CELERY') - (instance, removed_queues, added_queues) = register_celery_worker_queues(awx_app, self.request.hostname) - logger.info("Workers on tower node '{}' removed from queues {} and added to queues {}" - .format(instance.hostname, removed_queues, added_queues)) - updated_routes = update_celery_worker_routes(instance, settings) - logger.info("Worker on tower node '{}' updated celery routes {} all routes are now {}" - .format(instance.hostname, updated_routes, self.app.conf.CELERY_TASK_ROUTES)) + instances, removed_queues, added_queues = register_celery_worker_queues(awx_app, self.request.hostname) + for instance in instances: + logger.info("Workers on tower node '{}' removed from queues {} and added to queues {}" + .format(instance.hostname, removed_queues, added_queues)) + updated_routes = update_celery_worker_routes(instance, settings) + logger.info("Worker on tower node '{}' updated celery routes {} all routes are now {}" + .format(instance.hostname, updated_routes, self.app.conf.CELERY_TASK_ROUTES)) @worker_ready.connect def handle_ha_toplogy_worker_ready(sender, **kwargs): logger.debug("Configure celeryd queues task on host {}".format(sender.hostname)) - (instance, removed_queues, added_queues) = register_celery_worker_queues(sender.app, sender.hostname) - logger.info("Workers on tower node '{}' unsubscribed from queues {} and subscribed to queues {}" - .format(instance.hostname, removed_queues, added_queues)) + instances, removed_queues, added_queues = register_celery_worker_queues(sender.app, sender.hostname) + for instance in instances: + logger.info("Workers on tower node '{}' unsubscribed from queues {} and subscribed to queues {}" + .format(instance.hostname, removed_queues, added_queues)) @celeryd_init.connect diff --git a/awx/main/tests/functional/test_tasks.py b/awx/main/tests/functional/test_tasks.py index 2cfdcae849..84d93534c1 100644 --- a/awx/main/tests/functional/test_tasks.py +++ b/awx/main/tests/functional/test_tasks.py @@ -136,7 +136,8 @@ class TestIsolatedManagementTask: # Upgrade was completed, health check playbook now reports matching # version, make sure capacity is set. - update_capacity(old_version, {'version': '5.0.0-things', 'capacity':103}, '5.0.0-stuff') + update_capacity(old_version, {'version': '5.0.0-things', + 'capacity_cpu':103, 'capacity_mem':103}, '5.0.0-stuff') assert old_version.capacity == 103 def test_takes_action(self, control_instance, needs_updating): diff --git a/awx/main/tests/unit/utils/test_ha.py b/awx/main/tests/unit/utils/test_ha.py index 3dd9adfc35..8e3281e1f8 100644 --- a/awx/main/tests/unit/utils/test_ha.py +++ b/awx/main/tests/unit/utils/test_ha.py @@ -64,7 +64,7 @@ class TestAddRemoveCeleryWorkerQueues(): instance = instance_generator(groups=groups, hostname=hostname) worker_queues = worker_queues_generator(_worker_queues) with mock.patch('awx.main.utils.ha.settings.AWX_CELERY_QUEUES_STATIC', static_queues): - (added_queues, removed_queues) = _add_remove_celery_worker_queues(mock_app, instance, worker_queues, hostname) + (added_queues, removed_queues) = _add_remove_celery_worker_queues(mock_app, [instance], worker_queues, hostname) assert set(added_queues) == set(added_expected) assert set(removed_queues) == set(removed_expected) diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py index 128b959d17..7572eb3f1c 100644 --- a/awx/main/utils/common.py +++ b/awx/main/utils/common.py @@ -665,7 +665,7 @@ def get_mem_capacity(): return (mem, max(1, ((mem / 1024 / 1024) - 2048) / forkmem)) -def get_system_task_capacity(scale=Decimal(1.0)): +def get_system_task_capacity(scale=Decimal(1.0), cpu_capacity=None, mem_capacity=None): ''' Measure system memory and use it as a baseline for determining the system's capacity ''' @@ -678,8 +678,14 @@ def get_system_task_capacity(scale=Decimal(1.0)): elif settings_forks: return int(settings_forks) - _, cpu_cap = get_cpu_capacity() - _, mem_cap = get_mem_capacity() + if cpu_capacity is None: + _, cpu_cap = get_cpu_capacity() + else: + cpu_cap = cpu_capacity + if mem_capacity is None: + _, mem_cap = get_mem_capacity() + else: + mem_cap = mem_capacity return min(mem_cap, cpu_cap) + ((max(mem_cap, cpu_cap) - min(mem_cap, cpu_cap)) * scale) diff --git a/awx/main/utils/ha.py b/awx/main/utils/ha.py index bb3a0a73cc..6acf3d7067 100644 --- a/awx/main/utils/ha.py +++ b/awx/main/utils/ha.py @@ -10,26 +10,27 @@ from django.conf import settings from awx.main.models import Instance -def _add_remove_celery_worker_queues(app, instance, worker_queues, worker_name): +def _add_remove_celery_worker_queues(app, controlled_instances, worker_queues, worker_name): removed_queues = [] added_queues = [] - ig_names = set(instance.rampart_groups.values_list('name', flat=True)) - ig_names.add("tower_instance_router") + ig_names = set(['tower_instance_router']) + hostnames = set([instance.hostname for instance in controlled_instances]) + for instance in controlled_instances: + ig_names.update(instance.rampart_groups.values_list('name', flat=True)) worker_queue_names = set([q['name'] for q in worker_queues]) - # Remove queues that aren't in the instance group for queue in worker_queues: if queue['name'] in settings.AWX_CELERY_QUEUES_STATIC or \ queue['alias'] in settings.AWX_CELERY_QUEUES_STATIC: continue - if queue['name'] not in ig_names | set([instance.hostname]) or not instance.enabled: + if queue['name'] not in ig_names | hostnames or not instance.enabled: app.control.cancel_consumer(queue['name'], reply=True, destination=[worker_name]) removed_queues.append(queue['name']) # Add queues for instance and instance groups - for queue_name in ig_names | set([instance.hostname]): + for queue_name in ig_names | hostnames: if queue_name not in worker_queue_names: app.control.add_consumer(queue_name, reply=True, destination=[worker_name]) added_queues.append(queue_name) @@ -59,13 +60,19 @@ def update_celery_worker_routes(instance, conf): def register_celery_worker_queues(app, celery_worker_name): instance = Instance.objects.me() + controlled_instances = [instance] + if instance.is_controller(): + controlled_instances.extend(Instance.objects.filter( + rampart_groups__controller__instances__hostname=instance.hostname + )) added_queues = [] removed_queues = [] celery_host_queues = app.control.inspect([celery_worker_name]).active_queues() celery_worker_queues = celery_host_queues[celery_worker_name] if celery_host_queues else [] - (added_queues, removed_queues) = _add_remove_celery_worker_queues(app, instance, celery_worker_queues, celery_worker_name) + (added_queues, removed_queues) = _add_remove_celery_worker_queues(app, controlled_instances, + celery_worker_queues, celery_worker_name) - return (instance, removed_queues, added_queues) + return (controlled_instances, removed_queues, added_queues) diff --git a/awx/plugins/isolated/awx_capacity.py b/awx/plugins/isolated/awx_capacity.py index cf370fb56e..e4f8cc46dc 100644 --- a/awx/plugins/isolated/awx_capacity.py +++ b/awx/plugins/isolated/awx_capacity.py @@ -18,6 +18,30 @@ from ansible.module_utils.basic import AnsibleModule import subprocess +import os +import psutil + + +def get_cpu_capacity(): + env_forkcpu = os.getenv('SYSTEM_TASK_FORKS_CPU', None) + cpu = psutil.cpu_count() + + if env_forkcpu: + forkcpu = int(env_forkcpu) + else: + forkcpu = 4 + return (cpu, cpu * forkcpu) + + +def get_mem_capacity(): + env_forkmem = os.getenv('SYSTEM_TASK_FORKS_MEM', None) + if env_forkmem: + forkmem = int(env_forkmem) + else: + forkmem = 100 + + mem = psutil.virtual_memory().total + return (mem, max(1, ((mem / 1024 / 1024) - 2048) / forkmem)) def main(): @@ -32,20 +56,13 @@ def main(): except subprocess.CalledProcessError as e: module.fail_json(msg=str(e)) return - # Duplicated with awx.main.utils.common.get_system_task_capacity - try: - out = subprocess.check_output(['free', '-m']) - except subprocess.CalledProcessError as e: - module.fail_json(msg=str(e)) - return - total_mem_value = out.split()[7] - if int(total_mem_value) <= 2048: - cap = 50 - else: - cap = 50 + ((int(total_mem_value) / 1024) - 2) * 75 + # NOTE: Duplicated with awx.main.utils.common capacity utilities + _, capacity_cpu = get_cpu_capacity() + _, capacity_mem = get_mem_capacity() # Module never results in a change - module.exit_json(changed=False, capacity=cap, version=version) + module.exit_json(changed=False, capacity_cpu=capacity_cpu, + capacity_mem=capacity_mem, version=version) if __name__ == '__main__': diff --git a/tools/docker-compose/Dockerfile b/tools/docker-compose/Dockerfile index 0591fb0e62..a513301b4d 100644 --- a/tools/docker-compose/Dockerfile +++ b/tools/docker-compose/Dockerfile @@ -14,7 +14,7 @@ requirements/requirements_dev_uninstall.txt \ RUN yum -y update && yum -y install curl epel-release && yum -y install https://centos7.iuscommunity.org/ius-release.rpm RUN curl --silent --location https://rpm.nodesource.com/setup_6.x | bash - RUN yum -y localinstall http://download.postgresql.org/pub/repos/yum/9.4/redhat/rhel-6-x86_64/pgdg-centos94-9.4-3.noarch.rpm -RUN yum -y update && yum -y install openssh-server ansible mg vim tmux git2u-core mercurial subversion python-devel python-psycopg2 make postgresql postgresql-devel nginx nodejs python-psutil libxml2-devel libxslt-devel libstdc++.so.6 gcc cyrus-sasl-devel cyrus-sasl openldap-devel libffi-devel zeromq-devel python-pip xmlsec1-devel swig krb5-devel xmlsec1-openssl xmlsec1 xmlsec1-openssl-devel libtool-ltdl-devel rabbitmq-server bubblewrap zanata-python-client gettext gcc-c++ libcurl-devel python-pycurl bzip2 python-crypto +RUN yum -y update && yum -y install openssh-server ansible mg vim tmux git2u-core mercurial subversion python-devel python-psycopg2 make postgresql postgresql-devel nginx nodejs python-psutil libxml2-devel libxslt-devel libstdc++.so.6 gcc cyrus-sasl-devel cyrus-sasl openldap-devel libffi-devel zeromq-devel python-pip xmlsec1-devel swig krb5-devel xmlsec1-openssl xmlsec1 xmlsec1-openssl-devel libtool-ltdl-devel rabbitmq-server bubblewrap zanata-python-client gettext gcc-c++ libcurl-devel python-pycurl bzip2 python-crypto rsync RUN pip install virtualenv RUN /usr/bin/ssh-keygen -q -t rsa -N "" -f /root/.ssh/id_rsa RUN mkdir -p /data/db diff --git a/tools/docker-isolated/Dockerfile b/tools/docker-isolated/Dockerfile index a7313e00b9..69af7526cc 100644 --- a/tools/docker-isolated/Dockerfile +++ b/tools/docker-isolated/Dockerfile @@ -5,8 +5,6 @@ ADD Makefile /tmp/Makefile RUN mkdir /tmp/requirements ADD requirements/requirements_ansible.txt requirements/requirements_ansible_git.txt requirements/requirements_ansible_uninstall.txt requirements/requirements_isolated.txt /tmp/requirements/ RUN yum -y update && yum -y install curl epel-release -RUN curl --silent --location https://rpm.nodesource.com/setup_6.x | bash - -RUN yum -y localinstall http://download.postgresql.org/pub/repos/yum/9.4/redhat/rhel-6-x86_64/pgdg-centos94-9.4-3.noarch.rpm RUN yum -y update && yum -y install openssh-server ansible mg vim tmux git python-devel python-psycopg2 make python-psutil libxml2-devel libxslt-devel libstdc++.so.6 gcc cyrus-sasl-devel cyrus-sasl openldap-devel libffi-devel zeromq-devel python-pip xmlsec1-devel swig krb5-devel xmlsec1-openssl xmlsec1 xmlsec1-openssl-devel libtool-ltdl-devel bubblewrap zanata-python-client gettext gcc-c++ libcurl-devel python-pycurl bzip2 RUN pip install virtualenv WORKDIR /tmp