diff --git a/awx/api/views.py b/awx/api/views.py
index 551bb814e9..19e3fd425f 100644
--- a/awx/api/views.py
+++ b/awx/api/views.py
@@ -2290,6 +2290,7 @@ class JobTemplateLaunch(RetrieveAPIView, GenericAPIView):
 
         new_job = obj.create_unified_job(**kv)
         result = new_job.signal_start(**kv)
+
         if not result:
             data = dict(passwords_needed_to_start=new_job.passwords_needed_to_start)
             new_job.delete()
diff --git a/awx/main/management/commands/run_task_system.py b/awx/main/management/commands/run_task_system.py
index 855491f08c..b29b2e4d88 100644
--- a/awx/main/management/commands/run_task_system.py
+++ b/awx/main/management/commands/run_task_system.py
@@ -7,6 +7,10 @@ import datetime
 import logging
 import signal
 import time
+import traceback
+
+from kombu import Connection, Exchange, Queue, Producer
+from kombu.mixins import ConsumerMixin
 
 # Django
 from django.conf import settings
@@ -17,6 +21,8 @@ from awx.main.models import * # noqa
 from awx.main.queue import FifoQueue
 from awx.main.tasks import handle_work_error, handle_work_success
 from awx.main.utils import get_system_task_capacity
+from awx.main.scheduler.dag_simple import SimpleDAG
+from awx.main.scheduler.dag_workflow import WorkflowDAG
 
 # Celery
 from celery.task.control import inspect
@@ -25,208 +31,6 @@ logger = logging.getLogger('awx.main.commands.run_task_system')
 
 queue = FifoQueue('tower_task_manager')
 
-class SimpleDAG(object):
-    ''' A simple implementation of a directed acyclic graph '''
-
-    def __init__(self):
-        self.nodes = []
-        self.edges = []
-
-    def __contains__(self, obj):
-        for node in self.nodes:
-            if node['node_object'] == obj:
-                return True
-        return False
-
-    def __len__(self):
-        return len(self.nodes)
-
-    def __iter__(self):
-        return self.nodes.__iter__()
-
-    def generate_graphviz_plot(self):
-        def short_string_obj(obj):
-            if type(obj) == Job:
-                type_str = "Job"
-            if type(obj) == AdHocCommand:
-                type_str = "AdHocCommand"
-            elif type(obj) == InventoryUpdate:
-                type_str = "Inventory"
-            elif type(obj) == ProjectUpdate:
-                type_str = "Project"
-            elif type(obj) == WorkflowJob:
-                type_str = "Workflow"
-            else:
-                type_str = "Unknown"
-            type_str += "%s" % str(obj.id)
-            return type_str
-
-        doc = """
-        digraph g {
-        rankdir = LR
-        """
-        for n in self.nodes:
-            doc += "%s [color = %s]\n" % (
-                short_string_obj(n['node_object']),
-                "red" if n['node_object'].status == 'running' else "black",
-            )
-        for from_node, to_node, label in self.edges:
-            doc += "%s -> %s [ label=\"%s\" ];\n" % (
-                short_string_obj(self.nodes[from_node]['node_object']),
-                short_string_obj(self.nodes[to_node]['node_object']),
-                label,
-            )
-        doc += "}\n"
-        gv_file = open('/tmp/graph.gv', 'w')
-        gv_file.write(doc)
-        gv_file.close()
-
-    def add_node(self, obj, metadata=None):
-        if self.find_ord(obj) is None:
-            self.nodes.append(dict(node_object=obj, metadata=metadata))
-
-    def add_edge(self, from_obj, to_obj, label=None):
-        from_obj_ord = self.find_ord(from_obj)
-        to_obj_ord = self.find_ord(to_obj)
-        if from_obj_ord is None or to_obj_ord is None:
-            raise LookupError("Object not found")
-        self.edges.append((from_obj_ord, to_obj_ord, label))
-
-    def add_edges(self, edgelist):
-        for edge_pair in edgelist:
-            self.add_edge(edge_pair[0], edge_pair[1], edge_pair[2])
-
-    def find_ord(self, obj):
-        for idx in range(len(self.nodes)):
-            if obj == self.nodes[idx]['node_object']:
-                return idx
-        return None
-
-    def get_node_type(self, obj):
-        if type(obj) == Job:
-            return "job"
-        elif type(obj) == AdHocCommand:
-            return "ad_hoc_command"
-        elif type(obj) == InventoryUpdate:
-            return "inventory_update"
-        elif type(obj) == ProjectUpdate:
-            return "project_update"
-        elif type(obj) == SystemJob:
-            return "system_job"
-        elif type(obj) == WorkflowJob:
-            return "workflow_job"
-        return "unknown"
-
-    def get_dependencies(self, obj, label=None):
-        antecedents = []
-        this_ord = self.find_ord(obj)
-        for node, dep, lbl in self.edges:
-            if label:
-                if node == this_ord and lbl == label:
-                    antecedents.append(self.nodes[dep])
-            else:
-                if node == this_ord:
-                    antecedents.append(self.nodes[dep])
-        return antecedents
-
-    def get_dependents(self, obj, label=None):
-        decendents = []
-        this_ord = self.find_ord(obj)
-        for node, dep, lbl in self.edges:
-            if label:
-                if dep == this_ord and lbl == label:
-                    decendents.append(self.nodes[node])
-            else:
-                if dep == this_ord:
-                    decendents.append(self.nodes[node])
-        return decendents
-
-    def get_leaf_nodes(self):
-        leafs = []
-        for n in self.nodes:
-            if len(self.get_dependencies(n['node_object'])) < 1:
-                leafs.append(n)
-        return leafs
-
-    def get_root_nodes(self):
-        roots = []
-        for n in self.nodes:
-            if len(self.get_dependents(n['node_object'])) < 1:
-                roots.append(n)
-        return roots
-
-class WorkflowDAG(SimpleDAG):
-    def __init__(self, workflow_job=None):
-        super(WorkflowDAG, self).__init__()
-        if workflow_job:
-            self._init_graph(workflow_job)
-
-    def _init_graph(self, workflow_job):
-        workflow_nodes = workflow_job.workflow_job_nodes.all()
-        for workflow_node in workflow_nodes:
-            self.add_node(workflow_node)
-
-        for node_type in ['success_nodes', 'failure_nodes', 'always_nodes']:
-            for workflow_node in workflow_nodes:
-                related_nodes = getattr(workflow_node, node_type).all()
-                for related_node in related_nodes:
-                    self.add_edge(workflow_node, related_node, node_type)
-
-    def bfs_nodes_to_run(self):
-        root_nodes = self.get_root_nodes()
-        nodes = root_nodes
-        nodes_found = []
-
-        for index, n in enumerate(nodes):
-            obj = n['node_object']
-            job = obj.job
-
-            if not job:
-                nodes_found.append(n)
-            # Job is about to run or is running. Hold our horses and wait for
-            # the job to finish. We can't proceed down the graph path until we
-            # have the job result.
-            elif job.status not in ['failed', 'error', 'successful']:
-                continue
-            elif job.status in ['failed', 'error']:
-                children_failed = self.get_dependencies(obj, 'failure_nodes')
-                children_always = self.get_dependencies(obj, 'always_nodes')
-                children_all = children_failed + children_always
-                nodes.extend(children_all)
-            elif job.status in ['successful']:
-                children_success = self.get_dependencies(obj, 'success_nodes')
-                nodes.extend(children_success)
-            else:
-                logger.warn("Incorrect graph structure")
-        return [n['node_object'] for n in nodes_found]
-
-    def is_workflow_done(self):
-        root_nodes = self.get_root_nodes()
-        nodes = root_nodes
-
-        for index, n in enumerate(nodes):
-            obj = n['node_object']
-            job = obj.job
-
-            if not job:
-                return False
-            # Job is about to run or is running. Hold our horses and wait for
-            # the job to finish. We can't proceed down the graph path until we
-            # have the job result.
-            elif job.status not in ['failed', 'error', 'successful']:
-                return False
-            elif job.status in ['failed', 'error']:
-                children_failed = self.get_dependencies(obj, 'failure_nodes')
-                children_always = self.get_dependencies(obj, 'always_nodes')
-                children_all = children_failed + children_always
-                nodes.extend(children_all)
-            elif job.status in ['successful']:
-                children_success = self.get_dependencies(obj, 'success_nodes')
-                nodes.extend(children_success)
-            else:
-                logger.warn("Incorrect graph structure")
-        return True
-
 def get_tasks():
     """Fetch all Tower tasks that are relevant to the task management
     system.
@@ -247,6 +51,7 @@ def get_tasks():
                          graph_project_updates + graph_system_jobs +
                          graph_workflow_jobs,
                          key=lambda task: task.created)
+    print("Returning all_actions %s" % len(all_actions))
     return all_actions
 
 def get_running_workflow_jobs():
@@ -277,14 +82,16 @@ def do_spawn_workflow_jobs():
         #emit_websocket_notification('/socket.io/jobs', '', dict(id=))
 
-def rebuild_graph(message):
+def rebuild_graph():
     """Regenerate the task graph by refreshing known tasks from Tower,
     purging orphaned running tasks, and creating dependencies for new
     tasks before generating directed edge relationships between those
    tasks.
    """
+    '''
     # Sanity check: Only do this on the primary node.
     if Instance.objects.my_role() == 'secondary':
         return None
+    '''
 
     inspector = inspect()
     if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'):
@@ -297,6 +104,7 @@ def rebuild_graph(message):
     all_sorted_tasks = get_tasks()
 
     if not len(all_sorted_tasks):
+        print("All sorted task len is not? <%s, %s>" % (len(all_sorted_tasks), all_sorted_tasks))
        return None
 
     active_tasks = []
@@ -417,53 +225,132 @@ def process_graph(graph, task_capacity):
                     'Remaining Capacity: %s' %
                     (str(node_obj), str(impact), str(remaining_volume)))
 
-def run_taskmanager():
-    """Receive task start and finish signals to rebuild a dependency graph
-    and manage the actual running of tasks.
-    """
-    def shutdown_handler():
-        def _handler(signum, frame):
-            signal.signal(signum, signal.SIG_DFL)
-            os.kill(os.getpid(), signum)
-        return _handler
-    signal.signal(signal.SIGINT, shutdown_handler())
-    signal.signal(signal.SIGTERM, shutdown_handler())
-    paused = False
-    task_capacity = get_system_task_capacity()
-    last_rebuild = datetime.datetime.fromtimestamp(0)
-    # Attempt to pull messages off of the task system queue into perpetuity.
-    #
-    # A quick explanation of what is happening here:
-    # The popping messages off the queue bit is something of a sham. We remove
-    # the messages from the queue and then immediately throw them away. The
-    # `rebuild_graph` function, while it takes the message as an argument,
-    # ignores it.
-    #
-    # What actually happens is that we just check the database every 10 seconds
-    # to see what the task dependency graph looks like, and go do that. This
-    # is the job of the `rebuild_graph` function.
-    #
-    # There is some placeholder here: we may choose to actually use the message
-    # in the future.
-    while True:
-        # Pop a message off the queue.
-        # (If the queue is empty, None will be returned.)
-        message = queue.pop()
+#logger = logging.getLogger('awx.main.scheduler')
 
-        # Parse out the message appropriately, rebuilding our graph if
-        # appropriate.
-        if (datetime.datetime.now() - last_rebuild).seconds > 10:
-            if message is not None and 'pause' in message:
-                logger.info("Pause command received: %s" % str(message))
-                paused = message['pause']
-            graph = rebuild_graph(message)
-            if not paused and graph is not None:
-                process_graph(graph, task_capacity)
-            last_rebuild = datetime.datetime.now()
-        time.sleep(0.1)
+class CallbackBrokerWorker(ConsumerMixin):
+    def __init__(self, connection):
+        self.connection = connection
+    def get_consumers(self, Consumer, channel):
+        print("get_consumers() OK")
+        return [Consumer(queues=[Queue(settings.SCHEDULER_QUEUE,
+                                       Exchange(settings.SCHEDULER_QUEUE, type='topic'),
+                                       routing_key='scheduler.job.launch'),],
+                         accept=['json'],
+                         callbacks=[self.process_job_launch,]),
+                Consumer(queues=[Queue(settings.SCHEDULER_QUEUE,
+                                       Exchange(settings.SCHEDULER_QUEUE, type='topic'),
+                                       routing_key='scheduler.job.complete'),],
+                         accept=['json'],
+                         callbacks=[self.process_job_complete,]
+                         )]
+
+    def schedule(self):
+        task_capacity = get_system_task_capacity()
+        graph = rebuild_graph()
+        if graph:
+            process_graph(graph, task_capacity)
+
+    def process_job_msg(self, body, message):
+        try:
+            if settings.DEBUG:
+                logger.info("Body: {}".format(body))
+                logger.info("Message: {}".format(message))
+
+            if "msg_type" not in body:
+                raise Exception("Payload does not have a msg_type")
+            if "job_id" not in body:
+                raise Exception("Payload does not have a job_id")
+
+            func = getattr(self, "process_%s" % body['msg_type'], None)
+            if not func:
+                raise AttributeError("No processor for message type %s" % body['msg_type'])
+            func(body)
+
+        # Raised by processors when msg isn't in the expected form.
+        except LookupError as e:
+            logger.error(e)
+        except AttributeError as e:
+            logger.error(e)
+        except Exception as exc:
+            import traceback
+            traceback.print_exc()
+            logger.error('Callback Task Processor Raised Exception: %r', exc)
+        finally:
+            message.ack()
+            self.schedule()
+
+    def process_job_launch(self, body, message):
+        print("process_job_launch()")
+        if "job_id" not in body:
+            raise KeyError("Payload does not contain job_id")
+
+        '''
+        Wait for job to exist.
+        The job is created in a transaction then the message is created, but
+        the transaction may not have completed.
+
+        FIXME: We could generate the message in a Django signal handler.
+        OR, we could call an explicit commit in the view and then send the
+        message.
+
+        '''
+        retries = 10
+        retry = 0
+        while not UnifiedJob.objects.filter(id=body['job_id']).exists():
+            time.sleep(0.3)
+
+            if retry >= retries:
+                logger.error("Failed to process 'job_launch' message for job %d" % body['job_id'])
+                # ack the message so we don't build up the queue.
+                #
+                # The job can still be chosen to run during tower startup or
+                # when another job is started or completes
+                message.ack()
+                return
+            retry += 1
+
+        job = UnifiedJob.objects.get(id=body['job_id'])
+
+        self.schedule()
+        message.ack()
+
+    def process_job_complete(self, body, message):
+        print("process_job_complete()")
+        if "job_id" not in body:
+            raise KeyError("Payload does not contain job_id")
+
+        # TODO: use list of finished status from jobs.py or unified_jobs.py
+        finished_status = ['successful', 'error', 'failed', 'completed']
+        q = UnifiedJob.objects.filter(id=body['job_id'])
+
+        # Ensure that the job is updated in the database before we call to
+        # schedule the next job.
+        retries = 10
+        retry = 0
+        while True:
+            # Job not found, most likely deleted. That's fine
+            if not q.exists():
+                logger.warn("Failed to find job '%d' while processing 'job_complete' message. Presume that it was deleted." % body['job_id'])
+                break
+
+            job = q[0]
+            if job.status in finished_status:
+                break
+
+            time.sleep(0.3)
+
+            if retry >= retries:
+                logger.error("Expected job status '%s' to be one of '%s' while processing 'job_complete' message." % (job.status, finished_status))
+                message.ack()
+                return
+            retry += 1
+
+        message.ack()
+        self.schedule()
+
 class Command(NoArgsCommand):
     """Tower Task Management System
     This daemon is designed to reside between our tasks and celery and
@@ -477,7 +364,11 @@ class Command(NoArgsCommand):
     help = 'Launch the Tower task management system'
 
     def handle_noargs(self, **options):
-        try:
-            run_taskmanager()
-        except KeyboardInterrupt:
-            pass
+        with Connection(settings.BROKER_URL) as conn:
+            try:
+                worker = CallbackBrokerWorker(conn)
+                worker.run()
+            except KeyboardInterrupt:
+                print('Terminating Task Management System')
+
+
diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py
index 950b6fc99b..6806ff7d16 100644
--- a/awx/main/models/unified_jobs.py
+++ b/awx/main/models/unified_jobs.py
@@ -852,6 +852,17 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
         self.update_fields(start_args=json.dumps(kwargs), status='pending')
         self.socketio_emit_status("pending")
 
+        from kombu import Connection, Exchange, Producer
+        connection = Connection(settings.BROKER_URL)
+        exchange = Exchange(settings.SCHEDULER_QUEUE, type='topic')
+        producer = Producer(connection)
+        producer.publish({ 'msg_type': 'job_launch', 'job_id': self.id },
+                         serializer='json',
+                         compression='bzip2',
+                         exchange=exchange,
+                         declare=[exchange],
+                         routing_key='scheduler.job.launch')
+
         # Each type of unified job has a different Task class; get the
         # appropirate one.
         # task_type = get_type_for_model(self)
diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/awx/main/scheduler/dag_simple.py b/awx/main/scheduler/dag_simple.py
new file mode 100644
index 0000000000..f04c60159a
--- /dev/null
+++ b/awx/main/scheduler/dag_simple.py
@@ -0,0 +1,133 @@
+
+from awx.main.models import * # noqa
+
+class SimpleDAG(object):
+    ''' A simple implementation of a directed acyclic graph '''
+
+    def __init__(self):
+        self.nodes = []
+        self.edges = []
+
+    def __contains__(self, obj):
+        for node in self.nodes:
+            if node['node_object'] == obj:
+                return True
+        return False
+
+    def __len__(self):
+        return len(self.nodes)
+
+    def __iter__(self):
+        return self.nodes.__iter__()
+
+    def generate_graphviz_plot(self):
+        def short_string_obj(obj):
+            if type(obj) == Job:
+                type_str = "Job"
+            if type(obj) == AdHocCommand:
+                type_str = "AdHocCommand"
+            elif type(obj) == InventoryUpdate:
+                type_str = "Inventory"
+            elif type(obj) == ProjectUpdate:
+                type_str = "Project"
+            elif type(obj) == WorkflowJob:
+                type_str = "Workflow"
+            else:
+                type_str = "Unknown"
+            type_str += "%s" % str(obj.id)
+            return type_str
+
+        doc = """
+        digraph g {
+        rankdir = LR
+        """
+        for n in self.nodes:
+            doc += "%s [color = %s]\n" % (
+                short_string_obj(n['node_object']),
+                "red" if n['node_object'].status == 'running' else "black",
+            )
+        for from_node, to_node, label in self.edges:
+            doc += "%s -> %s [ label=\"%s\" ];\n" % (
+                short_string_obj(self.nodes[from_node]['node_object']),
+                short_string_obj(self.nodes[to_node]['node_object']),
+                label,
+            )
+        doc += "}\n"
+        gv_file = open('/tmp/graph.gv', 'w')
+        gv_file.write(doc)
+        gv_file.close()
+
+    def add_node(self, obj, metadata=None):
+        if self.find_ord(obj) is None:
+            self.nodes.append(dict(node_object=obj, metadata=metadata))
+
+    def add_edge(self, from_obj, to_obj, label=None):
+        from_obj_ord = self.find_ord(from_obj)
+        to_obj_ord = self.find_ord(to_obj)
+        if from_obj_ord is None or to_obj_ord is None:
+            raise LookupError("Object not found")
+        self.edges.append((from_obj_ord, to_obj_ord, label))
+
+    def add_edges(self, edgelist):
+        for edge_pair in edgelist:
+            self.add_edge(edge_pair[0], edge_pair[1], edge_pair[2])
+
+    def find_ord(self, obj):
+        for idx in range(len(self.nodes)):
+            if obj == self.nodes[idx]['node_object']:
+                return idx
+        return None
+
+    def get_node_type(self, obj):
+        if type(obj) == Job:
+            return "job"
+        elif type(obj) == AdHocCommand:
+            return "ad_hoc_command"
+        elif type(obj) == InventoryUpdate:
+            return "inventory_update"
+        elif type(obj) == ProjectUpdate:
+            return "project_update"
+        elif type(obj) == SystemJob:
+            return "system_job"
+        elif type(obj) == WorkflowJob:
+            return "workflow_job"
+        return "unknown"
+
+    def get_dependencies(self, obj, label=None):
+        antecedents = []
+        this_ord = self.find_ord(obj)
+        for node, dep, lbl in self.edges:
+            if label:
+                if node == this_ord and lbl == label:
+                    antecedents.append(self.nodes[dep])
+            else:
+                if node == this_ord:
+                    antecedents.append(self.nodes[dep])
+        return antecedents
+
+    def get_dependents(self, obj, label=None):
+        decendents = []
+        this_ord = self.find_ord(obj)
+        for node, dep, lbl in self.edges:
+            if label:
+                if dep == this_ord and lbl == label:
+                    decendents.append(self.nodes[node])
+            else:
+                if dep == this_ord:
+                    decendents.append(self.nodes[node])
+        return decendents
+
+    def get_leaf_nodes(self):
+        leafs = []
+        for n in self.nodes:
+            if len(self.get_dependencies(n['node_object'])) < 1:
+                leafs.append(n)
+        return leafs
+
+    def get_root_nodes(self):
+        roots = []
+        for n in self.nodes:
+            if len(self.get_dependents(n['node_object'])) < 1:
+                roots.append(n)
+        return roots
+
diff --git a/awx/main/scheduler/dag_workflow.py b/awx/main/scheduler/dag_workflow.py
new file mode 100644
index 0000000000..1a8269c064
--- /dev/null
+++ b/awx/main/scheduler/dag_workflow.py
@@ -0,0 +1,74 @@
+from dag_simple import SimpleDAG
+
+class WorkflowDAG(SimpleDAG):
+    def __init__(self, workflow_job=None):
+        super(WorkflowDAG, self).__init__()
+        if workflow_job:
+            self._init_graph(workflow_job)
+
+    def _init_graph(self, workflow_job):
+        workflow_nodes = workflow_job.workflow_job_nodes.all()
+        for workflow_node in workflow_nodes:
+            self.add_node(workflow_node)
+
+        for node_type in ['success_nodes', 'failure_nodes', 'always_nodes']:
+            for workflow_node in workflow_nodes:
+                related_nodes = getattr(workflow_node, node_type).all()
+                for related_node in related_nodes:
+                    self.add_edge(workflow_node, related_node, node_type)
+
+    def bfs_nodes_to_run(self):
+        root_nodes = self.get_root_nodes()
+        nodes = root_nodes
+        nodes_found = []
+
+        for index, n in enumerate(nodes):
+            obj = n['node_object']
+            job = obj.job
+
+            if not job:
+                nodes_found.append(n)
+            # Job is about to run or is running. Hold our horses and wait for
+            # the job to finish. We can't proceed down the graph path until we
+            # have the job result.
+            elif job.status not in ['failed', 'error', 'successful']:
+                continue
+            elif job.status in ['failed', 'error']:
+                children_failed = self.get_dependencies(obj, 'failure_nodes')
+                children_always = self.get_dependencies(obj, 'always_nodes')
+                children_all = children_failed + children_always
+                nodes.extend(children_all)
+            elif job.status in ['successful']:
+                children_success = self.get_dependencies(obj, 'success_nodes')
+                nodes.extend(children_success)
+            else:
+                logger.warn("Incorrect graph structure")
+        return [n['node_object'] for n in nodes_found]
+
+    def is_workflow_done(self):
+        root_nodes = self.get_root_nodes()
+        nodes = root_nodes
+
+        for index, n in enumerate(nodes):
+            obj = n['node_object']
+            job = obj.job
+
+            if not job:
+                return False
+            # Job is about to run or is running. Hold our horses and wait for
+            # the job to finish. We can't proceed down the graph path until we
+            # have the job result.
+            elif job.status not in ['failed', 'error', 'successful']:
+                return False
+            elif job.status in ['failed', 'error']:
+                children_failed = self.get_dependencies(obj, 'failure_nodes')
+                children_always = self.get_dependencies(obj, 'always_nodes')
+                children_all = children_failed + children_always
+                nodes.extend(children_all)
+            elif job.status in ['successful']:
+                children_success = self.get_dependencies(obj, 'success_nodes')
+                nodes.extend(children_success)
+            else:
+                logger.warn("Incorrect graph structure")
+        return True
+
diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index 097dca517d..86552b6404 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -31,6 +31,9 @@ except:
 # Pexpect
 import pexpect
 
+# Kombu
+from kombu import Connection, Exchange, Queue, Producer
+
 # Celery
 from celery import Task, task
 from celery.signals import celeryd_init
@@ -202,6 +205,18 @@ def _send_notification_templates(instance, status_str):
                                   for n in all_notification_templates],
                                  job_id=instance.id)
 
+
+def _send_job_complete_msg(instance):
+    connection = Connection(settings.BROKER_URL)
+    exchange = Exchange(settings.SCHEDULER_QUEUE, type='topic')
+    producer = Producer(connection)
+    producer.publish({ 'job_id': instance.id, 'msg_type': 'job_complete' },
+                     serializer='json',
+                     compression='bzip2',
+                     exchange=exchange,
+                     declare=[exchange],
+                     routing_key='scheduler.job.complete')
+
 @task(bind=True, queue='default')
 def handle_work_success(self, result, task_actual):
     instance = UnifiedJob.get_instance_by_type(task_actual['type'], task_actual['id'])
@@ -210,6 +225,8 @@ def handle_work_success(self, result, task_actual):
 
     _send_notification_templates(instance, 'succeeded')
 
+    _send_job_complete_msg(instance)
+
 @task(bind=True, queue='default')
 def handle_work_error(self, task_id, subtasks=None):
     print('Executing error task id %s, subtasks: %s' %
@@ -238,6 +255,9 @@ def handle_work_error(self, task_id, subtasks=None):
 
     if first_instance:
         _send_notification_templates(first_instance, 'failed')
+
+    if first_instance:
+        _send_job_complete_msg(first_instance)
 
 @task(queue='default')
 def update_inventory_computed_fields(inventory_id, should_update_hosts=True):
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index 31c8b3b8f3..8a65d7d322 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -342,6 +342,7 @@ CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
 CELERY_QUEUES = (
     Queue('default', Exchange('default'), routing_key='default'),
     Queue('jobs', Exchange('jobs'), routing_key='jobs'),
+    #Queue('scheduler', Exchange('scheduler'), routing_key='scheduler.job.#'),
     # Projects use a fanout queue, this isn't super well supported
     Broadcast('projects'),
 )
@@ -737,6 +738,7 @@ ACTIVITY_STREAM_ENABLED_FOR_INVENTORY_SYNC = False
 
 INTERNAL_API_URL = 'http://127.0.0.1:%s' % DEVSERVER_DEFAULT_PORT
 
 CALLBACK_QUEUE = "callback_tasks"
+SCHEDULER_QUEUE = "scheduler"
 
 TASK_COMMAND_PORT = 6559
@@ -1042,6 +1044,10 @@ LOGGING = {
             'handlers': ['console', 'file', 'task_system'],
             'propagate': False
         },
+        'awx.main.scheduler': {
+            'handlers': ['console', 'file', 'task_system'],
+            'propagate': False
+        },
         'awx.main.commands.run_fact_cache_receiver': {
             'handlers': ['console', 'file', 'fact_receiver'],
             'propagate': False
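Note: the change above wires job launch/complete events through a kombu topic exchange. signal_start() and _send_job_complete_msg() publish small JSON payloads, and CallbackBrokerWorker in run_task_system.py consumes them and re-runs the scheduling pass. The snippet below is a minimal, standalone sketch of that publish/consume pattern for reference only; it is not part of the change. The broker URL, exchange name, queue names, and callback bodies are illustrative stand-ins for settings.BROKER_URL, settings.SCHEDULER_QUEUE, and the real processors, and the sketch binds one queue per routing key for clarity, whereas the change itself reuses a single SCHEDULER_QUEUE name for both consumers.

# Minimal sketch of the kombu topic-exchange pattern (assumes a local AMQP broker).
from kombu import Connection, Exchange, Queue, Producer
from kombu.mixins import ConsumerMixin

BROKER_URL = 'amqp://guest:guest@localhost//'             # stand-in for settings.BROKER_URL
scheduler_exchange = Exchange('scheduler', type='topic')  # stand-in for settings.SCHEDULER_QUEUE


def publish_event(msg_type, job_id, routing_key):
    # Roughly what signal_start() / _send_job_complete_msg() do: publish one
    # small JSON payload per job event, routed by its routing key.
    with Connection(BROKER_URL) as conn:
        producer = Producer(conn)
        producer.publish({'msg_type': msg_type, 'job_id': job_id},
                         serializer='json',
                         exchange=scheduler_exchange,
                         declare=[scheduler_exchange],
                         routing_key=routing_key)


class SchedulerWorker(ConsumerMixin):
    # Roughly what CallbackBrokerWorker does: one consumer per routing key;
    # each callback acks the message and would then re-run the scheduler.
    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        launch_q = Queue('scheduler.launch', scheduler_exchange,
                         routing_key='scheduler.job.launch')
        complete_q = Queue('scheduler.complete', scheduler_exchange,
                           routing_key='scheduler.job.complete')
        return [Consumer(queues=[launch_q], accept=['json'],
                         callbacks=[self.on_launch]),
                Consumer(queues=[complete_q], accept=['json'],
                         callbacks=[self.on_complete])]

    def on_launch(self, body, message):
        print('job %s launched' % body['job_id'])
        message.ack()

    def on_complete(self, body, message):
        print('job %s finished' % body['job_id'])
        message.ack()


if __name__ == '__main__':
    # The web/task processes would call publish_event(); the task system
    # process runs the worker loop (blocks until interrupted).
    with Connection(BROKER_URL) as conn:
        SchedulerWorker(conn).run()

A topic exchange lets a single scheduler exchange fan messages out by routing key ('scheduler.job.launch' vs. 'scheduler.job.complete') without the producers needing to know which queues are bound.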