mirror of
https://github.com/ansible/awx.git
synced 2026-03-01 08:48:46 -03:30
Merge pull request #1637 from chrismeyersfsu/fix-instance_removed_from_group
handle instance group names unicode
This commit is contained in:
@@ -6,6 +6,7 @@ from datetime import datetime, timedelta
|
|||||||
import logging
|
import logging
|
||||||
import uuid
|
import uuid
|
||||||
import json
|
import json
|
||||||
|
import six
|
||||||
from sets import Set
|
from sets import Set
|
||||||
|
|
||||||
# Django
|
# Django
|
||||||
@@ -422,50 +423,50 @@ class TaskManager():
|
|||||||
def process_dependencies(self, dependent_task, dependency_tasks):
|
def process_dependencies(self, dependent_task, dependency_tasks):
|
||||||
for task in dependency_tasks:
|
for task in dependency_tasks:
|
||||||
if self.is_job_blocked(task):
|
if self.is_job_blocked(task):
|
||||||
logger.debug("Dependent %s is blocked from running", task.log_format)
|
logger.debug(six.text_type("Dependent {} is blocked from running").format(task.log_format))
|
||||||
continue
|
continue
|
||||||
preferred_instance_groups = task.preferred_instance_groups
|
preferred_instance_groups = task.preferred_instance_groups
|
||||||
found_acceptable_queue = False
|
found_acceptable_queue = False
|
||||||
for rampart_group in preferred_instance_groups:
|
for rampart_group in preferred_instance_groups:
|
||||||
if self.get_remaining_capacity(rampart_group.name) <= 0:
|
if self.get_remaining_capacity(rampart_group.name) <= 0:
|
||||||
logger.debug("Skipping group %s capacity <= 0", rampart_group.name)
|
logger.debug(six.text_type("Skipping group {} capacity <= 0").format(rampart_group.name))
|
||||||
continue
|
continue
|
||||||
if not self.would_exceed_capacity(task, rampart_group.name):
|
if not self.would_exceed_capacity(task, rampart_group.name):
|
||||||
logger.debug("Starting dependent %s in group %s", task.log_format, rampart_group.name)
|
logger.debug(six.text_type("Starting dependent {} in group {}").format(task.log_format, rampart_group.name))
|
||||||
self.graph[rampart_group.name]['graph'].add_job(task)
|
self.graph[rampart_group.name]['graph'].add_job(task)
|
||||||
tasks_to_fail = filter(lambda t: t != task, dependency_tasks)
|
tasks_to_fail = filter(lambda t: t != task, dependency_tasks)
|
||||||
tasks_to_fail += [dependent_task]
|
tasks_to_fail += [dependent_task]
|
||||||
self.start_task(task, rampart_group, tasks_to_fail)
|
self.start_task(task, rampart_group, tasks_to_fail)
|
||||||
found_acceptable_queue = True
|
found_acceptable_queue = True
|
||||||
if not found_acceptable_queue:
|
if not found_acceptable_queue:
|
||||||
logger.debug("Dependent %s couldn't be scheduled on graph, waiting for next cycle", task.log_format)
|
logger.debug(six.text_type("Dependent {} couldn't be scheduled on graph, waiting for next cycle").format(task.log_format))
|
||||||
|
|
||||||
def process_pending_tasks(self, pending_tasks):
|
def process_pending_tasks(self, pending_tasks):
|
||||||
for task in pending_tasks:
|
for task in pending_tasks:
|
||||||
self.process_dependencies(task, self.generate_dependencies(task))
|
self.process_dependencies(task, self.generate_dependencies(task))
|
||||||
if self.is_job_blocked(task):
|
if self.is_job_blocked(task):
|
||||||
logger.debug("%s is blocked from running", task.log_format)
|
logger.debug(six.text_type("{} is blocked from running").format(task.log_format))
|
||||||
continue
|
continue
|
||||||
preferred_instance_groups = task.preferred_instance_groups
|
preferred_instance_groups = task.preferred_instance_groups
|
||||||
found_acceptable_queue = False
|
found_acceptable_queue = False
|
||||||
for rampart_group in preferred_instance_groups:
|
for rampart_group in preferred_instance_groups:
|
||||||
remaining_capacity = self.get_remaining_capacity(rampart_group.name)
|
remaining_capacity = self.get_remaining_capacity(rampart_group.name)
|
||||||
if remaining_capacity <= 0:
|
if remaining_capacity <= 0:
|
||||||
logger.debug("Skipping group %s, remaining_capacity %s <= 0",
|
logger.debug(six.text_type("Skipping group {}, remaining_capacity {} <= 0").format(
|
||||||
rampart_group.name, remaining_capacity)
|
rampart_group.name, remaining_capacity))
|
||||||
continue
|
continue
|
||||||
if not self.would_exceed_capacity(task, rampart_group.name):
|
if not self.would_exceed_capacity(task, rampart_group.name):
|
||||||
logger.debug("Starting %s in group %s (remaining_capacity=%s)",
|
logger.debug(six.text_type("Starting {} in group {} (remaining_capacity={})").format(
|
||||||
task.log_format, rampart_group.name, remaining_capacity)
|
task.log_format, rampart_group.name, remaining_capacity))
|
||||||
self.graph[rampart_group.name]['graph'].add_job(task)
|
self.graph[rampart_group.name]['graph'].add_job(task)
|
||||||
self.start_task(task, rampart_group, task.get_jobs_fail_chain())
|
self.start_task(task, rampart_group, task.get_jobs_fail_chain())
|
||||||
found_acceptable_queue = True
|
found_acceptable_queue = True
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
logger.debug("Not enough capacity to run %s on %s (remaining_capacity=%s)",
|
logger.debug(six.text_type("Not enough capacity to run {} on {} (remaining_capacity={})").format(
|
||||||
task.log_format, rampart_group.name, remaining_capacity)
|
task.log_format, rampart_group.name, remaining_capacity))
|
||||||
if not found_acceptable_queue:
|
if not found_acceptable_queue:
|
||||||
logger.debug("%s couldn't be scheduled on graph, waiting for next cycle", task.log_format)
|
logger.debug(six.text_type("{} couldn't be scheduled on graph, waiting for next cycle").format(task.log_format))
|
||||||
|
|
||||||
def fail_jobs_if_not_in_celery(self, node_jobs, active_tasks, celery_task_start_time,
|
def fail_jobs_if_not_in_celery(self, node_jobs, active_tasks, celery_task_start_time,
|
||||||
isolated=False):
|
isolated=False):
|
||||||
@@ -576,9 +577,9 @@ class TaskManager():
|
|||||||
return (task.task_impact + current_capacity > capacity_total)
|
return (task.task_impact + current_capacity > capacity_total)
|
||||||
|
|
||||||
def consume_capacity(self, task, instance_group):
|
def consume_capacity(self, task, instance_group):
|
||||||
logger.debug('%s consumed %s capacity units from %s with prior total of %s',
|
logger.debug(six.text_type('{} consumed {} capacity units from {} with prior total of {}').format(
|
||||||
task.log_format, task.task_impact, instance_group,
|
task.log_format, task.task_impact, instance_group,
|
||||||
self.graph[instance_group]['consumed_capacity'])
|
self.graph[instance_group]['consumed_capacity']))
|
||||||
self.graph[instance_group]['consumed_capacity'] += task.task_impact
|
self.graph[instance_group]['consumed_capacity'] += task.task_impact
|
||||||
|
|
||||||
def get_remaining_capacity(self, instance_group):
|
def get_remaining_capacity(self, instance_group):
|
||||||
|
|||||||
Reference in New Issue
Block a user