Use global capacity algorithm in serializer

The task manager was doing its own work to compute the currently
consumed capacity. That computation is moved into the instance group
manager and applied in the same form to the instance group list.
This commit is contained in:
AlanCoding
2017-08-27 13:14:35 -04:00
parent ce3c969c08
commit 5327a4c622
8 changed files with 216 additions and 81 deletions

View File

@@ -41,7 +41,7 @@ class TaskManager():
for rampart_group in InstanceGroup.objects.all():
self.graph[rampart_group.name] = dict(graph=DependencyGraph(rampart_group.name),
capacity_total=rampart_group.capacity,
capacity_used=0)
consumed_capacity=0)
def is_job_blocked(self, task):
# TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph
@@ -492,25 +492,11 @@ class TaskManager():
self.fail_jobs_if_not_in_celery(node_jobs, active_tasks, celery_task_start_time)
def calculate_capacity_used(self, tasks):
    """Recompute the ``capacity_used`` counter of every group in ``self.graph``.

    All counters are first reset to zero.  Then, for each task in ``tasks``,
    its ``task_impact`` is added to every affected group:

    * ``waiting`` tasks affect the groups mapped from the name of the
      task's instance group (peer groups that share instances);
    * ``running`` tasks affect the groups mapped from the task's
      ``execution_node``.

    A task in any other status is reported through ``logger.error`` and
    contributes nothing.
    """
    for group_name in self.graph:
        self.graph[group_name]['capacity_used'] = 0
    instance_ig_mapping, ig_ig_mapping = Instance.objects.capacity_mapping()
    for job in tasks:
        # TODO: dock capacity for isolated job management tasks running in queue
        if job.status == 'waiting':
            # Peer groups that share instances with the task's group
            affected_groups = ig_ig_mapping[job.instance_group.name]
        elif job.status == 'running':
            # Every group that contains the instance running the task
            affected_groups = instance_ig_mapping[job.execution_node]
        else:
            logger.error('Programming error, %s not in ["running", "waiting"]', job.log_format)
            continue
        for affected_name in affected_groups:
            self.graph[affected_name]['capacity_used'] += job.task_impact
def calculate_capacity_consumed(self, tasks):
    """Replace ``self.graph`` with the result of the manager's capacity computation.

    The current graph and ``tasks`` are handed to
    ``InstanceGroup.objects.capacity_values`` and whatever it returns
    becomes the new ``self.graph``.
    """
    refreshed_graph = InstanceGroup.objects.capacity_values(tasks=tasks, graph=self.graph)
    self.graph = refreshed_graph
def would_exceed_capacity(self, task, instance_group):
current_capacity = self.graph[instance_group]['capacity_used']
current_capacity = self.graph[instance_group]['consumed_capacity']
capacity_total = self.graph[instance_group]['capacity_total']
if current_capacity == 0:
return False
@@ -519,16 +505,16 @@ class TaskManager():
def consume_capacity(self, task, instance_group):
    """Charge ``task``'s impact against ``instance_group``.

    Logs the task, its impact, the group and the group's prior consumed
    total at debug level, then adds ``task.task_impact`` to
    ``self.graph[instance_group]['consumed_capacity']``.

    Only the ``consumed_capacity`` key is touched; the stale
    ``capacity_used`` bookkeeping that was interleaved with it is dropped
    so the impact is counted exactly once.
    """
    group_state = self.graph[instance_group]
    logger.debug('%s consumed %s capacity units from %s with prior total of %s',
                 task.log_format, task.task_impact, instance_group,
                 group_state['consumed_capacity'])
    group_state['consumed_capacity'] += task.task_impact
def get_remaining_capacity(self, instance_group):
    """Return how much capacity ``instance_group`` has left.

    The result is ``capacity_total`` minus ``consumed_capacity`` for the
    group's entry in ``self.graph``; it is negative when the group is
    over-committed.  The stale ``capacity_used`` return that preceded this
    computation (and made it unreachable) is removed.
    """
    group_state = self.graph[instance_group]
    return group_state['capacity_total'] - group_state['consumed_capacity']
def process_tasks(self, all_sorted_tasks):
running_tasks = filter(lambda t: t.status in ['waiting', 'running'], all_sorted_tasks)
self.calculate_capacity_used(running_tasks)
self.calculate_capacity_consumed(running_tasks)
self.process_running_tasks(running_tasks)

View File

@@ -3,7 +3,7 @@
import logging
# Celery
from celery import task
from celery import Task, task
# AWX
from awx.main.scheduler import TaskManager
@@ -15,6 +15,12 @@ logger = logging.getLogger('awx.main.scheduler')
# updated model, the call to schedule() may get stale data.
class LogErrorsTask(Task):
    """Task base class that logs a failure before running the parent failure hook."""

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Log the failing task's name with ``exc``, then delegate to ``Task.on_failure``."""
        message = 'Task {} encountered exception.'.format(self.name)
        logger.exception(message, exc_info=exc)
        parent = super(LogErrorsTask, self)
        parent.on_failure(exc, task_id, args, kwargs, einfo)
@task
def run_job_launch(job_id):
    """Run one scheduling pass of the task manager.

    ``job_id`` is accepted but not used by this body.
    """
    manager = TaskManager()
    manager.schedule()
@@ -25,7 +31,7 @@ def run_job_complete(job_id):
TaskManager().schedule()
# Registered once, with LogErrorsTask as the base class; the bare ``@task``
# decorator that used to sit above it is dropped so the function is not
# wrapped twice.
@task(base=LogErrorsTask)
def run_task_manager():
    """Log a debug message and run one scheduling pass of the task manager."""
    logger.debug("Running Tower task manager.")
    TaskManager().schedule()