Update rebase to keep old control plane capacity check

Also do some basic work to separate control versus execution capacity
  this is to assure that we don't send jobs to the control node
This commit is contained in:
Alan Rominger 2021-07-14 14:53:50 -04:00
parent b09da48835
commit 13300bdbd4
No known key found for this signature in database
GPG Key ID: C2D7EAAA12B63559
6 changed files with 104 additions and 18 deletions

View File

@ -48,8 +48,7 @@ class Command(BaseCommand):
if no_color:
color = ''
fmt = '\t' + color + '{0.hostname} capacity={0.capacity} version={1}'
if x.last_isolated_check:
fmt += ' last_isolated_check="{0.last_isolated_check:%Y-%m-%d %H:%M:%S}"'
fmt += ' heartbeat="{0.modified:%Y-%m-%d %H:%M:%S}"'
if x.capacity:
fmt += ' heartbeat="{0.modified:%Y-%m-%d %H:%M:%S}"'
print((fmt + '\033[0m').format(x, x.version or '?'))
print('')

View File

@ -136,6 +136,28 @@ class Instance(HasPolicyEditsMixin, BaseModel):
grace_period = 120
return self.modified < ref_time - timedelta(seconds=grace_period)
def refresh_capacity(self):
cpu = get_cpu_capacity()
mem = get_mem_capacity()
if self.enabled:
self.capacity = get_system_task_capacity(self.capacity_adjustment)
else:
self.capacity = 0
try:
# if redis is down for some reason, that means we can't persist
# playbook event data; we should consider this a zero capacity event
redis.Redis.from_url(settings.BROKER_URL).ping()
except redis.ConnectionError:
self.capacity = 0
self.cpu = cpu[0]
self.memory = mem[0]
self.cpu_capacity = cpu[1]
self.mem_capacity = mem[1]
self.version = awx_application_version
self.save(update_fields=['capacity', 'version', 'modified', 'cpu', 'memory', 'cpu_capacity', 'mem_capacity'])
def is_receptor(self):
return self.version.startswith('ansible-runner-')
@ -184,6 +206,11 @@ class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin):
def capacity(self):
return sum([inst.capacity for inst in self.instances.all()])
@property
def execution_capacity(self):
# TODO: update query to exclude based on node_type field
return sum([inst.capacity for inst in self.instances.exclude(version__startswith='ansible-runner-')])
@property
def jobs_running(self):
return UnifiedJob.objects.filter(status__in=('running', 'waiting'), instance_group=self).count()
@ -206,6 +233,9 @@ class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin):
def fit_task_to_most_remaining_capacity_instance(task, instances):
instance_most_capacity = None
for i in instances:
# TODO: change this to check if "execution" is in node_type field
if not i.version.startswith('ansible-runner'):
continue
if i.remaining_capacity >= task.task_impact and (
instance_most_capacity is None or i.remaining_capacity > instance_most_capacity.remaining_capacity
):

View File

@ -75,6 +75,7 @@ class TaskManager:
instances_partial = [
SimpleNamespace(
obj=instance,
version=instance.version,
remaining_capacity=instance.remaining_capacity,
capacity=instance.capacity,
jobs_running=instance.jobs_running,
@ -86,7 +87,7 @@ class TaskManager:
instances_by_hostname = {i.hostname: i for i in instances_partial}
for rampart_group in InstanceGroup.objects.prefetch_related('instances'):
self.graph[rampart_group.name] = dict(graph=DependencyGraph(), capacity_total=rampart_group.capacity, consumed_capacity=0, instances=[])
self.graph[rampart_group.name] = dict(graph=DependencyGraph(), capacity_total=rampart_group.execution_capacity, consumed_capacity=0, instances=[])
for instance in rampart_group.instances.filter(enabled=True).order_by('hostname'):
if instance.hostname in instances_by_hostname:
self.graph[rampart_group.name]['instances'].append(instances_by_hostname[instance.hostname])

View File

@ -102,7 +102,6 @@ from awx.main.utils.common import (
parse_yaml_or_json,
cleanup_new_process,
create_partition,
get_mem_capacity,
get_cpu_capacity,
get_system_task_capacity,
)
@ -422,15 +421,6 @@ def discover_receptor_nodes():
hostname = ad['NodeID']
commands = ad['WorkCommands'] or []
if 'ansible-runner' not in commands:
if 'local' in commands:
# this node is strictly a control plane node, and does not
# provide ansible-runner as a work command
(changed, instance) = Instance.objects.register(hostname=hostname)
if changed:
logger.info("Registered tower control node '{}'".format(hostname))
instance.capacity = instance.cpu = instance.memory = instance.cpu_capacity = instance.mem_capacity = 0 # noqa
instance.version = get_awx_version()
instance.save(update_fields=['capacity', 'version', 'cpu', 'memory', 'cpu_capacity', 'mem_capacity'])
continue
(changed, instance) = Instance.objects.register(hostname=hostname)
was_lost = instance.is_lost(ref_time=nowtime)
@ -454,7 +444,7 @@ def discover_receptor_nodes():
# if the instance *was* lost, but has appeared again,
# attempt to re-establish the initial capacity and version
# check
logger.warning('Attempting to rejoin the cluster as instance {}.'.format(hostname))
logger.warning('Execution node attempting to rejoin as instance {}.'.format(hostname))
check_heartbeat.apply_async([hostname])
@ -463,6 +453,7 @@ def cluster_node_heartbeat():
logger.debug("Cluster node heartbeat task.")
nowtime = now()
instance_list = list(Instance.objects.all())
this_inst = None
lost_instances = []
(changed, instance) = Instance.objects.get_or_register()
@ -473,10 +464,35 @@ def cluster_node_heartbeat():
for inst in list(instance_list):
if inst.hostname == settings.CLUSTER_HOST_ID:
this_inst = inst
instance_list.remove(inst)
elif inst.is_lost(ref_time=nowtime):
lost_instances.append(inst)
instance_list.remove(inst)
if this_inst:
startup_event = this_inst.is_lost(ref_time=nowtime)
this_inst.refresh_capacity()
if startup_event:
logger.warning('Rejoining the cluster as instance {}.'.format(this_inst.hostname))
return
else:
raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID))
# IFF any node has a greater version than we do, then we'll shutdown services
for other_inst in instance_list:
if other_inst.version == "" or other_inst.version.startswith('ansible-runner'):
continue
if Version(other_inst.version.split('-', 1)[0]) > Version(awx_application_version.split('-', 1)[0]) and not settings.DEBUG:
logger.error(
"Host {} reports version {}, but this node {} is at {}, shutting down".format(
other_inst.hostname, other_inst.version, this_inst.hostname, this_inst.version
)
)
# Shutdown signal will set the capacity to zero to ensure no Jobs get added to this instance.
# The heartbeat task will reset the capacity to the system capacity after upgrade.
stop_local_services(communicate=False)
raise RuntimeError("Shutting down.")
for other_inst in lost_instances:
try:
reaper.reap(other_inst)

View File

@ -1,7 +1,11 @@
import redis
import pytest
from unittest import mock
import json
from awx.main.models import Job, JobHostSummary, InventoryUpdate, InventorySource, Project, ProjectUpdate, SystemJob, AdHocCommand
from awx.main.models import Job, Instance, JobHostSummary, InventoryUpdate, InventorySource, Project, ProjectUpdate, SystemJob, AdHocCommand
from awx.main.tasks import cluster_node_heartbeat
from django.test.utils import override_settings
@pytest.mark.django_db
@ -15,6 +19,36 @@ def test_orphan_unified_job_creation(instance, inventory):
assert job2.launch_type == 'relaunch'
@pytest.mark.django_db
@mock.patch('awx.main.utils.common.get_cpu_capacity', lambda: (2, 8))
@mock.patch('awx.main.utils.common.get_mem_capacity', lambda: (8000, 62))
def test_job_capacity_and_with_inactive_node():
i = Instance.objects.create(hostname='test-1')
with mock.patch.object(redis.client.Redis, 'ping', lambda self: True):
i.refresh_capacity()
assert i.capacity == 62
i.enabled = False
i.save()
with override_settings(CLUSTER_HOST_ID=i.hostname):
cluster_node_heartbeat()
i = Instance.objects.get(id=i.id)
assert i.capacity == 0
@pytest.mark.django_db
@mock.patch('awx.main.utils.common.get_cpu_capacity', lambda: (2, 8))
@mock.patch('awx.main.utils.common.get_mem_capacity', lambda: (8000, 62))
def test_job_capacity_with_redis_disabled():
i = Instance.objects.create(hostname='test-1')
def _raise(self):
raise redis.ConnectionError()
with mock.patch.object(redis.client.Redis, 'ping', _raise):
i.refresh_capacity()
assert i.capacity == 0
@pytest.mark.django_db
def test_job_type_name():
job = Job.objects.create()

View File

@ -15,6 +15,7 @@ import urllib.parse
import threading
import contextlib
import tempfile
import psutil
from functools import reduce, wraps
from decimal import Decimal
@ -698,7 +699,7 @@ def parse_yaml_or_json(vars_str, silent_failure=True):
return vars_dict
def get_cpu_capacity(raw):
def get_cpu_capacity(raw=None):
from django.conf import settings
settings_forkcpu = getattr(settings, 'SYSTEM_TASK_FORKS_CPU', None)
@ -712,6 +713,9 @@ def get_cpu_capacity(raw):
elif settings_abscpu is not None:
return 0, int(settings_abscpu)
if raw is None:
raw = psutil.cpu_count()
if env_forkcpu:
forkcpu = int(env_forkcpu)
elif settings_forkcpu:
@ -721,7 +725,7 @@ def get_cpu_capacity(raw):
return (raw, raw * forkcpu)
def get_mem_capacity(raw_mb):
def get_mem_capacity(raw_mb=None):
from django.conf import settings
settings_forkmem = getattr(settings, 'SYSTEM_TASK_FORKS_MEM', None)
@ -742,6 +746,8 @@ def get_mem_capacity(raw_mb):
else:
forkmem = 100
if raw_mb is None:
raw_mb = psutil.virtual_memory().total
return (raw_mb, max(1, ((raw_mb // 1024 // 1024) - 2048) // forkmem))