Apply capacity algorithm changes

* This also adds fields to the instance view for tracking cpu and
  memory usage as well as information on what the capacity ranges are
* Also adds a flag for enabling/disabling instances which removes them
  from all queues and has them stop processing new work
* The capacity is now based almost exclusively on some value relative
  to forks
* capacity_adjustment allows you to commit an instance to a certain
  amount of forks, cpu focused or memory focused
* Each job run adds a single fork overhead (that's the reasoning
  behind the +1)
This commit is contained in:
Matthew Jones
2018-01-11 13:33:35 -05:00
parent 6a85fc38dd
commit 70bf78e29f
17 changed files with 248 additions and 76 deletions

View File

@@ -3977,8 +3977,10 @@ class InstanceSerializer(BaseSerializer):
class Meta:
model = Instance
fields = ("id", "type", "url", "related", "uuid", "hostname", "created", "modified",
"version", "capacity", "consumed_capacity", "percent_capacity_remaining", "jobs_running")
read_only_fields = ('uuid', 'hostname', 'version')
fields = ("id", "type", "url", "related", "uuid", "hostname", "created", "modified", 'capacity_adjustment',
"version", "capacity", "consumed_capacity", "percent_capacity_remaining", "jobs_running",
"cpu", "memory", "cpu_capacity", "mem_capacity", "enabled")
def get_related(self, obj):
res = super(InstanceSerializer, self).get_related(obj)

View File

@@ -57,7 +57,7 @@ import pytz
from wsgiref.util import FileWrapper
# AWX
from awx.main.tasks import send_notifications
from awx.main.tasks import send_notifications, handle_ha_toplogy_changes
from awx.main.access import get_user_queryset
from awx.main.ha import is_ha_environment
from awx.api.authentication import TokenGetAuthentication
@@ -560,7 +560,7 @@ class InstanceList(ListAPIView):
new_in_320 = True
class InstanceDetail(RetrieveAPIView):
class InstanceDetail(RetrieveUpdateAPIView):
view_name = _("Instance Detail")
model = Instance
@@ -568,6 +568,20 @@ class InstanceDetail(RetrieveAPIView):
new_in_320 = True
def update(self, request, *args, **kwargs):
r = super(InstanceDetail, self).update(request, *args, **kwargs)
if status.is_success(r.status_code):
obj = self.get_object()
if obj.enabled:
obj.refresh_capacity()
else:
obj.capacity = 0
obj.save()
handle_ha_toplogy_changes.apply_async()
r.data = InstanceSerializer(obj, context=self.get_serializer_context()).to_representation(obj)
return r
class InstanceUnifiedJobsList(SubListAPIView):
view_name = _("Instance Running Jobs")

View File

@@ -2,12 +2,9 @@
# All Rights Reserved.
import sys
from datetime import timedelta
import logging
from django.db import models
from django.utils.timezone import now
from django.db.models import Sum
from django.conf import settings
from awx.main.utils.filters import SmartFilter

View File

@@ -1,31 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
import awx.main.fields
class Migration(migrations.Migration):
dependencies = [
('main', '0017_v330_move_deprecated_stdout'),
]
operations = [
migrations.AddField(
model_name='instancegroup',
name='policy_instance_list',
field=awx.main.fields.JSONField(default=[], help_text='List of exact-match Instances that will always be automatically assigned to this group',
blank=True),
),
migrations.AddField(
model_name='instancegroup',
name='policy_instance_minimum',
field=models.IntegerField(default=0, help_text='Static minimum number of Instances to automatically assign to this group'),
),
migrations.AddField(
model_name='instancegroup',
name='policy_instance_percentage',
field=models.IntegerField(default=0, help_text='Percentage of Instances to automatically assign to this group'),
),
]

View File

@@ -0,0 +1,62 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
from decimal import Decimal
import awx.main.fields
class Migration(migrations.Migration):
dependencies = [
('main', '0019_v330_custom_virtualenv'),
]
operations = [
migrations.AddField(
model_name='instancegroup',
name='policy_instance_list',
field=awx.main.fields.JSONField(default=[], help_text='List of exact-match Instances that will always be automatically assigned to this group',
blank=True),
),
migrations.AddField(
model_name='instancegroup',
name='policy_instance_minimum',
field=models.IntegerField(default=0, help_text='Static minimum number of Instances to automatically assign to this group'),
),
migrations.AddField(
model_name='instancegroup',
name='policy_instance_percentage',
field=models.IntegerField(default=0, help_text='Percentage of Instances to automatically assign to this group'),
),
migrations.AddField(
model_name='instance',
name='capacity_adjustment',
field=models.DecimalField(decimal_places=2, default=Decimal('1.0'), max_digits=3),
),
migrations.AddField(
model_name='instance',
name='cpu',
field=models.IntegerField(default=0, editable=False)
),
migrations.AddField(
model_name='instance',
name='memory',
field=models.BigIntegerField(default=0, editable=False)
),
migrations.AddField(
model_name='instance',
name='cpu_capacity',
field=models.IntegerField(default=0, editable=False)
),
migrations.AddField(
model_name='instance',
name='mem_capacity',
field=models.IntegerField(default=0, editable=False)
),
migrations.AddField(
model_name='instance',
name='enabled',
field=models.BooleanField(default=True)
)
]

View File

@@ -184,7 +184,7 @@ class AdHocCommand(UnifiedJob, JobNotificationMixin):
# NOTE: We sorta have to assume the host count matches and that forks default to 5
from awx.main.models.inventory import Host
count_hosts = Host.objects.filter( enabled=True, inventory__ad_hoc_commands__pk=self.pk).count()
return min(count_hosts, 5 if self.forks == 0 else self.forks) * 10
return min(count_hosts, 5 if self.forks == 0 else self.forks) + 1
def copy(self):
data = {}

View File

@@ -1,6 +1,8 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
from decimal import Decimal
from django.db import models, connection
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
@@ -10,6 +12,7 @@ from django.utils.timezone import now, timedelta
from solo.models import SingletonModel
from awx import __version__ as awx_application_version
from awx.api.versioning import reverse
from awx.main.managers import InstanceManager, InstanceGroupManager
from awx.main.fields import JSONField
@@ -17,6 +20,7 @@ from awx.main.models.inventory import InventoryUpdate
from awx.main.models.jobs import Job
from awx.main.models.projects import ProjectUpdate
from awx.main.models.unified_jobs import UnifiedJob
from awx.main.utils import get_cpu_capacity, get_mem_capacity, get_system_task_capacity
__all__ = ('Instance', 'InstanceGroup', 'JobOrigin', 'TowerScheduleState',)
@@ -39,6 +43,30 @@ class Instance(models.Model):
default=100,
editable=False,
)
capacity_adjustment = models.DecimalField(
default=Decimal(1.0),
max_digits=3,
decimal_places=2,
)
enabled = models.BooleanField(
default=True
)
cpu = models.IntegerField(
default=0,
editable=False,
)
memory = models.BigIntegerField(
default=0,
editable=False,
)
cpu_capacity = models.IntegerField(
default=0,
editable=False,
)
mem_capacity = models.IntegerField(
default=0,
editable=False,
)
class Meta:
app_label = 'main'
@@ -68,6 +96,20 @@ class Instance(models.Model):
return Instance.objects.filter(rampart_groups__controller__instances=self).exists()
def refresh_capacity(self):
cpu = get_cpu_capacity()
mem = get_mem_capacity()
self.capacity = get_system_task_capacity(self.capacity_adjustment)
self.cpu = cpu[0]
self.memory = mem[0]
self.cpu_capacity = cpu[1]
self.mem_capacity = mem[1]
self.version = awx_application_version
self.save(update_fields=['capacity', 'version', 'modified', 'cpu',
'memory', 'cpu_capacity', 'mem_capacity'])
class InstanceGroup(models.Model):
"""A model representing a Queue/Group of AWX Instances."""
objects = InstanceGroupManager()

View File

@@ -1602,7 +1602,7 @@ class InventoryUpdate(UnifiedJob, InventorySourceOptions, JobNotificationMixin,
@property
def task_impact(self):
return 50
return 1
# InventoryUpdate credential required
# Custom and SCM InventoryUpdate credential not required

View File

@@ -623,7 +623,7 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskMana
count_hosts = 1
else:
count_hosts = Host.objects.filter(inventory__jobs__pk=self.pk).count()
return min(count_hosts, 5 if self.forks == 0 else self.forks) * 10
return min(count_hosts, 5 if self.forks == 0 else self.forks) + 1
@property
def successful_hosts(self):
@@ -1190,7 +1190,7 @@ class SystemJob(UnifiedJob, SystemJobOptions, JobNotificationMixin):
@property
def task_impact(self):
return 150
return 5
@property
def preferred_instance_groups(self):

View File

@@ -492,7 +492,7 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin, TaskManage
@property
def task_impact(self):
return 0 if self.job_type == 'run' else 20
return 0 if self.job_type == 'run' else 1
@property
def result_stdout(self):

View File

@@ -2,7 +2,6 @@
# All Rights Reserved.
# Python
import codecs
from collections import OrderedDict, namedtuple
import ConfigParser
import cStringIO
@@ -54,9 +53,8 @@ from awx.main.queue import CallbackQueueDispatcher
from awx.main.expect import run, isolated_manager
from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url,
check_proot_installed, build_proot_temp_dir, get_licenser,
wrap_args_with_proot, get_system_task_capacity, OutputEventFilter,
ignore_inventory_computed_fields, ignore_inventory_group_removal,
get_type_for_model, extract_ansible_vars)
wrap_args_with_proot, OutputEventFilter, ignore_inventory_computed_fields,
ignore_inventory_group_removal, get_type_for_model, extract_ansible_vars)
from awx.main.utils.reload import restart_local_services, stop_local_services
from awx.main.utils.pglock import advisory_lock
from awx.main.utils.ha import update_celery_worker_routes, register_celery_worker_queues
@@ -307,6 +305,7 @@ def cluster_node_heartbeat(self):
instance_list = list(Instance.objects.filter(rampart_groups__controller__isnull=True).distinct())
this_inst = None
lost_instances = []
for inst in list(instance_list):
if inst.hostname == settings.CLUSTER_HOST_ID:
this_inst = inst
@@ -316,11 +315,15 @@ def cluster_node_heartbeat(self):
instance_list.remove(inst)
if this_inst:
startup_event = this_inst.is_lost(ref_time=nowtime)
if this_inst.capacity == 0:
if this_inst.capacity == 0 and this_inst.enabled:
logger.warning('Rejoining the cluster as instance {}.'.format(this_inst.hostname))
this_inst.capacity = get_system_task_capacity()
this_inst.version = awx_application_version
this_inst.save(update_fields=['capacity', 'version', 'modified'])
if this_inst.enabled:
this_inst.refresh_capacity()
handle_ha_toplogy_changes.apply_async()
elif this_inst.capacity != 0 and not this_inst.enabled:
this_inst.capacity = 0
this_inst.save(update_fields=['capacity'])
handle_ha_toplogy_changes.apply_async()
if startup_event:
return
else:
@@ -329,7 +332,7 @@ def cluster_node_heartbeat(self):
for other_inst in instance_list:
if other_inst.version == "":
continue
if Version(other_inst.version.split('-', 1)[0]) > Version(awx_application_version) and not settings.DEBUG:
if Version(other_inst.version.split('-', 1)[0]) > Version(awx_application_version.split('-', 1)[0]) and not settings.DEBUG:
logger.error("Host {} reports version {}, but this node {} is at {}, shutting down".format(other_inst.hostname,
other_inst.version,
this_inst.hostname,

View File

@@ -1,9 +1,11 @@
from awx.main.models import Job, Instance
from django.test.utils import override_settings
import pytest
import mock
import json
from awx.main.models import Job, Instance
from awx.main.tasks import cluster_node_heartbeat
from django.test.utils import override_settings
@pytest.mark.django_db
def test_orphan_unified_job_creation(instance, inventory):
@@ -17,13 +19,19 @@ def test_orphan_unified_job_creation(instance, inventory):
@pytest.mark.django_db
@mock.patch('awx.main.utils.common.get_cpu_capacity', lambda: (2,8))
@mock.patch('awx.main.utils.common.get_mem_capacity', lambda: (8000,62))
@mock.patch('awx.main.tasks.handle_ha_toplogy_changes.apply_async', lambda: True)
def test_job_capacity_and_with_inactive_node():
Instance.objects.create(hostname='test-1', capacity=50)
assert Instance.objects.total_capacity() == 50
Instance.objects.create(hostname='test-2', capacity=50)
assert Instance.objects.total_capacity() == 100
with override_settings(AWX_ACTIVE_NODE_TIME=0):
assert Instance.objects.total_capacity() < 100
i = Instance.objects.create(hostname='test-1')
i.refresh_capacity()
assert i.capacity == 62
i.enabled = False
i.save()
with override_settings(CLUSTER_HOST_ID=i.hostname):
cluster_node_heartbeat()
i = Instance.objects.get(id=i.id)
assert i.capacity == 0
@pytest.mark.django_db

View File

@@ -60,6 +60,7 @@ class TestAddRemoveCeleryWorkerQueues():
static_queues, _worker_queues,
groups, hostname,
added_expected, removed_expected):
added_expected.append('tower_instance_router')
instance = instance_generator(groups=groups, hostname=hostname)
worker_queues = worker_queues_generator(_worker_queues)
with mock.patch('awx.main.utils.ha.settings.AWX_CELERY_QUEUES_STATIC', static_queues):

View File

@@ -20,6 +20,8 @@ import six
import psutil
from StringIO import StringIO
from decimal import Decimal
# Decorator
from decorator import decorator
@@ -45,7 +47,7 @@ __all__ = ['get_object_or_400', 'get_object_or_403', 'camelcase_to_underscore',
'ignore_inventory_computed_fields', 'ignore_inventory_group_removal',
'_inventory_updates', 'get_pk_from_dict', 'getattrd', 'NoDefaultProvided',
'get_current_apps', 'set_current_apps', 'OutputEventFilter',
'extract_ansible_vars', 'get_search_fields', 'get_system_task_capacity',
'extract_ansible_vars', 'get_search_fields', 'get_system_task_capacity', 'get_cpu_capacity', 'get_mem_capacity',
'wrap_args_with_proot', 'build_proot_temp_dir', 'check_proot_installed', 'model_to_dict',
'model_instance_diff', 'timestamp_apiformat', 'parse_yaml_or_json', 'RequireDebugTrueOrTest',
'has_model_field_prefetched', 'set_environ', 'IllegalArgumentError', 'get_custom_venv_choices']
@@ -632,19 +634,52 @@ def parse_yaml_or_json(vars_str, silent_failure=True):
return vars_dict
@memoize()
def get_system_task_capacity():
def get_cpu_capacity():
from django.conf import settings
settings_forkcpu = getattr(settings, 'SYSTEM_TASK_FORKS_CPU', None)
env_forkcpu = os.getenv('SYSTEM_TASK_FORKS_CPU', None)
cpu = psutil.cpu_count()
if env_forkcpu:
forkcpu = int(env_forkcpu)
elif settings_forkcpu:
forkcpu = int(settings_forkcpu)
else:
forkcpu = 4
return (cpu, cpu * forkcpu)
def get_mem_capacity():
from django.conf import settings
settings_forkmem = getattr(settings, 'SYSTEM_TASK_FORKS_MEM', None)
env_forkmem = os.getenv('SYSTEM_TASK_FORKS_MEM', None)
if env_forkmem:
forkmem = int(env_forkmem)
elif settings_forkmem:
forkmem = int(settings_forkmem)
else:
forkmem = 100
mem = psutil.virtual_memory().total
return (mem, max(1, ((mem / 1024 / 1024) - 2048) / forkmem))
def get_system_task_capacity(scale=Decimal(1.0)):
'''
Measure system memory and use it as a baseline for determining the system's capacity
'''
from django.conf import settings
if hasattr(settings, 'SYSTEM_TASK_CAPACITY'):
return settings.SYSTEM_TASK_CAPACITY
mem = psutil.virtual_memory()
total_mem_value = mem.total / 1024 / 1024
if total_mem_value <= 2048:
return 50
return 50 + ((total_mem_value / 1024) - 2) * 75
settings_forks = getattr(settings, 'SYSTEM_TASK_FORKS_CAPACITY', None)
env_forks = os.getenv('SYSTEM_TASK_FORKS_CAPACITY', None)
if env_forks:
return int(env_forks)
elif settings_forks:
return int(settings_forks)
_, cpu_cap = get_cpu_capacity()
_, mem_cap = get_mem_capacity()
return min(mem_cap, cpu_cap) + ((max(mem_cap, cpu_cap) - min(mem_cap, cpu_cap)) * scale)
_inventory_updates = threading.local()

View File

@@ -24,7 +24,7 @@ def _add_remove_celery_worker_queues(app, instance, worker_queues, worker_name):
queue['alias'] in settings.AWX_CELERY_QUEUES_STATIC:
continue
if queue['name'] not in ig_names | set([instance.hostname]):
if queue['name'] not in ig_names | set([instance.hostname]) or not instance.enabled:
app.control.cancel_consumer(queue['name'], reply=True, destination=[worker_name])
removed_queues.append(queue['name'])
@@ -43,7 +43,6 @@ def update_celery_worker_routes(instance, conf):
'awx.main.tasks.purge_old_stdout_files',
]
routes_updated = {}
# Instance is, effectively, a controller node
if instance.is_controller():
tasks.append('awx.main.tasks.awx_isolated_heartbeat')

View File

@@ -639,9 +639,6 @@ AWX_PROOT_BASE_PATH = "/tmp"
# Note: This setting may be overridden by database settings.
AWX_ANSIBLE_CALLBACK_PLUGINS = ""
# Time at which an HA node is considered active
AWX_ACTIVE_NODE_TIME = 7200
# Automatically remove nodes that have missed their heartbeats after some time
AWX_AUTO_DEPROVISION_INSTANCES = False