From 46faeffbb3888c9fcbca9c7a677fe4bb7391a8ea Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Mon, 24 Oct 2016 15:32:43 -0400 Subject: [PATCH] added task manager system job support --- awx/main/scheduler/__init__.py | 23 +++++++++-------------- awx/main/scheduler/dependency_graph.py | 23 ++++++++++++++++++++--- awx/main/scheduler/partial.py | 21 +++++++++++++++++++++ 3 files changed, 50 insertions(+), 17 deletions(-) diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index e704b3ef8a..67c0c81a6a 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -24,6 +24,7 @@ from awx.main.scheduler.partial import ( InventoryUpdateDict, InventoryUpdateLatestDict, InventorySourceDict, + SystemJobDict, ) # Celery @@ -37,16 +38,16 @@ class Scheduler(): self.capacity_total = 200 self.capacity_used = 0 - def _get_tasks_with_status(self, status_list): + def get_tasks(self): + status_list = ('pending', 'waiting', 'running') - graph_jobs = JobDict.filter_partial(status=status_list) + jobs = JobDict.filter_partial(status=status_list) ''' graph_ad_hoc_commands = [ahc for ahc in AdHocCommand.objects.filter(**kv)] - graph_inventory_updates = [iu for iu in - InventoryUpdate.objects.filter(**kv)] ''' - graph_inventory_updates = InventoryUpdateDict.filter_partial(status=status_list) - graph_project_updates = ProjectUpdateDict.filter_partial(status=status_list) + inventory_updates = InventoryUpdateDict.filter_partial(status=status_list) + project_updates = ProjectUpdateDict.filter_partial(status=status_list) + system_jobs = SystemJobDict.filter_partial(status=status_list) ''' graph_system_jobs = [sj for sj in SystemJob.objects.filter(**kv)] @@ -57,14 +58,10 @@ class Scheduler(): graph_workflow_jobs, key=lambda task: task.created) ''' - all_actions = sorted(graph_jobs + graph_project_updates + graph_inventory_updates, + all_actions = sorted(jobs + project_updates + inventory_updates + system_jobs, key=lambda task: task['created']) return all_actions - def get_tasks(self): - RELEVANT_JOBS = ('pending', 'waiting', 'running') - return self._get_tasks_with_status(RELEVANT_JOBS) - # TODO: Consider a database query for this logic def get_latest_project_update_tasks(self, all_sorted_tasks): project_ids = Set() @@ -239,9 +236,7 @@ class Scheduler(): map(lambda task: self.graph.add_latest_inventory_update(task), latest_inventory_updates) def process_inventory_sources(self, inventory_id_sources): - #map(lambda inventory_id, inventory_sources: self.graph.add_inventory_sources(inventory_id, inventory_sources), inventory_id_sources) - for inventory_id, inventory_sources in inventory_id_sources: - self.graph.add_inventory_sources(inventory_id, inventory_sources) + map(lambda inventory_id, inventory_sources: self.graph.add_inventory_sources(inventory_id, inventory_sources), inventory_id_sources) def process_dependencies(self, dependent_task, dependency_tasks): for task in dependency_tasks: diff --git a/awx/main/scheduler/dependency_graph.py b/awx/main/scheduler/dependency_graph.py index 26b7518cf1..e71cd6f0a7 100644 --- a/awx/main/scheduler/dependency_graph.py +++ b/awx/main/scheduler/dependency_graph.py @@ -1,11 +1,17 @@ from datetime import timedelta from django.utils.timezone import now as tz_now -from awx.main.scheduler.partial import JobDict, ProjectUpdateDict, InventoryUpdateDict +from awx.main.scheduler.partial import ( + JobDict, + ProjectUpdateDict, + InventoryUpdateDict, + SystemJobDict, +) class DependencyGraph(object): PROJECT_UPDATES = 'project_updates' INVENTORY_UPDATES = 'inventory_updates' JOB_TEMPLATE_JOBS = 'job_template_jobs' + SYSTEM_JOB = 'system_job' INVENTORY_SOURCE_UPDATES = 'inventory_source_updates' LATEST_PROJECT_UPDATES = 'latest_project_updates' @@ -23,6 +29,8 @@ class DependencyGraph(object): self.data[self.JOB_TEMPLATE_JOBS] = {} # inventory_source_id -> True / False self.data[self.INVENTORY_SOURCE_UPDATES] = {} + # True / False + self.data[self.SYSTEM_JOB] = True # project_id -> latest ProjectUpdateLatestDict self.data[self.LATEST_PROJECT_UPDATES] = {} @@ -112,6 +120,9 @@ class DependencyGraph(object): return False + def mark_system_job(self): + self.data[self.SYSTEM_JOB] = False + def mark_project_update(self, job): self.data[self.PROJECT_UPDATES][job['project_id']] = False @@ -141,6 +152,9 @@ class DependencyGraph(object): return True return False + def can_system_job_run(self): + return self.data[self.SYSTEM_JOB] + def is_job_blocked(self, job): if type(job) is ProjectUpdateDict: return not self.can_project_update_run(job) @@ -148,6 +162,8 @@ class DependencyGraph(object): return not self.can_inventory_update_run(job['inventory_source_id']) elif type(job) is JobDict: return not self.can_job_run(job) + elif type(job) is SystemJobDict: + return not self.can_system_job_run() def add_job(self, job): if type(job) is ProjectUpdateDict: @@ -157,8 +173,9 @@ class DependencyGraph(object): self.mark_inventory_source_update(job['inventory_source_id']) elif type(job) is JobDict: self.mark_job_template_job(job) + elif type(job) is SystemJobDict: + self.mark_system_job() def add_jobs(self, jobs): - for j in jobs: - self.add_job(j) + map(lambda j: self.add_job(j), jobs) diff --git a/awx/main/scheduler/partial.py b/awx/main/scheduler/partial.py index a1870ccf4f..dddbf763e7 100644 --- a/awx/main/scheduler/partial.py +++ b/awx/main/scheduler/partial.py @@ -5,6 +5,7 @@ from awx.main.models import ( ProjectUpdate, InventoryUpdate, InventorySource, + SystemJob, ) class PartialModelDict(object): @@ -171,3 +172,23 @@ class InventorySourceDict(PartialModelDict): 'update_on_launch': True, } return [cls(o) for o in cls.model.objects.filter(**kv).values(*cls.get_db_values())] + +class SystemJobDict(PartialModelDict): + FIELDS = ( + 'id', 'created', 'status', + ) + model = SystemJob + + def get_job_type_str(self): + return 'system_job' + + def task_impact(self): + return 20 + + @classmethod + def filter_partial(cls, status=[]): + kv = { + 'status__in': status + } + return [cls(o) for o in cls.model.objects.filter(**kv).values(*cls.get_db_values())] +