mirror of
https://github.com/ansible/awx.git
synced 2026-05-19 14:57:39 -02:30
inventory updates running correctly
This commit is contained in:
@@ -8,7 +8,7 @@ from sets import Set
|
||||
|
||||
# Django
|
||||
from django.conf import settings
|
||||
from django.db import transaction
|
||||
from django.db import transaction, connection
|
||||
from django.db.utils import DatabaseError
|
||||
|
||||
# AWX
|
||||
@@ -20,8 +20,10 @@ from awx.main.scheduler.dependency_graph import DependencyGraph
|
||||
from awx.main.scheduler.partial import (
|
||||
JobDict,
|
||||
ProjectUpdateDict,
|
||||
InventoryUpdateDict,
|
||||
ProjectUpdateLatestDict,
|
||||
InventoryUpdateDict,
|
||||
InventoryUpdateLatestDict,
|
||||
InventorySourceDict,
|
||||
)
|
||||
|
||||
# Celery
|
||||
@@ -72,11 +74,34 @@ class Scheduler():
|
||||
|
||||
return ProjectUpdateLatestDict.filter_partial(list(project_ids))
|
||||
|
||||
# TODO: Consider a database query for this logic
|
||||
def get_latest_inventory_update_tasks(self, all_sorted_tasks):
|
||||
inventory_ids = Set()
|
||||
for task in all_sorted_tasks:
|
||||
if type(task) == JobDict:
|
||||
inventory_ids.add(task['inventory_id'])
|
||||
|
||||
return InventoryUpdateLatestDict.filter_partial(list(inventory_ids))
|
||||
|
||||
|
||||
def get_running_workflow_jobs(self):
|
||||
graph_workflow_jobs = [wf for wf in
|
||||
WorkflowJob.objects.filter(status='running')]
|
||||
return graph_workflow_jobs
|
||||
|
||||
# TODO: Consider a database query for this logic
|
||||
def get_inventory_source_tasks(self, all_sorted_tasks):
|
||||
inventory_ids = Set()
|
||||
results = []
|
||||
for task in all_sorted_tasks:
|
||||
if type(task) is JobDict:
|
||||
inventory_ids.add(task['inventory_id'])
|
||||
|
||||
for inventory_id in inventory_ids:
|
||||
results.append((inventory_id, InventorySourceDict.filter_partial(inventory_id)))
|
||||
|
||||
return results
|
||||
|
||||
def spawn_workflow_graph_jobs(self, workflow_jobs):
|
||||
# TODO: Consider using transaction.atomic
|
||||
for workflow_job in workflow_jobs:
|
||||
@@ -134,8 +159,6 @@ class Scheduler():
|
||||
def start_task(self, task, dependent_tasks=[]):
|
||||
from awx.main.tasks import handle_work_error, handle_work_success
|
||||
|
||||
#print("start_task() <%s, %s> with deps %s" % (task.get_job_type_str(), task['id'], dependent_tasks))
|
||||
|
||||
# TODO: spawn inventory and project updates
|
||||
task_actual = {
|
||||
'type':task.get_job_type_str(),
|
||||
@@ -148,10 +171,8 @@ class Scheduler():
|
||||
|
||||
job_obj = task.get_full()
|
||||
job_obj.status = 'waiting'
|
||||
job_obj.save()
|
||||
|
||||
#print("For real, starting job <%s, %s>" % (type(job_obj), job_obj.id))
|
||||
start_status = job_obj.start(error_callback=error_handler, success_callback=success_handler)
|
||||
(start_status, opts) = job_obj.pre_start()
|
||||
if not start_status:
|
||||
job_obj.status = 'failed'
|
||||
if job_obj.job_explanation:
|
||||
@@ -163,6 +184,8 @@ class Scheduler():
|
||||
|
||||
self.consume_capacity(task)
|
||||
|
||||
connection.on_commit(lambda: job_obj.start_celery_task(opts, error_callback=error_handler, success_callback=success_handler))
|
||||
|
||||
def process_runnable_tasks(self, runnable_tasks):
|
||||
for i, task in enumerate(runnable_tasks):
|
||||
# TODO: maybe batch process new tasks.
|
||||
@@ -179,10 +202,20 @@ class Scheduler():
|
||||
dep.save()
|
||||
|
||||
project_task = ProjectUpdateDict.get_partial(dep.id)
|
||||
#waiting_tasks.insert(waiting_tasks.index(task), dep)
|
||||
|
||||
return project_task
|
||||
|
||||
def create_inventory_update(self, task, inventory_source_task):
|
||||
dep = InventorySource.objects.get(id=inventory_source_task['id']).create_inventory_update(launch_type='dependency')
|
||||
|
||||
dep.created = task['created'] - timedelta(seconds=2)
|
||||
dep.status = 'waiting'
|
||||
dep.save()
|
||||
|
||||
inventory_task = InventoryUpdateDict.get_partial(dep.id)
|
||||
|
||||
return inventory_task
|
||||
|
||||
def generate_dependencies(self, task):
|
||||
dependencies = []
|
||||
# TODO: What if the project is null ?
|
||||
@@ -191,12 +224,24 @@ class Scheduler():
|
||||
self.graph.should_update_related_project(task):
|
||||
project_task = self.create_project_update(task)
|
||||
dependencies.append(project_task)
|
||||
# Inventory created 2 seconds behind
|
||||
# Inventory created 2 seconds behind job
|
||||
|
||||
for inventory_source_task in self.graph.get_inventory_sources(task['inventory_id']):
|
||||
if self.graph.should_update_related_inventory_source(task, inventory_source_task['id']):
|
||||
inventory_task = self.create_inventory_update(task, inventory_source_task)
|
||||
dependencies.append(inventory_task)
|
||||
return dependencies
|
||||
|
||||
def process_latest_project_updates(self, latest_project_updates):
|
||||
for task in latest_project_updates:
|
||||
self.graph.add_latest_project_update(task)
|
||||
map(lambda task: self.graph.add_latest_project_update(task), latest_project_updates)
|
||||
|
||||
def process_latest_inventory_updates(self, latest_inventory_updates):
|
||||
map(lambda task: self.graph.add_latest_inventory_update(task), latest_inventory_updates)
|
||||
|
||||
def process_inventory_sources(self, inventory_id_sources):
|
||||
#map(lambda inventory_id, inventory_sources: self.graph.add_inventory_sources(inventory_id, inventory_sources), inventory_id_sources)
|
||||
for inventory_id, inventory_sources in inventory_id_sources:
|
||||
self.graph.add_inventory_sources(inventory_id, inventory_sources)
|
||||
|
||||
def process_dependencies(self, dependent_task, dependency_tasks):
|
||||
for task in dependency_tasks:
|
||||
@@ -205,7 +250,6 @@ class Scheduler():
|
||||
if not self.graph.is_job_blocked(task):
|
||||
self.graph.add_job(task)
|
||||
if not self.would_exceed_capacity(task):
|
||||
#print("process_dependencies() going to run project update <%s, %s>" % (task['id'], task['project_id']))
|
||||
self.start_task(task, [dependent_task])
|
||||
else:
|
||||
self.graph.add_job(task)
|
||||
@@ -214,7 +258,6 @@ class Scheduler():
|
||||
for task in pending_tasks:
|
||||
|
||||
if not self.graph.is_job_blocked(task):
|
||||
#print("process_pending_tasks() generating deps for job <%s, %s, %s>" % (task['id'], task['project_id'], task.model))
|
||||
dependencies = self.generate_dependencies(task)
|
||||
self.process_dependencies(task, dependencies)
|
||||
|
||||
@@ -222,7 +265,6 @@ class Scheduler():
|
||||
if not self.graph.is_job_blocked(task):
|
||||
self.graph.add_job(task)
|
||||
if not self.would_exceed_capacity(task):
|
||||
#print("Starting the original task <%s, %s>" % (task.get_job_type_str(), task['id']))
|
||||
self.start_task(task)
|
||||
else:
|
||||
self.graph.add_job(task)
|
||||
@@ -272,7 +314,6 @@ class Scheduler():
|
||||
|
||||
def consume_capacity(self, task):
|
||||
self.capacity_used += task.task_impact()
|
||||
#print("Capacity used %s vs total %s" % (self.capacity_used, self.capacity_total))
|
||||
|
||||
def get_remaining_capacity(self):
|
||||
return (self.capacity_total - self.capacity_used)
|
||||
@@ -320,6 +361,12 @@ class Scheduler():
|
||||
latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks)
|
||||
self.process_latest_project_updates(latest_project_updates)
|
||||
|
||||
latest_inventory_updates = self.get_latest_inventory_update_tasks(all_sorted_tasks)
|
||||
self.process_latest_inventory_updates(latest_inventory_updates)
|
||||
|
||||
inventory_id_sources = self.get_inventory_source_tasks(all_sorted_tasks)
|
||||
self.process_inventory_sources(inventory_id_sources)
|
||||
|
||||
self.process_tasks(all_sorted_tasks)
|
||||
|
||||
#print("Finished schedule()")
|
||||
|
||||
Reference in New Issue
Block a user