Add max concurrent jobs and max forks per ig

The intention of this feature is primarily to provide some notion of max
capacity of container groups, but the logic I've left generic. Default
is 0, which will be interpereted as no maximum number of jobs or forks.

Includes refactor of variable and method names for clarity.
instances_by_hostname is an internal attribute of TaskManagerInstances.
Clarify when we are expecting the actual TaskManagerInstances object.

Unify how we process running tasks and consume capacity. This has the
effect that we do less expensive work in after_lock_init and have 1 less
loop over all the running tasks. Previously we looped for both building
the dependency graph as well as for calculating the starting capacity of
all the instances and instance groups. Now we acheive both tasks in the
same loop.

Because of how this changes the somewhat subtle "do-si-do" of how to
initialize the Task Manager models, introduce a wrapper class that tries
to take some of that burden off of other areas where we re-use this like
in the serializer and the metrics. Also use this wrapper class to handle
nicities of how to track capacity consumption on instances and instance
groups.

Add tests for max_forks and max_concurrent_jobs

Fixup tests that use TaskManagerModels to accomodate changes.

assign ig before call to consume capacity

if we don't do it in that order, then we don't correctly account for
the container group jobs we are starting in the middle of the task
manager run
This commit is contained in:
Elijah DeLee
2022-10-23 23:20:41 -04:00
parent 65c3db8cb8
commit 86856f242a
11 changed files with 490 additions and 185 deletions

View File

@@ -43,8 +43,7 @@ from awx.main.utils.common import task_manager_bulk_reschedule, is_testing
from awx.main.signals import disable_activity_stream
from awx.main.constants import ACTIVE_STATES
from awx.main.scheduler.dependency_graph import DependencyGraph
from awx.main.scheduler.task_manager_models import TaskManagerInstances
from awx.main.scheduler.task_manager_models import TaskManagerInstanceGroups
from awx.main.scheduler.task_manager_models import TaskManagerModels
import awx.main.analytics.subsystem_metrics as s_metrics
from awx.main.utils import decrypt_field
@@ -71,7 +70,12 @@ class TaskBase:
# is called later.
self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False)
self.start_time = time.time()
# We want to avoid calling settings in loops, so cache these settings at init time
self.start_task_limit = settings.START_TASK_LIMIT
self.task_manager_timeout = settings.TASK_MANAGER_TIMEOUT
self.control_task_impact = settings.AWX_CONTROL_NODE_TASK_IMPACT
for m in self.subsystem_metrics.METRICS:
if m.startswith(self.prefix):
self.subsystem_metrics.set(m, 0)
@@ -79,7 +83,7 @@ class TaskBase:
def timed_out(self):
"""Return True/False if we have met or exceeded the timeout for the task manager."""
elapsed = time.time() - self.start_time
if elapsed >= settings.TASK_MANAGER_TIMEOUT:
if elapsed >= self.task_manager_timeout:
logger.warning(f"{self.prefix} manager has run for {elapsed} which is greater than TASK_MANAGER_TIMEOUT of {settings.TASK_MANAGER_TIMEOUT}.")
return True
return False
@@ -471,9 +475,8 @@ class TaskManager(TaskBase):
Init AFTER we know this instance of the task manager will run because the lock is acquired.
"""
self.dependency_graph = DependencyGraph()
self.instances = TaskManagerInstances(self.all_tasks)
self.instance_groups = TaskManagerInstanceGroups(instances_by_hostname=self.instances)
self.controlplane_ig = self.instance_groups.controlplane_ig
self.tm_models = TaskManagerModels()
self.controlplane_ig = self.tm_models.instance_groups.controlplane_ig
def job_blocked_by(self, task):
# TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph
@@ -505,7 +508,15 @@ class TaskManager(TaskBase):
@timeit
def start_task(self, task, instance_group, dependent_tasks=None, instance=None):
# Just like for process_running_tasks, add the job to the dependency graph and
# ask the TaskManagerInstanceGroups object to update consumed capacity on all
# implicated instances and container groups.
self.dependency_graph.add_job(task)
if instance_group is not None:
task.instance_group = instance_group
# We need the instance group assigned to correctly account for container group max_concurrent_jobs and max_forks
self.tm_models.consume_capacity(task)
self.subsystem_metrics.inc(f"{self.prefix}_tasks_started", 1)
self.start_task_limit -= 1
if self.start_task_limit == 0:
@@ -513,12 +524,6 @@ class TaskManager(TaskBase):
ScheduleTaskManager().schedule()
from awx.main.tasks.system import handle_work_error, handle_work_success
# update capacity for control node and execution node
if task.controller_node:
self.instances[task.controller_node].consume_capacity(settings.AWX_CONTROL_NODE_TASK_IMPACT)
if task.execution_node:
self.instances[task.execution_node].consume_capacity(task.task_impact)
dependent_tasks = dependent_tasks or []
task_actual = {
@@ -546,7 +551,6 @@ class TaskManager(TaskBase):
ScheduleWorkflowManager().schedule()
# at this point we already have control/execution nodes selected for the following cases
else:
task.instance_group = instance_group
execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else ''
logger.debug(
f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.'
@@ -580,6 +584,7 @@ class TaskManager(TaskBase):
if type(task) is WorkflowJob:
ScheduleWorkflowManager().schedule()
self.dependency_graph.add_job(task)
self.tm_models.consume_capacity(task)
@timeit
def process_pending_tasks(self, pending_tasks):
@@ -611,11 +616,11 @@ class TaskManager(TaskBase):
# Determine if there is control capacity for the task
if task.capacity_type == 'control':
control_impact = task.task_impact + settings.AWX_CONTROL_NODE_TASK_IMPACT
control_impact = task.task_impact + self.control_task_impact
else:
control_impact = settings.AWX_CONTROL_NODE_TASK_IMPACT
control_instance = self.instance_groups.fit_task_to_most_remaining_capacity_instance(
task, instance_group_name=settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME, impact=control_impact, capacity_type='control'
control_impact = self.control_task_impact
control_instance = self.tm_models.instance_groups.fit_task_to_most_remaining_capacity_instance(
task, instance_group_name=self.controlplane_ig.name, impact=control_impact, capacity_type='control'
)
if not control_instance:
self.task_needs_capacity(task, tasks_to_update_job_explanation)
@@ -626,15 +631,19 @@ class TaskManager(TaskBase):
# All task.capacity_type == 'control' jobs should run on control plane, no need to loop over instance groups
if task.capacity_type == 'control':
if not self.tm_models.instance_groups[self.controlplane_ig.name].has_remaining_capacity(control_impact=True):
continue
task.execution_node = control_instance.hostname
execution_instance = self.instances[control_instance.hostname].obj
execution_instance = self.tm_models.instances[control_instance.hostname].obj
task.log_lifecycle("controller_node_chosen")
task.log_lifecycle("execution_node_chosen")
self.start_task(task, self.controlplane_ig, task.get_jobs_fail_chain(), execution_instance)
found_acceptable_queue = True
continue
for instance_group in self.instance_groups.get_instance_groups_from_task_cache(task):
for instance_group in self.tm_models.instance_groups.get_instance_groups_from_task_cache(task):
if not self.tm_models.instance_groups[instance_group.name].has_remaining_capacity(task):
continue
if instance_group.is_container_group:
self.start_task(task, instance_group, task.get_jobs_fail_chain(), None)
found_acceptable_queue = True
@@ -642,9 +651,9 @@ class TaskManager(TaskBase):
# at this point we know the instance group is NOT a container group
# because if it was, it would have started the task and broke out of the loop.
execution_instance = self.instance_groups.fit_task_to_most_remaining_capacity_instance(
execution_instance = self.tm_models.instance_groups.fit_task_to_most_remaining_capacity_instance(
task, instance_group_name=instance_group.name, add_hybrid_control_cost=True
) or self.instance_groups.find_largest_idle_instance(instance_group_name=instance_group.name, capacity_type=task.capacity_type)
) or self.tm_models.instance_groups.find_largest_idle_instance(instance_group_name=instance_group.name, capacity_type=task.capacity_type)
if execution_instance:
task.execution_node = execution_instance.hostname
@@ -660,7 +669,7 @@ class TaskManager(TaskBase):
task.log_format, instance_group.name, execution_instance.hostname, execution_instance.remaining_capacity
)
)
execution_instance = self.instances[execution_instance.hostname].obj
execution_instance = self.tm_models.instances[execution_instance.hostname].obj
self.start_task(task, instance_group, task.get_jobs_fail_chain(), execution_instance)
found_acceptable_queue = True
break

View File

@@ -15,15 +15,18 @@ logger = logging.getLogger('awx.main.scheduler')
class TaskManagerInstance:
"""A class representing minimal data the task manager needs to represent an Instance."""
def __init__(self, obj):
def __init__(self, obj, **kwargs):
self.obj = obj
self.node_type = obj.node_type
self.consumed_capacity = 0
self.capacity = obj.capacity
self.hostname = obj.hostname
self.jobs_running = 0
def consume_capacity(self, impact):
def consume_capacity(self, impact, job_impact=False):
self.consumed_capacity += impact
if job_impact:
self.jobs_running += 1
@property
def remaining_capacity(self):
@@ -33,9 +36,82 @@ class TaskManagerInstance:
return remaining
class TaskManagerInstanceGroup:
"""A class representing minimal data the task manager needs to represent an InstanceGroup."""
def __init__(self, obj, task_manager_instances=None, **kwargs):
self.name = obj.name
self.is_container_group = obj.is_container_group
self.container_group_jobs = 0
self.container_group_consumed_forks = 0
_instances = obj.instances.all()
# We want the list of TaskManagerInstance objects because these are shared across the TaskManagerInstanceGroup objects.
# This way when we consume capacity on an instance that is in multiple groups, we tabulate across all the groups correctly.
self.instances = [task_manager_instances[instance.hostname] for instance in _instances if instance.hostname in task_manager_instances]
self.instance_hostnames = tuple([instance.hostname for instance in _instances if instance.hostname in task_manager_instances])
self.max_concurrent_jobs = obj.max_concurrent_jobs
self.max_forks = obj.max_forks
self.control_task_impact = kwargs.get('control_task_impact', settings.AWX_CONTROL_NODE_TASK_IMPACT)
def consume_capacity(self, task):
"""We only consume capacity on an instance group level if it is a container group. Otherwise we consume capacity on an instance level."""
if self.is_container_group:
self.container_group_jobs += 1
self.container_group_consumed_forks += task.task_impact
else:
raise RuntimeError("We only track capacity for container groups at the instance group level. Otherwise, consume capacity on instances.")
def get_remaining_instance_capacity(self):
return sum(inst.remaining_capacity for inst in self.instances)
def get_consumed_instance_capacity(self):
return sum(inst.consumed_capacity for inst in self.instances)
def get_instance_jobs_running(self):
return sum(inst.jobs_running for inst in self.instances)
def has_remaining_capacity(self, task=None, control_impact=False):
"""Pass either a task or control_impact=True to determine if the IG has capacity to run the control task or job task."""
task_impact = self.control_task_impact if control_impact else task.task_impact
job_impact = 0 if control_impact else 1
# We only want to loop over instances if self.max_concurrent_jobs is set
if self.max_concurrent_jobs == 0:
# Override the calculated remaining capacity, because when max_concurrent_jobs == 0 we don't enforce any max
remaining_jobs = 0
else:
instance_jobs_running = self.get_instance_jobs_running()
remaining_jobs = self.max_concurrent_jobs - instance_jobs_running - self.container_group_jobs - job_impact
# We only want to loop over instances if self.max_forks is set
if self.max_forks == 0:
# Override the calculated remaining capacity, because when max_forks == 0 we don't enforce any max
remaining_forks = 0
else:
instance_consumed_forks = self.get_consumed_instance_capacity()
remaining_forks = self.max_forks - instance_consumed_forks - self.container_group_consumed_forks - task_impact
if remaining_jobs < 0 or remaining_forks < 0:
# A value less than zero means the task will not fit on the group
task_string = f"task {task.log_format}" if task else "control task"
if remaining_jobs < 0:
logger.debug(f"{task_string} cannot fit on instance group {self.name} with {remaining_jobs} remaining jobs")
if remaining_forks < 0:
impact_string = f"with impact {task_impact}"
logger.debug(f"{task_string} {impact_string} cannot fit on instance group {self.name} with {remaining_forks} remaining forks")
return False
# Returning true means there is enough remaining capacity on the group to run the task (or no instance group level limits are being set)
return True
class TaskManagerInstances:
def __init__(self, active_tasks, instances=None, instance_fields=('node_type', 'capacity', 'hostname', 'enabled')):
def __init__(self, instances=None, instance_fields=('node_type', 'capacity', 'hostname', 'enabled'), **kwargs):
self.instances_by_hostname = dict()
self.instance_groups_container_group_jobs = dict()
self.instance_groups_container_group_consumed_forks = dict()
self.control_task_impact = kwargs.get('control_task_impact', settings.AWX_CONTROL_NODE_TASK_IMPACT)
if instances is None:
instances = (
Instance.objects.filter(hostname__isnull=False, node_state=Instance.States.READY, enabled=True)
@@ -43,18 +119,15 @@ class TaskManagerInstances:
.only('node_type', 'node_state', 'capacity', 'hostname', 'enabled')
)
for instance in instances:
self.instances_by_hostname[instance.hostname] = TaskManagerInstance(instance)
self.instances_by_hostname[instance.hostname] = TaskManagerInstance(instance, **kwargs)
# initialize remaining capacity based on currently waiting and running tasks
for task in active_tasks:
if task.status not in ['waiting', 'running']:
continue
control_instance = self.instances_by_hostname.get(task.controller_node, '')
execution_instance = self.instances_by_hostname.get(task.execution_node, '')
if execution_instance and execution_instance.node_type in ('hybrid', 'execution'):
self.instances_by_hostname[task.execution_node].consume_capacity(task.task_impact)
if control_instance and control_instance.node_type in ('hybrid', 'control'):
self.instances_by_hostname[task.controller_node].consume_capacity(settings.AWX_CONTROL_NODE_TASK_IMPACT)
def consume_capacity(self, task):
control_instance = self.instances_by_hostname.get(task.controller_node, '')
execution_instance = self.instances_by_hostname.get(task.execution_node, '')
if execution_instance and execution_instance.node_type in ('hybrid', 'execution'):
self.instances_by_hostname[task.execution_node].consume_capacity(task.task_impact, job_impact=True)
if control_instance and control_instance.node_type in ('hybrid', 'control'):
self.instances_by_hostname[task.controller_node].consume_capacity(self.control_task_impact)
def __getitem__(self, hostname):
return self.instances_by_hostname.get(hostname)
@@ -64,42 +137,48 @@ class TaskManagerInstances:
class TaskManagerInstanceGroups:
"""A class representing minimal data the task manager needs to represent an InstanceGroup."""
"""A class representing minimal data the task manager needs to represent all the InstanceGroups."""
def __init__(self, instances_by_hostname=None, instance_groups=None, instance_groups_queryset=None):
def __init__(self, task_manager_instances=None, instance_groups=None, instance_groups_queryset=None, **kwargs):
self.instance_groups = dict()
self.task_manager_instances = task_manager_instances if task_manager_instances is not None else TaskManagerInstances()
self.controlplane_ig = None
self.pk_ig_map = dict()
self.control_task_impact = kwargs.get('control_task_impact', settings.AWX_CONTROL_NODE_TASK_IMPACT)
self.controlplane_ig_name = kwargs.get('controlplane_ig_name', settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME)
if instance_groups is not None: # for testing
self.instance_groups = instance_groups
self.instance_groups = {ig.name: TaskManagerInstanceGroup(ig, self.task_manager_instances, **kwargs) for ig in instance_groups}
self.pk_ig_map = {ig.pk: ig for ig in instance_groups}
else:
if instance_groups_queryset is None:
instance_groups_queryset = InstanceGroup.objects.prefetch_related('instances').only('name', 'instances')
for instance_group in instance_groups_queryset:
if instance_group.name == settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME:
self.controlplane_ig = instance_group
self.instance_groups[instance_group.name] = dict(
instances=[
instances_by_hostname[instance.hostname] for instance in instance_group.instances.all() if instance.hostname in instances_by_hostname
],
instance_groups_queryset = InstanceGroup.objects.prefetch_related('instances').only(
'name', 'instances', 'max_concurrent_jobs', 'max_forks', 'is_container_group'
)
for instance_group in instance_groups_queryset:
if instance_group.name == self.controlplane_ig_name:
self.controlplane_ig = instance_group
self.instance_groups[instance_group.name] = TaskManagerInstanceGroup(instance_group, self.task_manager_instances, **kwargs)
self.pk_ig_map[instance_group.pk] = instance_group
def __getitem__(self, ig_name):
return self.instance_groups.get(ig_name)
def __contains__(self, ig_name):
return ig_name in self.instance_groups
def get_remaining_capacity(self, group_name):
instances = self.instance_groups[group_name]['instances']
return sum(inst.remaining_capacity for inst in instances)
return self.instance_groups[group_name].get_remaining_instance_capacity()
def get_consumed_capacity(self, group_name):
instances = self.instance_groups[group_name]['instances']
return sum(inst.consumed_capacity for inst in instances)
return self.instance_groups[group_name].get_consumed_instance_capacity()
def fit_task_to_most_remaining_capacity_instance(self, task, instance_group_name, impact=None, capacity_type=None, add_hybrid_control_cost=False):
impact = impact if impact else task.task_impact
capacity_type = capacity_type if capacity_type else task.capacity_type
instance_most_capacity = None
most_remaining_capacity = -1
instances = self.instance_groups[instance_group_name]['instances']
instances = self.instance_groups[instance_group_name].instances
for i in instances:
if i.node_type not in (capacity_type, 'hybrid'):
@@ -107,7 +186,7 @@ class TaskManagerInstanceGroups:
would_be_remaining = i.remaining_capacity - impact
# hybrid nodes _always_ control their own tasks
if add_hybrid_control_cost and i.node_type == 'hybrid':
would_be_remaining -= settings.AWX_CONTROL_NODE_TASK_IMPACT
would_be_remaining -= self.control_task_impact
if would_be_remaining >= 0 and (instance_most_capacity is None or would_be_remaining > most_remaining_capacity):
instance_most_capacity = i
most_remaining_capacity = would_be_remaining
@@ -115,10 +194,13 @@ class TaskManagerInstanceGroups:
def find_largest_idle_instance(self, instance_group_name, capacity_type='execution'):
largest_instance = None
instances = self.instance_groups[instance_group_name]['instances']
instances = self.instance_groups[instance_group_name].instances
for i in instances:
if i.node_type not in (capacity_type, 'hybrid'):
continue
if i.capacity <= 0:
# We don't want to select an idle instance with 0 capacity
continue
if (hasattr(i, 'jobs_running') and i.jobs_running == 0) or i.remaining_capacity == i.capacity:
if largest_instance is None:
largest_instance = i
@@ -139,3 +221,41 @@ class TaskManagerInstanceGroups:
logger.warn(f"No instance groups in cache exist, defaulting to global instance groups for task {task}")
return task.global_instance_groups
return igs
class TaskManagerModels:
def __init__(self, **kwargs):
# We want to avoid calls to settings over and over in loops, so cache this information here
kwargs['control_task_impact'] = kwargs.get('control_task_impact', settings.AWX_CONTROL_NODE_TASK_IMPACT)
kwargs['controlplane_ig_name'] = kwargs.get('controlplane_ig_name', settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME)
self.instances = TaskManagerInstances(**kwargs)
self.instance_groups = TaskManagerInstanceGroups(task_manager_instances=self.instances, **kwargs)
@classmethod
def init_with_consumed_capacity(cls, **kwargs):
tmm = cls(**kwargs)
tasks = kwargs.get('tasks', None)
if tasks is None:
# No tasks were provided, so we will fetch them from the database
task_status_filter_list = kwargs.get('task_status_filter_list', ['running', 'waiting'])
task_fields = kwargs.get('task_fields', ('task_impact', 'controller_node', 'execution_node', 'instance_group'))
from awx.main.models import UnifiedJob
tasks = UnifiedJob.objects.filter(status__in=task_status_filter_list).only(*task_fields)
for task in tasks:
tmm.consume_capacity(task)
return tmm
def consume_capacity(self, task):
# Consume capacity on instances, which bubbles up to instance groups they are a member of
self.instances.consume_capacity(task)
# For container group jobs, additionally we must account for capacity consumed since
# The container groups have no instances to look at to track how many jobs/forks are consumed
if task.instance_group_id:
ig = self.instance_groups.pk_ig_map[task.instance_group_id]
if ig.is_container_group:
self.instance_groups[ig.name].consume_capacity(task)