From 343966f7441fc75d0c17c99ccd75eb43219a8c74 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Mon, 7 Nov 2016 13:44:17 -0500 Subject: [PATCH] Implement gathering overall task capacity For use when running/planning jobs --- awx/api/views.py | 2 +- awx/main/managers.py | 4 ++++ .../migrations/0048_310_instance_capacity.py | 19 +++++++++++++++++++ awx/main/models/ha.py | 4 ++++ awx/main/scheduler/__init__.py | 4 ++-- awx/main/tasks.py | 3 ++- 6 files changed, 32 insertions(+), 4 deletions(-) create mode 100644 awx/main/migrations/0048_310_instance_capacity.py diff --git a/awx/api/views.py b/awx/api/views.py index 53df80bff1..fdf370e6fa 100644 --- a/awx/api/views.py +++ b/awx/api/views.py @@ -178,7 +178,7 @@ class ApiV1PingView(APIView): response['instances'] = [] for instance in Instance.objects.all(): - response['instances'].append(dict(node=instance.hostname, heartbeat=instance.modified)) + response['instances'].append(dict(node=instance.hostname, heartbeat=instance.modified, capacity=instance.capacity)) response['instances'].sort() return Response(response) diff --git a/awx/main/managers.py b/awx/main/managers.py index c054584b0c..7dda9f0d1e 100644 --- a/awx/main/managers.py +++ b/awx/main/managers.py @@ -4,6 +4,7 @@ import sys from django.db import models +from django.db.models import Sum from django.conf import settings @@ -40,6 +41,9 @@ class InstanceManager(models.Manager): """Return count of active Tower nodes for licensing.""" return self.all().count() + def total_capacity(self): + return self.aggregate(total_capacity=Sum('capacity'))['total_capacity'] + def my_role(self): # NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing return "tower" diff --git a/awx/main/migrations/0048_310_instance_capacity.py b/awx/main/migrations/0048_310_instance_capacity.py new file mode 100644 index 0000000000..5f0795a4fd --- /dev/null +++ b/awx/main/migrations/0048_310_instance_capacity.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0047_v310_tower_state'), + ] + + operations = [ + migrations.AddField( + model_name='instance', + name='capacity', + field=models.PositiveIntegerField(default=100, editable=False), + ), + ] diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index 691faf6305..3fff1d77cb 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -26,6 +26,10 @@ class Instance(models.Model): hostname = models.CharField(max_length=250, unique=True) created = models.DateTimeField(auto_now_add=True) modified = models.DateTimeField(auto_now=True) + capacity = models.PositiveIntegerField( + default=100, + editable=False, + ) class Meta: app_label = 'main' diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index dd2c4de705..7c7e9e5ab3 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -37,7 +37,7 @@ logger = logging.getLogger('awx.main.scheduler') class TaskManager(): def __init__(self): self.graph = DependencyGraph() - self.capacity_total = 200 + self.capacity_total = Instance.objects.total_capacity() self.capacity_used = 0 def get_tasks(self): @@ -152,7 +152,7 @@ class TaskManager(): map(lambda at: active_tasks.add(at['id']), active_task_queues[queue]) else: if not hasattr(settings, 'CELERY_UNIT_TEST'): - return None + return (None, None) return (active_task_queues, active_tasks) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index c4597b22e2..6817bda76b 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -51,7 +51,7 @@ from awx.main.queue import CallbackQueueDispatcher from awx.main.task_engine import TaskEnhancer from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url, check_proot_installed, build_proot_temp_dir, wrap_args_with_proot, - OutputEventFilter) + get_system_task_capacity, OutputEventFilter) from awx.main.consumers import emit_channel_notification __all__ = ['RunJob', 'RunSystemJob', 'RunProjectUpdate', 'RunInventoryUpdate', @@ -131,6 +131,7 @@ def cluster_node_heartbeat(self): inst = Instance.objects.filter(hostname=settings.CLUSTER_HOST_ID) if inst.exists(): inst = inst[0] + inst.capacity = get_system_task_capacity() inst.save() return raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID))