From e403c603d68def3b4cc6e4b253092343d646da1a Mon Sep 17 00:00:00 2001
From: Elijah DeLee
Date: Thu, 27 Oct 2022 21:21:15 -0400
Subject: [PATCH] use task manager models more consistently in serializer

---
 awx/api/serializers.py                    | 26 +++++++-----
 awx/main/analytics/collectors.py          |  4 +-
 awx/main/models/ha.py                     |  2 +
 awx/main/scheduler/task_manager_models.py | 49 +++++++++++++++++++----
 4 files changed, 61 insertions(+), 20 deletions(-)

diff --git a/awx/api/serializers.py b/awx/api/serializers.py
index 4249715482..99676c668d 100644
--- a/awx/api/serializers.py
+++ b/awx/api/serializers.py
@@ -5040,12 +5040,10 @@ class InstanceHealthCheckSerializer(BaseSerializer):
 
 class InstanceGroupSerializer(BaseSerializer):
     show_capabilities = ['edit', 'delete']
-
+    capacity = serializers.SerializerMethodField()
     consumed_capacity = serializers.SerializerMethodField()
     percent_capacity_remaining = serializers.SerializerMethodField()
-    jobs_running = serializers.IntegerField(
-        help_text=_('Count of jobs in the running or waiting state that ' 'are targeted for this instance group'), read_only=True
-    )
+    jobs_running = serializers.SerializerMethodField()
     jobs_total = serializers.IntegerField(help_text=_('Count of all jobs that target this instance group'), read_only=True)
     instances = serializers.SerializerMethodField()
     is_container_group = serializers.BooleanField(
@@ -5206,14 +5204,24 @@ class InstanceGroupSerializer(BaseSerializer):
         ig_mgr = self.get_ig_mgr()
         return ig_mgr.get_consumed_capacity(obj.name)
 
-    def get_percent_capacity_remaining(self, obj):
-        if not obj.capacity:
-            return 0.0
+    def get_capacity(self, obj):
         ig_mgr = self.get_ig_mgr()
-        return float("{0:.2f}".format((float(ig_mgr.get_remaining_capacity(obj.name)) / (float(obj.capacity))) * 100))
+        return ig_mgr.get_capacity(obj.name)
+
+    def get_percent_capacity_remaining(self, obj):
+        capacity = self.get_capacity(obj)
+        if not capacity:
+            return 0.0
+        consumed_capacity = self.get_consumed_capacity(obj)
+        return float("{0:.2f}".format(((float(capacity) - float(consumed_capacity)) / (float(capacity))) * 100))
 
     def get_instances(self, obj):
-        return obj.instances.count()
+        ig_mgr = self.get_ig_mgr()
+        return len(ig_mgr.get_instances(obj.name))
+
+    def get_jobs_running(self, obj):
+        ig_mgr = self.get_ig_mgr()
+        return ig_mgr.get_jobs_running(obj.name)
 
 
 class ActivityStreamSerializer(BaseSerializer):
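A quick standalone sketch of the percent-remaining arithmetic the new get_percent_capacity_remaining performs, using plain numbers instead of the serializer and task-manager objects; the helper name and values are invented for illustration:

def percent_capacity_remaining(capacity, consumed_capacity):
    # Mirrors the serializer math: remaining capacity as a percentage, rounded to 2 places
    if not capacity:
        # A group reporting no capacity returns 0.0 rather than dividing by zero
        return 0.0
    return float("{0:.2f}".format(((float(capacity) - float(consumed_capacity)) / float(capacity)) * 100))

print(percent_capacity_remaining(157, 43))  # 72.61
print(percent_capacity_remaining(0, 0))     # 0.0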
diff --git a/awx/main/analytics/collectors.py b/awx/main/analytics/collectors.py
index 7e6f57a900..30bf730784 100644
--- a/awx/main/analytics/collectors.py
+++ b/awx/main/analytics/collectors.py
@@ -237,9 +237,7 @@ def projects_by_scm_type(since, **kwargs):
 def instance_info(since, include_hostnames=False, **kwargs):
     info = {}
     # Use same method that the TaskManager does to compute consumed capacity without querying all running jobs for each Instance
-    tm_models = TaskManagerModels.init_with_consumed_capacity(
-        instance_fields=['uuid', 'version', 'capacity', 'cpu', 'memory', 'managed_by_policy', 'enabled']
-    )
+    tm_models = TaskManagerModels.init_with_consumed_capacity(instance_fields=['uuid', 'version', 'capacity', 'cpu', 'memory', 'managed_by_policy', 'enabled'])
     for tm_instance in tm_models.instances.instances_by_hostname.values():
         instance = tm_instance.obj
         instance_info = {
diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py
index bf43872131..268f0f6996 100644
--- a/awx/main/models/ha.py
+++ b/awx/main/models/ha.py
@@ -394,6 +394,8 @@ class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin):
 
     @property
     def capacity(self):
+        if self.is_container_group:
+            return self.max_forks
        return sum(inst.capacity for inst in self.instances.all())
 
     @property
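As a rough illustration of the InstanceGroup.capacity behavior above, here is a plain-Python stand-in (not the Django model): a container group reports its configured max_forks as capacity, while an ordinary group sums the capacity of its instances. Attribute names mirror the model; the class and values are invented.

class FakeInstanceGroup:
    # Illustrative stand-in for InstanceGroup
    def __init__(self, is_container_group, max_forks=0, instance_capacities=()):
        self.is_container_group = is_container_group
        self.max_forks = max_forks
        self.instance_capacities = list(instance_capacities)

    @property
    def capacity(self):
        if self.is_container_group:
            # Container groups have no instances; max_forks acts as the user-defined capacity
            return self.max_forks
        return sum(self.instance_capacities)

print(FakeInstanceGroup(is_container_group=True, max_forks=200).capacity)                   # 200
print(FakeInstanceGroup(is_container_group=False, instance_capacities=[57, 43]).capacity)   # 100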
diff --git a/awx/main/scheduler/task_manager_models.py b/awx/main/scheduler/task_manager_models.py
index 2580bceb82..e8ae4ba144 100644
--- a/awx/main/scheduler/task_manager_models.py
+++ b/awx/main/scheduler/task_manager_models.py
@@ -64,44 +64,68 @@ class TaskManagerInstanceGroup:
     def get_remaining_instance_capacity(self):
         return sum(inst.remaining_capacity for inst in self.instances)
 
+    def get_instance_capacity(self):
+        return sum(inst.capacity for inst in self.instances)
+
     def get_consumed_instance_capacity(self):
         return sum(inst.consumed_capacity for inst in self.instances)
 
     def get_instance_jobs_running(self):
         return sum(inst.jobs_running for inst in self.instances)
 
+    def get_jobs_running(self):
+        if self.is_container_group:
+            return self.container_group_jobs
+        return sum(inst.jobs_running for inst in self.instances)
+
+    def get_capacity(self):
+        """This reports any type of capacity, including that of container group jobs.
+
+        Container groups don't really have capacity, but if they have max_forks set,
+        we can interpret that as how much capacity the user has defined them to have.
+        """
+        if self.is_container_group:
+            return self.max_forks
+        return self.get_instance_capacity()
+
+    def get_consumed_capacity(self):
+        if self.is_container_group:
+            return self.container_group_consumed_forks
+        return self.get_consumed_instance_capacity()
+
+    def get_remaining_capacity(self):
+        return self.get_capacity() - self.get_consumed_capacity()
+
     def has_remaining_capacity(self, task=None, control_impact=False):
         """Pass either a task or control_impact=True to determine if the IG has capacity to run the control task or job task."""
         task_impact = self.control_task_impact if control_impact else task.task_impact
         job_impact = 0 if control_impact else 1
+        task_string = f"task {task.log_format} with impact of {task_impact}" if task else f"control task with impact of {task_impact}"
 
         # We only want to loop over instances if self.max_concurrent_jobs is set
         if self.max_concurrent_jobs == 0:
             # Override the calculated remaining capacity, because when max_concurrent_jobs == 0 we don't enforce any max
             remaining_jobs = 0
         else:
-            instance_jobs_running = self.get_instance_jobs_running()
-            remaining_jobs = self.max_concurrent_jobs - instance_jobs_running - self.container_group_jobs - job_impact
+            remaining_jobs = self.max_concurrent_jobs - self.get_jobs_running() - job_impact
 
         # We only want to loop over instances if self.max_forks is set
         if self.max_forks == 0:
             # Override the calculated remaining capacity, because when max_forks == 0 we don't enforce any max
             remaining_forks = 0
         else:
-            instance_consumed_forks = self.get_consumed_instance_capacity()
-            remaining_forks = self.max_forks - instance_consumed_forks - self.container_group_consumed_forks - task_impact
+            remaining_forks = self.max_forks - self.get_consumed_capacity() - task_impact
 
         if remaining_jobs < 0 or remaining_forks < 0:
             # A value less than zero means the task will not fit on the group
-            task_string = f"task {task.log_format}" if task else "control task"
             if remaining_jobs < 0:
                 logger.debug(f"{task_string} cannot fit on instance group {self.name} with {remaining_jobs} remaining jobs")
             if remaining_forks < 0:
-                impact_string = f"with impact {task_impact}"
-                logger.debug(f"{task_string} {impact_string} cannot fit on instance group {self.name} with {remaining_forks} remaining forks")
+                logger.debug(f"{task_string} cannot fit on instance group {self.name} with {remaining_forks} remaining forks")
             return False
 
         # Returning true means there is enough remaining capacity on the group to run the task (or no instance group level limits are being set)
+        logger.debug(f"{task_string} can fit on instance group {self.name} with {remaining_forks} remaining forks and {remaining_jobs} remaining jobs")
         return True
@@ -171,7 +195,16 @@ class TaskManagerInstanceGroups:
         return self.instance_groups[group_name].get_remaining_instance_capacity()
 
     def get_consumed_capacity(self, group_name):
-        return self.instance_groups[group_name].get_consumed_instance_capacity()
+        return self.instance_groups[group_name].get_consumed_capacity()
+
+    def get_jobs_running(self, group_name):
+        return self.instance_groups[group_name].get_jobs_running()
+
+    def get_capacity(self, group_name):
+        return self.instance_groups[group_name].get_capacity()
+
+    def get_instances(self, group_name):
+        return self.instance_groups[group_name].instances
 
     def fit_task_to_most_remaining_capacity_instance(self, task, instance_group_name, impact=None, capacity_type=None, add_hybrid_control_cost=False):
         impact = impact if impact else task.task_impact
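To make the refactored has_remaining_capacity arithmetic concrete, here is a small standalone sketch of the two checks it performs (job-count limit and fork limit), with invented numbers; it is not the TaskManagerInstanceGroup class itself.

def fits(max_concurrent_jobs, max_forks, jobs_running, consumed_capacity, task_impact, job_impact=1):
    # A limit of 0 means "no limit enforced", matching the patched logic
    remaining_jobs = 0 if max_concurrent_jobs == 0 else max_concurrent_jobs - jobs_running - job_impact
    remaining_forks = 0 if max_forks == 0 else max_forks - consumed_capacity - task_impact
    return remaining_jobs >= 0 and remaining_forks >= 0

# Illustrative values: a 10-job cap with 9 running still fits one more job;
# a 100-fork cap with 95 forks consumed rejects a task with impact 10
print(fits(max_concurrent_jobs=10, max_forks=0, jobs_running=9, consumed_capacity=0, task_impact=5))    # True
print(fits(max_concurrent_jobs=0, max_forks=100, jobs_running=0, consumed_capacity=95, task_impact=10))  # False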