mirror of
https://github.com/ansible/awx.git
synced 2026-02-28 00:08:44 -03:30
correct capacity algorithm for task manager
This commit is contained in:
@@ -79,3 +79,33 @@ class InstanceManager(models.Manager):
|
|||||||
def my_role(self):
|
def my_role(self):
|
||||||
# NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing
|
# NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing
|
||||||
return "tower"
|
return "tower"
|
||||||
|
|
||||||
|
def capacity_mapping(self):
|
||||||
|
"""
|
||||||
|
Returns tuple of two dictionaries that shows mutual connections by name
|
||||||
|
for global accounting of capacity
|
||||||
|
|
||||||
|
instance_ig_mapping: {'instance_name': <set of group names instance is member of>}
|
||||||
|
ig_ig_mapping: {'group_name': <set of group names that share instances>}
|
||||||
|
"""
|
||||||
|
qs = self.all().prefetch_related('rampart_groups')
|
||||||
|
instance_ig_mapping = {}
|
||||||
|
ig_instance_mapping = {}
|
||||||
|
# Create simple dictionary of instance IG memberships
|
||||||
|
for instance in qs.all():
|
||||||
|
if instance.capacity == 0:
|
||||||
|
continue
|
||||||
|
instance_ig_mapping[instance.hostname] = set()
|
||||||
|
for group in instance.rampart_groups.all():
|
||||||
|
ig_instance_mapping.setdefault(group.name, set())
|
||||||
|
ig_instance_mapping[group.name].add(instance.hostname)
|
||||||
|
instance_ig_mapping[instance.hostname].add(group.name)
|
||||||
|
# Create IG mapping by union of all groups their instances are members of
|
||||||
|
ig_ig_mapping = {}
|
||||||
|
for group_name in ig_instance_mapping.keys():
|
||||||
|
ig_ig_set = set()
|
||||||
|
for instance_hostname in ig_instance_mapping[group_name]:
|
||||||
|
ig_ig_set |= instance_ig_mapping[instance_hostname]
|
||||||
|
ig_ig_mapping[group_name] = ig_ig_set
|
||||||
|
|
||||||
|
return instance_ig_mapping, ig_ig_mapping
|
||||||
|
|||||||
@@ -403,15 +403,21 @@ class TaskManager():
|
|||||||
preferred_instance_groups = task.preferred_instance_groups
|
preferred_instance_groups = task.preferred_instance_groups
|
||||||
found_acceptable_queue = False
|
found_acceptable_queue = False
|
||||||
for rampart_group in preferred_instance_groups:
|
for rampart_group in preferred_instance_groups:
|
||||||
if self.get_remaining_capacity(rampart_group.name) <= 0:
|
remaining_capacity = self.get_remaining_capacity(rampart_group.name)
|
||||||
logger.debug("Skipping group %s capacity <= 0", rampart_group.name)
|
if remaining_capacity <= 0:
|
||||||
|
logger.debug("Skipping group %s, remaining_capacity %s <= 0",
|
||||||
|
rampart_group.name, remaining_capacity)
|
||||||
continue
|
continue
|
||||||
if not self.would_exceed_capacity(task, rampart_group.name):
|
if not self.would_exceed_capacity(task, rampart_group.name):
|
||||||
logger.debug("Starting %s in group %s", task.log_format, rampart_group.name)
|
logger.debug("Starting %s in group %s (remaining_capacity=%s)",
|
||||||
|
task.log_format, rampart_group.name, remaining_capacity)
|
||||||
self.graph[rampart_group.name]['graph'].add_job(task)
|
self.graph[rampart_group.name]['graph'].add_job(task)
|
||||||
self.start_task(task, rampart_group, task.get_jobs_fail_chain())
|
self.start_task(task, rampart_group, task.get_jobs_fail_chain())
|
||||||
found_acceptable_queue = True
|
found_acceptable_queue = True
|
||||||
break
|
break
|
||||||
|
else:
|
||||||
|
logger.debug("Not enough capacity to run %s on %s (remaining_capacity=%s)",
|
||||||
|
task.log_format, rampart_group.name, remaining_capacity)
|
||||||
if not found_acceptable_queue:
|
if not found_acceptable_queue:
|
||||||
logger.debug("%s couldn't be scheduled on graph, waiting for next cycle", task.log_format)
|
logger.debug("%s couldn't be scheduled on graph, waiting for next cycle", task.log_format)
|
||||||
|
|
||||||
@@ -489,11 +495,19 @@ class TaskManager():
|
|||||||
def calculate_capacity_used(self, tasks):
|
def calculate_capacity_used(self, tasks):
|
||||||
for rampart_group in self.graph:
|
for rampart_group in self.graph:
|
||||||
self.graph[rampart_group]['capacity_used'] = 0
|
self.graph[rampart_group]['capacity_used'] = 0
|
||||||
|
instance_ig_mapping, ig_ig_mapping = Instance.objects.capacity_mapping()
|
||||||
for t in tasks:
|
for t in tasks:
|
||||||
# TODO: dock capacity for isolated job management tasks running in queue
|
# TODO: dock capacity for isolated job management tasks running in queue
|
||||||
for group_actual in InstanceGroup.objects.filter(instances__hostname=t.execution_node).values_list('name'):
|
if t.status == 'waiting':
|
||||||
if group_actual[0] in self.graph:
|
# Subtract capacity from any peer groups that share instances
|
||||||
self.graph[group_actual[0]]['capacity_used'] += t.task_impact
|
for instance_group_name in ig_ig_mapping[t.instance_group.name]:
|
||||||
|
self.graph[instance_group_name]['capacity_used'] += t.task_impact
|
||||||
|
elif t.status == 'running':
|
||||||
|
# Subtract capacity from all groups that contain the instance
|
||||||
|
for instance_group_name in instance_ig_mapping[t.execution_node]:
|
||||||
|
self.graph[instance_group_name]['capacity_used'] += t.task_impact
|
||||||
|
else:
|
||||||
|
logger.error('Programming error, %s not in ["running", "waiting"]', t.log_format)
|
||||||
|
|
||||||
def would_exceed_capacity(self, task, instance_group):
|
def would_exceed_capacity(self, task, instance_group):
|
||||||
current_capacity = self.graph[instance_group]['capacity_used']
|
current_capacity = self.graph[instance_group]['capacity_used']
|
||||||
@@ -503,6 +517,9 @@ class TaskManager():
|
|||||||
return (task.task_impact + current_capacity > capacity_total)
|
return (task.task_impact + current_capacity > capacity_total)
|
||||||
|
|
||||||
def consume_capacity(self, task, instance_group):
|
def consume_capacity(self, task, instance_group):
|
||||||
|
logger.debug('%s consumed %s capacity units from %s with prior total of %s',
|
||||||
|
task.log_format, task.task_impact, instance_group,
|
||||||
|
self.graph[instance_group]['capacity_used'])
|
||||||
self.graph[instance_group]['capacity_used'] += task.task_impact
|
self.graph[instance_group]['capacity_used'] += task.task_impact
|
||||||
|
|
||||||
def get_remaining_capacity(self, instance_group):
|
def get_remaining_capacity(self, instance_group):
|
||||||
@@ -540,12 +557,13 @@ class TaskManager():
|
|||||||
return finished_wfjs
|
return finished_wfjs
|
||||||
|
|
||||||
def schedule(self):
|
def schedule(self):
|
||||||
logger.debug("Starting Schedule")
|
|
||||||
with transaction.atomic():
|
with transaction.atomic():
|
||||||
# Lock
|
# Lock
|
||||||
with advisory_lock('task_manager_lock', wait=False) as acquired:
|
with advisory_lock('task_manager_lock', wait=False) as acquired:
|
||||||
if acquired is False:
|
if acquired is False:
|
||||||
|
logger.debug("Not running scheduler, another task holds lock")
|
||||||
return
|
return
|
||||||
|
logger.debug("Starting Scheduler")
|
||||||
|
|
||||||
self.cleanup_inconsistent_celery_tasks()
|
self.cleanup_inconsistent_celery_tasks()
|
||||||
finished_wfjs = self._schedule()
|
finished_wfjs = self._schedule()
|
||||||
|
|||||||
@@ -4,15 +4,43 @@ from datetime import timedelta, datetime
|
|||||||
|
|
||||||
from django.core.cache import cache
|
from django.core.cache import cache
|
||||||
from django.utils.timezone import now as tz_now
|
from django.utils.timezone import now as tz_now
|
||||||
|
from django.test import TransactionTestCase
|
||||||
|
|
||||||
from awx.main.scheduler import TaskManager
|
from awx.main.scheduler import TaskManager
|
||||||
from awx.main.models import (
|
from awx.main.models import (
|
||||||
Job,
|
Job,
|
||||||
Instance,
|
Instance,
|
||||||
|
InstanceGroup,
|
||||||
WorkflowJob,
|
WorkflowJob,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
class TestCapacityMapping(TransactionTestCase):
|
||||||
|
|
||||||
|
def sample_cluster(self):
|
||||||
|
ig_small = InstanceGroup.objects.create(name='ig_small')
|
||||||
|
ig_large = InstanceGroup.objects.create(name='ig_large')
|
||||||
|
tower = InstanceGroup.objects.create(name='tower')
|
||||||
|
i1 = Instance.objects.create(hostname='i1', capacity=200)
|
||||||
|
i2 = Instance.objects.create(hostname='i2', capacity=200)
|
||||||
|
i3 = Instance.objects.create(hostname='i3', capacity=200)
|
||||||
|
ig_small.instances.add(i1)
|
||||||
|
ig_large.instances.add(i2, i3)
|
||||||
|
tower.instances.add(i2)
|
||||||
|
|
||||||
|
def test_something(self):
|
||||||
|
self.sample_cluster()
|
||||||
|
with self.assertNumQueries(2):
|
||||||
|
inst_map, ig_map = Instance.objects.capacity_mapping()
|
||||||
|
assert inst_map['i1'] == set(['ig_small'])
|
||||||
|
assert inst_map['i2'] == set(['ig_large', 'tower'])
|
||||||
|
assert ig_map['ig_small'] == set(['ig_small'])
|
||||||
|
assert ig_map['ig_large'] == set(['ig_large', 'tower'])
|
||||||
|
assert ig_map['tower'] == set(['ig_large', 'tower'])
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_single_job_scheduler_launch(default_instance_group, job_template_factory, mocker):
|
def test_single_job_scheduler_launch(default_instance_group, job_template_factory, mocker):
|
||||||
objects = job_template_factory('jt', organization='org1', project='proj',
|
objects = job_template_factory('jt', organization='org1', project='proj',
|
||||||
|
|||||||
Reference in New Issue
Block a user