From 799968460d4794bcd9959f57a2b97846b9a00bb7 Mon Sep 17 00:00:00 2001 From: Elijah DeLee Date: Tue, 15 Feb 2022 14:08:24 -0500 Subject: [PATCH] Fixup conversion of memory and cpu settings to support k8s resource request format (#11725) fix memory and cpu settings to suport k8s resource request format * fix conversion of memory setting to bytes This setting has not been getting set by default, and needed some fixing up to be compatible with setting the memory in the same way as we set it in the operator, as well as with other changes from last year which assume that ansible runner is returning memory in bytes. This way we can start setting this setting in the operator, and get a more accurate reflection of how much memory is available to the control pod in k8s. On platforms where services are all sharing memory, we deduct a penalty from the memory available. On k8s we don't need to do this because the web, redis, and task containers each have memory allocated to them. * Support CPU setting expressed in units used by k8s This setting has not been getting set by default, and needed some fixing up to be compatible with setting the CPU resource request/limits in the same way as we set it in the resource requests/limits. This way we can start setting this setting in the operator, and get a more accurate reflection of how much cpu is available to the control pod in k8s. Because cpu on k8s can be partial cores, migrate cpu field to decimal. k8s does not allow granularity of less than 100m (equivalent to 0.1 cores), so only store up to 1 decimal place. fix analytics to deal with decimal cpu need to use DjangoJSONEncoder when Decimal fields in data passed to json.dumps --- awx/main/analytics/collectors.py | 2 +- awx/main/analytics/core.py | 4 +- awx/main/dispatch/pool.py | 4 +- .../0158_make_instance_cpu_decimal.py | 19 +++ awx/main/models/ha.py | 6 +- .../settings/test_k8s_resource_setttings.py | 61 +++++++ awx/main/utils/common.py | 151 +++++++++++++----- 7 files changed, 204 insertions(+), 43 deletions(-) create mode 100644 awx/main/migrations/0158_make_instance_cpu_decimal.py create mode 100644 awx/main/tests/unit/settings/test_k8s_resource_setttings.py diff --git a/awx/main/analytics/collectors.py b/awx/main/analytics/collectors.py index 9fd8005a91..f8456ca2f1 100644 --- a/awx/main/analytics/collectors.py +++ b/awx/main/analytics/collectors.py @@ -211,7 +211,7 @@ def projects_by_scm_type(since, **kwargs): return counts -@register('instance_info', '1.1', description=_('Cluster topology and capacity')) +@register('instance_info', '1.2', description=_('Cluster topology and capacity')) def instance_info(since, include_hostnames=False, **kwargs): info = {} instances = models.Instance.objects.values_list('hostname').values( diff --git a/awx/main/analytics/core.py b/awx/main/analytics/core.py index d63afdfbf3..6aa2f5090e 100644 --- a/awx/main/analytics/core.py +++ b/awx/main/analytics/core.py @@ -90,7 +90,7 @@ def package(target, data, timestamp): if isinstance(item, str): f.add(item, arcname=f'./{name}') else: - buf = json.dumps(item).encode('utf-8') + buf = json.dumps(item, cls=DjangoJSONEncoder).encode('utf-8') info = tarfile.TarInfo(f'./{name}') info.size = len(buf) info.mtime = timestamp.timestamp() @@ -230,7 +230,7 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti try: last_entry = max(last_entries.get(key) or last_gather, until - timedelta(weeks=4)) results = (func(since or last_entry, collection_type=collection_type, until=until), func.__awx_analytics_version__) - json.dumps(results) # throwaway check to see if the data is json-serializable + json.dumps(results, cls=DjangoJSONEncoder) # throwaway check to see if the data is json-serializable data[filename] = results except Exception: logger.exception("Could not generate metric {}".format(filename)) diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py index 97e2fa630a..3d08ca3fd7 100644 --- a/awx/main/dispatch/pool.py +++ b/awx/main/dispatch/pool.py @@ -22,6 +22,7 @@ import psutil from awx.main.models import UnifiedJob from awx.main.dispatch import reaper +from awx.main.utils.common import convert_mem_str_to_bytes if 'run_callback_receiver' in sys.argv: logger = logging.getLogger('awx.main.commands.run_callback_receiver') @@ -319,7 +320,8 @@ class AutoscalePool(WorkerPool): if self.max_workers is None: settings_absmem = getattr(settings, 'SYSTEM_TASK_ABS_MEM', None) if settings_absmem is not None: - total_memory_gb = int(settings_absmem) + # There are 1073741824 bytes in a gigabyte. Convert bytes to gigabytes by dividing by 2**30 + total_memory_gb = convert_mem_str_to_bytes(settings_absmem) // 2**30 else: total_memory_gb = (psutil.virtual_memory().total >> 30) + 1 # noqa: round up # 5 workers per GB of total memory diff --git a/awx/main/migrations/0158_make_instance_cpu_decimal.py b/awx/main/migrations/0158_make_instance_cpu_decimal.py new file mode 100644 index 0000000000..b78ff1b754 --- /dev/null +++ b/awx/main/migrations/0158_make_instance_cpu_decimal.py @@ -0,0 +1,19 @@ +# Generated by Django 2.2.24 on 2022-02-14 17:37 + +from decimal import Decimal +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0157_inventory_labels'), + ] + + operations = [ + migrations.AlterField( + model_name='instance', + name='cpu', + field=models.DecimalField(decimal_places=1, default=Decimal('0'), editable=False, max_digits=4), + ), + ] diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index 43c1567119..add2564015 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -82,8 +82,10 @@ class Instance(HasPolicyEditsMixin, BaseModel): modified = models.DateTimeField(auto_now=True) # Fields defined in health check or heartbeat version = models.CharField(max_length=120, blank=True) - cpu = models.IntegerField( - default=0, + cpu = models.DecimalField( + default=Decimal(0.0), + max_digits=4, + decimal_places=1, editable=False, ) memory = models.BigIntegerField( diff --git a/awx/main/tests/unit/settings/test_k8s_resource_setttings.py b/awx/main/tests/unit/settings/test_k8s_resource_setttings.py new file mode 100644 index 0000000000..a2899a8561 --- /dev/null +++ b/awx/main/tests/unit/settings/test_k8s_resource_setttings.py @@ -0,0 +1,61 @@ +import pytest + +from unittest import mock + +from awx.main.utils.common import ( + convert_mem_str_to_bytes, + get_mem_effective_capacity, + get_corrected_memory, + convert_cpu_str_to_decimal_cpu, + get_cpu_effective_capacity, + get_corrected_cpu, +) + + +@pytest.mark.parametrize( + "value,converted_value,mem_capacity", + [ + ('2G', 2000000000, 19), + ('4G', 4000000000, 38), + ('2Gi', 2147483648, 20), + ('2.1G', 1, 1), # expressing memory with non-integers is not supported, and we'll fall back to 1 fork for memory capacity. + ('4Gi', 4294967296, 40), + ('2M', 2000000, 1), + ('3M', 3000000, 1), + ('2Mi', 2097152, 1), + ('2048Mi', 2147483648, 20), + ('4096Mi', 4294967296, 40), + ('64G', 64000000000, 610), + ('64Garbage', 1, 1), + ], +) +def test_SYSTEM_TASK_ABS_MEM_conversion(value, converted_value, mem_capacity): + with mock.patch('django.conf.settings') as mock_settings: + mock_settings.SYSTEM_TASK_ABS_MEM = value + mock_settings.SYSTEM_TASK_FORKS_MEM = 100 + mock_settings.IS_K8S = True + assert convert_mem_str_to_bytes(value) == converted_value + assert get_corrected_memory(-1) == converted_value + assert get_mem_effective_capacity(-1) == mem_capacity + + +@pytest.mark.parametrize( + "value,converted_value,cpu_capacity", + [ + ('2', 2.0, 8), + ('1.5', 1.5, 6), + ('100m', 0.1, 1), + ('2000m', 2.0, 8), + ('4MillionCPUm', 1.0, 4), # Any suffix other than 'm' is not supported, we fall back to 1 CPU + ('Random', 1.0, 4), # Any setting value other than integers, floats millicores (e.g 1, 1.0, or 1000m) is not supported, fall back to 1 CPU + ('2505m', 2.5, 10), + ('1.55', 1.6, 6), + ], +) +def test_SYSTEM_TASK_ABS_CPU_conversion(value, converted_value, cpu_capacity): + with mock.patch('django.conf.settings') as mock_settings: + mock_settings.SYSTEM_TASK_ABS_CPU = value + mock_settings.SYSTEM_TASK_FORKS_CPU = 4 + assert convert_cpu_str_to_decimal_cpu(value) == converted_value + assert get_corrected_cpu(-1) == converted_value + assert get_cpu_effective_capacity(-1) == cpu_capacity diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py index 45f3ae66c6..49885d70c7 100644 --- a/awx/main/utils/common.py +++ b/awx/main/utils/common.py @@ -692,28 +692,33 @@ def parse_yaml_or_json(vars_str, silent_failure=True): return vars_dict -def get_cpu_effective_capacity(cpu_count): - from django.conf import settings +def convert_cpu_str_to_decimal_cpu(cpu_str): + """Convert a string indicating cpu units to decimal. - settings_abscpu = getattr(settings, 'SYSTEM_TASK_ABS_CPU', None) - env_abscpu = os.getenv('SYSTEM_TASK_ABS_CPU', None) + Useful for dealing with cpu setting that may be expressed in units compatible with + kubernetes. - if env_abscpu is not None: - return int(env_abscpu) - elif settings_abscpu is not None: - return int(settings_abscpu) + See https://kubernetes.io/docs/tasks/configure-pod-container/assign-cpu-resource/#cpu-units + """ + cpu = cpu_str + millicores = False - settings_forkcpu = getattr(settings, 'SYSTEM_TASK_FORKS_CPU', None) - env_forkcpu = os.getenv('SYSTEM_TASK_FORKS_CPU', None) + if cpu_str[-1] == 'm': + cpu = cpu_str[:-1] + millicores = True - if env_forkcpu: - forkcpu = int(env_forkcpu) - elif settings_forkcpu: - forkcpu = int(settings_forkcpu) - else: - forkcpu = 4 + try: + cpu = float(cpu) + except ValueError: + cpu = 1.0 + millicores = False + logger.warning(f"Could not convert SYSTEM_TASK_ABS_CPU {cpu_str} to a decimal number, falling back to default of 1 cpu") - return cpu_count * forkcpu + if millicores: + cpu = cpu / 1000 + + # Per kubernetes docs, fractional CPU less than .1 are not allowed + return max(0.1, round(cpu, 1)) def get_corrected_cpu(cpu_count): # formerlly get_cpu_capacity @@ -725,34 +730,70 @@ def get_corrected_cpu(cpu_count): # formerlly get_cpu_capacity settings_abscpu = getattr(settings, 'SYSTEM_TASK_ABS_CPU', None) env_abscpu = os.getenv('SYSTEM_TASK_ABS_CPU', None) - if env_abscpu is not None or settings_abscpu is not None: - return 0 + if env_abscpu is not None: + return convert_cpu_str_to_decimal_cpu(env_abscpu) + elif settings_abscpu is not None: + return convert_cpu_str_to_decimal_cpu(settings_abscpu) return cpu_count # no correction -def get_mem_effective_capacity(mem_mb): +def get_cpu_effective_capacity(cpu_count): from django.conf import settings - settings_absmem = getattr(settings, 'SYSTEM_TASK_ABS_MEM', None) - env_absmem = os.getenv('SYSTEM_TASK_ABS_MEM', None) + cpu_count = get_corrected_cpu(cpu_count) - if env_absmem is not None: - return int(env_absmem) - elif settings_absmem is not None: - return int(settings_absmem) + settings_forkcpu = getattr(settings, 'SYSTEM_TASK_FORKS_CPU', None) + env_forkcpu = os.getenv('SYSTEM_TASK_FORKS_CPU', None) - settings_forkmem = getattr(settings, 'SYSTEM_TASK_FORKS_MEM', None) - env_forkmem = os.getenv('SYSTEM_TASK_FORKS_MEM', None) - - if env_forkmem: - forkmem = int(env_forkmem) - elif settings_forkmem: - forkmem = int(settings_forkmem) + if env_forkcpu: + forkcpu = int(env_forkcpu) + elif settings_forkcpu: + forkcpu = int(settings_forkcpu) else: - forkmem = 100 + forkcpu = 4 - return max(1, ((mem_mb // 1024 // 1024) - 2048) // forkmem) + return max(1, int(cpu_count * forkcpu)) + + +def convert_mem_str_to_bytes(mem_str): + """Convert string with suffix indicating units to memory in bytes (base 2) + + Useful for dealing with memory setting that may be expressed in units compatible with + kubernetes. + + See https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-memory + """ + # If there is no suffix, the memory sourced from the request is in bytes + if mem_str.isdigit(): + return int(mem_str) + + conversions = { + 'Ei': lambda x: x * 2**60, + 'E': lambda x: x * 10**18, + 'Pi': lambda x: x * 2**50, + 'P': lambda x: x * 10**15, + 'Ti': lambda x: x * 2**40, + 'T': lambda x: x * 10**12, + 'Gi': lambda x: x * 2**30, + 'G': lambda x: x * 10**9, + 'Mi': lambda x: x * 2**20, + 'M': lambda x: x * 10**6, + 'Ki': lambda x: x * 2**10, + 'K': lambda x: x * 10**3, + } + mem = 0 + mem_unit = None + for i, char in enumerate(mem_str): + if not char.isdigit(): + mem_unit = mem_str[i:] + mem = int(mem_str[:i]) + break + if not mem_unit or mem_unit not in conversions.keys(): + error = f"Unsupported value for SYSTEM_TASK_ABS_MEM: {mem_str}, memory must be expressed in bytes or with known suffix: {conversions.keys()}. Falling back to 1 byte" + logger.warning(error) + return 1 + return max(1, conversions[mem_unit](mem)) def get_corrected_memory(memory): @@ -761,12 +802,48 @@ def get_corrected_memory(memory): settings_absmem = getattr(settings, 'SYSTEM_TASK_ABS_MEM', None) env_absmem = os.getenv('SYSTEM_TASK_ABS_MEM', None) - if env_absmem is not None or settings_absmem is not None: - return 0 + # Runner returns memory in bytes + # so we convert memory from settings to bytes as well. + if env_absmem is not None: + return convert_mem_str_to_bytes(env_absmem) + elif settings_absmem is not None: + return convert_mem_str_to_bytes(settings_absmem) return memory +def get_mem_effective_capacity(mem_bytes): + from django.conf import settings + + mem_bytes = get_corrected_memory(mem_bytes) + + settings_mem_mb_per_fork = getattr(settings, 'SYSTEM_TASK_FORKS_MEM', None) + env_mem_mb_per_fork = os.getenv('SYSTEM_TASK_FORKS_MEM', None) + + if env_mem_mb_per_fork: + mem_mb_per_fork = int(env_mem_mb_per_fork) + elif settings_mem_mb_per_fork: + mem_mb_per_fork = int(settings_mem_mb_per_fork) + else: + mem_mb_per_fork = 100 + + # Per docs, deduct 2GB of memory from the available memory + # to cover memory consumption of background tasks when redis/web etc are colocated with + # the other control processes + memory_penalty_bytes = 2147483648 + if settings.IS_K8S: + # In k8s, this is dealt with differently because + # redis and the web containers have their own memory allocation + memory_penalty_bytes = 0 + + # convert memory to megabytes because our setting of how much memory we + # should allocate per fork is in megabytes + mem_mb = (mem_bytes - memory_penalty_bytes) // 2**20 + max_forks_based_on_memory = mem_mb // mem_mb_per_fork + + return max(1, max_forks_based_on_memory) + + _inventory_updates = threading.local() _task_manager = threading.local()