From ed37e68c53f1ad7a037da94e622f6f0dee7850fe Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Mon, 31 Oct 2016 14:16:59 -0500 Subject: [PATCH] run dependencies when capacity is available --- awx/main/scheduler/__init__.py | 34 +++++++++++----------------------- 1 file changed, 11 insertions(+), 23 deletions(-) diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index 99bc87917c..0f551997bf 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -157,8 +157,6 @@ class Scheduler(): def start_task(self, task, dependent_tasks=[]): from awx.main.tasks import handle_work_error, handle_work_success - status_changed = False - task_actual = { 'type':task.get_job_type_str(), 'id': task['id'], @@ -169,13 +167,10 @@ class Scheduler(): success_handler = handle_work_success.s(task_actual=task_actual) job_obj = task.get_full() - if job_obj.status == 'pending': - status_changed = True - job_obj.status = 'waiting' + job_obj.status = 'waiting' (start_status, opts) = job_obj.pre_start() if not start_status: - status_changed = True job_obj.status = 'failed' if job_obj.job_explanation: job_obj.job_explanation += ' ' @@ -185,33 +180,27 @@ class Scheduler(): else: if type(job_obj) is WorkflowJob: job_obj.status = 'running' - status_changed = True - if status_changed is True: - job_obj.save() + job_obj.save() self.consume_capacity(task) def post_commit(): - if status_changed: - job_obj.websocket_emit_status(job_obj.status) + job_obj.websocket_emit_status(job_obj.status) if job_obj.status != 'failed': job_obj.start_celery_task(opts, error_callback=error_handler, success_callback=success_handler) connection.on_commit(post_commit) def process_runnable_tasks(self, runnable_tasks): - for i, task in enumerate(runnable_tasks): - # TODO: maybe batch process new tasks. - # Processing a new task individually seems to be expensive - self.graph.add_job(task) + map(lambda task: self.graph.add_job(task), runnable_tasks) def create_project_update(self, task): dep = Project.objects.get(id=task['project_id']).create_project_update(launch_type='dependency') # Project created 1 seconds behind dep.created = task['created'] - timedelta(seconds=1) - dep.status = 'waiting' + dep.status = 'pending' dep.save() project_task = ProjectUpdateDict.get_partial(dep.id) @@ -222,7 +211,7 @@ class Scheduler(): dep = InventorySource.objects.get(id=inventory_source_task['id']).create_inventory_update(launch_type='dependency') dep.created = task['created'] - timedelta(seconds=2) - dep.status = 'waiting' + dep.status = 'pending' dep.save() inventory_task = InventoryUpdateDict.get_partial(dep.id) @@ -267,6 +256,9 @@ class Scheduler(): def process_pending_tasks(self, pending_tasks): for task in pending_tasks: + # Stop processing tasks if we know we are out of capacity + if self.get_remaining_capacity() <= 0: + return if not self.graph.is_job_blocked(task): dependencies = self.generate_dependencies(task) @@ -280,10 +272,6 @@ class Scheduler(): else: self.graph.add_job(task) - # Stop processing tasks if we know we are out of capacity - if self.get_remaining_capacity() <= 0: - return - def process_celery_tasks(self, active_tasks, all_running_sorted_tasks): ''' Rectify tower db <-> celery inconsistent view of jobs state @@ -329,8 +317,8 @@ class Scheduler(): self.calculate_capacity_used(running_tasks) self.process_runnable_tasks(runnable_tasks) - - pending_tasks = filter(lambda t: t['status'] == 'pending', all_sorted_tasks) + + pending_tasks = filter(lambda t: t['status'] in 'pending', all_sorted_tasks) self.process_pending_tasks(pending_tasks) def _schedule(self):