From 306562cd670c38b9fb1ae07e941c417ac471c046 Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Thu, 20 Oct 2016 15:05:02 -0400 Subject: [PATCH] inventory updates running correctly --- awx/main/models/inventory.py | 2 +- awx/main/models/unified_jobs.py | 10 +- awx/main/scheduler/__init__.py | 77 ++++-- awx/main/scheduler/dependency_graph.py | 82 +++++- awx/main/scheduler/partial.py | 66 ++++- awx/main/scheduler/tasks.py | 57 ----- awx/main/tests/functional/test_partial.py | 111 +++++--- awx/main/tests/unit/scheduler/conftest.py | 238 ++++++++++++++++++ .../test_scheduler_inventory_update.py | 85 +++++++ .../unit/scheduler/test_scheduler_job.py | 66 +++++ .../test_scheduler_project_update.py | 123 +-------- 11 files changed, 681 insertions(+), 236 deletions(-) create mode 100644 awx/main/tests/unit/scheduler/conftest.py create mode 100644 awx/main/tests/unit/scheduler/test_scheduler_inventory_update.py create mode 100644 awx/main/tests/unit/scheduler/test_scheduler_job.py diff --git a/awx/main/models/inventory.py b/awx/main/models/inventory.py index 6fb3e2f992..c77868759e 100644 --- a/awx/main/models/inventory.py +++ b/awx/main/models/inventory.py @@ -1089,7 +1089,7 @@ class InventorySource(UnifiedJobTemplate, InventorySourceOptions): def _get_unified_job_field_names(cls): return ['name', 'description', 'source', 'source_path', 'source_script', 'source_vars', 'schedule', 'credential', 'source_regions', 'instance_filters', 'group_by', 'overwrite', 'overwrite_vars', - 'timeout'] + 'timeout', 'launch_type',] def save(self, *args, **kwargs): # If update_fields has been specified, add our field names to it, diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 674bedbffe..19bc265c18 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -13,7 +13,7 @@ from StringIO import StringIO # Django from django.conf import settings -from django.db import models +from django.db import models, connection from 
django.core.exceptions import NON_FIELD_ERRORS from django.utils.translation import ugettext_lazy as _ from django.utils.timezone import now @@ -835,6 +835,10 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique return (True, opts) + def start_celery_task(self, opts, error_callback, success_callback): + task_class = self._get_task_class() + task_class().apply_async((self.pk,), opts, link_error=error_callback, link=success_callback) + def start(self, error_callback, success_callback, **kwargs): ''' Start the task running via Celery. @@ -842,7 +846,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique task_class = self._get_task_class() (res, opts) = self.pre_start(**kwargs) if res: - task_class().apply_async((self.pk,), opts, link_error=error_callback, link=success_callback) + self.start_celery_task(opts, error_callback, success_callback) return res def signal_start(self, **kwargs): @@ -871,7 +875,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique self.websocket_emit_status("pending") from awx.main.scheduler.tasks import run_job_launch - run_job_launch.delay(self.id) + connection.on_commit(lambda: run_job_launch.delay(self.id)) # Each type of unified job has a different Task class; get the # appropirate one. 
diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index 0711528c56..e704b3ef8a 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -8,7 +8,7 @@ from sets import Set # Django from django.conf import settings -from django.db import transaction +from django.db import transaction, connection from django.db.utils import DatabaseError # AWX @@ -20,8 +20,10 @@ from awx.main.scheduler.dependency_graph import DependencyGraph from awx.main.scheduler.partial import ( JobDict, ProjectUpdateDict, - InventoryUpdateDict, ProjectUpdateLatestDict, + InventoryUpdateDict, + InventoryUpdateLatestDict, + InventorySourceDict, ) # Celery @@ -72,11 +74,34 @@ class Scheduler(): return ProjectUpdateLatestDict.filter_partial(list(project_ids)) + # TODO: Consider a database query for this logic + def get_latest_inventory_update_tasks(self, all_sorted_tasks): + inventory_ids = Set() + for task in all_sorted_tasks: + if type(task) == JobDict: + inventory_ids.add(task['inventory_id']) + + return InventoryUpdateLatestDict.filter_partial(list(inventory_ids)) + + def get_running_workflow_jobs(self): graph_workflow_jobs = [wf for wf in WorkflowJob.objects.filter(status='running')] return graph_workflow_jobs + # TODO: Consider a database query for this logic + def get_inventory_source_tasks(self, all_sorted_tasks): + inventory_ids = Set() + results = [] + for task in all_sorted_tasks: + if type(task) is JobDict: + inventory_ids.add(task['inventory_id']) + + for inventory_id in inventory_ids: + results.append((inventory_id, InventorySourceDict.filter_partial(inventory_id))) + + return results + def spawn_workflow_graph_jobs(self, workflow_jobs): # TODO: Consider using transaction.atomic for workflow_job in workflow_jobs: @@ -134,8 +159,6 @@ class Scheduler(): def start_task(self, task, dependent_tasks=[]): from awx.main.tasks import handle_work_error, handle_work_success - #print("start_task() <%s, %s> with deps %s" % (task.get_job_type_str(), 
task['id'], dependent_tasks)) - # TODO: spawn inventory and project updates task_actual = { 'type':task.get_job_type_str(), @@ -148,10 +171,8 @@ class Scheduler(): job_obj = task.get_full() job_obj.status = 'waiting' - job_obj.save() - #print("For real, starting job <%s, %s>" % (type(job_obj), job_obj.id)) - start_status = job_obj.start(error_callback=error_handler, success_callback=success_handler) + (start_status, opts) = job_obj.pre_start() if not start_status: job_obj.status = 'failed' if job_obj.job_explanation: @@ -163,6 +184,8 @@ class Scheduler(): self.consume_capacity(task) + connection.on_commit(lambda: job_obj.start_celery_task(opts, error_callback=error_handler, success_callback=success_handler)) + def process_runnable_tasks(self, runnable_tasks): for i, task in enumerate(runnable_tasks): # TODO: maybe batch process new tasks. @@ -179,10 +202,20 @@ class Scheduler(): dep.save() project_task = ProjectUpdateDict.get_partial(dep.id) - #waiting_tasks.insert(waiting_tasks.index(task), dep) return project_task + def create_inventory_update(self, task, inventory_source_task): + dep = InventorySource.objects.get(id=inventory_source_task['id']).create_inventory_update(launch_type='dependency') + + dep.created = task['created'] - timedelta(seconds=2) + dep.status = 'waiting' + dep.save() + + inventory_task = InventoryUpdateDict.get_partial(dep.id) + + return inventory_task + def generate_dependencies(self, task): dependencies = [] # TODO: What if the project is null ? 
@@ -191,12 +224,24 @@ class Scheduler(): self.graph.should_update_related_project(task): project_task = self.create_project_update(task) dependencies.append(project_task) - # Inventory created 2 seconds behind + # Inventory created 2 seconds behind job + + for inventory_source_task in self.graph.get_inventory_sources(task['inventory_id']): + if self.graph.should_update_related_inventory_source(task, inventory_source_task['id']): + inventory_task = self.create_inventory_update(task, inventory_source_task) + dependencies.append(inventory_task) return dependencies def process_latest_project_updates(self, latest_project_updates): - for task in latest_project_updates: - self.graph.add_latest_project_update(task) + map(lambda task: self.graph.add_latest_project_update(task), latest_project_updates) + + def process_latest_inventory_updates(self, latest_inventory_updates): + map(lambda task: self.graph.add_latest_inventory_update(task), latest_inventory_updates) + + def process_inventory_sources(self, inventory_id_sources): + #map(lambda inventory_id, inventory_sources: self.graph.add_inventory_sources(inventory_id, inventory_sources), inventory_id_sources) + for inventory_id, inventory_sources in inventory_id_sources: + self.graph.add_inventory_sources(inventory_id, inventory_sources) def process_dependencies(self, dependent_task, dependency_tasks): for task in dependency_tasks: @@ -205,7 +250,6 @@ class Scheduler(): if not self.graph.is_job_blocked(task): self.graph.add_job(task) if not self.would_exceed_capacity(task): - #print("process_dependencies() going to run project update <%s, %s>" % (task['id'], task['project_id'])) self.start_task(task, [dependent_task]) else: self.graph.add_job(task) @@ -214,7 +258,6 @@ class Scheduler(): for task in pending_tasks: if not self.graph.is_job_blocked(task): - #print("process_pending_tasks() generating deps for job <%s, %s, %s>" % (task['id'], task['project_id'], task.model)) dependencies = self.generate_dependencies(task) 
self.process_dependencies(task, dependencies) @@ -222,7 +265,6 @@ class Scheduler(): if not self.graph.is_job_blocked(task): self.graph.add_job(task) if not self.would_exceed_capacity(task): - #print("Starting the original task <%s, %s>" % (task.get_job_type_str(), task['id'])) self.start_task(task) else: self.graph.add_job(task) @@ -272,7 +314,6 @@ class Scheduler(): def consume_capacity(self, task): self.capacity_used += task.task_impact() - #print("Capacity used %s vs total %s" % (self.capacity_used, self.capacity_total)) def get_remaining_capacity(self): return (self.capacity_total - self.capacity_used) @@ -320,6 +361,12 @@ class Scheduler(): latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks) self.process_latest_project_updates(latest_project_updates) + latest_inventory_updates = self.get_latest_inventory_update_tasks(all_sorted_tasks) + self.process_latest_inventory_updates(latest_inventory_updates) + + inventory_id_sources = self.get_inventory_source_tasks(all_sorted_tasks) + self.process_inventory_sources(inventory_id_sources) + self.process_tasks(all_sorted_tasks) #print("Finished schedule()") diff --git a/awx/main/scheduler/dependency_graph.py b/awx/main/scheduler/dependency_graph.py index 5ecea91385..14a77ab697 100644 --- a/awx/main/scheduler/dependency_graph.py +++ b/awx/main/scheduler/dependency_graph.py @@ -6,7 +6,12 @@ class DependencyGraph(object): PROJECT_UPDATES = 'project_updates' INVENTORY_UPDATES = 'inventory_updates' JOB_TEMPLATE_JOBS = 'job_template_jobs' + INVENTORY_SOURCE_UPDATES = 'inventory_source_updates' + LATEST_PROJECT_UPDATES = 'latest_project_updates' + LATEST_INVENTORY_UPDATES = 'latest_inventory_updates' + + INVENTORY_SOURCES = 'inventory_source_ids' def __init__(self, *args, **kwargs): self.data = {} @@ -16,13 +21,29 @@ class DependencyGraph(object): self.data[self.INVENTORY_UPDATES] = {} # job_template_id -> True / False self.data[self.JOB_TEMPLATE_JOBS] = {} + # inventory_source_id -> True / False + 
self.data[self.INVENTORY_SOURCE_UPDATES] = {} - # project_id -> latest ProjectUpdateDict + # project_id -> latest ProjectUpdateLatestDict self.data[self.LATEST_PROJECT_UPDATES] = {} + # inventory_source_id -> latest InventoryUpdateLatestDict + self.data[self.LATEST_INVENTORY_UPDATES] = {} + + # inventory_id -> [inventory_source_ids] + self.data[self.INVENTORY_SOURCES] = {} def add_latest_project_update(self, job): self.data[self.LATEST_PROJECT_UPDATES][job['project_id']] = job + def add_latest_inventory_update(self, job): + self.data[self.LATEST_INVENTORY_UPDATES][job['inventory_source_id']] = job + + def add_inventory_sources(self, inventory_id, inventory_sources): + self.data[self.INVENTORY_SOURCES][inventory_id] = inventory_sources + + def get_inventory_sources(self, inventory_id): + return self.data[self.INVENTORY_SOURCES].get(inventory_id, []) + def get_now(self): return tz_now() @@ -61,25 +82,59 @@ class DependencyGraph(object): return False - def add_project_update(self, job): + def should_update_related_inventory_source(self, job, inventory_source_id): + now = self.get_now() + latest_inventory_update = self.data[self.LATEST_INVENTORY_UPDATES].get(inventory_source_id, None) + if not latest_inventory_update: + return True + + # TODO: Other finished, failed cases? i.e. error ? + if latest_inventory_update['status'] == 'failed': + return True + + ''' + This is a bit of fuzzy logic. + If the latest inventory update has a created time == job_created_time-2 + then consider the inventory update found. This is so we don't enter an infinite loop + of updating the inventory source when cache timeout is 0.
+ ''' + if latest_inventory_update['inventory_source__update_cache_timeout'] == 0 and \ + latest_inventory_update['launch_type'] == 'dependency' and \ + latest_inventory_update['created'] == job['created'] - timedelta(seconds=2): + return False + + ''' + Normal, expected, cache timeout logic + ''' + timeout_seconds = timedelta(seconds=latest_inventory_update['inventory_source__update_cache_timeout']) + if (latest_inventory_update['finished'] + timeout_seconds) < now: + return True + + return False + + def mark_project_update(self, job): self.data[self.PROJECT_UPDATES][job['project_id']] = False - def add_inventory_update(self, job): + def mark_inventory_update(self, inventory_id): + self.data[self.INVENTORY_UPDATES][inventory_id] = False + + def mark_inventory_source_update(self, inventory_source_id): + self.data[self.INVENTORY_SOURCE_UPDATES][inventory_source_id] = False + + def mark_job_template_job(self, job): self.data[self.INVENTORY_UPDATES][job['inventory_id']] = False - - def add_job_template_job(self, job): + self.data[self.PROJECT_UPDATES][job['project_id']] = False self.data[self.JOB_TEMPLATE_JOBS][job['job_template_id']] = False - def can_project_update_run(self, job): return self.data[self.PROJECT_UPDATES].get(job['project_id'], True) - def can_inventory_update_run(self, job): - return self.data[self.INVENTORY_UPDATES].get(job['inventory_id'], True) + def can_inventory_update_run(self, inventory_source_id): + return self.data[self.INVENTORY_SOURCE_UPDATES].get(inventory_source_id, True) def can_job_run(self, job): if self.can_project_update_run(job) is True and \ - self.can_inventory_update_run(job) is True: + self.data[self.INVENTORY_UPDATES].get(job['inventory_id'], True) is True: if job['allow_simultaneous'] is False: return self.data[self.JOB_TEMPLATE_JOBS].get(job['job_template_id'], True) else: @@ -90,17 +145,18 @@ class DependencyGraph(object): if type(job) is ProjectUpdateDict: return not self.can_project_update_run(job) elif type(job) is 
InventoryUpdateDict: - return not self.can_inventory_update_run(job) + return not self.can_inventory_update_run(job['inventory_source_id']) elif type(job) is JobDict: return not self.can_job_run(job) def add_job(self, job): if type(job) is ProjectUpdateDict: - self.add_project_update(job) + self.mark_project_update(job) elif type(job) is InventoryUpdateDict: - self.add_inventory_update(job) + self.mark_inventory_update(job['inventory_source__inventory_id']) + self.mark_inventory_source_update(job['inventory_source_id']) elif type(job) is JobDict: - self.add_job_template_job(job) + self.mark_job_template_job(job) def add_jobs(self, jobs): for j in jobs: diff --git a/awx/main/scheduler/partial.py b/awx/main/scheduler/partial.py index 16c6597f99..e3677dab2c 100644 --- a/awx/main/scheduler/partial.py +++ b/awx/main/scheduler/partial.py @@ -4,6 +4,7 @@ from awx.main.models import ( Job, ProjectUpdate, InventoryUpdate, + InventorySource, ) class PartialModelDict(object): @@ -57,7 +58,7 @@ class JobDict(PartialModelDict): 'id', 'status', 'job_template_id', 'inventory_id', 'project_id', 'launch_type', 'limit', 'allow_simultaneous', 'created', 'job_type', 'celery_task_id', 'project__scm_update_on_launch', - 'forks', + 'forks', 'inventory__inventory_sources', ) model = Job @@ -69,7 +70,9 @@ class JobDict(PartialModelDict): class ProjectUpdateDict(PartialModelDict): FIELDS = ( - 'id', 'status', 'project_id', 'created', 'celery_task_id', 'launch_type', 'project__scm_update_cache_timeout', 'project__scm_update_on_launch', + 'id', 'status', 'project_id', 'created', 'celery_task_id', + 'launch_type', 'project__scm_update_cache_timeout', + 'project__scm_update_on_launch', ) model = ProjectUpdate @@ -81,23 +84,29 @@ class ProjectUpdateDict(PartialModelDict): class ProjectUpdateLatestDict(ProjectUpdateDict): FIELDS = ( - 'id', 'status', 'project_id', 'created', 'finished', 'project__scm_update_cache_timeout', 'launch_type', 'project__scm_update_on_launch', + 'id', 'status', 
'project_id', 'created', 'finished', + 'project__scm_update_cache_timeout', + 'launch_type', 'project__scm_update_on_launch', ) model = ProjectUpdate @classmethod def filter_partial(cls, project_ids): # TODO: This can shurley be made more efficient + # * shouldn't have to do a query per project_id + # * shouldn't have to call .values() on all the results, only to get the first result results = [] for project_id in project_ids: - qs = cls.model.objects.filter(project_id=project_id, status__in=['waiting', 'successful', 'failed']).order_by('-finished') + qs = cls.model.objects.filter(project_id=project_id, status__in=['waiting', 'successful', 'failed']).order_by('-finished', '-started', '-created',) if qs.count() > 0: results.append(cls(cls.model.objects.filter(id=qs[0].id).values(*cls.get_db_values())[0])) return results class InventoryUpdateDict(PartialModelDict): + #'inventory_source__update_on_launch', + #'inventory_source__update_cache_timeout', FIELDS = ( - 'id', 'status', 'created', 'celery_task_id', + 'id', 'status', 'created', 'celery_task_id', 'inventory_source_id', 'inventory_source__inventory_id', ) model = InventoryUpdate @@ -107,3 +116,50 @@ class InventoryUpdateDict(PartialModelDict): def task_impact(self): return 20 +class InventoryUpdateLatestDict(InventoryUpdateDict): + #'inventory_source__update_on_launch', + #'inventory_source__update_cache_timeout', + FIELDS = ( + 'id', 'status', 'created', 'celery_task_id', 'inventory_source_id', + 'finished', 'inventory_source__update_cache_timeout', 'launch_type', + ) + model = InventoryUpdate + + @classmethod + def filter_partial(cls, inventory_ids): + # TODO: This can surely be made more efficient + # * shouldn't have to do a query per inventory_id nor per inventory_source_id + # * shouldn't have to call .values() on all the results, only to get the first result + results = [] + for inventory_id in inventory_ids: + inventory_source_ids = InventorySource.objects.filter(inventory_id=inventory_id, +
update_on_launch=True).values_list('id', flat=True) + # Find the most recent inventory update for each inventory source + for inventory_source_id in inventory_source_ids: + qs = cls.model.objects.filter(inventory_source_id=inventory_source_id, + status__in=['waiting', 'successful', 'failed'], + inventory_source__update_on_launch=True).order_by('-finished', '-started', '-created') + if qs.count() > 0: + results.append(cls(cls.model.objects.filter(id=qs[0].id).values(*cls.get_db_values())[0])) + return results + +class InventorySourceDict(PartialModelDict): + FIELDS = ( + 'id', + ) + model = InventorySource + + def get_job_type_str(self): + return 'inventory_source' + + def task_impact(self): + return 20 + + @classmethod + # TODO: Optimize this to run the query once + def filter_partial(cls, inventory_id): + kv = { + 'inventory_id': inventory_id, + 'update_on_launch': True, + } + return [cls(o) for o in cls.model.objects.filter(**kv).values(*cls.get_db_values())] diff --git a/awx/main/scheduler/tasks.py b/awx/main/scheduler/tasks.py index ef0334e316..ba1ddaeecc 100644 --- a/awx/main/scheduler/tasks.py +++ b/awx/main/scheduler/tasks.py @@ -21,67 +21,10 @@ logger = logging.getLogger('awx.main.scheduler') @task def run_job_launch(job_id): - ''' - # Wait for job to exist. - # The job is created in a transaction then the message is created, but - # the transaction may not have completed. - - # FIXME: We could generate the message in a Django signal handler. - # OR, we could call an explicit commit in the view and then send the - # message. - - retries = 10 - retry = 0 - while not UnifiedJob.objects.filter(id=job_id).exists(): - time.sleep(0.3) - - if retry >= retries: - logger.error("Failed to process 'job_launch' message for job %d" % job_id) - # ack the message so we don't build up the queue. 
- # - # The job can still be chosen to run during tower startup or - # when another job is started or completes - return - retry += 1 - - # "Safe" to get the job now since it exists. - # Really, there is a race condition from exists to get - - # TODO: while not loop should call get wrapped in a try except - #job = UnifiedJob.objects.get(id=job_id) - ''' - Scheduler().schedule() @task def run_job_complete(job_id): - ''' - # TODO: use list of finished status from jobs.py or unified_jobs.py - finished_status = ['successful', 'error', 'failed', 'completed'] - q = UnifiedJob.objects.filter(id=job_id) - - # Ensure that the job is updated in the database before we call to - # schedule the next job. - retries = 10 - retry = 0 - while True: - # Job not found, most likely deleted. That's fine - if not q.exists(): - logger.warn("Failed to find job '%d' while processing 'job_complete' message. Presume that it was deleted." % job_id) - break - - job = q[0] - if job.status in finished_status: - break - - time.sleep(0.3) - - if retry >= retries: - logger.error("Expected job status '%s' to be one of '%s' while processing 'job_complete' message." 
% (job.status, finished_status)) - return - retry += 1 - ''' - Scheduler().schedule() @task diff --git a/awx/main/tests/functional/test_partial.py b/awx/main/tests/functional/test_partial.py index 69ad71c4df..0ab84dc901 100644 --- a/awx/main/tests/functional/test_partial.py +++ b/awx/main/tests/functional/test_partial.py @@ -6,44 +6,48 @@ from datetime import timedelta # AWX from awx.main.models import ( + Organization, + Inventory, + Group, Project, ProjectUpdate, + InventoryUpdate, + InventorySource, ) from awx.main.scheduler.partial import ( ProjectUpdateLatestDict, + InventoryUpdateDict, + InventoryUpdateLatestDict, ) - @pytest.fixture -def failed_project_update(): - p = Project.objects.create(name="proj1") - pu = ProjectUpdate.objects.create(project=p, status='failed', finished=tz_now() - timedelta(seconds=20)) - - return (p, pu) - -@pytest.fixture -def successful_project_update(): - p = Project.objects.create(name="proj1") - pu = ProjectUpdate.objects.create(project=p, status='successful', finished=tz_now() - timedelta(seconds=20)) - - return (p, pu) - -# Failed project updates newer than successful ones -@pytest.fixture -def multiple_project_updates(): - p = Project.objects.create(name="proj1") - - epoch = tz_now() - - successful_pus = [ProjectUpdate.objects.create(project=p, - status='successful', - finished=epoch - timedelta(seconds=100 + i)) for i in xrange(0, 5)] - failed_pus = [ProjectUpdate.objects.create(project=p, - status='failed', - finished=epoch - timedelta(seconds=100 - len(successful_pus) + i)) for i in xrange(0, 5)] - return (p, failed_pus, successful_pus) +def org(): + return Organization.objects.create(name="org1") class TestProjectUpdateLatestDictDict(): + @pytest.fixture + def successful_project_update(self): + p = Project.objects.create(name="proj1") + pu = ProjectUpdate.objects.create(project=p, status='successful', finished=tz_now() - timedelta(seconds=20)) + + return (p, pu) + + # Failed project updates newer than successful ones + 
@pytest.fixture + def multiple_project_updates(self): + p = Project.objects.create(name="proj1") + + epoch = tz_now() + + successful_pus = [ProjectUpdate.objects.create(project=p, + status='successful', + finished=epoch - timedelta(seconds=100 + i)) for i in xrange(0, 5)] + failed_pus = [ProjectUpdate.objects.create(project=p, + status='failed', + finished=epoch - timedelta(seconds=100 - len(successful_pus) + i)) for i in xrange(0, 5)] + return (p, failed_pus, successful_pus) + + @pytest.mark.django_db class TestFilterPartial(): def test_project_update_successful(self, successful_project_update): @@ -63,3 +67,54 @@ class TestProjectUpdateLatestDictDict(): assert failed_pus[0].id == tasks[0]['id'] +class TestInventoryUpdateDict(): + @pytest.fixture + def waiting_inventory_update(self, org): + i = Inventory.objects.create(name='inv1', organization=org) + g = Group.objects.create(name='group1', inventory=i) + #Inventory.groups.add(g) + inv_src = InventorySource.objects.create(group=g) + iu = InventoryUpdate.objects.create(inventory_source=inv_src, status='waiting') + return iu + + @pytest.mark.django_db + class TestFilterPartial(): + def test_simple(self, waiting_inventory_update): + tasks = InventoryUpdateDict.filter_partial(status=['waiting']) + + assert 1 == len(tasks) + assert waiting_inventory_update.id == tasks[0]['id'] + +class TestInventoryUpdateLatestDict(): + @pytest.fixture + def inventory(self, org): + i = Inventory.objects.create(name='inv1', organization=org) + return i + + @pytest.fixture + def inventory_updates(self, inventory): + g1 = Group.objects.create(name='group1', inventory=inventory) + g2 = Group.objects.create(name='group2', inventory=inventory) + g3 = Group.objects.create(name='group3', inventory=inventory) + + inv_src1 = InventorySource.objects.create(group=g1, update_on_launch=True, inventory=inventory) + inv_src2 = InventorySource.objects.create(group=g2, update_on_launch=False, inventory=inventory) + inv_src3 = 
InventorySource.objects.create(group=g3, update_on_launch=True, inventory=inventory) + + iu1 = InventoryUpdate.objects.create(inventory_source=inv_src1, status='successful') + iu2 = InventoryUpdate.objects.create(inventory_source=inv_src2, status='waiting') + iu3 = InventoryUpdate.objects.create(inventory_source=inv_src3, status='waiting') + return [iu1, iu2, iu3] + + @pytest.mark.django_db + def test_filter_partial(self, inventory, inventory_updates): + + tasks = InventoryUpdateLatestDict.filter_partial([inventory.id]) + + inventory_updates_expected = [inventory_updates[0], inventory_updates[2]] + + assert 2 == len(tasks) + for i, inventory_update in enumerate(inventory_updates_expected): + assert inventory_update.id == tasks[i]['id'] + + diff --git a/awx/main/tests/unit/scheduler/conftest.py b/awx/main/tests/unit/scheduler/conftest.py new file mode 100644 index 0000000000..d8a71d456e --- /dev/null +++ b/awx/main/tests/unit/scheduler/conftest.py @@ -0,0 +1,238 @@ + +# Python +import pytest +from datetime import timedelta + +# Django +from django.utils.timezone import now as tz_now + +# awx +from awx.main.scheduler.partial import ( + JobDict, + ProjectUpdateDict, + InventoryUpdateDict, + InventorySourceDict, +) +from awx.main.scheduler import Scheduler + + +@pytest.fixture +def epoch(): + return tz_now() + +@pytest.fixture +def scheduler_factory(mocker, epoch): + def fn(tasks=[], inventory_sources=[], latest_project_updates=[], latest_inventory_updates=[], create_project_update=None, create_inventory_update=None): + sched = Scheduler() + sched.capacity_total = 999999999 + + sched.graph.get_now = lambda: epoch + + def no_create_inventory_update(task, ignore): + raise RuntimeError("create_inventory_update should not be called") + def no_create_project_update(task): + raise RuntimeError("create_project_update should not be called") + + mocker.patch.object(sched, 'get_tasks', return_value=tasks) + mocker.patch.object(sched, 'get_inventory_source_tasks', 
return_value=inventory_sources) + mocker.patch.object(sched, 'get_latest_project_update_tasks', return_value=latest_project_updates) + mocker.patch.object(sched, 'get_latest_inventory_update_tasks', return_value=latest_inventory_updates) + create_project_update_mock = mocker.patch.object(sched, 'create_project_update', return_value=create_project_update) + create_inventory_update_mock = mocker.patch.object(sched, 'create_inventory_update', return_value=create_inventory_update) + mocker.patch.object(sched, 'start_task') + + if not create_project_update: + create_project_update_mock.side_effect = no_create_project_update + if not create_inventory_update: + create_inventory_update_mock.side_effect = no_create_inventory_update + return sched + return fn + +@pytest.fixture +def project_update_factory(epoch): + def fn(): + return ProjectUpdateDict({ + 'id': 1, + 'created': epoch - timedelta(seconds=100), + 'project_id': 1, + 'project__scm_update_cache_timeout': 0, + 'celery_task_id': '', + 'launch_type': 'dependency', + 'project__scm_update_on_launch': True, + }) + return fn + +@pytest.fixture +def pending_project_update(project_update_factory): + project_update = project_update_factory() + project_update['status'] = 'pending' + return project_update + +@pytest.fixture +def waiting_project_update(epoch, project_update_factory): + project_update = project_update_factory() + project_update['status'] = 'waiting' + return project_update + +@pytest.fixture +def running_project_update(epoch, project_update_factory): + project_update = project_update_factory() + project_update['status'] = 'running' + return project_update + +@pytest.fixture +def successful_project_update(epoch, project_update_factory): + project_update = project_update_factory() + project_update['finished'] = epoch - timedelta(seconds=90) + project_update['status'] = 'successful' + return project_update + +@pytest.fixture +def successful_project_update_cache_expired(epoch, project_update_factory): + 
project_update = project_update_factory() + + project_update['status'] = 'successful' + project_update['created'] = epoch - timedelta(seconds=120) + project_update['finished'] = epoch - timedelta(seconds=110) + project_update['project__scm_update_cache_timeout'] = 1 + return project_update + +@pytest.fixture +def failed_project_update(epoch, project_update_factory): + project_update = project_update_factory() + project_update['finished'] = epoch - timedelta(seconds=90) + project_update['status'] = 'failed' + return project_update + +@pytest.fixture +def inventory_update_factory(epoch): + def fn(): + return InventoryUpdateDict({ + 'id': 1, + 'created': epoch - timedelta(seconds=101), + 'inventory_id': 1, + 'celery_task_id': '', + 'status': 'pending', + 'launch_type': 'dependency', + 'inventory_source_id': 1, + 'inventory_source__inventory_id': 1, + }) + return fn + +@pytest.fixture +def inventory_update_latest_factory(epoch): + def fn(): + return InventoryUpdateDict({ + 'id': 1, + 'created': epoch - timedelta(seconds=101), + 'inventory_id': 1, + 'celery_task_id': '', + 'status': 'pending', + 'launch_type': 'dependency', + 'inventory_source_id': 1, + 'finished': None, + }) + return fn + +@pytest.fixture +def inventory_update_latest(inventory_update_latest_factory): + return inventory_update_latest_factory() + +@pytest.fixture +def successful_inventory_update_latest(inventory_update_latest_factory): + iu = inventory_update_latest_factory() + iu['status'] = 'successful' + iu['finished'] = iu['created'] + timedelta(seconds=10) + return iu + +@pytest.fixture +def failed_inventory_update_latest(inventory_update_latest_factory): + iu = inventory_update_latest_factory() + iu['status'] = 'failed' + return iu + +@pytest.fixture +def pending_inventory_update(epoch, inventory_update_factory): + inventory_update = inventory_update_factory() + inventory_update['status'] = 'pending' + return inventory_update + +@pytest.fixture +def waiting_inventory_update(epoch, 
inventory_update_factory): + inventory_update = inventory_update_factory() + inventory_update['status'] = 'waiting' + return inventory_update + +@pytest.fixture +def failed_inventory_update(epoch, inventory_update_factory): + inventory_update = inventory_update_factory() + inventory_update['status'] = 'failed' + return inventory_update + +@pytest.fixture +def running_inventory_update(epoch, inventory_update_factory): + inventory_update = inventory_update_factory() + inventory_update['status'] = 'running' + return inventory_update + +@pytest.fixture +def successful_inventory_update(epoch, inventory_update_factory): + inventory_update = inventory_update_factory() + inventory_update['finished'] = epoch - timedelta(seconds=90) + inventory_update['status'] = 'successful' + return inventory_update + +''' +Job +''' +@pytest.fixture +def job_factory(epoch): + def fn(project__scm_update_on_launch=True, inventory__inventory_sources=[]): + return JobDict({ + 'id': 1, + 'status': 'pending', + 'job_template_id': 1, + 'project_id': 1, + 'inventory_id': 1, + 'launch_type': 'manual', + 'allow_simultaneous': False, + 'created': epoch - timedelta(seconds=99), + 'celery_task_id': '', + 'project__scm_update_on_launch': project__scm_update_on_launch, + 'inventory__inventory_sources': inventory__inventory_sources, + 'forks': 5 + }) + return fn + +@pytest.fixture +def pending_job(job_factory): + job = job_factory() + job['status'] = 'pending' + return job + +@pytest.fixture +def running_job(job_factory): + job = job_factory() + job['status'] = 'running' + return job + +''' +Inventory id -> [InventorySourceDict, ...] 
+''' +@pytest.fixture +def inventory_source_factory(): + def fn(id=1): + return InventorySourceDict({ + 'id': id, + }) + return fn + +@pytest.fixture +def inventory_id_sources(inventory_source_factory): + return [ + (1, [ + inventory_source_factory(id=1), + inventory_source_factory(id=2), + ]), + ] + diff --git a/awx/main/tests/unit/scheduler/test_scheduler_inventory_update.py b/awx/main/tests/unit/scheduler/test_scheduler_inventory_update.py new file mode 100644 index 0000000000..09125df527 --- /dev/null +++ b/awx/main/tests/unit/scheduler/test_scheduler_inventory_update.py @@ -0,0 +1,85 @@ + +# Python +import pytest +from datetime import timedelta + +@pytest.fixture +def pending_job(job_factory): + return job_factory(project__scm_update_on_launch=False, inventory__inventory_sources=['1']) + +@pytest.fixture +def successful_inventory_update_latest(inventory_update_latest_factory): + iu = inventory_update_latest_factory() + iu['inventory_source__update_cache_timeout'] = 100 + iu['status'] = 'successful' + iu['finished'] = iu['created'] + timedelta(seconds=10) + return iu + +@pytest.fixture +def successful_inventory_update_latest_cache_expired(inventory_update_latest_factory): + iu = inventory_update_latest_factory() + iu['inventory_source__update_cache_timeout'] = 1 + iu['finished'] = iu['created'] + timedelta(seconds=2) + return iu + +class TestStartInventoryUpdate(): + def test_pending(self, scheduler_factory, pending_inventory_update): + scheduler = scheduler_factory(tasks=[pending_inventory_update]) + + scheduler._schedule() + + scheduler.start_task.assert_called_with(pending_inventory_update) + +class TestInventoryUpdateBlocked(): + def test_running_inventory_update(self, epoch, scheduler_factory, running_inventory_update, pending_inventory_update): + running_inventory_update['created'] = epoch - timedelta(seconds=100) + pending_inventory_update['created'] = epoch - timedelta(seconds=90) + + scheduler = scheduler_factory(tasks=[running_inventory_update, 
pending_inventory_update]) + + scheduler._schedule() + + def test_waiting_inventory_update(self, epoch, scheduler_factory, waiting_inventory_update, pending_inventory_update): + waiting_inventory_update['created'] = epoch - timedelta(seconds=100) + pending_inventory_update['created'] = epoch - timedelta(seconds=90) + + scheduler = scheduler_factory(tasks=[waiting_inventory_update, pending_inventory_update]) + + scheduler._schedule() + +class TestCreateDependentInventoryUpdate(): + + def test(self, scheduler_factory, pending_job, waiting_inventory_update, inventory_id_sources): + scheduler = scheduler_factory(tasks=[pending_job], + create_inventory_update=waiting_inventory_update, + inventory_sources=inventory_id_sources) + + scheduler._schedule() + + scheduler.start_task.assert_called_with(waiting_inventory_update, [pending_job]) + + def test_cache_hit(self, scheduler_factory, pending_job, successful_inventory_update, successful_inventory_update_latest): + scheduler = scheduler_factory(tasks=[successful_inventory_update, pending_job], + latest_inventory_updates=[successful_inventory_update_latest]) + scheduler._schedule() + + scheduler.start_task.assert_called_with(pending_job) + + def test_cache_miss(self, scheduler_factory, pending_job, successful_inventory_update, successful_inventory_update_latest_cache_expired, waiting_inventory_update, inventory_id_sources): + scheduler = scheduler_factory(tasks=[successful_inventory_update, pending_job], + latest_inventory_updates=[successful_inventory_update_latest_cache_expired], + create_inventory_update=waiting_inventory_update, + inventory_sources=inventory_id_sources) + scheduler._schedule() + + scheduler.start_task.assert_called_with(waiting_inventory_update, [pending_job]) + + def test_last_update_failed(self, scheduler_factory, pending_job, failed_inventory_update, failed_inventory_update_latest, waiting_inventory_update, inventory_id_sources): + scheduler = scheduler_factory(tasks=[failed_inventory_update, 
pending_job], + latest_inventory_updates=[failed_inventory_update_latest], + create_inventory_update=waiting_inventory_update, + inventory_sources=inventory_id_sources) + scheduler._schedule() + + scheduler.start_task.assert_called_with(waiting_inventory_update, [pending_job]) + diff --git a/awx/main/tests/unit/scheduler/test_scheduler_job.py b/awx/main/tests/unit/scheduler/test_scheduler_job.py new file mode 100644 index 0000000000..37af2ead05 --- /dev/null +++ b/awx/main/tests/unit/scheduler/test_scheduler_job.py @@ -0,0 +1,66 @@ + +# Python +import pytest +from datetime import timedelta + +# awx +from awx.main.scheduler.partial import ( + JobDict, + ProjectUpdateDict, +) + +# TODO: wherever get_latest_project_update_tasks() is stubbed and returns a +# ProjectUpdateDict, we should instead return a ProjectUpdateLatestDict(). +# For now, this is ok since the fields don't deviate that much. + +class TestJobBlocked(): + def test_inventory_update_waiting(self, scheduler_factory, waiting_inventory_update, pending_job): + scheduler = scheduler_factory(tasks=[waiting_inventory_update, pending_job]) + + scheduler._schedule() + + scheduler.start_task.assert_not_called() + + def test_inventory_update_running(self, scheduler_factory, running_inventory_update, pending_job, inventory_source_factory, inventory_id_sources): + scheduler = scheduler_factory(tasks=[running_inventory_update, pending_job], + inventory_sources=inventory_id_sources) + + scheduler._schedule() + + scheduler.start_task.assert_not_called() + + def test_project_update_running(self, scheduler_factory, pending_job, running_project_update): + scheduler = scheduler_factory(tasks=[running_project_update, pending_job]) + + scheduler._schedule() + + scheduler.start_task.assert_not_called() + assert scheduler.create_project_update.call_count == 0 + + def test_project_update_waiting(self, scheduler_factory, pending_job, waiting_project_update): + scheduler = scheduler_factory(tasks=[waiting_project_update, pending_job], 
+ latest_project_updates=[waiting_project_update]) + + scheduler._schedule() + + scheduler.start_task.assert_not_called() + assert scheduler.create_project_update.call_count == 0 + +class TestJob(): + @pytest.fixture + def successful_project_update(self, project_update_factory): + project_update = project_update_factory() + project_update['status'] = 'successful' + project_update['finished'] = project_update['created'] + timedelta(seconds=10) + project_update['project__scm_update_cache_timeout'] = 3600 + return project_update + + def test_existing_dependencies_finished(self, scheduler_factory, successful_project_update, successful_inventory_update_latest, pending_job): + scheduler = scheduler_factory(tasks=[successful_project_update, pending_job], + latest_project_updates=[successful_project_update], + latest_inventory_updates=[successful_inventory_update_latest]) + + scheduler._schedule() + + scheduler.start_task.assert_called_with(pending_job) + diff --git a/awx/main/tests/unit/scheduler/test_scheduler_project_update.py b/awx/main/tests/unit/scheduler/test_scheduler_project_update.py index 54add63d51..e0fcbc3b1e 100644 --- a/awx/main/tests/unit/scheduler/test_scheduler_project_update.py +++ b/awx/main/tests/unit/scheduler/test_scheduler_project_update.py @@ -17,98 +17,6 @@ from awx.main.scheduler import Scheduler # ProjectUpdateDict. We should instead return a ProjectUpdateLatestDict() # For now, this is ok since the fields on deviate that much. 
-@pytest.fixture -def epoch(): - return tz_now() - - -@pytest.fixture -def scheduler_factory(mocker, epoch): - def fn(tasks=[], latest_project_updates=[], create_project_update=None): - sched = Scheduler() - sched.capacity_total = 999999999 - - sched.graph.get_now = lambda: epoch - - mocker.patch.object(sched, 'get_tasks', return_value=tasks) - mocker.patch.object(sched, 'get_latest_project_update_tasks', return_value=latest_project_updates) - mocker.patch.object(sched, 'create_project_update', return_value=create_project_update) - mocker.patch.object(sched, 'start_task') - return sched - return fn - -@pytest.fixture -def project_update_factory(epoch): - def fn(): - return ProjectUpdateDict({ - 'id': 1, - 'created': epoch - timedelta(seconds=100), - 'project_id': 1, - 'project__scm_update_cache_timeout': 0, - 'celery_task_id': '', - 'launch_type': 'dependency', - 'project__scm_update_on_launch': True, - }) - return fn - -@pytest.fixture -def pending_project_update(project_update_factory): - project_update = project_update_factory() - project_update['status'] = 'pending' - return project_update - -@pytest.fixture -def waiting_project_update(epoch, project_update_factory): - project_update = project_update_factory() - project_update['status'] = 'waiting' - return project_update - -@pytest.fixture -def pending_job(epoch): - return JobDict({ - 'id': 1, - 'status': 'pending', - 'job_template_id': 1, - 'project_id': 1, - 'inventory_id': 1, - 'launch_type': 'manual', - 'allow_simultaneous': False, - 'created': epoch - timedelta(seconds=99), - 'celery_task_id': '', - 'project__scm_update_on_launch': True, - 'forks': 5 - }) - -@pytest.fixture -def running_project_update(epoch, project_update_factory): - project_update = project_update_factory() - project_update['status'] = 'running' - return project_update - -@pytest.fixture -def successful_project_update(epoch, project_update_factory): - project_update = project_update_factory() - project_update['finished'] = epoch - 
timedelta(seconds=90) - project_update['status'] = 'successful' - return project_update - -@pytest.fixture -def successful_project_update_cache_expired(epoch, project_update_factory): - project_update = project_update_factory() - - project_update['status'] = 'successful' - project_update['created'] = epoch - timedelta(seconds=120) - project_update['finished'] = epoch - timedelta(seconds=110) - project_update['project__scm_update_cache_timeout'] = 1 - return project_update - -@pytest.fixture -def failed_project_update(epoch, project_update_factory): - project_update = project_update_factory() - project_update['finished'] = epoch - timedelta(seconds=90) - project_update['status'] = 'failed' - return project_update - class TestStartProjectUpdate(): def test(self, scheduler_factory, pending_project_update): scheduler = scheduler_factory(tasks=[pending_project_update]) @@ -164,31 +72,18 @@ class TestCreateDependentProjectUpdate(): scheduler.start_task.assert_called_with(waiting_project_update, [pending_job]) - -class TestJobBlockedOnProjectUpdate(): - def test(self, scheduler_factory, pending_job, waiting_project_update): - scheduler = scheduler_factory(tasks=[waiting_project_update, pending_job], - latest_project_updates=[waiting_project_update]) - - scheduler._schedule() - - scheduler.start_task.assert_not_called() - assert scheduler.create_project_update.call_count == 0 - - def test_project_running(self, scheduler_factory, pending_job, running_project_update): - scheduler = scheduler_factory(tasks=[running_project_update, pending_job]) - - scheduler._schedule() - - scheduler.start_task.assert_not_called() - assert scheduler.create_project_update.call_count == 0 - class TestProjectUpdateBlocked(): - def test(self, scheduler_factory, running_project_update, pending_project_update): - scheduler = scheduler_factory(tasks=[running_project_update, pending_project_update], - latest_project_updates=[running_project_update]) + def test_projct_update_running(self, 
scheduler_factory, running_project_update, pending_project_update): + scheduler = scheduler_factory(tasks=[running_project_update, pending_project_update]) scheduler._schedule() scheduler.start_task.assert_not_called() assert scheduler.create_project_update.call_count == 0 + def test_job_running(self, scheduler_factory, running_job, pending_project_update): + scheduler = scheduler_factory(tasks=[running_job, pending_project_update]) + + scheduler._schedule() + + scheduler.start_task.assert_not_called() +