diff --git a/awx/main/management/commands/run_task_system.py b/awx/main/management/commands/run_task_system.py
index 68ec9277b7..0a991fd90c 100644
--- a/awx/main/management/commands/run_task_system.py
+++ b/awx/main/management/commands/run_task_system.py
@@ -33,9 +33,9 @@ from celery.task.control import inspect
 
 class SimpleDAG(object):
 
-    def __init__(self, nodes=[], edges=[]):
-        self.nodes = nodes
-        self.edges = edges
+    def __init__(self):
+        self.nodes = []
+        self.edges = []
 
     def __contains__(self, obj):
         for node in self.nodes:
@@ -50,14 +50,27 @@ class SimpleDAG(object):
         return self.nodes.__iter__()
 
     def generate_graphviz_plot(self):
+        def short_string_obj(obj):
+            if type(obj) == Job:
+                type_str = "Job"
+            elif type(obj) == InventoryUpdate:
+                type_str = "Inventory"
+            elif type(obj) == ProjectUpdate:
+                type_str = "Project"
+            else:
+                type_str = "Unknown"
+            type_str += "-%s" % str(obj.id)
+            return type_str
+
         doc = """
         digraph g {
         rankdir = LR
         """
         for n in self.nodes:
-            doc += "%s [color = %s]\n" % (str(n), "red" if n.status == 'running' else "black")
-        for from, to in self.edges:
-            doc += "%s -> %s;\n" % (str(self.nodes[from]), str(self.nodes[to]))
+            doc += "%s [color = %s]\n" % (short_string_obj(n['node_object']), "red" if n['node_object'].status == 'running' else "black")
+        for from_node, to_node in self.edges:
+            doc += "%s -> %s;\n" % (short_string_obj(self.nodes[from_node]['node_object']),
+                                    short_string_obj(self.nodes[to_node]['node_object']))
         doc += "}"
         gv_file = open('/tmp/graph.gv', 'w')
         gv_file.write(doc)
@@ -69,14 +82,14 @@ class SimpleDAG(object):
 
     def add_edge(self, from_obj, to_obj):
         from_obj_ord = self.find_ord(from_obj)
-        to_obj_ord = self.find_ord(from_obj)
+        to_obj_ord = self.find_ord(to_obj)
         if from_obj_ord is None or to_obj_ord is None:
             raise LookupError("Object not found")
         self.edges.append((from_obj_ord, to_obj_ord))
 
     def add_edges(self, edgelist):
-        for from_obj, to_obj in edgelist:
-            self.add_edge(from_obj, to_obj)
+        for edge_pair in edgelist:
+            self.add_edge(edge_pair[0], edge_pair[1])
 
     def find_ord(self, obj):
         for idx in range(len(self.nodes)):
@@ -95,7 +108,7 @@ class SimpleDAG(object):
 
     def get_dependencies(self, obj):
         antecedents = []
-        this_ord = find_ord(self, obj)
+        this_ord = self.find_ord(obj)
         for node, dep in self.edges:
             if node == this_ord:
                 antecedents.append(self.nodes[dep])
@@ -103,18 +116,18 @@ class SimpleDAG(object):
 
     def get_dependents(self, obj):
         decendents = []
-        this_ord = find_ord(self, obj)
+        this_ord = self.find_ord(obj)
         for node, dep in self.edges:
             if dep == this_ord:
                 decendents.append(self.nodes[node])
         return decendents
 
-    def get_leaf_nodes():
+    def get_leaf_nodes(self):
         leafs = []
         for n in self.nodes:
-            if len(self.get_dependencies(n)) < 1:
+            if len(self.get_dependencies(n['node_object'])) < 1:
                 leafs.append(n)
-        return n
+        return leafs
 
 def get_tasks():
     # TODO: Replace this when we can grab all objects in a sane way
@@ -122,22 +135,29 @@ def get_tasks():
     graph_inventory_updates = [iu for iu in InventoryUpdate.objects.filter(status__in=('new', 'waiting', 'pending', 'running'))]
     graph_project_updates = [pu for pu in ProjectUpdate.objects.filter(status__in=('new', 'waiting', 'pending', 'running'))]
     all_actions = sorted(graph_jobs + graph_inventory_updates + graph_project_updates, key=lambda task: task.created)
+    return all_actions
 
 def rebuild_graph(message):
     inspector = inspect()
     active_task_queues = inspector.active()
     active_tasks = []
     for queue in active_task_queues:
-        active_tasks += active_task_queues[queue]
+        active_tasks += [at['id'] for at in active_task_queues[queue]]
     all_sorted_tasks = get_tasks()
+    if not len(all_sorted_tasks):
+        return None
+
     running_tasks = filter(lambda t: t.status == 'running', all_sorted_tasks)
     waiting_tasks = filter(lambda t: t.status != 'running', all_sorted_tasks)
     new_tasks = filter(lambda t: t.status == 'new', all_sorted_tasks)
 
     # Check running tasks and make sure they are active in celery
+    if settings.DEBUG:
+        print("Active celery tasks: " + str(active_tasks))
     for task in list(running_tasks):
         if task.celery_task_id not in active_tasks:
+            # Pull status again and make sure it didn't finish in the meantime
             task.status = 'failed'
             task.result_traceback += "Task was marked as running in Tower but was not present in Celery so it has been marked as failed"
             task.save()
 
@@ -147,45 +167,69 @@ def rebuild_graph(message):
 
     # Create and process dependencies for new tasks
     for task in new_tasks:
+        if settings.DEBUG:
+            print("Checking dependencies for: %s" % str(task))
         task_dependencies = task.generate_dependencies(running_tasks + waiting_tasks) #TODO: other 'new' tasks? Need to investigate this scenario
+        if settings.DEBUG:
+            print("New dependencies: %s" % str(task_dependencies))
         for dep in task_dependencies:
             # We recalculate the created time for the moment to ensure the dependencies are always sorted in the right order relative to the dependent task
             time_delt = len(task_dependencies) - task_dependencies.index(dep)
             dep.created = task.created - datetime.timedelta(seconds=1+time_delt)
             dep.save()
             waiting_tasks.insert(dep, waiting_tasks.index(task))
+        task.status = 'waiting'
+        task.save()
 
     # Rebuild graph
     graph = SimpleDAG()
+    print("Graph nodes: " + str(graph.nodes))
     for task in running_tasks:
+        if settings.DEBUG:
+            print("Adding running task: %s to graph" % str(task))
         graph.add_node(task)
+    if settings.DEBUG:
+        print("Waiting Tasks: %s" % str(waiting_tasks))
     for wait_task in waiting_tasks:
         node_dependencies = []
         for node in graph:
-            if wait_task.is_blocked_by(node['node_objects']):
-                node_dependencies.append(node)
+            if wait_task.is_blocked_by(node['node_object']):
+                if settings.DEBUG:
+                    print("Waiting task %s is blocked by %s" % (str(wait_task), node['node_object']))
+                node_dependencies.append(node['node_object'])
         graph.add_node(wait_task)
-        graph.add_edges([(wait_task, n) for n in node_dependencies])
+        for dependency in node_dependencies:
+            graph.add_edge(wait_task, dependency)
     if settings.DEBUG:
+        print("Graph Edges: %s" % str(graph.edges))
         graph.generate_graphviz_plot()
     return graph
 
 def process_graph(graph, task_capacity):
     leaf_nodes = graph.get_leaf_nodes()
-    running_nodes = filter(lambda x['node_object'].status == 'running', leaf_nodes)
+    running_nodes = filter(lambda x: x['node_object'].status == 'running', leaf_nodes)
     running_impact = sum([t['node_object'].task_impact for t in running_nodes])
-    ready_nodes = filter(lambda x['node_object'].status != 'running', leaf_nodes)
+    ready_nodes = filter(lambda x: x['node_object'].status != 'running', leaf_nodes)
     remaining_volume = task_capacity - running_impact
+    if settings.DEBUG:
+        print("Running Nodes: %s; Capacity: %s; Running Impact: %s; Remaining Capacity: %s" % (str(running_nodes),
+                                                                                               str(task_capacity),
+                                                                                               str(running_impact),
+                                                                                               str(remaining_volume)))
+        print("Ready Nodes: %s" % str(ready_nodes))
     for task_node in ready_nodes:
         node_obj = task_node['node_object']
         node_args = task_node['metadata']
         impact = node_obj.task_impact
         if impact <= remaining_volume or running_impact == 0:
-            dependent_nodes = [{'type': graph.get_node_type(n), 'id': n.id} for n in graph.get_dependents()]
+            dependent_nodes = [{'type': graph.get_node_type(n['node_object']), 'id': n['node_object'].id} for n in graph.get_dependents(node_obj)]
             error_handler = handle_work_error.s(subtasks=dependent_nodes)
             start_status = node_obj.start(error_callback=error_handler)
+            if not start_status:
+                print("Job didn't start!")
             remaining_volume -= impact
             running_impact += impact
+            print("Started Node: %s (capacity hit: %s) Remaining Capacity: %s" % (str(node_obj), str(impact), str(remaining_volume)))
 
 def run_taskmanager(command_port):
     paused = False
@@ -193,18 +237,22 @@ def run_taskmanager(command_port):
     command_context = zmq.Context()
     command_socket = command_context.socket(zmq.REP)
     command_socket.bind(command_port)
-    last_rebuild = datetime.datetime.now()
+    if settings.DEBUG:
+        print("Listening on %s" % command_port)
+    last_rebuild = datetime.datetime.fromtimestamp(0)
     while True:
         try:
             message = command_socket.recv_json(flags=zmq.NOBLOCK)
             command_socket.send("1")
-        except zmq.core.error.ZMQError,e:
+        except zmq.error.ZMQError,e:
             message = None
         if message is not None or (datetime.datetime.now() - last_rebuild).seconds > 60:
-            if 'pause' in message:
+            if message is not None and 'pause' in message:
+                if settings.DEBUG:
+                    print("Pause command received: %s" % str(message))
                 paused = message['pause']
             graph = rebuild_graph(message)
-            if not paused:
+            if not paused and graph is not None:
                 process_graph(graph, task_capacity)
             last_rebuild = datetime.datetime.now()
         time.sleep(0.1)
diff --git a/awx/main/models/base.py b/awx/main/models/base.py
index 329a7284e9..32f1bcd5d9 100644
--- a/awx/main/models/base.py
+++ b/awx/main/models/base.py
@@ -370,7 +370,7 @@ class CommonTask(PrimordialModel):
 
     @property
     def can_start(self):
-        return bool(self.status == 'new')
+        return bool(self.status in ('new', 'waiting'))
 
     @property
     def task_impact(self):
@@ -403,7 +403,7 @@ class CommonTask(PrimordialModel):
         opts = dict([(field, kwargs.get(field, '')) for field in needed])
         if not all(opts.values()):
             return False
-        task_class().apply_async((self.pk, **opts), link_error=error_callback)
+        task_class().apply_async((self.pk,), opts, link_error=error_callback)
         return True
 
     @property
diff --git a/awx/main/models/inventory.py b/awx/main/models/inventory.py
index a48b40e7e4..c04ea78885 100644
--- a/awx/main/models/inventory.py
+++ b/awx/main/models/inventory.py
@@ -15,6 +15,9 @@ import uuid
 # PyYAML
 import yaml
 
+# ZMQ
+import zmq
+
 # Django
 from django.conf import settings
 from django.db import models
@@ -750,6 +753,12 @@ class InventoryUpdate(CommonTask):
         from awx.main.tasks import RunInventoryUpdate
         return RunInventoryUpdate
 
+    def is_blocked_by(self, obj):
+        if type(obj) == InventoryUpdate:
+            if self.inventory_source == obj.inventory_source:
+                return True
+        return False
+
     @property
     def task_impact(self):
         return 50
@@ -759,5 +768,5 @@ class InventoryUpdate(CommonTask):
         signal_socket = signal_context.socket(zmq.REQ)
         signal_socket.connect(settings.TASK_COMMAND_PORT)
         signal_socket.send_json(dict(task_type="inventory_update", id=self.id, metadata=kwargs))
-        self.socket.recv()
+        signal_socket.recv()
         return True
diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py
index 909ce22f73..760516476f 100644
--- a/awx/main/models/jobs.py
+++ b/awx/main/models/jobs.py
@@ -371,7 +371,7 @@ class Job(CommonTask):
             if type(obj) == InventoryUpdate:
                 if obj.inventory_source in inventory_sources:
                     inventory_sources_found.append(obj.inventory_source)
-        if not project_found and self.project.scm_update_on_launch::
+        if not project_found and self.project.scm_update_on_launch:
             dependencies.append(self.project.project_updates.create())
         if inventory_sources.count(): # and not has_setup_failures? Probably handled as an error scenario in the task runner
             for source in inventory_sources:
@@ -389,7 +389,7 @@ class Job(CommonTask):
         signal_socket = signal_context.socket(zmq.REQ)
         signal_socket.connect(settings.TASK_COMMAND_PORT)
         signal_socket.send_json(dict(task_type="ansible_playbook", id=self.id))
-        self.socket.recv()
+        signal_socket.recv()
         return True
 
     def start(self, error_callback, **kwargs):
@@ -408,7 +408,7 @@ class Job(CommonTask):
         opts = stored_args
         if not all(opts.values()):
             return False
-        task_class().apply_async((self.pk, **opts), link_error=error_callback)
+        task_class().apply_async((self.pk,), opts, link_error=error_callback)
         return True
 
 class JobHostSummary(BaseModel):
diff --git a/awx/main/models/projects.py b/awx/main/models/projects.py
index aa05a6b69a..f5bf54c990 100644
--- a/awx/main/models/projects.py
+++ b/awx/main/models/projects.py
@@ -365,6 +365,12 @@ class ProjectUpdate(CommonTask):
         from awx.main.tasks import RunProjectUpdate
         return RunProjectUpdate
 
+    def is_blocked_by(self, obj):
+        if type(obj) == ProjectUpdate:
+            if self.project == obj.project:
+                return True
+        return False
+
     @property
     def task_impact(self):
         return 20
@@ -374,7 +380,7 @@ class ProjectUpdate(CommonTask):
         signal_socket = signal_context.socket(zmq.REQ)
         signal_socket.connect(settings.TASK_COMMAND_PORT)
         signal_socket.send_json(dict(task_type="project_update", id=self.id, metadata=kwargs))
-        self.socket.recv()
+        signal_socket.recv()
        return True
 
     def _update_parent_instance(self):
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index e83e465561..ef22e34a0c 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -349,6 +349,8 @@ else:
 CALLBACK_CONSUMER_PORT = "tcp://127.0.0.1:5556"
 CALLBACK_QUEUE_PORT = "ipc:///tmp/callback_receiver.ipc"
 
+TASK_COMMAND_PORT = "ipc:///tmp/task_command_receiver.ipc"
+
 # Logging configuration.
 LOGGING = {
     'version': 1,
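
Reviewer note: the heart of this patch is a ZeroMQ REQ/REP round-trip. Each model's signalling code above connects a REQ socket to the new TASK_COMMAND_PORT setting, sends a JSON payload describing the task, and blocks on recv() until run_taskmanager()'s REP socket answers "1", which triggers a rebuild of the dependency graph. A minimal client-side sketch of that protocol (illustrative only, not part of the patch; signal_task_manager is a hypothetical stand-in for the send_json() calls added to inventory.py, jobs.py, and projects.py, using only standard pyzmq calls):

    # Illustrative sketch: mimic the task-manager notification round-trip.
    import zmq

    TASK_COMMAND_PORT = "ipc:///tmp/task_command_receiver.ipc"  # value added in awx/settings/defaults.py

    def signal_task_manager(task_type, task_id, **metadata):
        context = zmq.Context()
        socket = context.socket(zmq.REQ)
        socket.connect(TASK_COMMAND_PORT)
        # Same message shape as the send_json() calls in the models above
        socket.send_json(dict(task_type=task_type, id=task_id, metadata=metadata))
        socket.recv()  # blocks until run_taskmanager() replies "1"
        socket.close()

    signal_task_manager("project_update", 42)

Because run_taskmanager() polls its REP socket with zmq.NOBLOCK and also rebuilds the graph whenever 60 seconds have passed since the last rebuild, a missed signal only delays scheduling until the next timed rebuild rather than losing the task.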