mirror of
https://github.com/ansible/awx.git
synced 2026-01-11 18:09:57 -03:30
Split TaskManager into
- DependencyManager spawns dependencies if necessary - WorkflowManager processes running workflows to see if a new job is ready to spawn - TaskManager starts tasks if unblocked and has execution capacity
This commit is contained in:
parent
19c24cba10
commit
431b9370df
@ -184,12 +184,10 @@ class Metrics:
|
||||
FloatM('subsystem_metrics_pipe_execute_seconds', 'Time spent saving metrics to redis'),
|
||||
IntM('subsystem_metrics_pipe_execute_calls', 'Number of calls to pipe_execute'),
|
||||
FloatM('subsystem_metrics_send_metrics_seconds', 'Time spent sending metrics to other nodes'),
|
||||
SetFloatM('task_manager_get_tasks_seconds', 'Time spent in loading all tasks from db'),
|
||||
SetFloatM('task_manager_get_tasks_seconds', 'Time spent in loading tasks from db'),
|
||||
SetFloatM('task_manager_start_task_seconds', 'Time spent starting task'),
|
||||
SetFloatM('task_manager_process_running_tasks_seconds', 'Time spent processing running tasks'),
|
||||
SetFloatM('task_manager_process_pending_tasks_seconds', 'Time spent processing pending tasks'),
|
||||
SetFloatM('task_manager_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'),
|
||||
SetFloatM('task_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow jobs'),
|
||||
SetFloatM('task_manager__schedule_seconds', 'Time spent in running the entire _schedule'),
|
||||
IntM('task_manager_schedule_calls', 'Number of calls to task manager schedule'),
|
||||
SetFloatM('task_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'),
|
||||
@ -197,6 +195,17 @@ class Metrics:
|
||||
SetIntM('task_manager_running_processed', 'Number of running tasks processed'),
|
||||
SetIntM('task_manager_pending_processed', 'Number of pending tasks processed'),
|
||||
SetIntM('task_manager_tasks_blocked', 'Number of tasks blocked from running'),
|
||||
SetFloatM('dependency_manager_get_tasks_seconds', 'Time spent in loading tasks from db'),
|
||||
SetFloatM('dependency_manager_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'),
|
||||
SetFloatM('dependency_manager__schedule_seconds', 'Time spent in running the entire _schedule'),
|
||||
IntM('dependency_manager_schedule_calls', 'Number of calls to task manager schedule'),
|
||||
SetFloatM('dependency_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'),
|
||||
SetIntM('dependency_manager_pending_processed', 'Number of pending tasks processed'),
|
||||
SetFloatM('workflow_manager__schedule_seconds', 'Time spent in running the entire _schedule'),
|
||||
IntM('workflow_manager_schedule_calls', 'Number of calls to task manager schedule'),
|
||||
SetFloatM('workflow_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'),
|
||||
SetFloatM('workflow_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow jobs'),
|
||||
SetFloatM('workflow_manager_get_tasks_seconds', 'Time spent in loading tasks from db'),
|
||||
]
|
||||
# turn metric list into dictionary with the metric name as a key
|
||||
self.METRICS = {}
|
||||
|
||||
@ -1026,7 +1026,6 @@ class UnifiedJob(
|
||||
event_qs = self.get_event_queryset()
|
||||
except NotImplementedError:
|
||||
return True # Model without events, such as WFJT
|
||||
self.log_lifecycle("event_processing_finished")
|
||||
return self.emitted_events == event_qs.count()
|
||||
|
||||
def result_stdout_raw_handle(self, enforce_max_bytes=True):
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
# Copyright (c) 2017 Ansible, Inc.
|
||||
#
|
||||
|
||||
from .task_manager import TaskManager
|
||||
from .task_manager import TaskManager, DependencyManager, WorkflowManager
|
||||
|
||||
__all__ = ['TaskManager']
|
||||
__all__ = ['TaskManager', 'DependencyManager', 'WorkflowManager']
|
||||
|
||||
@ -53,103 +53,86 @@ def timeit(func):
|
||||
t_now = time.perf_counter()
|
||||
result = func(*args, **kwargs)
|
||||
dur = time.perf_counter() - t_now
|
||||
args[0].subsystem_metrics.inc("task_manager_" + func.__name__ + "_seconds", dur)
|
||||
args[0].subsystem_metrics.inc(f"{args[0].prefix}_{func.__name__}_seconds", dur)
|
||||
return result
|
||||
|
||||
return inner
|
||||
|
||||
|
||||
class TaskManager:
|
||||
class TaskBase:
|
||||
def __init__(self):
|
||||
"""
|
||||
Do NOT put database queries or other potentially expensive operations
|
||||
in the task manager init. The task manager object is created every time a
|
||||
job is created, transitions state, and every 30 seconds on each tower node.
|
||||
More often then not, the object is destroyed quickly because the NOOP case is hit.
|
||||
|
||||
The NOOP case is short-circuit logic. If the task manager realizes that another instance
|
||||
of the task manager is already running, then it short-circuits and decides not to run.
|
||||
"""
|
||||
# start task limit indicates how many pending jobs can be started on this
|
||||
# .schedule() run. Starting jobs is expensive, and there is code in place to reap
|
||||
# the task manager after 5 minutes. At scale, the task manager can easily take more than
|
||||
# 5 minutes to start pending jobs. If this limit is reached, pending jobs
|
||||
# will no longer be started and will be started on the next task manager cycle.
|
||||
self.start_task_limit = settings.START_TASK_LIMIT
|
||||
self.time_delta_job_explanation = timedelta(seconds=30)
|
||||
self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False)
|
||||
# initialize each metric to 0 and force metric_has_changed to true. This
|
||||
# ensures each task manager metric will be overridden when pipe_execute
|
||||
# is called later.
|
||||
self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False)
|
||||
for m in self.subsystem_metrics.METRICS:
|
||||
if m.startswith("task_manager"):
|
||||
if m.startswith(self.prefix):
|
||||
self.subsystem_metrics.set(m, 0)
|
||||
|
||||
def after_lock_init(self, all_sorted_tasks):
|
||||
"""
|
||||
Init AFTER we know this instance of the task manager will run because the lock is acquired.
|
||||
"""
|
||||
self.dependency_graph = DependencyGraph()
|
||||
self.instances = TaskManagerInstances(all_sorted_tasks)
|
||||
self.instance_groups = TaskManagerInstanceGroups(instances_by_hostname=self.instances)
|
||||
self.controlplane_ig = self.instance_groups.controlplane_ig
|
||||
|
||||
def job_blocked_by(self, task):
|
||||
# TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph
|
||||
# in the old task manager this was handled as a method on each task object outside of the graph and
|
||||
# probably has the side effect of cutting down *a lot* of the logic from this task manager class
|
||||
blocked_by = self.dependency_graph.task_blocked_by(task)
|
||||
if blocked_by:
|
||||
return blocked_by
|
||||
|
||||
for dep in task.dependent_jobs.all():
|
||||
if dep.status in ACTIVE_STATES:
|
||||
return dep
|
||||
# if we detect a failed or error dependency, go ahead and fail this
|
||||
# task. The errback on the dependency takes some time to trigger,
|
||||
# and we don't want the task to enter running state if its
|
||||
# dependency has failed or errored.
|
||||
elif dep.status in ("error", "failed"):
|
||||
task.status = 'failed'
|
||||
task.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % (
|
||||
get_type_for_model(type(dep)),
|
||||
dep.name,
|
||||
dep.id,
|
||||
)
|
||||
task.save(update_fields=['status', 'job_explanation'])
|
||||
task.websocket_emit_status('failed')
|
||||
return dep
|
||||
|
||||
return None
|
||||
|
||||
@timeit
|
||||
def get_tasks(self, status_list=('pending', 'waiting', 'running')):
|
||||
jobs = [j for j in Job.objects.filter(status__in=status_list).prefetch_related('instance_group')]
|
||||
inventory_updates_qs = (
|
||||
InventoryUpdate.objects.filter(status__in=status_list).exclude(source='file').prefetch_related('inventory_source', 'instance_group')
|
||||
)
|
||||
inventory_updates = [i for i in inventory_updates_qs]
|
||||
# Notice the job_type='check': we want to prevent implicit project updates from blocking our jobs.
|
||||
project_updates = [p for p in ProjectUpdate.objects.filter(status__in=status_list, job_type='check').prefetch_related('instance_group')]
|
||||
system_jobs = [s for s in SystemJob.objects.filter(status__in=status_list).prefetch_related('instance_group')]
|
||||
ad_hoc_commands = [a for a in AdHocCommand.objects.filter(status__in=status_list).prefetch_related('instance_group')]
|
||||
workflow_jobs = [w for w in WorkflowJob.objects.filter(status__in=status_list)]
|
||||
all_tasks = sorted(jobs + project_updates + inventory_updates + system_jobs + ad_hoc_commands + workflow_jobs, key=lambda task: task.created)
|
||||
return all_tasks
|
||||
|
||||
def get_running_workflow_jobs(self):
|
||||
graph_workflow_jobs = [wf for wf in WorkflowJob.objects.filter(status='running')]
|
||||
return graph_workflow_jobs
|
||||
|
||||
def get_inventory_source_tasks(self, all_sorted_tasks):
|
||||
inventory_ids = set()
|
||||
for task in all_sorted_tasks:
|
||||
if isinstance(task, Job):
|
||||
inventory_ids.add(task.inventory_id)
|
||||
return [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True)]
|
||||
@timeit
|
||||
def get_tasks(self, filter_args):
|
||||
jobs = [j for j in Job.objects.filter(**filter_args).prefetch_related('instance_group')]
|
||||
inventory_updates_qs = InventoryUpdate.objects.filter(**filter_args).exclude(source='file').prefetch_related('inventory_source', 'instance_group')
|
||||
inventory_updates = [i for i in inventory_updates_qs]
|
||||
# Notice the job_type='check': we want to prevent implicit project updates from blocking our jobs.
|
||||
project_updates = [p for p in ProjectUpdate.objects.filter(**filter_args).filter(job_type='check').prefetch_related('instance_group')]
|
||||
system_jobs = [s for s in SystemJob.objects.filter(**filter_args).prefetch_related('instance_group')]
|
||||
ad_hoc_commands = [a for a in AdHocCommand.objects.filter(**filter_args).prefetch_related('instance_group')]
|
||||
workflow_jobs = [w for w in WorkflowJob.objects.filter(**filter_args)]
|
||||
all_tasks = sorted(jobs + project_updates + inventory_updates + system_jobs + ad_hoc_commands + workflow_jobs, key=lambda task: task.created)
|
||||
logger.debug(f"{self.prefix} {all_tasks}")
|
||||
return all_tasks
|
||||
|
||||
def record_aggregate_metrics(self, *args):
|
||||
if not settings.IS_TESTING():
|
||||
# increment task_manager_schedule_calls regardless if the other
|
||||
# metrics are recorded
|
||||
s_metrics.Metrics(auto_pipe_execute=True).inc(f"{self.prefix}_schedule_calls", 1)
|
||||
# Only record metrics if the last time recording was more
|
||||
# than SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL ago.
|
||||
# Prevents a short-duration task manager that runs directly after a
|
||||
# long task manager to override useful metrics.
|
||||
current_time = time.time()
|
||||
time_last_recorded = current_time - self.subsystem_metrics.decode(f"{self.prefix}_recorded_timestamp")
|
||||
if time_last_recorded > settings.SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL:
|
||||
logger.debug(f"recording {self.prefix} metrics, last recorded {time_last_recorded} seconds ago")
|
||||
self.subsystem_metrics.set(f"{self.prefix}_recorded_timestamp", current_time)
|
||||
self.subsystem_metrics.pipe_execute()
|
||||
else:
|
||||
logger.debug(f"skipping recording {self.prefix} metrics, last recorded {time_last_recorded} seconds ago")
|
||||
|
||||
def record_aggregate_metrics_and_exit(self, *args):
|
||||
self.record_aggregate_metrics()
|
||||
sys.exit(1)
|
||||
|
||||
def schedule(self):
|
||||
# Lock
|
||||
with advisory_lock(f"{self.prefix}_lock", wait=False) as acquired:
|
||||
with transaction.atomic():
|
||||
if acquired is False:
|
||||
logger.debug(f"Not running {self.prefix} scheduler, another task holds lock")
|
||||
return
|
||||
logger.debug(f"Starting {self.prefix} Scheduler")
|
||||
with task_manager_bulk_reschedule():
|
||||
# if sigterm due to timeout, still record metrics
|
||||
signal.signal(signal.SIGTERM, self.record_aggregate_metrics_and_exit)
|
||||
self._schedule()
|
||||
self.record_aggregate_metrics()
|
||||
logger.debug(f"Finishing {self.prefix} Scheduler")
|
||||
|
||||
|
||||
class WorkflowManager(TaskBase):
|
||||
def __init__(self):
|
||||
self.prefix = "workflow_manager"
|
||||
super().__init__()
|
||||
|
||||
@timeit
|
||||
def spawn_workflow_graph_jobs(self, workflow_jobs):
|
||||
logger.debug(f"=== {workflow_jobs}")
|
||||
for workflow_job in workflow_jobs:
|
||||
if workflow_job.cancel_flag:
|
||||
logger.debug('Not spawning jobs for %s because it is pending cancelation.', workflow_job.log_format)
|
||||
@ -255,72 +238,52 @@ class TaskManager:
|
||||
workflow_job.send_notification_templates('succeeded' if workflow_job.status == 'successful' else 'failed')
|
||||
return result
|
||||
|
||||
@timeit
|
||||
def start_task(self, task, instance_group, dependent_tasks=None, instance=None):
|
||||
self.subsystem_metrics.inc("task_manager_tasks_started", 1)
|
||||
self.start_task_limit -= 1
|
||||
if self.start_task_limit == 0:
|
||||
# schedule another run immediately after this task manager
|
||||
schedule_task_manager()
|
||||
from awx.main.tasks.system import handle_work_error, handle_work_success
|
||||
|
||||
dependent_tasks = dependent_tasks or []
|
||||
|
||||
task_actual = {
|
||||
'type': get_type_for_model(type(task)),
|
||||
'id': task.id,
|
||||
}
|
||||
dependencies = [{'type': get_type_for_model(type(t)), 'id': t.id} for t in dependent_tasks]
|
||||
|
||||
task.status = 'waiting'
|
||||
|
||||
(start_status, opts) = task.pre_start()
|
||||
if not start_status:
|
||||
task.status = 'failed'
|
||||
if task.job_explanation:
|
||||
task.job_explanation += ' '
|
||||
task.job_explanation += 'Task failed pre-start check.'
|
||||
task.save()
|
||||
# TODO: run error handler to fail sub-tasks and send notifications
|
||||
else:
|
||||
if type(task) is WorkflowJob:
|
||||
task.status = 'running'
|
||||
task.send_notification_templates('running')
|
||||
logger.debug('Transitioning %s to running status.', task.log_format)
|
||||
schedule_task_manager()
|
||||
# at this point we already have control/execution nodes selected for the following cases
|
||||
else:
|
||||
task.instance_group = instance_group
|
||||
execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else ''
|
||||
logger.debug(
|
||||
f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.'
|
||||
def timeout_approval_node(self):
|
||||
workflow_approvals = WorkflowApproval.objects.filter(status='pending')
|
||||
now = tz_now()
|
||||
for task in workflow_approvals:
|
||||
approval_timeout_seconds = timedelta(seconds=task.timeout)
|
||||
if task.timeout == 0:
|
||||
continue
|
||||
if (now - task.created) >= approval_timeout_seconds:
|
||||
timeout_message = _("The approval node {name} ({pk}) has expired after {timeout} seconds.").format(
|
||||
name=task.name, pk=task.pk, timeout=task.timeout
|
||||
)
|
||||
with disable_activity_stream():
|
||||
task.celery_task_id = str(uuid.uuid4())
|
||||
task.save()
|
||||
task.log_lifecycle("waiting")
|
||||
|
||||
def post_commit():
|
||||
if task.status != 'failed' and type(task) is not WorkflowJob:
|
||||
# Before task is dispatched, ensure that job_event partitions exist
|
||||
create_partition(task.event_class._meta.db_table, start=task.created)
|
||||
task_cls = task._get_task_class()
|
||||
task_cls.apply_async(
|
||||
[task.pk],
|
||||
opts,
|
||||
queue=task.get_queue_name(),
|
||||
uuid=task.celery_task_id,
|
||||
callbacks=[{'task': handle_work_success.name, 'kwargs': {'task_actual': task_actual}}],
|
||||
errbacks=[{'task': handle_work_error.name, 'args': [task.celery_task_id], 'kwargs': {'subtasks': [task_actual] + dependencies}}],
|
||||
)
|
||||
|
||||
task.websocket_emit_status(task.status) # adds to on_commit
|
||||
connection.on_commit(post_commit)
|
||||
logger.warning(timeout_message)
|
||||
task.timed_out = True
|
||||
task.status = 'failed'
|
||||
task.send_approval_notification('timed_out')
|
||||
task.websocket_emit_status(task.status)
|
||||
task.job_explanation = timeout_message
|
||||
task.save(update_fields=['status', 'job_explanation', 'timed_out'])
|
||||
|
||||
@timeit
|
||||
def process_running_tasks(self, running_tasks):
|
||||
for task in running_tasks:
|
||||
self.dependency_graph.add_job(task)
|
||||
def get_tasks(self):
|
||||
return self.get_running_workflow_jobs()
|
||||
|
||||
@timeit
|
||||
def _schedule(self):
|
||||
running_workflow_tasks = self.get_tasks()
|
||||
if len(running_workflow_tasks) > 0:
|
||||
self.process_finished_workflow_jobs(running_workflow_tasks)
|
||||
|
||||
previously_running_workflow_tasks = running_workflow_tasks
|
||||
running_workflow_tasks = []
|
||||
for workflow_job in previously_running_workflow_tasks:
|
||||
if workflow_job.status == 'running':
|
||||
running_workflow_tasks.append(workflow_job)
|
||||
else:
|
||||
logger.debug('Removed %s from job spawning consideration.', workflow_job.log_format)
|
||||
|
||||
self.spawn_workflow_graph_jobs(running_workflow_tasks)
|
||||
|
||||
self.timeout_approval_node()
|
||||
|
||||
|
||||
class DependencyManager(TaskBase):
|
||||
def __init__(self):
|
||||
self.prefix = "dependency_manager"
|
||||
super().__init__()
|
||||
|
||||
def create_project_update(self, task, project_id=None):
|
||||
if project_id is None:
|
||||
@ -341,14 +304,20 @@ class TaskManager:
|
||||
inventory_task.status = 'pending'
|
||||
inventory_task.save()
|
||||
logger.debug('Spawned {} as dependency of {}'.format(inventory_task.log_format, task.log_format))
|
||||
# inventory_sources = self.get_inventory_source_tasks([task])
|
||||
# self.process_inventory_sources(inventory_sources)
|
||||
|
||||
return inventory_task
|
||||
|
||||
def add_dependencies(self, task, dependencies):
|
||||
with disable_activity_stream():
|
||||
task.dependent_jobs.add(*dependencies)
|
||||
|
||||
def get_inventory_source_tasks(self, all_sorted_tasks):
|
||||
inventory_ids = set()
|
||||
for task in all_sorted_tasks:
|
||||
if isinstance(task, Job):
|
||||
inventory_ids.add(task.inventory_id)
|
||||
return [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True)]
|
||||
|
||||
def get_latest_inventory_update(self, inventory_source):
|
||||
latest_inventory_update = InventoryUpdate.objects.filter(inventory_source=inventory_source).order_by("-created")
|
||||
if not latest_inventory_update.exists():
|
||||
@ -481,6 +450,145 @@ class TaskManager:
|
||||
|
||||
return created_dependencies
|
||||
|
||||
def process_tasks(self, all_sorted_tasks):
|
||||
self.generate_dependencies(all_sorted_tasks)
|
||||
self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(all_sorted_tasks))
|
||||
|
||||
@timeit
|
||||
def _schedule(self):
|
||||
all_sorted_tasks = self.get_tasks(dict(status__in=["pending"], dependencies_processed=False))
|
||||
|
||||
if len(all_sorted_tasks) > 0:
|
||||
self.all_inventory_sources = self.get_inventory_source_tasks(all_sorted_tasks)
|
||||
self.process_tasks(all_sorted_tasks)
|
||||
schedule_task_manager()
|
||||
|
||||
|
||||
class TaskManager(TaskBase):
|
||||
def __init__(self):
|
||||
"""
|
||||
Do NOT put database queries or other potentially expensive operations
|
||||
in the task manager init. The task manager object is created every time a
|
||||
job is created, transitions state, and every 30 seconds on each tower node.
|
||||
More often then not, the object is destroyed quickly because the NOOP case is hit.
|
||||
|
||||
The NOOP case is short-circuit logic. If the task manager realizes that another instance
|
||||
of the task manager is already running, then it short-circuits and decides not to run.
|
||||
"""
|
||||
# start task limit indicates how many pending jobs can be started on this
|
||||
# .schedule() run. Starting jobs is expensive, and there is code in place to reap
|
||||
# the task manager after 5 minutes. At scale, the task manager can easily take more than
|
||||
# 5 minutes to start pending jobs. If this limit is reached, pending jobs
|
||||
# will no longer be started and will be started on the next task manager cycle.
|
||||
self.start_task_limit = settings.START_TASK_LIMIT
|
||||
self.time_delta_job_explanation = timedelta(seconds=30)
|
||||
self.prefix = "task_manager"
|
||||
super().__init__()
|
||||
|
||||
def after_lock_init(self, all_sorted_tasks):
|
||||
"""
|
||||
Init AFTER we know this instance of the task manager will run because the lock is acquired.
|
||||
"""
|
||||
self.dependency_graph = DependencyGraph()
|
||||
self.instances = TaskManagerInstances(all_sorted_tasks)
|
||||
self.instance_groups = TaskManagerInstanceGroups(instances_by_hostname=self.instances)
|
||||
self.controlplane_ig = self.instance_groups.controlplane_ig
|
||||
|
||||
def job_blocked_by(self, task):
|
||||
# TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph
|
||||
# in the old task manager this was handled as a method on each task object outside of the graph and
|
||||
# probably has the side effect of cutting down *a lot* of the logic from this task manager class
|
||||
blocked_by = self.dependency_graph.task_blocked_by(task)
|
||||
if blocked_by:
|
||||
return blocked_by
|
||||
|
||||
for dep in task.dependent_jobs.all():
|
||||
if dep.status in ACTIVE_STATES:
|
||||
return dep
|
||||
# if we detect a failed or error dependency, go ahead and fail this
|
||||
# task. The errback on the dependency takes some time to trigger,
|
||||
# and we don't want the task to enter running state if its
|
||||
# dependency has failed or errored.
|
||||
elif dep.status in ("error", "failed"):
|
||||
task.status = 'failed'
|
||||
task.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % (
|
||||
get_type_for_model(type(dep)),
|
||||
dep.name,
|
||||
dep.id,
|
||||
)
|
||||
task.save(update_fields=['status', 'job_explanation'])
|
||||
task.websocket_emit_status('failed')
|
||||
return dep
|
||||
|
||||
return None
|
||||
|
||||
@timeit
|
||||
def start_task(self, task, instance_group, dependent_tasks=None, instance=None):
|
||||
self.subsystem_metrics.inc(f"{self.prefix}_tasks_started", 1)
|
||||
self.start_task_limit -= 1
|
||||
if self.start_task_limit == 0:
|
||||
# schedule another run immediately after this task manager
|
||||
schedule_task_manager()
|
||||
from awx.main.tasks.system import handle_work_error, handle_work_success
|
||||
|
||||
dependent_tasks = dependent_tasks or []
|
||||
|
||||
task_actual = {
|
||||
'type': get_type_for_model(type(task)),
|
||||
'id': task.id,
|
||||
}
|
||||
dependencies = [{'type': get_type_for_model(type(t)), 'id': t.id} for t in dependent_tasks]
|
||||
|
||||
task.status = 'waiting'
|
||||
|
||||
(start_status, opts) = task.pre_start()
|
||||
if not start_status:
|
||||
task.status = 'failed'
|
||||
if task.job_explanation:
|
||||
task.job_explanation += ' '
|
||||
task.job_explanation += 'Task failed pre-start check.'
|
||||
task.save()
|
||||
# TODO: run error handler to fail sub-tasks and send notifications
|
||||
else:
|
||||
if type(task) is WorkflowJob:
|
||||
task.status = 'running'
|
||||
task.send_notification_templates('running')
|
||||
logger.debug('Transitioning %s to running status.', task.log_format)
|
||||
schedule_task_manager()
|
||||
# at this point we already have control/execution nodes selected for the following cases
|
||||
else:
|
||||
task.instance_group = instance_group
|
||||
execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else ''
|
||||
logger.debug(
|
||||
f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.'
|
||||
)
|
||||
with disable_activity_stream():
|
||||
task.celery_task_id = str(uuid.uuid4())
|
||||
task.save()
|
||||
task.log_lifecycle("waiting")
|
||||
|
||||
def post_commit():
|
||||
if task.status != 'failed' and type(task) is not WorkflowJob:
|
||||
# Before task is dispatched, ensure that job_event partitions exist
|
||||
create_partition(task.event_class._meta.db_table, start=task.created)
|
||||
task_cls = task._get_task_class()
|
||||
task_cls.apply_async(
|
||||
[task.pk],
|
||||
opts,
|
||||
queue=task.get_queue_name(),
|
||||
uuid=task.celery_task_id,
|
||||
callbacks=[{'task': handle_work_success.name, 'kwargs': {'task_actual': task_actual}}],
|
||||
errbacks=[{'task': handle_work_error.name, 'args': [task.celery_task_id], 'kwargs': {'subtasks': [task_actual] + dependencies}}],
|
||||
)
|
||||
|
||||
task.websocket_emit_status(task.status) # adds to on_commit
|
||||
connection.on_commit(post_commit)
|
||||
|
||||
@timeit
|
||||
def process_running_tasks(self, running_tasks):
|
||||
for task in running_tasks:
|
||||
self.dependency_graph.add_job(task)
|
||||
|
||||
@timeit
|
||||
def process_pending_tasks(self, pending_tasks):
|
||||
running_workflow_templates = {wf.unified_job_template_id for wf in self.get_running_workflow_jobs()}
|
||||
@ -490,7 +598,7 @@ class TaskManager:
|
||||
break
|
||||
blocked_by = self.job_blocked_by(task)
|
||||
if blocked_by:
|
||||
self.subsystem_metrics.inc("task_manager_tasks_blocked", 1)
|
||||
self.subsystem_metrics.inc(f"{self.prefix}_tasks_blocked", 1)
|
||||
task.log_lifecycle("blocked", blocked_by=blocked_by)
|
||||
job_explanation = gettext_noop(f"waiting for {blocked_by._meta.model_name}-{blocked_by.id} to finish")
|
||||
if task.job_explanation != job_explanation:
|
||||
@ -599,25 +707,6 @@ class TaskManager:
|
||||
tasks_to_update_job_explanation.append(task)
|
||||
logger.debug("{} couldn't be scheduled on graph, waiting for next cycle".format(task.log_format))
|
||||
|
||||
def timeout_approval_node(self):
|
||||
workflow_approvals = WorkflowApproval.objects.filter(status='pending')
|
||||
now = tz_now()
|
||||
for task in workflow_approvals:
|
||||
approval_timeout_seconds = timedelta(seconds=task.timeout)
|
||||
if task.timeout == 0:
|
||||
continue
|
||||
if (now - task.created) >= approval_timeout_seconds:
|
||||
timeout_message = _("The approval node {name} ({pk}) has expired after {timeout} seconds.").format(
|
||||
name=task.name, pk=task.pk, timeout=task.timeout
|
||||
)
|
||||
logger.warning(timeout_message)
|
||||
task.timed_out = True
|
||||
task.status = 'failed'
|
||||
task.send_approval_notification('timed_out')
|
||||
task.websocket_emit_status(task.status)
|
||||
task.job_explanation = timeout_message
|
||||
task.save(update_fields=['status', 'job_explanation', 'timed_out'])
|
||||
|
||||
def reap_jobs_from_orphaned_instances(self):
|
||||
# discover jobs that are in running state but aren't on an execution node
|
||||
# that we know about; this is a fairly rare event, but it can occur if you,
|
||||
@ -633,89 +722,19 @@ class TaskManager:
|
||||
def process_tasks(self, all_sorted_tasks):
|
||||
running_tasks = [t for t in all_sorted_tasks if t.status in ['waiting', 'running']]
|
||||
self.process_running_tasks(running_tasks)
|
||||
self.subsystem_metrics.inc("task_manager_running_processed", len(running_tasks))
|
||||
self.subsystem_metrics.inc(f"{self.prefix}_running_processed", len(running_tasks))
|
||||
|
||||
pending_tasks = [t for t in all_sorted_tasks if t.status == 'pending']
|
||||
|
||||
undeped_tasks = [t for t in pending_tasks if not t.dependencies_processed]
|
||||
dependencies = self.generate_dependencies(undeped_tasks)
|
||||
deps_of_deps = self.generate_dependencies(dependencies)
|
||||
dependencies += deps_of_deps
|
||||
self.process_pending_tasks(dependencies)
|
||||
self.subsystem_metrics.inc("task_manager_pending_processed", len(dependencies))
|
||||
|
||||
self.process_pending_tasks(pending_tasks)
|
||||
self.subsystem_metrics.inc("task_manager_pending_processed", len(pending_tasks))
|
||||
self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(pending_tasks))
|
||||
|
||||
@timeit
|
||||
def _schedule(self):
|
||||
finished_wfjs = []
|
||||
all_sorted_tasks = self.get_tasks()
|
||||
all_sorted_tasks = self.get_tasks(dict(status__in=["pending", "waiting", "running"], dependencies_processed=True))
|
||||
|
||||
self.after_lock_init(all_sorted_tasks)
|
||||
self.reap_jobs_from_orphaned_instances()
|
||||
|
||||
if len(all_sorted_tasks) > 0:
|
||||
# TODO: Deal with
|
||||
# latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks)
|
||||
# self.process_latest_project_updates(latest_project_updates)
|
||||
|
||||
# latest_inventory_updates = self.get_latest_inventory_update_tasks(all_sorted_tasks)
|
||||
# self.process_latest_inventory_updates(latest_inventory_updates)
|
||||
|
||||
self.all_inventory_sources = self.get_inventory_source_tasks(all_sorted_tasks)
|
||||
|
||||
running_workflow_tasks = self.get_running_workflow_jobs()
|
||||
finished_wfjs = self.process_finished_workflow_jobs(running_workflow_tasks)
|
||||
|
||||
previously_running_workflow_tasks = running_workflow_tasks
|
||||
running_workflow_tasks = []
|
||||
for workflow_job in previously_running_workflow_tasks:
|
||||
if workflow_job.status == 'running':
|
||||
running_workflow_tasks.append(workflow_job)
|
||||
else:
|
||||
logger.debug('Removed %s from job spawning consideration.', workflow_job.log_format)
|
||||
|
||||
self.spawn_workflow_graph_jobs(running_workflow_tasks)
|
||||
|
||||
self.timeout_approval_node()
|
||||
self.reap_jobs_from_orphaned_instances()
|
||||
|
||||
self.process_tasks(all_sorted_tasks)
|
||||
return finished_wfjs
|
||||
|
||||
def record_aggregate_metrics(self, *args):
|
||||
if not settings.IS_TESTING():
|
||||
# increment task_manager_schedule_calls regardless if the other
|
||||
# metrics are recorded
|
||||
s_metrics.Metrics(auto_pipe_execute=True).inc("task_manager_schedule_calls", 1)
|
||||
# Only record metrics if the last time recording was more
|
||||
# than SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL ago.
|
||||
# Prevents a short-duration task manager that runs directly after a
|
||||
# long task manager to override useful metrics.
|
||||
current_time = time.time()
|
||||
time_last_recorded = current_time - self.subsystem_metrics.decode("task_manager_recorded_timestamp")
|
||||
if time_last_recorded > settings.SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL:
|
||||
logger.debug(f"recording metrics, last recorded {time_last_recorded} seconds ago")
|
||||
self.subsystem_metrics.set("task_manager_recorded_timestamp", current_time)
|
||||
self.subsystem_metrics.pipe_execute()
|
||||
else:
|
||||
logger.debug(f"skipping recording metrics, last recorded {time_last_recorded} seconds ago")
|
||||
|
||||
def record_aggregate_metrics_and_exit(self, *args):
|
||||
self.record_aggregate_metrics()
|
||||
sys.exit(1)
|
||||
|
||||
def schedule(self):
|
||||
# Lock
|
||||
with advisory_lock('task_manager_lock', wait=False) as acquired:
|
||||
with transaction.atomic():
|
||||
if acquired is False:
|
||||
logger.debug("Not running scheduler, another task holds lock")
|
||||
return
|
||||
logger.debug("Starting Scheduler")
|
||||
with task_manager_bulk_reschedule():
|
||||
# if sigterm due to timeout, still record metrics
|
||||
signal.signal(signal.SIGTERM, self.record_aggregate_metrics_and_exit)
|
||||
self._schedule()
|
||||
self.record_aggregate_metrics()
|
||||
logger.debug("Finishing Scheduler")
|
||||
|
||||
@ -2,7 +2,7 @@
|
||||
import logging
|
||||
|
||||
# AWX
|
||||
from awx.main.scheduler import TaskManager
|
||||
from awx.main.scheduler import TaskManager, DependencyManager, WorkflowManager
|
||||
from awx.main.dispatch.publish import task
|
||||
from awx.main.dispatch import get_local_queuename
|
||||
|
||||
@ -10,6 +10,21 @@ logger = logging.getLogger('awx.main.scheduler')
|
||||
|
||||
|
||||
@task(queue=get_local_queuename)
|
||||
def run_task_manager():
|
||||
logger.debug("Running task manager.")
|
||||
def task_manager():
|
||||
TaskManager().schedule()
|
||||
|
||||
|
||||
@task(queue=get_local_queuename)
|
||||
def dependency_manager():
|
||||
DependencyManager().schedule()
|
||||
|
||||
|
||||
@task(queue=get_local_queuename)
|
||||
def workflow_manager():
|
||||
WorkflowManager().schedule()
|
||||
|
||||
|
||||
def run_task_manager():
|
||||
task_manager()
|
||||
dependency_manager()
|
||||
workflow_manager()
|
||||
|
||||
@ -862,11 +862,13 @@ def ignore_inventory_computed_fields():
|
||||
|
||||
|
||||
def _schedule_task_manager():
|
||||
from awx.main.scheduler.tasks import run_task_manager
|
||||
from awx.main.scheduler.tasks import task_manager, dependency_manager, workflow_manager
|
||||
from django.db import connection
|
||||
|
||||
# runs right away if not in transaction
|
||||
connection.on_commit(lambda: run_task_manager.delay())
|
||||
connection.on_commit(lambda: task_manager.delay())
|
||||
connection.on_commit(lambda: dependency_manager.delay())
|
||||
connection.on_commit(lambda: workflow_manager.delay())
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
|
||||
@ -442,7 +442,9 @@ CELERYBEAT_SCHEDULE = {
|
||||
'options': {'expires': 50},
|
||||
},
|
||||
'gather_analytics': {'task': 'awx.main.tasks.system.gather_analytics', 'schedule': timedelta(minutes=5)},
|
||||
'task_manager': {'task': 'awx.main.scheduler.tasks.run_task_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}},
|
||||
'task_manager': {'task': 'awx.main.scheduler.tasks.task_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}},
|
||||
'dependency_manager': {'task': 'awx.main.scheduler.tasks.dependency_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}},
|
||||
'workflow_manager': {'task': 'awx.main.scheduler.tasks.workflow_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}},
|
||||
'k8s_reaper': {'task': 'awx.main.tasks.system.awx_k8s_reaper', 'schedule': timedelta(seconds=60), 'options': {'expires': 50}},
|
||||
'receptor_reaper': {'task': 'awx.main.tasks.system.awx_receptor_workunit_reaper', 'schedule': timedelta(seconds=60)},
|
||||
'send_subsystem_metrics': {'task': 'awx.main.analytics.analytics_tasks.send_subsystem_metrics', 'schedule': timedelta(seconds=20)},
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user