diff --git a/awx/main/management/commands/run_task_system.py b/awx/main/management/commands/run_task_system.py index 9f0605f487..422c0134da 100644 --- a/awx/main/management/commands/run_task_system.py +++ b/awx/main/management/commands/run_task_system.py @@ -167,8 +167,7 @@ def rebuild_graph(message): new_tasks = filter(lambda t: t.status == 'new', all_sorted_tasks) # Check running tasks and make sure they are active in celery - if settings.DEBUG: - print("Active celery tasks: " + str(active_tasks)) + print("Active celery tasks: " + str(active_tasks)) for task in list(running_tasks): if task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'): # NOTE: Pull status again and make sure it didn't finish in the meantime? @@ -176,16 +175,13 @@ def rebuild_graph(message): task.result_traceback += "Task was marked as running in Tower but was not present in Celery so it has been marked as failed" task.save() running_tasks.pop(running_tasks.index(task)) - if settings.DEBUG: - print("Task %s appears orphaned... marking as failed" % task) + print("Task %s appears orphaned... marking as failed" % task) # Create and process dependencies for new tasks for task in new_tasks: - if settings.DEBUG: - print("Checking dependencies for: %s" % str(task)) + print("Checking dependencies for: %s" % str(task)) task_dependencies = task.generate_dependencies(running_tasks + waiting_tasks) #TODO: other 'new' tasks? Need to investigate this scenario - if settings.DEBUG: - print("New dependencies: %s" % str(task_dependencies)) + print("New dependencies: %s" % str(task_dependencies)) for dep in task_dependencies: # We recalculate the created time for the moment to ensure the dependencies are always sorted in the right order relative to the dependent task time_delt = len(task_dependencies) - task_dependencies.index(dep) @@ -200,24 +196,21 @@ def rebuild_graph(message): # Rebuild graph graph = SimpleDAG() for task in running_tasks: - if settings.DEBUG: - print("Adding running task: %s to graph" % str(task)) + print("Adding running task: %s to graph" % str(task)) graph.add_node(task) - if settings.DEBUG: - print("Waiting Tasks: %s" % str(waiting_tasks)) + print("Waiting Tasks: %s" % str(waiting_tasks)) for wait_task in waiting_tasks: node_dependencies = [] for node in graph: if wait_task.is_blocked_by(node['node_object']): - if settings.DEBUG: - print("Waiting task %s is blocked by %s" % (str(wait_task), node['node_object'])) + print("Waiting task %s is blocked by %s" % (str(wait_task), node['node_object'])) node_dependencies.append(node['node_object']) graph.add_node(wait_task) for dependency in node_dependencies: graph.add_edge(wait_task, dependency) - if settings.DEBUG: - print("Graph Edges: %s" % str(graph.edges)) - graph.generate_graphviz_plot() + #if settings.DEBUG: + print("Graph Edges: %s" % str(graph.edges)) + graph.generate_graphviz_plot() return graph def process_graph(graph, task_capacity): @@ -227,12 +220,11 @@ def process_graph(graph, task_capacity): running_impact = sum([t['node_object'].task_impact for t in running_nodes]) ready_nodes = filter(lambda x: x['node_object'].status != 'running', leaf_nodes) remaining_volume = task_capacity - running_impact - if settings.DEBUG: - print("Running Nodes: %s; Capacity: %s; Running Impact: %s; Remaining Capacity: %s" % (str(running_nodes), - str(task_capacity), - str(running_impact), - str(remaining_volume))) - print("Ready Nodes: %s" % str(ready_nodes)) + print("Running Nodes: %s; Capacity: %s; Running Impact: %s; Remaining Capacity: %s" % (str(running_nodes), + str(task_capacity), + str(running_impact), + str(remaining_volume))) + print("Ready Nodes: %s" % str(ready_nodes)) for task_node in ready_nodes: node_obj = task_node['node_object'] node_args = task_node['metadata'] @@ -248,8 +240,7 @@ def process_graph(graph, task_capacity): continue remaining_volume -= impact running_impact += impact - if settings.DEBUG: - print("Started Node: %s (capacity hit: %s) Remaining Capacity: %s" % (str(node_obj), str(impact), str(remaining_volume))) + print("Started Node: %s (capacity hit: %s) Remaining Capacity: %s" % (str(node_obj), str(impact), str(remaining_volume))) def run_taskmanager(command_port): ''' Receive task start and finish signals to rebuild a dependency graph and manage the actual running of tasks ''' @@ -265,8 +256,7 @@ def run_taskmanager(command_port): command_context = zmq.Context() command_socket = command_context.socket(zmq.REP) command_socket.bind(command_port) - if settings.DEBUG: - print("Listening on %s" % command_port) + print("Listening on %s" % command_port) last_rebuild = datetime.datetime.fromtimestamp(0) while True: try: @@ -276,8 +266,7 @@ def run_taskmanager(command_port): message = None if message is not None or (datetime.datetime.now() - last_rebuild).seconds > 60: if message is not None and 'pause' in message: - if settings.DEBUG: - print("Pause command received: %s" % str(message)) + print("Pause command received: %s" % str(message)) paused = message['pause'] graph = rebuild_graph(message) if not paused and graph is not None: