diff --git a/awx/api/serializers.py b/awx/api/serializers.py
index f134d393e6..003a80e317 100644
--- a/awx/api/serializers.py
+++ b/awx/api/serializers.py
@@ -3977,8 +3977,10 @@ class InstanceSerializer(BaseSerializer):

     class Meta:
         model = Instance
-        fields = ("id", "type", "url", "related", "uuid", "hostname", "created", "modified",
-                  "version", "capacity", "consumed_capacity", "percent_capacity_remaining", "jobs_running")
+        read_only_fields = ('uuid', 'hostname', 'version')
+        fields = ("id", "type", "url", "related", "uuid", "hostname", "created", "modified", 'capacity_adjustment',
+                  "version", "capacity", "consumed_capacity", "percent_capacity_remaining", "jobs_running",
+                  "cpu", "memory", "cpu_capacity", "mem_capacity", "enabled")

     def get_related(self, obj):
         res = super(InstanceSerializer, self).get_related(obj)
diff --git a/awx/api/views.py b/awx/api/views.py
index 56bee58c21..4a87204248 100644
--- a/awx/api/views.py
+++ b/awx/api/views.py
@@ -57,7 +57,7 @@ import pytz
 from wsgiref.util import FileWrapper

 # AWX
-from awx.main.tasks import send_notifications
+from awx.main.tasks import send_notifications, handle_ha_toplogy_changes
 from awx.main.access import get_user_queryset
 from awx.main.ha import is_ha_environment
 from awx.api.authentication import TokenGetAuthentication
@@ -560,7 +560,7 @@ class InstanceList(ListAPIView):

     new_in_320 = True


-class InstanceDetail(RetrieveAPIView):
+class InstanceDetail(RetrieveUpdateAPIView):

     view_name = _("Instance Detail")
     model = Instance
@@ -568,6 +568,20 @@ class InstanceDetail(RetrieveAPIView):

     new_in_320 = True

+    def update(self, request, *args, **kwargs):
+        r = super(InstanceDetail, self).update(request, *args, **kwargs)
+        if status.is_success(r.status_code):
+            obj = self.get_object()
+            if obj.enabled:
+                obj.refresh_capacity()
+            else:
+                obj.capacity = 0
+                obj.save()
+            handle_ha_toplogy_changes.apply_async()
+            r.data = InstanceSerializer(obj, context=self.get_serializer_context()).to_representation(obj)
+        return r
+

 class InstanceUnifiedJobsList(SubListAPIView):

     view_name = _("Instance Running Jobs")
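With `InstanceDetail` now a `RetrieveUpdateAPIView` and `uuid`/`hostname`/`version` marked read-only, the two writable fields can be driven over the API. A minimal illustrative session, assuming a plain `requests` client; the host, credentials, and instance id are hypothetical:

```python
# Hypothetical usage sketch: tune, then disable, an instance over the API.
import requests

url = 'https://awx.example.com/api/v2/instances/1/'  # hypothetical host and id
auth = ('admin', 'password')                         # hypothetical credentials

# Slide capacity from the conservative (0.0) toward the optimistic (1.0) bound.
requests.patch(url, auth=auth, json={'capacity_adjustment': '0.25'})

# Disabling an instance zeroes its capacity so it takes no new work; the
# update() override above then broadcasts the topology change to the cluster.
requests.patch(url, auth=auth, json={'enabled': False})
```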
diff --git a/awx/main/managers.py b/awx/main/managers.py
index f6f2dfd5b7..70c402f672 100644
--- a/awx/main/managers.py
+++ b/awx/main/managers.py
@@ -2,12 +2,9 @@
 # All Rights Reserved.

 import sys
-from datetime import timedelta
 import logging

 from django.db import models
-from django.utils.timezone import now
-from django.db.models import Sum
 from django.conf import settings

 from awx.main.utils.filters import SmartFilter
diff --git a/awx/main/migrations/0018_v330_instancegroup_policies.py b/awx/main/migrations/0018_v330_instancegroup_policies.py
deleted file mode 100644
index 63403f6766..0000000000
--- a/awx/main/migrations/0018_v330_instancegroup_policies.py
+++ /dev/null
@@ -1,31 +0,0 @@
-# -*- coding: utf-8 -*-
-from __future__ import unicode_literals
-
-from django.db import migrations, models
-import awx.main.fields
-
-
-class Migration(migrations.Migration):
-
-    dependencies = [
-        ('main', '0017_v330_move_deprecated_stdout'),
-    ]
-
-    operations = [
-        migrations.AddField(
-            model_name='instancegroup',
-            name='policy_instance_list',
-            field=awx.main.fields.JSONField(default=[], help_text='List of exact-match Instances that will always be automatically assigned to this group',
-                                            blank=True),
-        ),
-        migrations.AddField(
-            model_name='instancegroup',
-            name='policy_instance_minimum',
-            field=models.IntegerField(default=0, help_text='Static minimum number of Instances to automatically assign to this group'),
-        ),
-        migrations.AddField(
-            model_name='instancegroup',
-            name='policy_instance_percentage',
-            field=models.IntegerField(default=0, help_text='Percentage of Instances to automatically assign to this group'),
-        ),
-    ]
diff --git a/awx/main/migrations/0020_v330_instancegroup_policies.py b/awx/main/migrations/0020_v330_instancegroup_policies.py
new file mode 100644
index 0000000000..a6716352e9
--- /dev/null
+++ b/awx/main/migrations/0020_v330_instancegroup_policies.py
@@ -0,0 +1,62 @@
+# -*- coding: utf-8 -*-
+from __future__ import unicode_literals
+
+from django.db import migrations, models
+from decimal import Decimal
+import awx.main.fields
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('main', '0019_v330_custom_virtualenv'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='instancegroup',
+            name='policy_instance_list',
+            field=awx.main.fields.JSONField(default=[], help_text='List of exact-match Instances that will always be automatically assigned to this group',
+                                            blank=True),
+        ),
+        migrations.AddField(
+            model_name='instancegroup',
+            name='policy_instance_minimum',
+            field=models.IntegerField(default=0, help_text='Static minimum number of Instances to automatically assign to this group'),
+        ),
+        migrations.AddField(
+            model_name='instancegroup',
+            name='policy_instance_percentage',
+            field=models.IntegerField(default=0, help_text='Percentage of Instances to automatically assign to this group'),
+        ),
+        migrations.AddField(
+            model_name='instance',
+            name='capacity_adjustment',
+            field=models.DecimalField(decimal_places=2, default=Decimal('1.0'), max_digits=3),
+        ),
+        migrations.AddField(
+            model_name='instance',
+            name='cpu',
+            field=models.IntegerField(default=0, editable=False)
+        ),
+        migrations.AddField(
+            model_name='instance',
+            name='memory',
+            field=models.BigIntegerField(default=0, editable=False)
+        ),
+        migrations.AddField(
+            model_name='instance',
+            name='cpu_capacity',
+            field=models.IntegerField(default=0, editable=False)
+        ),
+        migrations.AddField(
+            model_name='instance',
+            name='mem_capacity',
+            field=models.IntegerField(default=0, editable=False)
+        ),
+        migrations.AddField(
+            model_name='instance',
+            name='enabled',
+            field=models.BooleanField(default=True)
+        )
+    ]
diff --git a/awx/main/models/ad_hoc_commands.py b/awx/main/models/ad_hoc_commands.py
index 56137378d6..3913a4ace7 100644
--- a/awx/main/models/ad_hoc_commands.py
+++ b/awx/main/models/ad_hoc_commands.py
@@ -184,7 +184,7 @@ class AdHocCommand(UnifiedJob, JobNotificationMixin):
         # NOTE: We sorta have to assume the host count matches and that forks default to 5
         from awx.main.models.inventory import Host
         count_hosts = Host.objects.filter(
             enabled=True, inventory__ad_hoc_commands__pk=self.pk).count()
-        return min(count_hosts, 5 if self.forks == 0 else self.forks) * 10
+        return min(count_hosts, 5 if self.forks == 0 else self.forks) + 1

     def copy(self):
         data = {}
diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py
index c7a10b5e00..bf1d7f8266 100644
--- a/awx/main/models/ha.py
+++ b/awx/main/models/ha.py
@@ -1,6 +1,8 @@
 # Copyright (c) 2015 Ansible, Inc.
 # All Rights Reserved.

+from decimal import Decimal
+
 from django.db import models, connection
 from django.db.models.signals import post_save, post_delete
 from django.dispatch import receiver
@@ -10,6 +12,7 @@ from django.utils.timezone import now, timedelta

 from solo.models import SingletonModel

+from awx import __version__ as awx_application_version
 from awx.api.versioning import reverse
 from awx.main.managers import InstanceManager, InstanceGroupManager
 from awx.main.fields import JSONField
@@ -17,6 +20,7 @@ from awx.main.models.inventory import InventoryUpdate
 from awx.main.models.jobs import Job
 from awx.main.models.projects import ProjectUpdate
 from awx.main.models.unified_jobs import UnifiedJob
+from awx.main.utils import get_cpu_capacity, get_mem_capacity, get_system_task_capacity

 __all__ = ('Instance', 'InstanceGroup', 'JobOrigin', 'TowerScheduleState',)

@@ -39,6 +43,30 @@ class Instance(models.Model):
         default=100,
         editable=False,
     )
+    capacity_adjustment = models.DecimalField(
+        default=Decimal(1.0),
+        max_digits=3,
+        decimal_places=2,
+    )
+    enabled = models.BooleanField(
+        default=True
+    )
+    cpu = models.IntegerField(
+        default=0,
+        editable=False,
+    )
+    memory = models.BigIntegerField(
+        default=0,
+        editable=False,
+    )
+    cpu_capacity = models.IntegerField(
+        default=0,
+        editable=False,
+    )
+    mem_capacity = models.IntegerField(
+        default=0,
+        editable=False,
+    )

     class Meta:
         app_label = 'main'
@@ -68,6 +96,20 @@ class Instance(models.Model):
         return Instance.objects.filter(rampart_groups__controller__instances=self).exists()

+    def refresh_capacity(self):
+        cpu = get_cpu_capacity()
+        mem = get_mem_capacity()
+        self.capacity = get_system_task_capacity(self.capacity_adjustment)
+        self.cpu = cpu[0]
+        self.memory = mem[0]
+        self.cpu_capacity = cpu[1]
+        self.mem_capacity = mem[1]
+        self.version = awx_application_version
+        self.save(update_fields=['capacity', 'version', 'modified', 'cpu',
+                                 'memory', 'cpu_capacity', 'mem_capacity'])
+
+
 class InstanceGroup(models.Model):
     """A model representing a Queue/Group of AWX Instances."""

     objects = InstanceGroupManager()
diff --git a/awx/main/models/inventory.py b/awx/main/models/inventory.py
index fc437e236e..832e66ea04 100644
--- a/awx/main/models/inventory.py
+++ b/awx/main/models/inventory.py
@@ -1602,7 +1602,7 @@ class InventoryUpdate(UnifiedJob, InventorySourceOptions, JobNotificationMixin,

     @property
     def task_impact(self):
-        return 50
+        return 1

     # InventoryUpdate credential required
     # Custom and SCM InventoryUpdate credential not required
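The `task_impact` rescaling above for ad hoc commands and inventory updates (and the matching change to `Job` and `SystemJob` just below) replaces the old tens-of-units scale with roughly one capacity unit per fork, plus one for overhead, to match the fork-based capacity model this diff introduces. A standalone restatement of the new rule, not the shipped property implementations:

```python
# Standalone restatement of the new impact rule shared by AdHocCommand and
# Job in this diff: a run costs min(hosts, effective forks) + 1, where
# forks == 0 means the Ansible default of 5 forks.
def task_impact(count_hosts, forks):
    return min(count_hosts, 5 if forks == 0 else forks) + 1

print(task_impact(100, 0))  # -> 6: capped at the default 5 forks, +1 overhead
print(task_impact(3, 10))   # -> 4: only 3 hosts to run against
```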
diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py
index 4d3213b12c..a50c222b70 100644
--- a/awx/main/models/jobs.py
+++ b/awx/main/models/jobs.py
@@ -623,7 +623,7 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskManagerJobMixin):
             count_hosts = 1
         else:
             count_hosts = Host.objects.filter(inventory__jobs__pk=self.pk).count()
-        return min(count_hosts, 5 if self.forks == 0 else self.forks) * 10
+        return min(count_hosts, 5 if self.forks == 0 else self.forks) + 1

     @property
     def successful_hosts(self):
@@ -1190,7 +1190,7 @@ class SystemJob(UnifiedJob, SystemJobOptions, JobNotificationMixin):

     @property
     def task_impact(self):
-        return 150
+        return 5

     @property
     def preferred_instance_groups(self):
diff --git a/awx/main/models/projects.py b/awx/main/models/projects.py
index 5794d170d7..0ee6aac241 100644
--- a/awx/main/models/projects.py
+++ b/awx/main/models/projects.py
@@ -492,7 +492,7 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin, TaskManagerProjectUpdateMixin):

     @property
     def task_impact(self):
-        return 0 if self.job_type == 'run' else 20
+        return 0 if self.job_type == 'run' else 1

     @property
     def result_stdout(self):
diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index 698fda006e..d41f21b087 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -2,7 +2,6 @@
 # All Rights Reserved.

 # Python
-import codecs
 from collections import OrderedDict, namedtuple
 import ConfigParser
 import cStringIO
@@ -54,9 +53,8 @@ from awx.main.queue import CallbackQueueDispatcher
 from awx.main.expect import run, isolated_manager
 from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url,
                             check_proot_installed, build_proot_temp_dir, get_licenser,
-                            wrap_args_with_proot, get_system_task_capacity, OutputEventFilter,
-                            ignore_inventory_computed_fields, ignore_inventory_group_removal,
-                            get_type_for_model, extract_ansible_vars)
+                            wrap_args_with_proot, OutputEventFilter, ignore_inventory_computed_fields,
+                            ignore_inventory_group_removal, get_type_for_model, extract_ansible_vars)
 from awx.main.utils.reload import restart_local_services, stop_local_services
 from awx.main.utils.pglock import advisory_lock
 from awx.main.utils.ha import update_celery_worker_routes, register_celery_worker_queues
@@ -307,6 +305,7 @@ def cluster_node_heartbeat(self):
     instance_list = list(Instance.objects.filter(rampart_groups__controller__isnull=True).distinct())
     this_inst = None
     lost_instances = []
+
     for inst in list(instance_list):
         if inst.hostname == settings.CLUSTER_HOST_ID:
             this_inst = inst
@@ -316,11 +315,15 @@ def cluster_node_heartbeat(self):
         instance_list.remove(inst)
     if this_inst:
         startup_event = this_inst.is_lost(ref_time=nowtime)
-        if this_inst.capacity == 0:
+        if this_inst.capacity == 0 and this_inst.enabled:
             logger.warning('Rejoining the cluster as instance {}.'.format(this_inst.hostname))
-            this_inst.capacity = get_system_task_capacity()
-            this_inst.version = awx_application_version
-            this_inst.save(update_fields=['capacity', 'version', 'modified'])
+        if this_inst.enabled:
+            this_inst.refresh_capacity()
+            handle_ha_toplogy_changes.apply_async()
+        elif this_inst.capacity != 0 and not this_inst.enabled:
+            this_inst.capacity = 0
+            this_inst.save(update_fields=['capacity'])
+            handle_ha_toplogy_changes.apply_async()
         if startup_event:
             return
     else:
@@ -329,7 +332,7 @@ def cluster_node_heartbeat(self):
     for other_inst in instance_list:
         if other_inst.version == "":
             continue
-        if Version(other_inst.version.split('-', 1)[0]) > Version(awx_application_version) and not settings.DEBUG:
+        if Version(other_inst.version.split('-', 1)[0]) > Version(awx_application_version.split('-', 1)[0]) and not settings.DEBUG:
             logger.error("Host {} reports version {}, but this node {} is at {}, shutting down".format(other_inst.hostname,
                                                                                                        other_inst.version,
                                                                                                        this_inst.hostname,
shutting down".format(other_inst.hostname, other_inst.version, this_inst.hostname, diff --git a/awx/main/tests/functional/test_jobs.py b/awx/main/tests/functional/test_jobs.py index e9504d1232..aa95574b36 100644 --- a/awx/main/tests/functional/test_jobs.py +++ b/awx/main/tests/functional/test_jobs.py @@ -1,9 +1,11 @@ -from awx.main.models import Job, Instance -from django.test.utils import override_settings import pytest - +import mock import json +from awx.main.models import Job, Instance +from awx.main.tasks import cluster_node_heartbeat +from django.test.utils import override_settings + @pytest.mark.django_db def test_orphan_unified_job_creation(instance, inventory): @@ -17,13 +19,19 @@ def test_orphan_unified_job_creation(instance, inventory): @pytest.mark.django_db +@mock.patch('awx.main.utils.common.get_cpu_capacity', lambda: (2,8)) +@mock.patch('awx.main.utils.common.get_mem_capacity', lambda: (8000,62)) +@mock.patch('awx.main.tasks.handle_ha_toplogy_changes.apply_async', lambda: True) def test_job_capacity_and_with_inactive_node(): - Instance.objects.create(hostname='test-1', capacity=50) - assert Instance.objects.total_capacity() == 50 - Instance.objects.create(hostname='test-2', capacity=50) - assert Instance.objects.total_capacity() == 100 - with override_settings(AWX_ACTIVE_NODE_TIME=0): - assert Instance.objects.total_capacity() < 100 + i = Instance.objects.create(hostname='test-1') + i.refresh_capacity() + assert i.capacity == 62 + i.enabled = False + i.save() + with override_settings(CLUSTER_HOST_ID=i.hostname): + cluster_node_heartbeat() + i = Instance.objects.get(id=i.id) + assert i.capacity == 0 @pytest.mark.django_db diff --git a/awx/main/tests/unit/utils/test_ha.py b/awx/main/tests/unit/utils/test_ha.py index b71ca454e1..3dd9adfc35 100644 --- a/awx/main/tests/unit/utils/test_ha.py +++ b/awx/main/tests/unit/utils/test_ha.py @@ -60,6 +60,7 @@ class TestAddRemoveCeleryWorkerQueues(): static_queues, _worker_queues, groups, hostname, added_expected, removed_expected): + added_expected.append('tower_instance_router') instance = instance_generator(groups=groups, hostname=hostname) worker_queues = worker_queues_generator(_worker_queues) with mock.patch('awx.main.utils.ha.settings.AWX_CELERY_QUEUES_STATIC', static_queues): diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py index 443da1f1f3..f2ea63c4ed 100644 --- a/awx/main/utils/common.py +++ b/awx/main/utils/common.py @@ -20,6 +20,8 @@ import six import psutil from StringIO import StringIO +from decimal import Decimal + # Decorator from decorator import decorator @@ -45,7 +47,7 @@ __all__ = ['get_object_or_400', 'get_object_or_403', 'camelcase_to_underscore', 'ignore_inventory_computed_fields', 'ignore_inventory_group_removal', '_inventory_updates', 'get_pk_from_dict', 'getattrd', 'NoDefaultProvided', 'get_current_apps', 'set_current_apps', 'OutputEventFilter', - 'extract_ansible_vars', 'get_search_fields', 'get_system_task_capacity', + 'extract_ansible_vars', 'get_search_fields', 'get_system_task_capacity', 'get_cpu_capacity', 'get_mem_capacity', 'wrap_args_with_proot', 'build_proot_temp_dir', 'check_proot_installed', 'model_to_dict', 'model_instance_diff', 'timestamp_apiformat', 'parse_yaml_or_json', 'RequireDebugTrueOrTest', 'has_model_field_prefetched', 'set_environ', 'IllegalArgumentError', 'get_custom_venv_choices'] @@ -632,19 +634,52 @@ def parse_yaml_or_json(vars_str, silent_failure=True): return vars_dict -@memoize() -def get_system_task_capacity(): +def get_cpu_capacity(): + from django.conf import 
diff --git a/awx/main/utils/ha.py b/awx/main/utils/ha.py
index 376faf9eb9..bb3a0a73cc 100644
--- a/awx/main/utils/ha.py
+++ b/awx/main/utils/ha.py
@@ -24,7 +24,7 @@ def _add_remove_celery_worker_queues(app, instance, worker_queues, worker_name):
            queue['alias'] in settings.AWX_CELERY_QUEUES_STATIC:
             continue

-        if queue['name'] not in ig_names | set([instance.hostname]):
+        if queue['name'] not in ig_names | set([instance.hostname]) or not instance.enabled:
             app.control.cancel_consumer(queue['name'], reply=True, destination=[worker_name])
             removed_queues.append(queue['name'])

@@ -43,7 +43,6 @@ def update_celery_worker_routes(instance, conf):
         'awx.main.tasks.purge_old_stdout_files',
     ]
     routes_updated = {}
-    # Instance is, effectively, a controller node
     if instance.is_controller():
         tasks.append('awx.main.tasks.awx_isolated_heartbeat')
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index 5fb4edfe69..a1223c5ed1 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -639,9 +639,6 @@ AWX_PROOT_BASE_PATH = "/tmp"
 # Note: This setting may be overridden by database settings.
 AWX_ANSIBLE_CALLBACK_PLUGINS = ""

-# Time at which an HA node is considered active
-AWX_ACTIVE_NODE_TIME = 7200
-
 # Automatically remove nodes that have missed their heartbeats after some time
 AWX_AUTO_DEPROVISION_INSTANCES = False

diff --git a/docs/clustering.md b/docs/clustering.md
index df52cbadd8..9ab61bba56 100644
--- a/docs/clustering.md
+++ b/docs/clustering.md
@@ -28,6 +28,8 @@ It's important to point out a few existing things:
 * Existing old-style HA deployments will be transitioned automatically to the new HA system during the upgrade process to 3.1.
 * Manual projects will need to be synced to all instances by the customer

+Ansible Tower 3.3 adds support for container-based clusters using OpenShift or Kubernetes.
+
 ## Important Changes

 * There is no concept of primary/secondary in the new Tower system. *All* systems are primary.
@@ -226,6 +228,47 @@ show up in api endpoints and stats monitoring. These groups can be removed with
 ```
 $ awx-manage unregister_queue --queuename=<queue name>
 ```

+### Configuring Instances and Instance Groups from the API
+
+Instance Groups can be created by posting to `/api/v2/instance_groups` as a System Admin.
+
+Once created, `Instances` can be associated with an Instance Group with:
+
+```
+HTTP POST /api/v2/instance_groups/x/instances/ {'id': y}
+```
+
+An `Instance` that is added to an `InstanceGroup` will automatically reconfigure itself to listen on the group's work queue. See the following
+section `Instance Group Policies` for more details.
+
+### Instance Group Policies
+
+Tower `Instances` can be configured to automatically join `Instance Groups` when they come online by defining a policy. These policies are evaluated for
+every new Instance that comes online.
+
+Instance Group Policies are controlled by 3 optional fields on an `Instance Group`:
+
+* `policy_instance_percentage`: This is a number between 0 - 100. It guarantees that this percentage of active Tower instances will be added
+  to this `Instance Group`. As new instances come online, if the number of Instances in this group relative to the total number of instances
+  is less than the given percentage then new ones will be added until the percentage condition is satisfied.
+* `policy_instance_minimum`: This policy attempts to keep at least this many `Instances` in the `Instance Group`. If the number of
+  available instances is lower than this minimum then all `Instances` will be placed in this `Instance Group`.
+* `policy_instance_list`: This is a fixed list of `Instance` names. These `Instances` will *always* be added to this `Instance Group`.
+  Further, by adding Instances to this list you are declaring that you will manually manage those Instances and they will not be eligible under any other
+  policy. This means they will not be automatically added to any other `Instance Group` even if the policy would cause them to be matched.
+
+> NOTES
+
+* `Instances` that are assigned directly to `Instance Groups` by posting to `/api/v2/instance_groups/x/instances` or
+  `/api/v2/instances/x/instance_groups` are automatically added to the `policy_instance_list`. This means they are subject to the
+  normal caveats for `policy_instance_list` and must be manually managed.
+* `policy_instance_percentage` and `policy_instance_minimum` work together. For example, if you have a `policy_instance_percentage` of
+  50% and a `policy_instance_minimum` of 2 and you start 6 `Instances`, 3 of them would be assigned to the `Instance Group`. If you reduce the number
+  of `Instances` to 2, then both of them would be assigned to the `Instance Group` to satisfy `policy_instance_minimum`. In this way, you can set a lower
+  bound on the amount of available resources.
+* Policies don't actively prevent `Instances` from being associated with multiple `Instance Groups`, but exclusive assignment can effectively be
+  achieved by making the percentages sum to 100. If you have 4 `Instance Groups`, assign each a percentage value of 25, and the `Instances` will be
+  distributed among them with no overlap.
+
 ### Status and Monitoring

 Tower itself reports as much status as it can via the api at `/api/v2/ping` in order to provide validation of the health of the cluster.
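To make the percentage/minimum interaction described in the policy notes concrete, here is a small illustrative sketch. It is not the actual task-manager evaluator, which also honors `policy_instance_list` and runs per instance as nodes come online; it only shows how the two numeric policies combine:

```python
# Illustrative sketch of how policy_instance_percentage and
# policy_instance_minimum combine for a single Instance Group.
import math

def group_size(total_instances, percentage, minimum):
    by_percentage = int(math.ceil(total_instances * percentage / 100.0))
    # The minimum acts as a floor, capped by the instances that actually exist.
    return min(total_instances, max(by_percentage, minimum))

print(group_size(6, 50, 2))  # -> 3: the 50% policy dominates
print(group_size(2, 50, 2))  # -> 2: the minimum takes over
```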