From 97d03e434ee08710cf326bcdf470ddcce396ad67 Mon Sep 17 00:00:00 2001 From: Elijah DeLee Date: Tue, 11 Mar 2025 11:40:38 -0400 Subject: [PATCH] Add concept of priority to job templates and jobs This adds concept of priority to jobs. The task manager now orders on priority, then created. All rules around instance group capacity etc still apply. So even if a job has very high priority, if there is not available capacity in the available instance groups, it will not be scheduled. Higher number is higher priority. Default priority is 0. For dependencies spawned from other jobs, assign them the priority of the job that caused them to be created. Still need to add prompt on launch stuff for priority to be consistent. --- ...ob_priority_unifiedjobtemplate_priority.py | 27 +++++++++++++++++++ awx/main/models/jobs.py | 3 ++- awx/main/models/projects.py | 2 +- awx/main/models/unified_jobs.py | 10 +++++++ awx/main/models/workflow.py | 2 +- awx/main/scheduler/task_manager.py | 14 +++++----- 6 files changed, 49 insertions(+), 9 deletions(-) create mode 100644 awx/main/migrations/0202_unifiedjob_priority_unifiedjobtemplate_priority.py diff --git a/awx/main/migrations/0202_unifiedjob_priority_unifiedjobtemplate_priority.py b/awx/main/migrations/0202_unifiedjob_priority_unifiedjobtemplate_priority.py new file mode 100644 index 0000000000..97ec428654 --- /dev/null +++ b/awx/main/migrations/0202_unifiedjob_priority_unifiedjobtemplate_priority.py @@ -0,0 +1,27 @@ +# Generated by Django 4.2.16 on 2025-03-11 14:40 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0201_delete_token_cleanup_job'), + ] + + operations = [ + migrations.AddField( + model_name='unifiedjob', + name='priority', + field=models.PositiveIntegerField( + default=0, + editable=False, + help_text='Relative priority to other jobs. The higher the number, the higher the priority. Jobs with equivalent prioirty are started based on available capacity and launch time.', + ), + ), + migrations.AddField( + model_name='unifiedjobtemplate', + name='priority', + field=models.PositiveIntegerField(default=0), + ), + ] diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index 6fde8d159e..f46d0a9a6a 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -298,6 +298,7 @@ class JobTemplate( 'organization', 'survey_passwords', 'labels', + 'priority', 'credentials', 'job_slice_number', 'job_slice_count', @@ -1175,7 +1176,7 @@ class SystemJobTemplate(UnifiedJobTemplate, SystemJobOptions): @classmethod def _get_unified_job_field_names(cls): - return ['name', 'description', 'organization', 'job_type', 'extra_vars'] + return ['name', 'description', 'organization', 'priority', 'job_type', 'extra_vars'] def get_absolute_url(self, request=None): return reverse('api:system_job_template_detail', kwargs={'pk': self.pk}, request=request) diff --git a/awx/main/models/projects.py b/awx/main/models/projects.py index d0b22830ee..a1bb73460b 100644 --- a/awx/main/models/projects.py +++ b/awx/main/models/projects.py @@ -354,7 +354,7 @@ class Project(UnifiedJobTemplate, ProjectOptions, ResourceMixin, CustomVirtualEn @classmethod def _get_unified_job_field_names(cls): - return set(f.name for f in ProjectOptions._meta.fields) | set(['name', 'description', 'organization']) + return set(f.name for f in ProjectOptions._meta.fields) | set(['name', 'description', 'priority', 'organization']) def clean_organization(self): if self.pk: diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index da9bca0df2..bf2c51ecd8 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -118,6 +118,11 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, ExecutionEn default=None, editable=False, ) + priority = models.PositiveIntegerField( + null=False, + default=0, + editable=True, + ) current_job = models.ForeignKey( 'UnifiedJob', null=True, @@ -585,6 +590,11 @@ class UnifiedJob( default=None, editable=False, ) + priority = models.PositiveIntegerField( + default=0, + editable=False, + help_text=_("Relative priority to other jobs. The higher the number, the higher the priority. Jobs with equivalent prioirty are started based on available capacity and launch time."), + ) emitted_events = models.PositiveIntegerField( default=0, editable=False, diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py index 88df81faf0..5eea27a321 100644 --- a/awx/main/models/workflow.py +++ b/awx/main/models/workflow.py @@ -416,7 +416,7 @@ class WorkflowJobOptions(LaunchTimeConfigBase): @classmethod def _get_unified_job_field_names(cls): r = set(f.name for f in WorkflowJobOptions._meta.fields) | set( - ['name', 'description', 'organization', 'survey_passwords', 'labels', 'limit', 'scm_branch', 'job_tags', 'skip_tags'] + ['name', 'description', 'organization', 'survey_passwords', 'labels', 'limit', 'scm_branch', 'priority', 'job_tags', 'skip_tags'] ) r.remove('char_prompts') # needed due to copying launch config to launch config return r diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 835e99d4b0..3988248dd9 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -97,7 +97,7 @@ class TaskBase: UnifiedJob.objects.filter(**filter_args) .exclude(launch_type='sync') .exclude(polymorphic_ctype_id=wf_approval_ctype_id) - .order_by('created') + .order_by('-priority','created') .prefetch_related('dependent_jobs') ) self.all_tasks = [t for t in qs] @@ -286,7 +286,7 @@ class WorkflowManager(TaskBase): @timeit def get_tasks(self, filter_args): - self.all_tasks = [wf for wf in WorkflowJob.objects.filter(**filter_args)] + self.all_tasks = [wf for wf in WorkflowJob.objects.filter(**filter_args).order_by('-priority', 'created')] @timeit def _schedule(self): @@ -336,12 +336,14 @@ class DependencyManager(TaskBase): return bool(((update.finished + timedelta(seconds=cache_timeout))) < tz_now()) - def get_or_create_project_update(self, project_id): + def get_or_create_project_update(self, task): + project_id = task.project_id + priority = task.priority project = self.all_projects.get(project_id, None) if project is not None: latest_project_update = project.project_updates.filter(job_type='check').order_by("-created").first() if self.should_update_again(latest_project_update, project.scm_update_cache_timeout): - project_task = project.create_project_update(_eager_fields=dict(launch_type='dependency')) + project_task = project.create_project_update(_eager_fields=dict(launch_type='dependency', priority=priority)) project_task.signal_start() return [project_task] else: @@ -349,7 +351,7 @@ class DependencyManager(TaskBase): return [] def gen_dep_for_job(self, task): - dependencies = self.get_or_create_project_update(task.project_id) + dependencies = self.get_or_create_project_update(task) try: start_args = json.loads(decrypt_field(task, field_name="start_args")) @@ -361,7 +363,7 @@ class DependencyManager(TaskBase): continue latest_inventory_update = inventory_source.inventory_updates.order_by("-created").first() if self.should_update_again(latest_inventory_update, inventory_source.update_cache_timeout): - inventory_task = inventory_source.create_inventory_update(_eager_fields=dict(launch_type='dependency')) + inventory_task = inventory_source.create_inventory_update(_eager_fields=dict(launch_type='dependency', priority=task.priority)) inventory_task.signal_start() dependencies.append(inventory_task) else: