diff --git a/awx/api/urls/debug.py b/awx/api/urls/debug.py new file mode 100644 index 0000000000..30eb6d08b3 --- /dev/null +++ b/awx/api/urls/debug.py @@ -0,0 +1,17 @@ +from django.urls import re_path + +from awx.api.views.debug import ( + DebugRootView, + TaskManagerDebugView, + DependencyManagerDebugView, + WorkflowManagerDebugView, +) + +urls = [ + re_path(r'^$', DebugRootView.as_view(), name='debug'), + re_path(r'^task_manager/$', TaskManagerDebugView.as_view(), name='task_manager'), + re_path(r'^dependency_manager/$', DependencyManagerDebugView.as_view(), name='dependency_manager'), + re_path(r'^workflow_manager/$', WorkflowManagerDebugView.as_view(), name='workflow_manager'), +] + +__all__ = ['urls'] diff --git a/awx/api/urls/urls.py b/awx/api/urls/urls.py index c092696d24..2122369919 100644 --- a/awx/api/urls/urls.py +++ b/awx/api/urls/urls.py @@ -2,9 +2,9 @@ # All Rights Reserved. from __future__ import absolute_import, unicode_literals -from django.conf import settings from django.urls import include, re_path +from awx import MODE from awx.api.generics import LoggedLoginView, LoggedLogoutView from awx.api.views import ( ApiRootView, @@ -145,7 +145,12 @@ urlpatterns = [ re_path(r'^logout/$', LoggedLogoutView.as_view(next_page='/api/', redirect_field_name='next'), name='logout'), re_path(r'^o/', include(oauth2_root_urls)), ] -if settings.SETTINGS_MODULE == 'awx.settings.development': +if MODE == 'development': + # Only include these if we are in the development environment from awx.api.swagger import SwaggerSchemaView urlpatterns += [re_path(r'^swagger/$', SwaggerSchemaView.as_view(), name='swagger_view')] + + from awx.api.urls.debug import urls as debug_urls + + urlpatterns += [re_path(r'^debug/', include(debug_urls))] diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index 0d46c05834..f67ab6622d 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -93,7 +93,7 @@ from awx.main.utils import ( get_object_or_400, getattrd, 
get_pk_from_dict, - schedule_task_manager, + ScheduleWorkflowManager, ignore_inventory_computed_fields, ) from awx.main.utils.encryption import encrypt_value @@ -3391,7 +3391,7 @@ class WorkflowJobCancel(RetrieveAPIView): obj = self.get_object() if obj.can_cancel: obj.cancel() - schedule_task_manager() + ScheduleWorkflowManager().schedule() return Response(status=status.HTTP_202_ACCEPTED) else: return self.http_method_not_allowed(request, *args, **kwargs) diff --git a/awx/api/views/debug.py b/awx/api/views/debug.py new file mode 100644 index 0000000000..8ccdd6afe8 --- /dev/null +++ b/awx/api/views/debug.py @@ -0,0 +1,68 @@ +from collections import OrderedDict + +from django.conf import settings + +from rest_framework.permissions import AllowAny +from rest_framework.response import Response +from awx.api.generics import APIView + +from awx.main.scheduler import TaskManager, DependencyManager, WorkflowManager + + +class TaskManagerDebugView(APIView): + _ignore_model_permissions = True + exclude_from_schema = True + permission_classes = [AllowAny] + prefix = 'Task' + + def get(self, request): + TaskManager().schedule() + if not settings.AWX_DISABLE_TASK_MANAGERS: + msg = f"Running {self.prefix} manager. To disable other triggers to the {self.prefix} manager, set AWX_DISABLE_TASK_MANAGERS to True" + else: + msg = f"AWX_DISABLE_TASK_MANAGERS is True, this view is the only way to trigger the {self.prefix} manager" + return Response(msg) + + +class DependencyManagerDebugView(APIView): + _ignore_model_permissions = True + exclude_from_schema = True + permission_classes = [AllowAny] + prefix = 'Dependency' + + def get(self, request): + DependencyManager().schedule() + if not settings.AWX_DISABLE_TASK_MANAGERS: + msg = f"Running {self.prefix} manager. 
To disable other triggers to the {self.prefix} manager, set AWX_DISABLE_TASK_MANAGERS to True" + else: + msg = f"AWX_DISABLE_TASK_MANAGERS is True, this view is the only way to trigger the {self.prefix} manager" + return Response(msg) + + +class WorkflowManagerDebugView(APIView): + _ignore_model_permissions = True + exclude_from_schema = True + permission_classes = [AllowAny] + prefix = 'Workflow' + + def get(self, request): + WorkflowManager().schedule() + if not settings.AWX_DISABLE_TASK_MANAGERS: + msg = f"Running {self.prefix} manager. To disable other triggers to the {self.prefix} manager, set AWX_DISABLE_TASK_MANAGERS to True" + else: + msg = f"AWX_DISABLE_TASK_MANAGERS is True, this view is the only way to trigger the {self.prefix} manager" + return Response(msg) + + +class DebugRootView(APIView): + _ignore_model_permissions = True + exclude_from_schema = True + permission_classes = [AllowAny] + + def get(self, request, format=None): + '''List of available debug urls''' + data = OrderedDict() + data['task_manager'] = '/api/debug/task_manager/' + data['dependency_manager'] = '/api/debug/dependency_manager/' + data['workflow_manager'] = '/api/debug/workflow_manager/' + return Response(data) diff --git a/awx/main/analytics/subsystem_metrics.py b/awx/main/analytics/subsystem_metrics.py index 1b5e3d1cc4..d52e986c63 100644 --- a/awx/main/analytics/subsystem_metrics.py +++ b/awx/main/analytics/subsystem_metrics.py @@ -184,19 +184,28 @@ class Metrics: FloatM('subsystem_metrics_pipe_execute_seconds', 'Time spent saving metrics to redis'), IntM('subsystem_metrics_pipe_execute_calls', 'Number of calls to pipe_execute'), FloatM('subsystem_metrics_send_metrics_seconds', 'Time spent sending metrics to other nodes'), - SetFloatM('task_manager_get_tasks_seconds', 'Time spent in loading all tasks from db'), + SetFloatM('task_manager_get_tasks_seconds', 'Time spent in loading tasks from db'), SetFloatM('task_manager_start_task_seconds', 'Time spent starting task'), 
SetFloatM('task_manager_process_running_tasks_seconds', 'Time spent processing running tasks'), SetFloatM('task_manager_process_pending_tasks_seconds', 'Time spent processing pending tasks'), - SetFloatM('task_manager_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'), - SetFloatM('task_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow jobs'), SetFloatM('task_manager__schedule_seconds', 'Time spent in running the entire _schedule'), - IntM('task_manager_schedule_calls', 'Number of calls to task manager schedule'), + IntM('task_manager__schedule_calls', 'Number of calls to _schedule, after lock is acquired'), SetFloatM('task_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'), SetIntM('task_manager_tasks_started', 'Number of tasks started'), SetIntM('task_manager_running_processed', 'Number of running tasks processed'), SetIntM('task_manager_pending_processed', 'Number of pending tasks processed'), SetIntM('task_manager_tasks_blocked', 'Number of tasks blocked from running'), + SetFloatM('dependency_manager_get_tasks_seconds', 'Time spent loading pending tasks from db'), + SetFloatM('dependency_manager_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'), + SetFloatM('dependency_manager__schedule_seconds', 'Time spent in running the entire _schedule'), + IntM('dependency_manager__schedule_calls', 'Number of calls to _schedule, after lock is acquired'), + SetFloatM('dependency_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'), + SetIntM('dependency_manager_pending_processed', 'Number of pending tasks processed'), + SetFloatM('workflow_manager__schedule_seconds', 'Time spent in running the entire _schedule'), + IntM('workflow_manager__schedule_calls', 'Number of calls to _schedule, after lock is acquired'), + SetFloatM('workflow_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'), + 
SetFloatM('workflow_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow tasks'), + SetFloatM('workflow_manager_get_tasks_seconds', 'Time spent loading workflow tasks from db'), ] # turn metric list into dictionary with the metric name as a key self.METRICS = {} diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py index 576f6bf799..9741f83a08 100644 --- a/awx/main/dispatch/pool.py +++ b/awx/main/dispatch/pool.py @@ -401,13 +401,15 @@ class AutoscalePool(WorkerPool): # the task manager to never do more work current_task = w.current_task if current_task and isinstance(current_task, dict): - if current_task.get('task', '').endswith('tasks.run_task_manager'): + endings = ['tasks.task_manager', 'tasks.dependency_manager', 'tasks.workflow_manager'] + current_task_name = current_task.get('task', '') + if any(current_task_name.endswith(e) for e in endings): if 'started' not in current_task: w.managed_tasks[current_task['uuid']]['started'] = time.time() age = time.time() - current_task['started'] w.managed_tasks[current_task['uuid']]['age'] = age - if age > (60 * 5): - logger.error(f'run_task_manager has held the advisory lock for >5m, sending SIGTERM to {w.pid}') # noqa + if age > (settings.TASK_MANAGER_TIMEOUT + settings.TASK_MANAGER_TIMEOUT_GRACE_PERIOD): + logger.error(f'{current_task_name} has held the advisory lock for {age}, sending SIGTERM to {w.pid}') # noqa os.kill(w.pid, signal.SIGTERM) for m in orphaned: diff --git a/awx/main/management/commands/inventory_import.py b/awx/main/management/commands/inventory_import.py index 78acec423d..4361be300a 100644 --- a/awx/main/management/commands/inventory_import.py +++ b/awx/main/management/commands/inventory_import.py @@ -862,7 +862,7 @@ class Command(BaseCommand): overwrite_vars=bool(options.get('overwrite_vars', False)), ) inventory_update = inventory_source.create_inventory_update( - _eager_fields=dict(job_args=json.dumps(sys.argv), job_env=dict(os.environ.items()), 
job_cwd=os.getcwd()) + _eager_fields=dict(status='running', job_args=json.dumps(sys.argv), job_env=dict(os.environ.items()), job_cwd=os.getcwd()) ) data = AnsibleInventoryLoader(source=source, verbosity=verbosity).load() diff --git a/awx/main/migrations/0165_task_manager_refactor.py b/awx/main/migrations/0165_task_manager_refactor.py new file mode 100644 index 0000000000..2df6c6c2c2 --- /dev/null +++ b/awx/main/migrations/0165_task_manager_refactor.py @@ -0,0 +1,35 @@ +# Generated by Django 3.2.13 on 2022-08-10 14:03 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0164_remove_inventorysource_update_on_project_update'), + ] + + operations = [ + migrations.AddField( + model_name='unifiedjob', + name='preferred_instance_groups_cache', + field=models.JSONField( + blank=True, default=None, editable=False, help_text='A cached list with pk values from preferred instance groups.', null=True + ), + ), + migrations.AddField( + model_name='unifiedjob', + name='task_impact', + field=models.PositiveIntegerField(default=0, editable=False, help_text='Number of forks an instance consumes when running this job.'), + ), + migrations.AddField( + model_name='workflowapproval', + name='expires', + field=models.DateTimeField( + default=None, + editable=False, + help_text='The time this approval will expire. 
This is the created time plus timeout, used for filtering.', + null=True, + ), + ), + ] diff --git a/awx/main/models/ad_hoc_commands.py b/awx/main/models/ad_hoc_commands.py index d0608bd652..7543162080 100644 --- a/awx/main/models/ad_hoc_commands.py +++ b/awx/main/models/ad_hoc_commands.py @@ -90,6 +90,9 @@ class AdHocCommand(UnifiedJob, JobNotificationMixin): extra_vars_dict = VarsDictProperty('extra_vars', True) + def _set_default_dependencies_processed(self): + self.dependencies_processed = True + def clean_inventory(self): inv = self.inventory if not inv: @@ -178,12 +181,12 @@ class AdHocCommand(UnifiedJob, JobNotificationMixin): def get_passwords_needed_to_start(self): return self.passwords_needed_to_start - @property - def task_impact(self): + def _get_task_impact(self): # NOTE: We sorta have to assume the host count matches and that forks default to 5 - from awx.main.models.inventory import Host - - count_hosts = Host.objects.filter(enabled=True, inventory__ad_hoc_commands__pk=self.pk).count() + if self.inventory: + count_hosts = self.inventory.total_hosts + else: + count_hosts = 5 return min(count_hosts, 5 if self.forks == 0 else self.forks) + 1 def copy(self): @@ -207,10 +210,20 @@ class AdHocCommand(UnifiedJob, JobNotificationMixin): def save(self, *args, **kwargs): update_fields = kwargs.get('update_fields', []) + + def add_to_update_fields(name): + if name not in update_fields: + update_fields.append(name) + + if not self.preferred_instance_groups_cache: + self.preferred_instance_groups_cache = self._get_preferred_instance_group_cache() + add_to_update_fields("preferred_instance_groups_cache") if not self.name: self.name = Truncator(u': '.join(filter(None, (self.module_name, self.module_args)))).chars(512) - if 'name' not in update_fields: - update_fields.append('name') + add_to_update_fields("name") + if self.task_impact == 0: + self.task_impact = self._get_task_impact() + add_to_update_fields("task_impact") super(AdHocCommand, self).save(*args, 
**kwargs) @property diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index 782ca59344..3a6b7740a2 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -12,6 +12,7 @@ from django.dispatch import receiver from django.utils.translation import gettext_lazy as _ from django.conf import settings from django.utils.timezone import now, timedelta +from django.db.models import Sum import redis from solo.models import SingletonModel @@ -149,10 +150,13 @@ class Instance(HasPolicyEditsMixin, BaseModel): def consumed_capacity(self): capacity_consumed = 0 if self.node_type in ('hybrid', 'execution'): - capacity_consumed += sum(x.task_impact for x in UnifiedJob.objects.filter(execution_node=self.hostname, status__in=('running', 'waiting'))) + capacity_consumed += ( + UnifiedJob.objects.filter(execution_node=self.hostname, status__in=('running', 'waiting')).aggregate(Sum("task_impact"))["task_impact__sum"] + or 0 + ) if self.node_type in ('hybrid', 'control'): - capacity_consumed += sum( - settings.AWX_CONTROL_NODE_TASK_IMPACT for x in UnifiedJob.objects.filter(controller_node=self.hostname, status__in=('running', 'waiting')) + capacity_consumed += ( + settings.AWX_CONTROL_NODE_TASK_IMPACT * UnifiedJob.objects.filter(controller_node=self.hostname, status__in=('running', 'waiting')).count() ) return capacity_consumed diff --git a/awx/main/models/inventory.py b/awx/main/models/inventory.py index 9a386db9c2..e4cfc9f78d 100644 --- a/awx/main/models/inventory.py +++ b/awx/main/models/inventory.py @@ -337,9 +337,12 @@ class Inventory(CommonModelNameNotUnique, ResourceMixin, RelatedJobsMixin): else: active_inventory_sources = self.inventory_sources.filter(source__in=CLOUD_INVENTORY_SOURCES) failed_inventory_sources = active_inventory_sources.filter(last_job_failed=True) + total_hosts = active_hosts.count() + # if total_hosts has changed, set update_task_impact to True + update_task_impact = total_hosts != self.total_hosts computed_fields = { 'has_active_failures': 
bool(failed_hosts.count()), - 'total_hosts': active_hosts.count(), + 'total_hosts': total_hosts, 'hosts_with_active_failures': failed_hosts.count(), 'total_groups': active_groups.count(), 'has_inventory_sources': bool(active_inventory_sources.count()), @@ -357,6 +360,14 @@ class Inventory(CommonModelNameNotUnique, ResourceMixin, RelatedJobsMixin): computed_fields.pop(field) if computed_fields: iobj.save(update_fields=computed_fields.keys()) + if update_task_impact: + # if total hosts count has changed, re-calculate task_impact for any + # job that is still in pending for this inventory, since task_impact + # is cached on task creation and used in task management system + tasks = self.jobs.filter(status="pending") + for t in tasks: + t.task_impact = t._get_task_impact() + UnifiedJob.objects.bulk_update(tasks, ['task_impact']) logger.debug("Finished updating inventory computed fields, pk={0}, in " "{1:.3f} seconds".format(self.pk, time.time() - start_time)) def websocket_emit_status(self, status): @@ -1220,8 +1231,7 @@ class InventoryUpdate(UnifiedJob, InventorySourceOptions, JobNotificationMixin, return UnpartitionedInventoryUpdateEvent return InventoryUpdateEvent - @property - def task_impact(self): + def _get_task_impact(self): return 1 # InventoryUpdate credential required diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index b1926435b1..fa313dfc23 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -644,8 +644,7 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskMana raise ParseError(_('{status_value} is not a valid status option.').format(status_value=status)) return self._get_hosts(**kwargs) - @property - def task_impact(self): + def _get_task_impact(self): if self.launch_type == 'callback': count_hosts = 2 else: @@ -1213,6 +1212,9 @@ class SystemJob(UnifiedJob, SystemJobOptions, JobNotificationMixin): extra_vars_dict = VarsDictProperty('extra_vars', True) + def 
_set_default_dependencies_processed(self): + self.dependencies_processed = True + @classmethod def _get_parent_field_name(cls): return 'system_job_template' @@ -1238,8 +1240,7 @@ class SystemJob(UnifiedJob, SystemJobOptions, JobNotificationMixin): return UnpartitionedSystemJobEvent return SystemJobEvent - @property - def task_impact(self): + def _get_task_impact(self): return 5 @property diff --git a/awx/main/models/projects.py b/awx/main/models/projects.py index 580f029a0b..5b8fabde97 100644 --- a/awx/main/models/projects.py +++ b/awx/main/models/projects.py @@ -513,6 +513,9 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin, TaskManage help_text=_('The SCM Revision discovered by this update for the given project and branch.'), ) + def _set_default_dependencies_processed(self): + self.dependencies_processed = True + def _get_parent_field_name(self): return 'project' @@ -560,8 +563,7 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin, TaskManage return UnpartitionedProjectUpdateEvent return ProjectUpdateEvent - @property - def task_impact(self): + def _get_task_impact(self): return 0 if self.job_type == 'run' else 1 @property diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 8a53fb5a19..aa26950844 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -45,7 +45,8 @@ from awx.main.utils.common import ( get_type_for_model, parse_yaml_or_json, getattr_dne, - schedule_task_manager, + ScheduleDependencyManager, + ScheduleTaskManager, get_event_partition_epoch, get_capacity_type, ) @@ -381,6 +382,11 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, ExecutionEn unified_job.survey_passwords = new_job_passwords kwargs['survey_passwords'] = new_job_passwords # saved in config object for relaunch + unified_job.preferred_instance_groups_cache = unified_job._get_preferred_instance_group_cache() + + unified_job._set_default_dependencies_processed() + 
unified_job.task_impact = unified_job._get_task_impact() + from awx.main.signals import disable_activity_stream, activity_stream_create with disable_activity_stream(): @@ -693,6 +699,14 @@ class UnifiedJob( on_delete=polymorphic.SET_NULL, help_text=_('The Instance group the job was run under'), ) + preferred_instance_groups_cache = models.JSONField( + blank=True, + null=True, + default=None, + editable=False, + help_text=_("A cached list with pk values from preferred instance groups."), + ) + task_impact = models.PositiveIntegerField(default=0, editable=False, help_text=_("Number of forks an instance consumes when running this job.")) organization = models.ForeignKey( 'Organization', blank=True, @@ -754,6 +768,9 @@ class UnifiedJob( def _get_parent_field_name(self): return 'unified_job_template' # Override in subclasses. + def _get_preferred_instance_group_cache(self): + return [ig.pk for ig in self.preferred_instance_groups] + @classmethod def _get_unified_job_template_class(cls): """ @@ -808,6 +825,9 @@ class UnifiedJob( update_fields = self._update_parent_instance_no_save(parent_instance) parent_instance.save(update_fields=update_fields) + def _set_default_dependencies_processed(self): + pass + def save(self, *args, **kwargs): """Save the job, with current status, to the database. Ensure that all data is consistent before doing so. @@ -1026,7 +1046,6 @@ class UnifiedJob( event_qs = self.get_event_queryset() except NotImplementedError: return True # Model without events, such as WFJT - self.log_lifecycle("event_processing_finished") return self.emitted_events == event_qs.count() def result_stdout_raw_handle(self, enforce_max_bytes=True): @@ -1241,9 +1260,8 @@ class UnifiedJob( except JobLaunchConfig.DoesNotExist: return False - @property - def task_impact(self): - raise NotImplementedError # Implement in subclass. + def _get_task_impact(self): + return self.task_impact # return default, should implement in subclass. 
def websocket_emit_data(self): '''Return extra data that should be included when submitting data to the browser over the websocket connection''' @@ -1358,7 +1376,10 @@ class UnifiedJob( self.update_fields(start_args=json.dumps(kwargs), status='pending') self.websocket_emit_status("pending") - schedule_task_manager() + if self.dependencies_processed: + ScheduleTaskManager().schedule() + else: + ScheduleDependencyManager().schedule() # Each type of unified job has a different Task class; get the # appropirate one. @@ -1515,8 +1536,8 @@ class UnifiedJob( 'state': state, 'work_unit_id': self.work_unit_id, } - if self.unified_job_template: - extra["template_name"] = self.unified_job_template.name + if self.name: + extra["task_name"] = self.name if state == "blocked" and blocked_by: blocked_by_msg = f"{blocked_by._meta.model_name}-{blocked_by.id}" msg = f"{self._meta.model_name}-{self.id} blocked by {blocked_by_msg}" diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py index 38896ae827..c9301f769a 100644 --- a/awx/main/models/workflow.py +++ b/awx/main/models/workflow.py @@ -13,6 +13,7 @@ from django.db import connection, models from django.conf import settings from django.utils.translation import gettext_lazy as _ from django.core.exceptions import ObjectDoesNotExist +from django.utils.timezone import now, timedelta # from django import settings as tower_settings @@ -40,7 +41,7 @@ from awx.main.models.mixins import ( from awx.main.models.jobs import LaunchTimeConfigBase, LaunchTimeConfig, JobTemplate from awx.main.models.credential import Credential from awx.main.redact import REPLACE_STR -from awx.main.utils import schedule_task_manager +from awx.main.utils import ScheduleWorkflowManager __all__ = [ @@ -622,6 +623,9 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio ) is_sliced_job = models.BooleanField(default=False) + def _set_default_dependencies_processed(self): + self.dependencies_processed = True + @property def 
workflow_nodes(self): return self.workflow_job_nodes @@ -668,8 +672,7 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio ) return result - @property - def task_impact(self): + def _get_task_impact(self): return 0 def get_ancestor_workflows(self): @@ -783,6 +786,12 @@ class WorkflowApproval(UnifiedJob, JobNotificationMixin): default=0, help_text=_("The amount of time (in seconds) before the approval node expires and fails."), ) + expires = models.DateTimeField( + default=None, + null=True, + editable=False, + help_text=_("The time this approval will expire. This is the created time plus timeout, used for filtering."), + ) timed_out = models.BooleanField(default=False, help_text=_("Shows when an approval node (with a timeout assigned to it) has timed out.")) approved_or_denied_by = models.ForeignKey( 'auth.User', @@ -793,6 +802,9 @@ class WorkflowApproval(UnifiedJob, JobNotificationMixin): on_delete=models.SET_NULL, ) + def _set_default_dependencies_processed(self): + self.dependencies_processed = True + @classmethod def _get_unified_job_template_class(cls): return WorkflowApprovalTemplate @@ -810,13 +822,32 @@ class WorkflowApproval(UnifiedJob, JobNotificationMixin): def _get_parent_field_name(self): return 'workflow_approval_template' + def save(self, *args, **kwargs): + update_fields = list(kwargs.get('update_fields', [])) + if self.timeout != 0 and ((not self.pk) or (not update_fields) or ('timeout' in update_fields)): + if not self.created: # on creation, created will be set by parent class, so we fudge it here + created = now() + else: + created = self.created + new_expires = created + timedelta(seconds=self.timeout) + if new_expires != self.expires: + self.expires = new_expires + if update_fields and 'expires' not in update_fields: + update_fields.append('expires') + elif self.timeout == 0 and ((not update_fields) or ('timeout' in update_fields)): + if self.expires: + self.expires = None + if update_fields and 'expires' not in 
update_fields: + update_fields.append('expires') + super(WorkflowApproval, self).save(*args, **kwargs) + def approve(self, request=None): self.status = 'successful' self.approved_or_denied_by = get_current_user() self.save() self.send_approval_notification('approved') self.websocket_emit_status(self.status) - schedule_task_manager() + ScheduleWorkflowManager().schedule() return reverse('api:workflow_approval_approve', kwargs={'pk': self.pk}, request=request) def deny(self, request=None): @@ -825,7 +856,7 @@ class WorkflowApproval(UnifiedJob, JobNotificationMixin): self.save() self.send_approval_notification('denied') self.websocket_emit_status(self.status) - schedule_task_manager() + ScheduleWorkflowManager().schedule() return reverse('api:workflow_approval_deny', kwargs={'pk': self.pk}, request=request) def signal_start(self, **kwargs): diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index 02b967368d..86f06687c4 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -1,6 +1,6 @@ # Copyright (c) 2017 Ansible, Inc. 
# -from .task_manager import TaskManager +from .task_manager import TaskManager, DependencyManager, WorkflowManager -__all__ = ['TaskManager'] +__all__ = ['TaskManager', 'DependencyManager', 'WorkflowManager'] diff --git a/awx/main/scheduler/dependency_graph.py b/awx/main/scheduler/dependency_graph.py index 48d2bd2971..c0f2801f08 100644 --- a/awx/main/scheduler/dependency_graph.py +++ b/awx/main/scheduler/dependency_graph.py @@ -7,6 +7,11 @@ from awx.main.models import ( WorkflowJob, ) +import logging + + +logger = logging.getLogger('awx.main.scheduler.dependency_graph') + class DependencyGraph(object): PROJECT_UPDATES = 'project_updates' @@ -36,6 +41,9 @@ class DependencyGraph(object): self.data[self.WORKFLOW_JOB_TEMPLATES_JOBS] = {} def mark_if_no_key(self, job_type, id, job): + if id is None: + logger.warning(f'Null dependency graph key from {job}, could be integrity error or bug, ignoring') + return # only mark first occurrence of a task. If 10 of JobA are launched # (concurrent disabled), the dependency graph should return that jobs # 2 through 10 are blocked by job1 @@ -66,7 +74,10 @@ class DependencyGraph(object): self.mark_if_no_key(self.JOB_TEMPLATE_JOBS, job.job_template_id, job) def mark_workflow_job(self, job): - self.mark_if_no_key(self.WORKFLOW_JOB_TEMPLATES_JOBS, job.workflow_job_template_id, job) + if job.workflow_job_template_id: + self.mark_if_no_key(self.WORKFLOW_JOB_TEMPLATES_JOBS, job.workflow_job_template_id, job) + elif job.unified_job_template_id: # for sliced jobs + self.mark_if_no_key(self.WORKFLOW_JOB_TEMPLATES_JOBS, job.unified_job_template_id, job) def project_update_blocked_by(self, job): return self.get_item(self.PROJECT_UPDATES, job.project_id) @@ -85,7 +96,13 @@ class DependencyGraph(object): def workflow_job_blocked_by(self, job): if job.allow_simultaneous is False: - return self.get_item(self.WORKFLOW_JOB_TEMPLATES_JOBS, job.workflow_job_template_id) + if job.workflow_job_template_id: + return 
self.get_item(self.WORKFLOW_JOB_TEMPLATES_JOBS, job.workflow_job_template_id) + elif job.unified_job_template_id: + # Sliced jobs can be either Job or WorkflowJob type, and either should block a sliced WorkflowJob + return self.get_item(self.WORKFLOW_JOB_TEMPLATES_JOBS, job.unified_job_template_id) or self.get_item( + self.JOB_TEMPLATE_JOBS, job.unified_job_template_id + ) return None def system_job_blocked_by(self, job): diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 8c2f193a1c..b219aaf3b0 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -15,27 +15,31 @@ from django.db import transaction, connection from django.utils.translation import gettext_lazy as _, gettext_noop from django.utils.timezone import now as tz_now from django.conf import settings +from django.contrib.contenttypes.models import ContentType # AWX from awx.main.dispatch.reaper import reap_job from awx.main.models import ( - AdHocCommand, Instance, InventorySource, InventoryUpdate, Job, Project, ProjectUpdate, - SystemJob, UnifiedJob, WorkflowApproval, WorkflowJob, + WorkflowJobNode, WorkflowJobTemplate, ) from awx.main.scheduler.dag_workflow import WorkflowDAG from awx.main.utils.pglock import advisory_lock -from awx.main.utils import get_type_for_model, task_manager_bulk_reschedule, schedule_task_manager -from awx.main.utils.common import create_partition +from awx.main.utils import ( + get_type_for_model, + ScheduleTaskManager, + ScheduleWorkflowManager, +) +from awx.main.utils.common import create_partition, task_manager_bulk_reschedule from awx.main.signals import disable_activity_stream from awx.main.constants import ACTIVE_STATES from awx.main.scheduler.dependency_graph import DependencyGraph @@ -53,167 +57,97 @@ def timeit(func): t_now = time.perf_counter() result = func(*args, **kwargs) dur = time.perf_counter() - t_now - args[0].subsystem_metrics.inc("task_manager_" + func.__name__ + "_seconds", dur) + 
args[0].subsystem_metrics.inc(f"{args[0].prefix}_{func.__name__}_seconds", dur) return result return inner -class TaskManager: - def __init__(self): - """ - Do NOT put database queries or other potentially expensive operations - in the task manager init. The task manager object is created every time a - job is created, transitions state, and every 30 seconds on each tower node. - More often then not, the object is destroyed quickly because the NOOP case is hit. - - The NOOP case is short-circuit logic. If the task manager realizes that another instance - of the task manager is already running, then it short-circuits and decides not to run. - """ - # start task limit indicates how many pending jobs can be started on this - # .schedule() run. Starting jobs is expensive, and there is code in place to reap - # the task manager after 5 minutes. At scale, the task manager can easily take more than - # 5 minutes to start pending jobs. If this limit is reached, pending jobs - # will no longer be started and will be started on the next task manager cycle. - self.start_task_limit = settings.START_TASK_LIMIT - self.time_delta_job_explanation = timedelta(seconds=30) - self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False) +class TaskBase: + def __init__(self, prefix=""): + self.prefix = prefix # initialize each metric to 0 and force metric_has_changed to true. This # ensures each task manager metric will be overridden when pipe_execute # is called later. + self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False) + self.start_time = time.time() + self.start_task_limit = settings.START_TASK_LIMIT for m in self.subsystem_metrics.METRICS: - if m.startswith("task_manager"): + if m.startswith(self.prefix): self.subsystem_metrics.set(m, 0) - def after_lock_init(self, all_sorted_tasks): - """ - Init AFTER we know this instance of the task manager will run because the lock is acquired. 
- """ - self.dependency_graph = DependencyGraph() - self.instances = TaskManagerInstances(all_sorted_tasks) - self.instance_groups = TaskManagerInstanceGroups(instances_by_hostname=self.instances) - self.controlplane_ig = self.instance_groups.controlplane_ig - - def job_blocked_by(self, task): - # TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph - # in the old task manager this was handled as a method on each task object outside of the graph and - # probably has the side effect of cutting down *a lot* of the logic from this task manager class - blocked_by = self.dependency_graph.task_blocked_by(task) - if blocked_by: - return blocked_by - - for dep in task.dependent_jobs.all(): - if dep.status in ACTIVE_STATES: - return dep - # if we detect a failed or error dependency, go ahead and fail this - # task. The errback on the dependency takes some time to trigger, - # and we don't want the task to enter running state if its - # dependency has failed or errored. 
- elif dep.status in ("error", "failed"): - task.status = 'failed' - task.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % ( - get_type_for_model(type(dep)), - dep.name, - dep.id, - ) - task.save(update_fields=['status', 'job_explanation']) - task.websocket_emit_status('failed') - return dep - - return None + def timed_out(self): + """Return True/False if we have met or exceeded the timeout for the task manager.""" + elapsed = time.time() - self.start_time + if elapsed >= settings.TASK_MANAGER_TIMEOUT: + logger.warning(f"{self.prefix} manager has run for {elapsed} which is greater than TASK_MANAGER_TIMEOUT of {settings.TASK_MANAGER_TIMEOUT}.") + return True + return False @timeit - def get_tasks(self, status_list=('pending', 'waiting', 'running')): - jobs = [j for j in Job.objects.filter(status__in=status_list).prefetch_related('instance_group')] - inventory_updates_qs = ( - InventoryUpdate.objects.filter(status__in=status_list).exclude(source='file').prefetch_related('inventory_source', 'instance_group') + def get_tasks(self, filter_args): + wf_approval_ctype_id = ContentType.objects.get_for_model(WorkflowApproval).id + qs = ( + UnifiedJob.objects.filter(**filter_args) + .exclude(launch_type='sync') + .exclude(polymorphic_ctype_id=wf_approval_ctype_id) + .order_by('created') + .prefetch_related('dependent_jobs') ) - inventory_updates = [i for i in inventory_updates_qs] - # Notice the job_type='check': we want to prevent implicit project updates from blocking our jobs. 
- project_updates = [p for p in ProjectUpdate.objects.filter(status__in=status_list, job_type='check').prefetch_related('instance_group')] - system_jobs = [s for s in SystemJob.objects.filter(status__in=status_list).prefetch_related('instance_group')] - ad_hoc_commands = [a for a in AdHocCommand.objects.filter(status__in=status_list).prefetch_related('instance_group')] - workflow_jobs = [w for w in WorkflowJob.objects.filter(status__in=status_list)] - all_tasks = sorted(jobs + project_updates + inventory_updates + system_jobs + ad_hoc_commands + workflow_jobs, key=lambda task: task.created) - return all_tasks + self.all_tasks = [t for t in qs] - def get_running_workflow_jobs(self): - graph_workflow_jobs = [wf for wf in WorkflowJob.objects.filter(status='running')] - return graph_workflow_jobs + def record_aggregate_metrics(self, *args): + if not settings.IS_TESTING(): + # increment task_manager_schedule_calls regardless if the other + # metrics are recorded + s_metrics.Metrics(auto_pipe_execute=True).inc(f"{self.prefix}__schedule_calls", 1) + # Only record metrics if the last time recording was more + # than SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL ago. + # Prevents a short-duration task manager that runs directly after a + # long task manager to override useful metrics. 
+ current_time = time.time() + time_last_recorded = current_time - self.subsystem_metrics.decode(f"{self.prefix}_recorded_timestamp") + if time_last_recorded > settings.SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL: + logger.debug(f"recording {self.prefix} metrics, last recorded {time_last_recorded} seconds ago") + self.subsystem_metrics.set(f"{self.prefix}_recorded_timestamp", current_time) + self.subsystem_metrics.pipe_execute() + else: + logger.debug(f"skipping recording {self.prefix} metrics, last recorded {time_last_recorded} seconds ago") - def get_inventory_source_tasks(self, all_sorted_tasks): - inventory_ids = set() - for task in all_sorted_tasks: - if isinstance(task, Job): - inventory_ids.add(task.inventory_id) - return [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True)] + def record_aggregate_metrics_and_exit(self, *args): + self.record_aggregate_metrics() + sys.exit(1) + + def schedule(self): + # Lock + with task_manager_bulk_reschedule(): + with advisory_lock(f"{self.prefix}_lock", wait=False) as acquired: + with transaction.atomic(): + if acquired is False: + logger.debug(f"Not running {self.prefix} scheduler, another task holds lock") + return + logger.debug(f"Starting {self.prefix} Scheduler") + # if sigterm due to timeout, still record metrics + signal.signal(signal.SIGTERM, self.record_aggregate_metrics_and_exit) + self._schedule() + self.record_aggregate_metrics() + logger.debug(f"Finishing {self.prefix} Scheduler") + + +class WorkflowManager(TaskBase): + def __init__(self): + super().__init__(prefix="workflow_manager") @timeit - def spawn_workflow_graph_jobs(self, workflow_jobs): - for workflow_job in workflow_jobs: - if workflow_job.cancel_flag: - logger.debug('Not spawning jobs for %s because it is pending cancelation.', workflow_job.log_format) - continue - dag = WorkflowDAG(workflow_job) - spawn_nodes = dag.bfs_nodes_to_run() - if spawn_nodes: - logger.debug('Spawning jobs for %s', 
workflow_job.log_format) - else: - logger.debug('No nodes to spawn for %s', workflow_job.log_format) - for spawn_node in spawn_nodes: - if spawn_node.unified_job_template is None: - continue - kv = spawn_node.get_job_kwargs() - job = spawn_node.unified_job_template.create_unified_job(**kv) - spawn_node.job = job - spawn_node.save() - logger.debug('Spawned %s in %s for node %s', job.log_format, workflow_job.log_format, spawn_node.pk) - can_start = True - if isinstance(spawn_node.unified_job_template, WorkflowJobTemplate): - workflow_ancestors = job.get_ancestor_workflows() - if spawn_node.unified_job_template in set(workflow_ancestors): - can_start = False - logger.info( - 'Refusing to start recursive workflow-in-workflow id={}, wfjt={}, ancestors={}'.format( - job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors] - ) - ) - display_list = [spawn_node.unified_job_template] + workflow_ancestors - job.job_explanation = gettext_noop( - "Workflow Job spawned from workflow could not start because it " "would result in recursion (spawn order, most recent first: {})" - ).format(', '.join(['<{}>'.format(tmp) for tmp in display_list])) - else: - logger.debug( - 'Starting workflow-in-workflow id={}, wfjt={}, ancestors={}'.format( - job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors] - ) - ) - if not job._resources_sufficient_for_launch(): - can_start = False - job.job_explanation = gettext_noop( - "Job spawned from workflow could not start because it " "was missing a related resource such as project or inventory" - ) - if can_start: - if workflow_job.start_args: - start_args = json.loads(decrypt_field(workflow_job, 'start_args')) - else: - start_args = {} - can_start = job.signal_start(**start_args) - if not can_start: - job.job_explanation = gettext_noop( - "Job spawned from workflow could not start because it " "was not in the right state or required manual credentials" - ) - if not can_start: - job.status = 'failed' - 
job.save(update_fields=['status', 'job_explanation']) - job.websocket_emit_status('failed') - - # TODO: should we emit a status on the socket here similar to tasks.py awx_periodic_scheduler() ? - # emit_websocket_notification('/socket.io/jobs', '', dict(id=)) - - def process_finished_workflow_jobs(self, workflow_jobs): + def spawn_workflow_graph_jobs(self): result = [] - for workflow_job in workflow_jobs: + for workflow_job in self.all_tasks: + if self.timed_out(): + logger.warning("Workflow manager has reached time out while processing running workflows, exiting loop early") + ScheduleWorkflowManager().schedule() + # Do not process any more workflow jobs. Stop here. + # Maybe we should schedule another WorkflowManager run + break dag = WorkflowDAG(workflow_job) status_changed = False if workflow_job.cancel_flag: @@ -228,99 +162,106 @@ class TaskManager: status_changed = True else: workflow_nodes = dag.mark_dnr_nodes() - for n in workflow_nodes: - n.save(update_fields=['do_not_run']) + WorkflowJobNode.objects.bulk_update(workflow_nodes, ['do_not_run']) + # If workflow is now done, we do special things to mark it as done. 
is_done = dag.is_workflow_done() - if not is_done: - continue - has_failed, reason = dag.has_workflow_failed() - logger.debug('Marking %s as %s.', workflow_job.log_format, 'failed' if has_failed else 'successful') - result.append(workflow_job.id) - new_status = 'failed' if has_failed else 'successful' - logger.debug("Transitioning {} to {} status.".format(workflow_job.log_format, new_status)) - update_fields = ['status', 'start_args'] - workflow_job.status = new_status - if reason: - logger.info(f'Workflow job {workflow_job.id} failed due to reason: {reason}') - workflow_job.job_explanation = gettext_noop("No error handling paths found, marking workflow as failed") - update_fields.append('job_explanation') - workflow_job.start_args = '' # blank field to remove encrypted passwords - workflow_job.save(update_fields=update_fields) - status_changed = True + if is_done: + has_failed, reason = dag.has_workflow_failed() + logger.debug('Marking %s as %s.', workflow_job.log_format, 'failed' if has_failed else 'successful') + result.append(workflow_job.id) + new_status = 'failed' if has_failed else 'successful' + logger.debug("Transitioning {} to {} status.".format(workflow_job.log_format, new_status)) + update_fields = ['status', 'start_args'] + workflow_job.status = new_status + if reason: + logger.info(f'Workflow job {workflow_job.id} failed due to reason: {reason}') + workflow_job.job_explanation = gettext_noop("No error handling paths found, marking workflow as failed") + update_fields.append('job_explanation') + workflow_job.start_args = '' # blank field to remove encrypted passwords + workflow_job.save(update_fields=update_fields) + status_changed = True + if status_changed: if workflow_job.spawned_by_workflow: - schedule_task_manager() + ScheduleWorkflowManager().schedule() workflow_job.websocket_emit_status(workflow_job.status) # Operations whose queries rely on modifications made during the atomic scheduling session 
workflow_job.send_notification_templates('succeeded' if workflow_job.status == 'successful' else 'failed') + + if workflow_job.status == 'running': + spawn_nodes = dag.bfs_nodes_to_run() + if spawn_nodes: + logger.debug('Spawning jobs for %s', workflow_job.log_format) + else: + logger.debug('No nodes to spawn for %s', workflow_job.log_format) + for spawn_node in spawn_nodes: + if spawn_node.unified_job_template is None: + continue + kv = spawn_node.get_job_kwargs() + job = spawn_node.unified_job_template.create_unified_job(**kv) + spawn_node.job = job + spawn_node.save() + logger.debug('Spawned %s in %s for node %s', job.log_format, workflow_job.log_format, spawn_node.pk) + can_start = True + if isinstance(spawn_node.unified_job_template, WorkflowJobTemplate): + workflow_ancestors = job.get_ancestor_workflows() + if spawn_node.unified_job_template in set(workflow_ancestors): + can_start = False + logger.info( + 'Refusing to start recursive workflow-in-workflow id={}, wfjt={}, ancestors={}'.format( + job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors] + ) + ) + display_list = [spawn_node.unified_job_template] + workflow_ancestors + job.job_explanation = gettext_noop( + "Workflow Job spawned from workflow could not start because it " + "would result in recursion (spawn order, most recent first: {})" + ).format(', '.join('<{}>'.format(tmp) for tmp in display_list)) + else: + logger.debug( + 'Starting workflow-in-workflow id={}, wfjt={}, ancestors={}'.format( + job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors] + ) + ) + if not job._resources_sufficient_for_launch(): + can_start = False + job.job_explanation = gettext_noop( + "Job spawned from workflow could not start because it was missing a related resource such as project or inventory" + ) + if can_start: + if workflow_job.start_args: + start_args = json.loads(decrypt_field(workflow_job, 'start_args')) + else: + start_args = {} + can_start = 
job.signal_start(**start_args) + if not can_start: + job.job_explanation = gettext_noop( + "Job spawned from workflow could not start because it was not in the right state or required manual credentials" + ) + if not can_start: + job.status = 'failed' + job.save(update_fields=['status', 'job_explanation']) + job.websocket_emit_status('failed') + + # TODO: should we emit a status on the socket here similar to tasks.py awx_periodic_scheduler() ? + # emit_websocket_notification('/socket.io/jobs', '', dict(id=)) + return result @timeit - def start_task(self, task, instance_group, dependent_tasks=None, instance=None): - self.subsystem_metrics.inc("task_manager_tasks_started", 1) - self.start_task_limit -= 1 - if self.start_task_limit == 0: - # schedule another run immediately after this task manager - schedule_task_manager() - from awx.main.tasks.system import handle_work_error, handle_work_success - - dependent_tasks = dependent_tasks or [] - - task_actual = { - 'type': get_type_for_model(type(task)), - 'id': task.id, - } - dependencies = [{'type': get_type_for_model(type(t)), 'id': t.id} for t in dependent_tasks] - - task.status = 'waiting' - - (start_status, opts) = task.pre_start() - if not start_status: - task.status = 'failed' - if task.job_explanation: - task.job_explanation += ' ' - task.job_explanation += 'Task failed pre-start check.' 
- task.save() - # TODO: run error handler to fail sub-tasks and send notifications - else: - if type(task) is WorkflowJob: - task.status = 'running' - task.send_notification_templates('running') - logger.debug('Transitioning %s to running status.', task.log_format) - schedule_task_manager() - # at this point we already have control/execution nodes selected for the following cases - else: - task.instance_group = instance_group - execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else '' - logger.debug( - f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.' - ) - with disable_activity_stream(): - task.celery_task_id = str(uuid.uuid4()) - task.save() - task.log_lifecycle("waiting") - - def post_commit(): - if task.status != 'failed' and type(task) is not WorkflowJob: - # Before task is dispatched, ensure that job_event partitions exist - create_partition(task.event_class._meta.db_table, start=task.created) - task_cls = task._get_task_class() - task_cls.apply_async( - [task.pk], - opts, - queue=task.get_queue_name(), - uuid=task.celery_task_id, - callbacks=[{'task': handle_work_success.name, 'kwargs': {'task_actual': task_actual}}], - errbacks=[{'task': handle_work_error.name, 'args': [task.celery_task_id], 'kwargs': {'subtasks': [task_actual] + dependencies}}], - ) - - task.websocket_emit_status(task.status) # adds to on_commit - connection.on_commit(post_commit) + def get_tasks(self, filter_args): + self.all_tasks = [wf for wf in WorkflowJob.objects.filter(**filter_args)] @timeit - def process_running_tasks(self, running_tasks): - for task in running_tasks: - self.dependency_graph.add_job(task) + def _schedule(self): + self.get_tasks(dict(status__in=["running"], dependencies_processed=True)) + if len(self.all_tasks) > 0: + self.spawn_workflow_graph_jobs() + + +class DependencyManager(TaskBase): + def __init__(self): + 
super().__init__(prefix="dependency_manager") def create_project_update(self, task, project_id=None): if project_id is None: @@ -341,14 +282,20 @@ class TaskManager: inventory_task.status = 'pending' inventory_task.save() logger.debug('Spawned {} as dependency of {}'.format(inventory_task.log_format, task.log_format)) - # inventory_sources = self.get_inventory_source_tasks([task]) - # self.process_inventory_sources(inventory_sources) + return inventory_task def add_dependencies(self, task, dependencies): with disable_activity_stream(): task.dependent_jobs.add(*dependencies) + def get_inventory_source_tasks(self): + inventory_ids = set() + for task in self.all_tasks: + if isinstance(task, Job): + inventory_ids.add(task.inventory_id) + self.all_inventory_sources = [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True)] + def get_latest_inventory_update(self, inventory_source): latest_inventory_update = InventoryUpdate.objects.filter(inventory_source=inventory_source).order_by("-created") if not latest_inventory_update.exists(): @@ -481,16 +428,166 @@ class TaskManager: return created_dependencies + def process_tasks(self): + deps = self.generate_dependencies(self.all_tasks) + self.generate_dependencies(deps) + self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(self.all_tasks) + len(deps)) + + @timeit + def _schedule(self): + self.get_tasks(dict(status__in=["pending"], dependencies_processed=False)) + + if len(self.all_tasks) > 0: + self.get_inventory_source_tasks() + self.process_tasks() + ScheduleTaskManager().schedule() + + +class TaskManager(TaskBase): + def __init__(self): + """ + Do NOT put database queries or other potentially expensive operations + in the task manager init. The task manager object is created every time a + job is created, transitions state, and every 30 seconds on each tower node. + More often then not, the object is destroyed quickly because the NOOP case is hit. 
+ + The NOOP case is short-circuit logic. If the task manager realizes that another instance + of the task manager is already running, then it short-circuits and decides not to run. + """ + # start task limit indicates how many pending jobs can be started on this + # .schedule() run. Starting jobs is expensive, and there is code in place to reap + # the task manager after 5 minutes. At scale, the task manager can easily take more than + # 5 minutes to start pending jobs. If this limit is reached, pending jobs + # will no longer be started and will be started on the next task manager cycle. + self.time_delta_job_explanation = timedelta(seconds=30) + super().__init__(prefix="task_manager") + + def after_lock_init(self): + """ + Init AFTER we know this instance of the task manager will run because the lock is acquired. + """ + self.dependency_graph = DependencyGraph() + self.instances = TaskManagerInstances(self.all_tasks) + self.instance_groups = TaskManagerInstanceGroups(instances_by_hostname=self.instances) + self.controlplane_ig = self.instance_groups.controlplane_ig + + def job_blocked_by(self, task): + # TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph + # in the old task manager this was handled as a method on each task object outside of the graph and + # probably has the side effect of cutting down *a lot* of the logic from this task manager class + blocked_by = self.dependency_graph.task_blocked_by(task) + if blocked_by: + return blocked_by + + for dep in task.dependent_jobs.all(): + if dep.status in ACTIVE_STATES: + return dep + # if we detect a failed or error dependency, go ahead and fail this + # task. The errback on the dependency takes some time to trigger, + # and we don't want the task to enter running state if its + # dependency has failed or errored. 
+ elif dep.status in ("error", "failed"): + task.status = 'failed' + task.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % ( + get_type_for_model(type(dep)), + dep.name, + dep.id, + ) + task.save(update_fields=['status', 'job_explanation']) + task.websocket_emit_status('failed') + return dep + + return None + + @timeit + def start_task(self, task, instance_group, dependent_tasks=None, instance=None): + self.dependency_graph.add_job(task) + self.subsystem_metrics.inc(f"{self.prefix}_tasks_started", 1) + self.start_task_limit -= 1 + if self.start_task_limit == 0: + # schedule another run immediately after this task manager + ScheduleTaskManager().schedule() + from awx.main.tasks.system import handle_work_error, handle_work_success + + # update capacity for control node and execution node + if task.controller_node: + self.instances[task.controller_node].consume_capacity(settings.AWX_CONTROL_NODE_TASK_IMPACT) + if task.execution_node: + self.instances[task.execution_node].consume_capacity(task.task_impact) + + dependent_tasks = dependent_tasks or [] + + task_actual = { + 'type': get_type_for_model(type(task)), + 'id': task.id, + } + dependencies = [{'type': get_type_for_model(type(t)), 'id': t.id} for t in dependent_tasks] + + task.status = 'waiting' + + (start_status, opts) = task.pre_start() + if not start_status: + task.status = 'failed' + if task.job_explanation: + task.job_explanation += ' ' + task.job_explanation += 'Task failed pre-start check.' 
+ task.save() + # TODO: run error handler to fail sub-tasks and send notifications + else: + if type(task) is WorkflowJob: + task.status = 'running' + task.send_notification_templates('running') + logger.debug('Transitioning %s to running status.', task.log_format) + # Call this to ensure Workflow nodes get spawned in timely manner + ScheduleWorkflowManager().schedule() + # at this point we already have control/execution nodes selected for the following cases + else: + task.instance_group = instance_group + execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else '' + logger.debug( + f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.' + ) + with disable_activity_stream(): + task.celery_task_id = str(uuid.uuid4()) + task.save() + task.log_lifecycle("waiting") + + def post_commit(): + if task.status != 'failed' and type(task) is not WorkflowJob: + # Before task is dispatched, ensure that job_event partitions exist + create_partition(task.event_class._meta.db_table, start=task.created) + task_cls = task._get_task_class() + task_cls.apply_async( + [task.pk], + opts, + queue=task.get_queue_name(), + uuid=task.celery_task_id, + callbacks=[{'task': handle_work_success.name, 'kwargs': {'task_actual': task_actual}}], + errbacks=[{'task': handle_work_error.name, 'args': [task.celery_task_id], 'kwargs': {'subtasks': [task_actual] + dependencies}}], + ) + + task.websocket_emit_status(task.status) # adds to on_commit + connection.on_commit(post_commit) + + @timeit + def process_running_tasks(self, running_tasks): + for task in running_tasks: + if type(task) is WorkflowJob: + ScheduleWorkflowManager().schedule() + self.dependency_graph.add_job(task) + @timeit def process_pending_tasks(self, pending_tasks): - running_workflow_templates = {wf.unified_job_template_id for wf in self.get_running_workflow_jobs()} tasks_to_update_job_explanation = [] for task in 
pending_tasks: if self.start_task_limit <= 0: break + if self.timed_out(): + logger.warning("Task manager has reached time out while processing pending jobs, exiting loop early") + break blocked_by = self.job_blocked_by(task) if blocked_by: - self.subsystem_metrics.inc("task_manager_tasks_blocked", 1) + self.subsystem_metrics.inc(f"{self.prefix}_tasks_blocked", 1) task.log_lifecycle("blocked", blocked_by=blocked_by) job_explanation = gettext_noop(f"waiting for {blocked_by._meta.model_name}-{blocked_by.id} to finish") if task.job_explanation != job_explanation: @@ -499,19 +596,16 @@ class TaskManager: tasks_to_update_job_explanation.append(task) continue - found_acceptable_queue = False - preferred_instance_groups = task.preferred_instance_groups - if isinstance(task, WorkflowJob): - if task.unified_job_template_id in running_workflow_templates: - if not task.allow_simultaneous: - logger.debug("{} is blocked from running, workflow already running".format(task.log_format)) - continue - else: - running_workflow_templates.add(task.unified_job_template_id) + # Previously we were tracking allow_simultaneous blocking both here and in DependencyGraph. + # Double check that using just the DependencyGraph works for Workflows and Sliced Jobs. 
self.start_task(task, None, task.get_jobs_fail_chain(), None) continue + found_acceptable_queue = False + + preferred_instance_groups = self.instance_groups.get_instance_groups_from_task_cache(task) + # Determine if there is control capacity for the task if task.capacity_type == 'control': control_impact = task.task_impact + settings.AWX_CONTROL_NODE_TASK_IMPACT @@ -530,8 +624,6 @@ class TaskManager: # All task.capacity_type == 'control' jobs should run on control plane, no need to loop over instance groups if task.capacity_type == 'control': task.execution_node = control_instance.hostname - control_instance.consume_capacity(control_impact) - self.dependency_graph.add_job(task) execution_instance = self.instances[control_instance.hostname].obj task.log_lifecycle("controller_node_chosen") task.log_lifecycle("execution_node_chosen") @@ -541,7 +633,6 @@ class TaskManager: for instance_group in preferred_instance_groups: if instance_group.is_container_group: - self.dependency_graph.add_job(task) self.start_task(task, instance_group, task.get_jobs_fail_chain(), None) found_acceptable_queue = True break @@ -563,9 +654,7 @@ class TaskManager: control_instance = execution_instance task.controller_node = execution_instance.hostname - control_instance.consume_capacity(settings.AWX_CONTROL_NODE_TASK_IMPACT) task.log_lifecycle("controller_node_chosen") - execution_instance.consume_capacity(task.task_impact) task.log_lifecycle("execution_node_chosen") logger.debug( "Starting {} in group {} instance {} (remaining_capacity={})".format( @@ -573,7 +662,6 @@ class TaskManager: ) ) execution_instance = self.instances[execution_instance.hostname].obj - self.dependency_graph.add_job(task) self.start_task(task, instance_group, task.get_jobs_fail_chain(), execution_instance) found_acceptable_queue = True break @@ -599,25 +687,6 @@ class TaskManager: tasks_to_update_job_explanation.append(task) logger.debug("{} couldn't be scheduled on graph, waiting for next 
cycle".format(task.log_format)) - def timeout_approval_node(self): - workflow_approvals = WorkflowApproval.objects.filter(status='pending') - now = tz_now() - for task in workflow_approvals: - approval_timeout_seconds = timedelta(seconds=task.timeout) - if task.timeout == 0: - continue - if (now - task.created) >= approval_timeout_seconds: - timeout_message = _("The approval node {name} ({pk}) has expired after {timeout} seconds.").format( - name=task.name, pk=task.pk, timeout=task.timeout - ) - logger.warning(timeout_message) - task.timed_out = True - task.status = 'failed' - task.send_approval_notification('timed_out') - task.websocket_emit_status(task.status) - task.job_explanation = timeout_message - task.save(update_fields=['status', 'job_explanation', 'timed_out']) - def reap_jobs_from_orphaned_instances(self): # discover jobs that are in running state but aren't on an execution node # that we know about; this is a fairly rare event, but it can occur if you, @@ -630,92 +699,45 @@ class TaskManager: logger.error(f'{j.execution_node} is not a registered instance; reaping {j.log_format}') reap_job(j, 'failed') - def process_tasks(self, all_sorted_tasks): - running_tasks = [t for t in all_sorted_tasks if t.status in ['waiting', 'running']] + def process_tasks(self): + running_tasks = [t for t in self.all_tasks if t.status in ['waiting', 'running']] self.process_running_tasks(running_tasks) - self.subsystem_metrics.inc("task_manager_running_processed", len(running_tasks)) + self.subsystem_metrics.inc(f"{self.prefix}_running_processed", len(running_tasks)) - pending_tasks = [t for t in all_sorted_tasks if t.status == 'pending'] - - undeped_tasks = [t for t in pending_tasks if not t.dependencies_processed] - dependencies = self.generate_dependencies(undeped_tasks) - deps_of_deps = self.generate_dependencies(dependencies) - dependencies += deps_of_deps - self.process_pending_tasks(dependencies) - self.subsystem_metrics.inc("task_manager_pending_processed", 
len(dependencies)) + pending_tasks = [t for t in self.all_tasks if t.status == 'pending'] self.process_pending_tasks(pending_tasks) - self.subsystem_metrics.inc("task_manager_pending_processed", len(pending_tasks)) + self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(pending_tasks)) + + def timeout_approval_node(self, task): + if self.timed_out(): + logger.warning("Task manager has reached time out while processing approval nodes, exiting loop early") + # Do not process any more workflow approval nodes. Stop here. + # Maybe we should schedule another TaskManager run + return + timeout_message = _("The approval node {name} ({pk}) has expired after {timeout} seconds.").format(name=task.name, pk=task.pk, timeout=task.timeout) + logger.warning(timeout_message) + task.timed_out = True + task.status = 'failed' + task.send_approval_notification('timed_out') + task.websocket_emit_status(task.status) + task.job_explanation = timeout_message + task.save(update_fields=['status', 'job_explanation', 'timed_out']) + + def get_expired_workflow_approvals(self): + # timeout of 0 indicates that it never expires + qs = WorkflowApproval.objects.filter(status='pending').exclude(timeout=0).filter(expires__lt=tz_now()) + return qs @timeit def _schedule(self): - finished_wfjs = [] - all_sorted_tasks = self.get_tasks() + self.get_tasks(dict(status__in=["pending", "waiting", "running"], dependencies_processed=True)) - self.after_lock_init(all_sorted_tasks) + self.after_lock_init() + self.reap_jobs_from_orphaned_instances() - if len(all_sorted_tasks) > 0: - # TODO: Deal with - # latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks) - # self.process_latest_project_updates(latest_project_updates) + if len(self.all_tasks) > 0: + self.process_tasks() - # latest_inventory_updates = self.get_latest_inventory_update_tasks(all_sorted_tasks) - # self.process_latest_inventory_updates(latest_inventory_updates) - - self.all_inventory_sources = 
self.get_inventory_source_tasks(all_sorted_tasks) - - running_workflow_tasks = self.get_running_workflow_jobs() - finished_wfjs = self.process_finished_workflow_jobs(running_workflow_tasks) - - previously_running_workflow_tasks = running_workflow_tasks - running_workflow_tasks = [] - for workflow_job in previously_running_workflow_tasks: - if workflow_job.status == 'running': - running_workflow_tasks.append(workflow_job) - else: - logger.debug('Removed %s from job spawning consideration.', workflow_job.log_format) - - self.spawn_workflow_graph_jobs(running_workflow_tasks) - - self.timeout_approval_node() - self.reap_jobs_from_orphaned_instances() - - self.process_tasks(all_sorted_tasks) - return finished_wfjs - - def record_aggregate_metrics(self, *args): - if not settings.IS_TESTING(): - # increment task_manager_schedule_calls regardless if the other - # metrics are recorded - s_metrics.Metrics(auto_pipe_execute=True).inc("task_manager_schedule_calls", 1) - # Only record metrics if the last time recording was more - # than SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL ago. - # Prevents a short-duration task manager that runs directly after a - # long task manager to override useful metrics. 
- current_time = time.time() - time_last_recorded = current_time - self.subsystem_metrics.decode("task_manager_recorded_timestamp") - if time_last_recorded > settings.SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL: - logger.debug(f"recording metrics, last recorded {time_last_recorded} seconds ago") - self.subsystem_metrics.set("task_manager_recorded_timestamp", current_time) - self.subsystem_metrics.pipe_execute() - else: - logger.debug(f"skipping recording metrics, last recorded {time_last_recorded} seconds ago") - - def record_aggregate_metrics_and_exit(self, *args): - self.record_aggregate_metrics() - sys.exit(1) - - def schedule(self): - # Lock - with advisory_lock('task_manager_lock', wait=False) as acquired: - with transaction.atomic(): - if acquired is False: - logger.debug("Not running scheduler, another task holds lock") - return - logger.debug("Starting Scheduler") - with task_manager_bulk_reschedule(): - # if sigterm due to timeout, still record metrics - signal.signal(signal.SIGTERM, self.record_aggregate_metrics_and_exit) - self._schedule() - self.record_aggregate_metrics() - logger.debug("Finishing Scheduler") + for workflow_approval in self.get_expired_workflow_approvals(): + self.timeout_approval_node(workflow_approval) diff --git a/awx/main/scheduler/task_manager_models.py b/awx/main/scheduler/task_manager_models.py index 678e545152..cade939343 100644 --- a/awx/main/scheduler/task_manager_models.py +++ b/awx/main/scheduler/task_manager_models.py @@ -67,6 +67,7 @@ class TaskManagerInstanceGroups: def __init__(self, instances_by_hostname=None, instance_groups=None, instance_groups_queryset=None): self.instance_groups = dict() self.controlplane_ig = None + self.pk_ig_map = dict() if instance_groups is not None: # for testing self.instance_groups = instance_groups @@ -81,6 +82,7 @@ class TaskManagerInstanceGroups: instances_by_hostname[instance.hostname] for instance in instance_group.instances.all() if instance.hostname in instances_by_hostname ], ) + 
self.pk_ig_map[instance_group.pk] = instance_group def get_remaining_capacity(self, group_name): instances = self.instance_groups[group_name]['instances'] @@ -121,3 +123,17 @@ class TaskManagerInstanceGroups: elif i.capacity > largest_instance.capacity: largest_instance = i return largest_instance + + def get_instance_groups_from_task_cache(self, task): + igs = [] + if task.preferred_instance_groups_cache: + for pk in task.preferred_instance_groups_cache: + ig = self.pk_ig_map.get(pk, None) + if ig: + igs.append(ig) + else: + logger.warning(f"Unknown instance group with pk {pk} for task {task}") + if len(igs) == 0: + logger.warning(f"No instance groups in cache exist, defaulting to global instance groups for task {task}") + return task.global_instance_groups + return igs diff --git a/awx/main/scheduler/tasks.py b/awx/main/scheduler/tasks.py index 65c2a88be7..b762e1c429 100644 --- a/awx/main/scheduler/tasks.py +++ b/awx/main/scheduler/tasks.py @@ -1,15 +1,35 @@ # Python import logging +# Django +from django.conf import settings + # AWX -from awx.main.scheduler import TaskManager +from awx import MODE +from awx.main.scheduler import TaskManager, DependencyManager, WorkflowManager from awx.main.dispatch.publish import task from awx.main.dispatch import get_local_queuename logger = logging.getLogger('awx.main.scheduler') +def run_manager(manager, prefix): + if MODE == 'development' and settings.AWX_DISABLE_TASK_MANAGERS: + logger.debug(f"Not running {prefix} manager, AWX_DISABLE_TASK_MANAGERS is True. 
Trigger with GET to /api/debug/{prefix}_manager/") + return + manager().schedule() + + @task(queue=get_local_queuename) -def run_task_manager(): - logger.debug("Running task manager.") - TaskManager().schedule() +def task_manager(): + run_manager(TaskManager, "task") + + +@task(queue=get_local_queuename) +def dependency_manager(): + run_manager(DependencyManager, "dependency") + + +@task(queue=get_local_queuename) +def workflow_manager(): + run_manager(WorkflowManager, "workflow") diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py index b828326339..697df497d1 100644 --- a/awx/main/tasks/system.py +++ b/awx/main/tasks/system.py @@ -53,7 +53,8 @@ from awx.main.dispatch import get_local_queuename, reaper from awx.main.utils.common import ( ignore_inventory_computed_fields, ignore_inventory_group_removal, - schedule_task_manager, + ScheduleWorkflowManager, + ScheduleTaskManager, ) from awx.main.utils.external_logging import reconfigure_rsyslog @@ -657,6 +658,13 @@ def awx_periodic_scheduler(): state.save() +def schedule_manager_success_or_error(instance): + if instance.unifiedjob_blocked_jobs.exists(): + ScheduleTaskManager().schedule() + if instance.spawned_by_workflow: + ScheduleWorkflowManager().schedule() + + @task(queue=get_local_queuename) def handle_work_success(task_actual): try: @@ -666,8 +674,7 @@ def handle_work_success(task_actual): return if not instance: return - - schedule_task_manager() + schedule_manager_success_or_error(instance) @task(queue=get_local_queuename) @@ -709,8 +716,7 @@ def handle_work_error(task_id, *args, **kwargs): # what the job complete message handler does then we may want to send a # completion event for each job here. 
if first_instance: - schedule_task_manager() - pass + schedule_manager_success_or_error(first_instance) @task(queue=get_local_queuename) diff --git a/awx/main/tests/functional/api/test_workflow_node.py b/awx/main/tests/functional/api/test_workflow_node.py index 74ab92fd7b..0b89dfb546 100644 --- a/awx/main/tests/functional/api/test_workflow_node.py +++ b/awx/main/tests/functional/api/test_workflow_node.py @@ -13,7 +13,10 @@ from awx.main.models.workflow import ( WorkflowJobTemplateNode, ) from awx.main.models.credential import Credential -from awx.main.scheduler import TaskManager +from awx.main.scheduler import TaskManager, WorkflowManager, DependencyManager + +# Django +from django.utils.timezone import now, timedelta @pytest.fixture @@ -137,8 +140,9 @@ class TestApprovalNodes: post(url, {'name': 'Approve Test', 'description': '', 'timeout': 0}, user=admin_user, expect=201) post(reverse('api:workflow_job_template_launch', kwargs={'pk': wfjt.pk}), user=admin_user, expect=201) wf_job = WorkflowJob.objects.first() + DependencyManager().schedule() # TODO: exclude workflows from this and delete line TaskManager().schedule() - TaskManager().schedule() + WorkflowManager().schedule() wfj_node = wf_job.workflow_nodes.first() approval = wfj_node.job assert approval.name == 'Approve Test' @@ -162,8 +166,9 @@ class TestApprovalNodes: post(url, {'name': 'Deny Test', 'description': '', 'timeout': 0}, user=admin_user, expect=201) post(reverse('api:workflow_job_template_launch', kwargs={'pk': wfjt.pk}), user=admin_user, expect=201) wf_job = WorkflowJob.objects.first() + DependencyManager().schedule() # TODO: exclude workflows from this and delete line TaskManager().schedule() - TaskManager().schedule() + WorkflowManager().schedule() wfj_node = wf_job.workflow_nodes.first() approval = wfj_node.job assert approval.name == 'Deny Test' @@ -216,6 +221,37 @@ class TestApprovalNodes: approval.refresh_from_db() assert approval.status == 'failed' + def test_expires_time_on_creation(self): 
+ now_time = now() + wa = WorkflowApproval.objects.create(timeout=34) + # this is fudged, so we assert that the expires time is in reasonable range + assert timedelta(seconds=33) < (wa.expires - now_time) < timedelta(seconds=35) + + @pytest.mark.parametrize('with_update_fields', [True, False]) + def test_expires_time_update(self, with_update_fields): + wa = WorkflowApproval.objects.create() + assert wa.timeout == 0 + assert wa.expires is None + wa.timeout = 1234 + if with_update_fields: + wa.save(update_fields=['timeout']) + else: + wa.save() + assert wa.created + timedelta(seconds=1234) == wa.expires + + @pytest.mark.parametrize('with_update_fields', [True, False]) + def test_reset_timeout_and_expires(self, with_update_fields): + wa = WorkflowApproval.objects.create() + wa.timeout = 1234 + wa.save() + assert wa.expires + wa.timeout = 0 + if with_update_fields: + wa.save(update_fields=['timeout']) + else: + wa.save() + assert wa.expires is None + @pytest.mark.django_db class TestExclusiveRelationshipEnforcement: diff --git a/awx/main/tests/functional/models/test_unified_job.py b/awx/main/tests/functional/models/test_unified_job.py index 4d17d09440..389ea731b9 100644 --- a/awx/main/tests/functional/models/test_unified_job.py +++ b/awx/main/tests/functional/models/test_unified_job.py @@ -252,12 +252,14 @@ class TestTaskImpact: def test_limit_task_impact(self, job_host_limit, run_computed_fields_right_away): job = job_host_limit(5, 2) job.inventory.update_computed_fields() + job.task_impact = job._get_task_impact() assert job.inventory.total_hosts == 5 assert job.task_impact == 2 + 1 # forks becomes constraint def test_host_task_impact(self, job_host_limit, run_computed_fields_right_away): job = job_host_limit(3, 5) job.inventory.update_computed_fields() + job.task_impact = job._get_task_impact() assert job.task_impact == 3 + 1 # hosts becomes constraint def test_shard_task_impact(self, slice_job_factory, run_computed_fields_right_away): @@ -270,9 +272,13 @@ class 
TestTaskImpact: # Even distribution - all jobs run on 1 host assert [len(jobs[0].inventory.get_script_data(slice_number=i + 1, slice_count=3)['all']['hosts']) for i in range(3)] == [1, 1, 1] jobs[0].inventory.update_computed_fields() + for j in jobs: + j.task_impact = j._get_task_impact() assert [job.task_impact for job in jobs] == [2, 2, 2] # plus one base task impact # Uneven distribution - first job takes the extra host jobs[0].inventory.hosts.create(name='remainder_foo') assert [len(jobs[0].inventory.get_script_data(slice_number=i + 1, slice_count=3)['all']['hosts']) for i in range(3)] == [2, 1, 1] jobs[0].inventory.update_computed_fields() + # recalculate task_impact + jobs[0].task_impact = jobs[0]._get_task_impact() assert [job.task_impact for job in jobs] == [3, 2, 2] diff --git a/awx/main/tests/functional/task_management/__init__.py b/awx/main/tests/functional/task_management/__init__.py new file mode 100644 index 0000000000..4bb27988f4 --- /dev/null +++ b/awx/main/tests/functional/task_management/__init__.py @@ -0,0 +1,6 @@ +def create_job(jt, dependencies_processed=True): + job = jt.create_unified_job() + job.status = "pending" + job.dependencies_processed = dependencies_processed + job.save() + return job diff --git a/awx/main/tests/functional/task_management/test_rampart_groups.py b/awx/main/tests/functional/task_management/test_rampart_groups.py index 728c60f92d..6bed591147 100644 --- a/awx/main/tests/functional/task_management/test_rampart_groups.py +++ b/awx/main/tests/functional/task_management/test_rampart_groups.py @@ -1,9 +1,10 @@ import pytest from unittest import mock from datetime import timedelta -from awx.main.scheduler import TaskManager -from awx.main.models import InstanceGroup, WorkflowJob +from awx.main.scheduler import TaskManager, DependencyManager +from awx.main.models import InstanceGroup from awx.main.tasks.system import apply_cluster_membership_policies +from . 
import create_job @pytest.mark.django_db @@ -12,16 +13,12 @@ def test_multi_group_basic_job_launch(instance_factory, controlplane_instance_gr i2 = instance_factory("i2") ig1 = instance_group_factory("ig1", instances=[i1]) ig2 = instance_group_factory("ig2", instances=[i2]) - objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1', jobs=["job_should_start"]) + objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1') objects1.job_template.instance_groups.add(ig1) - j1 = objects1.jobs['job_should_start'] - j1.status = 'pending' - j1.save() - objects2 = job_template_factory('jt2', organization='org2', project='proj2', inventory='inv2', credential='cred2', jobs=["job_should_still_start"]) + j1 = create_job(objects1.job_template) + objects2 = job_template_factory('jt2', organization='org2', project='proj2', inventory='inv2', credential='cred2') objects2.job_template.instance_groups.add(ig2) - j2 = objects2.jobs['job_should_still_start'] - j2.status = 'pending' - j2.save() + j2 = create_job(objects2.job_template) with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact: mock_task_impact.return_value = 500 with mocker.patch("awx.main.scheduler.TaskManager.start_task"): @@ -35,23 +32,26 @@ def test_multi_group_with_shared_dependency(instance_factory, controlplane_insta i2 = instance_factory("i2") ig1 = instance_group_factory("ig1", instances=[i1]) ig2 = instance_group_factory("ig2", instances=[i2]) - objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1', jobs=["job_should_start"]) + objects1 = job_template_factory( + 'jt1', + organization='org1', + project='proj1', + inventory='inv1', + credential='cred1', + ) objects1.job_template.instance_groups.add(ig1) + j1 = create_job(objects1.job_template, dependencies_processed=False) p = objects1.project 
p.scm_update_on_launch = True p.scm_update_cache_timeout = 0 p.scm_type = "git" p.scm_url = "http://github.com/ansible/ansible.git" p.save() - j1 = objects1.jobs['job_should_start'] - j1.status = 'pending' - j1.save() - objects2 = job_template_factory('jt2', organization=objects1.organization, project=p, inventory='inv2', credential='cred2', jobs=["job_should_still_start"]) + objects2 = job_template_factory('jt2', organization=objects1.organization, project=p, inventory='inv2', credential='cred2') objects2.job_template.instance_groups.add(ig2) - j2 = objects2.jobs['job_should_still_start'] - j2.status = 'pending' - j2.save() + j2 = create_job(objects2.job_template, dependencies_processed=False) with mocker.patch("awx.main.scheduler.TaskManager.start_task"): + DependencyManager().schedule() TaskManager().schedule() pu = p.project_updates.first() TaskManager.start_task.assert_called_once_with(pu, controlplane_instance_group, [j1, j2], controlplane_instance_group.instances.all()[0]) @@ -59,6 +59,7 @@ def test_multi_group_with_shared_dependency(instance_factory, controlplane_insta pu.status = "successful" pu.save() with mock.patch("awx.main.scheduler.TaskManager.start_task"): + DependencyManager().schedule() TaskManager().schedule() TaskManager.start_task.assert_any_call(j1, ig1, [], i1) @@ -69,7 +70,7 @@ def test_multi_group_with_shared_dependency(instance_factory, controlplane_insta @pytest.mark.django_db def test_workflow_job_no_instancegroup(workflow_job_template_factory, controlplane_instance_group, mocker): wfjt = workflow_job_template_factory('anicedayforawalk').workflow_job_template - wfj = WorkflowJob.objects.create(workflow_job_template=wfjt) + wfj = wfjt.create_unified_job() wfj.status = "pending" wfj.save() with mocker.patch("awx.main.scheduler.TaskManager.start_task"): @@ -85,39 +86,50 @@ def test_overcapacity_blocking_other_groups_unaffected(instance_factory, control i1.capacity = 1020 i1.save() i2 = instance_factory("i2") + i2.capacity = 1020 + i2.save() 
ig1 = instance_group_factory("ig1", instances=[i1]) ig2 = instance_group_factory("ig2", instances=[i2]) - objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1', jobs=["job_should_start"]) + objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1') objects1.job_template.instance_groups.add(ig1) - j1 = objects1.jobs['job_should_start'] - j1.status = 'pending' - j1.save() - objects2 = job_template_factory( - 'jt2', organization=objects1.organization, project='proj2', inventory='inv2', credential='cred2', jobs=["job_should_start", "job_should_also_start"] - ) + j1 = create_job(objects1.job_template) + objects2 = job_template_factory('jt2', organization=objects1.organization, project='proj2', inventory='inv2', credential='cred2') objects2.job_template.instance_groups.add(ig1) - j1_1 = objects2.jobs['job_should_also_start'] - j1_1.status = 'pending' - j1_1.save() - objects3 = job_template_factory('jt3', organization='org2', project='proj3', inventory='inv3', credential='cred3', jobs=["job_should_still_start"]) + j1_1 = create_job(objects2.job_template) + objects3 = job_template_factory('jt3', organization='org2', project='proj3', inventory='inv3', credential='cred3') objects3.job_template.instance_groups.add(ig2) - j2 = objects3.jobs['job_should_still_start'] - j2.status = 'pending' - j2.save() - objects4 = job_template_factory( - 'jt4', organization=objects3.organization, project='proj4', inventory='inv4', credential='cred4', jobs=["job_should_not_start"] - ) + j2 = create_job(objects3.job_template) + objects4 = job_template_factory('jt4', organization=objects3.organization, project='proj4', inventory='inv4', credential='cred4') objects4.job_template.instance_groups.add(ig2) - j2_1 = objects4.jobs['job_should_not_start'] - j2_1.status = 'pending' - j2_1.save() - tm = TaskManager() + j2_1 = create_job(objects4.job_template) + with 
mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact: mock_task_impact.return_value = 500 - with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job: - tm.schedule() - mock_job.assert_has_calls([mock.call(j1, ig1, [], i1), mock.call(j1_1, ig1, [], i1), mock.call(j2, ig2, [], i2)]) - assert mock_job.call_count == 3 + TaskManager().schedule() + + # all jobs should be able to run, plenty of capacity across both instances + for j in [j1, j1_1, j2, j2_1]: + j.refresh_from_db() + assert j.status == "waiting" + + # reset to pending + for j in [j1, j1_1, j2, j2_1]: + j.status = "pending" + j.save() + + # make i2 can only be able to fit 1 job + i2.capacity = 510 + i2.save() + + TaskManager().schedule() + + for j in [j1, j1_1, j2]: + j.refresh_from_db() + assert j.status == "waiting" + + j2_1.refresh_from_db() + # could not run because i2 is full + assert j2_1.status == "pending" @pytest.mark.django_db @@ -126,19 +138,13 @@ def test_failover_group_run(instance_factory, controlplane_instance_group, mocke i2 = instance_factory("i2") ig1 = instance_group_factory("ig1", instances=[i1]) ig2 = instance_group_factory("ig2", instances=[i2]) - objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1', jobs=["job_should_start"]) + objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1') objects1.job_template.instance_groups.add(ig1) - j1 = objects1.jobs['job_should_start'] - j1.status = 'pending' - j1.save() - objects2 = job_template_factory( - 'jt2', organization=objects1.organization, project='proj2', inventory='inv2', credential='cred2', jobs=["job_should_start", "job_should_also_start"] - ) + j1 = create_job(objects1.job_template) + objects2 = job_template_factory('jt2', organization=objects1.organization, project='proj2', inventory='inv2', credential='cred2') 
objects2.job_template.instance_groups.add(ig1) objects2.job_template.instance_groups.add(ig2) - j1_1 = objects2.jobs['job_should_also_start'] - j1_1.status = 'pending' - j1_1.save() + j1_1 = create_job(objects2.job_template) tm = TaskManager() with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact: mock_task_impact.return_value = 500 diff --git a/awx/main/tests/functional/task_management/test_scheduler.py b/awx/main/tests/functional/task_management/test_scheduler.py index c9194c8b87..4081601918 100644 --- a/awx/main/tests/functional/task_management/test_scheduler.py +++ b/awx/main/tests/functional/task_management/test_scheduler.py @@ -3,21 +3,19 @@ from unittest import mock import json from datetime import timedelta -from awx.main.scheduler import TaskManager -from awx.main.scheduler.dependency_graph import DependencyGraph +from awx.main.scheduler import TaskManager, DependencyManager, WorkflowManager from awx.main.utils import encrypt_field from awx.main.models import WorkflowJobTemplate, JobTemplate, Job from awx.main.models.ha import Instance +from . 
import create_job from django.conf import settings @pytest.mark.django_db def test_single_job_scheduler_launch(hybrid_instance, controlplane_instance_group, job_template_factory, mocker): instance = controlplane_instance_group.instances.all()[0] - objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start"]) - j = objects.jobs["job_should_start"] - j.status = 'pending' - j.save() + objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred') + j = create_job(objects.job_template) with mocker.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() TaskManager.start_task.assert_called_once_with(j, controlplane_instance_group, [], instance) @@ -32,10 +30,8 @@ class TestJobLifeCycle: expect_commit - list of expected on_commit calls If any of these are None, then the assertion is not made. """ - if expect_schedule and len(expect_schedule) > 1: - raise RuntimeError('Task manager should reschedule itself one time, at most.') with mock.patch('awx.main.models.unified_jobs.UnifiedJob.websocket_emit_status') as mock_channel: - with mock.patch('awx.main.utils.common._schedule_task_manager') as tm_sch: + with mock.patch('awx.main.utils.common.ScheduleManager._schedule') as tm_sch: # Job are ultimately submitted in on_commit hook, but this will not # actually run, because it waits until outer transaction, which is the test # itself in this case @@ -56,22 +52,21 @@ class TestJobLifeCycle: wj = wfjt.create_unified_job() assert wj.workflow_nodes.count() == 2 wj.signal_start() - tm = TaskManager() # Transitions workflow job to running # needs to re-schedule so it spawns jobs next round - self.run_tm(tm, [mock.call('running')], [mock.call()]) + self.run_tm(TaskManager(), [mock.call('running')]) # Spawns jobs # needs re-schedule to submit jobs next round - self.run_tm(tm, [mock.call('pending'), mock.call('pending')], [mock.call()]) + 
self.run_tm(WorkflowManager(), [mock.call('pending'), mock.call('pending')]) assert jt.jobs.count() == 2 # task manager spawned jobs # Submits jobs # intermission - jobs will run and reschedule TM when finished - self.run_tm(tm, [mock.call('waiting'), mock.call('waiting')], []) - + self.run_tm(DependencyManager()) # flip dependencies_processed to True + self.run_tm(TaskManager(), [mock.call('waiting'), mock.call('waiting')]) # I am the job runner for job in jt.jobs.all(): job.status = 'successful' @@ -79,7 +74,7 @@ class TestJobLifeCycle: # Finishes workflow # no further action is necessary, so rescheduling should not happen - self.run_tm(tm, [mock.call('successful')], []) + self.run_tm(WorkflowManager(), [mock.call('successful')]) def test_task_manager_workflow_workflow_rescheduling(self, controlplane_instance_group): wfjts = [WorkflowJobTemplate.objects.create(name='foo')] @@ -90,16 +85,13 @@ class TestJobLifeCycle: wj = wfjts[0].create_unified_job() wj.signal_start() - tm = TaskManager() - while wfjts[0].status != 'successful': - wfjts[1].refresh_from_db() - if wfjts[1].status == 'successful': - # final run, no more work to do - self.run_tm(tm, expect_schedule=[]) - else: - self.run_tm(tm, expect_schedule=[mock.call()]) + attempts = 10 + while wfjts[0].status != 'successful' and attempts > 0: + self.run_tm(TaskManager()) + self.run_tm(WorkflowManager()) wfjts[0].refresh_from_db() + attempts -= 1 def test_control_and_execution_instance(self, project, system_job_template, job_template, inventory_source, control_instance, execution_instance): assert Instance.objects.count() == 2 @@ -113,6 +105,7 @@ class TestJobLifeCycle: for uj in all_ujs: uj.signal_start() + DependencyManager().schedule() tm = TaskManager() self.run_tm(tm) @@ -135,6 +128,7 @@ class TestJobLifeCycle: for uj in all_ujs: uj.signal_start() + DependencyManager().schedule() # There is only enough control capacity to run one of the jobs so one should end up in pending and the other in waiting tm = 
TaskManager() self.run_tm(tm) @@ -157,6 +151,7 @@ class TestJobLifeCycle: for uj in all_ujs: uj.signal_start() + DependencyManager().schedule() # There is only enough control capacity to run one of the jobs so one should end up in pending and the other in waiting tm = TaskManager() self.run_tm(tm) @@ -197,63 +192,49 @@ class TestJobLifeCycle: @pytest.mark.django_db -def test_single_jt_multi_job_launch_blocks_last(controlplane_instance_group, job_template_factory, mocker): - instance = controlplane_instance_group.instances.all()[0] - objects = job_template_factory( - 'jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start", "job_should_not_start"] - ) - j1 = objects.jobs["job_should_start"] - j1.status = 'pending' +def test_single_jt_multi_job_launch_blocks_last(job_template_factory): + objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred') + j1 = create_job(objects.job_template) + j2 = create_job(objects.job_template) + + TaskManager().schedule() + j1.refresh_from_db() + j2.refresh_from_db() + assert j1.status == "waiting" + assert j2.status == "pending" + + # mimic running j1 to unblock j2 + j1.status = "successful" j1.save() - j2 = objects.jobs["job_should_not_start"] - j2.status = 'pending' - j2.save() - with mock.patch("awx.main.scheduler.TaskManager.start_task"): - TaskManager().schedule() - TaskManager.start_task.assert_called_once_with(j1, controlplane_instance_group, [], instance) - j1.status = "successful" - j1.save() - with mocker.patch("awx.main.scheduler.TaskManager.start_task"): - TaskManager().schedule() - TaskManager.start_task.assert_called_once_with(j2, controlplane_instance_group, [], instance) + TaskManager().schedule() + + j2.refresh_from_db() + assert j2.status == "waiting" @pytest.mark.django_db -def test_single_jt_multi_job_launch_allow_simul_allowed(controlplane_instance_group, job_template_factory, mocker): - instance = 
controlplane_instance_group.instances.all()[0] - objects = job_template_factory( - 'jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start", "job_should_not_start"] - ) +def test_single_jt_multi_job_launch_allow_simul_allowed(job_template_factory): + objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred') jt = objects.job_template + jt.allow_simultaneous = True jt.save() - - j1 = objects.jobs["job_should_start"] - j1.allow_simultaneous = True - j1.status = 'pending' - j1.save() - j2 = objects.jobs["job_should_not_start"] - j2.allow_simultaneous = True - j2.status = 'pending' - j2.save() - with mock.patch("awx.main.scheduler.TaskManager.start_task"): - TaskManager().schedule() - TaskManager.start_task.assert_has_calls( - [mock.call(j1, controlplane_instance_group, [], instance), mock.call(j2, controlplane_instance_group, [], instance)] - ) + j1 = create_job(objects.job_template) + j2 = create_job(objects.job_template) + TaskManager().schedule() + j1.refresh_from_db() + j2.refresh_from_db() + assert j1.status == "waiting" + assert j2.status == "waiting" @pytest.mark.django_db def test_multi_jt_capacity_blocking(hybrid_instance, job_template_factory, mocker): instance = hybrid_instance controlplane_instance_group = instance.rampart_groups.first() - objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1', jobs=["job_should_start"]) - objects2 = job_template_factory('jt2', organization='org2', project='proj2', inventory='inv2', credential='cred2', jobs=["job_should_not_start"]) - j1 = objects1.jobs["job_should_start"] - j1.status = 'pending' - j1.save() - j2 = objects2.jobs["job_should_not_start"] - j2.status = 'pending' - j2.save() + objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1') + objects2 = job_template_factory('jt2', organization='org2', 
project='proj2', inventory='inv2', credential='cred2') + j1 = create_job(objects1.job_template) + j2 = create_job(objects2.job_template) tm = TaskManager() with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact: mock_task_impact.return_value = 505 @@ -269,11 +250,9 @@ def test_multi_jt_capacity_blocking(hybrid_instance, job_template_factory, mocke @pytest.mark.django_db def test_single_job_dependencies_project_launch(controlplane_instance_group, job_template_factory, mocker): - objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start"]) + objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred') instance = controlplane_instance_group.instances.all()[0] - j = objects.jobs["job_should_start"] - j.status = 'pending' - j.save() + j = create_job(objects.job_template, dependencies_processed=False) p = objects.project p.scm_update_on_launch = True p.scm_update_cache_timeout = 0 @@ -281,12 +260,13 @@ def test_single_job_dependencies_project_launch(controlplane_instance_group, job p.scm_url = "http://github.com/ansible/ansible.git" p.save(skip_update=True) with mock.patch("awx.main.scheduler.TaskManager.start_task"): - tm = TaskManager() - with mock.patch.object(TaskManager, "create_project_update", wraps=tm.create_project_update) as mock_pu: - tm.schedule() + dm = DependencyManager() + with mock.patch.object(DependencyManager, "create_project_update", wraps=dm.create_project_update) as mock_pu: + dm.schedule() mock_pu.assert_called_once_with(j) pu = [x for x in p.project_updates.all()] assert len(pu) == 1 + TaskManager().schedule() TaskManager.start_task.assert_called_once_with(pu[0], controlplane_instance_group, [j], instance) pu[0].status = "successful" pu[0].save() @@ -297,11 +277,9 @@ def test_single_job_dependencies_project_launch(controlplane_instance_group, job @pytest.mark.django_db def 
test_single_job_dependencies_inventory_update_launch(controlplane_instance_group, job_template_factory, mocker, inventory_source_factory): - objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start"]) + objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred') instance = controlplane_instance_group.instances.all()[0] - j = objects.jobs["job_should_start"] - j.status = 'pending' - j.save() + j = create_job(objects.job_template, dependencies_processed=False) i = objects.inventory ii = inventory_source_factory("ec2") ii.source = "ec2" @@ -310,12 +288,13 @@ def test_single_job_dependencies_inventory_update_launch(controlplane_instance_g ii.save() i.inventory_sources.add(ii) with mock.patch("awx.main.scheduler.TaskManager.start_task"): - tm = TaskManager() - with mock.patch.object(TaskManager, "create_inventory_update", wraps=tm.create_inventory_update) as mock_iu: - tm.schedule() + dm = DependencyManager() + with mock.patch.object(DependencyManager, "create_inventory_update", wraps=dm.create_inventory_update) as mock_iu: + dm.schedule() mock_iu.assert_called_once_with(j, ii) iu = [x for x in ii.inventory_updates.all()] assert len(iu) == 1 + TaskManager().schedule() TaskManager.start_task.assert_called_once_with(iu[0], controlplane_instance_group, [j], instance) iu[0].status = "successful" iu[0].save() @@ -334,19 +313,17 @@ def test_inventory_update_launches_project_update(controlplane_instance_group, s iu.status = "pending" iu.save() with mock.patch("awx.main.scheduler.TaskManager.start_task"): - tm = TaskManager() - with mock.patch.object(TaskManager, "create_project_update", wraps=tm.create_project_update) as mock_pu: - tm.schedule() + dm = DependencyManager() + with mock.patch.object(DependencyManager, "create_project_update", wraps=dm.create_project_update) as mock_pu: + dm.schedule() mock_pu.assert_called_with(iu, project_id=project.id) 
@pytest.mark.django_db def test_job_dependency_with_already_updated(controlplane_instance_group, job_template_factory, mocker, inventory_source_factory): - objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start"]) + objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred') instance = controlplane_instance_group.instances.all()[0] - j = objects.jobs["job_should_start"] - j.status = 'pending' - j.save() + j = create_job(objects.job_template, dependencies_processed=False) i = objects.inventory ii = inventory_source_factory("ec2") ii.source = "ec2" @@ -359,9 +336,9 @@ def test_job_dependency_with_already_updated(controlplane_instance_group, job_te j.start_args = encrypt_field(j, field_name="start_args") j.save() with mock.patch("awx.main.scheduler.TaskManager.start_task"): - tm = TaskManager() - with mock.patch.object(TaskManager, "create_inventory_update", wraps=tm.create_inventory_update) as mock_iu: - tm.schedule() + dm = DependencyManager() + with mock.patch.object(DependencyManager, "create_inventory_update", wraps=dm.create_inventory_update) as mock_iu: + dm.schedule() mock_iu.assert_not_called() with mock.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() @@ -371,13 +348,11 @@ def test_job_dependency_with_already_updated(controlplane_instance_group, job_te @pytest.mark.django_db def test_shared_dependencies_launch(controlplane_instance_group, job_template_factory, mocker, inventory_source_factory): instance = controlplane_instance_group.instances.all()[0] - objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["first_job", "second_job"]) - j1 = objects.jobs["first_job"] - j1.status = 'pending' - j1.save() - j2 = objects.jobs["second_job"] - j2.status = 'pending' - j2.save() + objects = job_template_factory('jt', organization='org1', 
project='proj', inventory='inv', credential='cred') + objects.job_template.allow_simultaneous = True + objects.job_template.save() + j1 = create_job(objects.job_template, dependencies_processed=False) + j2 = create_job(objects.job_template, dependencies_processed=False) p = objects.project p.scm_update_on_launch = True p.scm_update_cache_timeout = 300 @@ -392,8 +367,8 @@ def test_shared_dependencies_launch(controlplane_instance_group, job_template_fa ii.update_cache_timeout = 300 ii.save() i.inventory_sources.add(ii) - with mock.patch("awx.main.scheduler.TaskManager.start_task"): + DependencyManager().schedule() TaskManager().schedule() pu = p.project_updates.first() iu = ii.inventory_updates.first() @@ -408,12 +383,9 @@ def test_shared_dependencies_launch(controlplane_instance_group, job_template_fa iu.save() with mock.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() - TaskManager.start_task.assert_called_once_with(j1, controlplane_instance_group, [], instance) - j1.status = "successful" - j1.save() - with mock.patch("awx.main.scheduler.TaskManager.start_task"): - TaskManager().schedule() - TaskManager.start_task.assert_called_once_with(j2, controlplane_instance_group, [], instance) + TaskManager.start_task.assert_has_calls( + [mock.call(j1, controlplane_instance_group, [], instance), mock.call(j2, controlplane_instance_group, [], instance)] + ) pu = [x for x in p.project_updates.all()] iu = [x for x in ii.inventory_updates.all()] assert len(pu) == 1 @@ -422,30 +394,27 @@ def test_shared_dependencies_launch(controlplane_instance_group, job_template_fa @pytest.mark.django_db def test_job_not_blocking_project_update(controlplane_instance_group, job_template_factory): - objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job"]) - job = objects.jobs["job"] + instance = controlplane_instance_group.instances.all()[0] + objects = job_template_factory('jt', organization='org1', 
project='proj', inventory='inv', credential='cred') + job = objects.job_template.create_unified_job() job.instance_group = controlplane_instance_group + job.dependencies_processed = True job.status = "running" job.save() with mock.patch("awx.main.scheduler.TaskManager.start_task"): - task_manager = TaskManager() - task_manager._schedule() - proj = objects.project project_update = proj.create_project_update() project_update.instance_group = controlplane_instance_group project_update.status = "pending" project_update.save() - assert not task_manager.job_blocked_by(project_update) - - dependency_graph = DependencyGraph() - dependency_graph.add_job(job) - assert not dependency_graph.task_blocked_by(project_update) + TaskManager().schedule() + TaskManager.start_task.assert_called_once_with(project_update, controlplane_instance_group, [], instance) @pytest.mark.django_db def test_job_not_blocking_inventory_update(controlplane_instance_group, job_template_factory, inventory_source_factory): + instance = controlplane_instance_group.instances.all()[0] objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job"]) job = objects.jobs["job"] job.instance_group = controlplane_instance_group @@ -453,9 +422,6 @@ def test_job_not_blocking_inventory_update(controlplane_instance_group, job_temp job.save() with mock.patch("awx.main.scheduler.TaskManager.start_task"): - task_manager = TaskManager() - task_manager._schedule() - inv = objects.inventory inv_source = inventory_source_factory("ec2") inv_source.source = "ec2" @@ -465,11 +431,9 @@ def test_job_not_blocking_inventory_update(controlplane_instance_group, job_temp inventory_update.status = "pending" inventory_update.save() - assert not task_manager.job_blocked_by(inventory_update) - - dependency_graph = DependencyGraph() - dependency_graph.add_job(job) - assert not dependency_graph.task_blocked_by(inventory_update) + DependencyManager().schedule() + 
TaskManager().schedule() + TaskManager.start_task.assert_called_once_with(inventory_update, controlplane_instance_group, [], instance) @pytest.mark.django_db @@ -484,7 +448,7 @@ def test_generate_dependencies_only_once(job_template_factory): # job starts with dependencies_processed as False assert not job.dependencies_processed # run one cycle of ._schedule() to generate dependencies - TaskManager()._schedule() + DependencyManager().schedule() # make sure dependencies_processed is now True job = Job.objects.filter(name="job_gen_dep")[0] @@ -492,7 +456,7 @@ # Run ._schedule() again, but make sure .generate_dependencies() is not # called with job in the argument list - tm = TaskManager() - tm.generate_dependencies = mock.MagicMock(return_value=[]) - tm._schedule() - tm.generate_dependencies.assert_has_calls([mock.call([]), mock.call([])]) + dm = DependencyManager() + dm.generate_dependencies = mock.MagicMock(return_value=[]) + dm.schedule() + dm.generate_dependencies.assert_not_called() diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py index 19394247b3..29ebff1178 100644 --- a/awx/main/utils/common.py +++ b/awx/main/utils/common.py @@ -78,8 +78,9 @@ __all__ = [ 'IllegalArgumentError', 'get_custom_venv_choices', 'get_external_account', - 'task_manager_bulk_reschedule', - 'schedule_task_manager', + 'ScheduleTaskManager', + 'ScheduleDependencyManager', + 'ScheduleWorkflowManager', 'classproperty', 'create_temporary_fifo', 'truncate_stdout', @@ -846,6 +847,66 @@ def get_mem_effective_capacity(mem_bytes): _inventory_updates = threading.local() _task_manager = threading.local() +_dependency_manager = threading.local() +_workflow_manager = threading.local() + + +@contextlib.contextmanager +def task_manager_bulk_reschedule(): + """Context manager to avoid submitting task multiple times.""" + managers = [ScheduleTaskManager(), ScheduleWorkflowManager(), ScheduleDependencyManager()] + try: + for m
in managers: + m.previous_flag = getattr(m.manager_threading_local, 'bulk_reschedule', False) + m.previous_value = getattr(m.manager_threading_local, 'needs_scheduling', False) + m.manager_threading_local.bulk_reschedule = True + m.manager_threading_local.needs_scheduling = False + yield + finally: + for m in managers: + m.manager_threading_local.bulk_reschedule = m.previous_flag + if m.manager_threading_local.needs_scheduling: + m.schedule() + m.manager_threading_local.needs_scheduling = m.previous_value + + +class ScheduleManager: + def __init__(self, manager, manager_threading_local): + self.manager = manager + self.manager_threading_local = manager_threading_local + + def _schedule(self): + from django.db import connection + + # runs right away if not in transaction + connection.on_commit(lambda: self.manager.delay()) + + def schedule(self): + if getattr(self.manager_threading_local, 'bulk_reschedule', False): + self.manager_threading_local.needs_scheduling = True + return + self._schedule() + + +class ScheduleTaskManager(ScheduleManager): + def __init__(self): + from awx.main.scheduler.tasks import task_manager + + super().__init__(task_manager, _task_manager) + + +class ScheduleDependencyManager(ScheduleManager): + def __init__(self): + from awx.main.scheduler.tasks import dependency_manager + + super().__init__(dependency_manager, _dependency_manager) + + +class ScheduleWorkflowManager(ScheduleManager): + def __init__(self): + from awx.main.scheduler.tasks import workflow_manager + + super().__init__(workflow_manager, _workflow_manager) @contextlib.contextmanager @@ -861,37 +922,6 @@ def ignore_inventory_computed_fields(): _inventory_updates.is_updating = previous_value -def _schedule_task_manager(): - from awx.main.scheduler.tasks import run_task_manager - from django.db import connection - - # runs right away if not in transaction - connection.on_commit(lambda: run_task_manager.delay()) - - -@contextlib.contextmanager -def task_manager_bulk_reschedule(): - 
"""Context manager to avoid submitting task multiple times.""" - try: - previous_flag = getattr(_task_manager, 'bulk_reschedule', False) - previous_value = getattr(_task_manager, 'needs_scheduling', False) - _task_manager.bulk_reschedule = True - _task_manager.needs_scheduling = False - yield - finally: - _task_manager.bulk_reschedule = previous_flag - if _task_manager.needs_scheduling: - _schedule_task_manager() - _task_manager.needs_scheduling = previous_value - - -def schedule_task_manager(): - if getattr(_task_manager, 'bulk_reschedule', False): - _task_manager.needs_scheduling = True - return - _schedule_task_manager() - - @contextlib.contextmanager def ignore_inventory_group_removal(): """ diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index ef389e5151..2751534cfb 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -248,6 +248,11 @@ SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL = 15 # The maximum allowed jobs to start on a given task manager cycle START_TASK_LIMIT = 100 +# Time out task managers if they take longer than this many seconds, plus TASK_MANAGER_TIMEOUT_GRACE_PERIOD +# We have the grace period so the task manager can bail out before the timeout. 
+TASK_MANAGER_TIMEOUT = 300 +TASK_MANAGER_TIMEOUT_GRACE_PERIOD = 60 + # Disallow sending session cookies over insecure connections SESSION_COOKIE_SECURE = True @@ -442,7 +447,8 @@ CELERYBEAT_SCHEDULE = { 'options': {'expires': 50}, }, 'gather_analytics': {'task': 'awx.main.tasks.system.gather_analytics', 'schedule': timedelta(minutes=5)}, - 'task_manager': {'task': 'awx.main.scheduler.tasks.run_task_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}}, + 'task_manager': {'task': 'awx.main.scheduler.tasks.task_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}}, + 'dependency_manager': {'task': 'awx.main.scheduler.tasks.dependency_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}}, 'k8s_reaper': {'task': 'awx.main.tasks.system.awx_k8s_reaper', 'schedule': timedelta(seconds=60), 'options': {'expires': 50}}, 'receptor_reaper': {'task': 'awx.main.tasks.system.awx_receptor_workunit_reaper', 'schedule': timedelta(seconds=60)}, 'send_subsystem_metrics': {'task': 'awx.main.analytics.analytics_tasks.send_subsystem_metrics', 'schedule': timedelta(seconds=20)}, diff --git a/awx/settings/development.py b/awx/settings/development.py index be1c115606..ee500dae7c 100644 --- a/awx/settings/development.py +++ b/awx/settings/development.py @@ -78,18 +78,6 @@ include(optional('/etc/tower/conf.d/*.py'), scope=locals()) BASE_VENV_PATH = "/var/lib/awx/venv/" AWX_VENV_PATH = os.path.join(BASE_VENV_PATH, "awx") -# If any local_*.py files are present in awx/settings/, use them to override -# default settings for development. If not present, we can still run using -# only the defaults. -try: - if os.getenv('AWX_KUBE_DEVEL', False): - include(optional('minikube.py'), scope=locals()) - else: - include(optional('local_*.py'), scope=locals()) -except ImportError: - traceback.print_exc() - sys.exit(1) - # Use SQLite for unit tests instead of PostgreSQL. 
If the lines below are # commented out, Django will create the test_awx-dev database in PostgreSQL to # run unit tests. @@ -110,5 +98,26 @@ CLUSTER_HOST_ID = socket.gethostname() AWX_CALLBACK_PROFILE = True +# ======================!!!!!!! FOR DEVELOPMENT ONLY !!!!!!!================================= +# Disable normal scheduled/triggered task managers (DependencyManager, TaskManager, WorkflowManager). +# Allows user to trigger task managers directly for debugging and profiling purposes. +# Only works in combination with settings.SETTINGS_MODULE == 'awx.settings.development' +AWX_DISABLE_TASK_MANAGERS = False +# ======================!!!!!!! FOR DEVELOPMENT ONLY !!!!!!!================================= + if 'sqlite3' not in DATABASES['default']['ENGINE']: # noqa DATABASES['default'].setdefault('OPTIONS', dict()).setdefault('application_name', f'{CLUSTER_HOST_ID}-{os.getpid()}-{" ".join(sys.argv)}'[:63]) # noqa + + +# If any local_*.py files are present in awx/settings/, use them to override +# default settings for development. If not present, we can still run using +# only the defaults. +# this needs to stay at the bottom of this file +try: + if os.getenv('AWX_KUBE_DEVEL', False): + include(optional('minikube.py'), scope=locals()) + else: + include(optional('local_*.py'), scope=locals()) +except ImportError: + traceback.print_exc() + sys.exit(1)