From e720fe5dd07845d1493d4ac28f88f531684ec2c2 Mon Sep 17 00:00:00 2001
From: chris meyers
Date: Tue, 22 May 2018 18:09:29 -0400
Subject: [PATCH] decide the node a job will run early

* Deciding the Instance that a Job runs on at celery task run-time makes
  it hard to evenly distribute tasks among Instances. Instead, the task
  manager will look at the world of running jobs and choose an instance
  node to run on, applying a deterministic job distribution algorithm.
---
 awx/main/models/ha.py              | 23 ++++++++++++
 awx/main/models/unified_jobs.py    |  2 +-
 awx/main/scheduler/task_manager.py | 57 ++++++++++++++++++++++--------
 awx/main/tasks.py                  |  4 ++-
 4 files changed, 70 insertions(+), 16 deletions(-)

diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py
index 42e8117061..f158582871 100644
--- a/awx/main/models/ha.py
+++ b/awx/main/models/ha.py
@@ -92,6 +92,10 @@ class Instance(BaseModel):
         return sum(x.task_impact for x in UnifiedJob.objects.filter(execution_node=self.hostname,
                                                                     status__in=('running', 'waiting')))

+    @property
+    def remaining_capacity(self):
+        return self.capacity - self.consumed_capacity
+
     @property
     def role(self):
         # NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing
@@ -187,6 +191,25 @@ class InstanceGroup(BaseModel, RelatedJobsMixin):
         validate_queuename(self.name)
         return self.name

+    def fit_task_to_most_remaining_capacity_instance(self, task):
+        instance_most_capacity = None
+        for i in self.instances.order_by('hostname'):
+            if i.remaining_capacity >= task.task_impact and \
+                    (instance_most_capacity is None or
+                     i.remaining_capacity > instance_most_capacity.remaining_capacity):
+                instance_most_capacity = i
+        return instance_most_capacity
+
+    def find_largest_idle_instance(self):
+        largest_instance = None
+        for i in self.instances.order_by('hostname'):
+            if i.jobs_running == 0:
+                if largest_instance is None:
+                    largest_instance = i
+                elif i.capacity > largest_instance.capacity:
+                    largest_instance = i
+        return largest_instance
+

 class TowerScheduleState(SingletonModel):
     schedule_last_run = models.DateTimeField(auto_now_add=True)
diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py
index fd3d6f4082..750472323b 100644
--- a/awx/main/models/unified_jobs.py
+++ b/awx/main/models/unified_jobs.py
@@ -1228,9 +1228,9 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
             raise RuntimeError("Expected celery_task_id to be set on model.")
         kwargs['task_id'] = self.celery_task_id
         task_class = self._get_task_class()
+        args = [self.pk]
         from awx.main.models.ha import InstanceGroup
         ig = InstanceGroup.objects.get(name=queue)
-        args = [self.pk]
         if ig.controller_id:
             if self.supports_isolation():  # case of jobs and ad hoc commands
                 isolated_instance = ig.instances.order_by('-capacity').first()
diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py
index 810fbafdac..1408601b79 100644
--- a/awx/main/scheduler/task_manager.py
+++ b/awx/main/scheduler/task_manager.py
@@ -234,7 +234,7 @@ class TaskManager():
     def get_dependent_jobs_for_inv_and_proj_update(self, job_obj):
         return [{'type': j.model_to_str(), 'id': j.id} for j in job_obj.dependent_jobs.all()]

-    def start_task(self, task, rampart_group, dependent_tasks=None):
+    def start_task(self, task, rampart_group, dependent_tasks=None, instance=None):
         from awx.main.tasks import handle_work_error, handle_work_success

         dependent_tasks = dependent_tasks or []
@@ -269,7 +269,11 @@ class TaskManager():
                             task.log_format, task.instance_group_id, rampart_group.controller_id)
             else:
                 task.instance_group = rampart_group
-                logger.info('Submitting %s to instance group %s.', task.log_format, task.instance_group_id)
+                if instance is not None:
+                    task.execution_node = instance.hostname
+                logger.debug(six.text_type("Dependent {} is blocked from running").format(task.log_format))
+                logger.info(six.text_type('Submitting {} to <{},{}>.').format(
+                            task.log_format, task.instance_group_id, task.execution_node))
             with disable_activity_stream():
                 task.celery_task_id = str(uuid.uuid4())
                 task.save()
@@ -280,8 +284,8 @@ class TaskManager():
         def post_commit():
             task.websocket_emit_status(task.status)
             if task.status != 'failed':
-                if rampart_group is not None:
-                    actual_queue=rampart_group.name
+                if instance is not None:
+                    actual_queue=instance.hostname
                 else:
                     actual_queue=settings.CELERY_DEFAULT_QUEUE
                 task.start_celery_task(opts, error_callback=error_handler, success_callback=success_handler, queue=actual_queue)
@@ -433,17 +437,32 @@ class TaskManager():
                 continue
             preferred_instance_groups = task.preferred_instance_groups
             found_acceptable_queue = False
+            idle_instance_that_fits = None
             for rampart_group in preferred_instance_groups:
+                if idle_instance_that_fits is None:
+                    idle_instance_that_fits = rampart_group.find_largest_idle_instance()
                 if self.get_remaining_capacity(rampart_group.name) <= 0:
                     logger.debug(six.text_type("Skipping group {} capacity <= 0").format(rampart_group.name))
                     continue
-                if not self.would_exceed_capacity(task, rampart_group.name):
-                    logger.debug(six.text_type("Starting dependent {} in group {}").format(task.log_format, rampart_group.name))
+
+                execution_instance = rampart_group.fit_task_to_most_remaining_capacity_instance(task)
+                if execution_instance:
+                    logger.debug(six.text_type("Starting dependent {} in group {} instance {}").format(
+                                 task.log_format, rampart_group.name, execution_instance.hostname))
+                elif not execution_instance and idle_instance_that_fits:
+                    execution_instance = idle_instance_that_fits
+                    logger.debug(six.text_type("Starting dependent {} in group {} on idle instance {}").format(
+                                 task.log_format, rampart_group.name, execution_instance.hostname))
+                if execution_instance:
                     self.graph[rampart_group.name]['graph'].add_job(task)
                     tasks_to_fail = filter(lambda t: t != task, dependency_tasks)
                     tasks_to_fail += [dependent_task]
-                    self.start_task(task, rampart_group, tasks_to_fail)
+                    self.start_task(task, rampart_group, tasks_to_fail, execution_instance)
                     found_acceptable_queue = True
+                    break
+                else:
+                    logger.debug(six.text_type("No instance available in group {} to run job {} w/ capacity requirement {}").format(
+                                 rampart_group.name, task.log_format, task.task_impact))
             if not found_acceptable_queue:
                 logger.debug(six.text_type("Dependent {} couldn't be scheduled on graph, waiting for next cycle").format(task.log_format))

@@ -455,25 +474,35 @@ class TaskManager():
                 continue
             preferred_instance_groups = task.preferred_instance_groups
             found_acceptable_queue = False
+            idle_instance_that_fits = None
             if isinstance(task, WorkflowJob):
-                self.start_task(task, None, task.get_jobs_fail_chain())
+                self.start_task(task, None, task.get_jobs_fail_chain(), None)
                 continue
             for rampart_group in preferred_instance_groups:
+                if idle_instance_that_fits is None:
+                    idle_instance_that_fits = rampart_group.find_largest_idle_instance()
                 remaining_capacity = self.get_remaining_capacity(rampart_group.name)
                 if remaining_capacity <= 0:
                     logger.debug(six.text_type("Skipping group {}, remaining_capacity {} <= 0").format(
                                  rampart_group.name, remaining_capacity))
                     continue
-                if not self.would_exceed_capacity(task, rampart_group.name):
-                    logger.debug(six.text_type("Starting {} in group {} (remaining_capacity={})").format(
-                                 task.log_format, rampart_group.name, remaining_capacity))
+
+                execution_instance = rampart_group.fit_task_to_most_remaining_capacity_instance(task)
+                if execution_instance:
+                    logger.debug(six.text_type("Starting {} in group {} instance {} (remaining_capacity={})").format(
+                                 task.log_format, rampart_group.name, execution_instance.hostname, remaining_capacity))
+                elif not execution_instance and idle_instance_that_fits:
+                    execution_instance = idle_instance_that_fits
+                    logger.debug(six.text_type("Starting {} in group {} instance {} (remaining_capacity={})").format(
+                                 task.log_format, rampart_group.name, execution_instance.hostname, remaining_capacity))
+                if execution_instance:
                     self.graph[rampart_group.name]['graph'].add_job(task)
-                    self.start_task(task, rampart_group, task.get_jobs_fail_chain())
+                    self.start_task(task, rampart_group, task.get_jobs_fail_chain(), execution_instance)
                     found_acceptable_queue = True
                     break
                 else:
-                    logger.debug(six.text_type("Not enough capacity to run {} on {} (remaining_capacity={})").format(
-                                 task.log_format, rampart_group.name, remaining_capacity))
+                    logger.debug(six.text_type("No instance available in group {} to run job {} w/ capacity requirement {}").format(
+                                 rampart_group.name, task.log_format, task.task_impact))
             if not found_acceptable_queue:
                 logger.debug(six.text_type("{} couldn't be scheduled on graph, waiting for next cycle").format(task.log_format))
diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index 613caa6320..d1a06d2af7 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -872,10 +872,12 @@ class BaseTask(Task):
         '''
         Run the job/task and capture its output.
         '''
+        '''
         execution_node = settings.CLUSTER_HOST_ID
         if isolated_host is not None:
             execution_node = isolated_host
-        instance = self.update_model(pk, status='running', execution_node=execution_node,
+        '''
+        instance = self.update_model(pk, status='running',
                                      start_args='')  # blank field to remove encrypted passwords
         instance.websocket_emit_status("running")
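
The selection policy this patch adds can be exercised in isolation. The sketch
below is not AWX code: FakeInstance and the module-level helpers are invented
stand-ins that mirror Instance.remaining_capacity,
InstanceGroup.fit_task_to_most_remaining_capacity_instance() and
InstanceGroup.find_largest_idle_instance() from the hunks above, so the
tie-breaking behaviour can be checked without a database.

# Standalone sketch, not AWX code: FakeInstance and these helpers only
# approximate the methods added in awx/main/models/ha.py above.
from collections import namedtuple

FakeInstance = namedtuple('FakeInstance', ['hostname', 'capacity', 'consumed_capacity', 'jobs_running'])


def remaining_capacity(instance):
    # Mirrors the Instance.remaining_capacity property added in ha.py.
    return instance.capacity - instance.consumed_capacity


def fit_task_to_most_remaining_capacity_instance(instances, task_impact):
    # Prefer the instance with the most spare capacity that can still fit the task.
    best = None
    for i in sorted(instances, key=lambda x: x.hostname):
        if remaining_capacity(i) >= task_impact and \
                (best is None or remaining_capacity(i) > remaining_capacity(best)):
            best = i
    return best


def find_largest_idle_instance(instances):
    # Fallback: the biggest instance currently running no jobs (capacity is not re-checked).
    largest = None
    for i in sorted(instances, key=lambda x: x.hostname):
        if i.jobs_running == 0 and (largest is None or i.capacity > largest.capacity):
            largest = i
    return largest


group = [
    FakeInstance('tower-1', capacity=100, consumed_capacity=90, jobs_running=3),
    FakeInstance('tower-2', capacity=150, consumed_capacity=120, jobs_running=2),
    FakeInstance('tower-3', capacity=50, consumed_capacity=0, jobs_running=0),
]

# Impact 25 fits on tower-2 (30 free) and tower-3 (50 free); tower-3 wins.
print(fit_task_to_most_remaining_capacity_instance(group, 25).hostname)  # -> tower-3

# Impact 60 fits nowhere, so the task manager falls back to the largest idle instance.
print(find_largest_idle_instance(group).hostname)  # -> tower-3

As in the patch, iteration is ordered by hostname and a candidate is only
replaced on a strictly larger value, so ties resolve to the same instance on
every scheduler pass, which is what makes the distribution deterministic.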