Optimize task manager with debug toolbar, adjust prefetch (#12588)

This commit is contained in:
Alan Rominger
2022-07-28 15:38:26 -04:00
committed by Seth Foster
parent e6f8852b05
commit f7e6a32444
6 changed files with 27 additions and 32 deletions

View File

@@ -189,23 +189,23 @@ class Metrics:
SetFloatM('task_manager_process_running_tasks_seconds', 'Time spent processing running tasks'), SetFloatM('task_manager_process_running_tasks_seconds', 'Time spent processing running tasks'),
SetFloatM('task_manager_process_pending_tasks_seconds', 'Time spent processing pending tasks'), SetFloatM('task_manager_process_pending_tasks_seconds', 'Time spent processing pending tasks'),
SetFloatM('task_manager__schedule_seconds', 'Time spent in running the entire _schedule'), SetFloatM('task_manager__schedule_seconds', 'Time spent in running the entire _schedule'),
IntM('task_manager_schedule_calls', 'Number of calls to task manager schedule'), IntM('task_manager__schedule_calls', 'Number of calls to _schedule, after lock is acquired'),
SetFloatM('task_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'), SetFloatM('task_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'),
SetIntM('task_manager_tasks_started', 'Number of tasks started'), SetIntM('task_manager_tasks_started', 'Number of tasks started'),
SetIntM('task_manager_running_processed', 'Number of running tasks processed'), SetIntM('task_manager_running_processed', 'Number of running tasks processed'),
SetIntM('task_manager_pending_processed', 'Number of pending tasks processed'), SetIntM('task_manager_pending_processed', 'Number of pending tasks processed'),
SetIntM('task_manager_tasks_blocked', 'Number of tasks blocked from running'), SetIntM('task_manager_tasks_blocked', 'Number of tasks blocked from running'),
SetFloatM('dependency_manager_get_tasks_seconds', 'Time spent in loading tasks from db'), SetFloatM('dependency_manager_get_tasks_seconds', 'Time spent loading pending tasks from db'),
SetFloatM('dependency_manager_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'), SetFloatM('dependency_manager_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'),
SetFloatM('dependency_manager__schedule_seconds', 'Time spent in running the entire _schedule'), SetFloatM('dependency_manager__schedule_seconds', 'Time spent in running the entire _schedule'),
IntM('dependency_manager_schedule_calls', 'Number of calls to task manager schedule'), IntM('dependency_manager__schedule_calls', 'Number of calls to _schedule, after lock is acquired'),
SetFloatM('dependency_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'), SetFloatM('dependency_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'),
SetIntM('dependency_manager_pending_processed', 'Number of pending tasks processed'), SetIntM('dependency_manager_pending_processed', 'Number of pending tasks processed'),
SetFloatM('workflow_manager__schedule_seconds', 'Time spent in running the entire _schedule'), SetFloatM('workflow_manager__schedule_seconds', 'Time spent in running the entire _schedule'),
IntM('workflow_manager_schedule_calls', 'Number of calls to task manager schedule'), IntM('workflow_manager__schedule_calls', 'Number of calls to _schedule, after lock is acquired'),
SetFloatM('workflow_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'), SetFloatM('workflow_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'),
SetFloatM('workflow_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow jobs'), SetFloatM('workflow_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow tasks'),
SetFloatM('workflow_manager_get_tasks_seconds', 'Time spent in loading tasks from db'), SetFloatM('workflow_manager_get_tasks_seconds', 'Time spent loading workflow tasks from db'),
] ]
# turn metric list into dictionary with the metric name as a key # turn metric list into dictionary with the metric name as a key
self.METRICS = {} self.METRICS = {}

View File

@@ -403,7 +403,7 @@ class AutoscalePool(WorkerPool):
if current_task and isinstance(current_task, dict): if current_task and isinstance(current_task, dict):
endings = ['tasks.task_manager', 'tasks.dependency_manager', 'tasks.workflow_manager'] endings = ['tasks.task_manager', 'tasks.dependency_manager', 'tasks.workflow_manager']
current_task_name = current_task.get('task', '') current_task_name = current_task.get('task', '')
if any([current_task_name.endswith(e) for e in endings]): if any(current_task_name.endswith(e) for e in endings):
if 'started' not in current_task: if 'started' not in current_task:
w.managed_tasks[current_task['uuid']]['started'] = time.time() w.managed_tasks[current_task['uuid']]['started'] = time.time()
age = time.time() - current_task['started'] age = time.time() - current_task['started']

View File

@@ -1,4 +1,4 @@
# Generated by Django 3.2.13 on 2022-07-12 14:33 # Generated by Django 3.2.13 on 2022-08-10 14:03
from django.db import migrations, models from django.db import migrations, models
@@ -10,16 +10,6 @@ class Migration(migrations.Migration):
] ]
operations = [ operations = [
migrations.AddField(
model_name='workflowapproval',
name='expires',
field=models.DateTimeField(
default=None,
editable=False,
help_text='The time this approval will expire. This is the created time plus timeout, used for filtering.',
null=True,
),
),
migrations.AddField( migrations.AddField(
model_name='unifiedjob', model_name='unifiedjob',
name='preferred_instance_groups_cache', name='preferred_instance_groups_cache',
@@ -30,6 +20,16 @@ class Migration(migrations.Migration):
migrations.AddField( migrations.AddField(
model_name='unifiedjob', model_name='unifiedjob',
name='task_impact', name='task_impact',
field=models.PositiveIntegerField(default=0, editable=False), field=models.PositiveIntegerField(default=0, editable=False, help_text='Number of forks an instance consumes when running this job.'),
),
migrations.AddField(
model_name='workflowapproval',
name='expires',
field=models.DateTimeField(
default=None,
editable=False,
help_text='The time this approval will expire. This is the created time plus timeout, used for filtering.',
null=True,
),
), ),
] ]

View File

@@ -338,10 +338,8 @@ class Inventory(CommonModelNameNotUnique, ResourceMixin, RelatedJobsMixin):
active_inventory_sources = self.inventory_sources.filter(source__in=CLOUD_INVENTORY_SOURCES) active_inventory_sources = self.inventory_sources.filter(source__in=CLOUD_INVENTORY_SOURCES)
failed_inventory_sources = active_inventory_sources.filter(last_job_failed=True) failed_inventory_sources = active_inventory_sources.filter(last_job_failed=True)
total_hosts = active_hosts.count() total_hosts = active_hosts.count()
if total_hosts != self.total_hosts: # if total_hosts has changed, set update_task_impact to True
update_task_impact = True update_task_impact = total_hosts != self.total_hosts
else:
update_task_impact = False
computed_fields = { computed_fields = {
'has_active_failures': bool(failed_hosts.count()), 'has_active_failures': bool(failed_hosts.count()),
'total_hosts': total_hosts, 'total_hosts': total_hosts,

View File

@@ -706,10 +706,7 @@ class UnifiedJob(
editable=False, editable=False,
help_text=_("A cached list with pk values from preferred instance groups."), help_text=_("A cached list with pk values from preferred instance groups."),
) )
task_impact = models.PositiveIntegerField( task_impact = models.PositiveIntegerField(default=0, editable=False, help_text=_("Number of forks an instance consumes when running this job."))
default=0,
editable=False,
)
organization = models.ForeignKey( organization = models.ForeignKey(
'Organization', 'Organization',
blank=True, blank=True,

View File

@@ -92,7 +92,7 @@ class TaskBase:
.exclude(launch_type='sync') .exclude(launch_type='sync')
.exclude(polymorphic_ctype_id=wf_approval_ctype_id) .exclude(polymorphic_ctype_id=wf_approval_ctype_id)
.order_by('created') .order_by('created')
.prefetch_related('instance_group') .prefetch_related('dependent_jobs')
) )
self.all_tasks = [t for t in qs] self.all_tasks = [t for t in qs]
@@ -100,7 +100,7 @@ class TaskBase:
if not settings.IS_TESTING(): if not settings.IS_TESTING():
# increment task_manager_schedule_calls regardless if the other # increment task_manager_schedule_calls regardless if the other
# metrics are recorded # metrics are recorded
s_metrics.Metrics(auto_pipe_execute=True).inc(f"{self.prefix}_schedule_calls", 1) s_metrics.Metrics(auto_pipe_execute=True).inc(f"{self.prefix}__schedule_calls", 1)
# Only record metrics if the last time recording was more # Only record metrics if the last time recording was more
# than SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL ago. # than SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL ago.
# Prevents a short-duration task manager that runs directly after a # Prevents a short-duration task manager that runs directly after a
@@ -216,7 +216,7 @@ class WorkflowManager(TaskBase):
job.job_explanation = gettext_noop( job.job_explanation = gettext_noop(
"Workflow Job spawned from workflow could not start because it " "Workflow Job spawned from workflow could not start because it "
"would result in recursion (spawn order, most recent first: {})" "would result in recursion (spawn order, most recent first: {})"
).format(', '.join(['<{}>'.format(tmp) for tmp in display_list])) ).format(', '.join('<{}>'.format(tmp) for tmp in display_list))
else: else:
logger.debug( logger.debug(
'Starting workflow-in-workflow id={}, wfjt={}, ancestors={}'.format( 'Starting workflow-in-workflow id={}, wfjt={}, ancestors={}'.format(
@@ -226,7 +226,7 @@ class WorkflowManager(TaskBase):
if not job._resources_sufficient_for_launch(): if not job._resources_sufficient_for_launch():
can_start = False can_start = False
job.job_explanation = gettext_noop( job.job_explanation = gettext_noop(
"Job spawned from workflow could not start because it " "was missing a related resource such as project or inventory" "Job spawned from workflow could not start because it was missing a related resource such as project or inventory"
) )
if can_start: if can_start:
if workflow_job.start_args: if workflow_job.start_args:
@@ -236,7 +236,7 @@ class WorkflowManager(TaskBase):
can_start = job.signal_start(**start_args) can_start = job.signal_start(**start_args)
if not can_start: if not can_start:
job.job_explanation = gettext_noop( job.job_explanation = gettext_noop(
"Job spawned from workflow could not start because it " "was not in the right state or required manual credentials" "Job spawned from workflow could not start because it was not in the right state or required manual credentials"
) )
if not can_start: if not can_start:
job.status = 'failed' job.status = 'failed'