Cache task_impact

task_impact is now a field on the database model.
It is calculated and set during create_unified_job.

task_impact is also set on .save() for ad hoc commands.
commit e6f8852b05
parent d06a3f060d
Author: Seth Foster
Date:   2022-07-19 14:02:45 -04:00

16 changed files with 279 additions and 248 deletions
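At a glance, the pattern this commit introduces is: compute task_impact once when the job is created, persist it, and let capacity accounting read the stored value. A rough plain-Python sketch of that flow (simplified, not the actual Django models; the stand-in attributes here are hypothetical):

# Sketch of the caching pattern (not the real models; in AWX task_impact
# becomes a PositiveIntegerField on UnifiedJob).

class UnifiedJob:
    def __init__(self):
        self.task_impact = 0  # now a persisted column, default 0

    def _get_task_impact(self):
        # Base class returns the cached value; subclasses override
        # with their real calculation.
        return self.task_impact


class AdHocCommand(UnifiedJob):
    def __init__(self, total_hosts=None, forks=0):
        super().__init__()
        self.total_hosts = total_hosts  # stands in for inventory.total_hosts
        self.forks = forks

    def _get_task_impact(self):
        # Mirrors the diff below: fall back to 5 hosts with no inventory
        count_hosts = self.total_hosts if self.total_hosts is not None else 5
        return min(count_hosts, 5 if self.forks == 0 else self.forks) + 1


def create_unified_job(job):
    # create_unified_job now stamps the cached value once, at creation time
    job.task_impact = job._get_task_impact()
    return job


job = create_unified_job(AdHocCommand(total_hosts=12, forks=3))
assert job.task_impact == 4  # min(12, 3) + 1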

View File

@@ -27,4 +27,9 @@ class Migration(migrations.Migration):
                 blank=True, default=None, editable=False, help_text='A cached list with pk values from preferred instance groups.', null=True
             ),
         ),
+        migrations.AddField(
+            model_name='unifiedjob',
+            name='task_impact',
+            field=models.PositiveIntegerField(default=0, editable=False),
+        ),
     ]

View File

@@ -181,12 +181,12 @@ class AdHocCommand(UnifiedJob, JobNotificationMixin):
     def get_passwords_needed_to_start(self):
         return self.passwords_needed_to_start

-    @property
-    def task_impact(self):
+    def _get_task_impact(self):
         # NOTE: We sorta have to assume the host count matches and that forks default to 5
-        from awx.main.models.inventory import Host
-
-        count_hosts = Host.objects.filter(enabled=True, inventory__ad_hoc_commands__pk=self.pk).count()
+        if self.inventory:
+            count_hosts = self.inventory.total_hosts
+        else:
+            count_hosts = 5
         return min(count_hosts, 5 if self.forks == 0 else self.forks) + 1

     def copy(self):
@@ -210,10 +210,20 @@ class AdHocCommand(UnifiedJob, JobNotificationMixin):
     def save(self, *args, **kwargs):
         update_fields = kwargs.get('update_fields', [])

+        def add_to_update_fields(name):
+            if name not in update_fields:
+                update_fields.append(name)
+
+        if not self.preferred_instance_groups_cache:
+            self.preferred_instance_groups_cache = self._get_preferred_instance_group_cache()
+            add_to_update_fields("preferred_instance_groups_cache")
         if not self.name:
             self.name = Truncator(u': '.join(filter(None, (self.module_name, self.module_args)))).chars(512)
-            if 'name' not in update_fields:
-                update_fields.append('name')
+            add_to_update_fields("name")
+        if self.task_impact == 0:
+            self.task_impact = self._get_task_impact()
+            add_to_update_fields("task_impact")
         super(AdHocCommand, self).save(*args, **kwargs)

     @property

View File

@@ -12,6 +12,7 @@ from django.dispatch import receiver
 from django.utils.translation import gettext_lazy as _
 from django.conf import settings
 from django.utils.timezone import now, timedelta
+from django.db.models import Sum

 import redis
 from solo.models import SingletonModel
@@ -149,10 +150,13 @@ class Instance(HasPolicyEditsMixin, BaseModel):
     def consumed_capacity(self):
         capacity_consumed = 0
         if self.node_type in ('hybrid', 'execution'):
-            capacity_consumed += sum(x.task_impact for x in UnifiedJob.objects.filter(execution_node=self.hostname, status__in=('running', 'waiting')))
+            capacity_consumed += (
+                UnifiedJob.objects.filter(execution_node=self.hostname, status__in=('running', 'waiting')).aggregate(Sum("task_impact"))["task_impact__sum"]
+                or 0
+            )
         if self.node_type in ('hybrid', 'control'):
-            capacity_consumed += sum(
-                settings.AWX_CONTROL_NODE_TASK_IMPACT for x in UnifiedJob.objects.filter(controller_node=self.hostname, status__in=('running', 'waiting'))
+            capacity_consumed += (
+                settings.AWX_CONTROL_NODE_TASK_IMPACT * UnifiedJob.objects.filter(controller_node=self.hostname, status__in=('running', 'waiting')).count()
             )
         return capacity_consumed
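The "or 0" guard above matters: Django's aggregate(Sum(...)) returns {'task_impact__sum': None} rather than 0 when the queryset is empty, so an idle node would otherwise add None into capacity_consumed. Illustrative shell session (the hostname is hypothetical):

>>> UnifiedJob.objects.filter(execution_node='idle-node', status__in=('running', 'waiting')).aggregate(Sum('task_impact'))
{'task_impact__sum': None}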

View File

@@ -337,9 +337,14 @@ class Inventory(CommonModelNameNotUnique, ResourceMixin, RelatedJobsMixin):
         else:
             active_inventory_sources = self.inventory_sources.filter(source__in=CLOUD_INVENTORY_SOURCES)
         failed_inventory_sources = active_inventory_sources.filter(last_job_failed=True)
+        total_hosts = active_hosts.count()
+        if total_hosts != self.total_hosts:
+            update_task_impact = True
+        else:
+            update_task_impact = False
         computed_fields = {
             'has_active_failures': bool(failed_hosts.count()),
-            'total_hosts': active_hosts.count(),
+            'total_hosts': total_hosts,
             'hosts_with_active_failures': failed_hosts.count(),
             'total_groups': active_groups.count(),
             'has_inventory_sources': bool(active_inventory_sources.count()),
@@ -357,6 +362,14 @@ class Inventory(CommonModelNameNotUnique, ResourceMixin, RelatedJobsMixin):
                 computed_fields.pop(field)
         if computed_fields:
             iobj.save(update_fields=computed_fields.keys())
+        if update_task_impact:
+            # if total hosts count has changed, re-calculate task_impact for any
+            # job that is still in pending for this inventory, since task_impact
+            # is cached on task creation and used in task management system
+            tasks = self.jobs.filter(status="pending")
+            for t in tasks:
+                t.task_impact = t._get_task_impact()
+            UnifiedJob.objects.bulk_update(tasks, ['task_impact'])
         logger.debug("Finished updating inventory computed fields, pk={0}, in " "{1:.3f} seconds".format(self.pk, time.time() - start_time))

     def websocket_emit_status(self, status):
@@ -1220,8 +1233,7 @@ class InventoryUpdate(UnifiedJob, InventorySourceOptions, JobNotificationMixin,
             return UnpartitionedInventoryUpdateEvent
         return InventoryUpdateEvent

-    @property
-    def task_impact(self):
+    def _get_task_impact(self):
         return 1

     # InventoryUpdate credential required

View File

@@ -644,8 +644,7 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskMana
             raise ParseError(_('{status_value} is not a valid status option.').format(status_value=status))
         return self._get_hosts(**kwargs)

-    @property
-    def task_impact(self):
+    def _get_task_impact(self):
         if self.launch_type == 'callback':
             count_hosts = 2
         else:
@@ -1241,8 +1240,7 @@ class SystemJob(UnifiedJob, SystemJobOptions, JobNotificationMixin):
             return UnpartitionedSystemJobEvent
         return SystemJobEvent

-    @property
-    def task_impact(self):
+    def _get_task_impact(self):
         return 5

     @property

View File

@@ -563,8 +563,7 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin, TaskManage
             return UnpartitionedProjectUpdateEvent
         return ProjectUpdateEvent

-    @property
-    def task_impact(self):
+    def _get_task_impact(self):
         return 0 if self.job_type == 'run' else 1

     @property

View File

@@ -46,6 +46,7 @@ from awx.main.utils.common import (
     parse_yaml_or_json,
     getattr_dne,
     ScheduleDependencyManager,
+    ScheduleTaskManager,
     get_event_partition_epoch,
     get_capacity_type,
 )
@@ -381,9 +382,10 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, ExecutionEn
             unified_job.survey_passwords = new_job_passwords
             kwargs['survey_passwords'] = new_job_passwords  # saved in config object for relaunch

-        unified_job.preferred_instance_groups_cache = [ig.pk for ig in unified_job.preferred_instance_groups]
+        unified_job.preferred_instance_groups_cache = unified_job._get_preferred_instance_group_cache()
         unified_job._set_default_dependencies_processed()
+        unified_job.task_impact = unified_job._get_task_impact()

         from awx.main.signals import disable_activity_stream, activity_stream_create
@@ -704,6 +706,10 @@ class UnifiedJob(
         editable=False,
         help_text=_("A cached list with pk values from preferred instance groups."),
     )
+    task_impact = models.PositiveIntegerField(
+        default=0,
+        editable=False,
+    )
     organization = models.ForeignKey(
         'Organization',
         blank=True,
@@ -765,6 +771,9 @@ class UnifiedJob(
     def _get_parent_field_name(self):
         return 'unified_job_template'  # Override in subclasses.

+    def _get_preferred_instance_group_cache(self):
+        return [ig.pk for ig in self.preferred_instance_groups]
+
     @classmethod
     def _get_unified_job_template_class(cls):
         """
@@ -1254,9 +1263,8 @@ class UnifiedJob(
         except JobLaunchConfig.DoesNotExist:
             return False

-    @property
-    def task_impact(self):
-        raise NotImplementedError  # Implement in subclass.
+    def _get_task_impact(self):
+        return self.task_impact  # return default, should implement in subclass.

     def websocket_emit_data(self):
         '''Return extra data that should be included when submitting data to the browser over the websocket connection'''
@@ -1371,7 +1379,10 @@ class UnifiedJob(
             self.update_fields(start_args=json.dumps(kwargs), status='pending')
         self.websocket_emit_status("pending")

-        ScheduleDependencyManager().schedule()
+        if self.dependencies_processed:
+            ScheduleTaskManager().schedule()
+        else:
+            ScheduleDependencyManager().schedule()

         # Each type of unified job has a different Task class; get the
         # appropirate one.

View File

@@ -672,8 +672,7 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio
         )
         return result

-    @property
-    def task_impact(self):
+    def _get_task_impact(self):
         return 0

     def get_ancestor_workflows(self):

View File

@@ -37,10 +37,9 @@ from awx.main.utils.pglock import advisory_lock
 from awx.main.utils import (
     get_type_for_model,
     ScheduleTaskManager,
-    ScheduleDependencyManager,
     ScheduleWorkflowManager,
 )
-from awx.main.utils.common import create_partition
+from awx.main.utils.common import create_partition, task_manager_bulk_reschedule
 from awx.main.signals import disable_activity_stream
 from awx.main.constants import ACTIVE_STATES
 from awx.main.scheduler.dependency_graph import DependencyGraph
@@ -121,23 +120,22 @@ class TaskBase:

     def schedule(self):
         # Lock
-        with advisory_lock(f"{self.prefix}_lock", wait=False) as acquired:
-            with transaction.atomic():
-                if acquired is False:
-                    logger.debug(f"Not running {self.prefix} scheduler, another task holds lock")
-                    return
-                logger.debug(f"Starting {self.prefix} Scheduler")
-                with self.schedule_manager.task_manager_bulk_reschedule():
+        with task_manager_bulk_reschedule():
+            with advisory_lock(f"{self.prefix}_lock", wait=False) as acquired:
+                with transaction.atomic():
+                    if acquired is False:
+                        logger.debug(f"Not running {self.prefix} scheduler, another task holds lock")
+                        return
+                    logger.debug(f"Starting {self.prefix} Scheduler")
                     # if sigterm due to timeout, still record metrics
                     signal.signal(signal.SIGTERM, self.record_aggregate_metrics_and_exit)
                     self._schedule()
                     self.record_aggregate_metrics()
-                logger.debug(f"Finishing {self.prefix} Scheduler")
+                    logger.debug(f"Finishing {self.prefix} Scheduler")


 class WorkflowManager(TaskBase):
     def __init__(self):
-        self.schedule_manager = ScheduleWorkflowManager()
         super().__init__(prefix="workflow_manager")

     @timeit
@@ -146,7 +144,7 @@ class WorkflowManager(TaskBase):
         for workflow_job in self.all_tasks:
             if self.timed_out():
                 logger.warning("Workflow manager has reached time out while processing running workflows, exiting loop early")
-                self.schedule_manager.schedule()
+                ScheduleWorkflowManager().schedule()
                 # Do not process any more workflow jobs. Stop here.
                 # Maybe we should schedule another WorkflowManager run
                 break
@@ -263,7 +261,6 @@ class WorkflowManager(TaskBase):

 class DependencyManager(TaskBase):
     def __init__(self):
-        self.schedule_manager = ScheduleDependencyManager()
         super().__init__(prefix="dependency_manager")

     def create_project_update(self, task, project_id=None):
@@ -432,8 +429,9 @@ class DependencyManager(TaskBase):
         return created_dependencies

     def process_tasks(self):
-        self.generate_dependencies(self.all_tasks)
-        self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(self.all_tasks))
+        deps = self.generate_dependencies(self.all_tasks)
+        self.generate_dependencies(deps)
+        self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(self.all_tasks) + len(deps))

     @timeit
     def _schedule(self):
@@ -462,7 +460,6 @@ class TaskManager(TaskBase):
         # 5 minutes to start pending jobs. If this limit is reached, pending jobs
         # will no longer be started and will be started on the next task manager cycle.
         self.time_delta_job_explanation = timedelta(seconds=30)
-        self.schedule_manager = ScheduleTaskManager()
         super().__init__(prefix="task_manager")

     def after_lock_init(self):
@@ -575,6 +572,8 @@ class TaskManager(TaskBase):
     @timeit
     def process_running_tasks(self, running_tasks):
         for task in running_tasks:
+            if type(task) is WorkflowJob:
+                ScheduleWorkflowManager().schedule()
             self.dependency_graph.add_job(task)

     @timeit

View File

@@ -54,6 +54,7 @@ from awx.main.utils.common import (
     ignore_inventory_computed_fields,
     ignore_inventory_group_removal,
     ScheduleWorkflowManager,
+    ScheduleTaskManager,
 )

 from awx.main.utils.external_logging import reconfigure_rsyslog
@@ -657,6 +658,13 @@ def awx_periodic_scheduler():
         state.save()


+def schedule_manager_success_or_error(instance):
+    if instance.unifiedjob_blocked_jobs.exists():
+        ScheduleTaskManager().schedule()
+    if instance.spawned_by_workflow:
+        ScheduleWorkflowManager().schedule()
+
+
 @task(queue=get_local_queuename)
 def handle_work_success(task_actual):
     try:
@@ -666,8 +674,7 @@ def handle_work_success(task_actual):
         return
     if not instance:
         return
-
-    ScheduleWorkflowManager().schedule()
+    schedule_manager_success_or_error(instance)


 @task(queue=get_local_queuename)
@@ -709,8 +716,7 @@ def handle_work_error(task_id, *args, **kwargs):
     # what the job complete message handler does then we may want to send a
     # completion event for each job here.
     if first_instance:
-        ScheduleWorkflowManager().schedule()
-        pass
+        schedule_manager_success_or_error(first_instance)


 @task(queue=get_local_queuename)

View File

@@ -252,12 +252,14 @@ class TestTaskImpact:
     def test_limit_task_impact(self, job_host_limit, run_computed_fields_right_away):
         job = job_host_limit(5, 2)
         job.inventory.update_computed_fields()
+        job.task_impact = job._get_task_impact()
         assert job.inventory.total_hosts == 5
         assert job.task_impact == 2 + 1  # forks becomes constraint

     def test_host_task_impact(self, job_host_limit, run_computed_fields_right_away):
         job = job_host_limit(3, 5)
         job.inventory.update_computed_fields()
+        job.task_impact = job._get_task_impact()
         assert job.task_impact == 3 + 1  # hosts becomes constraint

     def test_shard_task_impact(self, slice_job_factory, run_computed_fields_right_away):
@@ -270,9 +272,13 @@ class TestTaskImpact:
         # Even distribution - all jobs run on 1 host
         assert [len(jobs[0].inventory.get_script_data(slice_number=i + 1, slice_count=3)['all']['hosts']) for i in range(3)] == [1, 1, 1]
         jobs[0].inventory.update_computed_fields()
+        for j in jobs:
+            j.task_impact = j._get_task_impact()
         assert [job.task_impact for job in jobs] == [2, 2, 2]  # plus one base task impact
         # Uneven distribution - first job takes the extra host
         jobs[0].inventory.hosts.create(name='remainder_foo')
         assert [len(jobs[0].inventory.get_script_data(slice_number=i + 1, slice_count=3)['all']['hosts']) for i in range(3)] == [2, 1, 1]
         jobs[0].inventory.update_computed_fields()
+        # recalculate task_impact
+        jobs[0].task_impact = jobs[0]._get_task_impact()
         assert [job.task_impact for job in jobs] == [3, 2, 2]

View File

@@ -0,0 +1,6 @@
+def create_job(jt, dependencies_processed=True):
+    job = jt.create_unified_job()
+    job.status = "pending"
+    job.dependencies_processed = dependencies_processed
+    job.save()
+    return job

View File

@@ -1,9 +1,10 @@
 import pytest
 from unittest import mock
 from datetime import timedelta

-from awx.main.scheduler import TaskManager
-from awx.main.models import InstanceGroup, WorkflowJob
+from awx.main.scheduler import TaskManager, DependencyManager
+from awx.main.models import InstanceGroup
 from awx.main.tasks.system import apply_cluster_membership_policies
+from . import create_job


 @pytest.mark.django_db
@@ -12,16 +13,12 @@ def test_multi_group_basic_job_launch(instance_factory, controlplane_instance_gr
     i2 = instance_factory("i2")
     ig1 = instance_group_factory("ig1", instances=[i1])
     ig2 = instance_group_factory("ig2", instances=[i2])
-    objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1', jobs=["job_should_start"])
+    objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1')
     objects1.job_template.instance_groups.add(ig1)
-    j1 = objects1.jobs['job_should_start']
-    j1.status = 'pending'
-    j1.save()
-    objects2 = job_template_factory('jt2', organization='org2', project='proj2', inventory='inv2', credential='cred2', jobs=["job_should_still_start"])
+    j1 = create_job(objects1.job_template)
+    objects2 = job_template_factory('jt2', organization='org2', project='proj2', inventory='inv2', credential='cred2')
     objects2.job_template.instance_groups.add(ig2)
-    j2 = objects2.jobs['job_should_still_start']
-    j2.status = 'pending'
-    j2.save()
+    j2 = create_job(objects2.job_template)

     with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact:
         mock_task_impact.return_value = 500
         with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
@@ -35,23 +32,26 @@ def test_multi_group_with_shared_dependency(instance_factory, controlplane_insta
     i2 = instance_factory("i2")
     ig1 = instance_group_factory("ig1", instances=[i1])
     ig2 = instance_group_factory("ig2", instances=[i2])
-    objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1', jobs=["job_should_start"])
+    objects1 = job_template_factory(
+        'jt1',
+        organization='org1',
+        project='proj1',
+        inventory='inv1',
+        credential='cred1',
+    )
     objects1.job_template.instance_groups.add(ig1)
+    j1 = create_job(objects1.job_template, dependencies_processed=False)
     p = objects1.project
     p.scm_update_on_launch = True
     p.scm_update_cache_timeout = 0
     p.scm_type = "git"
     p.scm_url = "http://github.com/ansible/ansible.git"
     p.save()
-    j1 = objects1.jobs['job_should_start']
-    j1.status = 'pending'
-    j1.save()
-    objects2 = job_template_factory('jt2', organization=objects1.organization, project=p, inventory='inv2', credential='cred2', jobs=["job_should_still_start"])
+    objects2 = job_template_factory('jt2', organization=objects1.organization, project=p, inventory='inv2', credential='cred2')
     objects2.job_template.instance_groups.add(ig2)
-    j2 = objects2.jobs['job_should_still_start']
-    j2.status = 'pending'
-    j2.save()
+    j2 = create_job(objects2.job_template, dependencies_processed=False)
     with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
+        DependencyManager().schedule()
         TaskManager().schedule()
     pu = p.project_updates.first()
     TaskManager.start_task.assert_called_once_with(pu, controlplane_instance_group, [j1, j2], controlplane_instance_group.instances.all()[0])
@@ -59,6 +59,7 @@ def test_multi_group_with_shared_dependency(instance_factory, controlplane_insta
     pu.status = "successful"
     pu.save()
     with mock.patch("awx.main.scheduler.TaskManager.start_task"):
+        DependencyManager().schedule()
         TaskManager().schedule()
     TaskManager.start_task.assert_any_call(j1, ig1, [], i1)
@@ -69,7 +70,7 @@ def test_multi_group_with_shared_dependency(instance_factory, controlplane_insta
 @pytest.mark.django_db
 def test_workflow_job_no_instancegroup(workflow_job_template_factory, controlplane_instance_group, mocker):
     wfjt = workflow_job_template_factory('anicedayforawalk').workflow_job_template
-    wfj = WorkflowJob.objects.create(workflow_job_template=wfjt)
+    wfj = wfjt.create_unified_job()
     wfj.status = "pending"
     wfj.save()
     with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
@@ -85,39 +86,50 @@ def test_overcapacity_blocking_other_groups_unaffected(instance_factory, control
     i1.capacity = 1020
     i1.save()
     i2 = instance_factory("i2")
+    i2.capacity = 1020
+    i2.save()
     ig1 = instance_group_factory("ig1", instances=[i1])
     ig2 = instance_group_factory("ig2", instances=[i2])
-    objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1', jobs=["job_should_start"])
+    objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1')
     objects1.job_template.instance_groups.add(ig1)
-    j1 = objects1.jobs['job_should_start']
-    j1.status = 'pending'
-    j1.save()
-    objects2 = job_template_factory(
-        'jt2', organization=objects1.organization, project='proj2', inventory='inv2', credential='cred2', jobs=["job_should_start", "job_should_also_start"]
-    )
+    j1 = create_job(objects1.job_template)
+    objects2 = job_template_factory('jt2', organization=objects1.organization, project='proj2', inventory='inv2', credential='cred2')
     objects2.job_template.instance_groups.add(ig1)
-    j1_1 = objects2.jobs['job_should_also_start']
-    j1_1.status = 'pending'
-    j1_1.save()
-    objects3 = job_template_factory('jt3', organization='org2', project='proj3', inventory='inv3', credential='cred3', jobs=["job_should_still_start"])
+    j1_1 = create_job(objects2.job_template)
+    objects3 = job_template_factory('jt3', organization='org2', project='proj3', inventory='inv3', credential='cred3')
     objects3.job_template.instance_groups.add(ig2)
-    j2 = objects3.jobs['job_should_still_start']
-    j2.status = 'pending'
-    j2.save()
-    objects4 = job_template_factory(
-        'jt4', organization=objects3.organization, project='proj4', inventory='inv4', credential='cred4', jobs=["job_should_not_start"]
-    )
+    j2 = create_job(objects3.job_template)
+    objects4 = job_template_factory('jt4', organization=objects3.organization, project='proj4', inventory='inv4', credential='cred4')
     objects4.job_template.instance_groups.add(ig2)
-    j2_1 = objects4.jobs['job_should_not_start']
-    j2_1.status = 'pending'
-    j2_1.save()
-    tm = TaskManager()
+    j2_1 = create_job(objects4.job_template)
     with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact:
         mock_task_impact.return_value = 500
-        with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job:
-            tm.schedule()
-            mock_job.assert_has_calls([mock.call(j1, ig1, [], i1), mock.call(j1_1, ig1, [], i1), mock.call(j2, ig2, [], i2)])
-            assert mock_job.call_count == 3
+        TaskManager().schedule()
+        # all jobs should be able to run, plenty of capacity across both instances
+        for j in [j1, j1_1, j2, j2_1]:
+            j.refresh_from_db()
+            assert j.status == "waiting"
+        # reset to pending
+        for j in [j1, j1_1, j2, j2_1]:
+            j.status = "pending"
+            j.save()
+        # make i2 can only be able to fit 1 job
+        i2.capacity = 510
+        i2.save()
+        TaskManager().schedule()
+        for j in [j1, j1_1, j2]:
+            j.refresh_from_db()
+            assert j.status == "waiting"
+        j2_1.refresh_from_db()
+        # could not run because i2 is full
+        assert j2_1.status == "pending"
@@ -126,19 +138,13 @@ def test_failover_group_run(instance_factory, controlplane_instance_group, mocke
     i2 = instance_factory("i2")
     ig1 = instance_group_factory("ig1", instances=[i1])
     ig2 = instance_group_factory("ig2", instances=[i2])
-    objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1', jobs=["job_should_start"])
+    objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1')
     objects1.job_template.instance_groups.add(ig1)
-    j1 = objects1.jobs['job_should_start']
-    j1.status = 'pending'
-    j1.save()
-    objects2 = job_template_factory(
-        'jt2', organization=objects1.organization, project='proj2', inventory='inv2', credential='cred2', jobs=["job_should_start", "job_should_also_start"]
-    )
+    j1 = create_job(objects1.job_template)
+    objects2 = job_template_factory('jt2', organization=objects1.organization, project='proj2', inventory='inv2', credential='cred2')
     objects2.job_template.instance_groups.add(ig1)
     objects2.job_template.instance_groups.add(ig2)
-    j1_1 = objects2.jobs['job_should_also_start']
-    j1_1.status = 'pending'
-    j1_1.save()
+    j1_1 = create_job(objects2.job_template)
     tm = TaskManager()
     with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact:
         mock_task_impact.return_value = 500

View File

@@ -3,21 +3,19 @@ from unittest import mock
 import json
 from datetime import timedelta

-from awx.main.scheduler import TaskManager
-from awx.main.scheduler.dependency_graph import DependencyGraph
+from awx.main.scheduler import TaskManager, DependencyManager, WorkflowManager
 from awx.main.utils import encrypt_field
 from awx.main.models import WorkflowJobTemplate, JobTemplate, Job
 from awx.main.models.ha import Instance
+from . import create_job

 from django.conf import settings


 @pytest.mark.django_db
 def test_single_job_scheduler_launch(hybrid_instance, controlplane_instance_group, job_template_factory, mocker):
     instance = controlplane_instance_group.instances.all()[0]
-    objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start"])
-    j = objects.jobs["job_should_start"]
-    j.status = 'pending'
-    j.save()
+    objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred')
+    j = create_job(objects.job_template)
     with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
         TaskManager().schedule()
         TaskManager.start_task.assert_called_once_with(j, controlplane_instance_group, [], instance)
@@ -32,10 +30,8 @@ class TestJobLifeCycle:
         expect_commit - list of expected on_commit calls
         If any of these are None, then the assertion is not made.
         """
-        if expect_schedule and len(expect_schedule) > 1:
-            raise RuntimeError('Task manager should reschedule itself one time, at most.')
         with mock.patch('awx.main.models.unified_jobs.UnifiedJob.websocket_emit_status') as mock_channel:
-            with mock.patch('awx.main.utils.common._schedule_task_manager') as tm_sch:
+            with mock.patch('awx.main.utils.common.ScheduleManager._schedule') as tm_sch:
                 # Job are ultimately submitted in on_commit hook, but this will not
                 # actually run, because it waits until outer transaction, which is the test
                 # itself in this case
@@ -56,22 +52,21 @@ class TestJobLifeCycle:
         wj = wfjt.create_unified_job()
         assert wj.workflow_nodes.count() == 2
         wj.signal_start()
-        tm = TaskManager()

         # Transitions workflow job to running
         # needs to re-schedule so it spawns jobs next round
-        self.run_tm(tm, [mock.call('running')], [mock.call()])
+        self.run_tm(TaskManager(), [mock.call('running')])

         # Spawns jobs
         # needs re-schedule to submit jobs next round
-        self.run_tm(tm, [mock.call('pending'), mock.call('pending')], [mock.call()])
+        self.run_tm(WorkflowManager(), [mock.call('pending'), mock.call('pending')])

         assert jt.jobs.count() == 2  # task manager spawned jobs

         # Submits jobs
         # intermission - jobs will run and reschedule TM when finished
-        self.run_tm(tm, [mock.call('waiting'), mock.call('waiting')], [])
+        self.run_tm(DependencyManager())  # flip dependencies_processed to True
+        self.run_tm(TaskManager(), [mock.call('waiting'), mock.call('waiting')])

         # I am the job runner
         for job in jt.jobs.all():
             job.status = 'successful'
@@ -79,7 +74,7 @@ class TestJobLifeCycle:
         # Finishes workflow
         # no further action is necessary, so rescheduling should not happen
-        self.run_tm(tm, [mock.call('successful')], [])
+        self.run_tm(WorkflowManager(), [mock.call('successful')])

     def test_task_manager_workflow_workflow_rescheduling(self, controlplane_instance_group):
         wfjts = [WorkflowJobTemplate.objects.create(name='foo')]
@@ -90,16 +85,13 @@ class TestJobLifeCycle:
         wj = wfjts[0].create_unified_job()
         wj.signal_start()
-        tm = TaskManager()

-        while wfjts[0].status != 'successful':
-            wfjts[1].refresh_from_db()
-            if wfjts[1].status == 'successful':
-                # final run, no more work to do
-                self.run_tm(tm, expect_schedule=[])
-            else:
-                self.run_tm(tm, expect_schedule=[mock.call()])
+        attempts = 10
+        while wfjts[0].status != 'successful' and attempts > 0:
+            self.run_tm(TaskManager())
+            self.run_tm(WorkflowManager())
             wfjts[0].refresh_from_db()
+            attempts -= 1

     def test_control_and_execution_instance(self, project, system_job_template, job_template, inventory_source, control_instance, execution_instance):
         assert Instance.objects.count() == 2
@@ -113,6 +105,7 @@ class TestJobLifeCycle:
         for uj in all_ujs:
             uj.signal_start()

+        DependencyManager().schedule()
         tm = TaskManager()
         self.run_tm(tm)
@@ -135,6 +128,7 @@ class TestJobLifeCycle:
         for uj in all_ujs:
             uj.signal_start()

+        DependencyManager().schedule()
         # There is only enough control capacity to run one of the jobs so one should end up in pending and the other in waiting
         tm = TaskManager()
         self.run_tm(tm)
@@ -157,6 +151,7 @@ class TestJobLifeCycle:
         for uj in all_ujs:
             uj.signal_start()

+        DependencyManager().schedule()
         # There is only enough control capacity to run one of the jobs so one should end up in pending and the other in waiting
         tm = TaskManager()
         self.run_tm(tm)
@@ -197,63 +192,49 @@ class TestJobLifeCycle:

 @pytest.mark.django_db
-def test_single_jt_multi_job_launch_blocks_last(controlplane_instance_group, job_template_factory, mocker):
-    instance = controlplane_instance_group.instances.all()[0]
-    objects = job_template_factory(
-        'jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start", "job_should_not_start"]
-    )
-    j1 = objects.jobs["job_should_start"]
-    j1.status = 'pending'
-    j1.save()
-    j2 = objects.jobs["job_should_not_start"]
-    j2.status = 'pending'
-    j2.save()
-    with mock.patch("awx.main.scheduler.TaskManager.start_task"):
-        TaskManager().schedule()
-        TaskManager.start_task.assert_called_once_with(j1, controlplane_instance_group, [], instance)
-        j1.status = "successful"
-        j1.save()
-    with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
-        TaskManager().schedule()
-        TaskManager.start_task.assert_called_once_with(j2, controlplane_instance_group, [], instance)
+def test_single_jt_multi_job_launch_blocks_last(job_template_factory):
+    objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred')
+    j1 = create_job(objects.job_template)
+    j2 = create_job(objects.job_template)
+
+    TaskManager().schedule()
+    j1.refresh_from_db()
+    j2.refresh_from_db()
+    assert j1.status == "waiting"
+    assert j2.status == "pending"
+
+    # mimic running j1 to unblock j2
+    j1.status = "successful"
+    j1.save()
+    TaskManager().schedule()
+    j2.refresh_from_db()
+    assert j2.status == "waiting"


 @pytest.mark.django_db
-def test_single_jt_multi_job_launch_allow_simul_allowed(controlplane_instance_group, job_template_factory, mocker):
-    instance = controlplane_instance_group.instances.all()[0]
-    objects = job_template_factory(
-        'jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start", "job_should_not_start"]
-    )
+def test_single_jt_multi_job_launch_allow_simul_allowed(job_template_factory):
+    objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred')
     jt = objects.job_template
+    jt.allow_simultaneous = True
     jt.save()
-
-    j1 = objects.jobs["job_should_start"]
-    j1.allow_simultaneous = True
-    j1.status = 'pending'
-    j1.save()
-    j2 = objects.jobs["job_should_not_start"]
-    j2.allow_simultaneous = True
-    j2.status = 'pending'
-    j2.save()
-    with mock.patch("awx.main.scheduler.TaskManager.start_task"):
-        TaskManager().schedule()
-        TaskManager.start_task.assert_has_calls(
-            [mock.call(j1, controlplane_instance_group, [], instance), mock.call(j2, controlplane_instance_group, [], instance)]
-        )
+    j1 = create_job(objects.job_template)
+    j2 = create_job(objects.job_template)
+    TaskManager().schedule()
+    j1.refresh_from_db()
+    j2.refresh_from_db()
+    assert j1.status == "waiting"
+    assert j2.status == "waiting"


 @pytest.mark.django_db
 def test_multi_jt_capacity_blocking(hybrid_instance, job_template_factory, mocker):
     instance = hybrid_instance
     controlplane_instance_group = instance.rampart_groups.first()
-    objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1', jobs=["job_should_start"])
-    objects2 = job_template_factory('jt2', organization='org2', project='proj2', inventory='inv2', credential='cred2', jobs=["job_should_not_start"])
-    j1 = objects1.jobs["job_should_start"]
-    j1.status = 'pending'
-    j1.save()
-    j2 = objects2.jobs["job_should_not_start"]
-    j2.status = 'pending'
-    j2.save()
+    objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1')
+    objects2 = job_template_factory('jt2', organization='org2', project='proj2', inventory='inv2', credential='cred2')
+    j1 = create_job(objects1.job_template)
+    j2 = create_job(objects2.job_template)
     tm = TaskManager()
     with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact:
         mock_task_impact.return_value = 505
@@ -269,11 +250,9 @@ def test_multi_jt_capacity_blocking(hybrid_instance, job_template_factory, mocke

 @pytest.mark.django_db
 def test_single_job_dependencies_project_launch(controlplane_instance_group, job_template_factory, mocker):
-    objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start"])
+    objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred')
     instance = controlplane_instance_group.instances.all()[0]
-    j = objects.jobs["job_should_start"]
-    j.status = 'pending'
-    j.save()
+    j = create_job(objects.job_template, dependencies_processed=False)
     p = objects.project
     p.scm_update_on_launch = True
     p.scm_update_cache_timeout = 0
@@ -281,12 +260,13 @@ def test_single_job_dependencies_project_launch(controlplane_instance_group, job
     p.scm_url = "http://github.com/ansible/ansible.git"
     p.save(skip_update=True)
     with mock.patch("awx.main.scheduler.TaskManager.start_task"):
-        tm = TaskManager()
-        with mock.patch.object(TaskManager, "create_project_update", wraps=tm.create_project_update) as mock_pu:
-            tm.schedule()
+        dm = DependencyManager()
+        with mock.patch.object(DependencyManager, "create_project_update", wraps=dm.create_project_update) as mock_pu:
+            dm.schedule()
             mock_pu.assert_called_once_with(j)
             pu = [x for x in p.project_updates.all()]
             assert len(pu) == 1
+            TaskManager().schedule()
             TaskManager.start_task.assert_called_once_with(pu[0], controlplane_instance_group, [j], instance)
             pu[0].status = "successful"
             pu[0].save()
@@ -297,11 +277,9 @@ def test_single_job_dependencies_project_launch(controlplane_instance_group, job

 @pytest.mark.django_db
 def test_single_job_dependencies_inventory_update_launch(controlplane_instance_group, job_template_factory, mocker, inventory_source_factory):
-    objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start"])
+    objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred')
     instance = controlplane_instance_group.instances.all()[0]
-    j = objects.jobs["job_should_start"]
-    j.status = 'pending'
-    j.save()
+    j = create_job(objects.job_template, dependencies_processed=False)
     i = objects.inventory
     ii = inventory_source_factory("ec2")
     ii.source = "ec2"
@@ -310,12 +288,13 @@ def test_single_job_dependencies_inventory_update_launch(controlplane_instance_g
     ii.save()
     i.inventory_sources.add(ii)
     with mock.patch("awx.main.scheduler.TaskManager.start_task"):
-        tm = TaskManager()
-        with mock.patch.object(TaskManager, "create_inventory_update", wraps=tm.create_inventory_update) as mock_iu:
-            tm.schedule()
+        dm = DependencyManager()
+        with mock.patch.object(DependencyManager, "create_inventory_update", wraps=dm.create_inventory_update) as mock_iu:
+            dm.schedule()
             mock_iu.assert_called_once_with(j, ii)
             iu = [x for x in ii.inventory_updates.all()]
             assert len(iu) == 1
+            TaskManager().schedule()
             TaskManager.start_task.assert_called_once_with(iu[0], controlplane_instance_group, [j], instance)
             iu[0].status = "successful"
             iu[0].save()
@@ -334,19 +313,17 @@ def test_inventory_update_launches_project_update(controlplane_instance_group, s
     iu.status = "pending"
     iu.save()
     with mock.patch("awx.main.scheduler.TaskManager.start_task"):
-        tm = TaskManager()
-        with mock.patch.object(TaskManager, "create_project_update", wraps=tm.create_project_update) as mock_pu:
-            tm.schedule()
+        dm = DependencyManager()
+        with mock.patch.object(DependencyManager, "create_project_update", wraps=dm.create_project_update) as mock_pu:
+            dm.schedule()
             mock_pu.assert_called_with(iu, project_id=project.id)


 @pytest.mark.django_db
 def test_job_dependency_with_already_updated(controlplane_instance_group, job_template_factory, mocker, inventory_source_factory):
-    objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start"])
+    objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred')
     instance = controlplane_instance_group.instances.all()[0]
-    j = objects.jobs["job_should_start"]
-    j.status = 'pending'
-    j.save()
+    j = create_job(objects.job_template, dependencies_processed=False)
     i = objects.inventory
     ii = inventory_source_factory("ec2")
     ii.source = "ec2"
@@ -359,9 +336,9 @@ def test_job_dependency_with_already_updated(controlplane_instance_group, job_te
     j.start_args = encrypt_field(j, field_name="start_args")
     j.save()
     with mock.patch("awx.main.scheduler.TaskManager.start_task"):
-        tm = TaskManager()
-        with mock.patch.object(TaskManager, "create_inventory_update", wraps=tm.create_inventory_update) as mock_iu:
-            tm.schedule()
+        dm = DependencyManager()
+        with mock.patch.object(DependencyManager, "create_inventory_update", wraps=dm.create_inventory_update) as mock_iu:
+            dm.schedule()
             mock_iu.assert_not_called()
     with mock.patch("awx.main.scheduler.TaskManager.start_task"):
         TaskManager().schedule()
@@ -371,13 +348,11 @@ def test_job_dependency_with_already_updated(controlplane_instance_group, job_te

 @pytest.mark.django_db
 def test_shared_dependencies_launch(controlplane_instance_group, job_template_factory, mocker, inventory_source_factory):
     instance = controlplane_instance_group.instances.all()[0]
-    objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["first_job", "second_job"])
-    j1 = objects.jobs["first_job"]
-    j1.status = 'pending'
-    j1.save()
-    j2 = objects.jobs["second_job"]
-    j2.status = 'pending'
-    j2.save()
+    objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred')
+    objects.job_template.allow_simultaneous = True
+    objects.job_template.save()
+    j1 = create_job(objects.job_template, dependencies_processed=False)
+    j2 = create_job(objects.job_template, dependencies_processed=False)
     p = objects.project
     p.scm_update_on_launch = True
     p.scm_update_cache_timeout = 300
@@ -392,8 +367,8 @@ def test_shared_dependencies_launch(controlplane_instance_group, job_template_fa
     ii.update_cache_timeout = 300
     ii.save()
     i.inventory_sources.add(ii)
-
     with mock.patch("awx.main.scheduler.TaskManager.start_task"):
+        DependencyManager().schedule()
         TaskManager().schedule()
     pu = p.project_updates.first()
     iu = ii.inventory_updates.first()
@@ -408,12 +383,9 @@ def test_shared_dependencies_launch(controlplane_instance_group, job_template_fa
     iu.save()
     with mock.patch("awx.main.scheduler.TaskManager.start_task"):
         TaskManager().schedule()
-        TaskManager.start_task.assert_called_once_with(j1, controlplane_instance_group, [], instance)
-        j1.status = "successful"
-        j1.save()
-    with mock.patch("awx.main.scheduler.TaskManager.start_task"):
-        TaskManager().schedule()
-        TaskManager.start_task.assert_called_once_with(j2, controlplane_instance_group, [], instance)
+        TaskManager.start_task.assert_has_calls(
+            [mock.call(j1, controlplane_instance_group, [], instance), mock.call(j2, controlplane_instance_group, [], instance)]
+        )
     pu = [x for x in p.project_updates.all()]
     iu = [x for x in ii.inventory_updates.all()]
     assert len(pu) == 1
@@ -422,30 +394,27 @@ def test_shared_dependencies_launch(controlplane_instance_group, job_template_fa

 @pytest.mark.django_db
 def test_job_not_blocking_project_update(controlplane_instance_group, job_template_factory):
-    objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job"])
-    job = objects.jobs["job"]
+    instance = controlplane_instance_group.instances.all()[0]
+    objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred')
+    job = objects.job_template.create_unified_job()
     job.instance_group = controlplane_instance_group
+    job.dependencies_process = True
     job.status = "running"
     job.save()

     with mock.patch("awx.main.scheduler.TaskManager.start_task"):
-        task_manager = TaskManager()
-        task_manager._schedule()
-
         proj = objects.project
         project_update = proj.create_project_update()
         project_update.instance_group = controlplane_instance_group
         project_update.status = "pending"
         project_update.save()
-        assert not task_manager.job_blocked_by(project_update)
-
-        dependency_graph = DependencyGraph()
-        dependency_graph.add_job(job)
-        assert not dependency_graph.task_blocked_by(project_update)
+        TaskManager().schedule()
+        TaskManager.start_task.assert_called_once_with(project_update, controlplane_instance_group, [], instance)


 @pytest.mark.django_db
 def test_job_not_blocking_inventory_update(controlplane_instance_group, job_template_factory, inventory_source_factory):
+    instance = controlplane_instance_group.instances.all()[0]
     objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job"])
     job = objects.jobs["job"]
     job.instance_group = controlplane_instance_group
@@ -453,9 +422,6 @@ def test_job_not_blocking_inventory_update(controlplane_instance_group, job_temp
     job.save()

     with mock.patch("awx.main.scheduler.TaskManager.start_task"):
-        task_manager = TaskManager()
-        task_manager._schedule()
-
         inv = objects.inventory
         inv_source = inventory_source_factory("ec2")
         inv_source.source = "ec2"
@@ -465,11 +431,9 @@ def test_job_not_blocking_inventory_update(controlplane_instance_group, job_temp
         inventory_update.status = "pending"
         inventory_update.save()

-        assert not task_manager.job_blocked_by(inventory_update)
-
-        dependency_graph = DependencyGraph()
-        dependency_graph.add_job(job)
-        assert not dependency_graph.task_blocked_by(inventory_update)
+        DependencyManager().schedule()
+        TaskManager().schedule()
+        TaskManager.start_task.assert_called_once_with(inventory_update, controlplane_instance_group, [], instance)


 @pytest.mark.django_db
@@ -484,7 +448,7 @@ def test_generate_dependencies_only_once(job_template_factory):
     # job starts with dependencies_processed as False
     assert not job.dependencies_processed
     # run one cycle of ._schedule() to generate dependencies
-    TaskManager()._schedule()
+    DependencyManager().schedule()

     # make sure dependencies_processed is now True
     job = Job.objects.filter(name="job_gen_dep")[0]
@@ -492,7 +456,7 @@ def test_generate_dependencies_only_once(job_template_factory):
     # Run ._schedule() again, but make sure .generate_dependencies() is not
     # called with job in the argument list
-    tm = TaskManager()
-    tm.generate_dependencies = mock.MagicMock(return_value=[])
-    tm._schedule()
-    tm.generate_dependencies.assert_has_calls([mock.call([]), mock.call([])])
+    dm = DependencyManager()
+    dm.generate_dependencies = mock.MagicMock(return_value=[])
+    dm.schedule()
+    dm.generate_dependencies.assert_not_called()

View File

@@ -851,34 +851,41 @@ _dependency_manager = threading.local()
 _workflow_manager = threading.local()


+@contextlib.contextmanager
+def task_manager_bulk_reschedule():
+    """Context manager to avoid submitting task multiple times."""
+    managers = [ScheduleTaskManager(), ScheduleWorkflowManager(), ScheduleDependencyManager()]
+    try:
+        for m in managers:
+            m.previous_flag = getattr(m.manager_threading_local, 'bulk_reschedule', False)
+            m.previous_value = getattr(m.manager_threading_local, 'needs_scheduling', False)
+            m.manager_threading_local.bulk_reschedule = True
+            m.manager_threading_local.needs_scheduling = False
+        yield
+    finally:
+        for m in managers:
+            m.manager_threading_local.bulk_reschedule = m.previous_flag
+            if m.manager_threading_local.needs_scheduling:
+                m.schedule()
+            m.manager_threading_local.needs_scheduling = m.previous_value
+
+
 class ScheduleManager:
     def __init__(self, manager, manager_threading_local):
         self.manager = manager
         self.manager_threading_local = manager_threading_local

-    def schedule(self):
-        if getattr(self.manager_threading_local, 'bulk_reschedule', False):
-            self.manager_threading_local.needs_scheduling = True
-            return
+    def _schedule(self):
         from django.db import connection

         # runs right away if not in transaction
         connection.on_commit(lambda: self.manager.delay())

-    @contextlib.contextmanager
-    def task_manager_bulk_reschedule(self):
-        """Context manager to avoid submitting task multiple times."""
-        try:
-            previous_flag = getattr(self.manager_threading_local, 'bulk_reschedule', False)
-            previous_value = getattr(self.manager_threading_local, 'needs_scheduling', False)
-            self.manager_threading_local.bulk_reschedule = True
-            self.manager_threading_local.needs_scheduling = False
-            yield
-        finally:
-            self.manager_threading_local.bulk_reschedule = previous_flag
-            if self.manager_threading_local.needs_scheduling:
-                self.schedule()
-            self.manager_threading_local.needs_scheduling = previous_value
+    def schedule(self):
+        if getattr(self.manager_threading_local, 'bulk_reschedule', False):
+            self.manager_threading_local.needs_scheduling = True
+            return
+        self._schedule()


 class ScheduleTaskManager(ScheduleManager):
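The net effect of this refactor: coalescing moves from a per-instance context manager into the module-level task_manager_bulk_reschedule(), which snapshots and restores the thread-local flags for all three managers at once. While the context is active, schedule() only flips needs_scheduling; each flagged manager submits at most once on exit. A usage sketch (the loop and pending_jobs are hypothetical):

with task_manager_bulk_reschedule():
    for job in pending_jobs:              # hypothetical work loop
        ScheduleTaskManager().schedule()  # only sets needs_scheduling
# on exit: each flagged manager submits once via _schedule()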

View File

@@ -449,7 +449,6 @@ CELERYBEAT_SCHEDULE = {
     'gather_analytics': {'task': 'awx.main.tasks.system.gather_analytics', 'schedule': timedelta(minutes=5)},
     'task_manager': {'task': 'awx.main.scheduler.tasks.task_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}},
     'dependency_manager': {'task': 'awx.main.scheduler.tasks.dependency_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}},
-    'workflow_manager': {'task': 'awx.main.scheduler.tasks.workflow_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}},
     'k8s_reaper': {'task': 'awx.main.tasks.system.awx_k8s_reaper', 'schedule': timedelta(seconds=60), 'options': {'expires': 50}},
     'receptor_reaper': {'task': 'awx.main.tasks.system.awx_receptor_workunit_reaper', 'schedule': timedelta(seconds=60)},
     'send_subsystem_metrics': {'task': 'awx.main.analytics.analytics_tasks.send_subsystem_metrics', 'schedule': timedelta(seconds=20)},