diff --git a/Makefile b/Makefile index fd9d87cd2e..52c30c4bb7 100644 --- a/Makefile +++ b/Makefile @@ -357,7 +357,6 @@ server_noattach: tmux rename-window 'Tower' tmux select-window -t tower:0 tmux split-window -v 'exec make celeryd' - tmux split-window -h 'exec make taskmanager' tmux new-window 'exec make receiver' tmux select-window -t tower:1 tmux rename-window 'Extra Services' @@ -397,7 +396,7 @@ celeryd: @if [ "$(VENV_BASE)" ]; then \ . $(VENV_BASE)/tower/bin/activate; \ fi; \ - $(PYTHON) manage.py celeryd -l DEBUG -B --autoscale=20,3 --schedule=$(CELERY_SCHEDULE_FILE) -Q projects,jobs,default + $(PYTHON) manage.py celeryd -l DEBUG -B --autoscale=20,3 --schedule=$(CELERY_SCHEDULE_FILE) -Q projects,jobs,default,scheduler #$(PYTHON) manage.py celery multi show projects jobs default -l DEBUG -Q:projects projects -Q:jobs jobs -Q:default default -c:projects 1 -c:jobs 3 -c:default 3 -Ofair -B --schedule=$(CELERY_SCHEDULE_FILE) # Run to start the zeromq callback receiver @@ -407,16 +406,6 @@ receiver: fi; \ $(PYTHON) manage.py run_callback_receiver -taskmanager: - @if [ "$(VENV_BASE)" ]; then \ - . $(VENV_BASE)/tower/bin/activate; \ - fi; \ - if [ "$(COMPOSE_HOST)" == "tower_1" ] || [ "$(COMPOSE_HOST)" == "tower" ]; then \ - $(PYTHON) manage.py run_task_system; \ - else \ - while true; do sleep 2; done; \ - fi - socketservice: @if [ "$(VENV_BASE)" ]; then \ . $(VENV_BASE)/tower/bin/activate; \ diff --git a/Procfile b/Procfile index 433417f70b..b8dd37a983 100644 --- a/Procfile +++ b/Procfile @@ -1,7 +1,6 @@ runserver: make runserver celeryd: make celeryd -taskmanager: make taskmanager receiver: make receiver socketservice: make socketservice factcacher: make factcacher -flower: make flower \ No newline at end of file +flower: make flower diff --git a/awx/main/management/commands/run_task_system.py b/awx/main/management/commands/run_task_system.py deleted file mode 100644 index b29b2e4d88..0000000000 --- a/awx/main/management/commands/run_task_system.py +++ /dev/null @@ -1,374 +0,0 @@ -#Copyright (c) 2015 Ansible, Inc. -# All Rights Reserved - -# Python -import os -import datetime -import logging -import signal -import time -import traceback - -from kombu import Connection, Exchange, Queue, Producer -from kombu.mixins import ConsumerMixin - -# Django -from django.conf import settings -from django.core.management.base import NoArgsCommand - -# AWX -from awx.main.models import * # noqa -from awx.main.queue import FifoQueue -from awx.main.tasks import handle_work_error, handle_work_success -from awx.main.utils import get_system_task_capacity -from awx.main.scheduler.dag_simple import SimpleDAG -from awx.main.scheduler.dag_workflow import WorkflowDAG - -# Celery -from celery.task.control import inspect - -logger = logging.getLogger('awx.main.commands.run_task_system') - -queue = FifoQueue('tower_task_manager') - -def get_tasks(): - """Fetch all Tower tasks that are relevant to the task management - system. - """ - RELEVANT_JOBS = ('pending', 'waiting', 'running') - # TODO: Replace this when we can grab all objects in a sane way. 
- graph_jobs = [j for j in Job.objects.filter(status__in=RELEVANT_JOBS)] - graph_ad_hoc_commands = [ahc for ahc in AdHocCommand.objects.filter(status__in=RELEVANT_JOBS)] - graph_inventory_updates = [iu for iu in - InventoryUpdate.objects.filter(status__in=RELEVANT_JOBS)] - graph_project_updates = [pu for pu in - ProjectUpdate.objects.filter(status__in=RELEVANT_JOBS)] - graph_system_jobs = [sj for sj in - SystemJob.objects.filter(status__in=RELEVANT_JOBS)] - graph_workflow_jobs = [wf for wf in - WorkflowJob.objects.filter(status__in=RELEVANT_JOBS)] - all_actions = sorted(graph_jobs + graph_ad_hoc_commands + graph_inventory_updates + - graph_project_updates + graph_system_jobs + - graph_workflow_jobs, - key=lambda task: task.created) - print("Returning all_actions %s" % len(all_actions)) - return all_actions - -def get_running_workflow_jobs(): - graph_workflow_jobs = [wf for wf in - WorkflowJob.objects.filter(status='running')] - return graph_workflow_jobs - -def do_spawn_workflow_jobs(): - workflow_jobs = get_running_workflow_jobs() - for workflow_job in workflow_jobs: - dag = WorkflowDAG(workflow_job) - spawn_nodes = dag.bfs_nodes_to_run() - for spawn_node in spawn_nodes: - # TODO: Inject job template template params as kwargs. - # Make sure to take into account extra_vars merge logic - kv = {} - job = spawn_node.unified_job_template.create_unified_job(**kv) - spawn_node.job = job - spawn_node.save() - can_start = job.signal_start(**kv) - if not can_start: - job.status = 'failed' - job.job_explanation = "Workflow job could not start because it was not in the right state or required manual credentials" - job.save(update_fields=['status', 'job_explanation']) - job.socketio_emit_status("failed") - - # TODO: should we emit a status on the socket here similar to tasks.py tower_periodic_scheduler() ? - #emit_websocket_notification('/socket.io/jobs', '', dict(id=)) - - -def rebuild_graph(): - """Regenerate the task graph by refreshing known tasks from Tower, purging - orphaned running tasks, and creating dependencies for new tasks before - generating directed edge relationships between those tasks. - """ - ''' - # Sanity check: Only do this on the primary node. - if Instance.objects.my_role() == 'secondary': - return None - ''' - - inspector = inspect() - if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'): - active_task_queues = inspector.active() - else: - logger.warn("Ignoring celery task inspector") - active_task_queues = None - - do_spawn_workflow_jobs() - - all_sorted_tasks = get_tasks() - if not len(all_sorted_tasks): - print("All sorted task len is not? <%s, %s>" % (len(all_sorted_tasks), all_sorted_tasks)) - return None - - active_tasks = [] - if active_task_queues is not None: - for queue in active_task_queues: - active_tasks += [at['id'] for at in active_task_queues[queue]] - else: - logger.error("Could not communicate with celery!") - # TODO: Something needs to be done here to signal to the system - # as a whole that celery appears to be down. 
- if not hasattr(settings, 'CELERY_UNIT_TEST'): - return None - - running_tasks = filter(lambda t: t.status == 'running', all_sorted_tasks) - waiting_tasks = filter(lambda t: t.status != 'running', all_sorted_tasks) - new_tasks = filter(lambda t: t.status == 'pending', all_sorted_tasks) - - # Check running tasks and make sure they are active in celery - logger.debug("Active celery tasks: " + str(active_tasks)) - for task in list(running_tasks): - if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')): - # NOTE: Pull status again and make sure it didn't finish in - # the meantime? - task.status = 'failed' - task.job_explanation += ' '.join(( - 'Task was marked as running in Tower but was not present in', - 'Celery, so it has been marked as failed.', - )) - task.save() - task.socketio_emit_status("failed") - running_tasks.pop(running_tasks.index(task)) - logger.error("Task %s appears orphaned... marking as failed" % task) - - # Create and process dependencies for new tasks - for task in new_tasks: - logger.debug("Checking dependencies for: %s" % str(task)) - try: - task_dependencies = task.generate_dependencies(running_tasks + waiting_tasks) - except Exception, e: - logger.error("Failed processing dependencies for {}: {}".format(task, e)) - task.status = 'failed' - task.job_explanation += 'Task failed to generate dependencies: {}'.format(e) - task.save() - task.socketio_emit_status("failed") - continue - logger.debug("New dependencies: %s" % str(task_dependencies)) - for dep in task_dependencies: - # We recalculate the created time for the moment to ensure the - # dependencies are always sorted in the right order relative to - # the dependent task. - time_delt = len(task_dependencies) - task_dependencies.index(dep) - dep.created = task.created - datetime.timedelta(seconds=1 + time_delt) - dep.status = 'waiting' - dep.save() - waiting_tasks.insert(waiting_tasks.index(task), dep) - if not hasattr(settings, 'UNIT_TEST_IGNORE_TASK_WAIT'): - task.status = 'waiting' - task.save() - - # Rebuild graph - graph = SimpleDAG() - for task in running_tasks: - graph.add_node(task) - for wait_task in waiting_tasks[:50]: - node_dependencies = [] - for node in graph: - if wait_task.is_blocked_by(node['node_object']): - node_dependencies.append(node['node_object']) - graph.add_node(wait_task) - for dependency in node_dependencies: - graph.add_edge(wait_task, dependency) - if settings.DEBUG: - graph.generate_graphviz_plot() - return graph - -def process_graph(graph, task_capacity): - """Given a task dependency graph, start and manage tasks given their - priority and weight. 
- """ - leaf_nodes = graph.get_leaf_nodes() - running_nodes = filter(lambda x: x['node_object'].status == 'running', leaf_nodes) - running_impact = sum([t['node_object'].task_impact for t in running_nodes]) - ready_nodes = filter(lambda x: x['node_object'].status != 'running', leaf_nodes) - remaining_volume = task_capacity - running_impact - logger.info('Running Nodes: %s; Capacity: %s; Running Impact: %s; ' - 'Remaining Capacity: %s' % - (str(running_nodes), str(task_capacity), - str(running_impact), str(remaining_volume))) - logger.info("Ready Nodes: %s" % str(ready_nodes)) - for task_node in ready_nodes: - node_obj = task_node['node_object'] - # NOTE: This could be used to pass metadata through the task system - # node_args = task_node['metadata'] - impact = node_obj.task_impact - if impact <= remaining_volume or running_impact == 0: - node_dependencies = graph.get_dependents(node_obj) - # Allow other tasks to continue if a job fails, even if they are - # other jobs. - if graph.get_node_type(node_obj) == 'job': - node_dependencies = [] - dependent_nodes = [{'type': graph.get_node_type(node_obj), 'id': node_obj.id}] + \ - [{'type': graph.get_node_type(n['node_object']), - 'id': n['node_object'].id} for n in node_dependencies] - error_handler = handle_work_error.s(subtasks=dependent_nodes) - success_handler = handle_work_success.s(task_actual={'type': graph.get_node_type(node_obj), - 'id': node_obj.id}) - start_status = node_obj.start(error_callback=error_handler, success_callback=success_handler) - if not start_status: - node_obj.status = 'failed' - if node_obj.job_explanation: - node_obj.job_explanation += ' ' - node_obj.job_explanation += 'Task failed pre-start check.' - node_obj.save() - continue - remaining_volume -= impact - running_impact += impact - logger.info('Started Node: %s (capacity hit: %s) ' - 'Remaining Capacity: %s' % - (str(node_obj), str(impact), str(remaining_volume))) - - -#logger = logging.getLogger('awx.main.scheduler') - -class CallbackBrokerWorker(ConsumerMixin): - - def __init__(self, connection): - self.connection = connection - - def get_consumers(self, Consumer, channel): - print("get_consumers() OK") - return [Consumer(queues=[Queue(settings.SCHEDULER_QUEUE, - Exchange(settings.SCHEDULER_QUEUE, type='topic'), - routing_key='scheduler.job.launch'),], - accept=['json'], - callbacks=[self.process_job_launch,]), - Consumer(queues=[Queue(settings.SCHEDULER_QUEUE, - Exchange(settings.SCHEDULER_QUEUE, type='topic'), - routing_key='scheduler.job.complete'),], - accept=['json'], - callbacks=[self.process_job_complete,] - )] - - def schedule(self): - task_capacity = get_system_task_capacity() - graph = rebuild_graph() - if graph: - process_graph(graph, task_capacity) - - def process_job_msg(self, body, message): - try: - if settings.DEBUG: - logger.info("Body: {}".format(body)) - logger.info("Message: {}".format(message)) - - if "msg_type" not in body: - raise Exception("Payload does not have a msg_type") - if "job_id" not in body: - raise Exception("Payload does not have a job_id") - - func = getattr(self, "process_%s" % body['msg_type'], None) - if not func: - raise AttributeError("No processor for message type %s" % body['msg_type']) - func(body) - - # Raised by processors when msg isn't in the expected form. 
- except LookupError as e: - logger.error(e) - except AttributeError as e: - logger.error(e) - except Exception as exc: - import traceback - traceback.print_exc() - logger.error('Callback Task Processor Raised Exception: %r', exc) - finally: - message.ack() - self.schedule() - - def process_job_launch(self, body, message): - print("process_job_launch()") - if "job_id" not in body: - raise KeyError("Payload does not contain job_id") - - ''' - Wait for job to exist. - The job is created in a transaction then the message is created, but - the transaction may not have completed. - - FIXME: We could generate the message in a Django signal handler. - OR, we could call an explicit commit in the view and then send the - message. - - ''' - retries = 10 - retry = 0 - while not UnifiedJob.objects.filter(id=body['job_id']).exists(): - time.sleep(0.3) - - if retry >= retries: - logger.error("Failed to process 'job_launch' message for job %d" % body['job_id']) - # ack the message so we don't build up the queue. - # - # The job can still be chosen to run during tower startup or - # when another job is started or completes - message.ack() - return - retry += 1 - - job = UnifiedJob.objects.get(id=body['job_id']) - - self.schedule() - message.ack() - - def process_job_complete(self, body, message): - print("process_job_complete()") - if "job_id" not in body: - raise KeyError("Payload does not contain job_id") - - # TODO: use list of finished status from jobs.py or unified_jobs.py - finished_status = ['successful', 'error', 'failed', 'completed'] - q = UnifiedJob.objects.filter(id=body['job_id']) - - # Ensure that the job is updated in the database before we call to - # schedule the next job. - retries = 10 - retry = 0 - while True: - # Job not found, most likely deleted. That's fine - if not q.exists(): - logger.warn("Failed to find job '%d' while processing 'job_complete' message. Presume that it was deleted." % body['job_id']) - break - - job = q[0] - if job.status in finished_status: - break - - time.sleep(0.3) - - if retry >= retries: - logger.error("Expected job status '%s' to be one of '%s' while processing 'job_complete' message." % (job.status, finished_status)) - message.ack() - return - retry += 1 - - message.ack() - self.schedule() - -class Command(NoArgsCommand): - """Tower Task Management System - This daemon is designed to reside between our tasks and celery and - provide a mechanism for understanding the relationship between those tasks - and their dependencies. - - It also actively prevents situations in which Tower can get blocked - because it doesn't have an understanding of what is progressing through - celery. 
- """ - help = 'Launch the Tower task management system' - - def handle_noargs(self, **options): - with Connection(settings.BROKER_URL) as conn: - try: - worker = CallbackBrokerWorker(conn) - worker.run() - except KeyboardInterrupt: - print('Terminating Task Management System') - - diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 6806ff7d16..a81bcb6aca 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -852,16 +852,8 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique self.update_fields(start_args=json.dumps(kwargs), status='pending') self.socketio_emit_status("pending") - from kombu import Connection, Exchange, Producer - connection = Connection(settings.BROKER_URL) - exchange = Exchange(settings.SCHEDULER_QUEUE, type='topic') - producer = Producer(connection) - producer.publish({ 'msg_type': 'job_launch', 'job_id': self.id }, - serializer='json', - compression='bzip2', - exchange=exchange, - declare=[exchange], - routing_key='scheduler.job.launch') + from awx.main.scheduler.tasks import run_job_launch + run_job_launch.delay(self.id) # Each type of unified job has a different Task class; get the # appropirate one. diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index e69de29bb2..1c3a1bc515 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -0,0 +1,222 @@ +#Copyright (c) 2015 Ansible, Inc. +# All Rights Reserved + +# Python +import datetime +import logging + +# Django +from django.conf import settings + +# AWX +from awx.main.models import * # noqa +from awx.main.tasks import handle_work_error, handle_work_success +from awx.main.utils import get_system_task_capacity +from awx.main.scheduler.dag_simple import SimpleDAG +from awx.main.scheduler.dag_workflow import WorkflowDAG + +# Celery +from celery.task.control import inspect + +logger = logging.getLogger('awx.main.scheduler') + +def get_tasks(): + """Fetch all Tower tasks that are relevant to the task management + system. + """ + RELEVANT_JOBS = ('pending', 'waiting', 'running') + # TODO: Replace this when we can grab all objects in a sane way. + graph_jobs = [j for j in Job.objects.filter(status__in=RELEVANT_JOBS)] + graph_ad_hoc_commands = [ahc for ahc in AdHocCommand.objects.filter(status__in=RELEVANT_JOBS)] + graph_inventory_updates = [iu for iu in + InventoryUpdate.objects.filter(status__in=RELEVANT_JOBS)] + graph_project_updates = [pu for pu in + ProjectUpdate.objects.filter(status__in=RELEVANT_JOBS)] + graph_system_jobs = [sj for sj in + SystemJob.objects.filter(status__in=RELEVANT_JOBS)] + graph_workflow_jobs = [wf for wf in + WorkflowJob.objects.filter(status__in=RELEVANT_JOBS)] + all_actions = sorted(graph_jobs + graph_ad_hoc_commands + graph_inventory_updates + + graph_project_updates + graph_system_jobs + + graph_workflow_jobs, + key=lambda task: task.created) + return all_actions + +def get_running_workflow_jobs(): + graph_workflow_jobs = [wf for wf in + WorkflowJob.objects.filter(status='running')] + return graph_workflow_jobs + +def do_spawn_workflow_jobs(): + workflow_jobs = get_running_workflow_jobs() + for workflow_job in workflow_jobs: + dag = WorkflowDAG(workflow_job) + spawn_nodes = dag.bfs_nodes_to_run() + for spawn_node in spawn_nodes: + # TODO: Inject job template template params as kwargs. 
+ # Make sure to take into account extra_vars merge logic + kv = {} + job = spawn_node.unified_job_template.create_unified_job(**kv) + spawn_node.job = job + spawn_node.save() + can_start = job.signal_start(**kv) + if not can_start: + job.status = 'failed' + job.job_explanation = "Workflow job could not start because it was not in the right state or required manual credentials" + job.save(update_fields=['status', 'job_explanation']) + job.socketio_emit_status("failed") + + # TODO: should we emit a status on the socket here similar to tasks.py tower_periodic_scheduler() ? + #emit_websocket_notification('/socket.io/jobs', '', dict(id=)) + + +def rebuild_graph(): + """Regenerate the task graph by refreshing known tasks from Tower, purging + orphaned running tasks, and creating dependencies for new tasks before + generating directed edge relationships between those tasks. + """ + ''' + # Sanity check: Only do this on the primary node. + if Instance.objects.my_role() == 'secondary': + return None + ''' + + inspector = inspect() + if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'): + active_task_queues = inspector.active() + else: + logger.warn("Ignoring celery task inspector") + active_task_queues = None + + do_spawn_workflow_jobs() + + all_sorted_tasks = get_tasks() + if not len(all_sorted_tasks): + return None + + active_tasks = [] + if active_task_queues is not None: + for queue in active_task_queues: + active_tasks += [at['id'] for at in active_task_queues[queue]] + else: + logger.error("Could not communicate with celery!") + # TODO: Something needs to be done here to signal to the system + # as a whole that celery appears to be down. + if not hasattr(settings, 'CELERY_UNIT_TEST'): + return None + + running_tasks = filter(lambda t: t.status == 'running', all_sorted_tasks) + waiting_tasks = filter(lambda t: t.status != 'running', all_sorted_tasks) + new_tasks = filter(lambda t: t.status == 'pending', all_sorted_tasks) + + # Check running tasks and make sure they are active in celery + logger.debug("Active celery tasks: " + str(active_tasks)) + for task in list(running_tasks): + if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')): + # NOTE: Pull status again and make sure it didn't finish in + # the meantime? + task.status = 'failed' + task.job_explanation += ' '.join(( + 'Task was marked as running in Tower but was not present in', + 'Celery, so it has been marked as failed.', + )) + task.save() + task.socketio_emit_status("failed") + running_tasks.pop(running_tasks.index(task)) + logger.error("Task %s appears orphaned... marking as failed" % task) + + # Create and process dependencies for new tasks + for task in new_tasks: + logger.debug("Checking dependencies for: %s" % str(task)) + try: + task_dependencies = task.generate_dependencies(running_tasks + waiting_tasks) + except Exception, e: + logger.error("Failed processing dependencies for {}: {}".format(task, e)) + task.status = 'failed' + task.job_explanation += 'Task failed to generate dependencies: {}'.format(e) + task.save() + task.socketio_emit_status("failed") + continue + logger.debug("New dependencies: %s" % str(task_dependencies)) + for dep in task_dependencies: + # We recalculate the created time for the moment to ensure the + # dependencies are always sorted in the right order relative to + # the dependent task. 
+ time_delt = len(task_dependencies) - task_dependencies.index(dep) + dep.created = task.created - datetime.timedelta(seconds=1 + time_delt) + dep.status = 'waiting' + dep.save() + waiting_tasks.insert(waiting_tasks.index(task), dep) + if not hasattr(settings, 'UNIT_TEST_IGNORE_TASK_WAIT'): + task.status = 'waiting' + task.save() + + # Rebuild graph + graph = SimpleDAG() + for task in running_tasks: + graph.add_node(task) + for wait_task in waiting_tasks[:50]: + node_dependencies = [] + for node in graph: + if wait_task.is_blocked_by(node['node_object']): + node_dependencies.append(node['node_object']) + graph.add_node(wait_task) + for dependency in node_dependencies: + graph.add_edge(wait_task, dependency) + if settings.DEBUG: + graph.generate_graphviz_plot() + return graph + +def process_graph(graph, task_capacity): + """Given a task dependency graph, start and manage tasks given their + priority and weight. + """ + leaf_nodes = graph.get_leaf_nodes() + running_nodes = filter(lambda x: x['node_object'].status == 'running', leaf_nodes) + running_impact = sum([t['node_object'].task_impact for t in running_nodes]) + ready_nodes = filter(lambda x: x['node_object'].status != 'running', leaf_nodes) + remaining_volume = task_capacity - running_impact + logger.info('Running Nodes: %s; Capacity: %s; Running Impact: %s; ' + 'Remaining Capacity: %s' % + (str(running_nodes), str(task_capacity), + str(running_impact), str(remaining_volume))) + logger.info("Ready Nodes: %s" % str(ready_nodes)) + for task_node in ready_nodes: + node_obj = task_node['node_object'] + # NOTE: This could be used to pass metadata through the task system + # node_args = task_node['metadata'] + impact = node_obj.task_impact + if impact <= remaining_volume or running_impact == 0: + node_dependencies = graph.get_dependents(node_obj) + # Allow other tasks to continue if a job fails, even if they are + # other jobs. + if graph.get_node_type(node_obj) == 'job': + node_dependencies = [] + dependent_nodes = [{'type': graph.get_node_type(node_obj), 'id': node_obj.id}] + \ + [{'type': graph.get_node_type(n['node_object']), + 'id': n['node_object'].id} for n in node_dependencies] + error_handler = handle_work_error.s(subtasks=dependent_nodes) + success_handler = handle_work_success.s(task_actual={'type': graph.get_node_type(node_obj), + 'id': node_obj.id}) + start_status = node_obj.start(error_callback=error_handler, success_callback=success_handler) + if not start_status: + node_obj.status = 'failed' + if node_obj.job_explanation: + node_obj.job_explanation += ' ' + node_obj.job_explanation += 'Task failed pre-start check.' 
+ node_obj.save() + continue + remaining_volume -= impact + running_impact += impact + logger.info('Started Node: %s (capacity hit: %s) ' + 'Remaining Capacity: %s' % + (str(node_obj), str(impact), str(remaining_volume))) + + + +def schedule(): + task_capacity = get_system_task_capacity() + graph = rebuild_graph() + if graph: + process_graph(graph, task_capacity) + diff --git a/awx/main/scheduler/dag_simple.py b/awx/main/scheduler/dag_simple.py index f04c60159a..79b20520e2 100644 --- a/awx/main/scheduler/dag_simple.py +++ b/awx/main/scheduler/dag_simple.py @@ -1,5 +1,12 @@ -from awx.main.models import * # noqa +from awx.main.models import ( + Job, + AdHocCommand, + InventoryUpdate, + ProjectUpdate, + WorkflowJob, + SystemJob, +) class SimpleDAG(object): ''' A simple implementation of a directed acyclic graph ''' diff --git a/awx/main/scheduler/dag_workflow.py b/awx/main/scheduler/dag_workflow.py index 1a8269c064..c891b2ec32 100644 --- a/awx/main/scheduler/dag_workflow.py +++ b/awx/main/scheduler/dag_workflow.py @@ -1,4 +1,6 @@ -from dag_simple import SimpleDAG + +# AWX +from awx.main.scheduler.dag_simple import SimpleDAG class WorkflowDAG(SimpleDAG): def __init__(self, workflow_job=None): @@ -41,8 +43,6 @@ class WorkflowDAG(SimpleDAG): elif job.status in ['successful']: children_success = self.get_dependencies(obj, 'success_nodes') nodes.extend(children_success) - else: - logger.warn("Incorrect graph structure") return [n['node_object'] for n in nodes_found] def is_workflow_done(self): @@ -68,7 +68,5 @@ class WorkflowDAG(SimpleDAG): elif job.status in ['successful']: children_success = self.get_dependencies(obj, 'success_nodes') nodes.extend(children_success) - else: - logger.warn("Incorrect graph structure") return True diff --git a/awx/main/scheduler/tasks.py b/awx/main/scheduler/tasks.py new file mode 100644 index 0000000000..343bdd1546 --- /dev/null +++ b/awx/main/scheduler/tasks.py @@ -0,0 +1,79 @@ + +# Python +import logging +import time + +# Celery +from celery import task + +# AWX +from awx.main.models import UnifiedJob +from awx.main.scheduler import schedule + +logger = logging.getLogger('awx.main.scheduler') + +# TODO: move logic to UnifiedJob model and use bind=True feature of celery. +# Would we need the request loop then? I think so. Even if we get the in-memory +# updated model, the call to schedule() may get stale data. + +@task +def run_job_launch(job_id): + # Wait for job to exist. + # The job is created in a transaction then the message is created, but + # the transaction may not have completed. + + # FIXME: We could generate the message in a Django signal handler. + # OR, we could call an explicit commit in the view and then send the + # message. + + retries = 10 + retry = 0 + while not UnifiedJob.objects.filter(id=job_id).exists(): + time.sleep(0.3) + + if retry >= retries: + logger.error("Failed to process 'job_launch' message for job %d" % job_id) + # ack the message so we don't build up the queue. + # + # The job can still be chosen to run during tower startup or + # when another job is started or completes + return + retry += 1 + + # "Safe" to get the job now since it exists. 
+ # Really, there is a race condition from exists to get + + # TODO: while not loop should call get wrapped in a try except + #job = UnifiedJob.objects.get(id=job_id) + + schedule() + +@task +def run_job_complete(job_id): + # TODO: use list of finished status from jobs.py or unified_jobs.py + finished_status = ['successful', 'error', 'failed', 'completed'] + q = UnifiedJob.objects.filter(id=job_id) + + # Ensure that the job is updated in the database before we call to + # schedule the next job. + retries = 10 + retry = 0 + while True: + # Job not found, most likely deleted. That's fine + if not q.exists(): + logger.warn("Failed to find job '%d' while processing 'job_complete' message. Presume that it was deleted." % job_id) + break + + job = q[0] + if job.status in finished_status: + break + + time.sleep(0.3) + + if retry >= retries: + logger.error("Expected job status '%s' to be one of '%s' while processing 'job_complete' message." % (job.status, finished_status)) + return + retry += 1 + + schedule() + diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 86552b6404..31db196a9b 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -31,9 +31,6 @@ except: # Pexpect import pexpect -# Kombu -from kombu import Connection, Exchange, Queue, Producer - # Celery from celery import Task, task from celery.signals import celeryd_init @@ -50,18 +47,18 @@ from django.contrib.auth.models import User from awx.main.constants import CLOUD_PROVIDERS from awx.main.models import * # noqa from awx.main.models import UnifiedJob -from awx.main.queue import FifoQueue from awx.main.conf import tower_settings from awx.main.task_engine import TaskSerializer, TASK_TIMEOUT_INTERVAL from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url, emit_websocket_notification, check_proot_installed, build_proot_temp_dir, wrap_args_with_proot) +from awx.main.scheduler.dag_workflow import WorkflowDAG __all__ = ['RunJob', 'RunSystemJob', 'RunProjectUpdate', 'RunInventoryUpdate', 'RunAdHocCommand', 'RunWorkflowJob', 'handle_work_error', 'handle_work_success', 'update_inventory_computed_fields', 'send_notifications', 'run_administrative_checks', - 'run_workflow_job'] + 'RunJobLaunch'] HIDDEN_PASSWORD = '**********' @@ -182,14 +179,6 @@ def tower_periodic_scheduler(self): new_unified_job.socketio_emit_status("failed") emit_websocket_notification('/socket.io/schedules', 'schedule_changed', dict(id=schedule.id)) -@task(queue='default') -def notify_task_runner(metadata_dict): - """Add the given task into the Tower task manager's queue, to be consumed - by the task system. 
- """ - queue = FifoQueue('tower_task_manager') - queue.push(metadata_dict) - def _send_notification_templates(instance, status_str): if status_str not in ['succeeded', 'failed']: raise ValueError("status_str must be either succeeded or failed") @@ -206,17 +195,6 @@ def _send_notification_templates(instance, status_str): job_id=instance.id) -def _send_job_complete_msg(instance): - connection = Connection(settings.BROKER_URL) - exchange = Exchange(settings.SCHEDULER_QUEUE, type='topic') - producer = Producer(connection) - producer.publish({ 'job_id': instance.id, 'msg_type': 'job_complete' }, - serializer='json', - compression='bzip2', - exchange=exchange, - declare=[exchange], - routing_key='scheduler.job.complete') - @task(bind=True, queue='default') def handle_work_success(self, result, task_actual): instance = UnifiedJob.get_instance_by_type(task_actual['type'], task_actual['id']) @@ -225,7 +203,8 @@ def handle_work_success(self, result, task_actual): _send_notification_templates(instance, 'succeeded') - _send_job_complete_msg(instance) + from awx.main.scheduler.tasks import run_job_complete + run_job_complete.delay(instance.id) @task(bind=True, queue='default') def handle_work_error(self, task_id, subtasks=None): @@ -256,8 +235,14 @@ def handle_work_error(self, task_id, subtasks=None): if first_instance: _send_notification_templates(first_instance, 'failed') + # We only send 1 job complete message since all the job completion message + # handling does is trigger the scheduler. If we extend the functionality of + # what the job complete message handler does then we may want to send a + # completion event for each job here. if first_instance: - _send_job_complete_msg(first_instance) + from awx.main.scheduler.tasks import run_job_complete + run_job_complete.delay(first_instance.id) + pass @task(queue='default') def update_inventory_computed_fields(inventory_id, should_update_hosts=True): @@ -323,10 +308,6 @@ class BaseTask(Task): logger.error('Failed to update %s after %d retries.', self.model._meta.object_name, _attempt) - def signal_finished(self, pk): - pass - # notify_task_runner(dict(complete=pk)) - def get_path_to(self, *args): ''' Return absolute path relative to this file. @@ -1690,7 +1671,7 @@ class RunWorkflowJob(BaseTask): model = WorkflowJob def run(self, pk, **kwargs): - from awx.main.management.commands.run_task_system import WorkflowDAG + print("I'm a running a workflow job") ''' Run the job/task and capture its output. 
''' diff --git a/awx/main/tests/base.py b/awx/main/tests/base.py index 6b35297a07..34be4081b8 100644 --- a/awx/main/tests/base.py +++ b/awx/main/tests/base.py @@ -30,7 +30,7 @@ from django.utils.encoding import force_text # AWX from awx.main.models import * # noqa -from awx.main.management.commands.run_task_system import run_taskmanager +from awx.main.management.commands.run_callback_receiver import CallbackReceiver from awx.main.utils import get_ansible_version from awx.main.task_engine import TaskEngager as LicenseWriter from awx.sso.backends import LDAPSettings @@ -654,18 +654,6 @@ class BaseTestMixin(MockCommonlySlowTestMixin): u'expected no traceback, got:\n%s' % job.result_traceback) - - def start_taskmanager(self, command_port): - self.start_redis() - self.taskmanager_process = Process(target=run_taskmanager, - args=(command_port,)) - self.taskmanager_process.start() - - def terminate_taskmanager(self): - if hasattr(self, 'taskmanager_process'): - self.taskmanager_process.terminate() - self.stop_redis() - class BaseTest(BaseTestMixin, django.test.TestCase): ''' Base class for unit tests. diff --git a/awx/main/tests/unit/commands/test_run_task_system.py b/awx/main/tests/unit/scheduler/test_dag.py similarity index 97% rename from awx/main/tests/unit/commands/test_run_task_system.py rename to awx/main/tests/unit/scheduler/test_dag.py index bc62394b21..84fb2d37f2 100644 --- a/awx/main/tests/unit/commands/test_run_task_system.py +++ b/awx/main/tests/unit/scheduler/test_dag.py @@ -1,10 +1,12 @@ -from awx.main.management.commands.run_task_system import ( - SimpleDAG, - WorkflowDAG, -) + +# Python +import pytest + +# AWX +from awx.main.scheduler.dag_simple import SimpleDAG +from awx.main.scheduler.dag_workflow import WorkflowDAG from awx.main.models import Job from awx.main.models.workflow import WorkflowJobNode -import pytest @pytest.fixture def dag_root(): diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 8a65d7d322..20a80ecca4 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -339,10 +339,11 @@ CELERYD_TASK_SOFT_TIME_LIMIT = None CELERYBEAT_SCHEDULER = 'celery.beat.PersistentScheduler' CELERYBEAT_MAX_LOOP_INTERVAL = 60 CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend' +CELERY_IMPORTS = ('awx.main.scheduler.tasks',) CELERY_QUEUES = ( Queue('default', Exchange('default'), routing_key='default'), Queue('jobs', Exchange('jobs'), routing_key='jobs'), - #Queue('scheduler', Exchange('scheduler'), routing_key='scheduler.job.#'), + Queue('scheduler', Exchange('scheduler', type='topic'), routing_key='scheduler.job.#'), # Projects use a fanout queue, this isn't super well supported Broadcast('projects'), ) @@ -354,7 +355,11 @@ CELERY_ROUTES = ({'awx.main.tasks.run_job': {'queue': 'jobs', 'awx.main.tasks.run_ad_hoc_command': {'queue': 'jobs', 'routing_key': 'jobs'}, 'awx.main.tasks.run_system_job': {'queue': 'jobs', - 'routing_key': 'jobs'}}) + 'routing_key': 'jobs'}, + 'awx.main.scheduler.tasks.run_job_launch': {'queue': 'scheduler', + 'routing_key': 'scheduler.job.launch'}, + 'awx.main.scheduler.tasks.run_job_complete': {'queue': 'scheduler', + 'routing_key': 'scheduler.job.complete'},}) CELERYBEAT_SCHEDULE = { 'tower_scheduler': { @@ -1040,7 +1045,7 @@ LOGGING = { 'handlers': ['console', 'file', 'socketio_service'], 'propagate': False }, - 'awx.main.commands.run_task_system': { + 'awx.main.tasks': { 'handlers': ['console', 'file', 'task_system'], 'propagate': False },
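
Note on the new dispatch path: the patch above replaces the hand-rolled kombu Producer publishing in UnifiedJob.signal_start() and _send_job_complete_msg() with two celery tasks, run_job_launch and run_job_complete, routed to the new 'scheduler' queue (see the CELERY_ROUTES additions and the ",scheduler" appended to the celeryd -Q list in the Makefile). A minimal sketch of how callers now hand work to the scheduler; the on_job_created/on_job_finished wrapper names are illustrative only, not part of the patch:

# Sketch only: run_job_launch / run_job_complete are the tasks added in
# awx/main/scheduler/tasks.py; the wrapper functions are hypothetical.
from awx.main.scheduler.tasks import run_job_launch, run_job_complete

def on_job_created(job):
    # Replaces the old Producer.publish(..., routing_key='scheduler.job.launch')
    # in UnifiedJob.signal_start(); CELERY_ROUTES now sends this task to the
    # 'scheduler' queue with that routing key.
    run_job_launch.delay(job.id)

def on_job_finished(job):
    # Replaces _send_job_complete_msg(); handle_work_success and
    # handle_work_error now call run_job_complete.delay(...) directly.
    run_job_complete.delay(job.id)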
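
Both scheduler tasks poll for the UnifiedJob row before calling schedule(), because the celery task can be picked up before the transaction that created the job has committed. A standalone sketch of that bounded-retry wait, assuming Django ORM access; wait_for_job is a hypothetical helper name, not one introduced by the patch:

import time
import logging

from awx.main.models import UnifiedJob

logger = logging.getLogger('awx.main.scheduler')

def wait_for_job(job_id, retries=10, delay=0.3):
    # Mirrors the loop in run_job_launch(): retry a bounded number of times,
    # sleeping between attempts, then give up with an error log so the
    # message does not pile up; the job can still be scheduled later.
    for _ in range(retries):
        if UnifiedJob.objects.filter(id=job_id).exists():
            return True
        time.sleep(delay)
    logger.error("Failed to process 'job_launch' message for job %d" % job_id)
    return False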
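
For reference, process_graph() admits ready leaf nodes only while their combined task_impact still fits into the remaining capacity, except that the first node is always admitted when nothing is running. A toy walk-through of that accounting with invented numbers and an invented Node class:

# Toy illustration of the capacity accounting in process_graph();
# Node and the impact values are made up for the example.
class Node(object):
    def __init__(self, name, task_impact, status='pending'):
        self.name = name
        self.task_impact = task_impact
        self.status = status

task_capacity = 100                                   # get_system_task_capacity()
running = [Node('job-1', 40, status='running')]       # running leaf nodes
ready = [Node('job-2', 50), Node('job-3', 30)]        # ready leaf nodes

running_impact = sum(n.task_impact for n in running)  # 40
remaining_volume = task_capacity - running_impact     # 60

for node in ready:
    if node.task_impact <= remaining_volume or running_impact == 0:
        remaining_volume -= node.task_impact
        running_impact += node.task_impact
        print("started %s, remaining capacity %d" % (node.name, remaining_volume))
# job-2 (impact 50) starts, leaving 10; job-3 (impact 30) waits for capacity.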