From 0a47d05d266183bb8086f7cde03a282f970598de Mon Sep 17 00:00:00 2001 From: Seth Foster Date: Thu, 7 Jul 2022 23:23:01 -0400 Subject: [PATCH] split schedule_task_manager into 3 each call to schedule_task_manager becomes one of ScheduleTaskManager ScheduleDependencyManager ScheduleWorkflowManager --- awx/api/views/__init__.py | 4 +- awx/main/models/unified_jobs.py | 4 +- awx/main/models/workflow.py | 6 +- awx/main/scheduler/task_manager.py | 19 +++++-- awx/main/scheduler/tasks.py | 31 +++------- awx/main/tasks/system.py | 6 +- awx/main/utils/common.py | 91 ++++++++++++++++++------------ 7 files changed, 88 insertions(+), 73 deletions(-) diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index 0d46c05834..f67ab6622d 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -93,7 +93,7 @@ from awx.main.utils import ( get_object_or_400, getattrd, get_pk_from_dict, - schedule_task_manager, + ScheduleWorkflowManager, ignore_inventory_computed_fields, ) from awx.main.utils.encryption import encrypt_value @@ -3391,7 +3391,7 @@ class WorkflowJobCancel(RetrieveAPIView): obj = self.get_object() if obj.can_cancel: obj.cancel() - schedule_task_manager() + ScheduleWorkflowManager().schedule() return Response(status=status.HTTP_202_ACCEPTED) else: return self.http_method_not_allowed(request, *args, **kwargs) diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 33331a0235..f1702cddba 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -45,7 +45,7 @@ from awx.main.utils.common import ( get_type_for_model, parse_yaml_or_json, getattr_dne, - schedule_task_manager, + ScheduleDependencyManager, get_event_partition_epoch, get_capacity_type, ) @@ -1357,7 +1357,7 @@ class UnifiedJob( self.update_fields(start_args=json.dumps(kwargs), status='pending') self.websocket_emit_status("pending") - schedule_task_manager() + ScheduleDependencyManager().schedule() # Each type of unified job has a different Task class; get the # appropirate one. diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py index 38896ae827..ea0ef2f9ea 100644 --- a/awx/main/models/workflow.py +++ b/awx/main/models/workflow.py @@ -40,7 +40,7 @@ from awx.main.models.mixins import ( from awx.main.models.jobs import LaunchTimeConfigBase, LaunchTimeConfig, JobTemplate from awx.main.models.credential import Credential from awx.main.redact import REPLACE_STR -from awx.main.utils import schedule_task_manager +from awx.main.utils import ScheduleWorkflowManager __all__ = [ @@ -816,7 +816,7 @@ class WorkflowApproval(UnifiedJob, JobNotificationMixin): self.save() self.send_approval_notification('approved') self.websocket_emit_status(self.status) - schedule_task_manager() + ScheduleWorkflowManager().schedule() return reverse('api:workflow_approval_approve', kwargs={'pk': self.pk}, request=request) def deny(self, request=None): @@ -825,7 +825,7 @@ class WorkflowApproval(UnifiedJob, JobNotificationMixin): self.save() self.send_approval_notification('denied') self.websocket_emit_status(self.status) - schedule_task_manager() + ScheduleWorkflowManager().schedule() return reverse('api:workflow_approval_deny', kwargs={'pk': self.pk}, request=request) def signal_start(self, **kwargs): diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 044f095be5..8c18f5343a 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -33,7 +33,12 @@ from awx.main.models import ( ) from awx.main.scheduler.dag_workflow import WorkflowDAG from awx.main.utils.pglock import advisory_lock -from awx.main.utils import get_type_for_model, task_manager_bulk_reschedule, schedule_task_manager +from awx.main.utils import ( + get_type_for_model, + ScheduleTaskManager, + ScheduleDependencyManager, + ScheduleWorkflowManager, +) from awx.main.utils.common import create_partition from awx.main.signals import disable_activity_stream from awx.main.constants import ACTIVE_STATES @@ -118,7 +123,7 @@ class TaskBase: logger.debug(f"Not running {self.prefix} scheduler, another task holds lock") return logger.debug(f"Starting {self.prefix} Scheduler") - with task_manager_bulk_reschedule(): + with self.schedule_manager.task_manager_bulk_reschedule(): # if sigterm due to timeout, still record metrics signal.signal(signal.SIGTERM, self.record_aggregate_metrics_and_exit) self._schedule() @@ -128,6 +133,7 @@ class TaskBase: class WorkflowManager(TaskBase): def __init__(self): + self.schedule_manager = ScheduleWorkflowManager() super().__init__(prefix="workflow_manager") @timeit @@ -136,6 +142,7 @@ class WorkflowManager(TaskBase): for workflow_job in workflow_jobs: if self.timed_out(): logger.warning("Workflow manager has reached time out while processing running workflows, exiting loop early") + self.schedule_manager.schedule() # Do not process any more workflow jobs. Stop here. # Maybe we should schedule another WorkflowManager run break @@ -174,7 +181,7 @@ class WorkflowManager(TaskBase): if status_changed: if workflow_job.spawned_by_workflow: - schedule_task_manager() + ScheduleWorkflowManager().schedule() workflow_job.websocket_emit_status(workflow_job.status) # Operations whose queries rely on modifications made during the atomic scheduling session workflow_job.send_notification_templates('succeeded' if workflow_job.status == 'successful' else 'failed') @@ -298,6 +305,7 @@ class WorkflowManager(TaskBase): class DependencyManager(TaskBase): def __init__(self): + self.schedule_manager = ScheduleDependencyManager() super().__init__(prefix="dependency_manager") def create_project_update(self, task, project_id=None): @@ -476,7 +484,7 @@ class DependencyManager(TaskBase): if len(all_sorted_tasks) > 0: self.all_inventory_sources = self.get_inventory_source_tasks(all_sorted_tasks) self.process_tasks(all_sorted_tasks) - schedule_task_manager() + ScheduleTaskManager().schedule() class TaskManager(TaskBase): @@ -496,6 +504,7 @@ class TaskManager(TaskBase): # 5 minutes to start pending jobs. If this limit is reached, pending jobs # will no longer be started and will be started on the next task manager cycle. self.time_delta_job_explanation = timedelta(seconds=30) + self.schedule_manager = ScheduleTaskManager() super().__init__(prefix="task_manager") def after_lock_init(self, all_sorted_tasks): @@ -541,7 +550,7 @@ class TaskManager(TaskBase): self.start_task_limit -= 1 if self.start_task_limit == 0: # schedule another run immediately after this task manager - schedule_task_manager() + ScheduleTaskManager().schedule() from awx.main.tasks.system import handle_work_error, handle_work_success dependent_tasks = dependent_tasks or [] diff --git a/awx/main/scheduler/tasks.py b/awx/main/scheduler/tasks.py index 9d18073fc5..b762e1c429 100644 --- a/awx/main/scheduler/tasks.py +++ b/awx/main/scheduler/tasks.py @@ -13,38 +13,23 @@ from awx.main.dispatch import get_local_queuename logger = logging.getLogger('awx.main.scheduler') -@task(queue=get_local_queuename) -def task_manager(): - prefix = 'task' +def run_manager(manager, prefix): if MODE == 'development' and settings.AWX_DISABLE_TASK_MANAGERS: logger.debug(f"Not running {prefix} manager, AWX_DISABLE_TASK_MANAGERS is True. Trigger with GET to /api/debug/{prefix}_manager/") return + manager().schedule() - TaskManager().schedule() + +@task(queue=get_local_queuename) +def task_manager(): + run_manager(TaskManager, "task") @task(queue=get_local_queuename) def dependency_manager(): - prefix = 'dependency' - if MODE == 'development' and settings.AWX_DISABLE_TASK_MANAGERS: - logger.debug(f"Not running {prefix} manager, AWX_DISABLE_TASK_MANAGERS is True. Trigger with GET to /api/debug/{prefix}_manager/") - return - DependencyManager().schedule() + run_manager(DependencyManager, "dependency") @task(queue=get_local_queuename) def workflow_manager(): - prefix = 'workflow' - if MODE == 'development' and settings.AWX_DISABLE_TASK_MANAGERS: - logger.debug(f"Not running {prefix} manager, AWX_DISABLE_TASK_MANAGERS is True. Trigger with GET to /api/debug/{prefix}_manager/") - return - WorkflowManager().schedule() - - -def run_task_manager(): - if MODE == 'development' and settings.AWX_DISABLE_TASK_MANAGERS: - logger.debug("Not running task managers, AWX_DISABLE_TASK_MANAGERS is True. Trigger with GET to /api/debug/{prefix}_manager/") - return - task_manager() - dependency_manager() - workflow_manager() + run_manager(WorkflowManager, "workflow") diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py index b828326339..30e1fdddda 100644 --- a/awx/main/tasks/system.py +++ b/awx/main/tasks/system.py @@ -53,7 +53,7 @@ from awx.main.dispatch import get_local_queuename, reaper from awx.main.utils.common import ( ignore_inventory_computed_fields, ignore_inventory_group_removal, - schedule_task_manager, + ScheduleWorkflowManager, ) from awx.main.utils.external_logging import reconfigure_rsyslog @@ -667,7 +667,7 @@ def handle_work_success(task_actual): if not instance: return - schedule_task_manager() + ScheduleWorkflowManager().schedule() @task(queue=get_local_queuename) @@ -709,7 +709,7 @@ def handle_work_error(task_id, *args, **kwargs): # what the job complete message handler does then we may want to send a # completion event for each job here. if first_instance: - schedule_task_manager() + ScheduleWorkflowManager().schedule() pass diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py index bf29b1f3a2..c0cf302d66 100644 --- a/awx/main/utils/common.py +++ b/awx/main/utils/common.py @@ -78,8 +78,9 @@ __all__ = [ 'IllegalArgumentError', 'get_custom_venv_choices', 'get_external_account', - 'task_manager_bulk_reschedule', - 'schedule_task_manager', + 'ScheduleTaskManager', + 'ScheduleDependencyManager', + 'ScheduleWorkflowManager', 'classproperty', 'create_temporary_fifo', 'truncate_stdout', @@ -846,6 +847,59 @@ def get_mem_effective_capacity(mem_bytes): _inventory_updates = threading.local() _task_manager = threading.local() +_dependency_manager = threading.local() +_workflow_manager = threading.local() + + +class ScheduleManager: + def __init__(self, manager, manager_threading_local): + self.manager = manager + self.manager_threading_local = manager_threading_local + + def schedule(self): + if getattr(self.manager_threading_local, 'bulk_reschedule', False): + self.manager_threading_local.needs_scheduling = True + return + from django.db import connection + + # runs right away if not in transaction + connection.on_commit(lambda: self.manager.delay()) + + @contextlib.contextmanager + def task_manager_bulk_reschedule(self): + """Context manager to avoid submitting task multiple times.""" + try: + previous_flag = getattr(self.manager_threading_local, 'bulk_reschedule', False) + previous_value = getattr(self.manager_threading_local, 'needs_scheduling', False) + self.manager_threading_local.bulk_reschedule = True + self.manager_threading_local.needs_scheduling = False + yield + finally: + self.manager_threading_local.bulk_reschedule = previous_flag + if self.manager_threading_local.needs_scheduling: + self.schedule() + self.manager_threading_local.needs_scheduling = previous_value + + +class ScheduleTaskManager(ScheduleManager): + def __init__(self): + from awx.main.scheduler.tasks import task_manager + + super().__init__(task_manager, _task_manager) + + +class ScheduleDependencyManager(ScheduleManager): + def __init__(self): + from awx.main.scheduler.tasks import dependency_manager + + super().__init__(dependency_manager, _dependency_manager) + + +class ScheduleWorkflowManager(ScheduleManager): + def __init__(self): + from awx.main.scheduler.tasks import workflow_manager + + super().__init__(workflow_manager, _workflow_manager) @contextlib.contextmanager @@ -861,39 +915,6 @@ def ignore_inventory_computed_fields(): _inventory_updates.is_updating = previous_value -def _schedule_task_manager(): - from awx.main.scheduler.tasks import task_manager, dependency_manager, workflow_manager - from django.db import connection - - # runs right away if not in transaction - connection.on_commit(lambda: task_manager.delay()) - connection.on_commit(lambda: dependency_manager.delay()) - connection.on_commit(lambda: workflow_manager.delay()) - - -@contextlib.contextmanager -def task_manager_bulk_reschedule(): - """Context manager to avoid submitting task multiple times.""" - try: - previous_flag = getattr(_task_manager, 'bulk_reschedule', False) - previous_value = getattr(_task_manager, 'needs_scheduling', False) - _task_manager.bulk_reschedule = True - _task_manager.needs_scheduling = False - yield - finally: - _task_manager.bulk_reschedule = previous_flag - if _task_manager.needs_scheduling: - _schedule_task_manager() - _task_manager.needs_scheduling = previous_value - - -def schedule_task_manager(): - if getattr(_task_manager, 'bulk_reschedule', False): - _task_manager.needs_scheduling = True - return - _schedule_task_manager() - - @contextlib.contextmanager def ignore_inventory_group_removal(): """