From f7e6a32444efa2f14af46367ff09a9dfa68e02bd Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Thu, 28 Jul 2022 15:38:26 -0400 Subject: [PATCH] Optimize task manager with debug toolbar, adjust prefetch (#12588) --- awx/main/analytics/subsystem_metrics.py | 12 +++++----- awx/main/dispatch/pool.py | 2 +- .../migrations/0165_task_manager_refactor.py | 24 +++++++++---------- awx/main/models/inventory.py | 6 ++--- awx/main/models/unified_jobs.py | 5 +--- awx/main/scheduler/task_manager.py | 10 ++++---- 6 files changed, 27 insertions(+), 32 deletions(-) diff --git a/awx/main/analytics/subsystem_metrics.py b/awx/main/analytics/subsystem_metrics.py index f63ca1940d..d52e986c63 100644 --- a/awx/main/analytics/subsystem_metrics.py +++ b/awx/main/analytics/subsystem_metrics.py @@ -189,23 +189,23 @@ class Metrics: SetFloatM('task_manager_process_running_tasks_seconds', 'Time spent processing running tasks'), SetFloatM('task_manager_process_pending_tasks_seconds', 'Time spent processing pending tasks'), SetFloatM('task_manager__schedule_seconds', 'Time spent in running the entire _schedule'), - IntM('task_manager_schedule_calls', 'Number of calls to task manager schedule'), + IntM('task_manager__schedule_calls', 'Number of calls to _schedule, after lock is acquired'), SetFloatM('task_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'), SetIntM('task_manager_tasks_started', 'Number of tasks started'), SetIntM('task_manager_running_processed', 'Number of running tasks processed'), SetIntM('task_manager_pending_processed', 'Number of pending tasks processed'), SetIntM('task_manager_tasks_blocked', 'Number of tasks blocked from running'), - SetFloatM('dependency_manager_get_tasks_seconds', 'Time spent in loading tasks from db'), + SetFloatM('dependency_manager_get_tasks_seconds', 'Time spent loading pending tasks from db'), SetFloatM('dependency_manager_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'), SetFloatM('dependency_manager__schedule_seconds', 'Time spent in running the entire _schedule'), - IntM('dependency_manager_schedule_calls', 'Number of calls to task manager schedule'), + IntM('dependency_manager__schedule_calls', 'Number of calls to _schedule, after lock is acquired'), SetFloatM('dependency_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'), SetIntM('dependency_manager_pending_processed', 'Number of pending tasks processed'), SetFloatM('workflow_manager__schedule_seconds', 'Time spent in running the entire _schedule'), - IntM('workflow_manager_schedule_calls', 'Number of calls to task manager schedule'), + IntM('workflow_manager__schedule_calls', 'Number of calls to _schedule, after lock is acquired'), SetFloatM('workflow_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'), - SetFloatM('workflow_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow jobs'), - SetFloatM('workflow_manager_get_tasks_seconds', 'Time spent in loading tasks from db'), + SetFloatM('workflow_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow tasks'), + SetFloatM('workflow_manager_get_tasks_seconds', 'Time spent loading workflow tasks from db'), ] # turn metric list into dictionary with the metric name as a key self.METRICS = {} diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py index 64a08c3e7f..9741f83a08 100644 --- a/awx/main/dispatch/pool.py +++ b/awx/main/dispatch/pool.py @@ -403,7 +403,7 @@ class AutoscalePool(WorkerPool): if current_task and isinstance(current_task, dict): endings = ['tasks.task_manager', 'tasks.dependency_manager', 'tasks.workflow_manager'] current_task_name = current_task.get('task', '') - if any([current_task_name.endswith(e) for e in endings]): + if any(current_task_name.endswith(e) for e in endings): if 'started' not in current_task: w.managed_tasks[current_task['uuid']]['started'] = time.time() age = time.time() - current_task['started'] diff --git a/awx/main/migrations/0165_task_manager_refactor.py b/awx/main/migrations/0165_task_manager_refactor.py index f2dd13512d..2df6c6c2c2 100644 --- a/awx/main/migrations/0165_task_manager_refactor.py +++ b/awx/main/migrations/0165_task_manager_refactor.py @@ -1,4 +1,4 @@ -# Generated by Django 3.2.13 on 2022-07-12 14:33 +# Generated by Django 3.2.13 on 2022-08-10 14:03 from django.db import migrations, models @@ -10,16 +10,6 @@ class Migration(migrations.Migration): ] operations = [ - migrations.AddField( - model_name='workflowapproval', - name='expires', - field=models.DateTimeField( - default=None, - editable=False, - help_text='The time this approval will expire. This is the created time plus timeout, used for filtering.', - null=True, - ), - ), migrations.AddField( model_name='unifiedjob', name='preferred_instance_groups_cache', @@ -30,6 +20,16 @@ class Migration(migrations.Migration): migrations.AddField( model_name='unifiedjob', name='task_impact', - field=models.PositiveIntegerField(default=0, editable=False), + field=models.PositiveIntegerField(default=0, editable=False, help_text='Number of forks an instance consumes when running this job.'), + ), + migrations.AddField( + model_name='workflowapproval', + name='expires', + field=models.DateTimeField( + default=None, + editable=False, + help_text='The time this approval will expire. This is the created time plus timeout, used for filtering.', + null=True, + ), ), ] diff --git a/awx/main/models/inventory.py b/awx/main/models/inventory.py index 26586a2e6d..e4cfc9f78d 100644 --- a/awx/main/models/inventory.py +++ b/awx/main/models/inventory.py @@ -338,10 +338,8 @@ class Inventory(CommonModelNameNotUnique, ResourceMixin, RelatedJobsMixin): active_inventory_sources = self.inventory_sources.filter(source__in=CLOUD_INVENTORY_SOURCES) failed_inventory_sources = active_inventory_sources.filter(last_job_failed=True) total_hosts = active_hosts.count() - if total_hosts != self.total_hosts: - update_task_impact = True - else: - update_task_impact = False + # if total_hosts has changed, set update_task_impact to True + update_task_impact = total_hosts != self.total_hosts computed_fields = { 'has_active_failures': bool(failed_hosts.count()), 'total_hosts': total_hosts, diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 596eb8a4e0..aa26950844 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -706,10 +706,7 @@ class UnifiedJob( editable=False, help_text=_("A cached list with pk values from preferred instance groups."), ) - task_impact = models.PositiveIntegerField( - default=0, - editable=False, - ) + task_impact = models.PositiveIntegerField(default=0, editable=False, help_text=_("Number of forks an instance consumes when running this job.")) organization = models.ForeignKey( 'Organization', blank=True, diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index bac8379c8e..b219aaf3b0 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -92,7 +92,7 @@ class TaskBase: .exclude(launch_type='sync') .exclude(polymorphic_ctype_id=wf_approval_ctype_id) .order_by('created') - .prefetch_related('instance_group') + .prefetch_related('dependent_jobs') ) self.all_tasks = [t for t in qs] @@ -100,7 +100,7 @@ class TaskBase: if not settings.IS_TESTING(): # increment task_manager_schedule_calls regardless if the other # metrics are recorded - s_metrics.Metrics(auto_pipe_execute=True).inc(f"{self.prefix}_schedule_calls", 1) + s_metrics.Metrics(auto_pipe_execute=True).inc(f"{self.prefix}__schedule_calls", 1) # Only record metrics if the last time recording was more # than SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL ago. # Prevents a short-duration task manager that runs directly after a @@ -216,7 +216,7 @@ class WorkflowManager(TaskBase): job.job_explanation = gettext_noop( "Workflow Job spawned from workflow could not start because it " "would result in recursion (spawn order, most recent first: {})" - ).format(', '.join(['<{}>'.format(tmp) for tmp in display_list])) + ).format(', '.join('<{}>'.format(tmp) for tmp in display_list)) else: logger.debug( 'Starting workflow-in-workflow id={}, wfjt={}, ancestors={}'.format( @@ -226,7 +226,7 @@ class WorkflowManager(TaskBase): if not job._resources_sufficient_for_launch(): can_start = False job.job_explanation = gettext_noop( - "Job spawned from workflow could not start because it " "was missing a related resource such as project or inventory" + "Job spawned from workflow could not start because it was missing a related resource such as project or inventory" ) if can_start: if workflow_job.start_args: @@ -236,7 +236,7 @@ class WorkflowManager(TaskBase): can_start = job.signal_start(**start_args) if not can_start: job.job_explanation = gettext_noop( - "Job spawned from workflow could not start because it " "was not in the right state or required manual credentials" + "Job spawned from workflow could not start because it was not in the right state or required manual credentials" ) if not can_start: job.status = 'failed'