From 842086eef7466512c8221fa8a8f73518255d3048 Mon Sep 17 00:00:00 2001 From: Luke Sneeringer Date: Mon, 27 Oct 2014 14:03:39 -0500 Subject: [PATCH] Readability cleanup. --- .../management/commands/run_task_system.py | 102 ++++++++++++------ 1 file changed, 67 insertions(+), 35 deletions(-) diff --git a/awx/main/management/commands/run_task_system.py b/awx/main/management/commands/run_task_system.py index 72aad3d0cb..46cd8ed06e 100644 --- a/awx/main/management/commands/run_task_system.py +++ b/awx/main/management/commands/run_task_system.py @@ -68,10 +68,15 @@ class SimpleDAG(object): rankdir = LR """ for n in self.nodes: - doc += "%s [color = %s]\n" % (short_string_obj(n['node_object']), "red" if n['node_object'].status == 'running' else "black") + doc += "%s [color = %s]\n" % ( + short_string_obj(n['node_object']), + "red" if n['node_object'].status == 'running' else "black", + ) for from_node, to_node in self.edges: - doc += "%s -> %s;\n" % (short_string_obj(self.nodes[from_node]['node_object']), - short_string_obj(self.nodes[to_node]['node_object'])) + doc += "%s -> %s;\n" % ( + short_string_obj(self.nodes[from_node]['node_object']), + short_string_obj(self.nodes[to_node]['node_object']), + ) doc += "}\n" gv_file = open('/tmp/graph.gv', 'w') gv_file.write(doc) @@ -131,19 +136,28 @@ class SimpleDAG(object): return leafs def get_tasks(): - ''' Fetch all Tower tasks that are relevant to the task management system ''' - # TODO: Replace this when we can grab all objects in a sane way - graph_jobs = [j for j in Job.objects.filter(status__in=('pending', 'waiting', 'running'))] - graph_inventory_updates = [iu for iu in InventoryUpdate.objects.filter(status__in=('pending', 'waiting', 'running'))] - graph_project_updates = [pu for pu in ProjectUpdate.objects.filter(status__in=('pending', 'waiting', 'running'))] - graph_system_jobs = [sj for sj in SystemJob.objects.filter(status__in=('pending', 'waiting', 'running'))] - all_actions = sorted(graph_jobs + graph_inventory_updates + graph_project_updates + graph_system_jobs, key=lambda task: task.created) + """Fetch all Tower tasks that are relevant to the task management + system. + """ + RELEVANT_JOBS = ('pending', 'waiting', 'running') + # TODO: Replace this when we can grab all objects in a sane way. + graph_jobs = [j for j in Job.objects.filter(status__in=RELEVANT_JOBS)] + graph_inventory_updates = [iu for iu in + InventoryUpdate.objects.filter(status__in=RELEVANT_JOBS)] + graph_project_updates = [pu for pu in + ProjectUpdate.objects.filter(status__in=RELEVANT_JOBS)] + graph_system_jobs = [sj for sj in + SystemJob.objects.filter(status__in=RELEVANT_JOBS)] + all_actions = sorted(graph_jobs + graph_inventory_updates + + graph_project_updates + graph_system_jobs, + key=lambda task: task.created) return all_actions def rebuild_graph(message): - ''' Regenerate the task graph by refreshing known tasks from Tower, purging orphaned running tasks, - and creatingdependencies for new tasks before generating directed edge relationships between those tasks ''' - + """Regenerate the task graph by refreshing known tasks from Tower, purging + orphaned running tasks, and creating dependencies for new tasks before + generating directed edge relationships between those tasks. + """ inspector = inspect() if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'): active_task_queues = inspector.active() @@ -162,7 +176,8 @@ def rebuild_graph(message): else: if settings.DEBUG: print("Could not communicate with celery!") - # TODO: Something needs to be done here to signal to the system as a whole that celery appears to be down + # TODO: Something needs to be done here to signal to the system + # as a whole that celery appears to be down. if not hasattr(settings, 'CELERY_UNIT_TEST'): return None running_tasks = filter(lambda t: t.status == 'running', all_sorted_tasks) @@ -172,10 +187,15 @@ def rebuild_graph(message): # Check running tasks and make sure they are active in celery print("Active celery tasks: " + str(active_tasks)) for task in list(running_tasks): - if task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'): - # NOTE: Pull status again and make sure it didn't finish in the meantime? + if (task.celery_task_id not in active_tasks and + not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')): + # NOTE: Pull status again and make sure it didn't finish in + # the meantime? task.status = 'failed' - task.job_explanation += "Task was marked as running in Tower but was not present in Celery so it has been marked as failed" + task.job_explanation += ''.join(( + 'Task was marked as running in Tower but was not present in', + 'Celery so it has been marked as failed', + )) task.save() task.socketio_emit_status("failed") running_tasks.pop(running_tasks.index(task)) @@ -184,10 +204,14 @@ def rebuild_graph(message): # Create and process dependencies for new tasks for task in new_tasks: print("Checking dependencies for: %s" % str(task)) - task_dependencies = task.generate_dependencies(running_tasks + waiting_tasks) #TODO: other 'new' tasks? Need to investigate this scenario + #TODO: other 'new' tasks? Need to investigate this scenario + task_dependencies = task.generate_dependencies(running_tasks + + waiting_tasks) print("New dependencies: %s" % str(task_dependencies)) for dep in task_dependencies: - # We recalculate the created time for the moment to ensure the dependencies are always sorted in the right order relative to the dependent task + # We recalculate the created time for the moment to ensure the + # dependencies are always sorted in the right order relative to + # the dependent task. time_delt = len(task_dependencies) - task_dependencies.index(dep) dep.created = task.created - datetime.timedelta(seconds=1+time_delt) dep.status = 'waiting' @@ -214,16 +238,18 @@ def rebuild_graph(message): return graph def process_graph(graph, task_capacity): - ''' Given a task dependency graph, start and manage tasks given their priority and weight ''' + """Given a task dependency graph, start and manage tasks given their + priority and weight. + """ leaf_nodes = graph.get_leaf_nodes() running_nodes = filter(lambda x: x['node_object'].status == 'running', leaf_nodes) running_impact = sum([t['node_object'].task_impact for t in running_nodes]) ready_nodes = filter(lambda x: x['node_object'].status != 'running', leaf_nodes) remaining_volume = task_capacity - running_impact - print("Running Nodes: %s; Capacity: %s; Running Impact: %s; Remaining Capacity: %s" % (str(running_nodes), - str(task_capacity), - str(running_impact), - str(remaining_volume))) + print('Running Nodes: %s; Capacity: %s; Running Impact: %s; ' + 'Remaining Capacity: %s' % + (str(running_nodes), str(task_capacity), + str(running_impact), str(remaining_volume))) print("Ready Nodes: %s" % str(ready_nodes)) for task_node in ready_nodes: node_obj = task_node['node_object'] @@ -232,10 +258,13 @@ def process_graph(graph, task_capacity): node_is_job = graph.get_node_type(node_obj) == 'job' if impact <= remaining_volume or running_impact == 0: node_dependencies = graph.get_dependents(node_obj) - if graph.get_node_type(node_obj) == 'job': # Allow other tasks to continue if a job fails, even if they are other jobs + # Allow other tasks to continue if a job fails, even if they are + # other jobs. + if graph.get_node_type(node_obj) == 'job': node_dependencies = [] dependent_nodes = [{'type': graph.get_node_type(node_obj), 'id': node_obj.id}] + \ - [{'type': graph.get_node_type(n['node_object']), 'id': n['node_object'].id} for n in node_dependencies] + [{'type': graph.get_node_type(n['node_object']), + 'id': n['node_object'].id} for n in node_dependencies] error_handler = handle_work_error.s(subtasks=dependent_nodes) start_status = node_obj.start(error_callback=error_handler) if not start_status: @@ -248,7 +277,8 @@ def process_graph(graph, task_capacity): continue remaining_volume -= impact running_impact += impact - print("Started Node: %s (capacity hit: %s) Remaining Capacity: %s" % (str(node_obj), str(impact), str(remaining_volume))) + print('Started Node: %s (capacity hit: %s) Remaining Capacity: %s' % + (str(node_obj), str(impact), str(remaining_volume))) def run_taskmanager(command_port): """Receive task start and finish signals to rebuild a dependency graph @@ -283,15 +313,17 @@ def run_taskmanager(command_port): last_rebuild = datetime.datetime.now() time.sleep(0.1) -class Command(NoArgsCommand): - ''' - Tower Task Management System - This daemon is designed to reside between our tasks and celery and provide a mechanism - for understanding the relationship between those tasks and their dependencies. It also - actively prevents situations in which Tower can get blocked because it doesn't have an - understanding of what is progressing through celery. - ''' +class Command(NoArgsCommand): + """Tower Task Management System + This daemon is designed to reside between our tasks and celery and + provide a mechanism for understanding the relationship between those tasks + and their dependencies. + + It also actively prevents situations in which Tower can get blocked + because it doesn't have an understanding of what is progressing through + celery. + """ help = 'Launch the Tower task management system' def init_logging(self):