From 928c35ede559ce48bd990c6b5097352fa0de3c21 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Tue, 17 Aug 2021 11:52:57 -0400 Subject: [PATCH] Model changes for instance last_seen field to replace modified (#10870) * Model changes for instance last_seen field to replace modified * Break up refresh_capacity into smaller units * Rename execution node methods, fix last_seen clustering * Use update_fields to make it clear save only affects capacity * Restructing to pass unit tests * Fix bug where a PATCH did not update capacity value --- awx/api/views/__init__.py | 9 ++ awx/main/managers.py | 2 +- .../migrations/0153_instance_last_seen.py | 27 +++++ awx/main/models/ha.py | 109 +++++++++++++----- awx/main/tasks.py | 95 ++++++++------- .../tests/functional/api/test_instance.py | 32 +++++ awx/main/tests/functional/test_jobs.py | 19 +-- awx/main/tests/unit/models/test_ha.py | 15 ++- awx/main/utils/common.py | 52 +++------ awx/settings/defaults.py | 10 +- .../sources/templates/receptor-awx.conf.j2 | 3 +- 11 files changed, 246 insertions(+), 127 deletions(-) create mode 100644 awx/main/migrations/0153_instance_last_seen.py create mode 100644 awx/main/tests/functional/api/test_instance.py diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index 8ff25c10f9..2223c590e9 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -370,6 +370,15 @@ class InstanceDetail(RetrieveUpdateAPIView): model = models.Instance serializer_class = serializers.InstanceSerializer + def update(self, request, *args, **kwargs): + r = super(InstanceDetail, self).update(request, *args, **kwargs) + if status.is_success(r.status_code): + obj = self.get_object() + obj.set_capacity_value() + obj.save(update_fields=['capacity']) + r.data = serializers.InstanceSerializer(obj, context=self.get_serializer_context()).to_representation(obj) + return r + class InstanceUnifiedJobsList(SubListAPIView): diff --git a/awx/main/managers.py b/awx/main/managers.py index 01fb223df5..05ffb3ecbb 100644 --- a/awx/main/managers.py +++ b/awx/main/managers.py @@ -105,7 +105,7 @@ class InstanceManager(models.Manager): """Return the currently active instance.""" # If we are running unit tests, return a stub record. if settings.IS_TESTING(sys.argv) or hasattr(sys, '_called_from_test'): - return self.model(id=1, hostname='localhost', uuid='00000000-0000-0000-0000-000000000000') + return self.model(id=1, hostname=settings.CLUSTER_HOST_ID, uuid='00000000-0000-0000-0000-000000000000') node = self.filter(hostname=settings.CLUSTER_HOST_ID) if node.exists(): diff --git a/awx/main/migrations/0153_instance_last_seen.py b/awx/main/migrations/0153_instance_last_seen.py new file mode 100644 index 0000000000..408146cda8 --- /dev/null +++ b/awx/main/migrations/0153_instance_last_seen.py @@ -0,0 +1,27 @@ +# Generated by Django 2.2.20 on 2021-08-12 13:55 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0152_instance_node_type'), + ] + + operations = [ + migrations.AddField( + model_name='instance', + name='last_seen', + field=models.DateTimeField( + editable=False, + help_text='Last time instance ran its heartbeat task for main cluster nodes. Last known connection to receptor mesh for execution nodes.', + null=True, + ), + ), + migrations.AlterField( + model_name='instance', + name='memory', + field=models.BigIntegerField(default=0, editable=False, help_text='Total system memory of this instance in bytes.'), + ), + ] diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index aaf0b990ec..ebd1ad44b0 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -21,7 +21,7 @@ from awx.main.managers import InstanceManager, InstanceGroupManager from awx.main.fields import JSONField from awx.main.models.base import BaseModel, HasEditsMixin, prevent_search from awx.main.models.unified_jobs import UnifiedJob -from awx.main.utils import get_cpu_capacity, get_mem_capacity, get_system_task_capacity +from awx.main.utils.common import measure_cpu, get_corrected_cpu, get_cpu_effective_capacity, measure_memory, get_corrected_memory, get_mem_effective_capacity from awx.main.models.mixins import RelatedJobsMixin __all__ = ('Instance', 'InstanceGroup', 'TowerScheduleState') @@ -52,6 +52,7 @@ class Instance(HasPolicyEditsMixin, BaseModel): objects = InstanceManager() + # Fields set in instance registration uuid = models.CharField(max_length=40) hostname = models.CharField(max_length=250, unique=True) ip_address = models.CharField( @@ -61,16 +62,11 @@ class Instance(HasPolicyEditsMixin, BaseModel): max_length=50, unique=True, ) + # Auto-fields, implementation is different from BaseModel created = models.DateTimeField(auto_now_add=True) modified = models.DateTimeField(auto_now=True) + # Fields defined in health check or heartbeat version = models.CharField(max_length=120, blank=True) - capacity = models.PositiveIntegerField( - default=100, - editable=False, - ) - capacity_adjustment = models.DecimalField(default=Decimal(1.0), max_digits=3, decimal_places=2, validators=[MinValueValidator(0)]) - enabled = models.BooleanField(default=True) - managed_by_policy = models.BooleanField(default=True) cpu = models.IntegerField( default=0, editable=False, @@ -78,7 +74,22 @@ class Instance(HasPolicyEditsMixin, BaseModel): memory = models.BigIntegerField( default=0, editable=False, + help_text=_('Total system memory of this instance in bytes.'), ) + last_seen = models.DateTimeField( + null=True, + editable=False, + help_text=_('Last time instance ran its heartbeat task for main cluster nodes. Last known connection to receptor mesh for execution nodes.'), + ) + # Capacity management + capacity = models.PositiveIntegerField( + default=100, + editable=False, + ) + capacity_adjustment = models.DecimalField(default=Decimal(1.0), max_digits=3, decimal_places=2, validators=[MinValueValidator(0)]) + enabled = models.BooleanField(default=True) + managed_by_policy = models.BooleanField(default=True) + cpu_capacity = models.IntegerField( default=0, editable=False, @@ -126,39 +137,83 @@ class Instance(HasPolicyEditsMixin, BaseModel): return random.choice(Instance.objects.filter(enabled=True).filter(node_type__in=['control', 'hybrid']).values_list('hostname', flat=True)) def is_lost(self, ref_time=None): + if self.last_seen is None: + return True if ref_time is None: ref_time = now() - grace_period = 120 - return self.modified < ref_time - timedelta(seconds=grace_period) + grace_period = settings.CLUSTER_NODE_HEARTBEAT_PERIOD * 2 + if self.node_type == 'execution': + grace_period += settings.RECEPTOR_SERVICE_ADVERTISEMENT_PERIOD + return self.last_seen < ref_time - timedelta(seconds=grace_period) - def mark_offline(self, on_good_terms=False): - self.cpu = self.cpu_capacity = self.memory = self.mem_capacity = self.capacity = 0 - update_fields = ['capacity', 'cpu', 'memory', 'cpu_capacity', 'mem_capacity'] - if on_good_terms: - update_fields.append('modified') - self.save() + def mark_offline(self, update_last_seen=False, perform_save=True): + if self.cpu_capacity == 0 and self.mem_capacity == 0 and self.capacity == 0 and (not update_last_seen): + return + self.cpu_capacity = self.mem_capacity = self.capacity = 0 + if update_last_seen: + self.last_seen = now() - def refresh_capacity(self): - cpu = get_cpu_capacity() - mem = get_mem_capacity() + if perform_save: + update_fields = ['capacity', 'cpu_capacity', 'mem_capacity'] + if update_last_seen: + update_fields += ['last_seen'] + self.save(update_fields=update_fields) + + def set_capacity_value(self): + """Sets capacity according to capacity adjustment rule (no save)""" if self.enabled: - self.capacity = get_system_task_capacity(self.capacity_adjustment) + lower_cap = min(self.mem_capacity, self.cpu_capacity) + higher_cap = max(self.mem_capacity, self.cpu_capacity) + self.capacity = lower_cap + (higher_cap - lower_cap) * self.capacity_adjustment else: self.capacity = 0 + def refresh_capacity_fields(self): + """Update derived capacity fields from cpu and memory (no save)""" + self.cpu_capacity = get_cpu_effective_capacity(self.cpu) + self.mem_capacity = get_mem_effective_capacity(self.memory) + self.set_capacity_value() + + def save_health_data(self, version, cpu, memory, last_seen=None, has_error=False): + update_fields = [] + + if last_seen is not None and self.last_seen != last_seen: + self.last_seen = last_seen + update_fields.append('last_seen') + + if self.version != version: + self.version = version + update_fields.append('version') + + new_cpu = get_corrected_cpu(cpu) + if new_cpu != self.cpu: + self.cpu = new_cpu + update_fields.append('cpu') + + new_memory = get_corrected_memory(memory) + if new_memory != self.memory: + self.memory = new_memory + update_fields.append('memory') + + if not has_error: + self.refresh_capacity_fields() + else: + self.mark_offline(perform_save=False) + update_fields.extend(['cpu_capacity', 'mem_capacity', 'capacity']) + + self.save(update_fields=update_fields) + + def local_health_check(self): + """Only call this method on the instance that this record represents""" + has_error = False try: # if redis is down for some reason, that means we can't persist # playbook event data; we should consider this a zero capacity event redis.Redis.from_url(settings.BROKER_URL).ping() except redis.ConnectionError: - self.capacity = 0 + has_error = True - self.cpu = cpu[0] - self.memory = mem[0] - self.cpu_capacity = cpu[1] - self.mem_capacity = mem[1] - self.version = awx_application_version - self.save(update_fields=['capacity', 'version', 'modified', 'cpu', 'memory', 'cpu_capacity', 'mem_capacity']) + self.save_health_data(awx_application_version, measure_cpu(), measure_memory(), last_seen=now(), has_error=has_error) class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin): diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 56f10dacc0..9dc0065459 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -98,9 +98,6 @@ from awx.main.utils.common import ( parse_yaml_or_json, cleanup_new_process, create_partition, - get_cpu_effective_capacity, - get_mem_effective_capacity, - get_system_task_capacity, ) from awx.main.utils.execution_environments import get_default_pod_spec, CONTAINER_ROOT, to_container_path from awx.main.utils.ansible import read_ansible_config @@ -180,7 +177,7 @@ def dispatch_startup(): def inform_cluster_of_shutdown(): try: this_inst = Instance.objects.get(hostname=settings.CLUSTER_HOST_ID) - this_inst.mark_offline(on_good_terms=True) # No thank you to new jobs while shut down + this_inst.mark_offline(update_last_seen=True) # No thank you to new jobs while shut down try: reaper.reap(this_inst) except Exception: @@ -403,7 +400,7 @@ def cleanup_execution_environment_images(): @task(queue=get_local_queuename) -def check_heartbeat(node): +def execution_node_health_check(node): try: instance = Instance.objects.get(hostname=node) except Instance.DoesNotExist: @@ -411,64 +408,66 @@ def check_heartbeat(node): return data = worker_info(node) + prior_capacity = instance.capacity + + instance.save_health_data( + 'ansible-runner-' + data.get('Version', '???'), + data.get('CPU Capacity', 0), # TODO: rename field on runner side to not say "Capacity" + data.get('Memory Capacity', 0) * 1000, # TODO: double-check the multiplier here + has_error=bool(data.get('Errors')), + ) + if data['Errors']: formatted_error = "\n".join(data["Errors"]) - if instance.capacity: + if prior_capacity: logger.warn(f'Health check marking execution node {node} as lost, errors:\n{formatted_error}') else: logger.info(f'Failed to find capacity of new or lost execution node {node}, errors:\n{formatted_error}') - instance.mark_offline() else: - # TODO: spin off new instance method from refresh_capacity that calculates derived fields - instance.cpu = data['CPU Capacity'] # TODO: rename field on runner side to not say "Capacity" - instance.cpu_capacity = get_cpu_effective_capacity(instance.cpu) - instance.memory = data['Memory Capacity'] * 1000 # TODO: double-check the multiplier here - instance.mem_capacity = get_mem_effective_capacity(instance.memory) - instance.capacity = get_system_task_capacity( - instance.capacity_adjustment, - instance.cpu_capacity, - instance.mem_capacity, - ) - instance.version = 'ansible-runner-' + data['Version'] - instance.save(update_fields=['capacity', 'version', 'cpu', 'memory', 'cpu_capacity', 'mem_capacity']) logger.info('Set capacity of execution node {} to {}, worker info data:\n{}'.format(node, instance.capacity, json.dumps(data, indent=2))) -def discover_receptor_nodes(): +def inspect_execution_nodes(instance_list): + node_lookup = {} + for inst in instance_list: + if inst.node_type == 'execution': + node_lookup[inst.hostname] = inst + ctl = get_receptor_ctl() connections = ctl.simple_command('status')['Advertisements'] nowtime = now() for ad in connections: hostname = ad['NodeID'] - commands = ad['WorkCommands'] or [] + commands = ad.get('WorkCommands') or [] if 'ansible-runner' not in commands: continue - (changed, instance) = Instance.objects.register(hostname=hostname, node_type='execution') + changed = False + if hostname in node_lookup: + instance = node_lookup[hostname] + else: + (changed, instance) = Instance.objects.register(hostname=hostname, node_type='execution') was_lost = instance.is_lost(ref_time=nowtime) last_seen = parse_date(ad['Time']) - if instance.modified == last_seen: - continue - instance.modified = last_seen - if instance.is_lost(ref_time=nowtime): - # if the instance hasn't advertised in awhile, don't save a new modified time - # this is so multiple cluster nodes do all make repetitive updates - continue - instance.save(update_fields=['modified']) + if instance.last_seen and instance.last_seen >= last_seen: + continue + instance.last_seen = last_seen + instance.save(update_fields=['last_seen']) + if changed: logger.warn("Registered execution node '{}'".format(hostname)) - check_heartbeat.apply_async([hostname]) + execution_node_health_check.apply_async([hostname]) elif was_lost: # if the instance *was* lost, but has appeared again, # attempt to re-establish the initial capacity and version # check logger.warn(f'Execution node attempting to rejoin as instance {hostname}.') - check_heartbeat.apply_async([hostname]) + execution_node_health_check.apply_async([hostname]) elif instance.capacity == 0: # Periodically re-run the health check of errored nodes, in case someone fixed it # TODO: perhaps decrease the frequency of these checks logger.debug(f'Restarting health check for execution node {hostname} with known errors.') - check_heartbeat.apply_async([hostname]) + execution_node_health_check.apply_async([hostname]) @task(queue=get_local_queuename) @@ -479,34 +478,34 @@ def cluster_node_heartbeat(): this_inst = None lost_instances = [] - (changed, instance) = Instance.objects.get_or_register() - if changed: - logger.info("Registered tower control node '{}'".format(instance.hostname)) - - discover_receptor_nodes() - - for inst in list(instance_list): + for inst in instance_list: if inst.hostname == settings.CLUSTER_HOST_ID: this_inst = inst instance_list.remove(inst) - elif inst.node_type == 'execution': # TODO: zero out capacity of execution nodes that are MIA - # Only considering control plane for this logic - continue - elif inst.is_lost(ref_time=nowtime): + break + else: + (changed, this_inst) = Instance.objects.get_or_register() + if changed: + logger.info("Registered tower control node '{}'".format(this_inst.hostname)) + + inspect_execution_nodes(instance_list) + + for inst in list(instance_list): + if inst.is_lost(ref_time=nowtime): lost_instances.append(inst) instance_list.remove(inst) if this_inst: startup_event = this_inst.is_lost(ref_time=nowtime) - this_inst.refresh_capacity() - if startup_event: + this_inst.local_health_check() + if startup_event and this_inst.capacity != 0: logger.warning('Rejoining the cluster as instance {}.'.format(this_inst.hostname)) return else: raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID)) # IFF any node has a greater version than we do, then we'll shutdown services for other_inst in instance_list: - if other_inst.version == "" or other_inst.version.startswith('ansible-runner'): + if other_inst.version == "" or other_inst.version.startswith('ansible-runner') or other_inst.node_type == 'execution': continue if Version(other_inst.version.split('-', 1)[0]) > Version(awx_application_version.split('-', 1)[0]) and not settings.DEBUG: logger.error( @@ -534,7 +533,7 @@ def cluster_node_heartbeat(): # since we will delete the node anyway. if other_inst.capacity != 0 and not settings.AWX_AUTO_DEPROVISION_INSTANCES: other_inst.mark_offline() - logger.error("Host {} last checked in at {}, marked as lost.".format(other_inst.hostname, other_inst.modified)) + logger.error("Host {} last checked in at {}, marked as lost.".format(other_inst.hostname, other_inst.last_seen)) elif settings.AWX_AUTO_DEPROVISION_INSTANCES: deprovision_hostname = other_inst.hostname other_inst.delete() diff --git a/awx/main/tests/functional/api/test_instance.py b/awx/main/tests/functional/api/test_instance.py new file mode 100644 index 0000000000..88f0586fd9 --- /dev/null +++ b/awx/main/tests/functional/api/test_instance.py @@ -0,0 +1,32 @@ +import pytest + +from awx.api.versioning import reverse + +from awx.main.models.ha import Instance + + +@pytest.mark.django_db +def test_disabled_zeros_capacity(patch, admin_user): + instance = Instance.objects.create(hostname='example-host', cpu=6, memory=36000000000, cpu_capacity=6, mem_capacity=42) + + url = reverse('api:instance_detail', kwargs={'pk': instance.pk}) + + r = patch(url=url, data={'enabled': False}, user=admin_user) + assert r.data['capacity'] == 0 + + instance.refresh_from_db() + assert instance.capacity == 0 + + +@pytest.mark.django_db +def test_enabled_sets_capacity(patch, admin_user): + instance = Instance.objects.create(hostname='example-host', enabled=False, cpu=6, memory=36000000000, cpu_capacity=6, mem_capacity=42, capacity=0) + assert instance.capacity == 0 + + url = reverse('api:instance_detail', kwargs={'pk': instance.pk}) + + r = patch(url=url, data={'enabled': True}, user=admin_user) + assert r.data['capacity'] > 0 + + instance.refresh_from_db() + assert instance.capacity > 0 diff --git a/awx/main/tests/functional/test_jobs.py b/awx/main/tests/functional/test_jobs.py index 197c1197e1..7d97aa0b9b 100644 --- a/awx/main/tests/functional/test_jobs.py +++ b/awx/main/tests/functional/test_jobs.py @@ -20,24 +20,27 @@ def test_orphan_unified_job_creation(instance, inventory): @pytest.mark.django_db -@mock.patch('awx.main.utils.common.get_cpu_capacity', lambda: (2, 8)) -@mock.patch('awx.main.utils.common.get_mem_capacity', lambda: (8000, 62)) +@mock.patch('awx.main.tasks.inspect_execution_nodes', lambda *args, **kwargs: None) +@mock.patch('awx.main.models.ha.get_cpu_effective_capacity', lambda cpu: 8) +@mock.patch('awx.main.models.ha.get_mem_effective_capacity', lambda mem: 62) def test_job_capacity_and_with_inactive_node(): i = Instance.objects.create(hostname='test-1') - with mock.patch.object(redis.client.Redis, 'ping', lambda self: True): - i.refresh_capacity() + i.save_health_data('18.0.1', 2, 8000) + assert i.enabled is True + assert i.capacity_adjustment == 1.0 assert i.capacity == 62 i.enabled = False i.save() with override_settings(CLUSTER_HOST_ID=i.hostname): - cluster_node_heartbeat() + with mock.patch.object(redis.client.Redis, 'ping', lambda self: True): + cluster_node_heartbeat() i = Instance.objects.get(id=i.id) assert i.capacity == 0 @pytest.mark.django_db -@mock.patch('awx.main.utils.common.get_cpu_capacity', lambda: (2, 8)) -@mock.patch('awx.main.utils.common.get_mem_capacity', lambda: (8000, 62)) +@mock.patch('awx.main.models.ha.get_cpu_effective_capacity', lambda cpu: 8) +@mock.patch('awx.main.models.ha.get_mem_effective_capacity', lambda mem: 62) def test_job_capacity_with_redis_disabled(): i = Instance.objects.create(hostname='test-1') @@ -45,7 +48,7 @@ def test_job_capacity_with_redis_disabled(): raise redis.ConnectionError() with mock.patch.object(redis.client.Redis, 'ping', _raise): - i.refresh_capacity() + i.local_health_check() assert i.capacity == 0 diff --git a/awx/main/tests/unit/models/test_ha.py b/awx/main/tests/unit/models/test_ha.py index 52d5fdc16c..ec71a47fc2 100644 --- a/awx/main/tests/unit/models/test_ha.py +++ b/awx/main/tests/unit/models/test_ha.py @@ -1,10 +1,19 @@ import pytest from unittest import mock from unittest.mock import Mock +from decimal import Decimal -from awx.main.models import ( - InstanceGroup, -) +from awx.main.models import InstanceGroup, Instance + + +@pytest.mark.parametrize('capacity_adjustment', [0.0, 0.25, 0.5, 0.75, 1, 1.5, 3]) +def test_capacity_adjustment_no_save(capacity_adjustment): + inst = Instance(hostname='test-host', capacity_adjustment=Decimal(capacity_adjustment), capacity=0, cpu_capacity=10, mem_capacity=1000) + assert inst.capacity == 0 + assert inst.capacity_adjustment == capacity_adjustment # sanity + inst.set_capacity_value() + assert inst.capacity > 0 + assert inst.capacity == (float(inst.capacity_adjustment) * abs(inst.mem_capacity - inst.cpu_capacity) + min(inst.mem_capacity, inst.cpu_capacity)) def T(impact): diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py index 9194fa51d9..dc324efaf5 100644 --- a/awx/main/utils/common.py +++ b/awx/main/utils/common.py @@ -18,8 +18,6 @@ import tempfile import psutil from functools import reduce, wraps -from decimal import Decimal - # Django from django.core.exceptions import ObjectDoesNotExist, FieldDoesNotExist from django.utils.dateparse import parse_datetime @@ -72,9 +70,6 @@ __all__ = [ 'set_current_apps', 'extract_ansible_vars', 'get_search_fields', - 'get_system_task_capacity', - 'get_cpu_capacity', - 'get_mem_capacity', 'model_to_dict', 'NullablePromptPseudoField', 'model_instance_diff', @@ -715,7 +710,14 @@ def get_cpu_effective_capacity(cpu_count): return cpu_count * forkcpu -def get_cpu_capacity(): +def measure_cpu(): # TODO: replace with import from ansible-runner + return psutil.cpu_count() + + +def get_corrected_cpu(cpu_count): # formerlly get_cpu_capacity + """Some environments will do a correction to the reported CPU number + because the given OpenShift value is a lie + """ from django.conf import settings settings_abscpu = getattr(settings, 'SYSTEM_TASK_ABS_CPU', None) @@ -726,9 +728,7 @@ def get_cpu_capacity(): elif settings_abscpu is not None: return 0, int(settings_abscpu) - cpu = psutil.cpu_count() - - return (cpu, get_cpu_effective_capacity(cpu)) + return cpu_count # no correction def get_mem_effective_capacity(mem_mb): @@ -747,7 +747,11 @@ def get_mem_effective_capacity(mem_mb): return max(1, ((mem_mb // 1024 // 1024) - 2048) // forkmem) -def get_mem_capacity(): +def measure_memory(): # TODO: replace with import from ansible-runner + return psutil.virtual_memory().total + + +def get_corrected_memory(memory): from django.conf import settings settings_absmem = getattr(settings, 'SYSTEM_TASK_ABS_MEM', None) @@ -758,33 +762,7 @@ def get_mem_capacity(): elif settings_absmem is not None: return 0, int(settings_absmem) - mem = psutil.virtual_memory().total - return (mem, get_mem_effective_capacity(mem)) - - -def get_system_task_capacity(scale=Decimal(1.0), cpu_capacity=None, mem_capacity=None): - """ - Measure system memory and use it as a baseline for determining the system's capacity - """ - from django.conf import settings - - settings_forks = getattr(settings, 'SYSTEM_TASK_FORKS_CAPACITY', None) - env_forks = os.getenv('SYSTEM_TASK_FORKS_CAPACITY', None) - - if env_forks: - return int(env_forks) - elif settings_forks: - return int(settings_forks) - - if cpu_capacity is None: - _, cpu_cap = get_cpu_capacity() - else: - cpu_cap = cpu_capacity - if mem_capacity is None: - _, mem_cap = get_mem_capacity() - else: - mem_cap = mem_capacity - return min(mem_cap, cpu_cap) + ((max(mem_cap, cpu_cap) - min(mem_cap, cpu_cap)) * scale) + return memory _inventory_updates = threading.local() diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index d87ddc072a..b4b5876863 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -419,10 +419,18 @@ DEVSERVER_DEFAULT_PORT = '8013' # Set default ports for live server tests. os.environ.setdefault('DJANGO_LIVE_TEST_SERVER_ADDRESS', 'localhost:9013-9199') +# heartbeat period can factor into some forms of logic, so it is maintained as a setting here +CLUSTER_NODE_HEARTBEAT_PERIOD = 60 +RECEPTOR_SERVICE_ADVERTISEMENT_PERIOD = 60 # https://github.com/ansible/receptor/blob/aa1d589e154d8a0cb99a220aff8f98faf2273be6/pkg/netceptor/netceptor.go#L34 + BROKER_URL = 'unix:///var/run/redis/redis.sock' CELERYBEAT_SCHEDULE = { 'tower_scheduler': {'task': 'awx.main.tasks.awx_periodic_scheduler', 'schedule': timedelta(seconds=30), 'options': {'expires': 20}}, - 'cluster_heartbeat': {'task': 'awx.main.tasks.cluster_node_heartbeat', 'schedule': timedelta(seconds=60), 'options': {'expires': 50}}, + 'cluster_heartbeat': { + 'task': 'awx.main.tasks.cluster_node_heartbeat', + 'schedule': timedelta(seconds=CLUSTER_NODE_HEARTBEAT_PERIOD), + 'options': {'expires': 50}, + }, 'gather_analytics': {'task': 'awx.main.tasks.gather_analytics', 'schedule': timedelta(minutes=5)}, 'task_manager': {'task': 'awx.main.scheduler.tasks.run_task_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}}, 'k8s_reaper': {'task': 'awx.main.tasks.awx_k8s_reaper', 'schedule': timedelta(seconds=60), 'options': {'expires': 50}}, diff --git a/tools/docker-compose/ansible/roles/sources/templates/receptor-awx.conf.j2 b/tools/docker-compose/ansible/roles/sources/templates/receptor-awx.conf.j2 index e95097e1ef..a274b5b2b7 100644 --- a/tools/docker-compose/ansible/roles/sources/templates/receptor-awx.conf.j2 +++ b/tools/docker-compose/ansible/roles/sources/templates/receptor-awx.conf.j2 @@ -2,8 +2,7 @@ - node: id: awx_{{ item }} -- log-level: - debug +- log-level: info - tcp-listener: port: 2222