move static methods used by task manager (#12050)

* move static methods used by task manager

These static methods were used to act on Instance-like objects:
SimpleNamespace objects carrying the necessary attributes.

This change introduces dedicated classes to replace the SimpleNamespace
objects and moves the former static methods somewhere more relevant,
instead of leaving them tacked onto models to which they were only
loosely related.

Accept in-memory data structures in init methods for tests

* initialize remaining capacity AFTER we build the map of instances
Elijah DeLee 2022-04-21 13:05:06 -04:00 committed by GitHub
parent 4b45148614
commit 689a216726
4 changed files with 133 additions and 111 deletions
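The shape of the change in miniature, as a hedged sketch (FakeInstance and its values are invented for illustration; TaskManagerInstance mirrors the class this commit adds in awx/main/scheduler/task_manager_models.py):

from types import SimpleNamespace

class FakeInstance:
    # Hypothetical stand-in for awx.main.models.Instance.
    def __init__(self, hostname, node_type, capacity):
        self.hostname = hostname
        self.node_type = node_type
        self.capacity = capacity

instance = FakeInstance('node1', 'hybrid', 100)

# Before: the task manager built throwaway namespaces inline.
old_style = SimpleNamespace(
    obj=instance,
    node_type=instance.node_type,
    remaining_capacity=instance.capacity,
    capacity=instance.capacity,
    hostname=instance.hostname,
)

# After: the attribute contract is written down once, in a named class.
class TaskManagerInstance:
    def __init__(self, obj):
        self.obj = obj
        self.node_type = obj.node_type
        self.remaining_capacity = obj.capacity
        self.capacity = obj.capacity
        self.hostname = obj.hostname

new_style = TaskManagerInstance(instance)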

View File

@@ -2,7 +2,6 @@
# All Rights Reserved.
from decimal import Decimal
import random
import logging
import os
@@ -161,25 +160,6 @@ class Instance(HasPolicyEditsMixin, BaseModel):
def remaining_capacity(self):
return self.capacity - self.consumed_capacity
@staticmethod
def update_remaining_capacity(instances, jobs):
"""Takes mapping of hostname to SimpleNamespace instance like objects and a list of jobs.
Computes remaining capacity for all the instances based on currently running and waiting jobs.
No return value, updates the "remaining_capacity" field on the SimpleNamespace instance like object in place.
For use in the task manager to avoid refetching jobs from the database.
"""
for job in jobs:
if job.status not in ['waiting', 'running']:
continue
control_instance = instances.get(job.controller_node, '')
execution_instance = instances.get(job.execution_node, '')
if execution_instance and execution_instance.node_type in ('hybrid', 'execution'):
instances[job.execution_node].remaining_capacity -= job.task_impact
if control_instance and control_instance.node_type in ('hybrid', 'control'):
instances[job.controller_node].remaining_capacity -= settings.AWX_CONTROL_NODE_TASK_IMPACT
@property
def jobs_running(self):
return UnifiedJob.objects.filter(
@@ -194,12 +174,6 @@ class Instance(HasPolicyEditsMixin, BaseModel):
def jobs_total(self):
return UnifiedJob.objects.filter(execution_node=self.hostname).count()
@staticmethod
def choose_online_control_plane_node():
return random.choice(
Instance.objects.filter(enabled=True, capacity__gt=0).filter(node_type__in=['control', 'hybrid']).values_list('hostname', flat=True)
)
def get_cleanup_task_kwargs(self, **kwargs):
"""
Produce options to use for the command: ansible-runner worker cleanup
@@ -385,37 +359,6 @@ class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin):
class Meta:
app_label = 'main'
@staticmethod
def fit_task_to_most_remaining_capacity_instance(task, instances, impact=None, capacity_type=None, add_hybrid_control_cost=False):
impact = impact if impact else task.task_impact
capacity_type = capacity_type if capacity_type else task.capacity_type
instance_most_capacity = None
most_remaining_capacity = -1
for i in instances:
if i.node_type not in (capacity_type, 'hybrid'):
continue
would_be_remaining = i.remaining_capacity - impact
# hybrid nodes _always_ control their own tasks
if add_hybrid_control_cost and i.node_type == 'hybrid':
would_be_remaining -= settings.AWX_CONTROL_NODE_TASK_IMPACT
if would_be_remaining >= 0 and (instance_most_capacity is None or would_be_remaining > most_remaining_capacity):
instance_most_capacity = i
most_remaining_capacity = would_be_remaining
return instance_most_capacity
@staticmethod
def find_largest_idle_instance(instances, capacity_type='execution'):
largest_instance = None
for i in instances:
if i.node_type not in (capacity_type, 'hybrid'):
continue
if (hasattr(i, 'jobs_running') and i.jobs_running == 0) or i.remaining_capacity == i.capacity:
if largest_instance is None:
largest_instance = i
elif i.capacity > largest_instance.capacity:
largest_instance = i
return largest_instance
def set_default_policy_fields(self):
self.policy_instance_list = []
self.policy_instance_minimum = 0
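A worked example of the bookkeeping that the removed update_remaining_capacity performed, and that now happens in TaskManagerInstances.__init__ (see the new file below). All numbers are invented, including the AWX_CONTROL_NODE_TASK_IMPACT value, which really comes from Django settings:

from types import SimpleNamespace

AWX_CONTROL_NODE_TASK_IMPACT = 10  # assumed value for illustration

instances = {
    'hybrid1': SimpleNamespace(node_type='hybrid', capacity=100, remaining_capacity=100),
    'exec1': SimpleNamespace(node_type='execution', capacity=50, remaining_capacity=50),
}
job = SimpleNamespace(status='running', task_impact=7,
                      controller_node='hybrid1', execution_node='exec1')

# The same walk the removed static method did, for a single job:
execution = instances.get(job.execution_node)
control = instances.get(job.controller_node)
if execution and execution.node_type in ('hybrid', 'execution'):
    execution.remaining_capacity -= job.task_impact              # 50 -> 43
if control and control.node_type in ('hybrid', 'control'):
    control.remaining_capacity -= AWX_CONTROL_NODE_TASK_IMPACT   # 100 -> 90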

View File

@@ -6,7 +6,6 @@ from datetime import timedelta
import logging
import uuid
import json
from types import SimpleNamespace
# Django
from django.db import transaction, connection
@@ -19,7 +18,6 @@ from awx.main.dispatch.reaper import reap_job
from awx.main.models import (
AdHocCommand,
Instance,
InstanceGroup,
InventorySource,
InventoryUpdate,
Job,
@@ -37,6 +35,8 @@ from awx.main.utils import get_type_for_model, task_manager_bulk_reschedule, sch
from awx.main.utils.common import create_partition
from awx.main.signals import disable_activity_stream
from awx.main.scheduler.dependency_graph import DependencyGraph
from awx.main.scheduler.task_manager_models import TaskManagerInstances
from awx.main.scheduler.task_manager_models import TaskManagerInstanceGroups
from awx.main.utils import decrypt_field
@@ -54,49 +54,22 @@ class TaskManager:
The NOOP case is short-circuit logic. If the task manager realizes that another instance
of the task manager is already running, then it short-circuits and decides not to run.
"""
self.graph = dict()
# start task limit indicates how many pending jobs can be started on this
# .schedule() run. Starting jobs is expensive, and there is code in place to reap
# the task manager after 5 minutes. At scale, the task manager can easily take more than
# 5 minutes to start pending jobs. If this limit is reached, pending jobs
# will no longer be started and will be started on the next task manager cycle.
self.start_task_limit = settings.START_TASK_LIMIT
self.time_delta_job_explanation = timedelta(seconds=30)
def after_lock_init(self, all_sorted_tasks):
"""
Init AFTER we know this instance of the task manager will run because the lock is acquired.
"""
instances = Instance.objects.filter(hostname__isnull=False, enabled=True).exclude(node_type='hop')
self.real_instances = {i.hostname: i for i in instances}
self.controlplane_ig = None
self.dependency_graph = DependencyGraph()
instances_partial = [
SimpleNamespace(
obj=instance,
node_type=instance.node_type,
remaining_capacity=instance.capacity, # Updated with Instance.update_remaining_capacity by looking at all active tasks
capacity=instance.capacity,
hostname=instance.hostname,
)
for instance in instances
]
instances_by_hostname = {i.hostname: i for i in instances_partial}
# updates remaining capacity value based on currently running and waiting tasks
Instance.update_remaining_capacity(instances_by_hostname, all_sorted_tasks)
for rampart_group in InstanceGroup.objects.prefetch_related('instances'):
if rampart_group.name == settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME:
self.controlplane_ig = rampart_group
self.graph[rampart_group.name] = dict(
instances=[
instances_by_hostname[instance.hostname] for instance in rampart_group.instances.all() if instance.hostname in instances_by_hostname
],
)
self.instances = TaskManagerInstances(all_sorted_tasks)
self.instance_groups = TaskManagerInstanceGroups(instances_by_hostname=self.instances)
self.controlplane_ig = self.instance_groups.controlplane_ig
def job_blocked_by(self, task):
# TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph
@@ -244,7 +217,7 @@ class TaskManager:
schedule_task_manager()
return result
def start_task(self, task, rampart_group, dependent_tasks=None, instance=None):
def start_task(self, task, instance_group, dependent_tasks=None, instance=None):
self.start_task_limit -= 1
if self.start_task_limit == 0:
# schedule another run immediately after this task manager
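The start-task-limit mechanic visible in these lines, and described in the __init__ comment above, in miniature (the START_TASK_LIMIT value and the schedule_task_manager body are stand-ins invented for illustration):

START_TASK_LIMIT = 3  # assumed; really settings.START_TASK_LIMIT

def schedule_task_manager():
    # Stand-in: in AWX this schedules another task manager run.
    print('rescheduling task manager')

class MiniTaskManager:
    def __init__(self):
        self.start_task_limit = START_TASK_LIMIT

    def start_task(self, task):
        self.start_task_limit -= 1
        if self.start_task_limit == 0:
            # Cap reached: remaining pending jobs wait for the next cycle.
            schedule_task_manager()
        print('started', task)

tm = MiniTaskManager()
for task in ['job1', 'job2', 'job3', 'job4']:
    if tm.start_task_limit <= 0:
        break
    tm.start_task(task)
# job1..job3 start this cycle; job4 is picked up by the rescheduled run.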
@@ -277,10 +250,10 @@ class TaskManager:
schedule_task_manager()
# at this point we already have control/execution nodes selected for the following cases
else:
task.instance_group = rampart_group
task.instance_group = instance_group
execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else ''
logger.debug(
f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {rampart_group.name}{execution_node_msg}.'
f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.'
)
with disable_activity_stream():
task.celery_task_id = str(uuid.uuid4())
@@ -478,8 +451,8 @@ class TaskManager:
control_impact = task.task_impact + settings.AWX_CONTROL_NODE_TASK_IMPACT
else:
control_impact = settings.AWX_CONTROL_NODE_TASK_IMPACT
control_instance = InstanceGroup.fit_task_to_most_remaining_capacity_instance(
task, self.graph[settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME]['instances'], impact=control_impact, capacity_type='control'
control_instance = self.instance_groups.fit_task_to_most_remaining_capacity_instance(
task, instance_group_name=settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME, impact=control_impact, capacity_type='control'
)
if not control_instance:
self.task_needs_capacity(task, tasks_to_update_job_explanation)
@@ -493,29 +466,29 @@ class TaskManager:
task.execution_node = control_instance.hostname
control_instance.remaining_capacity = max(0, control_instance.remaining_capacity - control_impact)
self.dependency_graph.add_job(task)
execution_instance = self.real_instances[control_instance.hostname]
execution_instance = self.instances[control_instance.hostname].obj
task.log_lifecycle("controller_node_chosen")
task.log_lifecycle("execution_node_chosen")
self.start_task(task, self.controlplane_ig, task.get_jobs_fail_chain(), execution_instance)
found_acceptable_queue = True
continue
for rampart_group in preferred_instance_groups:
if rampart_group.is_container_group:
for instance_group in preferred_instance_groups:
if instance_group.is_container_group:
self.dependency_graph.add_job(task)
self.start_task(task, rampart_group, task.get_jobs_fail_chain(), None)
self.start_task(task, instance_group, task.get_jobs_fail_chain(), None)
found_acceptable_queue = True
break
# TODO: remove this after we have confidence that OCP control nodes are reporting node_type=control
if settings.IS_K8S and task.capacity_type == 'execution':
logger.debug("Skipping group {}, task cannot run on control plane".format(rampart_group.name))
logger.debug("Skipping group {}, task cannot run on control plane".format(instance_group.name))
continue
# at this point we know the instance group is NOT a container group
# because if it was, it would have started the task and broke out of the loop.
execution_instance = InstanceGroup.fit_task_to_most_remaining_capacity_instance(
task, self.graph[rampart_group.name]['instances'], add_hybrid_control_cost=True
) or InstanceGroup.find_largest_idle_instance(self.graph[rampart_group.name]['instances'], capacity_type=task.capacity_type)
execution_instance = self.instance_groups.fit_task_to_most_remaining_capacity_instance(
task, instance_group_name=instance_group.name, add_hybrid_control_cost=True
) or self.instance_groups.find_largest_idle_instance(instance_group_name=instance_group.name, capacity_type=task.capacity_type)
if execution_instance:
task.execution_node = execution_instance.hostname
@@ -530,18 +503,18 @@ class TaskManager:
task.log_lifecycle("execution_node_chosen")
logger.debug(
"Starting {} in group {} instance {} (remaining_capacity={})".format(
task.log_format, rampart_group.name, execution_instance.hostname, execution_instance.remaining_capacity
task.log_format, instance_group.name, execution_instance.hostname, execution_instance.remaining_capacity
)
)
execution_instance = self.real_instances[execution_instance.hostname]
execution_instance = self.instances[execution_instance.hostname].obj
self.dependency_graph.add_job(task)
self.start_task(task, rampart_group, task.get_jobs_fail_chain(), execution_instance)
self.start_task(task, instance_group, task.get_jobs_fail_chain(), execution_instance)
found_acceptable_queue = True
break
else:
logger.debug(
"No instance available in group {} to run job {} w/ capacity requirement {}".format(
rampart_group.name, task.log_format, task.task_impact
instance_group.name, task.log_format, task.task_impact
)
)
if not found_acceptable_queue:
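The selection logic these call sites rely on (take the instance that would have the most capacity left after placing the task, otherwise fall back to the largest idle instance), condensed into a self-contained sketch; the hybrid control cost is omitted and the hosts and numbers are invented:

from types import SimpleNamespace

def best_fit(instances, impact, capacity_type='execution'):
    # Mini version of fit_task_to_most_remaining_capacity_instance.
    best, best_remaining = None, -1
    for i in instances:
        if i.node_type not in (capacity_type, 'hybrid'):
            continue
        would_be = i.remaining_capacity - impact
        if would_be >= 0 and would_be > best_remaining:
            best, best_remaining = i, would_be
    return best

nodes = [
    SimpleNamespace(hostname='a', node_type='execution', capacity=100, remaining_capacity=5),
    SimpleNamespace(hostname='b', node_type='execution', capacity=50, remaining_capacity=40),
]
chosen = best_fit(nodes, impact=7)
assert chosen.hostname == 'b'  # 'a' cannot fit (5 - 7 < 0); 'b' would leave 33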

View File

@@ -0,0 +1,103 @@
# Copyright (c) 2022 Ansible by Red Hat
# All Rights Reserved.
import logging
from django.conf import settings
from awx.main.models import (
Instance,
InstanceGroup,
)
logger = logging.getLogger('awx.main.scheduler')
class TaskManagerInstance:
"""A class representing minimal data the task manager needs to represent an Instance."""
def __init__(self, obj):
self.obj = obj
self.node_type = obj.node_type
self.remaining_capacity = obj.capacity
self.capacity = obj.capacity
self.hostname = obj.hostname
class TaskManagerInstances:
def __init__(self, active_tasks, instances=None):
self.instances_by_hostname = dict()
if instances is None:
instances = (
Instance.objects.filter(hostname__isnull=False, enabled=True).exclude(node_type='hop').only('node_type', 'capacity', 'hostname', 'enabled')
)
for instance in instances:
self.instances_by_hostname[instance.hostname] = TaskManagerInstance(instance)
# initialize remaining capacity based on currently waiting and running tasks
for task in active_tasks:
if task.status not in ['waiting', 'running']:
continue
control_instance = self.instances_by_hostname.get(task.controller_node, '')
execution_instance = self.instances_by_hostname.get(task.execution_node, '')
if execution_instance and execution_instance.node_type in ('hybrid', 'execution'):
self.instances_by_hostname[task.execution_node].remaining_capacity -= task.task_impact
if control_instance and control_instance.node_type in ('hybrid', 'control'):
self.instances_by_hostname[task.controller_node].remaining_capacity -= settings.AWX_CONTROL_NODE_TASK_IMPACT
def __getitem__(self, hostname):
return self.instances_by_hostname.get(hostname)
def __contains__(self, hostname):
return hostname in self.instances_by_hostname
class TaskManagerInstanceGroups:
"""A class representing minimal data the task manager needs to represent an InstanceGroup."""
def __init__(self, instances_by_hostname=None, instance_groups=None):
self.instance_groups = dict()
self.controlplane_ig = None
if instance_groups is not None: # for testing
self.instance_groups = instance_groups
else:
for instance_group in InstanceGroup.objects.prefetch_related('instances').only('name', 'instances'):
if instance_group.name == settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME:
self.controlplane_ig = instance_group
self.instance_groups[instance_group.name] = dict(
instances=[
instances_by_hostname[instance.hostname] for instance in instance_group.instances.all() if instance.hostname in instances_by_hostname
],
)
def fit_task_to_most_remaining_capacity_instance(self, task, instance_group_name, impact=None, capacity_type=None, add_hybrid_control_cost=False):
impact = impact if impact else task.task_impact
capacity_type = capacity_type if capacity_type else task.capacity_type
instance_most_capacity = None
most_remaining_capacity = -1
instances = self.instance_groups[instance_group_name]['instances']
for i in instances:
if i.node_type not in (capacity_type, 'hybrid'):
continue
would_be_remaining = i.remaining_capacity - impact
# hybrid nodes _always_ control their own tasks
if add_hybrid_control_cost and i.node_type == 'hybrid':
would_be_remaining -= settings.AWX_CONTROL_NODE_TASK_IMPACT
if would_be_remaining >= 0 and (instance_most_capacity is None or would_be_remaining > most_remaining_capacity):
instance_most_capacity = i
most_remaining_capacity = would_be_remaining
return instance_most_capacity
def find_largest_idle_instance(self, instance_group_name, capacity_type='execution'):
largest_instance = None
instances = self.instance_groups[instance_group_name]['instances']
for i in instances:
if i.node_type not in (capacity_type, 'hybrid'):
continue
if (hasattr(i, 'jobs_running') and i.jobs_running == 0) or i.remaining_capacity == i.capacity:
if largest_instance is None:
largest_instance = i
elif i.capacity > largest_instance.capacity:
largest_instance = i
return largest_instance
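Because both classes now accept in-memory data in their init methods, they can be exercised without database fixtures. A sketch of such a test (it still needs a configured Django environment, since the module reads settings and imports models; every object and number here is fabricated):

from types import SimpleNamespace
from awx.main.scheduler.task_manager_models import TaskManagerInstances, TaskManagerInstanceGroups

fake_instances = [
    SimpleNamespace(hostname='hybrid1', node_type='hybrid', capacity=100),
    SimpleNamespace(hostname='exec1', node_type='execution', capacity=50),
]
fake_tasks = [
    SimpleNamespace(status='running', task_impact=7,
                    controller_node='hybrid1', execution_node='exec1'),
]

tm_instances = TaskManagerInstances(fake_tasks, instances=fake_instances)
assert tm_instances['exec1'].remaining_capacity == 43  # 50 - 7

tm_groups = TaskManagerInstanceGroups(
    instance_groups={'default': {'instances': [tm_instances['exec1'], tm_instances['hybrid1']]}}
)
task = SimpleNamespace(task_impact=7, capacity_type='execution')
picked = tm_groups.fit_task_to_most_remaining_capacity_instance(task, 'default')
assert picked is not None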

View File

@@ -4,6 +4,7 @@ from unittest.mock import Mock
from decimal import Decimal
from awx.main.models import InstanceGroup, Instance
from awx.main.scheduler.task_manager_models import TaskManagerInstanceGroups
@pytest.mark.parametrize('capacity_adjustment', [0.0, 0.25, 0.5, 0.75, 1, 1.5, 3])
@@ -59,9 +60,10 @@ class TestInstanceGroup(object):
],
)
def test_fit_task_to_most_remaining_capacity_instance(self, task, instances, instance_fit_index, reason):
ig = InstanceGroup(id=10)
InstanceGroup(id=10)
tm_igs = TaskManagerInstanceGroups(instance_groups={'controlplane': {'instances': instances}})
instance_picked = ig.fit_task_to_most_remaining_capacity_instance(task, instances)
instance_picked = tm_igs.fit_task_to_most_remaining_capacity_instance(task, 'controlplane')
if instance_fit_index is None:
assert instance_picked is None, reason
@@ -82,13 +84,14 @@ class TestInstanceGroup(object):
def filter_offline_instances(*args):
return filter(lambda i: i.capacity > 0, instances)
ig = InstanceGroup(id=10)
InstanceGroup(id=10)
instances_online_only = filter_offline_instances(instances)
tm_igs = TaskManagerInstanceGroups(instance_groups={'controlplane': {'instances': instances_online_only}})
if instance_fit_index is None:
assert ig.find_largest_idle_instance(instances_online_only) is None, reason
assert tm_igs.find_largest_idle_instance('controlplane') is None, reason
else:
assert ig.find_largest_idle_instance(instances_online_only) == instances[instance_fit_index], reason
assert tm_igs.find_largest_idle_instance('controlplane') == instances[instance_fit_index], reason
def test_cleanup_params_defaults():