From 13300bdbd41b8f2a19790259cacee3ed269d819d Mon Sep 17 00:00:00 2001
From: Alan Rominger
Date: Wed, 14 Jul 2021 14:53:50 -0400
Subject: [PATCH] Update rebase to keep old control plane capacity check

Also do some basic work to separate control versus execution capacity;
this is to ensure that we don't send jobs to the control node.
---
 .../management/commands/list_instances.py |  5 +--
 awx/main/models/ha.py                     | 30 +++++++++++++++
 awx/main/scheduler/task_manager.py        |  3 +-
 awx/main/tasks.py                         | 38 +++++++++++++------
 awx/main/tests/functional/test_jobs.py    | 36 +++++++++++++++++-
 awx/main/utils/common.py                  | 10 ++++-
 6 files changed, 104 insertions(+), 18 deletions(-)

diff --git a/awx/main/management/commands/list_instances.py b/awx/main/management/commands/list_instances.py
index 7302953a76..7568f0b45c 100644
--- a/awx/main/management/commands/list_instances.py
+++ b/awx/main/management/commands/list_instances.py
@@ -48,8 +48,7 @@ class Command(BaseCommand):
             if no_color:
                 color = ''
             fmt = '\t' + color + '{0.hostname} capacity={0.capacity} version={1}'
-            if x.last_isolated_check:
-                fmt += ' last_isolated_check="{0.last_isolated_check:%Y-%m-%d %H:%M:%S}"'
-            fmt += ' heartbeat="{0.modified:%Y-%m-%d %H:%M:%S}"'
+            if x.capacity:
+                fmt += ' heartbeat="{0.modified:%Y-%m-%d %H:%M:%S}"'
             print((fmt + '\033[0m').format(x, x.version or '?'))
         print('')

diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py
index 17116c5c3c..3ab2439b95 100644
--- a/awx/main/models/ha.py
+++ b/awx/main/models/ha.py
@@ -136,6 +136,28 @@ class Instance(HasPolicyEditsMixin, BaseModel):
             grace_period = 120
         return self.modified < ref_time - timedelta(seconds=grace_period)
 
+    def refresh_capacity(self):
+        cpu = get_cpu_capacity()
+        mem = get_mem_capacity()
+        if self.enabled:
+            self.capacity = get_system_task_capacity(self.capacity_adjustment)
+        else:
+            self.capacity = 0
+
+        try:
+            # if redis is down for some reason, that means we can't persist
+            # playbook event data; we should consider this a zero capacity event
+            redis.Redis.from_url(settings.BROKER_URL).ping()
+        except redis.ConnectionError:
+            self.capacity = 0
+
+        self.cpu = cpu[0]
+        self.memory = mem[0]
+        self.cpu_capacity = cpu[1]
+        self.mem_capacity = mem[1]
+        self.version = awx_application_version
+        self.save(update_fields=['capacity', 'version', 'modified', 'cpu', 'memory', 'cpu_capacity', 'mem_capacity'])
+
     def is_receptor(self):
         return self.version.startswith('ansible-runner-')
 
@@ -184,6 +206,11 @@ class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin):
     def capacity(self):
         return sum([inst.capacity for inst in self.instances.all()])
 
+    @property
+    def execution_capacity(self):
+        # TODO: update query to filter based on node_type field
+        return sum([inst.capacity for inst in self.instances.filter(version__startswith='ansible-runner-')])
+
     @property
     def jobs_running(self):
         return UnifiedJob.objects.filter(status__in=('running', 'waiting'), instance_group=self).count()
@@ -206,6 +233,9 @@ class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin):
     def fit_task_to_most_remaining_capacity_instance(task, instances):
         instance_most_capacity = None
         for i in instances:
+            # TODO: change this to check if "execution" is in node_type field
+            if not i.version.startswith('ansible-runner'):
+                continue
             if i.remaining_capacity >= task.task_impact and (
                 instance_most_capacity is None or i.remaining_capacity > instance_most_capacity.remaining_capacity
             ):
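For review: a toy sketch of the split the ha.py hunks above introduce, using SimpleNamespace stand-ins rather than the real Instance model (the hostnames, versions, and numbers are made up). It shows execution_capacity counting only receptor ('ansible-runner-*') nodes, and task placement skipping the control node even when it has the most headroom:

    from types import SimpleNamespace

    instances = [
        SimpleNamespace(hostname='control-1', version='19.3.0', capacity=50, remaining_capacity=50),
        SimpleNamespace(hostname='exec-1', version='ansible-runner-2.0.1', capacity=40, remaining_capacity=30),
        SimpleNamespace(hostname='exec-2', version='ansible-runner-2.0.1', capacity=40, remaining_capacity=10),
    ]

    # mirrors InstanceGroup.execution_capacity: only receptor nodes count
    execution_capacity = sum(i.capacity for i in instances if i.version.startswith('ansible-runner-'))
    assert execution_capacity == 80  # control-1's 50 is left out

    # mirrors fit_task_to_most_remaining_capacity_instance
    def fit_task(task, instances):
        best = None
        for i in instances:
            if not i.version.startswith('ansible-runner'):
                continue  # never place work on a control node
            if i.remaining_capacity >= task.task_impact and (best is None or i.remaining_capacity > best.remaining_capacity):
                best = i
        return best

    # the control node is skipped despite having the most remaining capacity
    assert fit_task(SimpleNamespace(task_impact=20), instances).hostname == 'exec-1'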
diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py
index ed80cc6d4f..56a18a1412 100644
--- a/awx/main/scheduler/task_manager.py
+++ b/awx/main/scheduler/task_manager.py
@@ -75,6 +75,7 @@ class TaskManager:
         instances_partial = [
             SimpleNamespace(
                 obj=instance,
+                version=instance.version,
                 remaining_capacity=instance.remaining_capacity,
                 capacity=instance.capacity,
                 jobs_running=instance.jobs_running,
@@ -86,7 +87,7 @@ class TaskManager:
         instances_by_hostname = {i.hostname: i for i in instances_partial}
 
         for rampart_group in InstanceGroup.objects.prefetch_related('instances'):
-            self.graph[rampart_group.name] = dict(graph=DependencyGraph(), capacity_total=rampart_group.capacity, consumed_capacity=0, instances=[])
+            self.graph[rampart_group.name] = dict(graph=DependencyGraph(), capacity_total=rampart_group.execution_capacity, consumed_capacity=0, instances=[])
             for instance in rampart_group.instances.filter(enabled=True).order_by('hostname'):
                 if instance.hostname in instances_by_hostname:
                     self.graph[rampart_group.name]['instances'].append(instances_by_hostname[instance.hostname])

diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index e5a8d4a6c6..27a3482766 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -102,7 +102,6 @@ from awx.main.utils.common import (
     parse_yaml_or_json,
     cleanup_new_process,
     create_partition,
-    get_mem_capacity,
     get_cpu_capacity,
     get_system_task_capacity,
 )
@@ -422,15 +421,6 @@ def discover_receptor_nodes():
         hostname = ad['NodeID']
         commands = ad['WorkCommands'] or []
         if 'ansible-runner' not in commands:
-            if 'local' in commands:
-                # this node is strictly a control plane node, and does not
-                # provide ansible-runner as a work command
-                (changed, instance) = Instance.objects.register(hostname=hostname)
-                if changed:
-                    logger.info("Registered tower control node '{}'".format(hostname))
-                instance.capacity = instance.cpu = instance.memory = instance.cpu_capacity = instance.mem_capacity = 0  # noqa
-                instance.version = get_awx_version()
-                instance.save(update_fields=['capacity', 'version', 'cpu', 'memory', 'cpu_capacity', 'mem_capacity'])
             continue
         (changed, instance) = Instance.objects.register(hostname=hostname)
         was_lost = instance.is_lost(ref_time=nowtime)
@@ -454,7 +444,7 @@ def discover_receptor_nodes():
         # if the instance *was* lost, but has appeared again,
         # attempt to re-establish the initial capacity and version
         # check
-        logger.warning('Attempting to rejoin the cluster as instance {}.'.format(hostname))
+        logger.warning('Execution node attempting to rejoin as instance {}.'.format(hostname))
         check_heartbeat.apply_async([hostname])
@@ -463,6 +453,7 @@ def cluster_node_heartbeat():
     logger.debug("Cluster node heartbeat task.")
     nowtime = now()
     instance_list = list(Instance.objects.all())
+    this_inst = None
     lost_instances = []
 
     (changed, instance) = Instance.objects.get_or_register()
@@ -473,10 +464,35 @@ def cluster_node_heartbeat():
 
     for inst in list(instance_list):
         if inst.hostname == settings.CLUSTER_HOST_ID:
+            this_inst = inst
             instance_list.remove(inst)
         elif inst.is_lost(ref_time=nowtime):
             lost_instances.append(inst)
             instance_list.remove(inst)
+
+    if this_inst:
+        startup_event = this_inst.is_lost(ref_time=nowtime)
+        this_inst.refresh_capacity()
+        if startup_event:
+            logger.warning('Rejoining the cluster as instance {}.'.format(this_inst.hostname))
+            return
+    else:
+        raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID))
+    # IFF any node has a greater version than we do, then we'll shutdown services
+    for other_inst in instance_list:
+        if other_inst.version == "" or other_inst.version.startswith('ansible-runner'):
+            continue
+        if Version(other_inst.version.split('-', 1)[0]) > Version(awx_application_version.split('-', 1)[0]) and not settings.DEBUG:
+            logger.error(
+                "Host {} reports version {}, but this node {} is at {}, shutting down".format(
+                    other_inst.hostname, other_inst.version, this_inst.hostname, this_inst.version
+                )
+            )
+            # Shutdown signal will set the capacity to zero to ensure no Jobs get added to this instance.
+            # The heartbeat task will reset the capacity to the system capacity after upgrade.
+            stop_local_services(communicate=False)
+            raise RuntimeError("Shutting down.")
+
     for other_inst in lost_instances:
         try:
             reaper.reap(other_inst)
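For review: the cluster_node_heartbeat() hunk above gates the shutdown-on-upgrade path on a version comparison. A minimal sketch of that guard, with packaging.version standing in for whatever Version class tasks.py already imports (an assumption; the import is outside this hunk). Blank versions and 'ansible-runner-*' versions are skipped, so execution nodes can never force a control-plane shutdown:

    from packaging.version import Version

    awx_application_version = '19.2.2'  # hypothetical local version
    peers = ['19.3.0-devel', '', 'ansible-runner-2.0.1', '19.2.2']

    for v in peers:
        if v == '' or v.startswith('ansible-runner'):
            continue  # execution nodes are exempt from the version check
        if Version(v.split('-', 1)[0]) > Version(awx_application_version.split('-', 1)[0]):
            print('peer at {} is newer; this node would stop its services'.format(v))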
diff --git a/awx/main/tests/functional/test_jobs.py b/awx/main/tests/functional/test_jobs.py
index 0a3966fc56..197c1197e1 100644
--- a/awx/main/tests/functional/test_jobs.py
+++ b/awx/main/tests/functional/test_jobs.py
@@ -1,7 +1,11 @@
+import redis
 import pytest
+from unittest import mock
 import json
 
-from awx.main.models import Job, JobHostSummary, InventoryUpdate, InventorySource, Project, ProjectUpdate, SystemJob, AdHocCommand
+from awx.main.models import Job, Instance, JobHostSummary, InventoryUpdate, InventorySource, Project, ProjectUpdate, SystemJob, AdHocCommand
+from awx.main.tasks import cluster_node_heartbeat
+from django.test.utils import override_settings
 
 
 @pytest.mark.django_db
@@ -15,6 +19,36 @@ def test_orphan_unified_job_creation(instance, inventory):
     assert job2.launch_type == 'relaunch'
 
 
+@pytest.mark.django_db
+@mock.patch('awx.main.utils.common.get_cpu_capacity', lambda: (2, 8))
+@mock.patch('awx.main.utils.common.get_mem_capacity', lambda: (8000, 62))
+def test_job_capacity_and_with_inactive_node():
+    i = Instance.objects.create(hostname='test-1')
+    with mock.patch.object(redis.client.Redis, 'ping', lambda self: True):
+        i.refresh_capacity()
+    assert i.capacity == 62
+    i.enabled = False
+    i.save()
+    with override_settings(CLUSTER_HOST_ID=i.hostname):
+        cluster_node_heartbeat()
+    i = Instance.objects.get(id=i.id)
+    assert i.capacity == 0
+
+
+@pytest.mark.django_db
+@mock.patch('awx.main.utils.common.get_cpu_capacity', lambda: (2, 8))
+@mock.patch('awx.main.utils.common.get_mem_capacity', lambda: (8000, 62))
+def test_job_capacity_with_redis_disabled():
+    i = Instance.objects.create(hostname='test-1')
+
+    def _raise(self):
+        raise redis.ConnectionError()
+
+    with mock.patch.object(redis.client.Redis, 'ping', _raise):
+        i.refresh_capacity()
+    assert i.capacity == 0
+
+
 @pytest.mark.django_db
 def test_job_type_name():
     job = Job.objects.create()
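For review: both new tests hinge on patching Redis.ping at the class level, so the client that refresh_capacity() builds from settings.BROKER_URL picks up the stub. The same pattern in isolation (the localhost URL is arbitrary; redis-py connects lazily, so nothing has to be listening):

    from unittest import mock

    import redis

    def _raise(self):
        raise redis.ConnectionError()

    with mock.patch.object(redis.client.Redis, 'ping', _raise):
        try:
            redis.Redis.from_url('redis://localhost:6379').ping()
        except redis.ConnectionError:
            print('capacity would be zeroed')  # the except branch in refresh_capacity()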
diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py
index 7a09a8615f..e034dda02c 100644
--- a/awx/main/utils/common.py
+++ b/awx/main/utils/common.py
@@ -15,6 +15,7 @@ import urllib.parse
 import threading
 import contextlib
 import tempfile
+import psutil
 from functools import reduce, wraps
 from decimal import Decimal
@@ -698,7 +699,7 @@ def parse_yaml_or_json(vars_str, silent_failure=True):
     return vars_dict
 
 
-def get_cpu_capacity(raw):
+def get_cpu_capacity(raw=None):
     from django.conf import settings
 
     settings_forkcpu = getattr(settings, 'SYSTEM_TASK_FORKS_CPU', None)
@@ -712,6 +713,9 @@ def get_cpu_capacity(raw):
     elif settings_abscpu is not None:
         return 0, int(settings_abscpu)
 
+    if raw is None:
+        raw = psutil.cpu_count()
+
     if env_forkcpu:
         forkcpu = int(env_forkcpu)
     elif settings_forkcpu:
@@ -721,7 +725,7 @@ def get_cpu_capacity(raw):
     return (raw, raw * forkcpu)
 
 
-def get_mem_capacity(raw_mb):
+def get_mem_capacity(raw_mb=None):
     from django.conf import settings
 
     settings_forkmem = getattr(settings, 'SYSTEM_TASK_FORKS_MEM', None)
@@ -742,6 +746,8 @@ def get_mem_capacity(raw_mb):
     else:
         forkmem = 100
 
+    if raw_mb is None:
+        raw_mb = psutil.virtual_memory().total
     return (raw_mb, max(1, ((raw_mb // 1024 // 1024) - 2048) // forkmem))
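For review: with the signature changes above, both helpers now fall back to psutil when called with no argument, which is how Instance.refresh_capacity() calls them. A sketch of the resulting arithmetic; the forkcpu fallback of 4 is an assumption (get_cpu_capacity()'s else branch is outside these hunks), while forkmem's fallback of 100 is shown. Note that a defaulted raw_mb actually holds bytes, which the // 1024 // 1024 converts to MiB before subtracting the 2 GiB reserve:

    import psutil

    forkcpu = 4    # assumed fallback forks per CPU (else branch not shown above)
    forkmem = 100  # fallback MiB per fork, from the hunk above

    raw = psutil.cpu_count()
    print('cpu capacity:', raw * forkcpu)

    raw_mb = psutil.virtual_memory().total  # bytes, despite the parameter name
    print('mem capacity:', max(1, ((raw_mb // 1024 // 1024) - 2048) // forkmem))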