diff --git a/awx/api/serializers.py b/awx/api/serializers.py index c4436424f5..4249715482 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -113,7 +113,7 @@ from awx.main.utils import ( ) from awx.main.utils.filters import SmartFilter from awx.main.utils.named_url_graph import reset_counters -from awx.main.scheduler.task_manager_models import TaskManagerInstanceGroups, TaskManagerInstances +from awx.main.scheduler.task_manager_models import TaskManagerModels from awx.main.redact import UriCleaner, REPLACE_STR from awx.main.validators import vars_validate_or_raise @@ -5071,6 +5071,22 @@ class InstanceGroupSerializer(BaseSerializer): label=_('Policy Instance Minimum'), help_text=_("Static minimum number of Instances that will be automatically assign to " "this group when new instances come online."), ) + max_concurrent_jobs = serializers.IntegerField( + default=0, + min_value=0, + required=False, + initial=0, + label=_('Max Concurrent Jobs'), + help_text=_("Maximum number of concurrent jobs to run on a group. When set to zero, no maximum is enforced."), + ) + max_forks = serializers.IntegerField( + default=0, + min_value=0, + required=False, + initial=0, + label=_('Max Forks'), + help_text=_("Maximum number of forks to execute concurrently on a group. When set to zero, no maximum is enforced."), + ) policy_instance_list = serializers.ListField( child=serializers.CharField(), required=False, @@ -5092,6 +5108,8 @@ class InstanceGroupSerializer(BaseSerializer): "consumed_capacity", "percent_capacity_remaining", "jobs_running", + "max_concurrent_jobs", + "max_forks", "jobs_total", "instances", "is_container_group", @@ -5173,14 +5191,15 @@ class InstanceGroupSerializer(BaseSerializer): # Store capacity values (globally computed) in the context if 'task_manager_igs' not in self.context: instance_groups_queryset = None - jobs_qs = UnifiedJob.objects.filter(status__in=('running', 'waiting')) if self.parent: # Is ListView: instance_groups_queryset = self.parent.instance - instances = TaskManagerInstances(jobs_qs) - instance_groups = TaskManagerInstanceGroups(instances_by_hostname=instances, instance_groups_queryset=instance_groups_queryset) + tm_models = TaskManagerModels.init_with_consumed_capacity( + instance_fields=['uuid', 'version', 'capacity', 'cpu', 'memory', 'managed_by_policy', 'enabled'], + instance_groups_queryset=instance_groups_queryset, + ) - self.context['task_manager_igs'] = instance_groups + self.context['task_manager_igs'] = tm_models.instance_groups return self.context['task_manager_igs'] def get_consumed_capacity(self, obj): diff --git a/awx/main/analytics/collectors.py b/awx/main/analytics/collectors.py index e70318c194..7e6f57a900 100644 --- a/awx/main/analytics/collectors.py +++ b/awx/main/analytics/collectors.py @@ -16,7 +16,7 @@ from awx.conf.license import get_license from awx.main.utils import get_awx_version, camelcase_to_underscore, datetime_hook from awx.main import models from awx.main.analytics import register -from awx.main.scheduler.task_manager_models import TaskManagerInstances +from awx.main.scheduler.task_manager_models import TaskManagerModels """ This module is used to define metrics collected by awx.main.analytics.gather() @@ -237,11 +237,10 @@ def projects_by_scm_type(since, **kwargs): def instance_info(since, include_hostnames=False, **kwargs): info = {} # Use same method that the TaskManager does to compute consumed capacity without querying all running jobs for each Instance - active_tasks = 
models.UnifiedJob.objects.filter(status__in=['running', 'waiting']).only('task_impact', 'controller_node', 'execution_node') - tm_instances = TaskManagerInstances( - active_tasks, instance_fields=['uuid', 'version', 'capacity', 'cpu', 'memory', 'managed_by_policy', 'enabled', 'node_type'] - ) - for tm_instance in tm_instances.instances_by_hostname.values(): + tm_models = TaskManagerModels.init_with_consumed_capacity( + instance_fields=['uuid', 'version', 'capacity', 'cpu', 'memory', 'managed_by_policy', 'enabled'] + ) + for tm_instance in tm_models.instances.instances_by_hostname.values(): instance = tm_instance.obj instance_info = { 'uuid': instance.uuid, diff --git a/awx/main/migrations/0173_instancegroup_max_limits.py b/awx/main/migrations/0173_instancegroup_max_limits.py new file mode 100644 index 0000000000..57ba95992e --- /dev/null +++ b/awx/main/migrations/0173_instancegroup_max_limits.py @@ -0,0 +1,23 @@ +# Generated by Django 3.2.13 on 2022-10-24 18:22 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0172_prevent_instance_fallback'), + ] + + operations = [ + migrations.AddField( + model_name='instancegroup', + name='max_concurrent_jobs', + field=models.IntegerField(default=0, help_text='Maximum number of concurrent jobs to run on this group. Zero means no limit.'), + ), + migrations.AddField( + model_name='instancegroup', + name='max_forks', + field=models.IntegerField(default=0, help_text='Max forks to execute on this group. Zero means no limit.'), + ), + ] diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index f101a94d7a..bf43872131 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -379,6 +379,8 @@ class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin): default='', ) ) + max_concurrent_jobs = models.IntegerField(default=0, help_text=_("Maximum number of concurrent jobs to run on this group. Zero means no limit.")) + max_forks = models.IntegerField(default=0, help_text=_("Max forks to execute on this group. Zero means no limit.")) policy_instance_percentage = models.IntegerField(default=0, help_text=_("Percentage of Instances to automatically assign to this group")) policy_instance_minimum = models.IntegerField(default=0, help_text=_("Static minimum number of Instances to automatically assign to this group")) policy_instance_list = JSONBlob( diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index d4b6ffbc35..3610ecb89a 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -43,8 +43,7 @@ from awx.main.utils.common import task_manager_bulk_reschedule, is_testing from awx.main.signals import disable_activity_stream from awx.main.constants import ACTIVE_STATES from awx.main.scheduler.dependency_graph import DependencyGraph -from awx.main.scheduler.task_manager_models import TaskManagerInstances -from awx.main.scheduler.task_manager_models import TaskManagerInstanceGroups +from awx.main.scheduler.task_manager_models import TaskManagerModels import awx.main.analytics.subsystem_metrics as s_metrics from awx.main.utils import decrypt_field @@ -71,7 +70,12 @@ class TaskBase: # is called later. 
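Both new InstanceGroup fields default to 0, which the scheduler treats as "no limit enforced". A minimal sketch of how a group might be capped once the migration above is applied, assuming an initialized AWX Django environment (for example an awx-manage shell) and a hypothetical group named 'default':

    from awx.main.models import InstanceGroup

    ig = InstanceGroup.objects.get(name='default')  # hypothetical group name
    ig.max_concurrent_jobs = 5    # at most 5 jobs may run on this group at once
    ig.max_forks = 50             # at most 50 forks-worth of task_impact; 0 would disable either cap
    ig.save(update_fields=['max_concurrent_jobs', 'max_forks'])
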
self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False) self.start_time = time.time() + + # We want to avoid calling settings in loops, so cache these settings at init time self.start_task_limit = settings.START_TASK_LIMIT + self.task_manager_timeout = settings.TASK_MANAGER_TIMEOUT + self.control_task_impact = settings.AWX_CONTROL_NODE_TASK_IMPACT + for m in self.subsystem_metrics.METRICS: if m.startswith(self.prefix): self.subsystem_metrics.set(m, 0) @@ -79,7 +83,7 @@ class TaskBase: def timed_out(self): """Return True/False if we have met or exceeded the timeout for the task manager.""" elapsed = time.time() - self.start_time - if elapsed >= settings.TASK_MANAGER_TIMEOUT: + if elapsed >= self.task_manager_timeout: logger.warning(f"{self.prefix} manager has run for {elapsed} which is greater than TASK_MANAGER_TIMEOUT of {settings.TASK_MANAGER_TIMEOUT}.") return True return False @@ -471,9 +475,8 @@ class TaskManager(TaskBase): Init AFTER we know this instance of the task manager will run because the lock is acquired. """ self.dependency_graph = DependencyGraph() - self.instances = TaskManagerInstances(self.all_tasks) - self.instance_groups = TaskManagerInstanceGroups(instances_by_hostname=self.instances) - self.controlplane_ig = self.instance_groups.controlplane_ig + self.tm_models = TaskManagerModels() + self.controlplane_ig = self.tm_models.instance_groups.controlplane_ig def job_blocked_by(self, task): # TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph @@ -505,7 +508,15 @@ class TaskManager(TaskBase): @timeit def start_task(self, task, instance_group, dependent_tasks=None, instance=None): + # Just like for process_running_tasks, add the job to the dependency graph and + # ask the TaskManagerInstanceGroups object to update consumed capacity on all + # implicated instances and container groups. self.dependency_graph.add_job(task) + if instance_group is not None: + task.instance_group = instance_group + # We need the instance group assigned to correctly account for container group max_concurrent_jobs and max_forks + self.tm_models.consume_capacity(task) + self.subsystem_metrics.inc(f"{self.prefix}_tasks_started", 1) self.start_task_limit -= 1 if self.start_task_limit == 0: @@ -513,12 +524,6 @@ class TaskManager(TaskBase): ScheduleTaskManager().schedule() from awx.main.tasks.system import handle_work_error, handle_work_success - # update capacity for control node and execution node - if task.controller_node: - self.instances[task.controller_node].consume_capacity(settings.AWX_CONTROL_NODE_TASK_IMPACT) - if task.execution_node: - self.instances[task.execution_node].consume_capacity(task.task_impact) - dependent_tasks = dependent_tasks or [] task_actual = { @@ -546,7 +551,6 @@ class TaskManager(TaskBase): ScheduleWorkflowManager().schedule() # at this point we already have control/execution nodes selected for the following cases else: - task.instance_group = instance_group execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else '' logger.debug( f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.' 
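start_task() now assigns task.instance_group and calls tm_models.consume_capacity(task) up front, so instance-level impact and container-group job/fork tallies are recorded the moment a task starts, replacing the per-node capacity update that this hunk removes. A deliberately simplified, self-contained stand-in for that accounting (not the PR's classes, only the idea that instance objects are shared by every group that contains them):

    class FakeInstance:
        def __init__(self, hostname, capacity):
            self.hostname = hostname
            self.capacity = capacity
            self.consumed_capacity = 0
            self.jobs_running = 0

        def consume(self, impact):
            # mirrors TaskManagerInstance.consume_capacity(impact, job_impact=True)
            self.consumed_capacity += impact
            self.jobs_running += 1

    class FakeGroup:
        def __init__(self, name, instances):
            self.name = name
            self.instances = instances  # shared FakeInstance objects, not copies

        def remaining(self):
            return sum(i.capacity - i.consumed_capacity for i in self.instances)

    node = FakeInstance('node1', capacity=100)
    default = FakeGroup('default', [node])
    overflow = FakeGroup('overflow', [node])  # the same instance sits in two groups

    node.consume(impact=43)  # roughly what starting one job does to the chosen execution node
    assert default.remaining() == overflow.remaining() == 57  # every group sees the consumption
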
@@ -580,6 +584,7 @@ class TaskManager(TaskBase): if type(task) is WorkflowJob: ScheduleWorkflowManager().schedule() self.dependency_graph.add_job(task) + self.tm_models.consume_capacity(task) @timeit def process_pending_tasks(self, pending_tasks): @@ -611,11 +616,11 @@ class TaskManager(TaskBase): # Determine if there is control capacity for the task if task.capacity_type == 'control': - control_impact = task.task_impact + settings.AWX_CONTROL_NODE_TASK_IMPACT + control_impact = task.task_impact + self.control_task_impact else: - control_impact = settings.AWX_CONTROL_NODE_TASK_IMPACT - control_instance = self.instance_groups.fit_task_to_most_remaining_capacity_instance( - task, instance_group_name=settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME, impact=control_impact, capacity_type='control' + control_impact = self.control_task_impact + control_instance = self.tm_models.instance_groups.fit_task_to_most_remaining_capacity_instance( + task, instance_group_name=self.controlplane_ig.name, impact=control_impact, capacity_type='control' ) if not control_instance: self.task_needs_capacity(task, tasks_to_update_job_explanation) @@ -626,15 +631,19 @@ class TaskManager(TaskBase): # All task.capacity_type == 'control' jobs should run on control plane, no need to loop over instance groups if task.capacity_type == 'control': + if not self.tm_models.instance_groups[self.controlplane_ig.name].has_remaining_capacity(control_impact=True): + continue task.execution_node = control_instance.hostname - execution_instance = self.instances[control_instance.hostname].obj + execution_instance = self.tm_models.instances[control_instance.hostname].obj task.log_lifecycle("controller_node_chosen") task.log_lifecycle("execution_node_chosen") self.start_task(task, self.controlplane_ig, task.get_jobs_fail_chain(), execution_instance) found_acceptable_queue = True continue - for instance_group in self.instance_groups.get_instance_groups_from_task_cache(task): + for instance_group in self.tm_models.instance_groups.get_instance_groups_from_task_cache(task): + if not self.tm_models.instance_groups[instance_group.name].has_remaining_capacity(task): + continue if instance_group.is_container_group: self.start_task(task, instance_group, task.get_jobs_fail_chain(), None) found_acceptable_queue = True @@ -642,9 +651,9 @@ class TaskManager(TaskBase): # at this point we know the instance group is NOT a container group # because if it was, it would have started the task and broke out of the loop. 
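The has_remaining_capacity() gates added above reduce to two subtractions, with 0 meaning the cap is not enforced; the full method is defined later in task_manager_models.py. A self-contained sketch of that arithmetic, simplified to the execution-job case (control tasks do not count toward max_concurrent_jobs) and using made-up numbers:

    def fits(max_concurrent_jobs, max_forks, jobs_running, forks_consumed, task_impact):
        # 0 disables a limit; otherwise the task must leave the remaining count at or above zero
        remaining_jobs = 0 if max_concurrent_jobs == 0 else max_concurrent_jobs - jobs_running - 1
        remaining_forks = 0 if max_forks == 0 else max_forks - forks_consumed - task_impact
        return remaining_jobs >= 0 and remaining_forks >= 0

    assert fits(max_concurrent_jobs=0, max_forks=0, jobs_running=50, forks_consumed=500, task_impact=10)    # no caps set
    assert fits(max_concurrent_jobs=2, max_forks=15, jobs_running=1, forks_consumed=5, task_impact=10)      # just fits
    assert not fits(max_concurrent_jobs=1, max_forks=0, jobs_running=1, forks_consumed=0, task_impact=1)    # job cap hit
    assert not fits(max_concurrent_jobs=0, max_forks=15, jobs_running=1, forks_consumed=10, task_impact=6)  # fork cap hit
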
- execution_instance = self.instance_groups.fit_task_to_most_remaining_capacity_instance( + execution_instance = self.tm_models.instance_groups.fit_task_to_most_remaining_capacity_instance( task, instance_group_name=instance_group.name, add_hybrid_control_cost=True - ) or self.instance_groups.find_largest_idle_instance(instance_group_name=instance_group.name, capacity_type=task.capacity_type) + ) or self.tm_models.instance_groups.find_largest_idle_instance(instance_group_name=instance_group.name, capacity_type=task.capacity_type) if execution_instance: task.execution_node = execution_instance.hostname @@ -660,7 +669,7 @@ class TaskManager(TaskBase): task.log_format, instance_group.name, execution_instance.hostname, execution_instance.remaining_capacity ) ) - execution_instance = self.instances[execution_instance.hostname].obj + execution_instance = self.tm_models.instances[execution_instance.hostname].obj self.start_task(task, instance_group, task.get_jobs_fail_chain(), execution_instance) found_acceptable_queue = True break diff --git a/awx/main/scheduler/task_manager_models.py b/awx/main/scheduler/task_manager_models.py index b9187c0e9c..2580bceb82 100644 --- a/awx/main/scheduler/task_manager_models.py +++ b/awx/main/scheduler/task_manager_models.py @@ -15,15 +15,18 @@ logger = logging.getLogger('awx.main.scheduler') class TaskManagerInstance: """A class representing minimal data the task manager needs to represent an Instance.""" - def __init__(self, obj): + def __init__(self, obj, **kwargs): self.obj = obj self.node_type = obj.node_type self.consumed_capacity = 0 self.capacity = obj.capacity self.hostname = obj.hostname + self.jobs_running = 0 - def consume_capacity(self, impact): + def consume_capacity(self, impact, job_impact=False): self.consumed_capacity += impact + if job_impact: + self.jobs_running += 1 @property def remaining_capacity(self): @@ -33,9 +36,82 @@ class TaskManagerInstance: return remaining +class TaskManagerInstanceGroup: + """A class representing minimal data the task manager needs to represent an InstanceGroup.""" + + def __init__(self, obj, task_manager_instances=None, **kwargs): + self.name = obj.name + self.is_container_group = obj.is_container_group + self.container_group_jobs = 0 + self.container_group_consumed_forks = 0 + _instances = obj.instances.all() + # We want the list of TaskManagerInstance objects because these are shared across the TaskManagerInstanceGroup objects. + # This way when we consume capacity on an instance that is in multiple groups, we tabulate across all the groups correctly. + self.instances = [task_manager_instances[instance.hostname] for instance in _instances if instance.hostname in task_manager_instances] + self.instance_hostnames = tuple([instance.hostname for instance in _instances if instance.hostname in task_manager_instances]) + self.max_concurrent_jobs = obj.max_concurrent_jobs + self.max_forks = obj.max_forks + self.control_task_impact = kwargs.get('control_task_impact', settings.AWX_CONTROL_NODE_TASK_IMPACT) + + def consume_capacity(self, task): + """We only consume capacity on an instance group level if it is a container group. Otherwise we consume capacity on an instance level.""" + if self.is_container_group: + self.container_group_jobs += 1 + self.container_group_consumed_forks += task.task_impact + else: + raise RuntimeError("We only track capacity for container groups at the instance group level. 
Otherwise, consume capacity on instances.") + + def get_remaining_instance_capacity(self): + return sum(inst.remaining_capacity for inst in self.instances) + + def get_consumed_instance_capacity(self): + return sum(inst.consumed_capacity for inst in self.instances) + + def get_instance_jobs_running(self): + return sum(inst.jobs_running for inst in self.instances) + + def has_remaining_capacity(self, task=None, control_impact=False): + """Pass either a task or control_impact=True to determine if the IG has capacity to run the control task or job task.""" + task_impact = self.control_task_impact if control_impact else task.task_impact + job_impact = 0 if control_impact else 1 + + # We only want to loop over instances if self.max_concurrent_jobs is set + if self.max_concurrent_jobs == 0: + # Override the calculated remaining capacity, because when max_concurrent_jobs == 0 we don't enforce any max + remaining_jobs = 0 + else: + instance_jobs_running = self.get_instance_jobs_running() + remaining_jobs = self.max_concurrent_jobs - instance_jobs_running - self.container_group_jobs - job_impact + + # We only want to loop over instances if self.max_forks is set + if self.max_forks == 0: + # Override the calculated remaining capacity, because when max_forks == 0 we don't enforce any max + remaining_forks = 0 + else: + instance_consumed_forks = self.get_consumed_instance_capacity() + remaining_forks = self.max_forks - instance_consumed_forks - self.container_group_consumed_forks - task_impact + + if remaining_jobs < 0 or remaining_forks < 0: + # A value less than zero means the task will not fit on the group + task_string = f"task {task.log_format}" if task else "control task" + if remaining_jobs < 0: + logger.debug(f"{task_string} cannot fit on instance group {self.name} with {remaining_jobs} remaining jobs") + if remaining_forks < 0: + impact_string = f"with impact {task_impact}" + logger.debug(f"{task_string} {impact_string} cannot fit on instance group {self.name} with {remaining_forks} remaining forks") + return False + + # Returning true means there is enough remaining capacity on the group to run the task (or no instance group level limits are being set) + return True + + class TaskManagerInstances: - def __init__(self, active_tasks, instances=None, instance_fields=('node_type', 'capacity', 'hostname', 'enabled')): + def __init__(self, instances=None, instance_fields=('node_type', 'capacity', 'hostname', 'enabled'), **kwargs): self.instances_by_hostname = dict() + self.instance_groups_container_group_jobs = dict() + self.instance_groups_container_group_consumed_forks = dict() + self.control_task_impact = kwargs.get('control_task_impact', settings.AWX_CONTROL_NODE_TASK_IMPACT) + if instances is None: instances = ( Instance.objects.filter(hostname__isnull=False, node_state=Instance.States.READY, enabled=True) @@ -43,18 +119,15 @@ class TaskManagerInstances: .only('node_type', 'node_state', 'capacity', 'hostname', 'enabled') ) for instance in instances: - self.instances_by_hostname[instance.hostname] = TaskManagerInstance(instance) + self.instances_by_hostname[instance.hostname] = TaskManagerInstance(instance, **kwargs) - # initialize remaining capacity based on currently waiting and running tasks - for task in active_tasks: - if task.status not in ['waiting', 'running']: - continue - control_instance = self.instances_by_hostname.get(task.controller_node, '') - execution_instance = self.instances_by_hostname.get(task.execution_node, '') - if execution_instance and execution_instance.node_type in 
('hybrid', 'execution'): - self.instances_by_hostname[task.execution_node].consume_capacity(task.task_impact) - if control_instance and control_instance.node_type in ('hybrid', 'control'): - self.instances_by_hostname[task.controller_node].consume_capacity(settings.AWX_CONTROL_NODE_TASK_IMPACT) + def consume_capacity(self, task): + control_instance = self.instances_by_hostname.get(task.controller_node, '') + execution_instance = self.instances_by_hostname.get(task.execution_node, '') + if execution_instance and execution_instance.node_type in ('hybrid', 'execution'): + self.instances_by_hostname[task.execution_node].consume_capacity(task.task_impact, job_impact=True) + if control_instance and control_instance.node_type in ('hybrid', 'control'): + self.instances_by_hostname[task.controller_node].consume_capacity(self.control_task_impact) def __getitem__(self, hostname): return self.instances_by_hostname.get(hostname) @@ -64,42 +137,48 @@ class TaskManagerInstances: class TaskManagerInstanceGroups: - """A class representing minimal data the task manager needs to represent an InstanceGroup.""" + """A class representing minimal data the task manager needs to represent all the InstanceGroups.""" - def __init__(self, instances_by_hostname=None, instance_groups=None, instance_groups_queryset=None): + def __init__(self, task_manager_instances=None, instance_groups=None, instance_groups_queryset=None, **kwargs): self.instance_groups = dict() + self.task_manager_instances = task_manager_instances if task_manager_instances is not None else TaskManagerInstances() self.controlplane_ig = None self.pk_ig_map = dict() + self.control_task_impact = kwargs.get('control_task_impact', settings.AWX_CONTROL_NODE_TASK_IMPACT) + self.controlplane_ig_name = kwargs.get('controlplane_ig_name', settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME) if instance_groups is not None: # for testing - self.instance_groups = instance_groups + self.instance_groups = {ig.name: TaskManagerInstanceGroup(ig, self.task_manager_instances, **kwargs) for ig in instance_groups} + self.pk_ig_map = {ig.pk: ig for ig in instance_groups} else: if instance_groups_queryset is None: - instance_groups_queryset = InstanceGroup.objects.prefetch_related('instances').only('name', 'instances') - for instance_group in instance_groups_queryset: - if instance_group.name == settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME: - self.controlplane_ig = instance_group - self.instance_groups[instance_group.name] = dict( - instances=[ - instances_by_hostname[instance.hostname] for instance in instance_group.instances.all() if instance.hostname in instances_by_hostname - ], + instance_groups_queryset = InstanceGroup.objects.prefetch_related('instances').only( + 'name', 'instances', 'max_concurrent_jobs', 'max_forks', 'is_container_group' ) + for instance_group in instance_groups_queryset: + if instance_group.name == self.controlplane_ig_name: + self.controlplane_ig = instance_group + self.instance_groups[instance_group.name] = TaskManagerInstanceGroup(instance_group, self.task_manager_instances, **kwargs) self.pk_ig_map[instance_group.pk] = instance_group + def __getitem__(self, ig_name): + return self.instance_groups.get(ig_name) + + def __contains__(self, ig_name): + return ig_name in self.instance_groups + def get_remaining_capacity(self, group_name): - instances = self.instance_groups[group_name]['instances'] - return sum(inst.remaining_capacity for inst in instances) + return self.instance_groups[group_name].get_remaining_instance_capacity() def get_consumed_capacity(self, 
group_name): - instances = self.instance_groups[group_name]['instances'] - return sum(inst.consumed_capacity for inst in instances) + return self.instance_groups[group_name].get_consumed_instance_capacity() def fit_task_to_most_remaining_capacity_instance(self, task, instance_group_name, impact=None, capacity_type=None, add_hybrid_control_cost=False): impact = impact if impact else task.task_impact capacity_type = capacity_type if capacity_type else task.capacity_type instance_most_capacity = None most_remaining_capacity = -1 - instances = self.instance_groups[instance_group_name]['instances'] + instances = self.instance_groups[instance_group_name].instances for i in instances: if i.node_type not in (capacity_type, 'hybrid'): @@ -107,7 +186,7 @@ class TaskManagerInstanceGroups: would_be_remaining = i.remaining_capacity - impact # hybrid nodes _always_ control their own tasks if add_hybrid_control_cost and i.node_type == 'hybrid': - would_be_remaining -= settings.AWX_CONTROL_NODE_TASK_IMPACT + would_be_remaining -= self.control_task_impact if would_be_remaining >= 0 and (instance_most_capacity is None or would_be_remaining > most_remaining_capacity): instance_most_capacity = i most_remaining_capacity = would_be_remaining @@ -115,10 +194,13 @@ class TaskManagerInstanceGroups: def find_largest_idle_instance(self, instance_group_name, capacity_type='execution'): largest_instance = None - instances = self.instance_groups[instance_group_name]['instances'] + instances = self.instance_groups[instance_group_name].instances for i in instances: if i.node_type not in (capacity_type, 'hybrid'): continue + if i.capacity <= 0: + # We don't want to select an idle instance with 0 capacity + continue if (hasattr(i, 'jobs_running') and i.jobs_running == 0) or i.remaining_capacity == i.capacity: if largest_instance is None: largest_instance = i @@ -139,3 +221,41 @@ class TaskManagerInstanceGroups: logger.warn(f"No instance groups in cache exist, defaulting to global instance groups for task {task}") return task.global_instance_groups return igs + + +class TaskManagerModels: + def __init__(self, **kwargs): + # We want to avoid calls to settings over and over in loops, so cache this information here + kwargs['control_task_impact'] = kwargs.get('control_task_impact', settings.AWX_CONTROL_NODE_TASK_IMPACT) + kwargs['controlplane_ig_name'] = kwargs.get('controlplane_ig_name', settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME) + self.instances = TaskManagerInstances(**kwargs) + self.instance_groups = TaskManagerInstanceGroups(task_manager_instances=self.instances, **kwargs) + + @classmethod + def init_with_consumed_capacity(cls, **kwargs): + tmm = cls(**kwargs) + tasks = kwargs.get('tasks', None) + + if tasks is None: + # No tasks were provided, so we will fetch them from the database + task_status_filter_list = kwargs.get('task_status_filter_list', ['running', 'waiting']) + task_fields = kwargs.get('task_fields', ('task_impact', 'controller_node', 'execution_node', 'instance_group')) + from awx.main.models import UnifiedJob + + tasks = UnifiedJob.objects.filter(status__in=task_status_filter_list).only(*task_fields) + + for task in tasks: + tmm.consume_capacity(task) + + return tmm + + def consume_capacity(self, task): + # Consume capacity on instances, which bubbles up to instance groups they are a member of + self.instances.consume_capacity(task) + + # For container group jobs, additionally we must account for capacity consumed since + # The container groups have no instances to look at to track how many jobs/forks are 
consumed + if task.instance_group_id: + ig = self.instance_groups.pk_ig_map[task.instance_group_id] + if ig.is_container_group: + self.instance_groups[ig.name].consume_capacity(task) diff --git a/awx/main/tests/functional/task_management/test_capacity.py b/awx/main/tests/functional/task_management/test_capacity.py index 28e7e8c655..54891f69a1 100644 --- a/awx/main/tests/functional/task_management/test_capacity.py +++ b/awx/main/tests/functional/task_management/test_capacity.py @@ -4,7 +4,7 @@ from awx.main.models import ( Instance, InstanceGroup, ) -from awx.main.scheduler.task_manager_models import TaskManagerInstanceGroups, TaskManagerInstances +from awx.main.scheduler.task_manager_models import TaskManagerInstanceGroups class TestInstanceGroupInstanceMapping(TransactionTestCase): @@ -23,11 +23,10 @@ class TestInstanceGroupInstanceMapping(TransactionTestCase): def test_mapping(self): self.sample_cluster() with self.assertNumQueries(3): - instances = TaskManagerInstances([]) # empty task list - instance_groups = TaskManagerInstanceGroups(instances_by_hostname=instances) + instance_groups = TaskManagerInstanceGroups() ig_instance_map = instance_groups.instance_groups - assert set(i.hostname for i in ig_instance_map['ig_small']['instances']) == set(['i1']) - assert set(i.hostname for i in ig_instance_map['ig_large']['instances']) == set(['i2', 'i3']) - assert set(i.hostname for i in ig_instance_map['default']['instances']) == set(['i2']) + assert set(i.hostname for i in ig_instance_map['ig_small'].instances) == set(['i1']) + assert set(i.hostname for i in ig_instance_map['ig_large'].instances) == set(['i2', 'i3']) + assert set(i.hostname for i in ig_instance_map['default'].instances) == set(['i2']) diff --git a/awx/main/tests/functional/task_management/test_container_groups.py b/awx/main/tests/functional/task_management/test_container_groups.py index 9c565f16d0..360e8e2df2 100644 --- a/awx/main/tests/functional/task_management/test_container_groups.py +++ b/awx/main/tests/functional/task_management/test_container_groups.py @@ -10,6 +10,10 @@ from awx.main.utils import ( create_temporary_fifo, ) +from awx.main.scheduler import TaskManager + +from . import create_job + @pytest.fixture def containerized_job(default_instance_group, kube_credential, job_template_factory): @@ -34,6 +38,50 @@ def test_containerized_job(containerized_job): assert containerized_job.instance_group.credential.kubernetes +@pytest.mark.django_db +def test_max_concurrent_jobs_blocks_start_of_new_jobs(controlplane_instance_group, containerized_job, mocker): + """Construct a scenario where only 1 job will fit within the max_concurrent_jobs of the container group. + + Since max_concurrent_jobs is set to 1, even though 2 jobs are in pending + and would be launched into the container group, only one will be started. 
+ """ + containerized_job.unified_job_template.allow_simultaneous = True + containerized_job.unified_job_template.save() + default_instance_group = containerized_job.instance_group + default_instance_group.max_concurrent_jobs = 1 + default_instance_group.save() + task_impact = 1 + # Create a second job that should not be scheduled at first, blocked by the other + create_job(containerized_job.unified_job_template) + tm = TaskManager() + with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact: + mock_task_impact.return_value = task_impact + with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job: + tm.schedule() + mock_job.assert_called_once() + + +@pytest.mark.django_db +def test_max_forks_blocks_start_of_new_jobs(controlplane_instance_group, containerized_job, mocker): + """Construct a scenario where only 1 job will fit within the max_forks of the container group. + + In this case, we set the container_group max_forks to 10, and make the task_impact of a job 6. + Therefore, only 1 job will fit within the max of 10. + """ + containerized_job.unified_job_template.allow_simultaneous = True + containerized_job.unified_job_template.save() + default_instance_group = containerized_job.instance_group + default_instance_group.max_forks = 10 + # Create a second job that should not be scheduled + create_job(containerized_job.unified_job_template) + tm = TaskManager() + with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact: + mock_task_impact.return_value = 6 + with mock.patch("awx.main.scheduler.TaskManager.start_task"): + tm.schedule() + tm.start_task.assert_called_once() + + @pytest.mark.django_db def test_kubectl_ssl_verification(containerized_job, default_job_execution_environment): containerized_job.execution_environment = default_job_execution_environment diff --git a/awx/main/tests/functional/task_management/test_scheduler.py b/awx/main/tests/functional/task_management/test_scheduler.py index b28152ff01..f362841033 100644 --- a/awx/main/tests/functional/task_management/test_scheduler.py +++ b/awx/main/tests/functional/task_management/test_scheduler.py @@ -248,6 +248,76 @@ def test_multi_jt_capacity_blocking(hybrid_instance, job_template_factory, mocke mock_job.assert_called_once_with(j2, controlplane_instance_group, [], instance) +@pytest.mark.django_db +def test_max_concurrent_jobs_ig_capacity_blocking(hybrid_instance, job_template_factory, mocker): + """When max_concurrent_jobs of an instance group is more restrictive than capacity of instances, enforce max_concurrent_jobs.""" + instance = hybrid_instance + controlplane_instance_group = instance.rampart_groups.first() + # We will expect only 1 job to be started + controlplane_instance_group.max_concurrent_jobs = 1 + controlplane_instance_group.save() + num_jobs = 3 + jobs = [] + for i in range(num_jobs): + jobs.append( + create_job(job_template_factory(f'jt{i}', organization=f'org{i}', project=f'proj{i}', inventory=f'inv{i}', credential=f'cred{i}').job_template) + ) + tm = TaskManager() + task_impact = 1 + + # Sanity check that multiple jobs would run if not for the max_concurrent_jobs setting. 
+ assert task_impact * num_jobs < controlplane_instance_group.capacity + tm = TaskManager() + with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact: + mock_task_impact.return_value = task_impact + with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job: + tm.schedule() + mock_job.assert_called_once() + jobs[0].status = 'running' + jobs[0].controller_node = instance.hostname + jobs[0].execution_node = instance.hostname + jobs[0].instance_group = controlplane_instance_group + jobs[0].save() + + # while that job is running, we should not start another job + with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact: + mock_task_impact.return_value = task_impact + with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job: + tm.schedule() + mock_job.assert_not_called() + # now job is done, we should start one of the two other jobs + jobs[0].status = 'successful' + jobs[0].save() + with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact: + mock_task_impact.return_value = task_impact + with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job: + tm.schedule() + mock_job.assert_called_once() + + +@pytest.mark.django_db +def test_max_forks_ig_capacity_blocking(hybrid_instance, job_template_factory, mocker): + """When max_forks of an instance group is less than the capacity of instances, enforce max_forks.""" + instance = hybrid_instance + controlplane_instance_group = instance.rampart_groups.first() + controlplane_instance_group.max_forks = 15 + controlplane_instance_group.save() + task_impact = 10 + num_jobs = 2 + # Sanity check that 2 jobs would run if not for the max_forks setting. 
+ assert controlplane_instance_group.max_forks < controlplane_instance_group.capacity + assert task_impact * num_jobs > controlplane_instance_group.max_forks + assert task_impact * num_jobs < controlplane_instance_group.capacity + for i in range(num_jobs): + create_job(job_template_factory(f'jt{i}', organization=f'org{i}', project=f'proj{i}', inventory=f'inv{i}', credential=f'cred{i}').job_template) + tm = TaskManager() + with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact: + mock_task_impact.return_value = task_impact + with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job: + tm.schedule() + mock_job.assert_called_once() + + @pytest.mark.django_db def test_single_job_dependencies_project_launch(controlplane_instance_group, job_template_factory, mocker): objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred') diff --git a/awx/main/tests/unit/models/test_ha.py b/awx/main/tests/unit/models/test_ha.py index e21d980dd5..c4da81406c 100644 --- a/awx/main/tests/unit/models/test_ha.py +++ b/awx/main/tests/unit/models/test_ha.py @@ -1,10 +1,7 @@ import pytest -from unittest import mock -from unittest.mock import Mock from decimal import Decimal -from awx.main.models import InstanceGroup, Instance -from awx.main.scheduler.task_manager_models import TaskManagerInstanceGroups +from awx.main.models import Instance @pytest.mark.parametrize('capacity_adjustment', [0.0, 0.25, 0.5, 0.75, 1, 1.5, 3]) @@ -17,83 +14,6 @@ def test_capacity_adjustment_no_save(capacity_adjustment): assert inst.capacity == (float(inst.capacity_adjustment) * abs(inst.mem_capacity - inst.cpu_capacity) + min(inst.mem_capacity, inst.cpu_capacity)) -def T(impact): - j = mock.Mock(spec_set=['task_impact', 'capacity_type']) - j.task_impact = impact - j.capacity_type = 'execution' - return j - - -def Is(param): - """ - param: - [remaining_capacity1, remaining_capacity2, remaining_capacity3, ...] - [(jobs_running1, capacity1), (jobs_running2, capacity2), (jobs_running3, capacity3), ...] 
- """ - - instances = [] - if isinstance(param[0], tuple): - for (jobs_running, capacity) in param: - inst = Mock() - inst.capacity = capacity - inst.jobs_running = jobs_running - inst.node_type = 'execution' - instances.append(inst) - else: - for i in param: - inst = Mock() - inst.remaining_capacity = i - inst.node_type = 'execution' - instances.append(inst) - return instances - - -class TestInstanceGroup(object): - @pytest.mark.parametrize( - 'task,instances,instance_fit_index,reason', - [ - (T(100), Is([100]), 0, "Only one, pick it"), - (T(100), Is([100, 100]), 0, "Two equally good fits, pick the first"), - (T(100), Is([50, 100]), 1, "First instance not as good as second instance"), - (T(100), Is([50, 0, 20, 100, 100, 100, 30, 20]), 3, "Pick Instance [3] as it is the first that the task fits in."), - (T(100), Is([50, 0, 20, 99, 11, 1, 5, 99]), None, "The task don't a fit, you must a quit!"), - ], - ) - def test_fit_task_to_most_remaining_capacity_instance(self, task, instances, instance_fit_index, reason): - InstanceGroup(id=10) - tm_igs = TaskManagerInstanceGroups(instance_groups={'controlplane': {'instances': instances}}) - - instance_picked = tm_igs.fit_task_to_most_remaining_capacity_instance(task, 'controlplane') - - if instance_fit_index is None: - assert instance_picked is None, reason - else: - assert instance_picked == instances[instance_fit_index], reason - - @pytest.mark.parametrize( - 'instances,instance_fit_index,reason', - [ - (Is([(0, 100)]), 0, "One idle instance, pick it"), - (Is([(1, 100)]), None, "One un-idle instance, pick nothing"), - (Is([(0, 100), (0, 200), (1, 500), (0, 700)]), 3, "Pick the largest idle instance"), - (Is([(0, 100), (0, 200), (1, 10000), (0, 700), (0, 699)]), 3, "Pick the largest idle instance"), - (Is([(0, 0)]), None, "One idle but down instance, don't pick it"), - ], - ) - def test_find_largest_idle_instance(self, instances, instance_fit_index, reason): - def filter_offline_instances(*args): - return filter(lambda i: i.capacity > 0, instances) - - InstanceGroup(id=10) - instances_online_only = filter_offline_instances(instances) - tm_igs = TaskManagerInstanceGroups(instance_groups={'controlplane': {'instances': instances_online_only}}) - - if instance_fit_index is None: - assert tm_igs.find_largest_idle_instance('controlplane') is None, reason - else: - assert tm_igs.find_largest_idle_instance('controlplane') == instances[instance_fit_index], reason - - def test_cleanup_params_defaults(): inst = Instance(hostname='foobar') assert inst.get_cleanup_task_kwargs(exclude_strings=['awx_423_']) == {'exclude_strings': ['awx_423_'], 'file_pattern': '/tmp/awx_*_*', 'grace_period': 60} diff --git a/awx/main/tests/unit/test_capacity.py b/awx/main/tests/unit/test_capacity.py index 740cbc193e..7665c962b7 100644 --- a/awx/main/tests/unit/test_capacity.py +++ b/awx/main/tests/unit/test_capacity.py @@ -1,6 +1,6 @@ import pytest -from awx.main.scheduler.task_manager_models import TaskManagerInstanceGroups, TaskManagerInstances +from awx.main.scheduler.task_manager_models import TaskManagerModels class FakeMeta(object): @@ -16,38 +16,64 @@ class FakeObject(object): class Job(FakeObject): - task_impact = 43 - is_container_group_task = False - controller_node = '' - execution_node = '' + def __init__(self, **kwargs): + self.task_impact = kwargs.get('task_impact', 43) + self.is_container_group_task = kwargs.get('is_container_group_task', False) + self.controller_node = kwargs.get('controller_node', '') + self.execution_node = kwargs.get('execution_node', '') + 
self.instance_group = kwargs.get('instance_group', None) + self.instance_group_id = self.instance_group.id if self.instance_group else None + self.capacity_type = kwargs.get('capacity_type', 'execution') def log_format(self): return 'job 382 (fake)' +class Instances(FakeObject): + def add(self, *args): + for instance in args: + self.obj.instance_list.append(instance) + + def all(self): + return self.obj.instance_list + + +class InstanceGroup(FakeObject): + def __init__(self, **kwargs): + super(InstanceGroup, self).__init__(**kwargs) + self.instance_list = [] + self.pk = self.id = kwargs.get('id', 1) + + @property + def instances(self): + mgr = Instances(obj=self) + return mgr + + @property + def is_container_group(self): + return False + + @property + def max_concurrent_jobs(self): + return 0 + + @property + def max_forks(self): + return 0 + + +class Instance(FakeObject): + def __init__(self, **kwargs): + self.node_type = kwargs.get('node_type', 'hybrid') + self.capacity = kwargs.get('capacity', 0) + self.hostname = kwargs.get('hostname', 'fakehostname') + self.consumed_capacity = 0 + self.jobs_running = 0 + + @pytest.fixture def sample_cluster(): def stand_up_cluster(): - class Instances(FakeObject): - def add(self, *args): - for instance in args: - self.obj.instance_list.append(instance) - - def all(self): - return self.obj.instance_list - - class InstanceGroup(FakeObject): - def __init__(self, **kwargs): - super(InstanceGroup, self).__init__(**kwargs) - self.instance_list = [] - - @property - def instances(self): - mgr = Instances(obj=self) - return mgr - - class Instance(FakeObject): - pass ig_small = InstanceGroup(name='ig_small') ig_large = InstanceGroup(name='ig_large') @@ -66,14 +92,12 @@ def sample_cluster(): @pytest.fixture def create_ig_manager(): def _rf(ig_list, tasks): - instances = TaskManagerInstances(tasks, instances=set(inst for ig in ig_list for inst in ig.instance_list)) - - seed_igs = {} - for ig in ig_list: - seed_igs[ig.name] = {'instances': [instances[inst.hostname] for inst in ig.instance_list]} - - instance_groups = TaskManagerInstanceGroups(instance_groups=seed_igs) - return instance_groups + tm_models = TaskManagerModels.init_with_consumed_capacity( + tasks=tasks, + instances=set(inst for ig in ig_list for inst in ig.instance_list), + instance_groups=ig_list, + ) + return tm_models.instance_groups return _rf @@ -126,3 +150,75 @@ def test_RBAC_reduced_filter(sample_cluster, create_ig_manager): # Cross-links between groups not visible to current user, # so a naieve accounting of capacities is returned instead assert instance_groups_mgr.get_consumed_capacity('default') == 43 + + +def Is(param): + """ + param: + [remaining_capacity1, remaining_capacity2, remaining_capacity3, ...] + [(jobs_running1, capacity1), (jobs_running2, capacity2), (jobs_running3, capacity3), ...] 
+ """ + + instances = [] + if isinstance(param[0], tuple): + for index, (jobs_running, capacity) in enumerate(param): + inst = Instance(capacity=capacity, node_type='execution', hostname=f'fakehost-{index}') + inst.jobs_running = jobs_running + instances.append(inst) + else: + for index, capacity in enumerate(param): + inst = Instance(capacity=capacity, node_type='execution', hostname=f'fakehost-{index}') + inst.node_type = 'execution' + instances.append(inst) + return instances + + +class TestSelectBestInstanceForTask(object): + @pytest.mark.parametrize( + 'task,instances,instance_fit_index,reason', + [ + (Job(task_impact=100), Is([100]), 0, "Only one, pick it"), + (Job(task_impact=100), Is([100, 100]), 0, "Two equally good fits, pick the first"), + (Job(task_impact=100), Is([50, 100]), 1, "First instance not as good as second instance"), + (Job(task_impact=100), Is([50, 0, 20, 100, 100, 100, 30, 20]), 3, "Pick Instance [3] as it is the first that the task fits in."), + (Job(task_impact=100), Is([50, 0, 20, 99, 11, 1, 5, 99]), None, "The task don't a fit, you must a quit!"), + ], + ) + def test_fit_task_to_most_remaining_capacity_instance(self, task, instances, instance_fit_index, reason): + ig = InstanceGroup(id=10, name='controlplane') + tasks = [] + for instance in instances: + ig.instances.add(instance) + for _ in range(instance.jobs_running): + tasks.append(Job(execution_node=instance.hostname, controller_node=instance.hostname, instance_group=ig)) + tm_models = TaskManagerModels.init_with_consumed_capacity(tasks=tasks, instances=instances, instance_groups=[ig]) + instance_picked = tm_models.instance_groups.fit_task_to_most_remaining_capacity_instance(task, 'controlplane') + + if instance_fit_index is None: + assert instance_picked is None, reason + else: + assert instance_picked.hostname == instances[instance_fit_index].hostname, reason + + @pytest.mark.parametrize( + 'instances,instance_fit_index,reason', + [ + (Is([(0, 100)]), 0, "One idle instance, pick it"), + (Is([(1, 100)]), None, "One un-idle instance, pick nothing"), + (Is([(0, 100), (0, 200), (1, 500), (0, 700)]), 3, "Pick the largest idle instance"), + (Is([(0, 100), (0, 200), (1, 10000), (0, 700), (0, 699)]), 3, "Pick the largest idle instance"), + (Is([(0, 0)]), None, "One idle but down instance, don't pick it"), + ], + ) + def test_find_largest_idle_instance(self, instances, instance_fit_index, reason): + ig = InstanceGroup(id=10, name='controlplane') + tasks = [] + for instance in instances: + ig.instances.add(instance) + for _ in range(instance.jobs_running): + tasks.append(Job(execution_node=instance.hostname, controller_node=instance.hostname, instance_group=ig)) + tm_models = TaskManagerModels.init_with_consumed_capacity(tasks=tasks, instances=instances, instance_groups=[ig]) + + if instance_fit_index is None: + assert tm_models.instance_groups.find_largest_idle_instance('controlplane') is None, reason + else: + assert tm_models.instance_groups.find_largest_idle_instance('controlplane').hostname == instances[instance_fit_index].hostname, reason