Merge pull request #12629 from fosterseth/task_manager_refactor_squashed

Task manager refactor
Seth Foster 2022-08-10 16:02:05 -04:00 committed by GitHub
commit 85a5b58d18
29 changed files with 996 additions and 634 deletions

awx/api/urls/debug.py Normal file
View File

@ -0,0 +1,17 @@
from django.urls import re_path
from awx.api.views.debug import (
DebugRootView,
TaskManagerDebugView,
DependencyManagerDebugView,
WorkflowManagerDebugView,
)
urls = [
re_path(r'^$', DebugRootView.as_view(), name='debug'),
re_path(r'^task_manager/$', TaskManagerDebugView.as_view(), name='task_manager'),
re_path(r'^dependency_manager/$', DependencyManagerDebugView.as_view(), name='dependency_manager'),
re_path(r'^workflow_manager/$', WorkflowManagerDebugView.as_view(), name='workflow_manager'),
]
__all__ = ['urls']
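A minimal sketch of exercising these routes against a development server; the base URL is an assumption, and the views themselves allow anonymous access (AllowAny):

    # hypothetical smoke test for the new debug routes
    import requests

    BASE = "http://localhost:8013/api/debug/"  # assumed dev address

    for path in ("task_manager/", "dependency_manager/", "workflow_manager/"):
        resp = requests.get(BASE + path)
        # each view runs its manager synchronously, then reports whether
        # AWX_DISABLE_TASK_MANAGERS is set
        print(path, resp.status_code, resp.json())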

View File

@ -2,9 +2,9 @@
# All Rights Reserved.
from __future__ import absolute_import, unicode_literals
from django.conf import settings
from django.urls import include, re_path
from awx import MODE
from awx.api.generics import LoggedLoginView, LoggedLogoutView
from awx.api.views import (
ApiRootView,
@ -145,7 +145,12 @@ urlpatterns = [
re_path(r'^logout/$', LoggedLogoutView.as_view(next_page='/api/', redirect_field_name='next'), name='logout'),
re_path(r'^o/', include(oauth2_root_urls)),
]
if settings.SETTINGS_MODULE == 'awx.settings.development':
if MODE == 'development':
# Only include these if we are in the development environment
from awx.api.swagger import SwaggerSchemaView
urlpatterns += [re_path(r'^swagger/$', SwaggerSchemaView.as_view(), name='swagger_view')]
from awx.api.urls.debug import urls as debug_urls
urlpatterns += [re_path(r'^debug/', include(debug_urls))]

View File

@ -93,7 +93,7 @@ from awx.main.utils import (
get_object_or_400,
getattrd,
get_pk_from_dict,
schedule_task_manager,
ScheduleWorkflowManager,
ignore_inventory_computed_fields,
)
from awx.main.utils.encryption import encrypt_value
@ -3391,7 +3391,7 @@ class WorkflowJobCancel(RetrieveAPIView):
obj = self.get_object()
if obj.can_cancel:
obj.cancel()
schedule_task_manager()
ScheduleWorkflowManager().schedule()
return Response(status=status.HTTP_202_ACCEPTED)
else:
return self.http_method_not_allowed(request, *args, **kwargs)

awx/api/views/debug.py Normal file
View File

@ -0,0 +1,68 @@
from collections import OrderedDict
from django.conf import settings
from rest_framework.permissions import AllowAny
from rest_framework.response import Response
from awx.api.generics import APIView
from awx.main.scheduler import TaskManager, DependencyManager, WorkflowManager
class TaskManagerDebugView(APIView):
_ignore_model_permissions = True
exclude_from_schema = True
permission_classes = [AllowAny]
prefix = 'Task'
def get(self, request):
TaskManager().schedule()
if not settings.AWX_DISABLE_TASK_MANAGERS:
msg = f"Running {self.prefix} manager. To disable other triggers to the {self.prefix} manager, set AWX_DISABLE_TASK_MANAGERS to True"
else:
msg = f"AWX_DISABLE_TASK_MANAGERS is True, this view is the only way to trigger the {self.prefix} manager"
return Response(msg)
class DependencyManagerDebugView(APIView):
_ignore_model_permissions = True
exclude_from_schema = True
permission_classes = [AllowAny]
prefix = 'Dependency'
def get(self, request):
DependencyManager().schedule()
if not settings.AWX_DISABLE_TASK_MANAGERS:
msg = f"Running {self.prefix} manager. To disable other triggers to the {self.prefix} manager, set AWX_DISABLE_TASK_MANAGERS to True"
else:
msg = f"AWX_DISABLE_TASK_MANAGERS is True, this view is the only way to trigger the {self.prefix} manager"
return Response(msg)
class WorkflowManagerDebugView(APIView):
_ignore_model_permissions = True
exclude_from_schema = True
permission_classes = [AllowAny]
prefix = 'Workflow'
def get(self, request):
WorkflowManager().schedule()
if not settings.AWX_DISABLE_TASK_MANAGERS:
msg = f"Running {self.prefix} manager. To disable other triggers to the {self.prefix} manager, set AWX_DISABLE_TASK_MANAGERS to True"
else:
msg = f"AWX_DISABLE_TASK_MANAGERS is True, this view is the only way to trigger the {self.prefix} manager"
return Response(msg)
class DebugRootView(APIView):
_ignore_model_permissions = True
exclude_from_schema = True
permission_classes = [AllowAny]
def get(self, request, format=None):
'''List of available debug urls'''
data = OrderedDict()
data['task_manager'] = '/api/debug/task_manager/'
data['dependency_manager'] = '/api/debug/dependency_manager/'
data['workflow_manager'] = '/api/debug/workflow_manager/'
return Response(data)
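The three manager views are identical apart from the manager class and message prefix; a compact equivalent, shown only as a hypothetical sketch (the commit deliberately keeps three explicit classes), would be a small factory:

    def make_debug_view(manager_cls, prefix):
        # hypothetical helper, not part of this commit
        class _DebugView(APIView):
            _ignore_model_permissions = True
            exclude_from_schema = True
            permission_classes = [AllowAny]

            def get(self, request):
                manager_cls().schedule()  # run the manager synchronously
                return Response(f"Ran {prefix} manager")

        return _DebugView

    TaskManagerDebugView = make_debug_view(TaskManager, 'Task')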

View File

@ -184,19 +184,28 @@ class Metrics:
FloatM('subsystem_metrics_pipe_execute_seconds', 'Time spent saving metrics to redis'),
IntM('subsystem_metrics_pipe_execute_calls', 'Number of calls to pipe_execute'),
FloatM('subsystem_metrics_send_metrics_seconds', 'Time spent sending metrics to other nodes'),
SetFloatM('task_manager_get_tasks_seconds', 'Time spent in loading all tasks from db'),
SetFloatM('task_manager_get_tasks_seconds', 'Time spent in loading tasks from db'),
SetFloatM('task_manager_start_task_seconds', 'Time spent starting task'),
SetFloatM('task_manager_process_running_tasks_seconds', 'Time spent processing running tasks'),
SetFloatM('task_manager_process_pending_tasks_seconds', 'Time spent processing pending tasks'),
SetFloatM('task_manager_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'),
SetFloatM('task_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow jobs'),
SetFloatM('task_manager__schedule_seconds', 'Time spent in running the entire _schedule'),
IntM('task_manager_schedule_calls', 'Number of calls to task manager schedule'),
IntM('task_manager__schedule_calls', 'Number of calls to _schedule, after lock is acquired'),
SetFloatM('task_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'),
SetIntM('task_manager_tasks_started', 'Number of tasks started'),
SetIntM('task_manager_running_processed', 'Number of running tasks processed'),
SetIntM('task_manager_pending_processed', 'Number of pending tasks processed'),
SetIntM('task_manager_tasks_blocked', 'Number of tasks blocked from running'),
SetFloatM('dependency_manager_get_tasks_seconds', 'Time spent loading pending tasks from db'),
SetFloatM('dependency_manager_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'),
SetFloatM('dependency_manager__schedule_seconds', 'Time spent in running the entire _schedule'),
IntM('dependency_manager__schedule_calls', 'Number of calls to _schedule, after lock is acquired'),
SetFloatM('dependency_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'),
SetIntM('dependency_manager_pending_processed', 'Number of pending tasks processed'),
SetFloatM('workflow_manager__schedule_seconds', 'Time spent in running the entire _schedule'),
IntM('workflow_manager__schedule_calls', 'Number of calls to _schedule, after lock is acquired'),
SetFloatM('workflow_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'),
SetFloatM('workflow_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow tasks'),
SetFloatM('workflow_manager_get_tasks_seconds', 'Time spent loading workflow tasks from db'),
]
# turn metric list into dictionary with the metric name as a key
self.METRICS = {}
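The double underscore in names like task_manager__schedule_seconds is deliberate: the scheduler's timeit decorator builds the name as f"{prefix}_{func.__name__}_seconds", and the wrapped method is _schedule, so the method's leading underscore survives:

    prefix, method = "task_manager", "_schedule"
    assert f"{prefix}_{method}_seconds" == "task_manager__schedule_seconds"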

View File

@ -401,13 +401,15 @@ class AutoscalePool(WorkerPool):
# the task manager to never do more work
current_task = w.current_task
if current_task and isinstance(current_task, dict):
if current_task.get('task', '').endswith('tasks.run_task_manager'):
endings = ['tasks.task_manager', 'tasks.dependency_manager', 'tasks.workflow_manager']
current_task_name = current_task.get('task', '')
if any(current_task_name.endswith(e) for e in endings):
if 'started' not in current_task:
w.managed_tasks[current_task['uuid']]['started'] = time.time()
age = time.time() - current_task['started']
w.managed_tasks[current_task['uuid']]['age'] = age
if age > (60 * 5):
logger.error(f'run_task_manager has held the advisory lock for >5m, sending SIGTERM to {w.pid}') # noqa
if age > (settings.TASK_MANAGER_TIMEOUT + settings.TASK_MANAGER_TIMEOUT_GRACE_PERIOD):
logger.error(f'{current_task_name} has held the advisory lock for {age} seconds, sending SIGTERM to {w.pid}') # noqa
os.kill(w.pid, signal.SIGTERM)
for m in orphaned:
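The fixed five-minute reap window is replaced by a configurable threshold; a sketch of the check, where the setting values below are assumptions rather than the PR's confirmed defaults:

    import time

    TASK_MANAGER_TIMEOUT = 300               # assumed value, seconds
    TASK_MANAGER_TIMEOUT_GRACE_PERIOD = 60   # assumed value, seconds

    def should_reap(started_at, now=None):
        # reap a manager holding its advisory lock past timeout + grace,
        # instead of the old hard-coded 5 minutes
        age = (now or time.time()) - started_at
        return age > (TASK_MANAGER_TIMEOUT + TASK_MANAGER_TIMEOUT_GRACE_PERIOD)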

View File

@ -862,7 +862,7 @@ class Command(BaseCommand):
overwrite_vars=bool(options.get('overwrite_vars', False)),
)
inventory_update = inventory_source.create_inventory_update(
_eager_fields=dict(job_args=json.dumps(sys.argv), job_env=dict(os.environ.items()), job_cwd=os.getcwd())
_eager_fields=dict(status='running', job_args=json.dumps(sys.argv), job_env=dict(os.environ.items()), job_cwd=os.getcwd())
)
data = AnsibleInventoryLoader(source=source, verbosity=verbosity).load()

View File

@ -0,0 +1,35 @@
# Generated by Django 3.2.13 on 2022-08-10 14:03
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('main', '0164_remove_inventorysource_update_on_project_update'),
]
operations = [
migrations.AddField(
model_name='unifiedjob',
name='preferred_instance_groups_cache',
field=models.JSONField(
blank=True, default=None, editable=False, help_text='A cached list with pk values from preferred instance groups.', null=True
),
),
migrations.AddField(
model_name='unifiedjob',
name='task_impact',
field=models.PositiveIntegerField(default=0, editable=False, help_text='Number of forks an instance consumes when running this job.'),
),
migrations.AddField(
model_name='workflowapproval',
name='expires',
field=models.DateTimeField(
default=None,
editable=False,
help_text='The time this approval will expire. This is the created time plus timeout, used for filtering.',
null=True,
),
),
]

View File

@ -90,6 +90,9 @@ class AdHocCommand(UnifiedJob, JobNotificationMixin):
extra_vars_dict = VarsDictProperty('extra_vars', True)
def _set_default_dependencies_processed(self):
self.dependencies_processed = True
def clean_inventory(self):
inv = self.inventory
if not inv:
@ -178,12 +181,12 @@ class AdHocCommand(UnifiedJob, JobNotificationMixin):
def get_passwords_needed_to_start(self):
return self.passwords_needed_to_start
@property
def task_impact(self):
def _get_task_impact(self):
# NOTE: We more or less have to assume the host count matches and that forks default to 5
from awx.main.models.inventory import Host
count_hosts = Host.objects.filter(enabled=True, inventory__ad_hoc_commands__pk=self.pk).count()
if self.inventory:
count_hosts = self.inventory.total_hosts
else:
count_hosts = 5
return min(count_hosts, 5 if self.forks == 0 else self.forks) + 1
def copy(self):
@ -207,10 +210,20 @@ class AdHocCommand(UnifiedJob, JobNotificationMixin):
def save(self, *args, **kwargs):
update_fields = kwargs.get('update_fields', [])
def add_to_update_fields(name):
if name not in update_fields:
update_fields.append(name)
if not self.preferred_instance_groups_cache:
self.preferred_instance_groups_cache = self._get_preferred_instance_group_cache()
add_to_update_fields("preferred_instance_groups_cache")
if not self.name:
self.name = Truncator(u': '.join(filter(None, (self.module_name, self.module_args)))).chars(512)
if 'name' not in update_fields:
update_fields.append('name')
add_to_update_fields("name")
if self.task_impact == 0:
self.task_impact = self._get_task_impact()
add_to_update_fields("task_impact")
super(AdHocCommand, self).save(*args, **kwargs)
@property

View File

@ -12,6 +12,7 @@ from django.dispatch import receiver
from django.utils.translation import gettext_lazy as _
from django.conf import settings
from django.utils.timezone import now, timedelta
from django.db.models import Sum
import redis
from solo.models import SingletonModel
@ -149,10 +150,13 @@ class Instance(HasPolicyEditsMixin, BaseModel):
def consumed_capacity(self):
capacity_consumed = 0
if self.node_type in ('hybrid', 'execution'):
capacity_consumed += sum(x.task_impact for x in UnifiedJob.objects.filter(execution_node=self.hostname, status__in=('running', 'waiting')))
capacity_consumed += (
UnifiedJob.objects.filter(execution_node=self.hostname, status__in=('running', 'waiting')).aggregate(Sum("task_impact"))["task_impact__sum"]
or 0
)
if self.node_type in ('hybrid', 'control'):
capacity_consumed += sum(
settings.AWX_CONTROL_NODE_TASK_IMPACT for x in UnifiedJob.objects.filter(controller_node=self.hostname, status__in=('running', 'waiting'))
capacity_consumed += (
settings.AWX_CONTROL_NODE_TASK_IMPACT * UnifiedJob.objects.filter(controller_node=self.hostname, status__in=('running', 'waiting')).count()
)
return capacity_consumed
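Both branches now push the arithmetic into the database: a single SUM() aggregate (or a COUNT() times the per-job control impact) replaces instantiating every matching UnifiedJob. A sketch of the execution-node branch, runnable in a configured Django shell; the hostname is a placeholder:

    from django.db.models import Sum
    from awx.main.models import UnifiedJob

    qs = UnifiedJob.objects.filter(execution_node="node1", status__in=("running", "waiting"))
    # Sum() yields None on an empty queryset, hence the `or 0` guard
    total = qs.aggregate(Sum("task_impact"))["task_impact__sum"] or 0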

View File

@ -337,9 +337,12 @@ class Inventory(CommonModelNameNotUnique, ResourceMixin, RelatedJobsMixin):
else:
active_inventory_sources = self.inventory_sources.filter(source__in=CLOUD_INVENTORY_SOURCES)
failed_inventory_sources = active_inventory_sources.filter(last_job_failed=True)
total_hosts = active_hosts.count()
# if total_hosts has changed, set update_task_impact to True
update_task_impact = total_hosts != self.total_hosts
computed_fields = {
'has_active_failures': bool(failed_hosts.count()),
'total_hosts': active_hosts.count(),
'total_hosts': total_hosts,
'hosts_with_active_failures': failed_hosts.count(),
'total_groups': active_groups.count(),
'has_inventory_sources': bool(active_inventory_sources.count()),
@ -357,6 +360,14 @@ class Inventory(CommonModelNameNotUnique, ResourceMixin, RelatedJobsMixin):
computed_fields.pop(field)
if computed_fields:
iobj.save(update_fields=computed_fields.keys())
if update_task_impact:
# if the total host count has changed, re-calculate task_impact for any
# job still pending for this inventory, since task_impact is cached at
# task creation and used by the task management system
tasks = self.jobs.filter(status="pending")
for t in tasks:
t.task_impact = t._get_task_impact()
UnifiedJob.objects.bulk_update(tasks, ['task_impact'])
logger.debug("Finished updating inventory computed fields, pk={0}, in " "{1:.3f} seconds".format(self.pk, time.time() - start_time))
def websocket_emit_status(self, status):
@ -1220,8 +1231,7 @@ class InventoryUpdate(UnifiedJob, InventorySourceOptions, JobNotificationMixin,
return UnpartitionedInventoryUpdateEvent
return InventoryUpdateEvent
@property
def task_impact(self):
def _get_task_impact(self):
return 1
# InventoryUpdate credential required

View File

@ -644,8 +644,7 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskMana
raise ParseError(_('{status_value} is not a valid status option.').format(status_value=status))
return self._get_hosts(**kwargs)
@property
def task_impact(self):
def _get_task_impact(self):
if self.launch_type == 'callback':
count_hosts = 2
else:
@ -1213,6 +1212,9 @@ class SystemJob(UnifiedJob, SystemJobOptions, JobNotificationMixin):
extra_vars_dict = VarsDictProperty('extra_vars', True)
def _set_default_dependencies_processed(self):
self.dependencies_processed = True
@classmethod
def _get_parent_field_name(cls):
return 'system_job_template'
@ -1238,8 +1240,7 @@ class SystemJob(UnifiedJob, SystemJobOptions, JobNotificationMixin):
return UnpartitionedSystemJobEvent
return SystemJobEvent
@property
def task_impact(self):
def _get_task_impact(self):
return 5
@property

View File

@ -513,6 +513,9 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin, TaskManage
help_text=_('The SCM Revision discovered by this update for the given project and branch.'),
)
def _set_default_dependencies_processed(self):
self.dependencies_processed = True
def _get_parent_field_name(self):
return 'project'
@ -560,8 +563,7 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin, TaskManage
return UnpartitionedProjectUpdateEvent
return ProjectUpdateEvent
@property
def task_impact(self):
def _get_task_impact(self):
return 0 if self.job_type == 'run' else 1
@property

View File

@ -45,7 +45,8 @@ from awx.main.utils.common import (
get_type_for_model,
parse_yaml_or_json,
getattr_dne,
schedule_task_manager,
ScheduleDependencyManager,
ScheduleTaskManager,
get_event_partition_epoch,
get_capacity_type,
)
@ -381,6 +382,11 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, ExecutionEn
unified_job.survey_passwords = new_job_passwords
kwargs['survey_passwords'] = new_job_passwords # saved in config object for relaunch
unified_job.preferred_instance_groups_cache = unified_job._get_preferred_instance_group_cache()
unified_job._set_default_dependencies_processed()
unified_job.task_impact = unified_job._get_task_impact()
from awx.main.signals import disable_activity_stream, activity_stream_create
with disable_activity_stream():
@ -693,6 +699,14 @@ class UnifiedJob(
on_delete=polymorphic.SET_NULL,
help_text=_('The Instance group the job was run under'),
)
preferred_instance_groups_cache = models.JSONField(
blank=True,
null=True,
default=None,
editable=False,
help_text=_("A cached list with pk values from preferred instance groups."),
)
task_impact = models.PositiveIntegerField(default=0, editable=False, help_text=_("Number of forks an instance consumes when running this job."))
organization = models.ForeignKey(
'Organization',
blank=True,
@ -754,6 +768,9 @@ class UnifiedJob(
def _get_parent_field_name(self):
return 'unified_job_template' # Override in subclasses.
def _get_preferred_instance_group_cache(self):
return [ig.pk for ig in self.preferred_instance_groups]
@classmethod
def _get_unified_job_template_class(cls):
"""
@ -808,6 +825,9 @@ class UnifiedJob(
update_fields = self._update_parent_instance_no_save(parent_instance)
parent_instance.save(update_fields=update_fields)
def _set_default_dependencies_processed(self):
pass
def save(self, *args, **kwargs):
"""Save the job, with current status, to the database.
Ensure that all data is consistent before doing so.
@ -1026,7 +1046,6 @@ class UnifiedJob(
event_qs = self.get_event_queryset()
except NotImplementedError:
return True # Model without events, such as WFJT
self.log_lifecycle("event_processing_finished")
return self.emitted_events == event_qs.count()
def result_stdout_raw_handle(self, enforce_max_bytes=True):
@ -1241,9 +1260,8 @@ class UnifiedJob(
except JobLaunchConfig.DoesNotExist:
return False
@property
def task_impact(self):
raise NotImplementedError # Implement in subclass.
def _get_task_impact(self):
return self.task_impact # returns the stored default; subclasses should override
def websocket_emit_data(self):
'''Return extra data that should be included when submitting data to the browser over the websocket connection'''
@ -1358,7 +1376,10 @@ class UnifiedJob(
self.update_fields(start_args=json.dumps(kwargs), status='pending')
self.websocket_emit_status("pending")
schedule_task_manager()
if self.dependencies_processed:
ScheduleTaskManager().schedule()
else:
ScheduleDependencyManager().schedule()
# Each type of unified job has a different Task class; get the
# appropriate one.
@ -1515,8 +1536,8 @@ class UnifiedJob(
'state': state,
'work_unit_id': self.work_unit_id,
}
if self.unified_job_template:
extra["template_name"] = self.unified_job_template.name
if self.name:
extra["task_name"] = self.name
if state == "blocked" and blocked_by:
blocked_by_msg = f"{blocked_by._meta.model_name}-{blocked_by.id}"
msg = f"{self._meta.model_name}-{self.id} blocked by {blocked_by_msg}"
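Taken together, these model changes move scheduling inputs from query time to creation time: the preferred instance group pks and task_impact are computed once and stored on the row. A sketch of the write side, using method names from this diff on a hypothetical job object:

    def cache_scheduling_fields(job):
        # done in create_unified_job()/save() in this diff; the task manager
        # later reads the cached values instead of re-walking relations
        job.preferred_instance_groups_cache = job._get_preferred_instance_group_cache()
        job.task_impact = job._get_task_impact()
        return job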

View File

@ -13,6 +13,7 @@ from django.db import connection, models
from django.conf import settings
from django.utils.translation import gettext_lazy as _
from django.core.exceptions import ObjectDoesNotExist
from django.utils.timezone import now, timedelta
# from django import settings as tower_settings
@ -40,7 +41,7 @@ from awx.main.models.mixins import (
from awx.main.models.jobs import LaunchTimeConfigBase, LaunchTimeConfig, JobTemplate
from awx.main.models.credential import Credential
from awx.main.redact import REPLACE_STR
from awx.main.utils import schedule_task_manager
from awx.main.utils import ScheduleWorkflowManager
__all__ = [
@ -622,6 +623,9 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio
)
is_sliced_job = models.BooleanField(default=False)
def _set_default_dependencies_processed(self):
self.dependencies_processed = True
@property
def workflow_nodes(self):
return self.workflow_job_nodes
@ -668,8 +672,7 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio
)
return result
@property
def task_impact(self):
def _get_task_impact(self):
return 0
def get_ancestor_workflows(self):
@ -783,6 +786,12 @@ class WorkflowApproval(UnifiedJob, JobNotificationMixin):
default=0,
help_text=_("The amount of time (in seconds) before the approval node expires and fails."),
)
expires = models.DateTimeField(
default=None,
null=True,
editable=False,
help_text=_("The time this approval will expire. This is the created time plus timeout, used for filtering."),
)
timed_out = models.BooleanField(default=False, help_text=_("Shows when an approval node (with a timeout assigned to it) has timed out."))
approved_or_denied_by = models.ForeignKey(
'auth.User',
@ -793,6 +802,9 @@ class WorkflowApproval(UnifiedJob, JobNotificationMixin):
on_delete=models.SET_NULL,
)
def _set_default_dependencies_processed(self):
self.dependencies_processed = True
@classmethod
def _get_unified_job_template_class(cls):
return WorkflowApprovalTemplate
@ -810,13 +822,32 @@ class WorkflowApproval(UnifiedJob, JobNotificationMixin):
def _get_parent_field_name(self):
return 'workflow_approval_template'
def save(self, *args, **kwargs):
update_fields = list(kwargs.get('update_fields', []))
if self.timeout != 0 and ((not self.pk) or (not update_fields) or ('timeout' in update_fields)):
if not self.created: # on creation, created will be set by parent class, so we fudge it here
created = now()
else:
created = self.created
new_expires = created + timedelta(seconds=self.timeout)
if new_expires != self.expires:
self.expires = new_expires
if update_fields and 'expires' not in update_fields:
update_fields.append('expires')
elif self.timeout == 0 and ((not update_fields) or ('timeout' in update_fields)):
if self.expires:
self.expires = None
if update_fields and 'expires' not in update_fields:
update_fields.append('expires')
super(WorkflowApproval, self).save(*args, **kwargs)
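The save() override keeps expires equal to created plus timeout, or NULL when timeout is 0 (never expires), so expiry becomes a plain datetime comparison. A worked example with hypothetical values:

    from datetime import datetime, timedelta, timezone

    created = datetime(2022, 8, 10, 16, 0, tzinfo=timezone.utc)
    timeout = 3600  # seconds; 0 would mean "never expires"
    expires = created + timedelta(seconds=timeout) if timeout else None
    assert expires == datetime(2022, 8, 10, 17, 0, tzinfo=timezone.utc)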
def approve(self, request=None):
self.status = 'successful'
self.approved_or_denied_by = get_current_user()
self.save()
self.send_approval_notification('approved')
self.websocket_emit_status(self.status)
schedule_task_manager()
ScheduleWorkflowManager().schedule()
return reverse('api:workflow_approval_approve', kwargs={'pk': self.pk}, request=request)
def deny(self, request=None):
@ -825,7 +856,7 @@ class WorkflowApproval(UnifiedJob, JobNotificationMixin):
self.save()
self.send_approval_notification('denied')
self.websocket_emit_status(self.status)
schedule_task_manager()
ScheduleWorkflowManager().schedule()
return reverse('api:workflow_approval_deny', kwargs={'pk': self.pk}, request=request)
def signal_start(self, **kwargs):

View File

@ -1,6 +1,6 @@
# Copyright (c) 2017 Ansible, Inc.
#
from .task_manager import TaskManager
from .task_manager import TaskManager, DependencyManager, WorkflowManager
__all__ = ['TaskManager']
__all__ = ['TaskManager', 'DependencyManager', 'WorkflowManager']

View File

@ -7,6 +7,11 @@ from awx.main.models import (
WorkflowJob,
)
import logging
logger = logging.getLogger('awx.main.scheduler.dependency_graph')
class DependencyGraph(object):
PROJECT_UPDATES = 'project_updates'
@ -36,6 +41,9 @@ class DependencyGraph(object):
self.data[self.WORKFLOW_JOB_TEMPLATES_JOBS] = {}
def mark_if_no_key(self, job_type, id, job):
if id is None:
logger.warning(f'Null dependency graph key from {job}, could be integrity error or bug, ignoring')
return
# only mark first occurrence of a task. If 10 of JobA are launched
# (concurrent disabled), the dependency graph should return that jobs
# 2 through 10 are blocked by job1
@ -66,7 +74,10 @@ class DependencyGraph(object):
self.mark_if_no_key(self.JOB_TEMPLATE_JOBS, job.job_template_id, job)
def mark_workflow_job(self, job):
self.mark_if_no_key(self.WORKFLOW_JOB_TEMPLATES_JOBS, job.workflow_job_template_id, job)
if job.workflow_job_template_id:
self.mark_if_no_key(self.WORKFLOW_JOB_TEMPLATES_JOBS, job.workflow_job_template_id, job)
elif job.unified_job_template_id: # for sliced jobs
self.mark_if_no_key(self.WORKFLOW_JOB_TEMPLATES_JOBS, job.unified_job_template_id, job)
def project_update_blocked_by(self, job):
return self.get_item(self.PROJECT_UPDATES, job.project_id)
@ -85,7 +96,13 @@ class DependencyGraph(object):
def workflow_job_blocked_by(self, job):
if job.allow_simultaneous is False:
return self.get_item(self.WORKFLOW_JOB_TEMPLATES_JOBS, job.workflow_job_template_id)
if job.workflow_job_template_id:
return self.get_item(self.WORKFLOW_JOB_TEMPLATES_JOBS, job.workflow_job_template_id)
elif job.unified_job_template_id:
# Sliced jobs can be either Job or WorkflowJob type, and either should block a sliced WorkflowJob
return self.get_item(self.WORKFLOW_JOB_TEMPLATES_JOBS, job.unified_job_template_id) or self.get_item(
self.JOB_TEMPLATE_JOBS, job.unified_job_template_id
)
return None
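Sliced workflow jobs carry no workflow_job_template_id; their unified_job_template_id points at the JobTemplate that was sliced, so the graph falls back to that key. A toy illustration (FakeJob is hypothetical):

    class FakeJob:
        workflow_job_template_id = None  # sliced jobs leave this unset
        unified_job_template_id = 42     # the JobTemplate that was sliced

    key = FakeJob.workflow_job_template_id or FakeJob.unified_job_template_id
    assert key == 42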
def system_job_blocked_by(self, job):

View File

@ -15,27 +15,31 @@ from django.db import transaction, connection
from django.utils.translation import gettext_lazy as _, gettext_noop
from django.utils.timezone import now as tz_now
from django.conf import settings
from django.contrib.contenttypes.models import ContentType
# AWX
from awx.main.dispatch.reaper import reap_job
from awx.main.models import (
AdHocCommand,
Instance,
InventorySource,
InventoryUpdate,
Job,
Project,
ProjectUpdate,
SystemJob,
UnifiedJob,
WorkflowApproval,
WorkflowJob,
WorkflowJobNode,
WorkflowJobTemplate,
)
from awx.main.scheduler.dag_workflow import WorkflowDAG
from awx.main.utils.pglock import advisory_lock
from awx.main.utils import get_type_for_model, task_manager_bulk_reschedule, schedule_task_manager
from awx.main.utils.common import create_partition
from awx.main.utils import (
get_type_for_model,
ScheduleTaskManager,
ScheduleWorkflowManager,
)
from awx.main.utils.common import create_partition, task_manager_bulk_reschedule
from awx.main.signals import disable_activity_stream
from awx.main.constants import ACTIVE_STATES
from awx.main.scheduler.dependency_graph import DependencyGraph
@ -53,167 +57,97 @@ def timeit(func):
t_now = time.perf_counter()
result = func(*args, **kwargs)
dur = time.perf_counter() - t_now
args[0].subsystem_metrics.inc("task_manager_" + func.__name__ + "_seconds", dur)
args[0].subsystem_metrics.inc(f"{args[0].prefix}_{func.__name__}_seconds", dur)
return result
return inner
class TaskManager:
def __init__(self):
"""
Do NOT put database queries or other potentially expensive operations
in the task manager init. The task manager object is created every time a
job is created, transitions state, and every 30 seconds on each tower node.
More often than not, the object is destroyed quickly because the NOOP case is hit.
The NOOP case is short-circuit logic. If the task manager realizes that another instance
of the task manager is already running, then it short-circuits and decides not to run.
"""
# start task limit indicates how many pending jobs can be started on this
# .schedule() run. Starting jobs is expensive, and there is code in place to reap
# the task manager after 5 minutes. At scale, the task manager can easily take more than
# 5 minutes to start pending jobs. If this limit is reached, pending jobs
# will no longer be started and will be started on the next task manager cycle.
self.start_task_limit = settings.START_TASK_LIMIT
self.time_delta_job_explanation = timedelta(seconds=30)
self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False)
class TaskBase:
def __init__(self, prefix=""):
self.prefix = prefix
# initialize each metric to 0 and force metric_has_changed to true. This
# ensures each task manager metric will be overridden when pipe_execute
# is called later.
self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False)
self.start_time = time.time()
self.start_task_limit = settings.START_TASK_LIMIT
for m in self.subsystem_metrics.METRICS:
if m.startswith("task_manager"):
if m.startswith(self.prefix):
self.subsystem_metrics.set(m, 0)
def after_lock_init(self, all_sorted_tasks):
"""
Init AFTER we know this instance of the task manager will run because the lock is acquired.
"""
self.dependency_graph = DependencyGraph()
self.instances = TaskManagerInstances(all_sorted_tasks)
self.instance_groups = TaskManagerInstanceGroups(instances_by_hostname=self.instances)
self.controlplane_ig = self.instance_groups.controlplane_ig
def job_blocked_by(self, task):
# TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph
# in the old task manager this was handled as a method on each task object outside of the graph and
# probably has the side effect of cutting down *a lot* of the logic from this task manager class
blocked_by = self.dependency_graph.task_blocked_by(task)
if blocked_by:
return blocked_by
for dep in task.dependent_jobs.all():
if dep.status in ACTIVE_STATES:
return dep
# if we detect a failed or error dependency, go ahead and fail this
# task. The errback on the dependency takes some time to trigger,
# and we don't want the task to enter running state if its
# dependency has failed or errored.
elif dep.status in ("error", "failed"):
task.status = 'failed'
task.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % (
get_type_for_model(type(dep)),
dep.name,
dep.id,
)
task.save(update_fields=['status', 'job_explanation'])
task.websocket_emit_status('failed')
return dep
return None
def timed_out(self):
"""Return True/False if we have met or exceeded the timeout for the task manager."""
elapsed = time.time() - self.start_time
if elapsed >= settings.TASK_MANAGER_TIMEOUT:
logger.warning(f"{self.prefix} manager has run for {elapsed} which is greater than TASK_MANAGER_TIMEOUT of {settings.TASK_MANAGER_TIMEOUT}.")
return True
return False
@timeit
def get_tasks(self, status_list=('pending', 'waiting', 'running')):
jobs = [j for j in Job.objects.filter(status__in=status_list).prefetch_related('instance_group')]
inventory_updates_qs = (
InventoryUpdate.objects.filter(status__in=status_list).exclude(source='file').prefetch_related('inventory_source', 'instance_group')
def get_tasks(self, filter_args):
wf_approval_ctype_id = ContentType.objects.get_for_model(WorkflowApproval).id
qs = (
UnifiedJob.objects.filter(**filter_args)
.exclude(launch_type='sync')
.exclude(polymorphic_ctype_id=wf_approval_ctype_id)
.order_by('created')
.prefetch_related('dependent_jobs')
)
inventory_updates = [i for i in inventory_updates_qs]
# Notice the job_type='check': we want to prevent implicit project updates from blocking our jobs.
project_updates = [p for p in ProjectUpdate.objects.filter(status__in=status_list, job_type='check').prefetch_related('instance_group')]
system_jobs = [s for s in SystemJob.objects.filter(status__in=status_list).prefetch_related('instance_group')]
ad_hoc_commands = [a for a in AdHocCommand.objects.filter(status__in=status_list).prefetch_related('instance_group')]
workflow_jobs = [w for w in WorkflowJob.objects.filter(status__in=status_list)]
all_tasks = sorted(jobs + project_updates + inventory_updates + system_jobs + ad_hoc_commands + workflow_jobs, key=lambda task: task.created)
return all_tasks
self.all_tasks = [t for t in qs]
def get_running_workflow_jobs(self):
graph_workflow_jobs = [wf for wf in WorkflowJob.objects.filter(status='running')]
return graph_workflow_jobs
def record_aggregate_metrics(self, *args):
if not settings.IS_TESTING():
# increment the {prefix}__schedule_calls counter regardless of whether
# the other metrics are recorded
s_metrics.Metrics(auto_pipe_execute=True).inc(f"{self.prefix}__schedule_calls", 1)
# Only record metrics if the last time recording was more
# than SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL ago.
# Prevents a short-duration manager run that follows directly after a
# long one from overwriting useful metrics.
current_time = time.time()
time_last_recorded = current_time - self.subsystem_metrics.decode(f"{self.prefix}_recorded_timestamp")
if time_last_recorded > settings.SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL:
logger.debug(f"recording {self.prefix} metrics, last recorded {time_last_recorded} seconds ago")
self.subsystem_metrics.set(f"{self.prefix}_recorded_timestamp", current_time)
self.subsystem_metrics.pipe_execute()
else:
logger.debug(f"skipping recording {self.prefix} metrics, last recorded {time_last_recorded} seconds ago")
def get_inventory_source_tasks(self, all_sorted_tasks):
inventory_ids = set()
for task in all_sorted_tasks:
if isinstance(task, Job):
inventory_ids.add(task.inventory_id)
return [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True)]
def record_aggregate_metrics_and_exit(self, *args):
self.record_aggregate_metrics()
sys.exit(1)
def schedule(self):
# Lock
with task_manager_bulk_reschedule():
with advisory_lock(f"{self.prefix}_lock", wait=False) as acquired:
with transaction.atomic():
if acquired is False:
logger.debug(f"Not running {self.prefix} scheduler, another task holds lock")
return
logger.debug(f"Starting {self.prefix} Scheduler")
# if sigterm due to timeout, still record metrics
signal.signal(signal.SIGTERM, self.record_aggregate_metrics_and_exit)
self._schedule()
self.record_aggregate_metrics()
logger.debug(f"Finishing {self.prefix} Scheduler")
class WorkflowManager(TaskBase):
def __init__(self):
super().__init__(prefix="workflow_manager")
@timeit
def spawn_workflow_graph_jobs(self, workflow_jobs):
for workflow_job in workflow_jobs:
if workflow_job.cancel_flag:
logger.debug('Not spawning jobs for %s because it is pending cancelation.', workflow_job.log_format)
continue
dag = WorkflowDAG(workflow_job)
spawn_nodes = dag.bfs_nodes_to_run()
if spawn_nodes:
logger.debug('Spawning jobs for %s', workflow_job.log_format)
else:
logger.debug('No nodes to spawn for %s', workflow_job.log_format)
for spawn_node in spawn_nodes:
if spawn_node.unified_job_template is None:
continue
kv = spawn_node.get_job_kwargs()
job = spawn_node.unified_job_template.create_unified_job(**kv)
spawn_node.job = job
spawn_node.save()
logger.debug('Spawned %s in %s for node %s', job.log_format, workflow_job.log_format, spawn_node.pk)
can_start = True
if isinstance(spawn_node.unified_job_template, WorkflowJobTemplate):
workflow_ancestors = job.get_ancestor_workflows()
if spawn_node.unified_job_template in set(workflow_ancestors):
can_start = False
logger.info(
'Refusing to start recursive workflow-in-workflow id={}, wfjt={}, ancestors={}'.format(
job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors]
)
)
display_list = [spawn_node.unified_job_template] + workflow_ancestors
job.job_explanation = gettext_noop(
"Workflow Job spawned from workflow could not start because it " "would result in recursion (spawn order, most recent first: {})"
).format(', '.join(['<{}>'.format(tmp) for tmp in display_list]))
else:
logger.debug(
'Starting workflow-in-workflow id={}, wfjt={}, ancestors={}'.format(
job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors]
)
)
if not job._resources_sufficient_for_launch():
can_start = False
job.job_explanation = gettext_noop(
"Job spawned from workflow could not start because it " "was missing a related resource such as project or inventory"
)
if can_start:
if workflow_job.start_args:
start_args = json.loads(decrypt_field(workflow_job, 'start_args'))
else:
start_args = {}
can_start = job.signal_start(**start_args)
if not can_start:
job.job_explanation = gettext_noop(
"Job spawned from workflow could not start because it " "was not in the right state or required manual credentials"
)
if not can_start:
job.status = 'failed'
job.save(update_fields=['status', 'job_explanation'])
job.websocket_emit_status('failed')
# TODO: should we emit a status on the socket here similar to tasks.py awx_periodic_scheduler() ?
# emit_websocket_notification('/socket.io/jobs', '', dict(id=))
def process_finished_workflow_jobs(self, workflow_jobs):
def spawn_workflow_graph_jobs(self):
result = []
for workflow_job in workflow_jobs:
for workflow_job in self.all_tasks:
if self.timed_out():
logger.warning("Workflow manager has reached time out while processing running workflows, exiting loop early")
ScheduleWorkflowManager().schedule()
# Stop processing workflow jobs here; another WorkflowManager run
# has just been scheduled to pick up the remainder.
break
dag = WorkflowDAG(workflow_job)
status_changed = False
if workflow_job.cancel_flag:
@ -228,99 +162,106 @@ class TaskManager:
status_changed = True
else:
workflow_nodes = dag.mark_dnr_nodes()
for n in workflow_nodes:
n.save(update_fields=['do_not_run'])
WorkflowJobNode.objects.bulk_update(workflow_nodes, ['do_not_run'])
# If workflow is now done, we do special things to mark it as done.
is_done = dag.is_workflow_done()
if not is_done:
continue
has_failed, reason = dag.has_workflow_failed()
logger.debug('Marking %s as %s.', workflow_job.log_format, 'failed' if has_failed else 'successful')
result.append(workflow_job.id)
new_status = 'failed' if has_failed else 'successful'
logger.debug("Transitioning {} to {} status.".format(workflow_job.log_format, new_status))
update_fields = ['status', 'start_args']
workflow_job.status = new_status
if reason:
logger.info(f'Workflow job {workflow_job.id} failed due to reason: {reason}')
workflow_job.job_explanation = gettext_noop("No error handling paths found, marking workflow as failed")
update_fields.append('job_explanation')
workflow_job.start_args = '' # blank field to remove encrypted passwords
workflow_job.save(update_fields=update_fields)
status_changed = True
if is_done:
has_failed, reason = dag.has_workflow_failed()
logger.debug('Marking %s as %s.', workflow_job.log_format, 'failed' if has_failed else 'successful')
result.append(workflow_job.id)
new_status = 'failed' if has_failed else 'successful'
logger.debug("Transitioning {} to {} status.".format(workflow_job.log_format, new_status))
update_fields = ['status', 'start_args']
workflow_job.status = new_status
if reason:
logger.info(f'Workflow job {workflow_job.id} failed due to reason: {reason}')
workflow_job.job_explanation = gettext_noop("No error handling paths found, marking workflow as failed")
update_fields.append('job_explanation')
workflow_job.start_args = '' # blank field to remove encrypted passwords
workflow_job.save(update_fields=update_fields)
status_changed = True
if status_changed:
if workflow_job.spawned_by_workflow:
schedule_task_manager()
ScheduleWorkflowManager().schedule()
workflow_job.websocket_emit_status(workflow_job.status)
# Operations whose queries rely on modifications made during the atomic scheduling session
workflow_job.send_notification_templates('succeeded' if workflow_job.status == 'successful' else 'failed')
if workflow_job.status == 'running':
spawn_nodes = dag.bfs_nodes_to_run()
if spawn_nodes:
logger.debug('Spawning jobs for %s', workflow_job.log_format)
else:
logger.debug('No nodes to spawn for %s', workflow_job.log_format)
for spawn_node in spawn_nodes:
if spawn_node.unified_job_template is None:
continue
kv = spawn_node.get_job_kwargs()
job = spawn_node.unified_job_template.create_unified_job(**kv)
spawn_node.job = job
spawn_node.save()
logger.debug('Spawned %s in %s for node %s', job.log_format, workflow_job.log_format, spawn_node.pk)
can_start = True
if isinstance(spawn_node.unified_job_template, WorkflowJobTemplate):
workflow_ancestors = job.get_ancestor_workflows()
if spawn_node.unified_job_template in set(workflow_ancestors):
can_start = False
logger.info(
'Refusing to start recursive workflow-in-workflow id={}, wfjt={}, ancestors={}'.format(
job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors]
)
)
display_list = [spawn_node.unified_job_template] + workflow_ancestors
job.job_explanation = gettext_noop(
"Workflow Job spawned from workflow could not start because it "
"would result in recursion (spawn order, most recent first: {})"
).format(', '.join('<{}>'.format(tmp) for tmp in display_list))
else:
logger.debug(
'Starting workflow-in-workflow id={}, wfjt={}, ancestors={}'.format(
job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors]
)
)
if not job._resources_sufficient_for_launch():
can_start = False
job.job_explanation = gettext_noop(
"Job spawned from workflow could not start because it was missing a related resource such as project or inventory"
)
if can_start:
if workflow_job.start_args:
start_args = json.loads(decrypt_field(workflow_job, 'start_args'))
else:
start_args = {}
can_start = job.signal_start(**start_args)
if not can_start:
job.job_explanation = gettext_noop(
"Job spawned from workflow could not start because it was not in the right state or required manual credentials"
)
if not can_start:
job.status = 'failed'
job.save(update_fields=['status', 'job_explanation'])
job.websocket_emit_status('failed')
# TODO: should we emit a status on the socket here similar to tasks.py awx_periodic_scheduler() ?
# emit_websocket_notification('/socket.io/jobs', '', dict(id=))
return result
@timeit
def start_task(self, task, instance_group, dependent_tasks=None, instance=None):
self.subsystem_metrics.inc("task_manager_tasks_started", 1)
self.start_task_limit -= 1
if self.start_task_limit == 0:
# schedule another run immediately after this task manager
schedule_task_manager()
from awx.main.tasks.system import handle_work_error, handle_work_success
dependent_tasks = dependent_tasks or []
task_actual = {
'type': get_type_for_model(type(task)),
'id': task.id,
}
dependencies = [{'type': get_type_for_model(type(t)), 'id': t.id} for t in dependent_tasks]
task.status = 'waiting'
(start_status, opts) = task.pre_start()
if not start_status:
task.status = 'failed'
if task.job_explanation:
task.job_explanation += ' '
task.job_explanation += 'Task failed pre-start check.'
task.save()
# TODO: run error handler to fail sub-tasks and send notifications
else:
if type(task) is WorkflowJob:
task.status = 'running'
task.send_notification_templates('running')
logger.debug('Transitioning %s to running status.', task.log_format)
schedule_task_manager()
# at this point we already have control/execution nodes selected for the following cases
else:
task.instance_group = instance_group
execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else ''
logger.debug(
f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.'
)
with disable_activity_stream():
task.celery_task_id = str(uuid.uuid4())
task.save()
task.log_lifecycle("waiting")
def post_commit():
if task.status != 'failed' and type(task) is not WorkflowJob:
# Before task is dispatched, ensure that job_event partitions exist
create_partition(task.event_class._meta.db_table, start=task.created)
task_cls = task._get_task_class()
task_cls.apply_async(
[task.pk],
opts,
queue=task.get_queue_name(),
uuid=task.celery_task_id,
callbacks=[{'task': handle_work_success.name, 'kwargs': {'task_actual': task_actual}}],
errbacks=[{'task': handle_work_error.name, 'args': [task.celery_task_id], 'kwargs': {'subtasks': [task_actual] + dependencies}}],
)
task.websocket_emit_status(task.status) # adds to on_commit
connection.on_commit(post_commit)
def get_tasks(self, filter_args):
self.all_tasks = [wf for wf in WorkflowJob.objects.filter(**filter_args)]
@timeit
def process_running_tasks(self, running_tasks):
for task in running_tasks:
self.dependency_graph.add_job(task)
def _schedule(self):
self.get_tasks(dict(status__in=["running"], dependencies_processed=True))
if len(self.all_tasks) > 0:
self.spawn_workflow_graph_jobs()
class DependencyManager(TaskBase):
def __init__(self):
super().__init__(prefix="dependency_manager")
def create_project_update(self, task, project_id=None):
if project_id is None:
@ -341,14 +282,20 @@ class TaskManager:
inventory_task.status = 'pending'
inventory_task.save()
logger.debug('Spawned {} as dependency of {}'.format(inventory_task.log_format, task.log_format))
# inventory_sources = self.get_inventory_source_tasks([task])
# self.process_inventory_sources(inventory_sources)
return inventory_task
def add_dependencies(self, task, dependencies):
with disable_activity_stream():
task.dependent_jobs.add(*dependencies)
def get_inventory_source_tasks(self):
inventory_ids = set()
for task in self.all_tasks:
if isinstance(task, Job):
inventory_ids.add(task.inventory_id)
self.all_inventory_sources = [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True)]
def get_latest_inventory_update(self, inventory_source):
latest_inventory_update = InventoryUpdate.objects.filter(inventory_source=inventory_source).order_by("-created")
if not latest_inventory_update.exists():
@ -481,16 +428,166 @@ class TaskManager:
return created_dependencies
def process_tasks(self):
deps = self.generate_dependencies(self.all_tasks)
self.generate_dependencies(deps)
self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(self.all_tasks) + len(deps))
@timeit
def _schedule(self):
self.get_tasks(dict(status__in=["pending"], dependencies_processed=False))
if len(self.all_tasks) > 0:
self.get_inventory_source_tasks()
self.process_tasks()
ScheduleTaskManager().schedule()
class TaskManager(TaskBase):
def __init__(self):
"""
Do NOT put database queries or other potentially expensive operations
in the task manager init. The task manager object is created every time a
job is created, transitions state, and every 30 seconds on each tower node.
More often than not, the object is destroyed quickly because the NOOP case is hit.
The NOOP case is short-circuit logic. If the task manager realizes that another instance
of the task manager is already running, then it short-circuits and decides not to run.
"""
# start task limit indicates how many pending jobs can be started on this
# .schedule() run. Starting jobs is expensive, and there is code in place to reap
# the task manager after TASK_MANAGER_TIMEOUT plus a grace period. At scale, the
# task manager can easily exceed that timeout while starting pending jobs. If this limit is reached, pending jobs
# will no longer be started and will be started on the next task manager cycle.
self.time_delta_job_explanation = timedelta(seconds=30)
super().__init__(prefix="task_manager")
def after_lock_init(self):
"""
Init AFTER we know this instance of the task manager will run because the lock is acquired.
"""
self.dependency_graph = DependencyGraph()
self.instances = TaskManagerInstances(self.all_tasks)
self.instance_groups = TaskManagerInstanceGroups(instances_by_hostname=self.instances)
self.controlplane_ig = self.instance_groups.controlplane_ig
def job_blocked_by(self, task):
# TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph
# in the old task manager this was handled as a method on each task object outside of the graph and
# probably has the side effect of cutting down *a lot* of the logic from this task manager class
blocked_by = self.dependency_graph.task_blocked_by(task)
if blocked_by:
return blocked_by
for dep in task.dependent_jobs.all():
if dep.status in ACTIVE_STATES:
return dep
# if we detect a failed or error dependency, go ahead and fail this
# task. The errback on the dependency takes some time to trigger,
# and we don't want the task to enter running state if its
# dependency has failed or errored.
elif dep.status in ("error", "failed"):
task.status = 'failed'
task.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % (
get_type_for_model(type(dep)),
dep.name,
dep.id,
)
task.save(update_fields=['status', 'job_explanation'])
task.websocket_emit_status('failed')
return dep
return None
@timeit
def start_task(self, task, instance_group, dependent_tasks=None, instance=None):
self.dependency_graph.add_job(task)
self.subsystem_metrics.inc(f"{self.prefix}_tasks_started", 1)
self.start_task_limit -= 1
if self.start_task_limit == 0:
# schedule another run immediately after this task manager
ScheduleTaskManager().schedule()
from awx.main.tasks.system import handle_work_error, handle_work_success
# update capacity for control node and execution node
if task.controller_node:
self.instances[task.controller_node].consume_capacity(settings.AWX_CONTROL_NODE_TASK_IMPACT)
if task.execution_node:
self.instances[task.execution_node].consume_capacity(task.task_impact)
dependent_tasks = dependent_tasks or []
task_actual = {
'type': get_type_for_model(type(task)),
'id': task.id,
}
dependencies = [{'type': get_type_for_model(type(t)), 'id': t.id} for t in dependent_tasks]
task.status = 'waiting'
(start_status, opts) = task.pre_start()
if not start_status:
task.status = 'failed'
if task.job_explanation:
task.job_explanation += ' '
task.job_explanation += 'Task failed pre-start check.'
task.save()
# TODO: run error handler to fail sub-tasks and send notifications
else:
if type(task) is WorkflowJob:
task.status = 'running'
task.send_notification_templates('running')
logger.debug('Transitioning %s to running status.', task.log_format)
# Call this to ensure Workflow nodes get spawned in a timely manner
ScheduleWorkflowManager().schedule()
# at this point we already have control/execution nodes selected for the following cases
else:
task.instance_group = instance_group
execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else ''
logger.debug(
f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.'
)
with disable_activity_stream():
task.celery_task_id = str(uuid.uuid4())
task.save()
task.log_lifecycle("waiting")
def post_commit():
if task.status != 'failed' and type(task) is not WorkflowJob:
# Before task is dispatched, ensure that job_event partitions exist
create_partition(task.event_class._meta.db_table, start=task.created)
task_cls = task._get_task_class()
task_cls.apply_async(
[task.pk],
opts,
queue=task.get_queue_name(),
uuid=task.celery_task_id,
callbacks=[{'task': handle_work_success.name, 'kwargs': {'task_actual': task_actual}}],
errbacks=[{'task': handle_work_error.name, 'args': [task.celery_task_id], 'kwargs': {'subtasks': [task_actual] + dependencies}}],
)
task.websocket_emit_status(task.status) # adds to on_commit
connection.on_commit(post_commit)
@timeit
def process_running_tasks(self, running_tasks):
for task in running_tasks:
if type(task) is WorkflowJob:
ScheduleWorkflowManager().schedule()
self.dependency_graph.add_job(task)
@timeit
def process_pending_tasks(self, pending_tasks):
running_workflow_templates = {wf.unified_job_template_id for wf in self.get_running_workflow_jobs()}
tasks_to_update_job_explanation = []
for task in pending_tasks:
if self.start_task_limit <= 0:
break
if self.timed_out():
logger.warning("Task manager has reached time out while processing pending jobs, exiting loop early")
break
blocked_by = self.job_blocked_by(task)
if blocked_by:
self.subsystem_metrics.inc("task_manager_tasks_blocked", 1)
self.subsystem_metrics.inc(f"{self.prefix}_tasks_blocked", 1)
task.log_lifecycle("blocked", blocked_by=blocked_by)
job_explanation = gettext_noop(f"waiting for {blocked_by._meta.model_name}-{blocked_by.id} to finish")
if task.job_explanation != job_explanation:
@ -499,19 +596,16 @@ class TaskManager:
tasks_to_update_job_explanation.append(task)
continue
found_acceptable_queue = False
preferred_instance_groups = task.preferred_instance_groups
if isinstance(task, WorkflowJob):
if task.unified_job_template_id in running_workflow_templates:
if not task.allow_simultaneous:
logger.debug("{} is blocked from running, workflow already running".format(task.log_format))
continue
else:
running_workflow_templates.add(task.unified_job_template_id)
# Previously we were tracking allow_simultaneous blocking both here and in DependencyGraph.
# Double check that using just the DependencyGraph works for Workflows and Sliced Jobs.
self.start_task(task, None, task.get_jobs_fail_chain(), None)
continue
found_acceptable_queue = False
preferred_instance_groups = self.instance_groups.get_instance_groups_from_task_cache(task)
# Determine if there is control capacity for the task
if task.capacity_type == 'control':
control_impact = task.task_impact + settings.AWX_CONTROL_NODE_TASK_IMPACT
@ -530,8 +624,6 @@ class TaskManager:
# All task.capacity_type == 'control' jobs should run on control plane, no need to loop over instance groups
if task.capacity_type == 'control':
task.execution_node = control_instance.hostname
control_instance.consume_capacity(control_impact)
self.dependency_graph.add_job(task)
execution_instance = self.instances[control_instance.hostname].obj
task.log_lifecycle("controller_node_chosen")
task.log_lifecycle("execution_node_chosen")
@ -541,7 +633,6 @@ class TaskManager:
for instance_group in preferred_instance_groups:
if instance_group.is_container_group:
self.dependency_graph.add_job(task)
self.start_task(task, instance_group, task.get_jobs_fail_chain(), None)
found_acceptable_queue = True
break
@ -563,9 +654,7 @@ class TaskManager:
control_instance = execution_instance
task.controller_node = execution_instance.hostname
control_instance.consume_capacity(settings.AWX_CONTROL_NODE_TASK_IMPACT)
task.log_lifecycle("controller_node_chosen")
execution_instance.consume_capacity(task.task_impact)
task.log_lifecycle("execution_node_chosen")
logger.debug(
"Starting {} in group {} instance {} (remaining_capacity={})".format(
@ -573,7 +662,6 @@ class TaskManager:
)
)
execution_instance = self.instances[execution_instance.hostname].obj
self.dependency_graph.add_job(task)
self.start_task(task, instance_group, task.get_jobs_fail_chain(), execution_instance)
found_acceptable_queue = True
break
@ -599,25 +687,6 @@ class TaskManager:
tasks_to_update_job_explanation.append(task)
logger.debug("{} couldn't be scheduled on graph, waiting for next cycle".format(task.log_format))
def timeout_approval_node(self):
workflow_approvals = WorkflowApproval.objects.filter(status='pending')
now = tz_now()
for task in workflow_approvals:
approval_timeout_seconds = timedelta(seconds=task.timeout)
if task.timeout == 0:
continue
if (now - task.created) >= approval_timeout_seconds:
timeout_message = _("The approval node {name} ({pk}) has expired after {timeout} seconds.").format(
name=task.name, pk=task.pk, timeout=task.timeout
)
logger.warning(timeout_message)
task.timed_out = True
task.status = 'failed'
task.send_approval_notification('timed_out')
task.websocket_emit_status(task.status)
task.job_explanation = timeout_message
task.save(update_fields=['status', 'job_explanation', 'timed_out'])
def reap_jobs_from_orphaned_instances(self):
# discover jobs that are in running state but aren't on an execution node
# that we know about; this is a fairly rare event, but it can occur if you,
@ -630,92 +699,45 @@ class TaskManager:
logger.error(f'{j.execution_node} is not a registered instance; reaping {j.log_format}')
reap_job(j, 'failed')
def process_tasks(self, all_sorted_tasks):
running_tasks = [t for t in all_sorted_tasks if t.status in ['waiting', 'running']]
def process_tasks(self):
running_tasks = [t for t in self.all_tasks if t.status in ['waiting', 'running']]
self.process_running_tasks(running_tasks)
self.subsystem_metrics.inc("task_manager_running_processed", len(running_tasks))
self.subsystem_metrics.inc(f"{self.prefix}_running_processed", len(running_tasks))
pending_tasks = [t for t in all_sorted_tasks if t.status == 'pending']
undeped_tasks = [t for t in pending_tasks if not t.dependencies_processed]
dependencies = self.generate_dependencies(undeped_tasks)
deps_of_deps = self.generate_dependencies(dependencies)
dependencies += deps_of_deps
self.process_pending_tasks(dependencies)
self.subsystem_metrics.inc("task_manager_pending_processed", len(dependencies))
pending_tasks = [t for t in self.all_tasks if t.status == 'pending']
self.process_pending_tasks(pending_tasks)
self.subsystem_metrics.inc("task_manager_pending_processed", len(pending_tasks))
self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(pending_tasks))
def timeout_approval_node(self, task):
if self.timed_out():
logger.warning("Task manager has reached time out while processing approval nodes, exiting loop early")
# Do not process any more workflow approval nodes. Stop here.
# Maybe we should schedule another TaskManager run
return
timeout_message = _("The approval node {name} ({pk}) has expired after {timeout} seconds.").format(name=task.name, pk=task.pk, timeout=task.timeout)
logger.warning(timeout_message)
task.timed_out = True
task.status = 'failed'
task.send_approval_notification('timed_out')
task.websocket_emit_status(task.status)
task.job_explanation = timeout_message
task.save(update_fields=['status', 'job_explanation', 'timed_out'])
def get_expired_workflow_approvals(self):
# timeout of 0 indicates that it never expires
qs = WorkflowApproval.objects.filter(status='pending').exclude(timeout=0).filter(expires__lt=tz_now())
return qs
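The query above leans on the new precomputed expires column (set on save from created plus timeout), so expiry becomes a single database comparison instead of per-row arithmetic. A minimal sketch of the equivalent selection in plain Python, assuming objects with timeout and expires attributes:

from django.utils.timezone import now

def expired_approvals(approvals):
    # timeout == 0 means the approval never expires; otherwise compare the
    # precomputed expires timestamp against the current time (sketch, not AWX code)
    return [a for a in approvals if a.timeout != 0 and a.expires is not None and a.expires < now()]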
@timeit
def _schedule(self):
finished_wfjs = []
all_sorted_tasks = self.get_tasks()
self.get_tasks(dict(status__in=["pending", "waiting", "running"], dependencies_processed=True))
self.after_lock_init(all_sorted_tasks)
self.after_lock_init()
self.reap_jobs_from_orphaned_instances()
if len(all_sorted_tasks) > 0:
# TODO: Deal with
# latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks)
# self.process_latest_project_updates(latest_project_updates)
if len(self.all_tasks) > 0:
self.process_tasks()
# latest_inventory_updates = self.get_latest_inventory_update_tasks(all_sorted_tasks)
# self.process_latest_inventory_updates(latest_inventory_updates)
self.all_inventory_sources = self.get_inventory_source_tasks(all_sorted_tasks)
running_workflow_tasks = self.get_running_workflow_jobs()
finished_wfjs = self.process_finished_workflow_jobs(running_workflow_tasks)
previously_running_workflow_tasks = running_workflow_tasks
running_workflow_tasks = []
for workflow_job in previously_running_workflow_tasks:
if workflow_job.status == 'running':
running_workflow_tasks.append(workflow_job)
else:
logger.debug('Removed %s from job spawning consideration.', workflow_job.log_format)
self.spawn_workflow_graph_jobs(running_workflow_tasks)
self.timeout_approval_node()
self.reap_jobs_from_orphaned_instances()
self.process_tasks(all_sorted_tasks)
return finished_wfjs
def record_aggregate_metrics(self, *args):
if not settings.IS_TESTING():
# increment task_manager_schedule_calls regardless if the other
# metrics are recorded
s_metrics.Metrics(auto_pipe_execute=True).inc("task_manager_schedule_calls", 1)
# Only record metrics if the last time recording was more
# than SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL ago.
# Prevents a short-duration task manager that runs directly after a
# long task manager from overriding useful metrics.
current_time = time.time()
time_last_recorded = current_time - self.subsystem_metrics.decode("task_manager_recorded_timestamp")
if time_last_recorded > settings.SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL:
logger.debug(f"recording metrics, last recorded {time_last_recorded} seconds ago")
self.subsystem_metrics.set("task_manager_recorded_timestamp", current_time)
self.subsystem_metrics.pipe_execute()
else:
logger.debug(f"skipping recording metrics, last recorded {time_last_recorded} seconds ago")
def record_aggregate_metrics_and_exit(self, *args):
self.record_aggregate_metrics()
sys.exit(1)
def schedule(self):
# Lock
with advisory_lock('task_manager_lock', wait=False) as acquired:
with transaction.atomic():
if acquired is False:
logger.debug("Not running scheduler, another task holds lock")
return
logger.debug("Starting Scheduler")
with task_manager_bulk_reschedule():
# if sigterm due to timeout, still record metrics
signal.signal(signal.SIGTERM, self.record_aggregate_metrics_and_exit)
self._schedule()
self.record_aggregate_metrics()
logger.debug("Finishing Scheduler")
for workflow_approval in self.get_expired_workflow_approvals():
self.timeout_approval_node(workflow_approval)

View File

@ -67,6 +67,7 @@ class TaskManagerInstanceGroups:
def __init__(self, instances_by_hostname=None, instance_groups=None, instance_groups_queryset=None):
self.instance_groups = dict()
self.controlplane_ig = None
self.pk_ig_map = dict()
if instance_groups is not None: # for testing
self.instance_groups = instance_groups
@ -81,6 +82,7 @@ class TaskManagerInstanceGroups:
instances_by_hostname[instance.hostname] for instance in instance_group.instances.all() if instance.hostname in instances_by_hostname
],
)
self.pk_ig_map[instance_group.pk] = instance_group
def get_remaining_capacity(self, group_name):
instances = self.instance_groups[group_name]['instances']
@ -121,3 +123,17 @@ class TaskManagerInstanceGroups:
elif i.capacity > largest_instance.capacity:
largest_instance = i
return largest_instance
def get_instance_groups_from_task_cache(self, task):
igs = []
if task.preferred_instance_groups_cache:
for pk in task.preferred_instance_groups_cache:
ig = self.pk_ig_map.get(pk, None)
if ig:
igs.append(ig)
else:
logger.warn(f"Unknown instance group with pk {pk} for task {task}")
if len(igs) == 0:
logger.warn(f"No instance groups in cache exist, defaulting to global instance groups for task {task}")
return task.global_instance_groups
return igs
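The lookup above is a cache-with-fallback pattern: each cached pk resolves through the in-memory map built in __init__, and the global instance groups are used only when nothing resolves. A minimal standalone sketch of the same pattern (the names here are illustrative, not part of the AWX API):

import logging

logger = logging.getLogger(__name__)

def resolve_cached(cached_pks, pk_map, fallback):
    # Resolve each cached primary key through the in-memory map; fall back
    # wholesale when the cache is empty or entirely stale.
    resolved = [pk_map[pk] for pk in (cached_pks or []) if pk in pk_map]
    if not resolved:
        logger.warning("no cached entries resolved, using fallback")
        return fallback
    return resolved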

View File

@ -1,15 +1,35 @@
# Python
import logging
# Django
from django.conf import settings
# AWX
from awx.main.scheduler import TaskManager
from awx import MODE
from awx.main.scheduler import TaskManager, DependencyManager, WorkflowManager
from awx.main.dispatch.publish import task
from awx.main.dispatch import get_local_queuename
logger = logging.getLogger('awx.main.scheduler')
def run_manager(manager, prefix):
if MODE == 'development' and settings.AWX_DISABLE_TASK_MANAGERS:
logger.debug(f"Not running {prefix} manager, AWX_DISABLE_TASK_MANAGERS is True. Trigger with GET to /api/debug/{prefix}_manager/")
return
manager().schedule()
@task(queue=get_local_queuename)
def run_task_manager():
logger.debug("Running task manager.")
TaskManager().schedule()
def task_manager():
run_manager(TaskManager, "task")
@task(queue=get_local_queuename)
def dependency_manager():
run_manager(DependencyManager, "dependency")
@task(queue=get_local_queuename)
def workflow_manager():
run_manager(WorkflowManager, "workflow")
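All three functions are registered with the AWX dispatcher via the @task decorator, so they can be submitted with .delay() just like the ScheduleManager helpers do. A hedged example of triggering one run by hand (assumes a running dispatcher, for instance from an awx-manage shell):

from awx.main.scheduler.tasks import task_manager

# Publishes one task-manager run to the local dispatcher queue; in development
# with AWX_DISABLE_TASK_MANAGERS=True the run is skipped by run_manager().
task_manager.delay()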

View File

@ -53,7 +53,8 @@ from awx.main.dispatch import get_local_queuename, reaper
from awx.main.utils.common import (
ignore_inventory_computed_fields,
ignore_inventory_group_removal,
schedule_task_manager,
ScheduleWorkflowManager,
ScheduleTaskManager,
)
from awx.main.utils.external_logging import reconfigure_rsyslog
@ -657,6 +658,13 @@ def awx_periodic_scheduler():
state.save()
def schedule_manager_success_or_error(instance):
if instance.unifiedjob_blocked_jobs.exists():
ScheduleTaskManager().schedule()
if instance.spawned_by_workflow:
ScheduleWorkflowManager().schedule()
@task(queue=get_local_queuename)
def handle_work_success(task_actual):
try:
@ -666,8 +674,7 @@ def handle_work_success(task_actual):
return
if not instance:
return
schedule_task_manager()
schedule_manager_success_or_error(instance)
@task(queue=get_local_queuename)
@ -709,8 +716,7 @@ def handle_work_error(task_id, *args, **kwargs):
# what the job complete message handler does then we may want to send a
# completion event for each job here.
if first_instance:
schedule_task_manager()
pass
schedule_manager_success_or_error(first_instance)
@task(queue=get_local_queuename)

View File

@ -13,7 +13,10 @@ from awx.main.models.workflow import (
WorkflowJobTemplateNode,
)
from awx.main.models.credential import Credential
from awx.main.scheduler import TaskManager
from awx.main.scheduler import TaskManager, WorkflowManager, DependencyManager
# Django
from django.utils.timezone import now, timedelta
@pytest.fixture
@ -137,8 +140,9 @@ class TestApprovalNodes:
post(url, {'name': 'Approve Test', 'description': '', 'timeout': 0}, user=admin_user, expect=201)
post(reverse('api:workflow_job_template_launch', kwargs={'pk': wfjt.pk}), user=admin_user, expect=201)
wf_job = WorkflowJob.objects.first()
DependencyManager().schedule() # TODO: exclude workflows from this and delete line
TaskManager().schedule()
TaskManager().schedule()
WorkflowManager().schedule()
wfj_node = wf_job.workflow_nodes.first()
approval = wfj_node.job
assert approval.name == 'Approve Test'
@ -162,8 +166,9 @@ class TestApprovalNodes:
post(url, {'name': 'Deny Test', 'description': '', 'timeout': 0}, user=admin_user, expect=201)
post(reverse('api:workflow_job_template_launch', kwargs={'pk': wfjt.pk}), user=admin_user, expect=201)
wf_job = WorkflowJob.objects.first()
DependencyManager().schedule() # TODO: exclude workflows from this and delete line
TaskManager().schedule()
TaskManager().schedule()
WorkflowManager().schedule()
wfj_node = wf_job.workflow_nodes.first()
approval = wfj_node.job
assert approval.name == 'Deny Test'
@ -216,6 +221,37 @@ class TestApprovalNodes:
approval.refresh_from_db()
assert approval.status == 'failed'
def test_expires_time_on_creation(self):
now_time = now()
wa = WorkflowApproval.objects.create(timeout=34)
# this is fudged, so we assert that the expires time is in a reasonable range
assert timedelta(seconds=33) < (wa.expires - now_time) < timedelta(seconds=35)
@pytest.mark.parametrize('with_update_fields', [True, False])
def test_expires_time_update(self, with_update_fields):
wa = WorkflowApproval.objects.create()
assert wa.timeout == 0
assert wa.expires is None
wa.timeout = 1234
if with_update_fields:
wa.save(update_fields=['timeout'])
else:
wa.save()
assert wa.created + timedelta(seconds=1234) == wa.expires
@pytest.mark.parametrize('with_update_fields', [True, False])
def test_reset_timeout_and_expires(self, with_update_fields):
wa = WorkflowApproval.objects.create()
wa.timeout = 1234
wa.save()
assert wa.expires
wa.timeout = 0
if with_update_fields:
wa.save(update_fields=['timeout'])
else:
wa.save()
assert wa.expires is None
@pytest.mark.django_db
class TestExclusiveRelationshipEnforcement:

View File

@ -252,12 +252,14 @@ class TestTaskImpact:
def test_limit_task_impact(self, job_host_limit, run_computed_fields_right_away):
job = job_host_limit(5, 2)
job.inventory.update_computed_fields()
job.task_impact = job._get_task_impact()
assert job.inventory.total_hosts == 5
assert job.task_impact == 2 + 1 # forks becomes the constraint
def test_host_task_impact(self, job_host_limit, run_computed_fields_right_away):
job = job_host_limit(3, 5)
job.inventory.update_computed_fields()
job.task_impact = job._get_task_impact()
assert job.task_impact == 3 + 1 # host count becomes the constraint
def test_shard_task_impact(self, slice_job_factory, run_computed_fields_right_away):
@ -270,9 +272,13 @@ class TestTaskImpact:
# Even distribution - all jobs run on 1 host
assert [len(jobs[0].inventory.get_script_data(slice_number=i + 1, slice_count=3)['all']['hosts']) for i in range(3)] == [1, 1, 1]
jobs[0].inventory.update_computed_fields()
for j in jobs:
j.task_impact = j._get_task_impact()
assert [job.task_impact for job in jobs] == [2, 2, 2] # plus one base task impact
# Uneven distribution - first job takes the extra host
jobs[0].inventory.hosts.create(name='remainder_foo')
assert [len(jobs[0].inventory.get_script_data(slice_number=i + 1, slice_count=3)['all']['hosts']) for i in range(3)] == [2, 1, 1]
jobs[0].inventory.update_computed_fields()
# recalculate task_impact
jobs[0].task_impact = jobs[0]._get_task_impact()
assert [job.task_impact for job in jobs] == [3, 2, 2]

View File

@ -0,0 +1,6 @@
def create_job(jt, dependencies_processed=True):
job = jt.create_unified_job()
job.status = "pending"
job.dependencies_processed = dependencies_processed
job.save()
return job

View File

@ -1,9 +1,10 @@
import pytest
from unittest import mock
from datetime import timedelta
from awx.main.scheduler import TaskManager
from awx.main.models import InstanceGroup, WorkflowJob
from awx.main.scheduler import TaskManager, DependencyManager
from awx.main.models import InstanceGroup
from awx.main.tasks.system import apply_cluster_membership_policies
from . import create_job
@pytest.mark.django_db
@ -12,16 +13,12 @@ def test_multi_group_basic_job_launch(instance_factory, controlplane_instance_gr
i2 = instance_factory("i2")
ig1 = instance_group_factory("ig1", instances=[i1])
ig2 = instance_group_factory("ig2", instances=[i2])
objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1', jobs=["job_should_start"])
objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1')
objects1.job_template.instance_groups.add(ig1)
j1 = objects1.jobs['job_should_start']
j1.status = 'pending'
j1.save()
objects2 = job_template_factory('jt2', organization='org2', project='proj2', inventory='inv2', credential='cred2', jobs=["job_should_still_start"])
j1 = create_job(objects1.job_template)
objects2 = job_template_factory('jt2', organization='org2', project='proj2', inventory='inv2', credential='cred2')
objects2.job_template.instance_groups.add(ig2)
j2 = objects2.jobs['job_should_still_start']
j2.status = 'pending'
j2.save()
j2 = create_job(objects2.job_template)
with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact:
mock_task_impact.return_value = 500
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
@ -35,23 +32,26 @@ def test_multi_group_with_shared_dependency(instance_factory, controlplane_insta
i2 = instance_factory("i2")
ig1 = instance_group_factory("ig1", instances=[i1])
ig2 = instance_group_factory("ig2", instances=[i2])
objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1', jobs=["job_should_start"])
objects1 = job_template_factory(
'jt1',
organization='org1',
project='proj1',
inventory='inv1',
credential='cred1',
)
objects1.job_template.instance_groups.add(ig1)
j1 = create_job(objects1.job_template, dependencies_processed=False)
p = objects1.project
p.scm_update_on_launch = True
p.scm_update_cache_timeout = 0
p.scm_type = "git"
p.scm_url = "http://github.com/ansible/ansible.git"
p.save()
j1 = objects1.jobs['job_should_start']
j1.status = 'pending'
j1.save()
objects2 = job_template_factory('jt2', organization=objects1.organization, project=p, inventory='inv2', credential='cred2', jobs=["job_should_still_start"])
objects2 = job_template_factory('jt2', organization=objects1.organization, project=p, inventory='inv2', credential='cred2')
objects2.job_template.instance_groups.add(ig2)
j2 = objects2.jobs['job_should_still_start']
j2.status = 'pending'
j2.save()
j2 = create_job(objects2.job_template, dependencies_processed=False)
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
DependencyManager().schedule()
TaskManager().schedule()
pu = p.project_updates.first()
TaskManager.start_task.assert_called_once_with(pu, controlplane_instance_group, [j1, j2], controlplane_instance_group.instances.all()[0])
@ -59,6 +59,7 @@ def test_multi_group_with_shared_dependency(instance_factory, controlplane_insta
pu.status = "successful"
pu.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
DependencyManager().schedule()
TaskManager().schedule()
TaskManager.start_task.assert_any_call(j1, ig1, [], i1)
@ -69,7 +70,7 @@ def test_multi_group_with_shared_dependency(instance_factory, controlplane_insta
@pytest.mark.django_db
def test_workflow_job_no_instancegroup(workflow_job_template_factory, controlplane_instance_group, mocker):
wfjt = workflow_job_template_factory('anicedayforawalk').workflow_job_template
wfj = WorkflowJob.objects.create(workflow_job_template=wfjt)
wfj = wfjt.create_unified_job()
wfj.status = "pending"
wfj.save()
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
@ -85,39 +86,50 @@ def test_overcapacity_blocking_other_groups_unaffected(instance_factory, control
i1.capacity = 1020
i1.save()
i2 = instance_factory("i2")
i2.capacity = 1020
i2.save()
ig1 = instance_group_factory("ig1", instances=[i1])
ig2 = instance_group_factory("ig2", instances=[i2])
objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1', jobs=["job_should_start"])
objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1')
objects1.job_template.instance_groups.add(ig1)
j1 = objects1.jobs['job_should_start']
j1.status = 'pending'
j1.save()
objects2 = job_template_factory(
'jt2', organization=objects1.organization, project='proj2', inventory='inv2', credential='cred2', jobs=["job_should_start", "job_should_also_start"]
)
j1 = create_job(objects1.job_template)
objects2 = job_template_factory('jt2', organization=objects1.organization, project='proj2', inventory='inv2', credential='cred2')
objects2.job_template.instance_groups.add(ig1)
j1_1 = objects2.jobs['job_should_also_start']
j1_1.status = 'pending'
j1_1.save()
objects3 = job_template_factory('jt3', organization='org2', project='proj3', inventory='inv3', credential='cred3', jobs=["job_should_still_start"])
j1_1 = create_job(objects2.job_template)
objects3 = job_template_factory('jt3', organization='org2', project='proj3', inventory='inv3', credential='cred3')
objects3.job_template.instance_groups.add(ig2)
j2 = objects3.jobs['job_should_still_start']
j2.status = 'pending'
j2.save()
objects4 = job_template_factory(
'jt4', organization=objects3.organization, project='proj4', inventory='inv4', credential='cred4', jobs=["job_should_not_start"]
)
j2 = create_job(objects3.job_template)
objects4 = job_template_factory('jt4', organization=objects3.organization, project='proj4', inventory='inv4', credential='cred4')
objects4.job_template.instance_groups.add(ig2)
j2_1 = objects4.jobs['job_should_not_start']
j2_1.status = 'pending'
j2_1.save()
tm = TaskManager()
j2_1 = create_job(objects4.job_template)
with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact:
mock_task_impact.return_value = 500
with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job:
tm.schedule()
mock_job.assert_has_calls([mock.call(j1, ig1, [], i1), mock.call(j1_1, ig1, [], i1), mock.call(j2, ig2, [], i2)])
assert mock_job.call_count == 3
TaskManager().schedule()
# all jobs should be able to run, plenty of capacity across both instances
for j in [j1, j1_1, j2, j2_1]:
j.refresh_from_db()
assert j.status == "waiting"
# reset to pending
for j in [j1, j1_1, j2, j2_1]:
j.status = "pending"
j.save()
# shrink i2 so it can only fit 1 job
i2.capacity = 510
i2.save()
TaskManager().schedule()
for j in [j1, j1_1, j2]:
j.refresh_from_db()
assert j.status == "waiting"
j2_1.refresh_from_db()
# could not run because i2 is full
assert j2_1.status == "pending"
@pytest.mark.django_db
@ -126,19 +138,13 @@ def test_failover_group_run(instance_factory, controlplane_instance_group, mocke
i2 = instance_factory("i2")
ig1 = instance_group_factory("ig1", instances=[i1])
ig2 = instance_group_factory("ig2", instances=[i2])
objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1', jobs=["job_should_start"])
objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1')
objects1.job_template.instance_groups.add(ig1)
j1 = objects1.jobs['job_should_start']
j1.status = 'pending'
j1.save()
objects2 = job_template_factory(
'jt2', organization=objects1.organization, project='proj2', inventory='inv2', credential='cred2', jobs=["job_should_start", "job_should_also_start"]
)
j1 = create_job(objects1.job_template)
objects2 = job_template_factory('jt2', organization=objects1.organization, project='proj2', inventory='inv2', credential='cred2')
objects2.job_template.instance_groups.add(ig1)
objects2.job_template.instance_groups.add(ig2)
j1_1 = objects2.jobs['job_should_also_start']
j1_1.status = 'pending'
j1_1.save()
j1_1 = create_job(objects2.job_template)
tm = TaskManager()
with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact:
mock_task_impact.return_value = 500

View File

@ -3,21 +3,19 @@ from unittest import mock
import json
from datetime import timedelta
from awx.main.scheduler import TaskManager
from awx.main.scheduler.dependency_graph import DependencyGraph
from awx.main.scheduler import TaskManager, DependencyManager, WorkflowManager
from awx.main.utils import encrypt_field
from awx.main.models import WorkflowJobTemplate, JobTemplate, Job
from awx.main.models.ha import Instance
from . import create_job
from django.conf import settings
@pytest.mark.django_db
def test_single_job_scheduler_launch(hybrid_instance, controlplane_instance_group, job_template_factory, mocker):
instance = controlplane_instance_group.instances.all()[0]
objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start"])
j = objects.jobs["job_should_start"]
j.status = 'pending'
j.save()
objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred')
j = create_job(objects.job_template)
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(j, controlplane_instance_group, [], instance)
@ -32,10 +30,8 @@ class TestJobLifeCycle:
expect_commit - list of expected on_commit calls
If any of these are None, then the assertion is not made.
"""
if expect_schedule and len(expect_schedule) > 1:
raise RuntimeError('Task manager should reschedule itself one time, at most.')
with mock.patch('awx.main.models.unified_jobs.UnifiedJob.websocket_emit_status') as mock_channel:
with mock.patch('awx.main.utils.common._schedule_task_manager') as tm_sch:
with mock.patch('awx.main.utils.common.ScheduleManager._schedule') as tm_sch:
# Job are ultimately submitted in on_commit hook, but this will not
# actually run, because it waits until outer transaction, which is the test
# itself in this case
@ -56,22 +52,21 @@ class TestJobLifeCycle:
wj = wfjt.create_unified_job()
assert wj.workflow_nodes.count() == 2
wj.signal_start()
tm = TaskManager()
# Transitions workflow job to running
# needs to re-schedule so it spawns jobs next round
self.run_tm(tm, [mock.call('running')], [mock.call()])
self.run_tm(TaskManager(), [mock.call('running')])
# Spawns jobs
# needs re-schedule to submit jobs next round
self.run_tm(tm, [mock.call('pending'), mock.call('pending')], [mock.call()])
self.run_tm(WorkflowManager(), [mock.call('pending'), mock.call('pending')])
assert jt.jobs.count() == 2 # task manager spawned jobs
# Submits jobs
# intermission - jobs will run and reschedule TM when finished
self.run_tm(tm, [mock.call('waiting'), mock.call('waiting')], [])
self.run_tm(DependencyManager()) # flip dependencies_processed to True
self.run_tm(TaskManager(), [mock.call('waiting'), mock.call('waiting')])
# I am the job runner
for job in jt.jobs.all():
job.status = 'successful'
@ -79,7 +74,7 @@ class TestJobLifeCycle:
# Finishes workflow
# no further action is necessary, so rescheduling should not happen
self.run_tm(tm, [mock.call('successful')], [])
self.run_tm(WorkflowManager(), [mock.call('successful')])
def test_task_manager_workflow_workflow_rescheduling(self, controlplane_instance_group):
wfjts = [WorkflowJobTemplate.objects.create(name='foo')]
@ -90,16 +85,13 @@ class TestJobLifeCycle:
wj = wfjts[0].create_unified_job()
wj.signal_start()
tm = TaskManager()
while wfjts[0].status != 'successful':
wfjts[1].refresh_from_db()
if wfjts[1].status == 'successful':
# final run, no more work to do
self.run_tm(tm, expect_schedule=[])
else:
self.run_tm(tm, expect_schedule=[mock.call()])
attempts = 10
while wfjts[0].status != 'successful' and attempts > 0:
self.run_tm(TaskManager())
self.run_tm(WorkflowManager())
wfjts[0].refresh_from_db()
attempts -= 1
def test_control_and_execution_instance(self, project, system_job_template, job_template, inventory_source, control_instance, execution_instance):
assert Instance.objects.count() == 2
@ -113,6 +105,7 @@ class TestJobLifeCycle:
for uj in all_ujs:
uj.signal_start()
DependencyManager().schedule()
tm = TaskManager()
self.run_tm(tm)
@ -135,6 +128,7 @@ class TestJobLifeCycle:
for uj in all_ujs:
uj.signal_start()
DependencyManager().schedule()
# There is only enough control capacity to run one of the jobs so one should end up in pending and the other in waiting
tm = TaskManager()
self.run_tm(tm)
@ -157,6 +151,7 @@ class TestJobLifeCycle:
for uj in all_ujs:
uj.signal_start()
DependencyManager().schedule()
# There is only enough control capacity to run one of the jobs so one should end up in pending and the other in waiting
tm = TaskManager()
self.run_tm(tm)
@ -197,63 +192,49 @@ class TestJobLifeCycle:
@pytest.mark.django_db
def test_single_jt_multi_job_launch_blocks_last(controlplane_instance_group, job_template_factory, mocker):
instance = controlplane_instance_group.instances.all()[0]
objects = job_template_factory(
'jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start", "job_should_not_start"]
)
j1 = objects.jobs["job_should_start"]
j1.status = 'pending'
def test_single_jt_multi_job_launch_blocks_last(job_template_factory):
objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred')
j1 = create_job(objects.job_template)
j2 = create_job(objects.job_template)
TaskManager().schedule()
j1.refresh_from_db()
j2.refresh_from_db()
assert j1.status == "waiting"
assert j2.status == "pending"
# mimic running j1 to unblock j2
j1.status = "successful"
j1.save()
j2 = objects.jobs["job_should_not_start"]
j2.status = 'pending'
j2.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(j1, controlplane_instance_group, [], instance)
j1.status = "successful"
j1.save()
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(j2, controlplane_instance_group, [], instance)
TaskManager().schedule()
j2.refresh_from_db()
assert j2.status == "waiting"
@pytest.mark.django_db
def test_single_jt_multi_job_launch_allow_simul_allowed(controlplane_instance_group, job_template_factory, mocker):
instance = controlplane_instance_group.instances.all()[0]
objects = job_template_factory(
'jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start", "job_should_not_start"]
)
def test_single_jt_multi_job_launch_allow_simul_allowed(job_template_factory):
objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred')
jt = objects.job_template
jt.allow_simultaneous = True
jt.save()
j1 = objects.jobs["job_should_start"]
j1.allow_simultaneous = True
j1.status = 'pending'
j1.save()
j2 = objects.jobs["job_should_not_start"]
j2.allow_simultaneous = True
j2.status = 'pending'
j2.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_has_calls(
[mock.call(j1, controlplane_instance_group, [], instance), mock.call(j2, controlplane_instance_group, [], instance)]
)
j1 = create_job(objects.job_template)
j2 = create_job(objects.job_template)
TaskManager().schedule()
j1.refresh_from_db()
j2.refresh_from_db()
assert j1.status == "waiting"
assert j2.status == "waiting"
@pytest.mark.django_db
def test_multi_jt_capacity_blocking(hybrid_instance, job_template_factory, mocker):
instance = hybrid_instance
controlplane_instance_group = instance.rampart_groups.first()
objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1', jobs=["job_should_start"])
objects2 = job_template_factory('jt2', organization='org2', project='proj2', inventory='inv2', credential='cred2', jobs=["job_should_not_start"])
j1 = objects1.jobs["job_should_start"]
j1.status = 'pending'
j1.save()
j2 = objects2.jobs["job_should_not_start"]
j2.status = 'pending'
j2.save()
objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1')
objects2 = job_template_factory('jt2', organization='org2', project='proj2', inventory='inv2', credential='cred2')
j1 = create_job(objects1.job_template)
j2 = create_job(objects2.job_template)
tm = TaskManager()
with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact:
mock_task_impact.return_value = 505
@ -269,11 +250,9 @@ def test_multi_jt_capacity_blocking(hybrid_instance, job_template_factory, mocke
@pytest.mark.django_db
def test_single_job_dependencies_project_launch(controlplane_instance_group, job_template_factory, mocker):
objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start"])
objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred')
instance = controlplane_instance_group.instances.all()[0]
j = objects.jobs["job_should_start"]
j.status = 'pending'
j.save()
j = create_job(objects.job_template, dependencies_processed=False)
p = objects.project
p.scm_update_on_launch = True
p.scm_update_cache_timeout = 0
@ -281,12 +260,13 @@ def test_single_job_dependencies_project_launch(controlplane_instance_group, job
p.scm_url = "http://github.com/ansible/ansible.git"
p.save(skip_update=True)
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
tm = TaskManager()
with mock.patch.object(TaskManager, "create_project_update", wraps=tm.create_project_update) as mock_pu:
tm.schedule()
dm = DependencyManager()
with mock.patch.object(DependencyManager, "create_project_update", wraps=dm.create_project_update) as mock_pu:
dm.schedule()
mock_pu.assert_called_once_with(j)
pu = [x for x in p.project_updates.all()]
assert len(pu) == 1
TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(pu[0], controlplane_instance_group, [j], instance)
pu[0].status = "successful"
pu[0].save()
@ -297,11 +277,9 @@ def test_single_job_dependencies_project_launch(controlplane_instance_group, job
@pytest.mark.django_db
def test_single_job_dependencies_inventory_update_launch(controlplane_instance_group, job_template_factory, mocker, inventory_source_factory):
objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start"])
objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred')
instance = controlplane_instance_group.instances.all()[0]
j = objects.jobs["job_should_start"]
j.status = 'pending'
j.save()
j = create_job(objects.job_template, dependencies_processed=False)
i = objects.inventory
ii = inventory_source_factory("ec2")
ii.source = "ec2"
@ -310,12 +288,13 @@ def test_single_job_dependencies_inventory_update_launch(controlplane_instance_g
ii.save()
i.inventory_sources.add(ii)
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
tm = TaskManager()
with mock.patch.object(TaskManager, "create_inventory_update", wraps=tm.create_inventory_update) as mock_iu:
tm.schedule()
dm = DependencyManager()
with mock.patch.object(DependencyManager, "create_inventory_update", wraps=dm.create_inventory_update) as mock_iu:
dm.schedule()
mock_iu.assert_called_once_with(j, ii)
iu = [x for x in ii.inventory_updates.all()]
assert len(iu) == 1
TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(iu[0], controlplane_instance_group, [j], instance)
iu[0].status = "successful"
iu[0].save()
@ -334,19 +313,17 @@ def test_inventory_update_launches_project_update(controlplane_instance_group, s
iu.status = "pending"
iu.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
tm = TaskManager()
with mock.patch.object(TaskManager, "create_project_update", wraps=tm.create_project_update) as mock_pu:
tm.schedule()
dm = DependencyManager()
with mock.patch.object(DependencyManager, "create_project_update", wraps=dm.create_project_update) as mock_pu:
dm.schedule()
mock_pu.assert_called_with(iu, project_id=project.id)
@pytest.mark.django_db
def test_job_dependency_with_already_updated(controlplane_instance_group, job_template_factory, mocker, inventory_source_factory):
objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start"])
objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred')
instance = controlplane_instance_group.instances.all()[0]
j = objects.jobs["job_should_start"]
j.status = 'pending'
j.save()
j = create_job(objects.job_template, dependencies_processed=False)
i = objects.inventory
ii = inventory_source_factory("ec2")
ii.source = "ec2"
@ -359,9 +336,9 @@ def test_job_dependency_with_already_updated(controlplane_instance_group, job_te
j.start_args = encrypt_field(j, field_name="start_args")
j.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
tm = TaskManager()
with mock.patch.object(TaskManager, "create_inventory_update", wraps=tm.create_inventory_update) as mock_iu:
tm.schedule()
dm = DependencyManager()
with mock.patch.object(DependencyManager, "create_inventory_update", wraps=dm.create_inventory_update) as mock_iu:
dm.schedule()
mock_iu.assert_not_called()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
@ -371,13 +348,11 @@ def test_job_dependency_with_already_updated(controlplane_instance_group, job_te
@pytest.mark.django_db
def test_shared_dependencies_launch(controlplane_instance_group, job_template_factory, mocker, inventory_source_factory):
instance = controlplane_instance_group.instances.all()[0]
objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["first_job", "second_job"])
j1 = objects.jobs["first_job"]
j1.status = 'pending'
j1.save()
j2 = objects.jobs["second_job"]
j2.status = 'pending'
j2.save()
objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred')
objects.job_template.allow_simultaneous = True
objects.job_template.save()
j1 = create_job(objects.job_template, dependencies_processed=False)
j2 = create_job(objects.job_template, dependencies_processed=False)
p = objects.project
p.scm_update_on_launch = True
p.scm_update_cache_timeout = 300
@ -392,8 +367,8 @@ def test_shared_dependencies_launch(controlplane_instance_group, job_template_fa
ii.update_cache_timeout = 300
ii.save()
i.inventory_sources.add(ii)
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
DependencyManager().schedule()
TaskManager().schedule()
pu = p.project_updates.first()
iu = ii.inventory_updates.first()
@ -408,12 +383,9 @@ def test_shared_dependencies_launch(controlplane_instance_group, job_template_fa
iu.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(j1, controlplane_instance_group, [], instance)
j1.status = "successful"
j1.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(j2, controlplane_instance_group, [], instance)
TaskManager.start_task.assert_has_calls(
[mock.call(j1, controlplane_instance_group, [], instance), mock.call(j2, controlplane_instance_group, [], instance)]
)
pu = [x for x in p.project_updates.all()]
iu = [x for x in ii.inventory_updates.all()]
assert len(pu) == 1
@ -422,30 +394,27 @@ def test_shared_dependencies_launch(controlplane_instance_group, job_template_fa
@pytest.mark.django_db
def test_job_not_blocking_project_update(controlplane_instance_group, job_template_factory):
objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job"])
job = objects.jobs["job"]
instance = controlplane_instance_group.instances.all()[0]
objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred')
job = objects.job_template.create_unified_job()
job.instance_group = controlplane_instance_group
job.dependencies_processed = True
job.status = "running"
job.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
task_manager = TaskManager()
task_manager._schedule()
proj = objects.project
project_update = proj.create_project_update()
project_update.instance_group = controlplane_instance_group
project_update.status = "pending"
project_update.save()
assert not task_manager.job_blocked_by(project_update)
dependency_graph = DependencyGraph()
dependency_graph.add_job(job)
assert not dependency_graph.task_blocked_by(project_update)
TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(project_update, controlplane_instance_group, [], instance)
@pytest.mark.django_db
def test_job_not_blocking_inventory_update(controlplane_instance_group, job_template_factory, inventory_source_factory):
instance = controlplane_instance_group.instances.all()[0]
objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job"])
job = objects.jobs["job"]
job.instance_group = controlplane_instance_group
@ -453,9 +422,6 @@ def test_job_not_blocking_inventory_update(controlplane_instance_group, job_temp
job.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
task_manager = TaskManager()
task_manager._schedule()
inv = objects.inventory
inv_source = inventory_source_factory("ec2")
inv_source.source = "ec2"
@ -465,11 +431,9 @@ def test_job_not_blocking_inventory_update(controlplane_instance_group, job_temp
inventory_update.status = "pending"
inventory_update.save()
assert not task_manager.job_blocked_by(inventory_update)
dependency_graph = DependencyGraph()
dependency_graph.add_job(job)
assert not dependency_graph.task_blocked_by(inventory_update)
DependencyManager().schedule()
TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(inventory_update, controlplane_instance_group, [], instance)
@pytest.mark.django_db
@ -484,7 +448,7 @@ def test_generate_dependencies_only_once(job_template_factory):
# job starts with dependencies_processed as False
assert not job.dependencies_processed
# run one cycle of ._schedule() to generate dependencies
TaskManager()._schedule()
DependencyManager().schedule()
# make sure dependencies_processed is now True
job = Job.objects.filter(name="job_gen_dep")[0]
@ -492,7 +456,7 @@ def test_generate_dependencies_only_once(job_template_factory):
# Run ._schedule() again, but make sure .generate_dependencies() is not
# called with job in the argument list
tm = TaskManager()
tm.generate_dependencies = mock.MagicMock(return_value=[])
tm._schedule()
tm.generate_dependencies.assert_has_calls([mock.call([]), mock.call([])])
dm = DependencyManager()
dm.generate_dependencies = mock.MagicMock(return_value=[])
dm.schedule()
dm.generate_dependencies.assert_not_called()

View File

@ -78,8 +78,9 @@ __all__ = [
'IllegalArgumentError',
'get_custom_venv_choices',
'get_external_account',
'task_manager_bulk_reschedule',
'schedule_task_manager',
'ScheduleTaskManager',
'ScheduleDependencyManager',
'ScheduleWorkflowManager',
'classproperty',
'create_temporary_fifo',
'truncate_stdout',
@ -846,6 +847,66 @@ def get_mem_effective_capacity(mem_bytes):
_inventory_updates = threading.local()
_task_manager = threading.local()
_dependency_manager = threading.local()
_workflow_manager = threading.local()
@contextlib.contextmanager
def task_manager_bulk_reschedule():
"""Context manager to avoid submitting the manager tasks multiple times."""
managers = [ScheduleTaskManager(), ScheduleWorkflowManager(), ScheduleDependencyManager()]
try:
for m in managers:
m.previous_flag = getattr(m.manager_threading_local, 'bulk_reschedule', False)
m.previous_value = getattr(m.manager_threading_local, 'needs_scheduling', False)
m.manager_threading_local.bulk_reschedule = True
m.manager_threading_local.needs_scheduling = False
yield
finally:
for m in managers:
m.manager_threading_local.bulk_reschedule = m.previous_flag
if m.manager_threading_local.needs_scheduling:
m.schedule()
m.manager_threading_local.needs_scheduling = m.previous_value
class ScheduleManager:
def __init__(self, manager, manager_threading_local):
self.manager = manager
self.manager_threading_local = manager_threading_local
def _schedule(self):
from django.db import connection
# runs right away if not in transaction
connection.on_commit(lambda: self.manager.delay())
def schedule(self):
if getattr(self.manager_threading_local, 'bulk_reschedule', False):
self.manager_threading_local.needs_scheduling = True
return
self._schedule()
class ScheduleTaskManager(ScheduleManager):
def __init__(self):
from awx.main.scheduler.tasks import task_manager
super().__init__(task_manager, _task_manager)
class ScheduleDependencyManager(ScheduleManager):
def __init__(self):
from awx.main.scheduler.tasks import dependency_manager
super().__init__(dependency_manager, _dependency_manager)
class ScheduleWorkflowManager(ScheduleManager):
def __init__(self):
from awx.main.scheduler.tasks import workflow_manager
super().__init__(workflow_manager, _workflow_manager)
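Together with task_manager_bulk_reschedule(), these subclasses coalesce repeated schedule requests: inside the context manager each .schedule() call only flips a thread-local flag, and at most one dispatch per manager happens on exit. A minimal usage sketch (the loop body is hypothetical):

from awx.main.utils.common import ScheduleTaskManager, task_manager_bulk_reschedule

with task_manager_bulk_reschedule():
    for job in jobs_needing_updates:  # hypothetical iterable
        job.save()
        ScheduleTaskManager().schedule()  # only sets needs_scheduling
# on exit: a single task_manager.delay(), deferred via connection.on_commit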
@contextlib.contextmanager
@ -861,37 +922,6 @@ def ignore_inventory_computed_fields():
_inventory_updates.is_updating = previous_value
def _schedule_task_manager():
from awx.main.scheduler.tasks import run_task_manager
from django.db import connection
# runs right away if not in transaction
connection.on_commit(lambda: run_task_manager.delay())
@contextlib.contextmanager
def task_manager_bulk_reschedule():
"""Context manager to avoid submitting task multiple times."""
try:
previous_flag = getattr(_task_manager, 'bulk_reschedule', False)
previous_value = getattr(_task_manager, 'needs_scheduling', False)
_task_manager.bulk_reschedule = True
_task_manager.needs_scheduling = False
yield
finally:
_task_manager.bulk_reschedule = previous_flag
if _task_manager.needs_scheduling:
_schedule_task_manager()
_task_manager.needs_scheduling = previous_value
def schedule_task_manager():
if getattr(_task_manager, 'bulk_reschedule', False):
_task_manager.needs_scheduling = True
return
_schedule_task_manager()
@contextlib.contextmanager
def ignore_inventory_group_removal():
"""

View File

@ -248,6 +248,11 @@ SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL = 15
# The maximum allowed jobs to start on a given task manager cycle
START_TASK_LIMIT = 100
# Time out task managers if they take longer than this many seconds, plus TASK_MANAGER_TIMEOUT_GRACE_PERIOD.
# The grace period gives a manager time to bail out on its own before the dispatcher reaps it.
TASK_MANAGER_TIMEOUT = 300
TASK_MANAGER_TIMEOUT_GRACE_PERIOD = 60
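The pair works as a deadline plus a buffer: a manager is expected to notice TASK_MANAGER_TIMEOUT and exit voluntarily, while the grace period is how much longer the dispatcher tolerates before force-killing the run. A hedged sketch of the self-check (the class is illustrative; the real managers expose this as their timed_out() method):

import time
from django.conf import settings

class ManagerDeadline:
    # Illustrative sketch, not AWX code: record the start time and let the
    # scheduling loop poll timed_out() so it can exit before being reaped at
    # TASK_MANAGER_TIMEOUT + TASK_MANAGER_TIMEOUT_GRACE_PERIOD.
    def __init__(self):
        self.start_time = time.time()

    def timed_out(self):
        return (time.time() - self.start_time) >= settings.TASK_MANAGER_TIMEOUT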
# Disallow sending session cookies over insecure connections
SESSION_COOKIE_SECURE = True
@ -442,7 +447,8 @@ CELERYBEAT_SCHEDULE = {
'options': {'expires': 50},
},
'gather_analytics': {'task': 'awx.main.tasks.system.gather_analytics', 'schedule': timedelta(minutes=5)},
'task_manager': {'task': 'awx.main.scheduler.tasks.run_task_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}},
'task_manager': {'task': 'awx.main.scheduler.tasks.task_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}},
'dependency_manager': {'task': 'awx.main.scheduler.tasks.dependency_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}},
'k8s_reaper': {'task': 'awx.main.tasks.system.awx_k8s_reaper', 'schedule': timedelta(seconds=60), 'options': {'expires': 50}},
'receptor_reaper': {'task': 'awx.main.tasks.system.awx_receptor_workunit_reaper', 'schedule': timedelta(seconds=60)},
'send_subsystem_metrics': {'task': 'awx.main.analytics.analytics_tasks.send_subsystem_metrics', 'schedule': timedelta(seconds=20)},

View File

@ -78,18 +78,6 @@ include(optional('/etc/tower/conf.d/*.py'), scope=locals())
BASE_VENV_PATH = "/var/lib/awx/venv/"
AWX_VENV_PATH = os.path.join(BASE_VENV_PATH, "awx")
# If any local_*.py files are present in awx/settings/, use them to override
# default settings for development. If not present, we can still run using
# only the defaults.
try:
if os.getenv('AWX_KUBE_DEVEL', False):
include(optional('minikube.py'), scope=locals())
else:
include(optional('local_*.py'), scope=locals())
except ImportError:
traceback.print_exc()
sys.exit(1)
# Use SQLite for unit tests instead of PostgreSQL. If the lines below are
# commented out, Django will create the test_awx-dev database in PostgreSQL to
# run unit tests.
@ -110,5 +98,26 @@ CLUSTER_HOST_ID = socket.gethostname()
AWX_CALLBACK_PROFILE = True
# ======================!!!!!!! FOR DEVELOPMENT ONLY !!!!!!!=================================
# Disable normal scheduled/triggered task managers (DependencyManager, TaskManager, WorkflowManager).
# Allows user to trigger task managers directly for debugging and profiling purposes.
# Only works in combination with settings.SETTINGS_MODULE == 'awx.settings.development'
AWX_DISABLE_TASK_MANAGERS = False
# ======================!!!!!!! FOR DEVELOPMENT ONLY !!!!!!!=================================
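To use the flag, set it in a development override file and drive the managers by hand through the debug endpoints. A hedged example (local_settings.py is one of the local_*.py override files included at the bottom of this settings module):

# awx/settings/local_settings.py -- development-only override (sketch)
AWX_DISABLE_TASK_MANAGERS = True

# With this in place, periodic/triggered runs stand down and each manager is
# invoked only by GET /api/debug/task_manager/, /api/debug/dependency_manager/,
# or /api/debug/workflow_manager/.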
if 'sqlite3' not in DATABASES['default']['ENGINE']: # noqa
DATABASES['default'].setdefault('OPTIONS', dict()).setdefault('application_name', f'{CLUSTER_HOST_ID}-{os.getpid()}-{" ".join(sys.argv)}'[:63]) # noqa
# If any local_*.py files are present in awx/settings/, use them to override
# default settings for development. If not present, we can still run using
# only the defaults.
# this needs to stay at the bottom of this file
try:
if os.getenv('AWX_KUBE_DEVEL', False):
include(optional('minikube.py'), scope=locals())
else:
include(optional('local_*.py'), scope=locals())
except ImportError:
traceback.print_exc()
sys.exit(1)