diff --git a/awx/main/models/projects.py b/awx/main/models/projects.py index 1c693a6398..4c20e01e08 100644 --- a/awx/main/models/projects.py +++ b/awx/main/models/projects.py @@ -275,7 +275,7 @@ class Project(UnifiedJobTemplate, ProjectOptions, ResourceMixin): def _get_unified_job_field_names(cls): return ['name', 'description', 'local_path', 'scm_type', 'scm_url', 'scm_branch', 'scm_clean', 'scm_delete_on_update', - 'credential', 'schedule', 'timeout'] + 'credential', 'schedule', 'timeout', 'launch_type',] def save(self, *args, **kwargs): new_instance = not bool(self.pk) diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index 6ecdc09b37..0711528c56 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -2,77 +2,107 @@ # All Rights Reserved # Python -import datetime +from datetime import timedelta import logging +from sets import Set # Django from django.conf import settings from django.db import transaction +from django.db.utils import DatabaseError # AWX from awx.main.models import * # noqa -from awx.main.utils import get_system_task_capacity -from awx.main.scheduler.dag_simple import SimpleDAG +#from awx.main.scheduler.dag_simple import SimpleDAG from awx.main.scheduler.dag_workflow import WorkflowDAG +from awx.main.scheduler.dependency_graph import DependencyGraph +from awx.main.scheduler.partial import ( + JobDict, + ProjectUpdateDict, + InventoryUpdateDict, + ProjectUpdateLatestDict, +) + # Celery from celery.task.control import inspect logger = logging.getLogger('awx.main.scheduler') -def get_tasks(): - """Fetch all Tower tasks that are relevant to the task management - system. - """ - RELEVANT_JOBS = ('pending', 'waiting', 'running') - # TODO: Replace this when we can grab all objects in a sane way. 
- graph_jobs = [j for j in Job.objects.filter(status__in=RELEVANT_JOBS)] - graph_ad_hoc_commands = [ahc for ahc in AdHocCommand.objects.filter(status__in=RELEVANT_JOBS)] - graph_inventory_updates = [iu for iu in - InventoryUpdate.objects.filter(status__in=RELEVANT_JOBS)] - graph_project_updates = [pu for pu in - ProjectUpdate.objects.filter(status__in=RELEVANT_JOBS)] - graph_system_jobs = [sj for sj in - SystemJob.objects.filter(status__in=RELEVANT_JOBS)] - graph_workflow_jobs = [wf for wf in - WorkflowJob.objects.filter(status__in=RELEVANT_JOBS)] - all_actions = sorted(graph_jobs + graph_ad_hoc_commands + graph_inventory_updates + - graph_project_updates + graph_system_jobs + - graph_workflow_jobs, - key=lambda task: task.created) - return all_actions +class Scheduler(): + def __init__(self): + self.graph = DependencyGraph() + self.capacity_total = 200 + self.capacity_used = 0 -def get_running_workflow_jobs(): - graph_workflow_jobs = [wf for wf in - WorkflowJob.objects.filter(status='running')] - return graph_workflow_jobs + def _get_tasks_with_status(self, status_list): -def spawn_workflow_graph_jobs(workflow_jobs): - # TODO: Consider using transaction.atomic - for workflow_job in workflow_jobs: - dag = WorkflowDAG(workflow_job) - spawn_nodes = dag.bfs_nodes_to_run() - for spawn_node in spawn_nodes: - kv = spawn_node.get_job_kwargs() - job = spawn_node.unified_job_template.create_unified_job(**kv) - spawn_node.job = job - spawn_node.save() - can_start = job.signal_start(**kv) - if not can_start: - job.status = 'failed' - job.job_explanation = "Workflow job could not start because it was not in the right state or required manual credentials" - job.save(update_fields=['status', 'job_explanation']) - job.websocket_emit_status("failed") + graph_jobs = JobDict.filter_partial(status=status_list) + ''' + graph_ad_hoc_commands = [ahc for ahc in AdHocCommand.objects.filter(**kv)] + graph_inventory_updates = [iu for iu in + InventoryUpdate.objects.filter(**kv)] + ''' + graph_inventory_updates = InventoryUpdateDict.filter_partial(status=status_list) + graph_project_updates = ProjectUpdateDict.filter_partial(status=status_list) + ''' + graph_system_jobs = [sj for sj in + SystemJob.objects.filter(**kv)] + graph_workflow_jobs = [wf for wf in + WorkflowJob.objects.filter(**kv)] + all_actions = sorted(graph_jobs + graph_ad_hoc_commands + graph_inventory_updates + + graph_project_updates + graph_system_jobs + + graph_workflow_jobs, + key=lambda task: task.created) + ''' + all_actions = sorted(graph_jobs + graph_project_updates + graph_inventory_updates, + key=lambda task: task['created']) + return all_actions - # TODO: should we emit a status on the socket here similar to tasks.py tower_periodic_scheduler() ? 
- #emit_websocket_notification('/socket.io/jobs', '', dict(id=)) + def get_tasks(self): + RELEVANT_JOBS = ('pending', 'waiting', 'running') + return self._get_tasks_with_status(RELEVANT_JOBS) -# See comment in tasks.py::RunWorkflowJob::run() -def process_finished_workflow_jobs(workflow_jobs): - for workflow_job in workflow_jobs: - dag = WorkflowDAG(workflow_job) - if dag.is_workflow_done(): - with transaction.atomic(): + # TODO: Consider a database query for this logic + def get_latest_project_update_tasks(self, all_sorted_tasks): + project_ids = Set() + for task in all_sorted_tasks: + if type(task) == JobDict: + project_ids.add(task['project_id']) + + return ProjectUpdateLatestDict.filter_partial(list(project_ids)) + + def get_running_workflow_jobs(self): + graph_workflow_jobs = [wf for wf in + WorkflowJob.objects.filter(status='running')] + return graph_workflow_jobs + + def spawn_workflow_graph_jobs(self, workflow_jobs): + # TODO: Consider using transaction.atomic + for workflow_job in workflow_jobs: + dag = WorkflowDAG(workflow_job) + spawn_nodes = dag.bfs_nodes_to_run() + for spawn_node in spawn_nodes: + kv = spawn_node.get_job_kwargs() + job = spawn_node.unified_job_template.create_unified_job(**kv) + spawn_node.job = job + spawn_node.save() + can_start = job.signal_start(**kv) + if not can_start: + job.status = 'failed' + job.job_explanation = "Workflow job could not start because it was not in the right state or required manual credentials" + job.save(update_fields=['status', 'job_explanation']) + job.websocket_emit_status("failed") + + # TODO: should we emit a status on the socket here similar to tasks.py tower_periodic_scheduler() ? + #emit_websocket_notification('/socket.io/jobs', '', dict(id=)) + + # See comment in tasks.py::RunWorkflowJob::run() + def process_finished_workflow_jobs(self, workflow_jobs): + for workflow_job in workflow_jobs: + dag = WorkflowDAG(workflow_job) + if dag.is_workflow_done(): + # TODO: detect if wfj failed if workflow_job._has_failed(): workflow_job.status = 'failed' else: @@ -80,178 +110,248 @@ def process_finished_workflow_jobs(workflow_jobs): workflow_job.save() workflow_job.websocket_emit_status(workflow_job.status) -def rebuild_graph(): - """Regenerate the task graph by refreshing known tasks from Tower, purging - orphaned running tasks, and creating dependencies for new tasks before - generating directed edge relationships between those tasks. - """ - ''' - # Sanity check: Only do this on the primary node. - if Instance.objects.my_role() == 'secondary': - return None - ''' + def get_activate_tasks(self): + inspector = inspect() + if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'): + active_task_queues = inspector.active() + else: + logger.warn("Ignoring celery task inspector") + active_task_queues = None - inspector = inspect() - if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'): - active_task_queues = inspector.active() - else: - logger.warn("Ignoring celery task inspector") - active_task_queues = None + active_tasks = [] + if active_task_queues is not None: + for queue in active_task_queues: + active_tasks += [at['id'] for at in active_task_queues[queue]] + else: + logger.error("Could not communicate with celery!") + # TODO: Something needs to be done here to signal to the system + # as a whole that celery appears to be down. 
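+            # NOTE (assumption, based on the setting's name): CELERY_UNIT_TEST
+            # runs have no live broker to inspect, so an empty inspector
+            # response is expected there rather than a sign of an outage.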
+ if not hasattr(settings, 'CELERY_UNIT_TEST'): + return None - all_sorted_tasks = get_tasks() - if not len(all_sorted_tasks): - return None + return active_tasks - active_tasks = [] - if active_task_queues is not None: - for queue in active_task_queues: - active_tasks += [at['id'] for at in active_task_queues[queue]] - else: - logger.error("Could not communicate with celery!") - # TODO: Something needs to be done here to signal to the system - # as a whole that celery appears to be down. - if not hasattr(settings, 'CELERY_UNIT_TEST'): - return None + def start_task(self, task, dependent_tasks=[]): + from awx.main.tasks import handle_work_error, handle_work_success - running_tasks = filter(lambda t: t.status == 'running', all_sorted_tasks) - running_celery_tasks = filter(lambda t: type(t) != WorkflowJob, running_tasks) - waiting_tasks = filter(lambda t: t.status != 'running', all_sorted_tasks) - new_tasks = filter(lambda t: t.status == 'pending', all_sorted_tasks) + #print("start_task() <%s, %s> with deps %s" % (task.get_job_type_str(), task['id'], dependent_tasks)) + + # TODO: spawn inventory and project updates + task_actual = { + 'type':task.get_job_type_str(), + 'id': task['id'], + } + dependencies = [{'type': t.get_job_type_str(), 'id': t['id']} for t in dependent_tasks] + + error_handler = handle_work_error.s(subtasks=[task_actual] + dependencies) + success_handler = handle_work_success.s(task_actual=task_actual) + + job_obj = task.get_full() + job_obj.status = 'waiting' + job_obj.save() - # Check running tasks and make sure they are active in celery - logger.debug("Active celery tasks: " + str(active_tasks)) - for task in list(running_celery_tasks): - if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')): - # NOTE: Pull status again and make sure it didn't finish in - # the meantime? - task.status = 'failed' - task.job_explanation += ' '.join(( - 'Task was marked as running in Tower but was not present in', - 'Celery, so it has been marked as failed.', - )) - task.save() - task.websocket_emit_status("failed") - running_tasks.pop(running_tasks.index(task)) - logger.error("Task %s appears orphaned... marking as failed" % task) + #print("For real, starting job <%s, %s>" % (type(job_obj), job_obj.id)) + start_status = job_obj.start(error_callback=error_handler, success_callback=success_handler) + if not start_status: + job_obj.status = 'failed' + if job_obj.job_explanation: + job_obj.job_explanation += ' ' + job_obj.job_explanation += 'Task failed pre-start check.' + job_obj.save() + # TODO: run error handler to fail sub-tasks and send notifications + return - # Create and process dependencies for new tasks - for task in new_tasks: - logger.debug("Checking dependencies for: %s" % str(task)) - try: - task_dependencies = task.generate_dependencies(running_tasks + waiting_tasks) - except Exception, e: - logger.error("Failed processing dependencies for {}: {}".format(task, e)) - task.status = 'failed' - task.job_explanation += 'Task failed to generate dependencies: {}'.format(e) - task.save() - task.websocket_emit_status("failed") - continue - logger.debug("New dependencies: %s" % str(task_dependencies)) - for dep in task_dependencies: - # We recalculate the created time for the moment to ensure the - # dependencies are always sorted in the right order relative to - # the dependent task. 
-            time_delt = len(task_dependencies) - task_dependencies.index(dep)
-            dep.created = task.created - datetime.timedelta(seconds=1 + time_delt)
-            dep.status = 'waiting'
-            dep.save()
-            waiting_tasks.insert(waiting_tasks.index(task), dep)
-        if not hasattr(settings, 'UNIT_TEST_IGNORE_TASK_WAIT'):
-            task.status = 'waiting'
-            task.save()
+        self.consume_capacity(task)

-    # Rebuild graph
-    graph = SimpleDAG()
-    for task in running_tasks:
-        graph.add_node(task)
-    for wait_task in waiting_tasks[:50]:
-        node_dependencies = []
-        for node in graph:
-            if wait_task.is_blocked_by(node['node_object']):
-                node_dependencies.append(node['node_object'])
-        graph.add_node(wait_task)
-        for dependency in node_dependencies:
-            graph.add_edge(wait_task, dependency)
-    if settings.DEBUG:
-        graph.generate_graphviz_plot()
-    return graph
+    def process_runnable_tasks(self, runnable_tasks):
+        for i, task in enumerate(runnable_tasks):
+            # TODO: maybe batch process new tasks.
+            # Processing each new task individually seems to be expensive.
+            self.graph.add_job(task)

-def process_graph(graph, task_capacity):
-    """Given a task dependency graph, start and manage tasks given their
-    priority and weight.
-    """
-    from awx.main.tasks import handle_work_error, handle_work_success
+    def create_project_update(self, task):
+        dep = Project.objects.get(id=task['project_id']).create_project_update(launch_type='dependency')

-    leaf_nodes = graph.get_leaf_nodes()
-    running_nodes = filter(lambda x: x['node_object'].status == 'running', leaf_nodes)
-    running_impact = sum([t['node_object'].task_impact for t in running_nodes])
-    ready_nodes = filter(lambda x: x['node_object'].status != 'running', leaf_nodes)
-    remaining_volume = task_capacity - running_impact
-    logger.info('Running Nodes: %s; Capacity: %s; Running Impact: %s; '
-                'Remaining Capacity: %s' %
-                (str(running_nodes), str(task_capacity),
-                 str(running_impact), str(remaining_volume)))
-    logger.info("Ready Nodes: %s" % str(ready_nodes))
-    for task_node in ready_nodes:
-        node_obj = task_node['node_object']
-        # NOTE: This could be used to pass metadata through the task system
-        # node_args = task_node['metadata']
-        impact = node_obj.task_impact
-        if impact <= remaining_volume or running_impact == 0:
-            node_dependencies = graph.get_dependents(node_obj)
-            # Allow other tasks to continue if a job fails, even if they are
-            # other jobs.
+        # TODO: Consider using milliseconds or microseconds
+        # The dependent project update is created one second behind the job
+        dep.created = task['created'] - timedelta(seconds=1)
+        dep.status = 'waiting'
+        dep.save()

-            node_type = graph.get_node_type(node_obj)
-            if node_type == 'job':
-                # clear dependencies because a job can block (not necessarily
-                # depend) on other jobs that share the same job template
+        project_task = ProjectUpdateDict.get_partial(dep.id)
+        #waiting_tasks.insert(waiting_tasks.index(task), dep)
+
+        return project_task
+
+    def generate_dependencies(self, task):
+        dependencies = []
+        # TODO: What if the project is null?
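+        # Only plain jobs spawn dependencies right now: a project update is
+        # created when the job's project is marked update-on-launch and the
+        # dependency graph says the last update is missing, failed, or stale.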
+        if type(task) is JobDict:
+            if task['project__scm_update_on_launch'] is True and \
+                    self.graph.should_update_related_project(task):
+                project_task = self.create_project_update(task)
+                dependencies.append(project_task)
+        # TODO: inventory update dependencies (created two seconds behind the
+        # job) are not generated yet.
+        return dependencies
+
+    def process_latest_project_updates(self, latest_project_updates):
+        for task in latest_project_updates:
+            self.graph.add_latest_project_update(task)
+
+    def process_dependencies(self, dependent_task, dependency_tasks):
+        for task in dependency_tasks:
+            # ProjectUpdate or InventoryUpdate may be blocked by another of
+            # the same type.
+            if not self.graph.is_job_blocked(task):
+                self.graph.add_job(task)
+                if not self.would_exceed_capacity(task):
+                    #print("process_dependencies() going to run project update <%s, %s>" % (task['id'], task['project_id']))
+                    self.start_task(task, [dependent_task])
+            else:
+                self.graph.add_job(task)
+
+    def process_pending_tasks(self, pending_tasks):
+        for task in pending_tasks:
+            if not self.graph.is_job_blocked(task):
+                #print("process_pending_tasks() generating deps for job <%s, %s, %s>" % (task['id'], task['project_id'], task.model))
+                dependencies = self.generate_dependencies(task)
+                self.process_dependencies(task, dependencies)
+
+                # Spawning deps might have blocked us
+                if not self.graph.is_job_blocked(task):
+                    self.graph.add_job(task)
+                    if not self.would_exceed_capacity(task):
+                        #print("Starting the original task <%s, %s>" % (task.get_job_type_str(), task['id']))
+                        self.start_task(task)
+            else:
+                self.graph.add_job(task)
+
+            # Stop processing tasks if we know we are out of capacity
+            if self.get_remaining_capacity() <= 0:
+                return
+
+    def fail_inconsistent_running_jobs(self, active_tasks, all_sorted_tasks):
+        # Iterate over a copy so orphaned tasks can safely be removed from
+        # the original list while looping.
+        for task in list(all_sorted_tasks):
+            if task['status'] != 'running':
+                continue
+
+            if (task['celery_task_id'] not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
+                # NOTE: Pull status again and make sure it didn't finish in
+                # the meantime?
+                # TODO: wrap the fetch in try/except; the job could have been deleted
+                task_obj = task.get_full()
+                task_obj.status = 'failed'
+                task_obj.job_explanation += ' '.join((
+                    'Task was marked as running in Tower but was not present in',
+                    'Celery, so it has been marked as failed.',
+                ))
+                task_obj.save()
+                task_obj.websocket_emit_status("failed")
+
+                all_sorted_tasks.remove(task)
+                logger.error("Task %s appears orphaned... 
marking as failed" % task) + + def process_celery_tasks(self, active_tasks, all_sorted_tasks): + + ''' + Rectify tower db <-> celery inconsistent view of jobs state + ''' + # Check running tasks and make sure they are active in celery + logger.debug("Active celery tasks: " + str(active_tasks)) + all_sorted_tasks = self.fail_inconsistent_running_jobs(active_tasks, + all_sorted_tasks) + + def calculate_capacity_used(self, tasks): + self.capacity_used = 0 + for t in tasks: + self.capacity_used += t.task_impact() + + def would_exceed_capacity(self, task): + return (task.task_impact() + self.capacity_used > self.capacity_total) + + def consume_capacity(self, task): + self.capacity_used += task.task_impact() + #print("Capacity used %s vs total %s" % (self.capacity_used, self.capacity_total)) + + def get_remaining_capacity(self): + return (self.capacity_total - self.capacity_used) + + def process_tasks(self, all_sorted_tasks): + + # TODO: Process new tasks + running_tasks = filter(lambda t: t['status'] == 'running', all_sorted_tasks) + runnable_tasks = filter(lambda t: t['status'] in ['waiting', 'running'], all_sorted_tasks) + + self.calculate_capacity_used(running_tasks) + + self.process_runnable_tasks(runnable_tasks) + + pending_tasks = filter(lambda t: t['status'] == 'pending', all_sorted_tasks) + self.process_pending_tasks(pending_tasks) + + + ''' + def do_graph_things(): + # Rebuild graph + graph = SimpleDAG() + for task in running_tasks: + graph.add_node(task) + #for wait_task in waiting_tasks[:50]: + for wait_task in waiting_tasks: node_dependencies = [] + for node in graph: + if wait_task.is_blocked_by(node['node_object']): + node_dependencies.append(node['node_object']) + graph.add_node(wait_task) + for dependency in node_dependencies: + graph.add_edge(wait_task, dependency) + if settings.DEBUG: + graph.generate_graphviz_plot() + return graph + ''' + #return do_graph_things() - # Make the workflow_job look like it's started by setting status to - # running, but don't make a celery Task for it. - # Introduce jobs from the workflow so they are candidates to run. - # Call process_graph() again to allow choosing for run, the - # created candidate jobs. - elif node_type == 'workflow_job': - node_obj.start() - spawn_workflow_graph_jobs([node_obj]) - return process_graph(graph, task_capacity) + def _schedule(self): + all_sorted_tasks = self.get_tasks() + if len(all_sorted_tasks) > 0: + #self.process_celery_tasks(active_tasks, all_sorted_tasks) - dependent_nodes = [{'type': graph.get_node_type(node_obj), 'id': node_obj.id}] + \ - [{'type': graph.get_node_type(n['node_object']), - 'id': n['node_object'].id} for n in node_dependencies] - error_handler = handle_work_error.s(subtasks=dependent_nodes) - success_handler = handle_work_success.s(task_actual={'type': graph.get_node_type(node_obj), - 'id': node_obj.id}) - with transaction.atomic(): - start_status = node_obj.start(error_callback=error_handler, success_callback=success_handler) - if not start_status: - node_obj.status = 'failed' - if node_obj.job_explanation: - node_obj.job_explanation += ' ' - node_obj.job_explanation += 'Task failed pre-start check.' 
- node_obj.save() - continue - remaining_volume -= impact - running_impact += impact - logger.info('Started Node: %s (capacity hit: %s) ' - 'Remaining Capacity: %s' % - (str(node_obj), str(impact), str(remaining_volume))) + latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks) + self.process_latest_project_updates(latest_project_updates) -def schedule(): - with transaction.atomic(): - # Lock - Instance.objects.select_for_update().all()[0] + self.process_tasks(all_sorted_tasks) - task_capacity = get_system_task_capacity() + #print("Finished schedule()") - workflow_jobs = get_running_workflow_jobs() - process_finished_workflow_jobs(workflow_jobs) - spawn_workflow_graph_jobs(workflow_jobs) + def schedule(self): + with transaction.atomic(): + #t1 = datetime.now() + # Lock + try: + Instance.objects.select_for_update(nowait=True).all()[0] + except DatabaseError: + return - graph = rebuild_graph() - if graph: - process_graph(graph, task_capacity) + #workflow_jobs = get_running_workflow_jobs() + #process_finished_workflow_jobs(workflow_jobs) + #spawn_workflow_graph_jobs(workflow_jobs) + + ''' + Get tasks known by celery + ''' + ''' + active_tasks = self.get_activate_tasks() + # Communication with celery failed :(, return + if active_tasks is None: + return None + ''' + self._schedule() # Unlock, due to transaction ending + #t2 = datetime.now() + #t_diff = t2 - t1 + #print("schedule() time %s" % (t_diff.total_seconds())) + + + diff --git a/awx/main/scheduler/dependency_graph.py b/awx/main/scheduler/dependency_graph.py new file mode 100644 index 0000000000..5ecea91385 --- /dev/null +++ b/awx/main/scheduler/dependency_graph.py @@ -0,0 +1,108 @@ +from datetime import timedelta +from django.utils.timezone import now as tz_now + +from awx.main.scheduler.partial import JobDict, ProjectUpdateDict, InventoryUpdateDict +class DependencyGraph(object): + PROJECT_UPDATES = 'project_updates' + INVENTORY_UPDATES = 'inventory_updates' + JOB_TEMPLATE_JOBS = 'job_template_jobs' + LATEST_PROJECT_UPDATES = 'latest_project_updates' + + def __init__(self, *args, **kwargs): + self.data = {} + # project_id -> True / False + self.data[self.PROJECT_UPDATES] = {} + # inventory_id -> True / False + self.data[self.INVENTORY_UPDATES] = {} + # job_template_id -> True / False + self.data[self.JOB_TEMPLATE_JOBS] = {} + + # project_id -> latest ProjectUpdateDict + self.data[self.LATEST_PROJECT_UPDATES] = {} + + def add_latest_project_update(self, job): + self.data[self.LATEST_PROJECT_UPDATES][job['project_id']] = job + + def get_now(self): + return tz_now() + + ''' + JobDict + + Presume that job is related to a project that is update on launch + ''' + def should_update_related_project(self, job): + now = self.get_now() + latest_project_update = self.data[self.LATEST_PROJECT_UPDATES].get(job['project_id'], None) + if not latest_project_update: + return True + + # TODO: Other finished, failed cases? i.e. error ? + if latest_project_update['status'] == 'failed': + return True + + ''' + This is a bit of fuzzy logic. + If the latest project update has a created time == job_created_time-1 + then consider the project update found. This is so we don't enter an infinite loop + of updating the project when cache timeout is 0. 
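+        For example, a job created at time T spawns its dependent update
+        with created == T - 1s; if that exact pairing is seen again for a
+        job created at T and the timeout is 0, the update already counts.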
+ ''' + if latest_project_update['project__scm_update_cache_timeout'] == 0 and \ + latest_project_update['launch_type'] == 'dependency' and \ + latest_project_update['created'] == job['created'] - timedelta(seconds=1): + return False + + ''' + Normal, expected, cache timeout logic + ''' + timeout_seconds = timedelta(seconds=latest_project_update['project__scm_update_cache_timeout']) + if (latest_project_update['finished'] + timeout_seconds) < now: + return True + + return False + + def add_project_update(self, job): + self.data[self.PROJECT_UPDATES][job['project_id']] = False + + def add_inventory_update(self, job): + self.data[self.INVENTORY_UPDATES][job['inventory_id']] = False + + def add_job_template_job(self, job): + self.data[self.JOB_TEMPLATE_JOBS][job['job_template_id']] = False + + + def can_project_update_run(self, job): + return self.data[self.PROJECT_UPDATES].get(job['project_id'], True) + + def can_inventory_update_run(self, job): + return self.data[self.INVENTORY_UPDATES].get(job['inventory_id'], True) + + def can_job_run(self, job): + if self.can_project_update_run(job) is True and \ + self.can_inventory_update_run(job) is True: + if job['allow_simultaneous'] is False: + return self.data[self.JOB_TEMPLATE_JOBS].get(job['job_template_id'], True) + else: + return True + return False + + def is_job_blocked(self, job): + if type(job) is ProjectUpdateDict: + return not self.can_project_update_run(job) + elif type(job) is InventoryUpdateDict: + return not self.can_inventory_update_run(job) + elif type(job) is JobDict: + return not self.can_job_run(job) + + def add_job(self, job): + if type(job) is ProjectUpdateDict: + self.add_project_update(job) + elif type(job) is InventoryUpdateDict: + self.add_inventory_update(job) + elif type(job) is JobDict: + self.add_job_template_job(job) + + def add_jobs(self, jobs): + for j in jobs: + self.add_job(j) + diff --git a/awx/main/scheduler/partial.py b/awx/main/scheduler/partial.py new file mode 100644 index 0000000000..16c6597f99 --- /dev/null +++ b/awx/main/scheduler/partial.py @@ -0,0 +1,109 @@ + +# AWX +from awx.main.models import ( + Job, + ProjectUpdate, + InventoryUpdate, +) + +class PartialModelDict(object): + FIELDS = () + model = None + data = None + + def __init__(self, data): + if type(data) is not dict: + raise RuntimeError("Expected data to be of type dict not %s" % type(data)) + self.data = data + + def __getitem__(self, index): + return self.data[index] + + def __setitem__(self, key, value): + self.data[key] = value + + def get(self, key, **kwargs): + return self.data.get(key, **kwargs) + + def get_full(self): + return self.model.objects.get(id=self.data['id']) + + def refresh_partial(self): + return self.__class__(self.model.objects.filter(id=self.data['id']).values(*self.__class__.get_db_values())[0]) + + @classmethod + def get_partial(cls, id): + return cls(cls.model.objects.filter(id=id).values(*cls.get_db_values())[0]) + + @classmethod + def get_db_values(cls): + return cls.FIELDS + + @classmethod + def filter_partial(cls, status=[]): + kv = { + 'status__in': status + } + return [cls(o) for o in cls.model.objects.filter(**kv).values(*cls.get_db_values())] + + def get_job_type_str(self): + raise RuntimeError("Inherit and implement me") + + def task_impact(self): + raise RuntimeError("Inherit and implement me") + +class JobDict(PartialModelDict): + FIELDS = ( + 'id', 'status', 'job_template_id', 'inventory_id', 'project_id', + 'launch_type', 'limit', 'allow_simultaneous', 'created', + 'job_type', 'celery_task_id', 
'project__scm_update_on_launch',
+        'forks',
+    )
+    model = Job
+
+    def get_job_type_str(self):
+        return 'job'
+
+    def task_impact(self):
+        return (5 if self.data['forks'] == 0 else self.data['forks']) * 10
+
+class ProjectUpdateDict(PartialModelDict):
+    FIELDS = (
+        'id', 'status', 'project_id', 'created', 'celery_task_id', 'launch_type',
+        'project__scm_update_cache_timeout', 'project__scm_update_on_launch',
+    )
+    model = ProjectUpdate
+
+    def get_job_type_str(self):
+        return 'project_update'
+
+    def task_impact(self):
+        return 10
+
+class ProjectUpdateLatestDict(ProjectUpdateDict):
+    FIELDS = (
+        'id', 'status', 'project_id', 'created', 'finished',
+        'project__scm_update_cache_timeout', 'launch_type', 'project__scm_update_on_launch',
+    )
+    model = ProjectUpdate
+
+    @classmethod
+    def filter_partial(cls, project_ids):
+        # TODO: This can surely be made more efficient
+        results = []
+        for project_id in project_ids:
+            qs = cls.model.objects.filter(project_id=project_id, status__in=['waiting', 'successful', 'failed']).order_by('-finished')
+            if qs.count() > 0:
+                results.append(cls(cls.model.objects.filter(id=qs[0].id).values(*cls.get_db_values())[0]))
+        return results
+
+class InventoryUpdateDict(PartialModelDict):
+    FIELDS = (
+        'id', 'status', 'created', 'celery_task_id',
+    )
+    model = InventoryUpdate
+
+    def get_job_type_str(self):
+        return 'inventory_update'
+
+    def task_impact(self):
+        return 20
+
diff --git a/awx/main/scheduler/tasks.py b/awx/main/scheduler/tasks.py
index 343bdd1546..ef0334e316 100644
--- a/awx/main/scheduler/tasks.py
+++ b/awx/main/scheduler/tasks.py
@@ -1,14 +1,17 @@
 # Python
 import logging
-import time
+
+# Django
+from django.db import transaction
+from django.db.utils import DatabaseError
 
 # Celery
 from celery import task
 
 # AWX
-from awx.main.models import UnifiedJob
-from awx.main.scheduler import schedule
+from awx.main.models import Instance
+from awx.main.scheduler import Scheduler
 
 logger = logging.getLogger('awx.main.scheduler')
 
@@ -18,6 +21,7 @@ logger = logging.getLogger('awx.main.scheduler')
 
 @task
 def run_job_launch(job_id):
+    '''
     # Wait for job to exist.
     # The job is created in a transaction then the message is created, but
     # the transaction may not have completed.
@@ -45,11 +49,13 @@ def run_job_launch(job_id):
 
     # TODO: while not loop should call get wrapped in a try except
     #job = UnifiedJob.objects.get(id=job_id)
+    '''
 
-    schedule()
+    Scheduler().schedule()
 
 @task
 def run_job_complete(job_id):
+    '''
     # TODO: use list of finished status from jobs.py or unified_jobs.py
     finished_status = ['successful', 'error', 'failed', 'completed']
     q = UnifiedJob.objects.filter(id=job_id)
@@ -74,6 +80,29 @@
             logger.error("Expected job status '%s' to be one of '%s' while processing 'job_complete' message."
% (job.status, finished_status)) return retry += 1 + ''' - schedule() + Scheduler().schedule() + +@task +def run_scheduler(): + Scheduler().schedule() + +@task +def run_fail_inconsistent_running_jobs(): + return + print("run_fail_inconsistent_running_jobs() running") + with transaction.atomic(): + # Lock + try: + Instance.objects.select_for_update(nowait=True).all()[0] + scheduler = Scheduler() + active_tasks = scheduler.get_activate_tasks() + if active_tasks is None: + return None + + all_sorted_tasks = scheduler.get_tasks() + scheduler.process_celery_tasks(active_tasks, all_sorted_tasks) + except DatabaseError: + return diff --git a/awx/main/tests/functional/test_partial.py b/awx/main/tests/functional/test_partial.py new file mode 100644 index 0000000000..69ad71c4df --- /dev/null +++ b/awx/main/tests/functional/test_partial.py @@ -0,0 +1,65 @@ + +# Python +import pytest +from django.utils.timezone import now as tz_now +from datetime import timedelta + +# AWX +from awx.main.models import ( + Project, + ProjectUpdate, +) +from awx.main.scheduler.partial import ( + ProjectUpdateLatestDict, +) + + +@pytest.fixture +def failed_project_update(): + p = Project.objects.create(name="proj1") + pu = ProjectUpdate.objects.create(project=p, status='failed', finished=tz_now() - timedelta(seconds=20)) + + return (p, pu) + +@pytest.fixture +def successful_project_update(): + p = Project.objects.create(name="proj1") + pu = ProjectUpdate.objects.create(project=p, status='successful', finished=tz_now() - timedelta(seconds=20)) + + return (p, pu) + +# Failed project updates newer than successful ones +@pytest.fixture +def multiple_project_updates(): + p = Project.objects.create(name="proj1") + + epoch = tz_now() + + successful_pus = [ProjectUpdate.objects.create(project=p, + status='successful', + finished=epoch - timedelta(seconds=100 + i)) for i in xrange(0, 5)] + failed_pus = [ProjectUpdate.objects.create(project=p, + status='failed', + finished=epoch - timedelta(seconds=100 - len(successful_pus) + i)) for i in xrange(0, 5)] + return (p, failed_pus, successful_pus) + +class TestProjectUpdateLatestDictDict(): + @pytest.mark.django_db + class TestFilterPartial(): + def test_project_update_successful(self, successful_project_update): + (project, project_update) = successful_project_update + + tasks = ProjectUpdateLatestDict.filter_partial(project_ids=[project.id]) + + assert 1 == len(tasks) + assert project_update.id == tasks[0]['id'] + + def test_correct_project_update(self, multiple_project_updates): + (project, failed_pus, successful_pus) = multiple_project_updates + + tasks = ProjectUpdateLatestDict.filter_partial(project_ids=[project.id]) + + assert 1 == len(tasks) + assert failed_pus[0].id == tasks[0]['id'] + + diff --git a/awx/main/tests/unit/scheduler/__init__.py b/awx/main/tests/unit/scheduler/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/awx/main/tests/unit/scheduler/test_dependency_graph.py b/awx/main/tests/unit/scheduler/test_dependency_graph.py new file mode 100644 index 0000000000..081f175027 --- /dev/null +++ b/awx/main/tests/unit/scheduler/test_dependency_graph.py @@ -0,0 +1,121 @@ + +# Python +import pytest +from datetime import timedelta + +# Django +from django.utils.timezone import now as tz_now + +# AWX +from awx.main.scheduler.dependency_graph import DependencyGraph +from awx.main.scheduler.partial import ProjectUpdateDict + +@pytest.fixture +def graph(): + return DependencyGraph() + +@pytest.fixture +def job(): + return dict(project_id=1) + 
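+# Each fixture below seeds the graph with one "latest" ProjectUpdateDict and
+# returns the graph (and, where a fresher job is needed, a job dict created
+# relative to now) for a single should_update_related_project() scenario.
+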
+@pytest.fixture
+def unsuccessful_last_project(graph, job):
+    pu = ProjectUpdateDict(dict(id=1,
+                                project__scm_update_cache_timeout=999999,
+                                project_id=1,
+                                status='failed',
+                                # placeholder timestamps; the 'failed' status
+                                # short-circuits before they are compared
+                                created='3',
+                                finished='3',))
+
+    graph.add_latest_project_update(pu)
+
+    return graph
+
+@pytest.fixture
+def last_dependent_project(graph):
+    now = tz_now()
+
+    job = {
+        'project_id': 1,
+        'created': now,
+    }
+    pu = ProjectUpdateDict(dict(id=1, project_id=1, status='waiting',
+                                project__scm_update_cache_timeout=0,
+                                launch_type='dependency',
+                                created=now - timedelta(seconds=1),))
+
+    graph.add_latest_project_update(pu)
+
+    return (graph, job)
+
+@pytest.fixture
+def timedout_project_update(graph, job):
+    now = tz_now()
+
+    job = {
+        'project_id': 1,
+        'created': now,
+    }
+    pu = ProjectUpdateDict(dict(id=1, project_id=1, status='successful',
+                                project__scm_update_cache_timeout=10,
+                                launch_type='dependency',
+                                created=now - timedelta(seconds=100),
+                                finished=now - timedelta(seconds=11),))
+
+    graph.add_latest_project_update(pu)
+
+    return (graph, job)
+
+@pytest.fixture
+def not_timedout_project_update(graph, job):
+    now = tz_now()
+
+    job = {
+        'project_id': 1,
+        'created': now,
+    }
+    pu = ProjectUpdateDict(dict(id=1, project_id=1, status='successful',
+                                project__scm_update_cache_timeout=3600,
+                                launch_type='dependency',
+                                created=now - timedelta(seconds=100),
+                                finished=now - timedelta(seconds=11),))
+
+    graph.add_latest_project_update(pu)
+
+    return (graph, job)
+
+
+class TestShouldUpdateRelatedProject():
+
+    def test_no_project_updates(self, graph, job):
+        actual = graph.should_update_related_project(job)
+
+        assert True is actual
+
+    def test_timedout_project_update(self, timedout_project_update):
+        (graph, job) = timedout_project_update
+
+        actual = graph.should_update_related_project(job)
+
+        assert True is actual
+
+    def test_not_timedout_project_update(self, not_timedout_project_update):
+        (graph, job) = not_timedout_project_update
+
+        actual = graph.should_update_related_project(job)
+
+        assert False is actual
+
+    def test_unsuccessful_last_project(self, unsuccessful_last_project, job):
+        graph = unsuccessful_last_project
+
+        actual = graph.should_update_related_project(job)
+
+        assert True is actual
+
+    def test_last_dependent_project(self, last_dependent_project):
+        (graph, job) = last_dependent_project
+
+        actual = graph.should_update_related_project(job)
+
+        assert False is actual
+
diff --git a/awx/main/tests/unit/scheduler/test_scheduler_project_update.py b/awx/main/tests/unit/scheduler/test_scheduler_project_update.py
new file mode 100644
index 0000000000..54add63d51
--- /dev/null
+++ b/awx/main/tests/unit/scheduler/test_scheduler_project_update.py
@@ -0,0 +1,194 @@
+
+# Python
+import pytest
+from datetime import timedelta
+
+# Django
+from django.utils.timezone import now as tz_now
+
+# awx
+from awx.main.scheduler.partial import (
+    JobDict,
+    ProjectUpdateDict,
+)
+from awx.main.scheduler import Scheduler
+
+# TODO: wherever get_latest_project_update_tasks() is stubbed and returns a
+# ProjectUpdateDict, we should instead return a ProjectUpdateLatestDict().
+# For now, this is ok since the fields don't deviate that much.
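+# scheduler_factory stubs every Scheduler entry point that would touch the
+# database (get_tasks, get_latest_project_update_tasks, create_project_update,
+# start_task), so _schedule() is exercised purely against in-memory partials.
+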
+ +@pytest.fixture +def epoch(): + return tz_now() + + +@pytest.fixture +def scheduler_factory(mocker, epoch): + def fn(tasks=[], latest_project_updates=[], create_project_update=None): + sched = Scheduler() + sched.capacity_total = 999999999 + + sched.graph.get_now = lambda: epoch + + mocker.patch.object(sched, 'get_tasks', return_value=tasks) + mocker.patch.object(sched, 'get_latest_project_update_tasks', return_value=latest_project_updates) + mocker.patch.object(sched, 'create_project_update', return_value=create_project_update) + mocker.patch.object(sched, 'start_task') + return sched + return fn + +@pytest.fixture +def project_update_factory(epoch): + def fn(): + return ProjectUpdateDict({ + 'id': 1, + 'created': epoch - timedelta(seconds=100), + 'project_id': 1, + 'project__scm_update_cache_timeout': 0, + 'celery_task_id': '', + 'launch_type': 'dependency', + 'project__scm_update_on_launch': True, + }) + return fn + +@pytest.fixture +def pending_project_update(project_update_factory): + project_update = project_update_factory() + project_update['status'] = 'pending' + return project_update + +@pytest.fixture +def waiting_project_update(epoch, project_update_factory): + project_update = project_update_factory() + project_update['status'] = 'waiting' + return project_update + +@pytest.fixture +def pending_job(epoch): + return JobDict({ + 'id': 1, + 'status': 'pending', + 'job_template_id': 1, + 'project_id': 1, + 'inventory_id': 1, + 'launch_type': 'manual', + 'allow_simultaneous': False, + 'created': epoch - timedelta(seconds=99), + 'celery_task_id': '', + 'project__scm_update_on_launch': True, + 'forks': 5 + }) + +@pytest.fixture +def running_project_update(epoch, project_update_factory): + project_update = project_update_factory() + project_update['status'] = 'running' + return project_update + +@pytest.fixture +def successful_project_update(epoch, project_update_factory): + project_update = project_update_factory() + project_update['finished'] = epoch - timedelta(seconds=90) + project_update['status'] = 'successful' + return project_update + +@pytest.fixture +def successful_project_update_cache_expired(epoch, project_update_factory): + project_update = project_update_factory() + + project_update['status'] = 'successful' + project_update['created'] = epoch - timedelta(seconds=120) + project_update['finished'] = epoch - timedelta(seconds=110) + project_update['project__scm_update_cache_timeout'] = 1 + return project_update + +@pytest.fixture +def failed_project_update(epoch, project_update_factory): + project_update = project_update_factory() + project_update['finished'] = epoch - timedelta(seconds=90) + project_update['status'] = 'failed' + return project_update + +class TestStartProjectUpdate(): + def test(self, scheduler_factory, pending_project_update): + scheduler = scheduler_factory(tasks=[pending_project_update]) + + scheduler._schedule() + + scheduler.start_task.assert_called_with(pending_project_update) + assert scheduler.create_project_update.call_count == 0 + + ''' + Explicit project update should always run. They should not use cache logic. 
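+    Cache checks only gate dependency updates spawned for jobs; an explicitly
+    launched pending update is started directly.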
+ ''' + def test_cache_oblivious(self, scheduler_factory, successful_project_update, pending_project_update): + scheduler = scheduler_factory(tasks=[pending_project_update], + latest_project_updates=[successful_project_update]) + + scheduler._schedule() + + scheduler.start_task.assert_called_with(pending_project_update) + assert scheduler.create_project_update.call_count == 0 + + +class TestCreateDependentProjectUpdate(): + + def test(self, scheduler_factory, pending_job, waiting_project_update): + scheduler = scheduler_factory(tasks=[pending_job], + create_project_update=waiting_project_update) + + scheduler._schedule() + + scheduler.start_task.assert_called_with(waiting_project_update, [pending_job]) + + def test_cache_hit(self, scheduler_factory, pending_job, successful_project_update): + scheduler = scheduler_factory(tasks=[successful_project_update, pending_job], + latest_project_updates=[successful_project_update]) + scheduler._schedule() + + scheduler.start_task.assert_called_with(pending_job) + + def test_cache_miss(self, scheduler_factory, pending_job, successful_project_update_cache_expired, waiting_project_update): + scheduler = scheduler_factory(tasks=[successful_project_update_cache_expired, pending_job], + latest_project_updates=[successful_project_update_cache_expired], + create_project_update=waiting_project_update) + scheduler._schedule() + + scheduler.start_task.assert_called_with(waiting_project_update, [pending_job]) + + def test_last_update_failed(self, scheduler_factory, pending_job, failed_project_update, waiting_project_update): + scheduler = scheduler_factory(tasks=[failed_project_update, pending_job], + latest_project_updates=[failed_project_update], + create_project_update=waiting_project_update) + scheduler._schedule() + + scheduler.start_task.assert_called_with(waiting_project_update, [pending_job]) + + +class TestJobBlockedOnProjectUpdate(): + def test(self, scheduler_factory, pending_job, waiting_project_update): + scheduler = scheduler_factory(tasks=[waiting_project_update, pending_job], + latest_project_updates=[waiting_project_update]) + + scheduler._schedule() + + scheduler.start_task.assert_not_called() + assert scheduler.create_project_update.call_count == 0 + + def test_project_running(self, scheduler_factory, pending_job, running_project_update): + scheduler = scheduler_factory(tasks=[running_project_update, pending_job]) + + scheduler._schedule() + + scheduler.start_task.assert_not_called() + assert scheduler.create_project_update.call_count == 0 + +class TestProjectUpdateBlocked(): + def test(self, scheduler_factory, running_project_update, pending_project_update): + scheduler = scheduler_factory(tasks=[running_project_update, pending_project_update], + latest_project_updates=[running_project_update]) + scheduler._schedule() + + scheduler.start_task.assert_not_called() + assert scheduler.create_project_update.call_count == 0 + diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 9c6ed0950b..a5c7975920 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -392,6 +392,14 @@ CELERYBEAT_SCHEDULE = { 'task': 'awx.main.tasks.cluster_node_heartbeat', 'schedule': timedelta(seconds=60) }, + 'task_scheduler': { + 'task': 'awx.main.scheduler.tasks.run_scheduler', + 'schedule': timedelta(seconds=10) + }, + 'task_fail_inconsistent_running_jobs': { + 'task': 'awx.main.scheduler.tasks.run_fail_inconsistent_running_jobs', + 'schedule': timedelta(seconds=30) + }, } # Django Caching Configuration