mirror of
https://github.com/ansible/awx.git
synced 2026-01-18 05:01:19 -03:30
use task manager models more consistently in serializer
This commit is contained in:
parent
4b7b3c7c7d
commit
e403c603d6
@ -5040,12 +5040,10 @@ class InstanceHealthCheckSerializer(BaseSerializer):
|
||||
class InstanceGroupSerializer(BaseSerializer):
|
||||
|
||||
show_capabilities = ['edit', 'delete']
|
||||
|
||||
capacity = serializers.SerializerMethodField()
|
||||
consumed_capacity = serializers.SerializerMethodField()
|
||||
percent_capacity_remaining = serializers.SerializerMethodField()
|
||||
jobs_running = serializers.IntegerField(
|
||||
help_text=_('Count of jobs in the running or waiting state that ' 'are targeted for this instance group'), read_only=True
|
||||
)
|
||||
jobs_running = serializers.SerializerMethodField()
|
||||
jobs_total = serializers.IntegerField(help_text=_('Count of all jobs that target this instance group'), read_only=True)
|
||||
instances = serializers.SerializerMethodField()
|
||||
is_container_group = serializers.BooleanField(
|
||||
@ -5206,14 +5204,24 @@ class InstanceGroupSerializer(BaseSerializer):
|
||||
ig_mgr = self.get_ig_mgr()
|
||||
return ig_mgr.get_consumed_capacity(obj.name)
|
||||
|
||||
def get_percent_capacity_remaining(self, obj):
|
||||
if not obj.capacity:
|
||||
return 0.0
|
||||
def get_capacity(self, obj):
|
||||
ig_mgr = self.get_ig_mgr()
|
||||
return float("{0:.2f}".format((float(ig_mgr.get_remaining_capacity(obj.name)) / (float(obj.capacity))) * 100))
|
||||
return ig_mgr.get_capacity(obj.name)
|
||||
|
||||
def get_percent_capacity_remaining(self, obj):
|
||||
capacity = self.get_capacity(obj)
|
||||
if not capacity:
|
||||
return 0.0
|
||||
consumed_capacity = self.get_consumed_capacity(obj)
|
||||
return float("{0:.2f}".format(((float(capacity) - float(consumed_capacity)) / (float(capacity))) * 100))
|
||||
|
||||
def get_instances(self, obj):
|
||||
return obj.instances.count()
|
||||
ig_mgr = self.get_ig_mgr()
|
||||
return len(ig_mgr.get_instances(obj.name))
|
||||
|
||||
def get_jobs_running(self, obj):
|
||||
ig_mgr = self.get_ig_mgr()
|
||||
return ig_mgr.get_jobs_running(obj.name)
|
||||
|
||||
|
||||
class ActivityStreamSerializer(BaseSerializer):
|
||||
|
||||
@ -237,9 +237,7 @@ def projects_by_scm_type(since, **kwargs):
|
||||
def instance_info(since, include_hostnames=False, **kwargs):
|
||||
info = {}
|
||||
# Use same method that the TaskManager does to compute consumed capacity without querying all running jobs for each Instance
|
||||
tm_models = TaskManagerModels.init_with_consumed_capacity(
|
||||
instance_fields=['uuid', 'version', 'capacity', 'cpu', 'memory', 'managed_by_policy', 'enabled']
|
||||
)
|
||||
tm_models = TaskManagerModels.init_with_consumed_capacity(instance_fields=['uuid', 'version', 'capacity', 'cpu', 'memory', 'managed_by_policy', 'enabled'])
|
||||
for tm_instance in tm_models.instances.instances_by_hostname.values():
|
||||
instance = tm_instance.obj
|
||||
instance_info = {
|
||||
|
||||
@ -394,6 +394,8 @@ class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin):
|
||||
|
||||
@property
|
||||
def capacity(self):
|
||||
if self.is_container_group:
|
||||
return self.max_forks
|
||||
return sum(inst.capacity for inst in self.instances.all())
|
||||
|
||||
@property
|
||||
|
||||
@ -64,44 +64,68 @@ class TaskManagerInstanceGroup:
|
||||
def get_remaining_instance_capacity(self):
|
||||
return sum(inst.remaining_capacity for inst in self.instances)
|
||||
|
||||
def get_instance_capacity(self):
|
||||
return sum(inst.capacity for inst in self.instances)
|
||||
|
||||
def get_consumed_instance_capacity(self):
|
||||
return sum(inst.consumed_capacity for inst in self.instances)
|
||||
|
||||
def get_instance_jobs_running(self):
|
||||
return sum(inst.jobs_running for inst in self.instances)
|
||||
|
||||
def get_jobs_running(self):
|
||||
if self.is_container_group:
|
||||
return self.container_group_jobs
|
||||
return sum(inst.jobs_running for inst in self.instances)
|
||||
|
||||
def get_capacity(self):
|
||||
"""This reports any type of capacity, including that of container group jobs.
|
||||
|
||||
Container groups don't really have capacity, but if they have max_forks set,
|
||||
we can interperet that as how much capacity the user has defined them to have.
|
||||
"""
|
||||
if self.is_container_group:
|
||||
return self.max_forks
|
||||
return self.get_instance_capacity()
|
||||
|
||||
def get_consumed_capacity(self):
|
||||
if self.is_container_group:
|
||||
return self.container_group_consumed_forks
|
||||
return self.get_consumed_instance_capacity()
|
||||
|
||||
def get_remaining_capacity(self):
|
||||
return self.get_capacity() - self.get_consumed_capacity()
|
||||
|
||||
def has_remaining_capacity(self, task=None, control_impact=False):
|
||||
"""Pass either a task or control_impact=True to determine if the IG has capacity to run the control task or job task."""
|
||||
task_impact = self.control_task_impact if control_impact else task.task_impact
|
||||
job_impact = 0 if control_impact else 1
|
||||
task_string = f"task {task.log_format} with impact of {task_impact}" if task else f"control task with impact of {task_impact}"
|
||||
|
||||
# We only want to loop over instances if self.max_concurrent_jobs is set
|
||||
if self.max_concurrent_jobs == 0:
|
||||
# Override the calculated remaining capacity, because when max_concurrent_jobs == 0 we don't enforce any max
|
||||
remaining_jobs = 0
|
||||
else:
|
||||
instance_jobs_running = self.get_instance_jobs_running()
|
||||
remaining_jobs = self.max_concurrent_jobs - instance_jobs_running - self.container_group_jobs - job_impact
|
||||
remaining_jobs = self.max_concurrent_jobs - self.get_jobs_running() - job_impact
|
||||
|
||||
# We only want to loop over instances if self.max_forks is set
|
||||
if self.max_forks == 0:
|
||||
# Override the calculated remaining capacity, because when max_forks == 0 we don't enforce any max
|
||||
remaining_forks = 0
|
||||
else:
|
||||
instance_consumed_forks = self.get_consumed_instance_capacity()
|
||||
remaining_forks = self.max_forks - instance_consumed_forks - self.container_group_consumed_forks - task_impact
|
||||
remaining_forks = self.max_forks - self.get_consumed_capacity() - task_impact
|
||||
|
||||
if remaining_jobs < 0 or remaining_forks < 0:
|
||||
# A value less than zero means the task will not fit on the group
|
||||
task_string = f"task {task.log_format}" if task else "control task"
|
||||
if remaining_jobs < 0:
|
||||
logger.debug(f"{task_string} cannot fit on instance group {self.name} with {remaining_jobs} remaining jobs")
|
||||
if remaining_forks < 0:
|
||||
impact_string = f"with impact {task_impact}"
|
||||
logger.debug(f"{task_string} {impact_string} cannot fit on instance group {self.name} with {remaining_forks} remaining forks")
|
||||
logger.debug(f"{task_string} cannot fit on instance group {self.name} with {remaining_forks} remaining forks")
|
||||
return False
|
||||
|
||||
# Returning true means there is enough remaining capacity on the group to run the task (or no instance group level limits are being set)
|
||||
logger.debug(f"{task_string} can fit on instance group {self.name} with {remaining_forks} remaining forks and {remaining_jobs}")
|
||||
return True
|
||||
|
||||
|
||||
@ -171,7 +195,16 @@ class TaskManagerInstanceGroups:
|
||||
return self.instance_groups[group_name].get_remaining_instance_capacity()
|
||||
|
||||
def get_consumed_capacity(self, group_name):
|
||||
return self.instance_groups[group_name].get_consumed_instance_capacity()
|
||||
return self.instance_groups[group_name].get_consumed_capacity()
|
||||
|
||||
def get_jobs_running(self, group_name):
|
||||
return self.instance_groups[group_name].get_jobs_running()
|
||||
|
||||
def get_capacity(self, group_name):
|
||||
return self.instance_groups[group_name].get_capacity()
|
||||
|
||||
def get_instances(self, group_name):
|
||||
return self.instance_groups[group_name].instances
|
||||
|
||||
def fit_task_to_most_remaining_capacity_instance(self, task, instance_group_name, impact=None, capacity_type=None, add_hybrid_control_cost=False):
|
||||
impact = impact if impact else task.task_impact
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user