Add concept of priority to job templates and jobs

This adds concept of priority to jobs.
The task manager now orders on priority, then created.
All rules around instance group capacity etc still apply. So even if a
job has very high priority, if there is not available capacity in the
available instance groups, it will not be scheduled.

Higher number is higher priority.
Default priority is 0.

For dependencies spawned from other jobs, assign them the priority of
the job that caused them to be created.

Still need to add prompt on launch stuff for priority to be consistent.
This commit is contained in:
Elijah DeLee 2025-03-11 11:40:38 -04:00
parent 628a0e6a36
commit 97d03e434e
6 changed files with 49 additions and 9 deletions

View File

@ -0,0 +1,27 @@
# Generated by Django 4.2.16 on 2025-03-11 14:40
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('main', '0201_delete_token_cleanup_job'),
]
operations = [
migrations.AddField(
model_name='unifiedjob',
name='priority',
field=models.PositiveIntegerField(
default=0,
editable=False,
help_text='Relative priority to other jobs. The higher the number, the higher the priority. Jobs with equivalent prioirty are started based on available capacity and launch time.',
),
),
migrations.AddField(
model_name='unifiedjobtemplate',
name='priority',
field=models.PositiveIntegerField(default=0),
),
]

View File

@ -298,6 +298,7 @@ class JobTemplate(
'organization',
'survey_passwords',
'labels',
'priority',
'credentials',
'job_slice_number',
'job_slice_count',
@ -1175,7 +1176,7 @@ class SystemJobTemplate(UnifiedJobTemplate, SystemJobOptions):
@classmethod
def _get_unified_job_field_names(cls):
return ['name', 'description', 'organization', 'job_type', 'extra_vars']
return ['name', 'description', 'organization', 'priority', 'job_type', 'extra_vars']
def get_absolute_url(self, request=None):
return reverse('api:system_job_template_detail', kwargs={'pk': self.pk}, request=request)

View File

@ -354,7 +354,7 @@ class Project(UnifiedJobTemplate, ProjectOptions, ResourceMixin, CustomVirtualEn
@classmethod
def _get_unified_job_field_names(cls):
return set(f.name for f in ProjectOptions._meta.fields) | set(['name', 'description', 'organization'])
return set(f.name for f in ProjectOptions._meta.fields) | set(['name', 'description', 'priority', 'organization'])
def clean_organization(self):
if self.pk:

View File

@ -118,6 +118,11 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, ExecutionEn
default=None,
editable=False,
)
priority = models.PositiveIntegerField(
null=False,
default=0,
editable=True,
)
current_job = models.ForeignKey(
'UnifiedJob',
null=True,
@ -585,6 +590,11 @@ class UnifiedJob(
default=None,
editable=False,
)
priority = models.PositiveIntegerField(
default=0,
editable=False,
help_text=_("Relative priority to other jobs. The higher the number, the higher the priority. Jobs with equivalent prioirty are started based on available capacity and launch time."),
)
emitted_events = models.PositiveIntegerField(
default=0,
editable=False,

View File

@ -416,7 +416,7 @@ class WorkflowJobOptions(LaunchTimeConfigBase):
@classmethod
def _get_unified_job_field_names(cls):
r = set(f.name for f in WorkflowJobOptions._meta.fields) | set(
['name', 'description', 'organization', 'survey_passwords', 'labels', 'limit', 'scm_branch', 'job_tags', 'skip_tags']
['name', 'description', 'organization', 'survey_passwords', 'labels', 'limit', 'scm_branch', 'priority', 'job_tags', 'skip_tags']
)
r.remove('char_prompts') # needed due to copying launch config to launch config
return r

View File

@ -97,7 +97,7 @@ class TaskBase:
UnifiedJob.objects.filter(**filter_args)
.exclude(launch_type='sync')
.exclude(polymorphic_ctype_id=wf_approval_ctype_id)
.order_by('created')
.order_by('-priority','created')
.prefetch_related('dependent_jobs')
)
self.all_tasks = [t for t in qs]
@ -286,7 +286,7 @@ class WorkflowManager(TaskBase):
@timeit
def get_tasks(self, filter_args):
self.all_tasks = [wf for wf in WorkflowJob.objects.filter(**filter_args)]
self.all_tasks = [wf for wf in WorkflowJob.objects.filter(**filter_args).order_by('-priority', 'created')]
@timeit
def _schedule(self):
@ -336,12 +336,14 @@ class DependencyManager(TaskBase):
return bool(((update.finished + timedelta(seconds=cache_timeout))) < tz_now())
def get_or_create_project_update(self, project_id):
def get_or_create_project_update(self, task):
project_id = task.project_id
priority = task.priority
project = self.all_projects.get(project_id, None)
if project is not None:
latest_project_update = project.project_updates.filter(job_type='check').order_by("-created").first()
if self.should_update_again(latest_project_update, project.scm_update_cache_timeout):
project_task = project.create_project_update(_eager_fields=dict(launch_type='dependency'))
project_task = project.create_project_update(_eager_fields=dict(launch_type='dependency', priority=priority))
project_task.signal_start()
return [project_task]
else:
@ -349,7 +351,7 @@ class DependencyManager(TaskBase):
return []
def gen_dep_for_job(self, task):
dependencies = self.get_or_create_project_update(task.project_id)
dependencies = self.get_or_create_project_update(task)
try:
start_args = json.loads(decrypt_field(task, field_name="start_args"))
@ -361,7 +363,7 @@ class DependencyManager(TaskBase):
continue
latest_inventory_update = inventory_source.inventory_updates.order_by("-created").first()
if self.should_update_again(latest_inventory_update, inventory_source.update_cache_timeout):
inventory_task = inventory_source.create_inventory_update(_eager_fields=dict(launch_type='dependency'))
inventory_task = inventory_source.create_inventory_update(_eager_fields=dict(launch_type='dependency', priority=task.priority))
inventory_task.signal_start()
dependencies.append(inventory_task)
else: