diff --git a/awx/main/migrations/0165_task_manager_refactor.py b/awx/main/migrations/0165_task_manager_refactor.py index a8e51e6e2f..f2dd13512d 100644 --- a/awx/main/migrations/0165_task_manager_refactor.py +++ b/awx/main/migrations/0165_task_manager_refactor.py @@ -27,4 +27,9 @@ class Migration(migrations.Migration): blank=True, default=None, editable=False, help_text='A cached list with pk values from preferred instance groups.', null=True ), ), + migrations.AddField( + model_name='unifiedjob', + name='task_impact', + field=models.PositiveIntegerField(default=0, editable=False), + ), ] diff --git a/awx/main/models/ad_hoc_commands.py b/awx/main/models/ad_hoc_commands.py index f45a03e0be..7543162080 100644 --- a/awx/main/models/ad_hoc_commands.py +++ b/awx/main/models/ad_hoc_commands.py @@ -181,12 +181,12 @@ class AdHocCommand(UnifiedJob, JobNotificationMixin): def get_passwords_needed_to_start(self): return self.passwords_needed_to_start - @property - def task_impact(self): + def _get_task_impact(self): # NOTE: We sorta have to assume the host count matches and that forks default to 5 - from awx.main.models.inventory import Host - - count_hosts = Host.objects.filter(enabled=True, inventory__ad_hoc_commands__pk=self.pk).count() + if self.inventory: + count_hosts = self.inventory.total_hosts + else: + count_hosts = 5 return min(count_hosts, 5 if self.forks == 0 else self.forks) + 1 def copy(self): @@ -210,10 +210,20 @@ class AdHocCommand(UnifiedJob, JobNotificationMixin): def save(self, *args, **kwargs): update_fields = kwargs.get('update_fields', []) + + def add_to_update_fields(name): + if name not in update_fields: + update_fields.append(name) + + if not self.preferred_instance_groups_cache: + self.preferred_instance_groups_cache = self._get_preferred_instance_group_cache() + add_to_update_fields("preferred_instance_groups_cache") if not self.name: self.name = Truncator(u': '.join(filter(None, (self.module_name, self.module_args)))).chars(512) - if 'name' not in update_fields: - update_fields.append('name') + add_to_update_fields("name") + if self.task_impact == 0: + self.task_impact = self._get_task_impact() + add_to_update_fields("task_impact") super(AdHocCommand, self).save(*args, **kwargs) @property diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index 782ca59344..3a6b7740a2 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -12,6 +12,7 @@ from django.dispatch import receiver from django.utils.translation import gettext_lazy as _ from django.conf import settings from django.utils.timezone import now, timedelta +from django.db.models import Sum import redis from solo.models import SingletonModel @@ -149,10 +150,13 @@ class Instance(HasPolicyEditsMixin, BaseModel): def consumed_capacity(self): capacity_consumed = 0 if self.node_type in ('hybrid', 'execution'): - capacity_consumed += sum(x.task_impact for x in UnifiedJob.objects.filter(execution_node=self.hostname, status__in=('running', 'waiting'))) + capacity_consumed += ( + UnifiedJob.objects.filter(execution_node=self.hostname, status__in=('running', 'waiting')).aggregate(Sum("task_impact"))["task_impact__sum"] + or 0 + ) if self.node_type in ('hybrid', 'control'): - capacity_consumed += sum( - settings.AWX_CONTROL_NODE_TASK_IMPACT for x in UnifiedJob.objects.filter(controller_node=self.hostname, status__in=('running', 'waiting')) + capacity_consumed += ( + settings.AWX_CONTROL_NODE_TASK_IMPACT * UnifiedJob.objects.filter(controller_node=self.hostname, status__in=('running', 'waiting')).count() ) return capacity_consumed diff --git a/awx/main/models/inventory.py b/awx/main/models/inventory.py index 9a386db9c2..26586a2e6d 100644 --- a/awx/main/models/inventory.py +++ b/awx/main/models/inventory.py @@ -337,9 +337,14 @@ class Inventory(CommonModelNameNotUnique, ResourceMixin, RelatedJobsMixin): else: active_inventory_sources = self.inventory_sources.filter(source__in=CLOUD_INVENTORY_SOURCES) failed_inventory_sources = active_inventory_sources.filter(last_job_failed=True) + total_hosts = active_hosts.count() + if total_hosts != self.total_hosts: + update_task_impact = True + else: + update_task_impact = False computed_fields = { 'has_active_failures': bool(failed_hosts.count()), - 'total_hosts': active_hosts.count(), + 'total_hosts': total_hosts, 'hosts_with_active_failures': failed_hosts.count(), 'total_groups': active_groups.count(), 'has_inventory_sources': bool(active_inventory_sources.count()), @@ -357,6 +362,14 @@ class Inventory(CommonModelNameNotUnique, ResourceMixin, RelatedJobsMixin): computed_fields.pop(field) if computed_fields: iobj.save(update_fields=computed_fields.keys()) + if update_task_impact: + # if total hosts count has changed, re-calculate task_impact for any + # job that is still in pending for this inventory, since task_impact + # is cached on task creation and used in task management system + tasks = self.jobs.filter(status="pending") + for t in tasks: + t.task_impact = t._get_task_impact() + UnifiedJob.objects.bulk_update(tasks, ['task_impact']) logger.debug("Finished updating inventory computed fields, pk={0}, in " "{1:.3f} seconds".format(self.pk, time.time() - start_time)) def websocket_emit_status(self, status): @@ -1220,8 +1233,7 @@ class InventoryUpdate(UnifiedJob, InventorySourceOptions, JobNotificationMixin, return UnpartitionedInventoryUpdateEvent return InventoryUpdateEvent - @property - def task_impact(self): + def _get_task_impact(self): return 1 # InventoryUpdate credential required diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index 622cfb009c..fa313dfc23 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -644,8 +644,7 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskMana raise ParseError(_('{status_value} is not a valid status option.').format(status_value=status)) return self._get_hosts(**kwargs) - @property - def task_impact(self): + def _get_task_impact(self): if self.launch_type == 'callback': count_hosts = 2 else: @@ -1241,8 +1240,7 @@ class SystemJob(UnifiedJob, SystemJobOptions, JobNotificationMixin): return UnpartitionedSystemJobEvent return SystemJobEvent - @property - def task_impact(self): + def _get_task_impact(self): return 5 @property diff --git a/awx/main/models/projects.py b/awx/main/models/projects.py index f2163c69ae..5b8fabde97 100644 --- a/awx/main/models/projects.py +++ b/awx/main/models/projects.py @@ -563,8 +563,7 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin, TaskManage return UnpartitionedProjectUpdateEvent return ProjectUpdateEvent - @property - def task_impact(self): + def _get_task_impact(self): return 0 if self.job_type == 'run' else 1 @property diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 30685d9371..596eb8a4e0 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -46,6 +46,7 @@ from awx.main.utils.common import ( parse_yaml_or_json, getattr_dne, ScheduleDependencyManager, + ScheduleTaskManager, get_event_partition_epoch, get_capacity_type, ) @@ -381,9 +382,10 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, ExecutionEn unified_job.survey_passwords = new_job_passwords kwargs['survey_passwords'] = new_job_passwords # saved in config object for relaunch - unified_job.preferred_instance_groups_cache = [ig.pk for ig in unified_job.preferred_instance_groups] + unified_job.preferred_instance_groups_cache = unified_job._get_preferred_instance_group_cache() unified_job._set_default_dependencies_processed() + unified_job.task_impact = unified_job._get_task_impact() from awx.main.signals import disable_activity_stream, activity_stream_create @@ -704,6 +706,10 @@ class UnifiedJob( editable=False, help_text=_("A cached list with pk values from preferred instance groups."), ) + task_impact = models.PositiveIntegerField( + default=0, + editable=False, + ) organization = models.ForeignKey( 'Organization', blank=True, @@ -765,6 +771,9 @@ class UnifiedJob( def _get_parent_field_name(self): return 'unified_job_template' # Override in subclasses. + def _get_preferred_instance_group_cache(self): + return [ig.pk for ig in self.preferred_instance_groups] + @classmethod def _get_unified_job_template_class(cls): """ @@ -1254,9 +1263,8 @@ class UnifiedJob( except JobLaunchConfig.DoesNotExist: return False - @property - def task_impact(self): - raise NotImplementedError # Implement in subclass. + def _get_task_impact(self): + return self.task_impact # return default, should implement in subclass. def websocket_emit_data(self): '''Return extra data that should be included when submitting data to the browser over the websocket connection''' @@ -1371,7 +1379,10 @@ class UnifiedJob( self.update_fields(start_args=json.dumps(kwargs), status='pending') self.websocket_emit_status("pending") - ScheduleDependencyManager().schedule() + if self.dependencies_processed: + ScheduleTaskManager().schedule() + else: + ScheduleDependencyManager().schedule() # Each type of unified job has a different Task class; get the # appropirate one. diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py index 6d59670a27..c9301f769a 100644 --- a/awx/main/models/workflow.py +++ b/awx/main/models/workflow.py @@ -672,8 +672,7 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio ) return result - @property - def task_impact(self): + def _get_task_impact(self): return 0 def get_ancestor_workflows(self): diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 2ef89a74bc..bac8379c8e 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -37,10 +37,9 @@ from awx.main.utils.pglock import advisory_lock from awx.main.utils import ( get_type_for_model, ScheduleTaskManager, - ScheduleDependencyManager, ScheduleWorkflowManager, ) -from awx.main.utils.common import create_partition +from awx.main.utils.common import create_partition, task_manager_bulk_reschedule from awx.main.signals import disable_activity_stream from awx.main.constants import ACTIVE_STATES from awx.main.scheduler.dependency_graph import DependencyGraph @@ -121,23 +120,22 @@ class TaskBase: def schedule(self): # Lock - with advisory_lock(f"{self.prefix}_lock", wait=False) as acquired: - with transaction.atomic(): - if acquired is False: - logger.debug(f"Not running {self.prefix} scheduler, another task holds lock") - return - logger.debug(f"Starting {self.prefix} Scheduler") - with self.schedule_manager.task_manager_bulk_reschedule(): + with task_manager_bulk_reschedule(): + with advisory_lock(f"{self.prefix}_lock", wait=False) as acquired: + with transaction.atomic(): + if acquired is False: + logger.debug(f"Not running {self.prefix} scheduler, another task holds lock") + return + logger.debug(f"Starting {self.prefix} Scheduler") # if sigterm due to timeout, still record metrics signal.signal(signal.SIGTERM, self.record_aggregate_metrics_and_exit) self._schedule() self.record_aggregate_metrics() - logger.debug(f"Finishing {self.prefix} Scheduler") + logger.debug(f"Finishing {self.prefix} Scheduler") class WorkflowManager(TaskBase): def __init__(self): - self.schedule_manager = ScheduleWorkflowManager() super().__init__(prefix="workflow_manager") @timeit @@ -146,7 +144,7 @@ class WorkflowManager(TaskBase): for workflow_job in self.all_tasks: if self.timed_out(): logger.warning("Workflow manager has reached time out while processing running workflows, exiting loop early") - self.schedule_manager.schedule() + ScheduleWorkflowManager().schedule() # Do not process any more workflow jobs. Stop here. # Maybe we should schedule another WorkflowManager run break @@ -263,7 +261,6 @@ class WorkflowManager(TaskBase): class DependencyManager(TaskBase): def __init__(self): - self.schedule_manager = ScheduleDependencyManager() super().__init__(prefix="dependency_manager") def create_project_update(self, task, project_id=None): @@ -432,8 +429,9 @@ class DependencyManager(TaskBase): return created_dependencies def process_tasks(self): - self.generate_dependencies(self.all_tasks) - self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(self.all_tasks)) + deps = self.generate_dependencies(self.all_tasks) + self.generate_dependencies(deps) + self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(self.all_tasks) + len(deps)) @timeit def _schedule(self): @@ -462,7 +460,6 @@ class TaskManager(TaskBase): # 5 minutes to start pending jobs. If this limit is reached, pending jobs # will no longer be started and will be started on the next task manager cycle. self.time_delta_job_explanation = timedelta(seconds=30) - self.schedule_manager = ScheduleTaskManager() super().__init__(prefix="task_manager") def after_lock_init(self): @@ -575,6 +572,8 @@ class TaskManager(TaskBase): @timeit def process_running_tasks(self, running_tasks): for task in running_tasks: + if type(task) is WorkflowJob: + ScheduleWorkflowManager().schedule() self.dependency_graph.add_job(task) @timeit diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py index 30e1fdddda..697df497d1 100644 --- a/awx/main/tasks/system.py +++ b/awx/main/tasks/system.py @@ -54,6 +54,7 @@ from awx.main.utils.common import ( ignore_inventory_computed_fields, ignore_inventory_group_removal, ScheduleWorkflowManager, + ScheduleTaskManager, ) from awx.main.utils.external_logging import reconfigure_rsyslog @@ -657,6 +658,13 @@ def awx_periodic_scheduler(): state.save() +def schedule_manager_success_or_error(instance): + if instance.unifiedjob_blocked_jobs.exists(): + ScheduleTaskManager().schedule() + if instance.spawned_by_workflow: + ScheduleWorkflowManager().schedule() + + @task(queue=get_local_queuename) def handle_work_success(task_actual): try: @@ -666,8 +674,7 @@ def handle_work_success(task_actual): return if not instance: return - - ScheduleWorkflowManager().schedule() + schedule_manager_success_or_error(instance) @task(queue=get_local_queuename) @@ -709,8 +716,7 @@ def handle_work_error(task_id, *args, **kwargs): # what the job complete message handler does then we may want to send a # completion event for each job here. if first_instance: - ScheduleWorkflowManager().schedule() - pass + schedule_manager_success_or_error(first_instance) @task(queue=get_local_queuename) diff --git a/awx/main/tests/functional/models/test_unified_job.py b/awx/main/tests/functional/models/test_unified_job.py index 4d17d09440..389ea731b9 100644 --- a/awx/main/tests/functional/models/test_unified_job.py +++ b/awx/main/tests/functional/models/test_unified_job.py @@ -252,12 +252,14 @@ class TestTaskImpact: def test_limit_task_impact(self, job_host_limit, run_computed_fields_right_away): job = job_host_limit(5, 2) job.inventory.update_computed_fields() + job.task_impact = job._get_task_impact() assert job.inventory.total_hosts == 5 assert job.task_impact == 2 + 1 # forks becomes constraint def test_host_task_impact(self, job_host_limit, run_computed_fields_right_away): job = job_host_limit(3, 5) job.inventory.update_computed_fields() + job.task_impact = job._get_task_impact() assert job.task_impact == 3 + 1 # hosts becomes constraint def test_shard_task_impact(self, slice_job_factory, run_computed_fields_right_away): @@ -270,9 +272,13 @@ class TestTaskImpact: # Even distribution - all jobs run on 1 host assert [len(jobs[0].inventory.get_script_data(slice_number=i + 1, slice_count=3)['all']['hosts']) for i in range(3)] == [1, 1, 1] jobs[0].inventory.update_computed_fields() + for j in jobs: + j.task_impact = j._get_task_impact() assert [job.task_impact for job in jobs] == [2, 2, 2] # plus one base task impact # Uneven distribution - first job takes the extra host jobs[0].inventory.hosts.create(name='remainder_foo') assert [len(jobs[0].inventory.get_script_data(slice_number=i + 1, slice_count=3)['all']['hosts']) for i in range(3)] == [2, 1, 1] jobs[0].inventory.update_computed_fields() + # recalculate task_impact + jobs[0].task_impact = jobs[0]._get_task_impact() assert [job.task_impact for job in jobs] == [3, 2, 2] diff --git a/awx/main/tests/functional/task_management/__init__.py b/awx/main/tests/functional/task_management/__init__.py new file mode 100644 index 0000000000..4bb27988f4 --- /dev/null +++ b/awx/main/tests/functional/task_management/__init__.py @@ -0,0 +1,6 @@ +def create_job(jt, dependencies_processed=True): + job = jt.create_unified_job() + job.status = "pending" + job.dependencies_processed = dependencies_processed + job.save() + return job diff --git a/awx/main/tests/functional/task_management/test_rampart_groups.py b/awx/main/tests/functional/task_management/test_rampart_groups.py index 728c60f92d..6bed591147 100644 --- a/awx/main/tests/functional/task_management/test_rampart_groups.py +++ b/awx/main/tests/functional/task_management/test_rampart_groups.py @@ -1,9 +1,10 @@ import pytest from unittest import mock from datetime import timedelta -from awx.main.scheduler import TaskManager -from awx.main.models import InstanceGroup, WorkflowJob +from awx.main.scheduler import TaskManager, DependencyManager +from awx.main.models import InstanceGroup from awx.main.tasks.system import apply_cluster_membership_policies +from . import create_job @pytest.mark.django_db @@ -12,16 +13,12 @@ def test_multi_group_basic_job_launch(instance_factory, controlplane_instance_gr i2 = instance_factory("i2") ig1 = instance_group_factory("ig1", instances=[i1]) ig2 = instance_group_factory("ig2", instances=[i2]) - objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1', jobs=["job_should_start"]) + objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1') objects1.job_template.instance_groups.add(ig1) - j1 = objects1.jobs['job_should_start'] - j1.status = 'pending' - j1.save() - objects2 = job_template_factory('jt2', organization='org2', project='proj2', inventory='inv2', credential='cred2', jobs=["job_should_still_start"]) + j1 = create_job(objects1.job_template) + objects2 = job_template_factory('jt2', organization='org2', project='proj2', inventory='inv2', credential='cred2') objects2.job_template.instance_groups.add(ig2) - j2 = objects2.jobs['job_should_still_start'] - j2.status = 'pending' - j2.save() + j2 = create_job(objects2.job_template) with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact: mock_task_impact.return_value = 500 with mocker.patch("awx.main.scheduler.TaskManager.start_task"): @@ -35,23 +32,26 @@ def test_multi_group_with_shared_dependency(instance_factory, controlplane_insta i2 = instance_factory("i2") ig1 = instance_group_factory("ig1", instances=[i1]) ig2 = instance_group_factory("ig2", instances=[i2]) - objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1', jobs=["job_should_start"]) + objects1 = job_template_factory( + 'jt1', + organization='org1', + project='proj1', + inventory='inv1', + credential='cred1', + ) objects1.job_template.instance_groups.add(ig1) + j1 = create_job(objects1.job_template, dependencies_processed=False) p = objects1.project p.scm_update_on_launch = True p.scm_update_cache_timeout = 0 p.scm_type = "git" p.scm_url = "http://github.com/ansible/ansible.git" p.save() - j1 = objects1.jobs['job_should_start'] - j1.status = 'pending' - j1.save() - objects2 = job_template_factory('jt2', organization=objects1.organization, project=p, inventory='inv2', credential='cred2', jobs=["job_should_still_start"]) + objects2 = job_template_factory('jt2', organization=objects1.organization, project=p, inventory='inv2', credential='cred2') objects2.job_template.instance_groups.add(ig2) - j2 = objects2.jobs['job_should_still_start'] - j2.status = 'pending' - j2.save() + j2 = create_job(objects2.job_template, dependencies_processed=False) with mocker.patch("awx.main.scheduler.TaskManager.start_task"): + DependencyManager().schedule() TaskManager().schedule() pu = p.project_updates.first() TaskManager.start_task.assert_called_once_with(pu, controlplane_instance_group, [j1, j2], controlplane_instance_group.instances.all()[0]) @@ -59,6 +59,7 @@ def test_multi_group_with_shared_dependency(instance_factory, controlplane_insta pu.status = "successful" pu.save() with mock.patch("awx.main.scheduler.TaskManager.start_task"): + DependencyManager().schedule() TaskManager().schedule() TaskManager.start_task.assert_any_call(j1, ig1, [], i1) @@ -69,7 +70,7 @@ def test_multi_group_with_shared_dependency(instance_factory, controlplane_insta @pytest.mark.django_db def test_workflow_job_no_instancegroup(workflow_job_template_factory, controlplane_instance_group, mocker): wfjt = workflow_job_template_factory('anicedayforawalk').workflow_job_template - wfj = WorkflowJob.objects.create(workflow_job_template=wfjt) + wfj = wfjt.create_unified_job() wfj.status = "pending" wfj.save() with mocker.patch("awx.main.scheduler.TaskManager.start_task"): @@ -85,39 +86,50 @@ def test_overcapacity_blocking_other_groups_unaffected(instance_factory, control i1.capacity = 1020 i1.save() i2 = instance_factory("i2") + i2.capacity = 1020 + i2.save() ig1 = instance_group_factory("ig1", instances=[i1]) ig2 = instance_group_factory("ig2", instances=[i2]) - objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1', jobs=["job_should_start"]) + objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1') objects1.job_template.instance_groups.add(ig1) - j1 = objects1.jobs['job_should_start'] - j1.status = 'pending' - j1.save() - objects2 = job_template_factory( - 'jt2', organization=objects1.organization, project='proj2', inventory='inv2', credential='cred2', jobs=["job_should_start", "job_should_also_start"] - ) + j1 = create_job(objects1.job_template) + objects2 = job_template_factory('jt2', organization=objects1.organization, project='proj2', inventory='inv2', credential='cred2') objects2.job_template.instance_groups.add(ig1) - j1_1 = objects2.jobs['job_should_also_start'] - j1_1.status = 'pending' - j1_1.save() - objects3 = job_template_factory('jt3', organization='org2', project='proj3', inventory='inv3', credential='cred3', jobs=["job_should_still_start"]) + j1_1 = create_job(objects2.job_template) + objects3 = job_template_factory('jt3', organization='org2', project='proj3', inventory='inv3', credential='cred3') objects3.job_template.instance_groups.add(ig2) - j2 = objects3.jobs['job_should_still_start'] - j2.status = 'pending' - j2.save() - objects4 = job_template_factory( - 'jt4', organization=objects3.organization, project='proj4', inventory='inv4', credential='cred4', jobs=["job_should_not_start"] - ) + j2 = create_job(objects3.job_template) + objects4 = job_template_factory('jt4', organization=objects3.organization, project='proj4', inventory='inv4', credential='cred4') objects4.job_template.instance_groups.add(ig2) - j2_1 = objects4.jobs['job_should_not_start'] - j2_1.status = 'pending' - j2_1.save() - tm = TaskManager() + j2_1 = create_job(objects4.job_template) + with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact: mock_task_impact.return_value = 500 - with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job: - tm.schedule() - mock_job.assert_has_calls([mock.call(j1, ig1, [], i1), mock.call(j1_1, ig1, [], i1), mock.call(j2, ig2, [], i2)]) - assert mock_job.call_count == 3 + TaskManager().schedule() + + # all jobs should be able to run, plenty of capacity across both instances + for j in [j1, j1_1, j2, j2_1]: + j.refresh_from_db() + assert j.status == "waiting" + + # reset to pending + for j in [j1, j1_1, j2, j2_1]: + j.status = "pending" + j.save() + + # make i2 can only be able to fit 1 job + i2.capacity = 510 + i2.save() + + TaskManager().schedule() + + for j in [j1, j1_1, j2]: + j.refresh_from_db() + assert j.status == "waiting" + + j2_1.refresh_from_db() + # could not run because i2 is full + assert j2_1.status == "pending" @pytest.mark.django_db @@ -126,19 +138,13 @@ def test_failover_group_run(instance_factory, controlplane_instance_group, mocke i2 = instance_factory("i2") ig1 = instance_group_factory("ig1", instances=[i1]) ig2 = instance_group_factory("ig2", instances=[i2]) - objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1', jobs=["job_should_start"]) + objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1') objects1.job_template.instance_groups.add(ig1) - j1 = objects1.jobs['job_should_start'] - j1.status = 'pending' - j1.save() - objects2 = job_template_factory( - 'jt2', organization=objects1.organization, project='proj2', inventory='inv2', credential='cred2', jobs=["job_should_start", "job_should_also_start"] - ) + j1 = create_job(objects1.job_template) + objects2 = job_template_factory('jt2', organization=objects1.organization, project='proj2', inventory='inv2', credential='cred2') objects2.job_template.instance_groups.add(ig1) objects2.job_template.instance_groups.add(ig2) - j1_1 = objects2.jobs['job_should_also_start'] - j1_1.status = 'pending' - j1_1.save() + j1_1 = create_job(objects2.job_template) tm = TaskManager() with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact: mock_task_impact.return_value = 500 diff --git a/awx/main/tests/functional/task_management/test_scheduler.py b/awx/main/tests/functional/task_management/test_scheduler.py index c9194c8b87..4081601918 100644 --- a/awx/main/tests/functional/task_management/test_scheduler.py +++ b/awx/main/tests/functional/task_management/test_scheduler.py @@ -3,21 +3,19 @@ from unittest import mock import json from datetime import timedelta -from awx.main.scheduler import TaskManager -from awx.main.scheduler.dependency_graph import DependencyGraph +from awx.main.scheduler import TaskManager, DependencyManager, WorkflowManager from awx.main.utils import encrypt_field from awx.main.models import WorkflowJobTemplate, JobTemplate, Job from awx.main.models.ha import Instance +from . import create_job from django.conf import settings @pytest.mark.django_db def test_single_job_scheduler_launch(hybrid_instance, controlplane_instance_group, job_template_factory, mocker): instance = controlplane_instance_group.instances.all()[0] - objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start"]) - j = objects.jobs["job_should_start"] - j.status = 'pending' - j.save() + objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred') + j = create_job(objects.job_template) with mocker.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() TaskManager.start_task.assert_called_once_with(j, controlplane_instance_group, [], instance) @@ -32,10 +30,8 @@ class TestJobLifeCycle: expect_commit - list of expected on_commit calls If any of these are None, then the assertion is not made. """ - if expect_schedule and len(expect_schedule) > 1: - raise RuntimeError('Task manager should reschedule itself one time, at most.') with mock.patch('awx.main.models.unified_jobs.UnifiedJob.websocket_emit_status') as mock_channel: - with mock.patch('awx.main.utils.common._schedule_task_manager') as tm_sch: + with mock.patch('awx.main.utils.common.ScheduleManager._schedule') as tm_sch: # Job are ultimately submitted in on_commit hook, but this will not # actually run, because it waits until outer transaction, which is the test # itself in this case @@ -56,22 +52,21 @@ class TestJobLifeCycle: wj = wfjt.create_unified_job() assert wj.workflow_nodes.count() == 2 wj.signal_start() - tm = TaskManager() # Transitions workflow job to running # needs to re-schedule so it spawns jobs next round - self.run_tm(tm, [mock.call('running')], [mock.call()]) + self.run_tm(TaskManager(), [mock.call('running')]) # Spawns jobs # needs re-schedule to submit jobs next round - self.run_tm(tm, [mock.call('pending'), mock.call('pending')], [mock.call()]) + self.run_tm(WorkflowManager(), [mock.call('pending'), mock.call('pending')]) assert jt.jobs.count() == 2 # task manager spawned jobs # Submits jobs # intermission - jobs will run and reschedule TM when finished - self.run_tm(tm, [mock.call('waiting'), mock.call('waiting')], []) - + self.run_tm(DependencyManager()) # flip dependencies_processed to True + self.run_tm(TaskManager(), [mock.call('waiting'), mock.call('waiting')]) # I am the job runner for job in jt.jobs.all(): job.status = 'successful' @@ -79,7 +74,7 @@ class TestJobLifeCycle: # Finishes workflow # no further action is necessary, so rescheduling should not happen - self.run_tm(tm, [mock.call('successful')], []) + self.run_tm(WorkflowManager(), [mock.call('successful')]) def test_task_manager_workflow_workflow_rescheduling(self, controlplane_instance_group): wfjts = [WorkflowJobTemplate.objects.create(name='foo')] @@ -90,16 +85,13 @@ class TestJobLifeCycle: wj = wfjts[0].create_unified_job() wj.signal_start() - tm = TaskManager() - while wfjts[0].status != 'successful': - wfjts[1].refresh_from_db() - if wfjts[1].status == 'successful': - # final run, no more work to do - self.run_tm(tm, expect_schedule=[]) - else: - self.run_tm(tm, expect_schedule=[mock.call()]) + attempts = 10 + while wfjts[0].status != 'successful' and attempts > 0: + self.run_tm(TaskManager()) + self.run_tm(WorkflowManager()) wfjts[0].refresh_from_db() + attempts -= 1 def test_control_and_execution_instance(self, project, system_job_template, job_template, inventory_source, control_instance, execution_instance): assert Instance.objects.count() == 2 @@ -113,6 +105,7 @@ class TestJobLifeCycle: for uj in all_ujs: uj.signal_start() + DependencyManager().schedule() tm = TaskManager() self.run_tm(tm) @@ -135,6 +128,7 @@ class TestJobLifeCycle: for uj in all_ujs: uj.signal_start() + DependencyManager().schedule() # There is only enough control capacity to run one of the jobs so one should end up in pending and the other in waiting tm = TaskManager() self.run_tm(tm) @@ -157,6 +151,7 @@ class TestJobLifeCycle: for uj in all_ujs: uj.signal_start() + DependencyManager().schedule() # There is only enough control capacity to run one of the jobs so one should end up in pending and the other in waiting tm = TaskManager() self.run_tm(tm) @@ -197,63 +192,49 @@ class TestJobLifeCycle: @pytest.mark.django_db -def test_single_jt_multi_job_launch_blocks_last(controlplane_instance_group, job_template_factory, mocker): - instance = controlplane_instance_group.instances.all()[0] - objects = job_template_factory( - 'jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start", "job_should_not_start"] - ) - j1 = objects.jobs["job_should_start"] - j1.status = 'pending' +def test_single_jt_multi_job_launch_blocks_last(job_template_factory): + objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred') + j1 = create_job(objects.job_template) + j2 = create_job(objects.job_template) + + TaskManager().schedule() + j1.refresh_from_db() + j2.refresh_from_db() + assert j1.status == "waiting" + assert j2.status == "pending" + + # mimic running j1 to unblock j2 + j1.status = "successful" j1.save() - j2 = objects.jobs["job_should_not_start"] - j2.status = 'pending' - j2.save() - with mock.patch("awx.main.scheduler.TaskManager.start_task"): - TaskManager().schedule() - TaskManager.start_task.assert_called_once_with(j1, controlplane_instance_group, [], instance) - j1.status = "successful" - j1.save() - with mocker.patch("awx.main.scheduler.TaskManager.start_task"): - TaskManager().schedule() - TaskManager.start_task.assert_called_once_with(j2, controlplane_instance_group, [], instance) + TaskManager().schedule() + + j2.refresh_from_db() + assert j2.status == "waiting" @pytest.mark.django_db -def test_single_jt_multi_job_launch_allow_simul_allowed(controlplane_instance_group, job_template_factory, mocker): - instance = controlplane_instance_group.instances.all()[0] - objects = job_template_factory( - 'jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start", "job_should_not_start"] - ) +def test_single_jt_multi_job_launch_allow_simul_allowed(job_template_factory): + objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred') jt = objects.job_template + jt.allow_simultaneous = True jt.save() - - j1 = objects.jobs["job_should_start"] - j1.allow_simultaneous = True - j1.status = 'pending' - j1.save() - j2 = objects.jobs["job_should_not_start"] - j2.allow_simultaneous = True - j2.status = 'pending' - j2.save() - with mock.patch("awx.main.scheduler.TaskManager.start_task"): - TaskManager().schedule() - TaskManager.start_task.assert_has_calls( - [mock.call(j1, controlplane_instance_group, [], instance), mock.call(j2, controlplane_instance_group, [], instance)] - ) + j1 = create_job(objects.job_template) + j2 = create_job(objects.job_template) + TaskManager().schedule() + j1.refresh_from_db() + j2.refresh_from_db() + assert j1.status == "waiting" + assert j2.status == "waiting" @pytest.mark.django_db def test_multi_jt_capacity_blocking(hybrid_instance, job_template_factory, mocker): instance = hybrid_instance controlplane_instance_group = instance.rampart_groups.first() - objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1', jobs=["job_should_start"]) - objects2 = job_template_factory('jt2', organization='org2', project='proj2', inventory='inv2', credential='cred2', jobs=["job_should_not_start"]) - j1 = objects1.jobs["job_should_start"] - j1.status = 'pending' - j1.save() - j2 = objects2.jobs["job_should_not_start"] - j2.status = 'pending' - j2.save() + objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1') + objects2 = job_template_factory('jt2', organization='org2', project='proj2', inventory='inv2', credential='cred2') + j1 = create_job(objects1.job_template) + j2 = create_job(objects2.job_template) tm = TaskManager() with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact: mock_task_impact.return_value = 505 @@ -269,11 +250,9 @@ def test_multi_jt_capacity_blocking(hybrid_instance, job_template_factory, mocke @pytest.mark.django_db def test_single_job_dependencies_project_launch(controlplane_instance_group, job_template_factory, mocker): - objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start"]) + objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred') instance = controlplane_instance_group.instances.all()[0] - j = objects.jobs["job_should_start"] - j.status = 'pending' - j.save() + j = create_job(objects.job_template, dependencies_processed=False) p = objects.project p.scm_update_on_launch = True p.scm_update_cache_timeout = 0 @@ -281,12 +260,13 @@ def test_single_job_dependencies_project_launch(controlplane_instance_group, job p.scm_url = "http://github.com/ansible/ansible.git" p.save(skip_update=True) with mock.patch("awx.main.scheduler.TaskManager.start_task"): - tm = TaskManager() - with mock.patch.object(TaskManager, "create_project_update", wraps=tm.create_project_update) as mock_pu: - tm.schedule() + dm = DependencyManager() + with mock.patch.object(DependencyManager, "create_project_update", wraps=dm.create_project_update) as mock_pu: + dm.schedule() mock_pu.assert_called_once_with(j) pu = [x for x in p.project_updates.all()] assert len(pu) == 1 + TaskManager().schedule() TaskManager.start_task.assert_called_once_with(pu[0], controlplane_instance_group, [j], instance) pu[0].status = "successful" pu[0].save() @@ -297,11 +277,9 @@ def test_single_job_dependencies_project_launch(controlplane_instance_group, job @pytest.mark.django_db def test_single_job_dependencies_inventory_update_launch(controlplane_instance_group, job_template_factory, mocker, inventory_source_factory): - objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start"]) + objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred') instance = controlplane_instance_group.instances.all()[0] - j = objects.jobs["job_should_start"] - j.status = 'pending' - j.save() + j = create_job(objects.job_template, dependencies_processed=False) i = objects.inventory ii = inventory_source_factory("ec2") ii.source = "ec2" @@ -310,12 +288,13 @@ def test_single_job_dependencies_inventory_update_launch(controlplane_instance_g ii.save() i.inventory_sources.add(ii) with mock.patch("awx.main.scheduler.TaskManager.start_task"): - tm = TaskManager() - with mock.patch.object(TaskManager, "create_inventory_update", wraps=tm.create_inventory_update) as mock_iu: - tm.schedule() + dm = DependencyManager() + with mock.patch.object(DependencyManager, "create_inventory_update", wraps=dm.create_inventory_update) as mock_iu: + dm.schedule() mock_iu.assert_called_once_with(j, ii) iu = [x for x in ii.inventory_updates.all()] assert len(iu) == 1 + TaskManager().schedule() TaskManager.start_task.assert_called_once_with(iu[0], controlplane_instance_group, [j], instance) iu[0].status = "successful" iu[0].save() @@ -334,19 +313,17 @@ def test_inventory_update_launches_project_update(controlplane_instance_group, s iu.status = "pending" iu.save() with mock.patch("awx.main.scheduler.TaskManager.start_task"): - tm = TaskManager() - with mock.patch.object(TaskManager, "create_project_update", wraps=tm.create_project_update) as mock_pu: - tm.schedule() + dm = DependencyManager() + with mock.patch.object(DependencyManager, "create_project_update", wraps=dm.create_project_update) as mock_pu: + dm.schedule() mock_pu.assert_called_with(iu, project_id=project.id) @pytest.mark.django_db def test_job_dependency_with_already_updated(controlplane_instance_group, job_template_factory, mocker, inventory_source_factory): - objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start"]) + objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred') instance = controlplane_instance_group.instances.all()[0] - j = objects.jobs["job_should_start"] - j.status = 'pending' - j.save() + j = create_job(objects.job_template, dependencies_processed=False) i = objects.inventory ii = inventory_source_factory("ec2") ii.source = "ec2" @@ -359,9 +336,9 @@ def test_job_dependency_with_already_updated(controlplane_instance_group, job_te j.start_args = encrypt_field(j, field_name="start_args") j.save() with mock.patch("awx.main.scheduler.TaskManager.start_task"): - tm = TaskManager() - with mock.patch.object(TaskManager, "create_inventory_update", wraps=tm.create_inventory_update) as mock_iu: - tm.schedule() + dm = DependencyManager() + with mock.patch.object(DependencyManager, "create_inventory_update", wraps=dm.create_inventory_update) as mock_iu: + dm.schedule() mock_iu.assert_not_called() with mock.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() @@ -371,13 +348,11 @@ def test_job_dependency_with_already_updated(controlplane_instance_group, job_te @pytest.mark.django_db def test_shared_dependencies_launch(controlplane_instance_group, job_template_factory, mocker, inventory_source_factory): instance = controlplane_instance_group.instances.all()[0] - objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["first_job", "second_job"]) - j1 = objects.jobs["first_job"] - j1.status = 'pending' - j1.save() - j2 = objects.jobs["second_job"] - j2.status = 'pending' - j2.save() + objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred') + objects.job_template.allow_simultaneous = True + objects.job_template.save() + j1 = create_job(objects.job_template, dependencies_processed=False) + j2 = create_job(objects.job_template, dependencies_processed=False) p = objects.project p.scm_update_on_launch = True p.scm_update_cache_timeout = 300 @@ -392,8 +367,8 @@ def test_shared_dependencies_launch(controlplane_instance_group, job_template_fa ii.update_cache_timeout = 300 ii.save() i.inventory_sources.add(ii) - with mock.patch("awx.main.scheduler.TaskManager.start_task"): + DependencyManager().schedule() TaskManager().schedule() pu = p.project_updates.first() iu = ii.inventory_updates.first() @@ -408,12 +383,9 @@ def test_shared_dependencies_launch(controlplane_instance_group, job_template_fa iu.save() with mock.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() - TaskManager.start_task.assert_called_once_with(j1, controlplane_instance_group, [], instance) - j1.status = "successful" - j1.save() - with mock.patch("awx.main.scheduler.TaskManager.start_task"): - TaskManager().schedule() - TaskManager.start_task.assert_called_once_with(j2, controlplane_instance_group, [], instance) + TaskManager.start_task.assert_has_calls( + [mock.call(j1, controlplane_instance_group, [], instance), mock.call(j2, controlplane_instance_group, [], instance)] + ) pu = [x for x in p.project_updates.all()] iu = [x for x in ii.inventory_updates.all()] assert len(pu) == 1 @@ -422,30 +394,27 @@ def test_shared_dependencies_launch(controlplane_instance_group, job_template_fa @pytest.mark.django_db def test_job_not_blocking_project_update(controlplane_instance_group, job_template_factory): - objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job"]) - job = objects.jobs["job"] + instance = controlplane_instance_group.instances.all()[0] + objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred') + job = objects.job_template.create_unified_job() job.instance_group = controlplane_instance_group + job.dependencies_process = True job.status = "running" job.save() with mock.patch("awx.main.scheduler.TaskManager.start_task"): - task_manager = TaskManager() - task_manager._schedule() - proj = objects.project project_update = proj.create_project_update() project_update.instance_group = controlplane_instance_group project_update.status = "pending" project_update.save() - assert not task_manager.job_blocked_by(project_update) - - dependency_graph = DependencyGraph() - dependency_graph.add_job(job) - assert not dependency_graph.task_blocked_by(project_update) + TaskManager().schedule() + TaskManager.start_task.assert_called_once_with(project_update, controlplane_instance_group, [], instance) @pytest.mark.django_db def test_job_not_blocking_inventory_update(controlplane_instance_group, job_template_factory, inventory_source_factory): + instance = controlplane_instance_group.instances.all()[0] objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job"]) job = objects.jobs["job"] job.instance_group = controlplane_instance_group @@ -453,9 +422,6 @@ def test_job_not_blocking_inventory_update(controlplane_instance_group, job_temp job.save() with mock.patch("awx.main.scheduler.TaskManager.start_task"): - task_manager = TaskManager() - task_manager._schedule() - inv = objects.inventory inv_source = inventory_source_factory("ec2") inv_source.source = "ec2" @@ -465,11 +431,9 @@ def test_job_not_blocking_inventory_update(controlplane_instance_group, job_temp inventory_update.status = "pending" inventory_update.save() - assert not task_manager.job_blocked_by(inventory_update) - - dependency_graph = DependencyGraph() - dependency_graph.add_job(job) - assert not dependency_graph.task_blocked_by(inventory_update) + DependencyManager().schedule() + TaskManager().schedule() + TaskManager.start_task.assert_called_once_with(inventory_update, controlplane_instance_group, [], instance) @pytest.mark.django_db @@ -484,7 +448,7 @@ def test_generate_dependencies_only_once(job_template_factory): # job starts with dependencies_processed as False assert not job.dependencies_processed # run one cycle of ._schedule() to generate dependencies - TaskManager()._schedule() + DependencyManager().schedule() # make sure dependencies_processed is now True job = Job.objects.filter(name="job_gen_dep")[0] @@ -492,7 +456,7 @@ def test_generate_dependencies_only_once(job_template_factory): # Run ._schedule() again, but make sure .generate_dependencies() is not # called with job in the argument list - tm = TaskManager() - tm.generate_dependencies = mock.MagicMock(return_value=[]) - tm._schedule() - tm.generate_dependencies.assert_has_calls([mock.call([]), mock.call([])]) + dm = DependencyManager() + dm.generate_dependencies = mock.MagicMock(return_value=[]) + dm.schedule() + dm.generate_dependencies.assert_not_called() diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py index c0cf302d66..29ebff1178 100644 --- a/awx/main/utils/common.py +++ b/awx/main/utils/common.py @@ -851,34 +851,41 @@ _dependency_manager = threading.local() _workflow_manager = threading.local() +@contextlib.contextmanager +def task_manager_bulk_reschedule(): + managers = [ScheduleTaskManager(), ScheduleWorkflowManager(), ScheduleDependencyManager()] + """Context manager to avoid submitting task multiple times.""" + try: + for m in managers: + m.previous_flag = getattr(m.manager_threading_local, 'bulk_reschedule', False) + m.previous_value = getattr(m.manager_threading_local, 'needs_scheduling', False) + m.manager_threading_local.bulk_reschedule = True + m.manager_threading_local.needs_scheduling = False + yield + finally: + for m in managers: + m.manager_threading_local.bulk_reschedule = m.previous_flag + if m.manager_threading_local.needs_scheduling: + m.schedule() + m.manager_threading_local.needs_scheduling = m.previous_value + + class ScheduleManager: def __init__(self, manager, manager_threading_local): self.manager = manager self.manager_threading_local = manager_threading_local - def schedule(self): - if getattr(self.manager_threading_local, 'bulk_reschedule', False): - self.manager_threading_local.needs_scheduling = True - return + def _schedule(self): from django.db import connection # runs right away if not in transaction connection.on_commit(lambda: self.manager.delay()) - @contextlib.contextmanager - def task_manager_bulk_reschedule(self): - """Context manager to avoid submitting task multiple times.""" - try: - previous_flag = getattr(self.manager_threading_local, 'bulk_reschedule', False) - previous_value = getattr(self.manager_threading_local, 'needs_scheduling', False) - self.manager_threading_local.bulk_reschedule = True - self.manager_threading_local.needs_scheduling = False - yield - finally: - self.manager_threading_local.bulk_reschedule = previous_flag - if self.manager_threading_local.needs_scheduling: - self.schedule() - self.manager_threading_local.needs_scheduling = previous_value + def schedule(self): + if getattr(self.manager_threading_local, 'bulk_reschedule', False): + self.manager_threading_local.needs_scheduling = True + return + self._schedule() class ScheduleTaskManager(ScheduleManager): diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index a2b19a82d8..2751534cfb 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -449,7 +449,6 @@ CELERYBEAT_SCHEDULE = { 'gather_analytics': {'task': 'awx.main.tasks.system.gather_analytics', 'schedule': timedelta(minutes=5)}, 'task_manager': {'task': 'awx.main.scheduler.tasks.task_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}}, 'dependency_manager': {'task': 'awx.main.scheduler.tasks.dependency_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}}, - 'workflow_manager': {'task': 'awx.main.scheduler.tasks.workflow_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}}, 'k8s_reaper': {'task': 'awx.main.tasks.system.awx_k8s_reaper', 'schedule': timedelta(seconds=60), 'options': {'expires': 50}}, 'receptor_reaper': {'task': 'awx.main.tasks.system.awx_receptor_workunit_reaper', 'schedule': timedelta(seconds=60)}, 'send_subsystem_metrics': {'task': 'awx.main.analytics.analytics_tasks.send_subsystem_metrics', 'schedule': timedelta(seconds=20)},