mirror of
https://github.com/ansible/awx.git
synced 2026-02-16 02:30:01 -03:30
added task manager system job support
This commit is contained in:
@@ -24,6 +24,7 @@ from awx.main.scheduler.partial import (
|
|||||||
InventoryUpdateDict,
|
InventoryUpdateDict,
|
||||||
InventoryUpdateLatestDict,
|
InventoryUpdateLatestDict,
|
||||||
InventorySourceDict,
|
InventorySourceDict,
|
||||||
|
SystemJobDict,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Celery
|
# Celery
|
||||||
@@ -37,16 +38,16 @@ class Scheduler():
|
|||||||
self.capacity_total = 200
|
self.capacity_total = 200
|
||||||
self.capacity_used = 0
|
self.capacity_used = 0
|
||||||
|
|
||||||
def _get_tasks_with_status(self, status_list):
|
def get_tasks(self):
|
||||||
|
status_list = ('pending', 'waiting', 'running')
|
||||||
|
|
||||||
graph_jobs = JobDict.filter_partial(status=status_list)
|
jobs = JobDict.filter_partial(status=status_list)
|
||||||
'''
|
'''
|
||||||
graph_ad_hoc_commands = [ahc for ahc in AdHocCommand.objects.filter(**kv)]
|
graph_ad_hoc_commands = [ahc for ahc in AdHocCommand.objects.filter(**kv)]
|
||||||
graph_inventory_updates = [iu for iu in
|
|
||||||
InventoryUpdate.objects.filter(**kv)]
|
|
||||||
'''
|
'''
|
||||||
graph_inventory_updates = InventoryUpdateDict.filter_partial(status=status_list)
|
inventory_updates = InventoryUpdateDict.filter_partial(status=status_list)
|
||||||
graph_project_updates = ProjectUpdateDict.filter_partial(status=status_list)
|
project_updates = ProjectUpdateDict.filter_partial(status=status_list)
|
||||||
|
system_jobs = SystemJobDict.filter_partial(status=status_list)
|
||||||
'''
|
'''
|
||||||
graph_system_jobs = [sj for sj in
|
graph_system_jobs = [sj for sj in
|
||||||
SystemJob.objects.filter(**kv)]
|
SystemJob.objects.filter(**kv)]
|
||||||
@@ -57,14 +58,10 @@ class Scheduler():
|
|||||||
graph_workflow_jobs,
|
graph_workflow_jobs,
|
||||||
key=lambda task: task.created)
|
key=lambda task: task.created)
|
||||||
'''
|
'''
|
||||||
all_actions = sorted(graph_jobs + graph_project_updates + graph_inventory_updates,
|
all_actions = sorted(jobs + project_updates + inventory_updates + system_jobs,
|
||||||
key=lambda task: task['created'])
|
key=lambda task: task['created'])
|
||||||
return all_actions
|
return all_actions
|
||||||
|
|
||||||
def get_tasks(self):
|
|
||||||
RELEVANT_JOBS = ('pending', 'waiting', 'running')
|
|
||||||
return self._get_tasks_with_status(RELEVANT_JOBS)
|
|
||||||
|
|
||||||
# TODO: Consider a database query for this logic
|
# TODO: Consider a database query for this logic
|
||||||
def get_latest_project_update_tasks(self, all_sorted_tasks):
|
def get_latest_project_update_tasks(self, all_sorted_tasks):
|
||||||
project_ids = Set()
|
project_ids = Set()
|
||||||
@@ -239,9 +236,7 @@ class Scheduler():
|
|||||||
map(lambda task: self.graph.add_latest_inventory_update(task), latest_inventory_updates)
|
map(lambda task: self.graph.add_latest_inventory_update(task), latest_inventory_updates)
|
||||||
|
|
||||||
def process_inventory_sources(self, inventory_id_sources):
|
def process_inventory_sources(self, inventory_id_sources):
|
||||||
#map(lambda inventory_id, inventory_sources: self.graph.add_inventory_sources(inventory_id, inventory_sources), inventory_id_sources)
|
map(lambda inventory_id, inventory_sources: self.graph.add_inventory_sources(inventory_id, inventory_sources), inventory_id_sources)
|
||||||
for inventory_id, inventory_sources in inventory_id_sources:
|
|
||||||
self.graph.add_inventory_sources(inventory_id, inventory_sources)
|
|
||||||
|
|
||||||
def process_dependencies(self, dependent_task, dependency_tasks):
|
def process_dependencies(self, dependent_task, dependency_tasks):
|
||||||
for task in dependency_tasks:
|
for task in dependency_tasks:
|
||||||
|
|||||||
@@ -1,11 +1,17 @@
|
|||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
from django.utils.timezone import now as tz_now
|
from django.utils.timezone import now as tz_now
|
||||||
|
|
||||||
from awx.main.scheduler.partial import JobDict, ProjectUpdateDict, InventoryUpdateDict
|
from awx.main.scheduler.partial import (
|
||||||
|
JobDict,
|
||||||
|
ProjectUpdateDict,
|
||||||
|
InventoryUpdateDict,
|
||||||
|
SystemJobDict,
|
||||||
|
)
|
||||||
class DependencyGraph(object):
|
class DependencyGraph(object):
|
||||||
PROJECT_UPDATES = 'project_updates'
|
PROJECT_UPDATES = 'project_updates'
|
||||||
INVENTORY_UPDATES = 'inventory_updates'
|
INVENTORY_UPDATES = 'inventory_updates'
|
||||||
JOB_TEMPLATE_JOBS = 'job_template_jobs'
|
JOB_TEMPLATE_JOBS = 'job_template_jobs'
|
||||||
|
SYSTEM_JOB = 'system_job'
|
||||||
INVENTORY_SOURCE_UPDATES = 'inventory_source_updates'
|
INVENTORY_SOURCE_UPDATES = 'inventory_source_updates'
|
||||||
|
|
||||||
LATEST_PROJECT_UPDATES = 'latest_project_updates'
|
LATEST_PROJECT_UPDATES = 'latest_project_updates'
|
||||||
@@ -23,6 +29,8 @@ class DependencyGraph(object):
|
|||||||
self.data[self.JOB_TEMPLATE_JOBS] = {}
|
self.data[self.JOB_TEMPLATE_JOBS] = {}
|
||||||
# inventory_source_id -> True / False
|
# inventory_source_id -> True / False
|
||||||
self.data[self.INVENTORY_SOURCE_UPDATES] = {}
|
self.data[self.INVENTORY_SOURCE_UPDATES] = {}
|
||||||
|
# True / False
|
||||||
|
self.data[self.SYSTEM_JOB] = True
|
||||||
|
|
||||||
# project_id -> latest ProjectUpdateLatestDict
|
# project_id -> latest ProjectUpdateLatestDict
|
||||||
self.data[self.LATEST_PROJECT_UPDATES] = {}
|
self.data[self.LATEST_PROJECT_UPDATES] = {}
|
||||||
@@ -112,6 +120,9 @@ class DependencyGraph(object):
|
|||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
def mark_system_job(self):
|
||||||
|
self.data[self.SYSTEM_JOB] = False
|
||||||
|
|
||||||
def mark_project_update(self, job):
|
def mark_project_update(self, job):
|
||||||
self.data[self.PROJECT_UPDATES][job['project_id']] = False
|
self.data[self.PROJECT_UPDATES][job['project_id']] = False
|
||||||
|
|
||||||
@@ -141,6 +152,9 @@ class DependencyGraph(object):
|
|||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
def can_system_job_run(self):
|
||||||
|
return self.data[self.SYSTEM_JOB]
|
||||||
|
|
||||||
def is_job_blocked(self, job):
|
def is_job_blocked(self, job):
|
||||||
if type(job) is ProjectUpdateDict:
|
if type(job) is ProjectUpdateDict:
|
||||||
return not self.can_project_update_run(job)
|
return not self.can_project_update_run(job)
|
||||||
@@ -148,6 +162,8 @@ class DependencyGraph(object):
|
|||||||
return not self.can_inventory_update_run(job['inventory_source_id'])
|
return not self.can_inventory_update_run(job['inventory_source_id'])
|
||||||
elif type(job) is JobDict:
|
elif type(job) is JobDict:
|
||||||
return not self.can_job_run(job)
|
return not self.can_job_run(job)
|
||||||
|
elif type(job) is SystemJobDict:
|
||||||
|
return not self.can_system_job_run()
|
||||||
|
|
||||||
def add_job(self, job):
|
def add_job(self, job):
|
||||||
if type(job) is ProjectUpdateDict:
|
if type(job) is ProjectUpdateDict:
|
||||||
@@ -157,8 +173,9 @@ class DependencyGraph(object):
|
|||||||
self.mark_inventory_source_update(job['inventory_source_id'])
|
self.mark_inventory_source_update(job['inventory_source_id'])
|
||||||
elif type(job) is JobDict:
|
elif type(job) is JobDict:
|
||||||
self.mark_job_template_job(job)
|
self.mark_job_template_job(job)
|
||||||
|
elif type(job) is SystemJobDict:
|
||||||
|
self.mark_system_job()
|
||||||
|
|
||||||
def add_jobs(self, jobs):
|
def add_jobs(self, jobs):
|
||||||
for j in jobs:
|
map(lambda j: self.add_job(j), jobs)
|
||||||
self.add_job(j)
|
|
||||||
|
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ from awx.main.models import (
|
|||||||
ProjectUpdate,
|
ProjectUpdate,
|
||||||
InventoryUpdate,
|
InventoryUpdate,
|
||||||
InventorySource,
|
InventorySource,
|
||||||
|
SystemJob,
|
||||||
)
|
)
|
||||||
|
|
||||||
class PartialModelDict(object):
|
class PartialModelDict(object):
|
||||||
@@ -171,3 +172,23 @@ class InventorySourceDict(PartialModelDict):
|
|||||||
'update_on_launch': True,
|
'update_on_launch': True,
|
||||||
}
|
}
|
||||||
return [cls(o) for o in cls.model.objects.filter(**kv).values(*cls.get_db_values())]
|
return [cls(o) for o in cls.model.objects.filter(**kv).values(*cls.get_db_values())]
|
||||||
|
|
||||||
|
class SystemJobDict(PartialModelDict):
|
||||||
|
FIELDS = (
|
||||||
|
'id', 'created', 'status',
|
||||||
|
)
|
||||||
|
model = SystemJob
|
||||||
|
|
||||||
|
def get_job_type_str(self):
|
||||||
|
return 'system_job'
|
||||||
|
|
||||||
|
def task_impact(self):
|
||||||
|
return 20
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def filter_partial(cls, status=[]):
|
||||||
|
kv = {
|
||||||
|
'status__in': status
|
||||||
|
}
|
||||||
|
return [cls(o) for o in cls.model.objects.filter(**kv).values(*cls.get_db_values())]
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user