diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index 7c7e9e5ab3..65664fc56b 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -291,7 +291,6 @@ class TaskManager(): 'Celery, so it has been marked as failed.', )) task_obj.save() - print("Going to fail %s" % task_obj.id) connection.on_commit(lambda: task_obj.websocket_emit_status('failed')) logger.error("Task %s appears orphaned... marking as failed" % task) @@ -312,7 +311,6 @@ class TaskManager(): return (self.capacity_total - self.capacity_used) def process_tasks(self, all_sorted_tasks): - running_tasks = filter(lambda t: t['status'] == 'running', all_sorted_tasks) runnable_tasks = filter(lambda t: t['status'] in ['waiting', 'running'], all_sorted_tasks) diff --git a/awx/main/scheduler/dependency_graph.py b/awx/main/scheduler/dependency_graph.py index edd49c98a9..08c00977c1 100644 --- a/awx/main/scheduler/dependency_graph.py +++ b/awx/main/scheduler/dependency_graph.py @@ -12,7 +12,11 @@ from awx.main.scheduler.partial import ( class DependencyGraph(object): PROJECT_UPDATES = 'project_updates' INVENTORY_UPDATES = 'inventory_updates' + JOB_TEMPLATE_JOBS = 'job_template_jobs' + JOB_PROJECT_IDS = 'job_project_ids' + JOB_INVENTORY_IDS = 'job_inventory_ids' + SYSTEM_JOB = 'system_job' INVENTORY_SOURCE_UPDATES = 'inventory_source_updates' WORKFLOW_JOB_TEMPLATES_JOBS = 'workflow_job_template_jobs' @@ -30,6 +34,16 @@ class DependencyGraph(object): self.data[self.INVENTORY_UPDATES] = {} # job_template_id -> True / False self.data[self.JOB_TEMPLATE_JOBS] = {} + + ''' + Track runnable job related project and inventory to ensure updates + don't run while a job needing those resources is running. + ''' + # project_id -> True / False + self.data[self.JOB_PROJECT_IDS] = {} + # inventory_id -> True / False + self.data[self.JOB_INVENTORY_IDS] = {} + # inventory_source_id -> True / False self.data[self.INVENTORY_SOURCE_UPDATES] = {} # True / False @@ -138,21 +152,23 @@ class DependencyGraph(object): self.data[self.INVENTORY_SOURCE_UPDATES][inventory_source_id] = False def mark_job_template_job(self, job): - self.data[self.INVENTORY_UPDATES][job['inventory_id']] = False - self.data[self.PROJECT_UPDATES][job['project_id']] = False + self.data[self.JOB_INVENTORY_IDS][job['inventory_id']] = False + self.data[self.JOB_PROJECT_IDS][job['project_id']] = False self.data[self.JOB_TEMPLATE_JOBS][job['job_template_id']] = False def mark_workflow_job(self, job): self.data[self.WORKFLOW_JOB_TEMPLATES_JOBS][job['workflow_job_template_id']] = False def can_project_update_run(self, job): - return self.data[self.PROJECT_UPDATES].get(job['project_id'], True) + return self.data[self.JOB_PROJECT_IDS].get(job['project_id'], True) and \ + self.data[self.PROJECT_UPDATES].get(job['project_id'], True) def can_inventory_update_run(self, job): - return self.data[self.INVENTORY_SOURCE_UPDATES].get(job['inventory_source_id'], True) + return self.data[self.JOB_INVENTORY_IDS].get(job['inventory_source__inventory_id'], True) and \ + self.data[self.INVENTORY_SOURCE_UPDATES].get(job['inventory_source_id'], True) def can_job_run(self, job): - if self.can_project_update_run(job) is True and \ + if self.data[self.PROJECT_UPDATES].get(job['project_id'], True) is True and \ self.data[self.INVENTORY_UPDATES].get(job['inventory_id'], True) is True: if job['allow_simultaneous'] is False: return self.data[self.JOB_TEMPLATE_JOBS].get(job['job_template_id'], True)