mirror of
https://github.com/ansible/awx.git
synced 2026-03-07 11:41:08 -03:30
Fix some task runner bugs and round out the implementation
This commit is contained in:
@@ -33,9 +33,9 @@ from celery.task.control import inspect
|
|||||||
|
|
||||||
class SimpleDAG(object):
|
class SimpleDAG(object):
|
||||||
|
|
||||||
def __init__(self, nodes=[], edges=[]):
|
def __init__(self):
|
||||||
self.nodes = nodes
|
self.nodes = []
|
||||||
self.edges = edges
|
self.edges = []
|
||||||
|
|
||||||
def __contains__(self, obj):
|
def __contains__(self, obj):
|
||||||
for node in self.nodes:
|
for node in self.nodes:
|
||||||
@@ -50,14 +50,27 @@ class SimpleDAG(object):
|
|||||||
return self.nodes.__iter__()
|
return self.nodes.__iter__()
|
||||||
|
|
||||||
def generate_graphviz_plot(self):
|
def generate_graphviz_plot(self):
|
||||||
|
def short_string_obj(obj):
|
||||||
|
if type(obj) == Job:
|
||||||
|
type_str = "Job"
|
||||||
|
elif type(obj) == InventoryUpdate:
|
||||||
|
type_str = "Inventory"
|
||||||
|
elif type(obj) == ProjectUpdate:
|
||||||
|
type_str = "Project"
|
||||||
|
else:
|
||||||
|
type_str = "Unknown"
|
||||||
|
type_str += "-%s" % str(obj.id)
|
||||||
|
return type_str
|
||||||
|
|
||||||
doc = """
|
doc = """
|
||||||
digraph g {
|
digraph g {
|
||||||
rankdir = LR
|
rankdir = LR
|
||||||
"""
|
"""
|
||||||
for n in self.nodes:
|
for n in self.nodes:
|
||||||
doc += "%s [color = %s]\n" % (str(n), "red" if n.status == 'running' else "black")
|
doc += "%s [color = %s]\n" % (short_string_obj(n['node_object']), "red" if n['node_object'].status == 'running' else "black")
|
||||||
for from, to in self.edges:
|
for from_node, to_node in self.edges:
|
||||||
doc += "%s -> %s;\n" % (str(self.nodes[from]), str(self.nodes[to]))
|
doc += "%s -> %s;\n" % (short_string_obj(self.nodes[from_node]['node_object']),
|
||||||
|
short_string_obj(self.nodes[to_node]['node_object']))
|
||||||
doc += "}"
|
doc += "}"
|
||||||
gv_file = open('/tmp/graph.gv', 'w')
|
gv_file = open('/tmp/graph.gv', 'w')
|
||||||
gv_file.write(doc)
|
gv_file.write(doc)
|
||||||
@@ -69,14 +82,14 @@ class SimpleDAG(object):
|
|||||||
|
|
||||||
def add_edge(self, from_obj, to_obj):
|
def add_edge(self, from_obj, to_obj):
|
||||||
from_obj_ord = self.find_ord(from_obj)
|
from_obj_ord = self.find_ord(from_obj)
|
||||||
to_obj_ord = self.find_ord(from_obj)
|
to_obj_ord = self.find_ord(to_obj)
|
||||||
if from_obj_ord is None or to_obj_ord is None:
|
if from_obj_ord is None or to_obj_ord is None:
|
||||||
raise LookupError("Object not found")
|
raise LookupError("Object not found")
|
||||||
self.edges.append((from_obj_ord, to_obj_ord))
|
self.edges.append((from_obj_ord, to_obj_ord))
|
||||||
|
|
||||||
def add_edges(self, edgelist):
|
def add_edges(self, edgelist):
|
||||||
for from_obj, to_obj in edgelist:
|
for edge_pair in edgelist:
|
||||||
self.add_edge(from_obj, to_obj)
|
self.add_edge(edge_pair[0], edge_pair[1])
|
||||||
|
|
||||||
def find_ord(self, obj):
|
def find_ord(self, obj):
|
||||||
for idx in range(len(self.nodes)):
|
for idx in range(len(self.nodes)):
|
||||||
@@ -95,7 +108,7 @@ class SimpleDAG(object):
|
|||||||
|
|
||||||
def get_dependencies(self, obj):
|
def get_dependencies(self, obj):
|
||||||
antecedents = []
|
antecedents = []
|
||||||
this_ord = find_ord(self, obj)
|
this_ord = self.find_ord(obj)
|
||||||
for node, dep in self.edges:
|
for node, dep in self.edges:
|
||||||
if node == this_ord:
|
if node == this_ord:
|
||||||
antecedents.append(self.nodes[dep])
|
antecedents.append(self.nodes[dep])
|
||||||
@@ -103,18 +116,18 @@ class SimpleDAG(object):
|
|||||||
|
|
||||||
def get_dependents(self, obj):
|
def get_dependents(self, obj):
|
||||||
decendents = []
|
decendents = []
|
||||||
this_ord = find_ord(self, obj)
|
this_ord = self.find_ord(obj)
|
||||||
for node, dep in self.edges:
|
for node, dep in self.edges:
|
||||||
if dep == this_ord:
|
if dep == this_ord:
|
||||||
decendents.append(self.nodes[node])
|
decendents.append(self.nodes[node])
|
||||||
return decendents
|
return decendents
|
||||||
|
|
||||||
def get_leaf_nodes():
|
def get_leaf_nodes(self):
|
||||||
leafs = []
|
leafs = []
|
||||||
for n in self.nodes:
|
for n in self.nodes:
|
||||||
if len(self.get_dependencies(n)) < 1:
|
if len(self.get_dependencies(n['node_object'])) < 1:
|
||||||
leafs.append(n)
|
leafs.append(n)
|
||||||
return n
|
return leafs
|
||||||
|
|
||||||
def get_tasks():
|
def get_tasks():
|
||||||
# TODO: Replace this when we can grab all objects in a sane way
|
# TODO: Replace this when we can grab all objects in a sane way
|
||||||
@@ -122,22 +135,29 @@ def get_tasks():
|
|||||||
graph_inventory_updates = [iu for iu in InventoryUpdate.objects.filter(status__in=('new', 'waiting', 'pending', 'running'))]
|
graph_inventory_updates = [iu for iu in InventoryUpdate.objects.filter(status__in=('new', 'waiting', 'pending', 'running'))]
|
||||||
graph_project_updates = [pu for pu in ProjectUpdate.objects.filter(status__in=('new', 'waiting', 'pending', 'running'))]
|
graph_project_updates = [pu for pu in ProjectUpdate.objects.filter(status__in=('new', 'waiting', 'pending', 'running'))]
|
||||||
all_actions = sorted(graph_jobs + graph_inventory_updates + graph_project_updates, key=lambda task: task.created)
|
all_actions = sorted(graph_jobs + graph_inventory_updates + graph_project_updates, key=lambda task: task.created)
|
||||||
|
return all_actions
|
||||||
|
|
||||||
def rebuild_graph(message):
|
def rebuild_graph(message):
|
||||||
inspector = inspect()
|
inspector = inspect()
|
||||||
active_task_queues = inspector.active()
|
active_task_queues = inspector.active()
|
||||||
active_tasks = []
|
active_tasks = []
|
||||||
for queue in active_task_queues:
|
for queue in active_task_queues:
|
||||||
active_tasks += active_task_queues[queue]
|
active_tasks += [at['id'] for at in active_task_queues[queue]]
|
||||||
|
|
||||||
all_sorted_tasks = get_tasks()
|
all_sorted_tasks = get_tasks()
|
||||||
|
if not len(all_sorted_tasks):
|
||||||
|
return None
|
||||||
|
|
||||||
running_tasks = filter(lambda t: t.status == 'running', all_sorted_tasks)
|
running_tasks = filter(lambda t: t.status == 'running', all_sorted_tasks)
|
||||||
waiting_tasks = filter(lambda t: t.status != 'running', all_sorted_tasks)
|
waiting_tasks = filter(lambda t: t.status != 'running', all_sorted_tasks)
|
||||||
new_tasks = filter(lambda t: t.status == 'new', all_sorted_tasks)
|
new_tasks = filter(lambda t: t.status == 'new', all_sorted_tasks)
|
||||||
|
|
||||||
# Check running tasks and make sure they are active in celery
|
# Check running tasks and make sure they are active in celery
|
||||||
|
if settings.DEBUG:
|
||||||
|
print("Active celery tasks: " + str(active_tasks))
|
||||||
for task in list(running_tasks):
|
for task in list(running_tasks):
|
||||||
if task.celery_task_id not in active_tasks:
|
if task.celery_task_id not in active_tasks:
|
||||||
|
# Pull status again and make sure it didn't finish in the meantime
|
||||||
task.status = 'failed'
|
task.status = 'failed'
|
||||||
task.result_traceback += "Task was marked as running in Tower but was not present in Celery so it has been marked as failed"
|
task.result_traceback += "Task was marked as running in Tower but was not present in Celery so it has been marked as failed"
|
||||||
task.save()
|
task.save()
|
||||||
@@ -147,45 +167,69 @@ def rebuild_graph(message):
|
|||||||
|
|
||||||
# Create and process dependencies for new tasks
|
# Create and process dependencies for new tasks
|
||||||
for task in new_tasks:
|
for task in new_tasks:
|
||||||
|
if settings.DEBUG:
|
||||||
|
print("Checking dependencies for: %s" % str(task))
|
||||||
task_dependencies = task.generate_dependencies(running_tasks + waiting_tasks) #TODO: other 'new' tasks? Need to investigate this scenario
|
task_dependencies = task.generate_dependencies(running_tasks + waiting_tasks) #TODO: other 'new' tasks? Need to investigate this scenario
|
||||||
|
if settings.DEBUG:
|
||||||
|
print("New dependencies: %s" % str(task_dependencies))
|
||||||
for dep in task_dependencies:
|
for dep in task_dependencies:
|
||||||
# We recalculate the created time for the moment to ensure the dependencies are always sorted in the right order relative to the dependent task
|
# We recalculate the created time for the moment to ensure the dependencies are always sorted in the right order relative to the dependent task
|
||||||
time_delt = len(task_dependencies) - task_dependencies.index(dep)
|
time_delt = len(task_dependencies) - task_dependencies.index(dep)
|
||||||
dep.created = task.created - datetime.timedelta(seconds=1+time_delt)
|
dep.created = task.created - datetime.timedelta(seconds=1+time_delt)
|
||||||
dep.save()
|
dep.save()
|
||||||
waiting_tasks.insert(dep, waiting_tasks.index(task))
|
waiting_tasks.insert(dep, waiting_tasks.index(task))
|
||||||
|
task.status = 'waiting'
|
||||||
|
task.save()
|
||||||
|
|
||||||
# Rebuild graph
|
# Rebuild graph
|
||||||
graph = SimpleDAG()
|
graph = SimpleDAG()
|
||||||
|
print("Graph nodes: " + str(graph.nodes))
|
||||||
for task in running_tasks:
|
for task in running_tasks:
|
||||||
|
if settings.DEBUG:
|
||||||
|
print("Adding running task: %s to graph" % str(task))
|
||||||
graph.add_node(task)
|
graph.add_node(task)
|
||||||
|
if settings.DEBUG:
|
||||||
|
print("Waiting Tasks: %s" % str(waiting_tasks))
|
||||||
for wait_task in waiting_tasks:
|
for wait_task in waiting_tasks:
|
||||||
node_dependencies = []
|
node_dependencies = []
|
||||||
for node in graph:
|
for node in graph:
|
||||||
if wait_task.is_blocked_by(node['node_objects']):
|
if wait_task.is_blocked_by(node['node_object']):
|
||||||
node_dependencies.append(node)
|
if settings.DEBUG:
|
||||||
|
print("Waiting task %s is blocked by %s" % (str(wait_task), node['node_object']))
|
||||||
|
node_dependencies.append(node['node_object'])
|
||||||
graph.add_node(wait_task)
|
graph.add_node(wait_task)
|
||||||
graph.add_edges([(wait_task, n) for n in node_dependencies])
|
for dependency in node_dependencies:
|
||||||
|
graph.add_edge(wait_task, dependency)
|
||||||
if settings.DEBUG:
|
if settings.DEBUG:
|
||||||
|
print("Graph Edges: %s" % str(graph.edges))
|
||||||
graph.generate_graphviz_plot()
|
graph.generate_graphviz_plot()
|
||||||
return graph
|
return graph
|
||||||
|
|
||||||
def process_graph(graph, task_capacity):
|
def process_graph(graph, task_capacity):
|
||||||
leaf_nodes = graph.get_leaf_nodes()
|
leaf_nodes = graph.get_leaf_nodes()
|
||||||
running_nodes = filter(lambda x['node_object'].status == 'running', leaf_nodes)
|
running_nodes = filter(lambda x: x['node_object'].status == 'running', leaf_nodes)
|
||||||
running_impact = sum([t['node_object'].task_impact for t in running_nodes])
|
running_impact = sum([t['node_object'].task_impact for t in running_nodes])
|
||||||
ready_nodes = filter(lambda x['node_object'].status != 'running', leaf_nodes)
|
ready_nodes = filter(lambda x: x['node_object'].status != 'running', leaf_nodes)
|
||||||
remaining_volume = task_capacity - running_impact
|
remaining_volume = task_capacity - running_impact
|
||||||
|
if settings.DEBUG:
|
||||||
|
print("Running Nodes: %s; Capacity: %s; Running Impact: %s; Remaining Capacity: %s" % (str(running_nodes),
|
||||||
|
str(task_capacity),
|
||||||
|
str(running_impact),
|
||||||
|
str(remaining_volume)))
|
||||||
|
print("Ready Nodes: %s" % str(ready_nodes))
|
||||||
for task_node in ready_nodes:
|
for task_node in ready_nodes:
|
||||||
node_obj = task_node['node_object']
|
node_obj = task_node['node_object']
|
||||||
node_args = task_node['metadata']
|
node_args = task_node['metadata']
|
||||||
impact = node_obj.task_impact
|
impact = node_obj.task_impact
|
||||||
if impact <= remaining_volume or running_impact == 0:
|
if impact <= remaining_volume or running_impact == 0:
|
||||||
dependent_nodes = [{'type': graph.get_node_type(n), 'id': n.id} for n in graph.get_dependents()]
|
dependent_nodes = [{'type': graph.get_node_type(n['node_object']), 'id': n['node_object'].id} for n in graph.get_dependents(node_obj)]
|
||||||
error_handler = handle_work_error.s(subtasks=dependent_nodes)
|
error_handler = handle_work_error.s(subtasks=dependent_nodes)
|
||||||
start_status = node_obj.start(error_callback=error_handler)
|
start_status = node_obj.start(error_callback=error_handler)
|
||||||
|
if not start_status:
|
||||||
|
print("Job didn't start!")
|
||||||
remaining_volume -= impact
|
remaining_volume -= impact
|
||||||
running_impact += impact
|
running_impact += impact
|
||||||
|
print("Started Node: %s (capacity hit: %s) Remaining Capacity: %s" % (str(node_obj), str(impact), str(remaining_volume)))
|
||||||
|
|
||||||
def run_taskmanager(command_port):
|
def run_taskmanager(command_port):
|
||||||
paused = False
|
paused = False
|
||||||
@@ -193,18 +237,22 @@ def run_taskmanager(command_port):
|
|||||||
command_context = zmq.Context()
|
command_context = zmq.Context()
|
||||||
command_socket = command_context.socket(zmq.REP)
|
command_socket = command_context.socket(zmq.REP)
|
||||||
command_socket.bind(command_port)
|
command_socket.bind(command_port)
|
||||||
last_rebuild = datetime.datetime.now()
|
if settings.DEBUG:
|
||||||
|
print("Listening on %s" % command_port)
|
||||||
|
last_rebuild = datetime.datetime.fromtimestamp(0)
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
message = command_socket.recv_json(flags=zmq.NOBLOCK)
|
message = command_socket.recv_json(flags=zmq.NOBLOCK)
|
||||||
command_socket.send("1")
|
command_socket.send("1")
|
||||||
except zmq.core.error.ZMQError,e:
|
except zmq.error.ZMQError,e:
|
||||||
message = None
|
message = None
|
||||||
if message is not None or (datetime.datetime.now() - last_rebuild).seconds > 60:
|
if message is not None or (datetime.datetime.now() - last_rebuild).seconds > 60:
|
||||||
if 'pause' in message:
|
if message is not None and 'pause' in message:
|
||||||
|
if settings.DEBUG:
|
||||||
|
print("Pause command received: %s" % str(message))
|
||||||
paused = message['pause']
|
paused = message['pause']
|
||||||
graph = rebuild_graph(message)
|
graph = rebuild_graph(message)
|
||||||
if not paused:
|
if not paused and graph is not None:
|
||||||
process_graph(graph, task_capacity)
|
process_graph(graph, task_capacity)
|
||||||
last_rebuild = datetime.datetime.now()
|
last_rebuild = datetime.datetime.now()
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
|
|||||||
@@ -370,7 +370,7 @@ class CommonTask(PrimordialModel):
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def can_start(self):
|
def can_start(self):
|
||||||
return bool(self.status == 'new')
|
return bool(self.status in ('new', 'waiting'))
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def task_impact(self):
|
def task_impact(self):
|
||||||
@@ -403,7 +403,7 @@ class CommonTask(PrimordialModel):
|
|||||||
opts = dict([(field, kwargs.get(field, '')) for field in needed])
|
opts = dict([(field, kwargs.get(field, '')) for field in needed])
|
||||||
if not all(opts.values()):
|
if not all(opts.values()):
|
||||||
return False
|
return False
|
||||||
task_class().apply_async((self.pk, **opts), link_error=error_callback)
|
task_class().apply_async((self.pk,), opts, link_error=error_callback)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
|||||||
@@ -15,6 +15,9 @@ import uuid
|
|||||||
# PyYAML
|
# PyYAML
|
||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
|
# ZMQ
|
||||||
|
import zmq
|
||||||
|
|
||||||
# Django
|
# Django
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.db import models
|
from django.db import models
|
||||||
@@ -750,6 +753,12 @@ class InventoryUpdate(CommonTask):
|
|||||||
from awx.main.tasks import RunInventoryUpdate
|
from awx.main.tasks import RunInventoryUpdate
|
||||||
return RunInventoryUpdate
|
return RunInventoryUpdate
|
||||||
|
|
||||||
|
def is_blocked_by(self, obj):
|
||||||
|
if type(obj) == InventoryUpdate:
|
||||||
|
if self.inventory_source == obj.inventory_source:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def task_impact(self):
|
def task_impact(self):
|
||||||
return 50
|
return 50
|
||||||
@@ -759,5 +768,5 @@ class InventoryUpdate(CommonTask):
|
|||||||
signal_socket = signal_context.socket(zmq.REQ)
|
signal_socket = signal_context.socket(zmq.REQ)
|
||||||
signal_socket.connect(settings.TASK_COMMAND_PORT)
|
signal_socket.connect(settings.TASK_COMMAND_PORT)
|
||||||
signal_socket.send_json(dict(task_type="inventory_update", id=self.id, metadata=kwargs))
|
signal_socket.send_json(dict(task_type="inventory_update", id=self.id, metadata=kwargs))
|
||||||
self.socket.recv()
|
signal_socket.recv()
|
||||||
return True
|
return True
|
||||||
|
|||||||
@@ -371,7 +371,7 @@ class Job(CommonTask):
|
|||||||
if type(obj) == InventoryUpdate:
|
if type(obj) == InventoryUpdate:
|
||||||
if obj.inventory_source in inventory_sources:
|
if obj.inventory_source in inventory_sources:
|
||||||
inventory_sources_found.append(obj.inventory_source)
|
inventory_sources_found.append(obj.inventory_source)
|
||||||
if not project_found and self.project.scm_update_on_launch::
|
if not project_found and self.project.scm_update_on_launch:
|
||||||
dependencies.append(self.project.project_updates.create())
|
dependencies.append(self.project.project_updates.create())
|
||||||
if inventory_sources.count(): # and not has_setup_failures? Probably handled as an error scenario in the task runner
|
if inventory_sources.count(): # and not has_setup_failures? Probably handled as an error scenario in the task runner
|
||||||
for source in inventory_sources:
|
for source in inventory_sources:
|
||||||
@@ -389,7 +389,7 @@ class Job(CommonTask):
|
|||||||
signal_socket = signal_context.socket(zmq.REQ)
|
signal_socket = signal_context.socket(zmq.REQ)
|
||||||
signal_socket.connect(settings.TASK_COMMAND_PORT)
|
signal_socket.connect(settings.TASK_COMMAND_PORT)
|
||||||
signal_socket.send_json(dict(task_type="ansible_playbook", id=self.id))
|
signal_socket.send_json(dict(task_type="ansible_playbook", id=self.id))
|
||||||
self.socket.recv()
|
signal_socket.recv()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def start(self, error_callback, **kwargs):
|
def start(self, error_callback, **kwargs):
|
||||||
@@ -408,7 +408,7 @@ class Job(CommonTask):
|
|||||||
opts = stored_args
|
opts = stored_args
|
||||||
if not all(opts.values()):
|
if not all(opts.values()):
|
||||||
return False
|
return False
|
||||||
task_class().apply_async((self.pk, **opts), link_error=error_callback)
|
task_class().apply_async((self.pk,), opts, link_error=error_callback)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
class JobHostSummary(BaseModel):
|
class JobHostSummary(BaseModel):
|
||||||
|
|||||||
@@ -365,6 +365,12 @@ class ProjectUpdate(CommonTask):
|
|||||||
from awx.main.tasks import RunProjectUpdate
|
from awx.main.tasks import RunProjectUpdate
|
||||||
return RunProjectUpdate
|
return RunProjectUpdate
|
||||||
|
|
||||||
|
def is_blocked_by(self, obj):
|
||||||
|
if type(obj) == ProjectUpdate:
|
||||||
|
if self.project == obj.project:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def task_impact(self):
|
def task_impact(self):
|
||||||
return 20
|
return 20
|
||||||
@@ -374,7 +380,7 @@ class ProjectUpdate(CommonTask):
|
|||||||
signal_socket = signal_context.socket(zmq.REQ)
|
signal_socket = signal_context.socket(zmq.REQ)
|
||||||
signal_socket.connect(settings.TASK_COMMAND_PORT)
|
signal_socket.connect(settings.TASK_COMMAND_PORT)
|
||||||
signal_socket.send_json(dict(task_type="project_update", id=self.id, metadata=kwargs))
|
signal_socket.send_json(dict(task_type="project_update", id=self.id, metadata=kwargs))
|
||||||
self.socket.recv()
|
signal_socket.recv()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def _update_parent_instance(self):
|
def _update_parent_instance(self):
|
||||||
|
|||||||
@@ -349,6 +349,8 @@ else:
|
|||||||
CALLBACK_CONSUMER_PORT = "tcp://127.0.0.1:5556"
|
CALLBACK_CONSUMER_PORT = "tcp://127.0.0.1:5556"
|
||||||
CALLBACK_QUEUE_PORT = "ipc:///tmp/callback_receiver.ipc"
|
CALLBACK_QUEUE_PORT = "ipc:///tmp/callback_receiver.ipc"
|
||||||
|
|
||||||
|
TASK_COMMAND_PORT = "ipc:///tmp/task_command_receiver.ipc"
|
||||||
|
|
||||||
# Logging configuration.
|
# Logging configuration.
|
||||||
LOGGING = {
|
LOGGING = {
|
||||||
'version': 1,
|
'version': 1,
|
||||||
|
|||||||
Reference in New Issue
Block a user