Cache preferred instance groups

When creating a unified job, stash the list of pk values from the
instance groups returned from preferred_instance_groups so that the
task management system does not need to call out to this method
repeatedly.

.preferred_instance_groups_cache is the new field
Seth Foster 2022-07-11 15:07:55 -04:00
parent b94b3a1e91
commit 957b2b7188
8 changed files with 62 additions and 6 deletions
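
Taken together, the diffs below implement the following pattern. This is a minimal sketch, not the real AWX code: InstanceGroup, UnifiedJob, and TaskManagerInstanceGroups here are simplified stand-ins, and create_unified_job / global_groups are illustrative names, but the new field and the cache-lookup shape match the changes shown.

from dataclasses import dataclass
from typing import List, Optional


@dataclass
class InstanceGroup:
    pk: int
    name: str


@dataclass
class UnifiedJob:
    # In AWX, preferred_instance_groups is a property backed by ORM queries,
    # which is why evaluating it repeatedly from the task manager is costly.
    preferred_instance_groups: List[InstanceGroup]
    # The new JSONField: pk values stashed once, at job creation time.
    preferred_instance_groups_cache: Optional[List[int]] = None


def create_unified_job(job: UnifiedJob) -> UnifiedJob:
    # Mirrors the one-line addition to UnifiedJobTemplate.create_unified_job below.
    job.preferred_instance_groups_cache = [ig.pk for ig in job.preferred_instance_groups]
    return job


class TaskManagerInstanceGroups:
    def __init__(self, instance_groups: List[InstanceGroup]):
        # pk -> instance group map, built once per task manager cycle.
        self.pk_ig_map = {ig.pk: ig for ig in instance_groups}

    def get_instance_groups_from_task_cache(self, task, global_groups):
        # Resolve the cached pk values without extra database traffic; fall back
        # to the global instance groups when nothing in the cache resolves.
        igs = [self.pk_ig_map[pk] for pk in (task.preferred_instance_groups_cache or []) if pk in self.pk_ig_map]
        return igs or global_groups

During scheduling, the task manager then asks its TaskManagerInstanceGroups object for the cached groups instead of evaluating task.preferred_instance_groups, as the scheduler diff near the end shows.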


@@ -20,4 +20,11 @@ class Migration(migrations.Migration):
null=True,
),
),
migrations.AddField(
model_name='unifiedjob',
name='preferred_instance_groups_cache',
field=models.JSONField(
blank=True, default=None, editable=False, help_text='A cached list with pk values from preferred instance groups.', null=True
),
),
]


@@ -90,6 +90,9 @@ class AdHocCommand(UnifiedJob, JobNotificationMixin):
extra_vars_dict = VarsDictProperty('extra_vars', True)
def _set_default_dependencies_processed(self):
self.dependencies_processed = True
def clean_inventory(self):
inv = self.inventory
if not inv:


@@ -1213,6 +1213,9 @@ class SystemJob(UnifiedJob, SystemJobOptions, JobNotificationMixin):
extra_vars_dict = VarsDictProperty('extra_vars', True)
def _set_default_dependencies_processed(self):
self.dependencies_processed = True
@classmethod
def _get_parent_field_name(cls):
return 'system_job_template'


@@ -513,6 +513,9 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin, TaskManage
help_text=_('The SCM Revision discovered by this update for the given project and branch.'),
)
def _set_default_dependencies_processed(self):
self.dependencies_processed = True
def _get_parent_field_name(self):
return 'project'


@@ -381,6 +381,10 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, ExecutionEn
unified_job.survey_passwords = new_job_passwords
kwargs['survey_passwords'] = new_job_passwords # saved in config object for relaunch
unified_job.preferred_instance_groups_cache = [ig.pk for ig in unified_job.preferred_instance_groups]
unified_job._set_default_dependencies_processed()
from awx.main.signals import disable_activity_stream, activity_stream_create
with disable_activity_stream():
@@ -693,6 +697,13 @@ class UnifiedJob(
on_delete=polymorphic.SET_NULL,
help_text=_('The Instance group the job was run under'),
)
preferred_instance_groups_cache = models.JSONField(
blank=True,
null=True,
default=None,
editable=False,
help_text=_("A cached list with pk values from preferred instance groups."),
)
organization = models.ForeignKey(
'Organization',
blank=True,
@@ -808,6 +819,9 @@ class UnifiedJob(
update_fields = self._update_parent_instance_no_save(parent_instance)
parent_instance.save(update_fields=update_fields)
def _set_default_dependencies_processed(self):
pass
def save(self, *args, **kwargs):
"""Save the job, with current status, to the database.
Ensure that all data is consistent before doing so.
@@ -1514,8 +1528,8 @@ class UnifiedJob(
'state': state,
'work_unit_id': self.work_unit_id,
}
if self.unified_job_template:
extra["template_name"] = self.unified_job_template.name
if self.name:
extra["task_name"] = self.name
if state == "blocked" and blocked_by:
blocked_by_msg = f"{blocked_by._meta.model_name}-{blocked_by.id}"
msg = f"{self._meta.model_name}-{self.id} blocked by {blocked_by_msg}"


@@ -623,6 +623,9 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio
)
is_sliced_job = models.BooleanField(default=False)
def _set_default_dependencies_processed(self):
self.dependencies_processed = True
@property
def workflow_nodes(self):
return self.workflow_job_nodes
@@ -800,6 +803,9 @@ class WorkflowApproval(UnifiedJob, JobNotificationMixin):
on_delete=models.SET_NULL,
)
def _set_default_dependencies_processed(self):
self.dependencies_processed = True
@classmethod
def _get_unified_job_template_class(cls):
return WorkflowApprovalTemplate


@@ -512,6 +512,12 @@ class TaskManager(TaskBase):
ScheduleTaskManager().schedule()
from awx.main.tasks.system import handle_work_error, handle_work_success
# update capacity for control node and execution node
if task.controller_node:
self.instances[task.controller_node].consume_capacity(settings.AWX_CONTROL_NODE_TASK_IMPACT)
if task.execution_node:
self.instances[task.execution_node].consume_capacity(task.task_impact)
dependent_tasks = dependent_tasks or []
task_actual = {
@@ -598,7 +604,8 @@ class TaskManager(TaskBase):
continue
found_acceptable_queue = False
preferred_instance_groups = task.preferred_instance_groups
preferred_instance_groups = self.instance_groups.get_instance_groups_from_task_cache(task)
# Determine if there is control capacity for the task
if task.capacity_type == 'control':
@@ -618,7 +625,6 @@ class TaskManager(TaskBase):
# All task.capacity_type == 'control' jobs should run on control plane, no need to loop over instance groups
if task.capacity_type == 'control':
task.execution_node = control_instance.hostname
control_instance.consume_capacity(control_impact)
execution_instance = self.instances[control_instance.hostname].obj
task.log_lifecycle("controller_node_chosen")
task.log_lifecycle("execution_node_chosen")
@@ -649,9 +655,7 @@ class TaskManager(TaskBase):
control_instance = execution_instance
task.controller_node = execution_instance.hostname
control_instance.consume_capacity(settings.AWX_CONTROL_NODE_TASK_IMPACT)
task.log_lifecycle("controller_node_chosen")
execution_instance.consume_capacity(task.task_impact)
task.log_lifecycle("execution_node_chosen")
logger.debug(
"Starting {} in group {} instance {} (remaining_capacity={})".format(


@@ -67,6 +67,7 @@ class TaskManagerInstanceGroups:
def __init__(self, instances_by_hostname=None, instance_groups=None, instance_groups_queryset=None):
self.instance_groups = dict()
self.controlplane_ig = None
self.pk_ig_map = dict()
if instance_groups is not None: # for testing
self.instance_groups = instance_groups
@@ -81,6 +82,7 @@ class TaskManagerInstanceGroups:
instances_by_hostname[instance.hostname] for instance in instance_group.instances.all() if instance.hostname in instances_by_hostname
],
)
self.pk_ig_map[instance_group.pk] = instance_group
def get_remaining_capacity(self, group_name):
instances = self.instance_groups[group_name]['instances']
@@ -121,3 +123,17 @@ class TaskManagerInstanceGroups:
elif i.capacity > largest_instance.capacity:
largest_instance = i
return largest_instance
def get_instance_groups_from_task_cache(self, task):
igs = []
if task.preferred_instance_groups_cache:
for pk in task.preferred_instance_groups_cache:
ig = self.pk_ig_map.get(pk, None)
if ig:
igs.append(ig)
else:
logger.warn(f"Unknown instance group with pk {pk} for task {task}")
if len(igs) == 0:
logger.warn(f"No instance groups in cache exist, defaulting to global instance groups for task {task}")
return task.global_instance_groups
return igs
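
For illustration only (stand-in names and values, not the real AWX objects), the lookup above degrades as follows when pk values are unknown or the cache was never populated, matching the two warning paths in get_instance_groups_from_task_cache:

# Hypothetical illustration of the fallback behaviour; names are stand-ins.
pk_ig_map = {1: "controlplane", 2: "gpu"}   # built once per task manager cycle
global_instance_groups = ["default"]

def resolve(cached_pks):
    igs = [pk_ig_map[pk] for pk in (cached_pks or []) if pk in pk_ig_map]
    return igs or global_instance_groups

assert resolve([2]) == ["gpu"]          # cache hit
assert resolve([99]) == ["default"]     # unknown pk -> warn, fall back to global groups
assert resolve(None) == ["default"]     # cache never populated -> warn, fall back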