From 06cc5ffb4a0c10cbeeea44bd4c30f80d3bdb0995 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Wed, 12 Mar 2014 14:09:00 -0400 Subject: [PATCH] Docs and setup details for the task manager --- Makefile | 3 +++ awx/main/management/commands/run_task_system.py | 14 ++++++++++++-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index a00de5492f..b081f6f1ba 100644 --- a/Makefile +++ b/Makefile @@ -120,6 +120,9 @@ celeryd: receiver: $(PYTHON) manage.py run_callback_receiver +taskmanager: + $(PYTHON) manage.py run_task_system + # Run all API unit tests. test: $(PYTHON) manage.py test -v2 main diff --git a/awx/main/management/commands/run_task_system.py b/awx/main/management/commands/run_task_system.py index a20eb93e80..a6722e1bc0 100644 --- a/awx/main/management/commands/run_task_system.py +++ b/awx/main/management/commands/run_task_system.py @@ -32,6 +32,7 @@ import zmq from celery.task.control import inspect class SimpleDAG(object): + ''' A simple implementation of a directed acyclic graph ''' def __init__(self): self.nodes = [] @@ -130,6 +131,7 @@ class SimpleDAG(object): return leafs def get_tasks(): + ''' Fetch all Tower tasks that are relevant to the task management system ''' # TODO: Replace this when we can grab all objects in a sane way graph_jobs = [j for j in Job.objects.filter(status__in=('new', 'waiting', 'pending', 'running'))] graph_inventory_updates = [iu for iu in InventoryUpdate.objects.filter(status__in=('new', 'waiting', 'pending', 'running'))] @@ -138,6 +140,8 @@ def get_tasks(): return all_actions def rebuild_graph(message): + ''' Regenerate the task graph by refreshing known tasks from Tower, purging orphaned running tasks, + and creatingdependencies for new tasks before generating directed edge relationships between those tasks ''' inspector = inspect() active_task_queues = inspector.active() active_tasks = [] @@ -176,11 +180,12 @@ def rebuild_graph(message): # We recalculate the created time for the moment to ensure the dependencies are always sorted in the right order relative to the dependent task time_delt = len(task_dependencies) - task_dependencies.index(dep) dep.created = task.created - datetime.timedelta(seconds=1+time_delt) + dep.status = 'waiting' dep.save() waiting_tasks.insert(waiting_tasks.index(task), dep) task.status = 'waiting' task.save() - + # Rebuild graph graph = SimpleDAG() print("Graph nodes: " + str(graph.nodes)) @@ -206,6 +211,7 @@ def rebuild_graph(message): return graph def process_graph(graph, task_capacity): + ''' Given a task dependency graph, start and manage tasks given their priority and weight ''' leaf_nodes = graph.get_leaf_nodes() running_nodes = filter(lambda x: x['node_object'].status == 'running', leaf_nodes) running_impact = sum([t['node_object'].task_impact for t in running_nodes]) @@ -226,12 +232,16 @@ def process_graph(graph, task_capacity): error_handler = handle_work_error.s(subtasks=dependent_nodes) start_status = node_obj.start(error_callback=error_handler) if not start_status: - print("Job didn't start!") + node_obj.status = 'failed' + node_obj.result_traceback += "Task failed pre-start check" + # TODO: Run error handler + continue remaining_volume -= impact running_impact += impact print("Started Node: %s (capacity hit: %s) Remaining Capacity: %s" % (str(node_obj), str(impact), str(remaining_volume))) def run_taskmanager(command_port): + ''' Receive task start and finish signals to rebuild a dependency graph and manage the actual running of tasks ''' paused = False task_capacity = get_system_task_capacity() command_context = zmq.Context()