diff --git a/awx/main/management/commands/list_instances.py b/awx/main/management/commands/list_instances.py
index 7302953a76..7568f0b45c 100644
--- a/awx/main/management/commands/list_instances.py
+++ b/awx/main/management/commands/list_instances.py
@@ -48,8 +48,7 @@ class Command(BaseCommand):
                 if no_color:
                     color = ''
                 fmt = '\t' + color + '{0.hostname} capacity={0.capacity} version={1}'
-                if x.last_isolated_check:
-                    fmt += ' last_isolated_check="{0.last_isolated_check:%Y-%m-%d %H:%M:%S}"'
-                fmt += ' heartbeat="{0.modified:%Y-%m-%d %H:%M:%S}"'
+                if x.capacity:
+                    fmt += ' heartbeat="{0.modified:%Y-%m-%d %H:%M:%S}"'
                 print((fmt + '\033[0m').format(x, x.version or '?'))
             print('')
diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py
index 17116c5c3c..3ab2439b95 100644
--- a/awx/main/models/ha.py
+++ b/awx/main/models/ha.py
@@ -136,6 +136,28 @@ class Instance(HasPolicyEditsMixin, BaseModel):
             grace_period = 120
         return self.modified < ref_time - timedelta(seconds=grace_period)
 
+    def refresh_capacity(self):
+        cpu = get_cpu_capacity()
+        mem = get_mem_capacity()
+        if self.enabled:
+            self.capacity = get_system_task_capacity(self.capacity_adjustment)
+        else:
+            self.capacity = 0
+
+        try:
+            # if redis is down for some reason, that means we can't persist
+            # playbook event data; we should consider this a zero capacity event
+            redis.Redis.from_url(settings.BROKER_URL).ping()
+        except redis.ConnectionError:
+            self.capacity = 0
+
+        self.cpu = cpu[0]
+        self.memory = mem[0]
+        self.cpu_capacity = cpu[1]
+        self.mem_capacity = mem[1]
+        self.version = awx_application_version
+        self.save(update_fields=['capacity', 'version', 'modified', 'cpu', 'memory', 'cpu_capacity', 'mem_capacity'])
+
     def is_receptor(self):
         return self.version.startswith('ansible-runner-')
 
@@ -184,6 +206,11 @@ class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin):
     def capacity(self):
         return sum([inst.capacity for inst in self.instances.all()])
 
+    @property
+    def execution_capacity(self):
+        # TODO: update query to exclude based on node_type field
+        return sum([inst.capacity for inst in self.instances.exclude(version__startswith='ansible-runner-')])
+
     @property
     def jobs_running(self):
         return UnifiedJob.objects.filter(status__in=('running', 'waiting'), instance_group=self).count()
@@ -206,6 +233,9 @@ class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin):
     def fit_task_to_most_remaining_capacity_instance(task, instances):
         instance_most_capacity = None
         for i in instances:
+            # TODO: change this to check if "execution" is in node_type field
+            if not i.version.startswith('ansible-runner'):
+                continue
             if i.remaining_capacity >= task.task_impact and (
                 instance_most_capacity is None or i.remaining_capacity > instance_most_capacity.remaining_capacity
             ):
diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py
index ed80cc6d4f..56a18a1412 100644
--- a/awx/main/scheduler/task_manager.py
+++ b/awx/main/scheduler/task_manager.py
@@ -75,6 +75,7 @@ class TaskManager:
         instances_partial = [
             SimpleNamespace(
                 obj=instance,
+                version=instance.version,
                 remaining_capacity=instance.remaining_capacity,
                 capacity=instance.capacity,
                 jobs_running=instance.jobs_running,
@@ -86,7 +87,7 @@ class TaskManager:
         instances_by_hostname = {i.hostname: i for i in instances_partial}
 
         for rampart_group in InstanceGroup.objects.prefetch_related('instances'):
-            self.graph[rampart_group.name] = dict(graph=DependencyGraph(), capacity_total=rampart_group.capacity, consumed_capacity=0, instances=[])
+            self.graph[rampart_group.name] = dict(graph=DependencyGraph(), capacity_total=rampart_group.execution_capacity, consumed_capacity=0, instances=[])
             for instance in rampart_group.instances.filter(enabled=True).order_by('hostname'):
                 if instance.hostname in instances_by_hostname:
                     self.graph[rampart_group.name]['instances'].append(instances_by_hostname[instance.hostname])
diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index e5a8d4a6c6..27a3482766 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -102,7 +102,6 @@ from awx.main.utils.common import (
     parse_yaml_or_json,
     cleanup_new_process,
     create_partition,
-    get_mem_capacity,
     get_cpu_capacity,
     get_system_task_capacity,
 )
@@ -422,15 +421,6 @@ def discover_receptor_nodes():
         hostname = ad['NodeID']
         commands = ad['WorkCommands'] or []
         if 'ansible-runner' not in commands:
-            if 'local' in commands:
-                # this node is strictly a control plane node, and does not
-                # provide ansible-runner as a work command
-                (changed, instance) = Instance.objects.register(hostname=hostname)
-                if changed:
-                    logger.info("Registered tower control node '{}'".format(hostname))
-                instance.capacity = instance.cpu = instance.memory = instance.cpu_capacity = instance.mem_capacity = 0  # noqa
-                instance.version = get_awx_version()
-                instance.save(update_fields=['capacity', 'version', 'cpu', 'memory', 'cpu_capacity', 'mem_capacity'])
             continue
         (changed, instance) = Instance.objects.register(hostname=hostname)
         was_lost = instance.is_lost(ref_time=nowtime)
@@ -454,7 +444,7 @@ def discover_receptor_nodes():
             # if the instance *was* lost, but has appeared again,
             # attempt to re-establish the initial capacity and version
             # check
-            logger.warning('Attempting to rejoin the cluster as instance {}.'.format(hostname))
+            logger.warning('Execution node attempting to rejoin as instance {}.'.format(hostname))
             check_heartbeat.apply_async([hostname])
@@ -463,6 +453,7 @@ def cluster_node_heartbeat():
     logger.debug("Cluster node heartbeat task.")
     nowtime = now()
     instance_list = list(Instance.objects.all())
+    this_inst = None
     lost_instances = []
 
     (changed, instance) = Instance.objects.get_or_register()
@@ -473,10 +464,35 @@ def cluster_node_heartbeat():
 
     for inst in list(instance_list):
         if inst.hostname == settings.CLUSTER_HOST_ID:
+            this_inst = inst
             instance_list.remove(inst)
         elif inst.is_lost(ref_time=nowtime):
             lost_instances.append(inst)
             instance_list.remove(inst)
+
+    if this_inst:
+        startup_event = this_inst.is_lost(ref_time=nowtime)
+        this_inst.refresh_capacity()
+        if startup_event:
+            logger.warning('Rejoining the cluster as instance {}.'.format(this_inst.hostname))
+            return
+    else:
+        raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID))
+    # IFF any node has a greater version than we do, then we'll shutdown services
+    for other_inst in instance_list:
+        if other_inst.version == "" or other_inst.version.startswith('ansible-runner'):
+            continue
+        if Version(other_inst.version.split('-', 1)[0]) > Version(awx_application_version.split('-', 1)[0]) and not settings.DEBUG:
+            logger.error(
+                "Host {} reports version {}, but this node {} is at {}, shutting down".format(
+                    other_inst.hostname, other_inst.version, this_inst.hostname, this_inst.version
+                )
+            )
+            # Shutdown signal will set the capacity to zero to ensure no Jobs get added to this instance.
+            # The heartbeat task will reset the capacity to the system capacity after upgrade.
+            stop_local_services(communicate=False)
+            raise RuntimeError("Shutting down.")
+
     for other_inst in lost_instances:
         try:
             reaper.reap(other_inst)
diff --git a/awx/main/tests/functional/test_jobs.py b/awx/main/tests/functional/test_jobs.py
index 0a3966fc56..197c1197e1 100644
--- a/awx/main/tests/functional/test_jobs.py
+++ b/awx/main/tests/functional/test_jobs.py
@@ -1,7 +1,11 @@
+import redis
 import pytest
+from unittest import mock
 import json
 
-from awx.main.models import Job, JobHostSummary, InventoryUpdate, InventorySource, Project, ProjectUpdate, SystemJob, AdHocCommand
+from awx.main.models import Job, Instance, JobHostSummary, InventoryUpdate, InventorySource, Project, ProjectUpdate, SystemJob, AdHocCommand
+from awx.main.tasks import cluster_node_heartbeat
+from django.test.utils import override_settings
 
 
 @pytest.mark.django_db
@@ -15,6 +19,36 @@ def test_orphan_unified_job_creation(instance, inventory):
     assert job2.launch_type == 'relaunch'
 
 
+@pytest.mark.django_db
+@mock.patch('awx.main.utils.common.get_cpu_capacity', lambda: (2, 8))
+@mock.patch('awx.main.utils.common.get_mem_capacity', lambda: (8000, 62))
+def test_job_capacity_and_with_inactive_node():
+    i = Instance.objects.create(hostname='test-1')
+    with mock.patch.object(redis.client.Redis, 'ping', lambda self: True):
+        i.refresh_capacity()
+    assert i.capacity == 62
+    i.enabled = False
+    i.save()
+    with override_settings(CLUSTER_HOST_ID=i.hostname):
+        cluster_node_heartbeat()
+    i = Instance.objects.get(id=i.id)
+    assert i.capacity == 0
+
+
+@pytest.mark.django_db
+@mock.patch('awx.main.utils.common.get_cpu_capacity', lambda: (2, 8))
+@mock.patch('awx.main.utils.common.get_mem_capacity', lambda: (8000, 62))
+def test_job_capacity_with_redis_disabled():
+    i = Instance.objects.create(hostname='test-1')
+
+    def _raise(self):
+        raise redis.ConnectionError()
+
+    with mock.patch.object(redis.client.Redis, 'ping', _raise):
+        i.refresh_capacity()
+    assert i.capacity == 0
+
+
 @pytest.mark.django_db
 def test_job_type_name():
     job = Job.objects.create()
diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py
index 7a09a8615f..e034dda02c 100644
--- a/awx/main/utils/common.py
+++ b/awx/main/utils/common.py
@@ -15,6 +15,7 @@ import urllib.parse
 import threading
 import contextlib
 import tempfile
+import psutil
 
 from functools import reduce, wraps
 from decimal import Decimal
@@ -698,7 +699,7 @@ def parse_yaml_or_json(vars_str, silent_failure=True):
     return vars_dict
 
 
-def get_cpu_capacity(raw):
+def get_cpu_capacity(raw=None):
     from django.conf import settings
 
     settings_forkcpu = getattr(settings, 'SYSTEM_TASK_FORKS_CPU', None)
@@ -712,6 +713,9 @@ def get_cpu_capacity(raw):
     elif settings_abscpu is not None:
         return 0, int(settings_abscpu)
 
+    if raw is None:
+        raw = psutil.cpu_count()
+
     if env_forkcpu:
         forkcpu = int(env_forkcpu)
     elif settings_forkcpu:
@@ -721,7 +725,7 @@ def get_cpu_capacity(raw):
     return (raw, raw * forkcpu)
 
 
-def get_mem_capacity(raw_mb):
+def get_mem_capacity(raw_mb=None):
     from django.conf import settings
 
     settings_forkmem = getattr(settings, 'SYSTEM_TASK_FORKS_MEM', None)
@@ -742,6 +746,8 @@ def get_mem_capacity(raw_mb):
     else:
         forkmem = 100
 
+    if raw_mb is None:
+        raw_mb = psutil.virtual_memory().total
     return (raw_mb, max(1, ((raw_mb // 1024 // 1024) - 2048) // forkmem))
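
For reference, a minimal standalone sketch of the capacity math this change relies on: it mirrors the new psutil-backed defaults in get_cpu_capacity()/get_mem_capacity() and the Redis ping guard in Instance.refresh_capacity(). The forkcpu=4 default, the local Redis URL, and the use of the memory-based figure in place of get_system_task_capacity() are illustrative assumptions, not values taken from this diff.

# Illustrative sketch only (not part of the change): mirrors the psutil-backed
# defaults added to get_cpu_capacity()/get_mem_capacity() and the Redis ping
# guard in Instance.refresh_capacity(), so the formulas are easy to check.
import psutil
import redis


def sketch_cpu_capacity(raw=None, forkcpu=4):
    # forkcpu=4 is an assumed default; the real value comes from the
    # SYSTEM_TASK_FORKS_CPU setting / env var handled in awx.main.utils.common.
    if raw is None:
        raw = psutil.cpu_count()
    return raw, raw * forkcpu


def sketch_mem_capacity(raw_mb=None, forkmem=100):
    # Same formula as the patched get_mem_capacity(): total bytes -> MiB,
    # reserve 2048 MiB for the system, then divide by the per-fork cost.
    if raw_mb is None:
        raw_mb = psutil.virtual_memory().total
    return raw_mb, max(1, ((raw_mb // 1024 // 1024) - 2048) // forkmem)


def sketch_refresh_capacity(enabled=True, broker_url='redis://localhost:6379/0'):
    # Mirrors the shape of Instance.refresh_capacity(): a disabled node or an
    # unreachable Redis broker both collapse capacity to zero. The real method
    # uses get_system_task_capacity(capacity_adjustment); the memory figure is
    # used here only to keep the sketch self-contained.
    capacity = sketch_mem_capacity()[1] if enabled else 0
    try:
        redis.Redis.from_url(broker_url).ping()
    except redis.ConnectionError:
        capacity = 0
    return capacity


if __name__ == '__main__':
    print(sketch_cpu_capacity())      # e.g. (8, 32) on an 8-core host
    print(sketch_mem_capacity())      # e.g. (17179869184, 143) on a 16 GiB host
    print(sketch_refresh_capacity())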