Emit log messages by default on the task runner

This commit is contained in:
Matthew Jones
2014-03-20 11:46:58 -04:00
parent f1ac2ff899
commit 6f11502528

View File

@@ -167,8 +167,7 @@ def rebuild_graph(message):
new_tasks = filter(lambda t: t.status == 'new', all_sorted_tasks) new_tasks = filter(lambda t: t.status == 'new', all_sorted_tasks)
# Check running tasks and make sure they are active in celery # Check running tasks and make sure they are active in celery
if settings.DEBUG: print("Active celery tasks: " + str(active_tasks))
print("Active celery tasks: " + str(active_tasks))
for task in list(running_tasks): for task in list(running_tasks):
if task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'): if task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'):
# NOTE: Pull status again and make sure it didn't finish in the meantime? # NOTE: Pull status again and make sure it didn't finish in the meantime?
@@ -176,16 +175,13 @@ def rebuild_graph(message):
task.result_traceback += "Task was marked as running in Tower but was not present in Celery so it has been marked as failed" task.result_traceback += "Task was marked as running in Tower but was not present in Celery so it has been marked as failed"
task.save() task.save()
running_tasks.pop(running_tasks.index(task)) running_tasks.pop(running_tasks.index(task))
if settings.DEBUG: print("Task %s appears orphaned... marking as failed" % task)
print("Task %s appears orphaned... marking as failed" % task)
# Create and process dependencies for new tasks # Create and process dependencies for new tasks
for task in new_tasks: for task in new_tasks:
if settings.DEBUG: print("Checking dependencies for: %s" % str(task))
print("Checking dependencies for: %s" % str(task))
task_dependencies = task.generate_dependencies(running_tasks + waiting_tasks) #TODO: other 'new' tasks? Need to investigate this scenario task_dependencies = task.generate_dependencies(running_tasks + waiting_tasks) #TODO: other 'new' tasks? Need to investigate this scenario
if settings.DEBUG: print("New dependencies: %s" % str(task_dependencies))
print("New dependencies: %s" % str(task_dependencies))
for dep in task_dependencies: for dep in task_dependencies:
# We recalculate the created time for the moment to ensure the dependencies are always sorted in the right order relative to the dependent task # We recalculate the created time for the moment to ensure the dependencies are always sorted in the right order relative to the dependent task
time_delt = len(task_dependencies) - task_dependencies.index(dep) time_delt = len(task_dependencies) - task_dependencies.index(dep)
@@ -200,24 +196,21 @@ def rebuild_graph(message):
# Rebuild graph # Rebuild graph
graph = SimpleDAG() graph = SimpleDAG()
for task in running_tasks: for task in running_tasks:
if settings.DEBUG: print("Adding running task: %s to graph" % str(task))
print("Adding running task: %s to graph" % str(task))
graph.add_node(task) graph.add_node(task)
if settings.DEBUG: print("Waiting Tasks: %s" % str(waiting_tasks))
print("Waiting Tasks: %s" % str(waiting_tasks))
for wait_task in waiting_tasks: for wait_task in waiting_tasks:
node_dependencies = [] node_dependencies = []
for node in graph: for node in graph:
if wait_task.is_blocked_by(node['node_object']): if wait_task.is_blocked_by(node['node_object']):
if settings.DEBUG: print("Waiting task %s is blocked by %s" % (str(wait_task), node['node_object']))
print("Waiting task %s is blocked by %s" % (str(wait_task), node['node_object']))
node_dependencies.append(node['node_object']) node_dependencies.append(node['node_object'])
graph.add_node(wait_task) graph.add_node(wait_task)
for dependency in node_dependencies: for dependency in node_dependencies:
graph.add_edge(wait_task, dependency) graph.add_edge(wait_task, dependency)
if settings.DEBUG: #if settings.DEBUG:
print("Graph Edges: %s" % str(graph.edges)) print("Graph Edges: %s" % str(graph.edges))
graph.generate_graphviz_plot() graph.generate_graphviz_plot()
return graph return graph
def process_graph(graph, task_capacity): def process_graph(graph, task_capacity):
@@ -227,12 +220,11 @@ def process_graph(graph, task_capacity):
running_impact = sum([t['node_object'].task_impact for t in running_nodes]) running_impact = sum([t['node_object'].task_impact for t in running_nodes])
ready_nodes = filter(lambda x: x['node_object'].status != 'running', leaf_nodes) ready_nodes = filter(lambda x: x['node_object'].status != 'running', leaf_nodes)
remaining_volume = task_capacity - running_impact remaining_volume = task_capacity - running_impact
if settings.DEBUG: print("Running Nodes: %s; Capacity: %s; Running Impact: %s; Remaining Capacity: %s" % (str(running_nodes),
print("Running Nodes: %s; Capacity: %s; Running Impact: %s; Remaining Capacity: %s" % (str(running_nodes), str(task_capacity),
str(task_capacity), str(running_impact),
str(running_impact), str(remaining_volume)))
str(remaining_volume))) print("Ready Nodes: %s" % str(ready_nodes))
print("Ready Nodes: %s" % str(ready_nodes))
for task_node in ready_nodes: for task_node in ready_nodes:
node_obj = task_node['node_object'] node_obj = task_node['node_object']
node_args = task_node['metadata'] node_args = task_node['metadata']
@@ -248,8 +240,7 @@ def process_graph(graph, task_capacity):
continue continue
remaining_volume -= impact remaining_volume -= impact
running_impact += impact running_impact += impact
if settings.DEBUG: print("Started Node: %s (capacity hit: %s) Remaining Capacity: %s" % (str(node_obj), str(impact), str(remaining_volume)))
print("Started Node: %s (capacity hit: %s) Remaining Capacity: %s" % (str(node_obj), str(impact), str(remaining_volume)))
def run_taskmanager(command_port): def run_taskmanager(command_port):
''' Receive task start and finish signals to rebuild a dependency graph and manage the actual running of tasks ''' ''' Receive task start and finish signals to rebuild a dependency graph and manage the actual running of tasks '''
@@ -265,8 +256,7 @@ def run_taskmanager(command_port):
command_context = zmq.Context() command_context = zmq.Context()
command_socket = command_context.socket(zmq.REP) command_socket = command_context.socket(zmq.REP)
command_socket.bind(command_port) command_socket.bind(command_port)
if settings.DEBUG: print("Listening on %s" % command_port)
print("Listening on %s" % command_port)
last_rebuild = datetime.datetime.fromtimestamp(0) last_rebuild = datetime.datetime.fromtimestamp(0)
while True: while True:
try: try:
@@ -276,8 +266,7 @@ def run_taskmanager(command_port):
message = None message = None
if message is not None or (datetime.datetime.now() - last_rebuild).seconds > 60: if message is not None or (datetime.datetime.now() - last_rebuild).seconds > 60:
if message is not None and 'pause' in message: if message is not None and 'pause' in message:
if settings.DEBUG: print("Pause command received: %s" % str(message))
print("Pause command received: %s" % str(message))
paused = message['pause'] paused = message['pause']
graph = rebuild_graph(message) graph = rebuild_graph(message)
if not paused and graph is not None: if not paused and graph is not None: