From ffe5a92eb9bf34f4355ee47fd3e3f330983fc9c4 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Tue, 13 Feb 2018 16:35:33 -0500 Subject: [PATCH] Update isolated instance capacity calculaltion --- awx/main/expect/isolated_manager.py | 14 ++++++---- awx/main/utils/common.py | 12 ++++++-- awx/plugins/isolated/awx_capacity.py | 41 ++++++++++++++++++++-------- 3 files changed, 47 insertions(+), 20 deletions(-) diff --git a/awx/main/expect/isolated_manager.py b/awx/main/expect/isolated_manager.py index 8ed03e9e44..71ce262fef 100644 --- a/awx/main/expect/isolated_manager.py +++ b/awx/main/expect/isolated_manager.py @@ -14,7 +14,7 @@ from django.conf import settings import awx from awx.main.expect import run -from awx.main.utils import OutputEventFilter +from awx.main.utils import OutputEventFilter, get_system_task_capacity from awx.main.queue import CallbackQueueDispatcher logger = logging.getLogger('awx.isolated.manager') @@ -381,10 +381,14 @@ class IsolatedManager(object): logger.error(err_template.format(instance.hostname, instance.version, awx_application_version)) instance.capacity = 0 else: - if instance.capacity == 0 and task_result['capacity']: + if instance.capacity == 0 and task_result['capacity_cpu']: logger.warning('Isolated instance {} has re-joined.'.format(instance.hostname)) - instance.capacity = int(task_result['capacity']) - instance.save(update_fields=['capacity', 'version', 'modified']) + instance.cpu_capacity = int(task_result['capacity_cpu']) + instance.mem_capacity = int(task_result['capacity_mem']) + instance.capacity = get_system_task_capacity(scale=instance.capacity_adjustment, + cpu_capacity=int(task_result['capacity_cpu']), + mem_capacity=int(task_result['capacity_mem'])) + instance.save(update_fields=['cpu_capacity', 'mem_capacity', 'capacity', 'version', 'modified']) @classmethod def health_check(cls, instance_qs, awx_application_version): @@ -428,7 +432,7 @@ class IsolatedManager(object): task_result = result['plays'][0]['tasks'][0]['hosts'][instance.hostname] except (KeyError, IndexError): task_result = {} - if 'capacity' in task_result: + if 'capacity_cpu' in task_result and 'capacity_mem' in task_result: cls.update_capacity(instance, task_result, awx_application_version) elif instance.capacity == 0: logger.debug('Isolated instance {} previously marked as lost, could not re-join.'.format( diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py index 47d1c424b4..2bda33956f 100644 --- a/awx/main/utils/common.py +++ b/awx/main/utils/common.py @@ -664,7 +664,7 @@ def get_mem_capacity(): return (mem, max(1, ((mem / 1024 / 1024) - 2048) / forkmem)) -def get_system_task_capacity(scale=Decimal(1.0)): +def get_system_task_capacity(scale=Decimal(1.0), cpu_capacity=None, mem_capacity=None): ''' Measure system memory and use it as a baseline for determining the system's capacity ''' @@ -677,8 +677,14 @@ def get_system_task_capacity(scale=Decimal(1.0)): elif settings_forks: return int(settings_forks) - _, cpu_cap = get_cpu_capacity() - _, mem_cap = get_mem_capacity() + if cpu_capacity is None: + _, cpu_cap = get_cpu_capacity() + else: + cpu_cap = cpu_capacity + if mem_capacity is None: + _, mem_cap = get_mem_capacity() + else: + mem_cap = mem_capacity return min(mem_cap, cpu_cap) + ((max(mem_cap, cpu_cap) - min(mem_cap, cpu_cap)) * scale) diff --git a/awx/plugins/isolated/awx_capacity.py b/awx/plugins/isolated/awx_capacity.py index cf370fb56e..e4f8cc46dc 100644 --- a/awx/plugins/isolated/awx_capacity.py +++ b/awx/plugins/isolated/awx_capacity.py @@ -18,6 +18,30 @@ from ansible.module_utils.basic import AnsibleModule import subprocess +import os +import psutil + + +def get_cpu_capacity(): + env_forkcpu = os.getenv('SYSTEM_TASK_FORKS_CPU', None) + cpu = psutil.cpu_count() + + if env_forkcpu: + forkcpu = int(env_forkcpu) + else: + forkcpu = 4 + return (cpu, cpu * forkcpu) + + +def get_mem_capacity(): + env_forkmem = os.getenv('SYSTEM_TASK_FORKS_MEM', None) + if env_forkmem: + forkmem = int(env_forkmem) + else: + forkmem = 100 + + mem = psutil.virtual_memory().total + return (mem, max(1, ((mem / 1024 / 1024) - 2048) / forkmem)) def main(): @@ -32,20 +56,13 @@ def main(): except subprocess.CalledProcessError as e: module.fail_json(msg=str(e)) return - # Duplicated with awx.main.utils.common.get_system_task_capacity - try: - out = subprocess.check_output(['free', '-m']) - except subprocess.CalledProcessError as e: - module.fail_json(msg=str(e)) - return - total_mem_value = out.split()[7] - if int(total_mem_value) <= 2048: - cap = 50 - else: - cap = 50 + ((int(total_mem_value) / 1024) - 2) * 75 + # NOTE: Duplicated with awx.main.utils.common capacity utilities + _, capacity_cpu = get_cpu_capacity() + _, capacity_mem = get_mem_capacity() # Module never results in a change - module.exit_json(changed=False, capacity=cap, version=version) + module.exit_json(changed=False, capacity_cpu=capacity_cpu, + capacity_mem=capacity_mem, version=version) if __name__ == '__main__':