split schedule_task_manager into 3

each call to schedule_task_manager becomes one of

ScheduleTaskManager
ScheduleDependencyManager
ScheduleWorkflowManager
This commit is contained in:
Seth Foster 2022-07-07 23:23:01 -04:00
parent b3eb9e0193
commit 0a47d05d26
No known key found for this signature in database
GPG Key ID: 86E90D96F7184028
7 changed files with 88 additions and 73 deletions

View File

@ -93,7 +93,7 @@ from awx.main.utils import (
get_object_or_400,
getattrd,
get_pk_from_dict,
schedule_task_manager,
ScheduleWorkflowManager,
ignore_inventory_computed_fields,
)
from awx.main.utils.encryption import encrypt_value
@ -3391,7 +3391,7 @@ class WorkflowJobCancel(RetrieveAPIView):
obj = self.get_object()
if obj.can_cancel:
obj.cancel()
schedule_task_manager()
ScheduleWorkflowManager().schedule()
return Response(status=status.HTTP_202_ACCEPTED)
else:
return self.http_method_not_allowed(request, *args, **kwargs)

View File

@ -45,7 +45,7 @@ from awx.main.utils.common import (
get_type_for_model,
parse_yaml_or_json,
getattr_dne,
schedule_task_manager,
ScheduleDependencyManager,
get_event_partition_epoch,
get_capacity_type,
)
@ -1357,7 +1357,7 @@ class UnifiedJob(
self.update_fields(start_args=json.dumps(kwargs), status='pending')
self.websocket_emit_status("pending")
schedule_task_manager()
ScheduleDependencyManager().schedule()
# Each type of unified job has a different Task class; get the
# appropirate one.

View File

@ -40,7 +40,7 @@ from awx.main.models.mixins import (
from awx.main.models.jobs import LaunchTimeConfigBase, LaunchTimeConfig, JobTemplate
from awx.main.models.credential import Credential
from awx.main.redact import REPLACE_STR
from awx.main.utils import schedule_task_manager
from awx.main.utils import ScheduleWorkflowManager
__all__ = [
@ -816,7 +816,7 @@ class WorkflowApproval(UnifiedJob, JobNotificationMixin):
self.save()
self.send_approval_notification('approved')
self.websocket_emit_status(self.status)
schedule_task_manager()
ScheduleWorkflowManager().schedule()
return reverse('api:workflow_approval_approve', kwargs={'pk': self.pk}, request=request)
def deny(self, request=None):
@ -825,7 +825,7 @@ class WorkflowApproval(UnifiedJob, JobNotificationMixin):
self.save()
self.send_approval_notification('denied')
self.websocket_emit_status(self.status)
schedule_task_manager()
ScheduleWorkflowManager().schedule()
return reverse('api:workflow_approval_deny', kwargs={'pk': self.pk}, request=request)
def signal_start(self, **kwargs):

View File

@ -33,7 +33,12 @@ from awx.main.models import (
)
from awx.main.scheduler.dag_workflow import WorkflowDAG
from awx.main.utils.pglock import advisory_lock
from awx.main.utils import get_type_for_model, task_manager_bulk_reschedule, schedule_task_manager
from awx.main.utils import (
get_type_for_model,
ScheduleTaskManager,
ScheduleDependencyManager,
ScheduleWorkflowManager,
)
from awx.main.utils.common import create_partition
from awx.main.signals import disable_activity_stream
from awx.main.constants import ACTIVE_STATES
@ -118,7 +123,7 @@ class TaskBase:
logger.debug(f"Not running {self.prefix} scheduler, another task holds lock")
return
logger.debug(f"Starting {self.prefix} Scheduler")
with task_manager_bulk_reschedule():
with self.schedule_manager.task_manager_bulk_reschedule():
# if sigterm due to timeout, still record metrics
signal.signal(signal.SIGTERM, self.record_aggregate_metrics_and_exit)
self._schedule()
@ -128,6 +133,7 @@ class TaskBase:
class WorkflowManager(TaskBase):
def __init__(self):
self.schedule_manager = ScheduleWorkflowManager()
super().__init__(prefix="workflow_manager")
@timeit
@ -136,6 +142,7 @@ class WorkflowManager(TaskBase):
for workflow_job in workflow_jobs:
if self.timed_out():
logger.warning("Workflow manager has reached time out while processing running workflows, exiting loop early")
self.schedule_manager.schedule()
# Do not process any more workflow jobs. Stop here.
# Maybe we should schedule another WorkflowManager run
break
@ -174,7 +181,7 @@ class WorkflowManager(TaskBase):
if status_changed:
if workflow_job.spawned_by_workflow:
schedule_task_manager()
ScheduleWorkflowManager().schedule()
workflow_job.websocket_emit_status(workflow_job.status)
# Operations whose queries rely on modifications made during the atomic scheduling session
workflow_job.send_notification_templates('succeeded' if workflow_job.status == 'successful' else 'failed')
@ -298,6 +305,7 @@ class WorkflowManager(TaskBase):
class DependencyManager(TaskBase):
def __init__(self):
self.schedule_manager = ScheduleDependencyManager()
super().__init__(prefix="dependency_manager")
def create_project_update(self, task, project_id=None):
@ -476,7 +484,7 @@ class DependencyManager(TaskBase):
if len(all_sorted_tasks) > 0:
self.all_inventory_sources = self.get_inventory_source_tasks(all_sorted_tasks)
self.process_tasks(all_sorted_tasks)
schedule_task_manager()
ScheduleTaskManager().schedule()
class TaskManager(TaskBase):
@ -496,6 +504,7 @@ class TaskManager(TaskBase):
# 5 minutes to start pending jobs. If this limit is reached, pending jobs
# will no longer be started and will be started on the next task manager cycle.
self.time_delta_job_explanation = timedelta(seconds=30)
self.schedule_manager = ScheduleTaskManager()
super().__init__(prefix="task_manager")
def after_lock_init(self, all_sorted_tasks):
@ -541,7 +550,7 @@ class TaskManager(TaskBase):
self.start_task_limit -= 1
if self.start_task_limit == 0:
# schedule another run immediately after this task manager
schedule_task_manager()
ScheduleTaskManager().schedule()
from awx.main.tasks.system import handle_work_error, handle_work_success
dependent_tasks = dependent_tasks or []

View File

@ -13,38 +13,23 @@ from awx.main.dispatch import get_local_queuename
logger = logging.getLogger('awx.main.scheduler')
@task(queue=get_local_queuename)
def task_manager():
prefix = 'task'
def run_manager(manager, prefix):
if MODE == 'development' and settings.AWX_DISABLE_TASK_MANAGERS:
logger.debug(f"Not running {prefix} manager, AWX_DISABLE_TASK_MANAGERS is True. Trigger with GET to /api/debug/{prefix}_manager/")
return
manager().schedule()
TaskManager().schedule()
@task(queue=get_local_queuename)
def task_manager():
run_manager(TaskManager, "task")
@task(queue=get_local_queuename)
def dependency_manager():
prefix = 'dependency'
if MODE == 'development' and settings.AWX_DISABLE_TASK_MANAGERS:
logger.debug(f"Not running {prefix} manager, AWX_DISABLE_TASK_MANAGERS is True. Trigger with GET to /api/debug/{prefix}_manager/")
return
DependencyManager().schedule()
run_manager(DependencyManager, "dependency")
@task(queue=get_local_queuename)
def workflow_manager():
prefix = 'workflow'
if MODE == 'development' and settings.AWX_DISABLE_TASK_MANAGERS:
logger.debug(f"Not running {prefix} manager, AWX_DISABLE_TASK_MANAGERS is True. Trigger with GET to /api/debug/{prefix}_manager/")
return
WorkflowManager().schedule()
def run_task_manager():
if MODE == 'development' and settings.AWX_DISABLE_TASK_MANAGERS:
logger.debug("Not running task managers, AWX_DISABLE_TASK_MANAGERS is True. Trigger with GET to /api/debug/{prefix}_manager/")
return
task_manager()
dependency_manager()
workflow_manager()
run_manager(WorkflowManager, "workflow")

View File

@ -53,7 +53,7 @@ from awx.main.dispatch import get_local_queuename, reaper
from awx.main.utils.common import (
ignore_inventory_computed_fields,
ignore_inventory_group_removal,
schedule_task_manager,
ScheduleWorkflowManager,
)
from awx.main.utils.external_logging import reconfigure_rsyslog
@ -667,7 +667,7 @@ def handle_work_success(task_actual):
if not instance:
return
schedule_task_manager()
ScheduleWorkflowManager().schedule()
@task(queue=get_local_queuename)
@ -709,7 +709,7 @@ def handle_work_error(task_id, *args, **kwargs):
# what the job complete message handler does then we may want to send a
# completion event for each job here.
if first_instance:
schedule_task_manager()
ScheduleWorkflowManager().schedule()
pass

View File

@ -78,8 +78,9 @@ __all__ = [
'IllegalArgumentError',
'get_custom_venv_choices',
'get_external_account',
'task_manager_bulk_reschedule',
'schedule_task_manager',
'ScheduleTaskManager',
'ScheduleDependencyManager',
'ScheduleWorkflowManager',
'classproperty',
'create_temporary_fifo',
'truncate_stdout',
@ -846,6 +847,59 @@ def get_mem_effective_capacity(mem_bytes):
_inventory_updates = threading.local()
_task_manager = threading.local()
_dependency_manager = threading.local()
_workflow_manager = threading.local()
class ScheduleManager:
def __init__(self, manager, manager_threading_local):
self.manager = manager
self.manager_threading_local = manager_threading_local
def schedule(self):
if getattr(self.manager_threading_local, 'bulk_reschedule', False):
self.manager_threading_local.needs_scheduling = True
return
from django.db import connection
# runs right away if not in transaction
connection.on_commit(lambda: self.manager.delay())
@contextlib.contextmanager
def task_manager_bulk_reschedule(self):
"""Context manager to avoid submitting task multiple times."""
try:
previous_flag = getattr(self.manager_threading_local, 'bulk_reschedule', False)
previous_value = getattr(self.manager_threading_local, 'needs_scheduling', False)
self.manager_threading_local.bulk_reschedule = True
self.manager_threading_local.needs_scheduling = False
yield
finally:
self.manager_threading_local.bulk_reschedule = previous_flag
if self.manager_threading_local.needs_scheduling:
self.schedule()
self.manager_threading_local.needs_scheduling = previous_value
class ScheduleTaskManager(ScheduleManager):
def __init__(self):
from awx.main.scheduler.tasks import task_manager
super().__init__(task_manager, _task_manager)
class ScheduleDependencyManager(ScheduleManager):
def __init__(self):
from awx.main.scheduler.tasks import dependency_manager
super().__init__(dependency_manager, _dependency_manager)
class ScheduleWorkflowManager(ScheduleManager):
def __init__(self):
from awx.main.scheduler.tasks import workflow_manager
super().__init__(workflow_manager, _workflow_manager)
@contextlib.contextmanager
@ -861,39 +915,6 @@ def ignore_inventory_computed_fields():
_inventory_updates.is_updating = previous_value
def _schedule_task_manager():
from awx.main.scheduler.tasks import task_manager, dependency_manager, workflow_manager
from django.db import connection
# runs right away if not in transaction
connection.on_commit(lambda: task_manager.delay())
connection.on_commit(lambda: dependency_manager.delay())
connection.on_commit(lambda: workflow_manager.delay())
@contextlib.contextmanager
def task_manager_bulk_reschedule():
"""Context manager to avoid submitting task multiple times."""
try:
previous_flag = getattr(_task_manager, 'bulk_reschedule', False)
previous_value = getattr(_task_manager, 'needs_scheduling', False)
_task_manager.bulk_reschedule = True
_task_manager.needs_scheduling = False
yield
finally:
_task_manager.bulk_reschedule = previous_flag
if _task_manager.needs_scheduling:
_schedule_task_manager()
_task_manager.needs_scheduling = previous_value
def schedule_task_manager():
if getattr(_task_manager, 'bulk_reschedule', False):
_task_manager.needs_scheduling = True
return
_schedule_task_manager()
@contextlib.contextmanager
def ignore_inventory_group_removal():
"""