Readability cleanup.

This commit is contained in:
Luke Sneeringer
2014-10-27 14:03:39 -05:00
parent f4efbfc95a
commit 842086eef7

View File

@@ -68,10 +68,15 @@ class SimpleDAG(object):
rankdir = LR rankdir = LR
""" """
for n in self.nodes: for n in self.nodes:
doc += "%s [color = %s]\n" % (short_string_obj(n['node_object']), "red" if n['node_object'].status == 'running' else "black") doc += "%s [color = %s]\n" % (
short_string_obj(n['node_object']),
"red" if n['node_object'].status == 'running' else "black",
)
for from_node, to_node in self.edges: for from_node, to_node in self.edges:
doc += "%s -> %s;\n" % (short_string_obj(self.nodes[from_node]['node_object']), doc += "%s -> %s;\n" % (
short_string_obj(self.nodes[to_node]['node_object'])) short_string_obj(self.nodes[from_node]['node_object']),
short_string_obj(self.nodes[to_node]['node_object']),
)
doc += "}\n" doc += "}\n"
gv_file = open('/tmp/graph.gv', 'w') gv_file = open('/tmp/graph.gv', 'w')
gv_file.write(doc) gv_file.write(doc)
@@ -131,19 +136,28 @@ class SimpleDAG(object):
return leafs return leafs
def get_tasks(): def get_tasks():
''' Fetch all Tower tasks that are relevant to the task management system ''' """Fetch all Tower tasks that are relevant to the task management
# TODO: Replace this when we can grab all objects in a sane way system.
graph_jobs = [j for j in Job.objects.filter(status__in=('pending', 'waiting', 'running'))] """
graph_inventory_updates = [iu for iu in InventoryUpdate.objects.filter(status__in=('pending', 'waiting', 'running'))] RELEVANT_JOBS = ('pending', 'waiting', 'running')
graph_project_updates = [pu for pu in ProjectUpdate.objects.filter(status__in=('pending', 'waiting', 'running'))] # TODO: Replace this when we can grab all objects in a sane way.
graph_system_jobs = [sj for sj in SystemJob.objects.filter(status__in=('pending', 'waiting', 'running'))] graph_jobs = [j for j in Job.objects.filter(status__in=RELEVANT_JOBS)]
all_actions = sorted(graph_jobs + graph_inventory_updates + graph_project_updates + graph_system_jobs, key=lambda task: task.created) graph_inventory_updates = [iu for iu in
InventoryUpdate.objects.filter(status__in=RELEVANT_JOBS)]
graph_project_updates = [pu for pu in
ProjectUpdate.objects.filter(status__in=RELEVANT_JOBS)]
graph_system_jobs = [sj for sj in
SystemJob.objects.filter(status__in=RELEVANT_JOBS)]
all_actions = sorted(graph_jobs + graph_inventory_updates +
graph_project_updates + graph_system_jobs,
key=lambda task: task.created)
return all_actions return all_actions
def rebuild_graph(message): def rebuild_graph(message):
''' Regenerate the task graph by refreshing known tasks from Tower, purging orphaned running tasks, """Regenerate the task graph by refreshing known tasks from Tower, purging
and creatingdependencies for new tasks before generating directed edge relationships between those tasks ''' orphaned running tasks, and creating dependencies for new tasks before
generating directed edge relationships between those tasks.
"""
inspector = inspect() inspector = inspect()
if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'): if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'):
active_task_queues = inspector.active() active_task_queues = inspector.active()
@@ -162,7 +176,8 @@ def rebuild_graph(message):
else: else:
if settings.DEBUG: if settings.DEBUG:
print("Could not communicate with celery!") print("Could not communicate with celery!")
# TODO: Something needs to be done here to signal to the system as a whole that celery appears to be down # TODO: Something needs to be done here to signal to the system
# as a whole that celery appears to be down.
if not hasattr(settings, 'CELERY_UNIT_TEST'): if not hasattr(settings, 'CELERY_UNIT_TEST'):
return None return None
running_tasks = filter(lambda t: t.status == 'running', all_sorted_tasks) running_tasks = filter(lambda t: t.status == 'running', all_sorted_tasks)
@@ -172,10 +187,15 @@ def rebuild_graph(message):
# Check running tasks and make sure they are active in celery # Check running tasks and make sure they are active in celery
print("Active celery tasks: " + str(active_tasks)) print("Active celery tasks: " + str(active_tasks))
for task in list(running_tasks): for task in list(running_tasks):
if task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'): if (task.celery_task_id not in active_tasks and
# NOTE: Pull status again and make sure it didn't finish in the meantime? not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
# NOTE: Pull status again and make sure it didn't finish in
# the meantime?
task.status = 'failed' task.status = 'failed'
task.job_explanation += "Task was marked as running in Tower but was not present in Celery so it has been marked as failed" task.job_explanation += ''.join((
'Task was marked as running in Tower but was not present in ',
'Celery so it has been marked as failed',
))
task.save() task.save()
task.socketio_emit_status("failed") task.socketio_emit_status("failed")
running_tasks.pop(running_tasks.index(task)) running_tasks.pop(running_tasks.index(task))
@@ -184,10 +204,14 @@ def rebuild_graph(message):
# Create and process dependencies for new tasks # Create and process dependencies for new tasks
for task in new_tasks: for task in new_tasks:
print("Checking dependencies for: %s" % str(task)) print("Checking dependencies for: %s" % str(task))
task_dependencies = task.generate_dependencies(running_tasks + waiting_tasks) #TODO: other 'new' tasks? Need to investigate this scenario #TODO: other 'new' tasks? Need to investigate this scenario
task_dependencies = task.generate_dependencies(running_tasks +
waiting_tasks)
print("New dependencies: %s" % str(task_dependencies)) print("New dependencies: %s" % str(task_dependencies))
for dep in task_dependencies: for dep in task_dependencies:
# We recalculate the created time for the moment to ensure the dependencies are always sorted in the right order relative to the dependent task # We recalculate the created time for the moment to ensure the
# dependencies are always sorted in the right order relative to
# the dependent task.
time_delt = len(task_dependencies) - task_dependencies.index(dep) time_delt = len(task_dependencies) - task_dependencies.index(dep)
dep.created = task.created - datetime.timedelta(seconds=1+time_delt) dep.created = task.created - datetime.timedelta(seconds=1+time_delt)
dep.status = 'waiting' dep.status = 'waiting'
@@ -214,16 +238,18 @@ def rebuild_graph(message):
return graph return graph
def process_graph(graph, task_capacity): def process_graph(graph, task_capacity):
''' Given a task dependency graph, start and manage tasks given their priority and weight ''' """Given a task dependency graph, start and manage tasks given their
priority and weight.
"""
leaf_nodes = graph.get_leaf_nodes() leaf_nodes = graph.get_leaf_nodes()
running_nodes = filter(lambda x: x['node_object'].status == 'running', leaf_nodes) running_nodes = filter(lambda x: x['node_object'].status == 'running', leaf_nodes)
running_impact = sum([t['node_object'].task_impact for t in running_nodes]) running_impact = sum([t['node_object'].task_impact for t in running_nodes])
ready_nodes = filter(lambda x: x['node_object'].status != 'running', leaf_nodes) ready_nodes = filter(lambda x: x['node_object'].status != 'running', leaf_nodes)
remaining_volume = task_capacity - running_impact remaining_volume = task_capacity - running_impact
print("Running Nodes: %s; Capacity: %s; Running Impact: %s; Remaining Capacity: %s" % (str(running_nodes), print('Running Nodes: %s; Capacity: %s; Running Impact: %s; '
str(task_capacity), 'Remaining Capacity: %s' %
str(running_impact), (str(running_nodes), str(task_capacity),
str(remaining_volume))) str(running_impact), str(remaining_volume)))
print("Ready Nodes: %s" % str(ready_nodes)) print("Ready Nodes: %s" % str(ready_nodes))
for task_node in ready_nodes: for task_node in ready_nodes:
node_obj = task_node['node_object'] node_obj = task_node['node_object']
@@ -232,10 +258,13 @@ def process_graph(graph, task_capacity):
node_is_job = graph.get_node_type(node_obj) == 'job' node_is_job = graph.get_node_type(node_obj) == 'job'
if impact <= remaining_volume or running_impact == 0: if impact <= remaining_volume or running_impact == 0:
node_dependencies = graph.get_dependents(node_obj) node_dependencies = graph.get_dependents(node_obj)
if graph.get_node_type(node_obj) == 'job': # Allow other tasks to continue if a job fails, even if they are other jobs # Allow other tasks to continue if a job fails, even if they are
# other jobs.
if graph.get_node_type(node_obj) == 'job':
node_dependencies = [] node_dependencies = []
dependent_nodes = [{'type': graph.get_node_type(node_obj), 'id': node_obj.id}] + \ dependent_nodes = [{'type': graph.get_node_type(node_obj), 'id': node_obj.id}] + \
[{'type': graph.get_node_type(n['node_object']), 'id': n['node_object'].id} for n in node_dependencies] [{'type': graph.get_node_type(n['node_object']),
'id': n['node_object'].id} for n in node_dependencies]
error_handler = handle_work_error.s(subtasks=dependent_nodes) error_handler = handle_work_error.s(subtasks=dependent_nodes)
start_status = node_obj.start(error_callback=error_handler) start_status = node_obj.start(error_callback=error_handler)
if not start_status: if not start_status:
@@ -248,7 +277,8 @@ def process_graph(graph, task_capacity):
continue continue
remaining_volume -= impact remaining_volume -= impact
running_impact += impact running_impact += impact
print("Started Node: %s (capacity hit: %s) Remaining Capacity: %s" % (str(node_obj), str(impact), str(remaining_volume))) print('Started Node: %s (capacity hit: %s) Remaining Capacity: %s' %
(str(node_obj), str(impact), str(remaining_volume)))
def run_taskmanager(command_port): def run_taskmanager(command_port):
"""Receive task start and finish signals to rebuild a dependency graph """Receive task start and finish signals to rebuild a dependency graph
@@ -283,15 +313,17 @@ def run_taskmanager(command_port):
last_rebuild = datetime.datetime.now() last_rebuild = datetime.datetime.now()
time.sleep(0.1) time.sleep(0.1)
class Command(NoArgsCommand):
'''
Tower Task Management System
This daemon is designed to reside between our tasks and celery and provide a mechanism
for understanding the relationship between those tasks and their dependencies. It also
actively prevents situations in which Tower can get blocked because it doesn't have an
understanding of what is progressing through celery.
'''
class Command(NoArgsCommand):
"""Tower Task Management System
This daemon is designed to reside between our tasks and celery and
provide a mechanism for understanding the relationship between those tasks
and their dependencies.
It also actively prevents situations in which Tower can get blocked
because it doesn't have an understanding of what is progressing through
celery.
"""
help = 'Launch the Tower task management system' help = 'Launch the Tower task management system'
def init_logging(self): def init_logging(self):