Consume control capacity (#11665)

* Select control node before start task

Consume capacity on control nodes for controlling tasks and consider
remainging capacity on control nodes before selecting them.

This depends on the requirement that control and hybrid nodes should all
be in the instance group named 'controlplane'. Many tests do not satisfy that
requirement. I'll update the tests in another commit.

* update tests to use controlplane

We don't start any tasks if we don't have a controlplane instance group

Due to updates to fixtures, update tests to set node type and capacity
explicitly so they get expected result.

* Fixes for accounting of control capacity consumed

Update method is used to account for currently consumed capacity for
instance groups in the in-memory capacity tracking data structure we initialize in
after_lock_init and then update via calculate_capacity_consumed (both in
task_manager.py)

Also update fit_task_to_instance to consider control impact on instances

Trust that these functions do the right thing looking for a
node with capacity, and cut out redundant check for the whole group's
capacity per Alan's reccomendation.

* Refactor now redundant code

Deal with control type tasks before we loop over the preferred instance
groups, which cuts out the need for some redundant logic.

Also, fix a bug where I was missing assigning the execution node in one case!

* set job explanation on tasks that need capacity

move the job explanation for jobs that need capacity to a function
so we can re-use it in the three places we need it.

* project updates always run on the controlplane

Instance group ordering makes no sense on project updates because they
always need to run on the control plane.

Also, since hybrid nodes should always run the control processes for the
jobs running on them as execution nodes, account for this when looking for a
execution node.

* fix misleading message

the variables and wording were both misleading, fix to be more accurate
description in the two different cases where this log may be emitted.

* use settings correctly

use settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME instead of a hardcoded
name
cache the controlplane_ig object during the after lock init to avoid
an uneccesary query
eliminate mistakenly duplicated AWX_CONTROL_PLANE_TASK_IMPACT and use
only AWX_CONTROL_NODE_TASK_IMPACT

* add test for control capacity consumption

add test to verify that when there are 2 jobs and only capacity for one
that one will move into waiting and the other stays in pending

* add test for hybrid node capacity consumption

assert that the hybrid node is used for both control and execution and
capacity is deducted correctly

* add test for task.capacity_type = control

Test that control type tasks have the right capacity consumed and
get assigned to the right instance group

Also fix lint in the tests

* jobs_running not accurate for control nodes

We can either NOT use "idle instances" for control nodes, or we need
to update the jobs_running property on the Instance model to count
jobs where the node is the controller_node.

I didn't do that because it may be an expensive query, and it would be
hard to make it match with jobs_running on the InstanceGroup which
filters on tasks assigned to the instance group.

This change chooses to stop considering "idle" control nodes an option,
since we can't acurrately identify them.

The way things are without any change, is we are continuing to over consume capacity on control nodes
because this method sees all control nodes as "idle" at the beginning
of the task manager run, and then only counts jobs started in that run
in the in-memory tracking. So jobs which last over a number of task
manager runs build up consuming capacity, which is accurately reported
via Instance.consumed_capacity

* Reduce default task impact for control nodes

This is something we can experiment with as far as what users
want at install time, but start with just 1 for now.

* update capacity docs

Describe usage of the new setting and the concept of control impact.

Co-authored-by: Alan Rominger <arominge@redhat.com>
Co-authored-by: Rebeccah <rhunter@redhat.com>
This commit is contained in:
Elijah DeLee 2022-02-14 10:13:22 -05:00 committed by GitHub
parent 60b6faff19
commit 604cbc1737
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 327 additions and 178 deletions

View File

@ -268,13 +268,13 @@ awx-link:
cp -f /tmp/awx.egg-link /var/lib/awx/venv/awx/lib/$(PYTHON)/site-packages/awx.egg-link
TEST_DIRS ?= awx/main/tests/unit awx/main/tests/functional awx/conf/tests awx/sso/tests
PYTEST_ARGS ?= -n auto
# Run all API unit tests.
test:
if [ "$(VENV_BASE)" ]; then \
. $(VENV_BASE)/awx/bin/activate; \
fi; \
PYTHONDONTWRITEBYTECODE=1 py.test -p no:cacheprovider -n auto $(TEST_DIRS)
PYTHONDONTWRITEBYTECODE=1 py.test -p no:cacheprovider $(PYTEST_ARGS) $(TEST_DIRS)
cd awxkit && $(VENV_BASE)/awx/bin/tox -re py3
awx-manage check_migrations --dry-run --check -n 'missing_migration_file'

View File

@ -243,7 +243,13 @@ class InstanceGroupManager(models.Manager):
for t in tasks:
# TODO: dock capacity for isolated job management tasks running in queue
impact = t.task_impact
if t.status == 'waiting' or not t.execution_node:
control_groups = []
if t.controller_node:
control_groups = instance_ig_mapping.get(t.controller_node, [])
if not control_groups:
logger.warn(f"No instance group found for {t.controller_node}, capacity consumed may be innaccurate.")
if t.status == 'waiting' or (not t.execution_node and not t.is_container_group_task):
# Subtract capacity from any peer groups that share instances
if not t.instance_group:
impacted_groups = []
@ -260,6 +266,12 @@ class InstanceGroupManager(models.Manager):
graph[group_name][f'consumed_{capacity_type}_capacity'] += impact
if breakdown:
graph[group_name]['committed_capacity'] += impact
for group_name in control_groups:
if group_name not in graph:
self.zero_out_group(graph, group_name, breakdown)
graph[group_name][f'consumed_control_capacity'] += settings.AWX_CONTROL_NODE_TASK_IMPACT
if breakdown:
graph[group_name]['committed_capacity'] += settings.AWX_CONTROL_NODE_TASK_IMPACT
elif t.status == 'running':
# Subtract capacity from all groups that contain the instance
if t.execution_node not in instance_ig_mapping:
@ -271,6 +283,7 @@ class InstanceGroupManager(models.Manager):
impacted_groups = []
else:
impacted_groups = instance_ig_mapping[t.execution_node]
for group_name in impacted_groups:
if group_name not in graph:
self.zero_out_group(graph, group_name, breakdown)
@ -279,6 +292,12 @@ class InstanceGroupManager(models.Manager):
graph[group_name][f'consumed_{capacity_type}_capacity'] += impact
if breakdown:
graph[group_name]['running_capacity'] += impact
for group_name in control_groups:
if group_name not in graph:
self.zero_out_group(graph, group_name, breakdown)
graph[group_name][f'consumed_control_capacity'] += settings.AWX_CONTROL_NODE_TASK_IMPACT
if breakdown:
graph[group_name]['running_capacity'] += settings.AWX_CONTROL_NODE_TASK_IMPACT
else:
logger.error('Programming error, %s not in ["running", "waiting"]', t.log_format)
return graph

View File

@ -145,7 +145,14 @@ class Instance(HasPolicyEditsMixin, BaseModel):
@property
def consumed_capacity(self):
return sum(x.task_impact for x in UnifiedJob.objects.filter(execution_node=self.hostname, status__in=('running', 'waiting')))
capacity_consumed = 0
if self.node_type in ('hybrid', 'execution'):
capacity_consumed += sum(x.task_impact for x in UnifiedJob.objects.filter(execution_node=self.hostname, status__in=('running', 'waiting')))
if self.node_type in ('hybrid', 'control'):
capacity_consumed += sum(
settings.AWX_CONTROL_NODE_TASK_IMPACT for x in UnifiedJob.objects.filter(controller_node=self.hostname, status__in=('running', 'waiting'))
)
return capacity_consumed
@property
def remaining_capacity(self):
@ -345,15 +352,21 @@ class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin):
app_label = 'main'
@staticmethod
def fit_task_to_most_remaining_capacity_instance(task, instances):
def fit_task_to_most_remaining_capacity_instance(task, instances, impact=None, capacity_type=None, add_hybrid_control_cost=False):
impact = impact if impact else task.task_impact
capacity_type = capacity_type if capacity_type else task.capacity_type
instance_most_capacity = None
most_remaining_capacity = -1
for i in instances:
if i.node_type not in (task.capacity_type, 'hybrid'):
if i.node_type not in (capacity_type, 'hybrid'):
continue
if i.remaining_capacity >= task.task_impact and (
instance_most_capacity is None or i.remaining_capacity > instance_most_capacity.remaining_capacity
):
would_be_remaining = i.remaining_capacity - impact
# hybrid nodes _always_ control their own tasks
if add_hybrid_control_cost and i.node_type == 'hybrid':
would_be_remaining -= settings.AWX_CONTROL_NODE_TASK_IMPACT
if would_be_remaining >= 0 and (instance_most_capacity is None or would_be_remaining > most_remaining_capacity):
instance_most_capacity = i
most_remaining_capacity = would_be_remaining
return instance_most_capacity
@staticmethod

View File

@ -613,26 +613,6 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin, TaskManage
def get_notification_friendly_name(self):
return "Project Update"
@property
def preferred_instance_groups(self):
'''
Project updates should pretty much always run on the control plane
however, we are not yet saying no to custom groupings within the control plane
Thus, we return custom groups and then unconditionally add the control plane
'''
if self.organization is not None:
organization_groups = [x for x in self.organization.instance_groups.all()]
else:
organization_groups = []
template_groups = [x for x in super(ProjectUpdate, self).preferred_instance_groups]
selected_groups = template_groups + organization_groups
controlplane_ig = self.control_plane_instance_group
if controlplane_ig and controlplane_ig[0] and controlplane_ig[0] not in selected_groups:
selected_groups += controlplane_ig
return selected_groups
def save(self, *args, **kwargs):
added_update_fields = []
if not self.job_tags:

View File

@ -70,6 +70,7 @@ class TaskManager:
"""
instances = Instance.objects.filter(hostname__isnull=False, enabled=True).exclude(node_type='hop')
self.real_instances = {i.hostname: i for i in instances}
self.controlplane_ig = None
instances_partial = [
SimpleNamespace(
@ -86,6 +87,8 @@ class TaskManager:
instances_by_hostname = {i.hostname: i for i in instances_partial}
for rampart_group in InstanceGroup.objects.prefetch_related('instances'):
if rampart_group.name == settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME:
self.controlplane_ig = rampart_group
self.graph[rampart_group.name] = dict(
graph=DependencyGraph(),
execution_capacity=0,
@ -283,39 +286,13 @@ class TaskManager:
task.send_notification_templates('running')
logger.debug('Transitioning %s to running status.', task.log_format)
schedule_task_manager()
elif rampart_group.is_container_group:
task.instance_group = rampart_group
if task.capacity_type == 'execution':
# find one real, non-containerized instance with capacity to
# act as the controller for k8s API interaction
try:
task.controller_node = Instance.choose_online_control_plane_node()
task.log_lifecycle("controller_node_chosen")
except IndexError:
logger.warning("No control plane nodes available to run containerized job {}".format(task.log_format))
return
else:
# project updates and system jobs don't *actually* run in pods, so
# just pick *any* non-containerized host and use it as the execution node
task.execution_node = Instance.choose_online_control_plane_node()
task.log_lifecycle("execution_node_chosen")
logger.debug('Submitting containerized {} to queue {}.'.format(task.log_format, task.execution_node))
# at this point we already have control/execution nodes selected for the following cases
else:
task.instance_group = rampart_group
task.execution_node = instance.hostname
task.log_lifecycle("execution_node_chosen")
if instance.node_type == 'execution':
try:
task.controller_node = Instance.choose_online_control_plane_node()
task.log_lifecycle("controller_node_chosen")
except IndexError:
logger.warning("No control plane nodes available to manage {}".format(task.log_format))
return
else:
# control plane nodes will manage jobs locally for performance and resilience
task.controller_node = task.execution_node
task.log_lifecycle("controller_node_chosen")
logger.debug('Submitting job {} to queue {} controlled by {}.'.format(task.log_format, task.execution_node, task.controller_node))
execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else ''
logger.debug(
f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {rampart_group.name}{execution_node_msg}.'
)
with disable_activity_stream():
task.celery_task_id = str(uuid.uuid4())
task.save()
@ -323,6 +300,13 @@ class TaskManager:
if rampart_group is not None:
self.consume_capacity(task, rampart_group.name, instance=instance)
if task.controller_node:
self.consume_capacity(
task,
settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME,
instance=self.real_instances[task.controller_node],
impact=settings.AWX_CONTROL_NODE_TASK_IMPACT,
)
def post_commit():
if task.status != 'failed' and type(task) is not WorkflowJob:
@ -497,9 +481,10 @@ class TaskManager:
task.job_explanation = job_explanation
tasks_to_update_job_explanation.append(task)
continue
preferred_instance_groups = task.preferred_instance_groups
found_acceptable_queue = False
preferred_instance_groups = task.preferred_instance_groups
if isinstance(task, WorkflowJob):
if task.unified_job_template_id in running_workflow_templates:
if not task.allow_simultaneous:
@ -510,9 +495,36 @@ class TaskManager:
self.start_task(task, None, task.get_jobs_fail_chain(), None)
continue
# Determine if there is control capacity for the task
if task.capacity_type == 'control':
control_impact = task.task_impact + settings.AWX_CONTROL_NODE_TASK_IMPACT
else:
control_impact = settings.AWX_CONTROL_NODE_TASK_IMPACT
control_instance = InstanceGroup.fit_task_to_most_remaining_capacity_instance(
task, self.graph[settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME]['instances'], impact=control_impact, capacity_type='control'
)
if not control_instance:
self.task_needs_capacity(task, tasks_to_update_job_explanation)
logger.debug(f"Skipping task {task.log_format} in pending, not enough capacity left on controlplane to control new tasks")
continue
task.controller_node = control_instance.hostname
# All task.capacity_type == 'control' jobs should run on control plane, no need to loop over instance groups
if task.capacity_type == 'control':
task.execution_node = control_instance.hostname
control_instance.remaining_capacity = max(0, control_instance.remaining_capacity - control_impact)
control_instance.jobs_running += 1
self.graph[settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME]['graph'].add_job(task)
execution_instance = self.real_instances[control_instance.hostname]
self.start_task(task, self.controlplane_ig, task.get_jobs_fail_chain(), execution_instance)
found_acceptable_queue = True
continue
for rampart_group in preferred_instance_groups:
if task.capacity_type == 'execution' and rampart_group.is_container_group:
self.graph[rampart_group.name]['graph'].add_job(task)
if rampart_group.is_container_group:
control_instance.jobs_running += 1
self.graph[settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME]['graph'].add_job(task)
self.start_task(task, rampart_group, task.get_jobs_fail_chain(), None)
found_acceptable_queue = True
break
@ -521,28 +533,32 @@ class TaskManager:
if settings.IS_K8S and task.capacity_type == 'execution':
logger.debug("Skipping group {}, task cannot run on control plane".format(rampart_group.name))
continue
remaining_capacity = self.get_remaining_capacity(rampart_group.name, capacity_type=task.capacity_type)
if task.task_impact > 0 and remaining_capacity <= 0:
logger.debug("Skipping group {}, remaining_capacity {} <= 0".format(rampart_group.name, remaining_capacity))
continue
# at this point we know the instance group is NOT a container group
# because if it was, it would have started the task and broke out of the loop.
execution_instance = InstanceGroup.fit_task_to_most_remaining_capacity_instance(
task, self.graph[rampart_group.name]['instances']
task, self.graph[rampart_group.name]['instances'], add_hybrid_control_cost=True
) or InstanceGroup.find_largest_idle_instance(self.graph[rampart_group.name]['instances'], capacity_type=task.capacity_type)
if execution_instance or rampart_group.is_container_group:
if not rampart_group.is_container_group:
execution_instance.remaining_capacity = max(0, execution_instance.remaining_capacity - task.task_impact)
execution_instance.jobs_running += 1
logger.debug(
"Starting {} in group {} instance {} (remaining_capacity={})".format(
task.log_format, rampart_group.name, execution_instance.hostname, remaining_capacity
)
)
if execution_instance:
task.execution_node = execution_instance.hostname
# If our execution instance is a hybrid, prefer to do control tasks there as well.
if execution_instance.node_type == 'hybrid':
control_instance = execution_instance
task.controller_node = execution_instance.hostname
if execution_instance:
execution_instance = self.real_instances[execution_instance.hostname]
control_instance.remaining_capacity = max(0, control_instance.remaining_capacity - settings.AWX_CONTROL_NODE_TASK_IMPACT)
task.log_lifecycle("controller_node_chosen")
if control_instance != execution_instance:
control_instance.jobs_running += 1
execution_instance.remaining_capacity = max(0, execution_instance.remaining_capacity - task.task_impact)
execution_instance.jobs_running += 1
task.log_lifecycle("execution_node_chosen")
logger.debug(
"Starting {} in group {} instance {} (remaining_capacity={})".format(
task.log_format, rampart_group.name, execution_instance.hostname, execution_instance.remaining_capacity
)
)
execution_instance = self.real_instances[execution_instance.hostname]
self.graph[rampart_group.name]['graph'].add_job(task)
self.start_task(task, rampart_group, task.get_jobs_fail_chain(), execution_instance)
found_acceptable_queue = True
@ -554,18 +570,21 @@ class TaskManager:
)
)
if not found_acceptable_queue:
task.log_lifecycle("needs_capacity")
job_explanation = gettext_noop("This job is not ready to start because there is not enough available capacity.")
if task.job_explanation != job_explanation:
if task.created < (tz_now() - self.time_delta_job_explanation):
# Many launched jobs are immediately blocked, but most blocks will resolve in a few seconds.
# Therefore we should only update the job_explanation after some time has elapsed to
# prevent excessive task saves.
task.job_explanation = job_explanation
tasks_to_update_job_explanation.append(task)
logger.debug("{} couldn't be scheduled on graph, waiting for next cycle".format(task.log_format))
self.task_needs_capacity(task, tasks_to_update_job_explanation)
UnifiedJob.objects.bulk_update(tasks_to_update_job_explanation, ['job_explanation'])
def task_needs_capacity(self, task, tasks_to_update_job_explanation):
task.log_lifecycle("needs_capacity")
job_explanation = gettext_noop("This job is not ready to start because there is not enough available capacity.")
if task.job_explanation != job_explanation:
if task.created < (tz_now() - self.time_delta_job_explanation):
# Many launched jobs are immediately blocked, but most blocks will resolve in a few seconds.
# Therefore we should only update the job_explanation after some time has elapsed to
# prevent excessive task saves.
task.job_explanation = job_explanation
tasks_to_update_job_explanation.append(task)
logger.debug("{} couldn't be scheduled on graph, waiting for next cycle".format(task.log_format))
def timeout_approval_node(self):
workflow_approvals = WorkflowApproval.objects.filter(status='pending')
now = tz_now()
@ -600,16 +619,17 @@ class TaskManager:
def calculate_capacity_consumed(self, tasks):
self.graph = InstanceGroup.objects.capacity_values(tasks=tasks, graph=self.graph)
def consume_capacity(self, task, instance_group, instance=None):
def consume_capacity(self, task, instance_group, instance=None, impact=None):
impact = impact if impact else task.task_impact
logger.debug(
'{} consumed {} capacity units from {} with prior total of {}'.format(
task.log_format, task.task_impact, instance_group, self.graph[instance_group]['consumed_capacity']
task.log_format, impact, instance_group, self.graph[instance_group]['consumed_capacity']
)
)
self.graph[instance_group]['consumed_capacity'] += task.task_impact
self.graph[instance_group]['consumed_capacity'] += impact
for capacity_type in ('control', 'execution'):
if instance is None or instance.node_type in ('hybrid', capacity_type):
self.graph[instance_group][f'consumed_{capacity_type}_capacity'] += task.task_impact
self.graph[instance_group][f'consumed_{capacity_type}_capacity'] += impact
def get_remaining_capacity(self, instance_group, capacity_type='execution'):
return self.graph[instance_group][f'{capacity_type}_capacity'] - self.graph[instance_group][f'consumed_{capacity_type}_capacity']

View File

@ -15,6 +15,7 @@ from awx.main.tests.factories import (
)
from django.core.cache import cache
from django.conf import settings
def pytest_addoption(parser):
@ -80,13 +81,44 @@ def instance_group_factory():
@pytest.fixture
def default_instance_group(instance_factory, instance_group_factory):
return create_instance_group("default", instances=[create_instance("hostA")])
def controlplane_instance_group(instance_factory, instance_group_factory):
"""There always has to be a controlplane instancegroup and at least one instance in it"""
return create_instance_group(settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME, create_instance('hybrid-1', node_type='hybrid', capacity=500))
@pytest.fixture
def controlplane_instance_group(instance_factory, instance_group_factory):
return create_instance_group("controlplane", instances=[create_instance("hostA")])
def default_instance_group(instance_factory, instance_group_factory):
return create_instance_group("default", instances=[create_instance("hostA", node_type='execution')])
@pytest.fixture
def control_instance():
'''Control instance in the controlplane automatic IG'''
inst = create_instance('control-1', node_type='control', capacity=500)
return inst
@pytest.fixture
def control_instance_low_capacity():
'''Control instance in the controlplane automatic IG that has low capacity'''
inst = create_instance('control-1', node_type='control', capacity=5)
return inst
@pytest.fixture
def execution_instance():
'''Execution node in the automatic default IG'''
ig = create_instance_group('default')
inst = create_instance('receptor-1', node_type='execution', capacity=500)
ig.instances.add(inst)
return inst
@pytest.fixture
def hybrid_instance():
'''Hybrid node in the default controlplane IG'''
inst = create_instance('hybrid-1', node_type='hybrid', capacity=500)
return inst
@pytest.fixture

View File

@ -28,12 +28,15 @@ from awx.main.models import (
#
def mk_instance(persisted=True, hostname='instance.example.org'):
def mk_instance(persisted=True, hostname='instance.example.org', node_type='hybrid', capacity=100):
if not persisted:
raise RuntimeError('creating an Instance requires persisted=True')
from django.conf import settings
return Instance.objects.get_or_create(uuid=settings.SYSTEM_UUID, hostname=hostname)[0]
instance = Instance.objects.get_or_create(uuid=settings.SYSTEM_UUID, hostname=hostname, node_type=node_type, capacity=capacity)[0]
if node_type in ('control', 'hybrid'):
mk_instance_group(name=settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME, instance=instance)
return instance
def mk_instance_group(name='default', instance=None, minimum=0, percentage=0):
@ -52,7 +55,9 @@ def mk_organization(name, description=None, persisted=True):
description = description or '{}-description'.format(name)
org = Organization(name=name, description=description)
if persisted:
mk_instance(persisted)
instances = Instance.objects.all()
if not instances:
mk_instance(persisted)
org.save()
return org

View File

@ -132,8 +132,8 @@ def generate_teams(organization, persisted, **kwargs):
return teams
def create_instance(name, instance_groups=None):
return mk_instance(hostname=name)
def create_instance(name, instance_groups=None, node_type='hybrid', capacity=200):
return mk_instance(hostname=name, node_type=node_type, capacity=capacity)
def create_instance_group(name, instances=None, minimum=0, percentage=0):

View File

@ -127,7 +127,7 @@ class TestApprovalNodes:
]
@pytest.mark.django_db
def test_approval_node_approve(self, post, admin_user, job_template):
def test_approval_node_approve(self, post, admin_user, job_template, controlplane_instance_group):
# This test ensures that a user (with permissions to do so) can APPROVE
# workflow approvals. Also asserts that trying to APPROVE approvals
# that have already been dealt with will throw an error.
@ -152,7 +152,7 @@ class TestApprovalNodes:
post(reverse('api:workflow_approval_approve', kwargs={'pk': approval.pk}), user=admin_user, expect=400)
@pytest.mark.django_db
def test_approval_node_deny(self, post, admin_user, job_template):
def test_approval_node_deny(self, post, admin_user, job_template, controlplane_instance_group):
# This test ensures that a user (with permissions to do so) can DENY
# workflow approvals. Also asserts that trying to DENY approvals
# that have already been dealt with will throw an error.

View File

@ -7,7 +7,7 @@ from awx.main.tasks.system import apply_cluster_membership_policies
@pytest.mark.django_db
def test_multi_group_basic_job_launch(instance_factory, default_instance_group, mocker, instance_group_factory, job_template_factory):
def test_multi_group_basic_job_launch(instance_factory, controlplane_instance_group, mocker, instance_group_factory, job_template_factory):
i1 = instance_factory("i1")
i2 = instance_factory("i2")
ig1 = instance_group_factory("ig1", instances=[i1])
@ -67,7 +67,7 @@ def test_multi_group_with_shared_dependency(instance_factory, controlplane_insta
@pytest.mark.django_db
def test_workflow_job_no_instancegroup(workflow_job_template_factory, default_instance_group, mocker):
def test_workflow_job_no_instancegroup(workflow_job_template_factory, controlplane_instance_group, mocker):
wfjt = workflow_job_template_factory('anicedayforawalk').workflow_job_template
wfj = WorkflowJob.objects.create(workflow_job_template=wfjt)
wfj.status = "pending"
@ -79,9 +79,10 @@ def test_workflow_job_no_instancegroup(workflow_job_template_factory, default_in
@pytest.mark.django_db
def test_overcapacity_blocking_other_groups_unaffected(instance_factory, default_instance_group, mocker, instance_group_factory, job_template_factory):
def test_overcapacity_blocking_other_groups_unaffected(instance_factory, controlplane_instance_group, mocker, instance_group_factory, job_template_factory):
i1 = instance_factory("i1")
i1.capacity = 1000
# need to account a little extra for controller node capacity impact
i1.capacity = 1020
i1.save()
i2 = instance_factory("i2")
ig1 = instance_group_factory("ig1", instances=[i1])
@ -120,7 +121,7 @@ def test_overcapacity_blocking_other_groups_unaffected(instance_factory, default
@pytest.mark.django_db
def test_failover_group_run(instance_factory, default_instance_group, mocker, instance_group_factory, job_template_factory):
def test_failover_group_run(instance_factory, controlplane_instance_group, mocker, instance_group_factory, job_template_factory):
i1 = instance_factory("i1")
i2 = instance_factory("i2")
ig1 = instance_group_factory("ig1", instances=[i1])

View File

@ -7,19 +7,20 @@ from awx.main.scheduler import TaskManager
from awx.main.scheduler.dependency_graph import DependencyGraph
from awx.main.utils import encrypt_field
from awx.main.models import WorkflowJobTemplate, JobTemplate, Job
from awx.main.models.ha import Instance, InstanceGroup
from awx.main.models.ha import Instance
from django.conf import settings
@pytest.mark.django_db
def test_single_job_scheduler_launch(default_instance_group, job_template_factory, mocker):
instance = default_instance_group.instances.all()[0]
def test_single_job_scheduler_launch(hybrid_instance, controlplane_instance_group, job_template_factory, mocker):
instance = controlplane_instance_group.instances.all()[0]
objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start"])
j = objects.jobs["job_should_start"]
j.status = 'pending'
j.save()
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(j, default_instance_group, [], instance)
TaskManager.start_task.assert_called_once_with(j, controlplane_instance_group, [], instance)
@pytest.mark.django_db
@ -47,7 +48,7 @@ class TestJobLifeCycle:
if expect_commit is not None:
assert mock_commit.mock_calls == expect_commit
def test_task_manager_workflow_rescheduling(self, job_template_factory, inventory, project, default_instance_group):
def test_task_manager_workflow_rescheduling(self, job_template_factory, inventory, project, controlplane_instance_group):
jt = JobTemplate.objects.create(allow_simultaneous=True, inventory=inventory, project=project, playbook='helloworld.yml')
wfjt = WorkflowJobTemplate.objects.create(name='foo')
for i in range(2):
@ -80,7 +81,7 @@ class TestJobLifeCycle:
# no further action is necessary, so rescheduling should not happen
self.run_tm(tm, [mock.call('successful')], [])
def test_task_manager_workflow_workflow_rescheduling(self):
def test_task_manager_workflow_workflow_rescheduling(self, controlplane_instance_group):
wfjts = [WorkflowJobTemplate.objects.create(name='foo')]
for i in range(5):
wfjt = WorkflowJobTemplate.objects.create(name='foo{}'.format(i))
@ -100,22 +101,6 @@ class TestJobLifeCycle:
self.run_tm(tm, expect_schedule=[mock.call()])
wfjts[0].refresh_from_db()
@pytest.fixture
def control_instance(self):
'''Control instance in the controlplane automatic IG'''
ig = InstanceGroup.objects.create(name='controlplane')
inst = Instance.objects.create(hostname='control-1', node_type='control', capacity=500)
ig.instances.add(inst)
return inst
@pytest.fixture
def execution_instance(self):
'''Execution node in the automatic default IG'''
ig = InstanceGroup.objects.create(name='default')
inst = Instance.objects.create(hostname='receptor-1', node_type='execution', capacity=500)
ig.instances.add(inst)
return inst
def test_control_and_execution_instance(self, project, system_job_template, job_template, inventory_source, control_instance, execution_instance):
assert Instance.objects.count() == 2
@ -142,10 +127,78 @@ class TestJobLifeCycle:
assert uj.capacity_type == 'execution'
assert [uj.execution_node, uj.controller_node] == [execution_instance.hostname, control_instance.hostname], uj
@pytest.mark.django_db
def test_job_fails_to_launch_when_no_control_capacity(self, job_template, control_instance_low_capacity, execution_instance):
enough_capacity = job_template.create_unified_job()
insufficient_capacity = job_template.create_unified_job()
all_ujs = [enough_capacity, insufficient_capacity]
for uj in all_ujs:
uj.signal_start()
# There is only enough control capacity to run one of the jobs so one should end up in pending and the other in waiting
tm = TaskManager()
self.run_tm(tm)
for uj in all_ujs:
uj.refresh_from_db()
assert enough_capacity.status == 'waiting'
assert insufficient_capacity.status == 'pending'
assert [enough_capacity.execution_node, enough_capacity.controller_node] == [
execution_instance.hostname,
control_instance_low_capacity.hostname,
], enough_capacity
@pytest.mark.django_db
def test_hybrid_capacity(self, job_template, hybrid_instance):
enough_capacity = job_template.create_unified_job()
insufficient_capacity = job_template.create_unified_job()
expected_task_impact = enough_capacity.task_impact + settings.AWX_CONTROL_NODE_TASK_IMPACT
all_ujs = [enough_capacity, insufficient_capacity]
for uj in all_ujs:
uj.signal_start()
# There is only enough control capacity to run one of the jobs so one should end up in pending and the other in waiting
tm = TaskManager()
self.run_tm(tm)
for uj in all_ujs:
uj.refresh_from_db()
assert enough_capacity.status == 'waiting'
assert insufficient_capacity.status == 'pending'
assert [enough_capacity.execution_node, enough_capacity.controller_node] == [
hybrid_instance.hostname,
hybrid_instance.hostname,
], enough_capacity
assert expected_task_impact == hybrid_instance.consumed_capacity
@pytest.mark.django_db
def test_project_update_capacity(self, project, hybrid_instance, instance_group_factory, controlplane_instance_group):
pu = project.create_unified_job()
instance_group_factory(name='second_ig', instances=[hybrid_instance])
expected_task_impact = pu.task_impact + settings.AWX_CONTROL_NODE_TASK_IMPACT
pu.signal_start()
tm = TaskManager()
self.run_tm(tm)
pu.refresh_from_db()
assert pu.status == 'waiting'
assert [pu.execution_node, pu.controller_node] == [
hybrid_instance.hostname,
hybrid_instance.hostname,
], pu
assert expected_task_impact == hybrid_instance.consumed_capacity
# The hybrid node is in both instance groups, but the project update should
# always get assigned to the controlplane
assert pu.instance_group.name == settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME
pu.status = 'successful'
pu.save()
assert hybrid_instance.consumed_capacity == 0
@pytest.mark.django_db
def test_single_jt_multi_job_launch_blocks_last(default_instance_group, job_template_factory, mocker):
instance = default_instance_group.instances.all()[0]
def test_single_jt_multi_job_launch_blocks_last(controlplane_instance_group, job_template_factory, mocker):
instance = controlplane_instance_group.instances.all()[0]
objects = job_template_factory(
'jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start", "job_should_not_start"]
)
@ -157,17 +210,17 @@ def test_single_jt_multi_job_launch_blocks_last(default_instance_group, job_temp
j2.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(j1, default_instance_group, [], instance)
TaskManager.start_task.assert_called_once_with(j1, controlplane_instance_group, [], instance)
j1.status = "successful"
j1.save()
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(j2, default_instance_group, [], instance)
TaskManager.start_task.assert_called_once_with(j2, controlplane_instance_group, [], instance)
@pytest.mark.django_db
def test_single_jt_multi_job_launch_allow_simul_allowed(default_instance_group, job_template_factory, mocker):
instance = default_instance_group.instances.all()[0]
def test_single_jt_multi_job_launch_allow_simul_allowed(controlplane_instance_group, job_template_factory, mocker):
instance = controlplane_instance_group.instances.all()[0]
objects = job_template_factory(
'jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start", "job_should_not_start"]
)
@ -184,12 +237,15 @@ def test_single_jt_multi_job_launch_allow_simul_allowed(default_instance_group,
j2.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_has_calls([mock.call(j1, default_instance_group, [], instance), mock.call(j2, default_instance_group, [], instance)])
TaskManager.start_task.assert_has_calls(
[mock.call(j1, controlplane_instance_group, [], instance), mock.call(j2, controlplane_instance_group, [], instance)]
)
@pytest.mark.django_db
def test_multi_jt_capacity_blocking(default_instance_group, job_template_factory, mocker):
instance = default_instance_group.instances.all()[0]
def test_multi_jt_capacity_blocking(hybrid_instance, job_template_factory, mocker):
instance = hybrid_instance
controlplane_instance_group = instance.rampart_groups.first()
objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1', jobs=["job_should_start"])
objects2 = job_template_factory('jt2', organization='org2', project='proj2', inventory='inv2', credential='cred2', jobs=["job_should_not_start"])
j1 = objects1.jobs["job_should_start"]
@ -200,15 +256,15 @@ def test_multi_jt_capacity_blocking(default_instance_group, job_template_factory
j2.save()
tm = TaskManager()
with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact:
mock_task_impact.return_value = 500
mock_task_impact.return_value = 505
with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job:
tm.schedule()
mock_job.assert_called_once_with(j1, default_instance_group, [], instance)
mock_job.assert_called_once_with(j1, controlplane_instance_group, [], instance)
j1.status = "successful"
j1.save()
with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job:
tm.schedule()
mock_job.assert_called_once_with(j2, default_instance_group, [], instance)
mock_job.assert_called_once_with(j2, controlplane_instance_group, [], instance)
@pytest.mark.django_db
@ -240,9 +296,9 @@ def test_single_job_dependencies_project_launch(controlplane_instance_group, job
@pytest.mark.django_db
def test_single_job_dependencies_inventory_update_launch(default_instance_group, job_template_factory, mocker, inventory_source_factory):
def test_single_job_dependencies_inventory_update_launch(controlplane_instance_group, job_template_factory, mocker, inventory_source_factory):
objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start"])
instance = default_instance_group.instances.all()[0]
instance = controlplane_instance_group.instances.all()[0]
j = objects.jobs["job_should_start"]
j.status = 'pending'
j.save()
@ -260,18 +316,18 @@ def test_single_job_dependencies_inventory_update_launch(default_instance_group,
mock_iu.assert_called_once_with(j, ii)
iu = [x for x in ii.inventory_updates.all()]
assert len(iu) == 1
TaskManager.start_task.assert_called_once_with(iu[0], default_instance_group, [j], instance)
TaskManager.start_task.assert_called_once_with(iu[0], controlplane_instance_group, [j], instance)
iu[0].status = "successful"
iu[0].save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(j, default_instance_group, [], instance)
TaskManager.start_task.assert_called_once_with(j, controlplane_instance_group, [], instance)
@pytest.mark.django_db
def test_job_dependency_with_already_updated(default_instance_group, job_template_factory, mocker, inventory_source_factory):
def test_job_dependency_with_already_updated(controlplane_instance_group, job_template_factory, mocker, inventory_source_factory):
objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start"])
instance = default_instance_group.instances.all()[0]
instance = controlplane_instance_group.instances.all()[0]
j = objects.jobs["job_should_start"]
j.status = 'pending'
j.save()
@ -293,7 +349,7 @@ def test_job_dependency_with_already_updated(default_instance_group, job_templat
mock_iu.assert_not_called()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(j, default_instance_group, [], instance)
TaskManager.start_task.assert_called_once_with(j, controlplane_instance_group, [], instance)
@pytest.mark.django_db
@ -349,10 +405,10 @@ def test_shared_dependencies_launch(controlplane_instance_group, job_template_fa
@pytest.mark.django_db
def test_job_not_blocking_project_update(default_instance_group, job_template_factory):
def test_job_not_blocking_project_update(controlplane_instance_group, job_template_factory):
objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job"])
job = objects.jobs["job"]
job.instance_group = default_instance_group
job.instance_group = controlplane_instance_group
job.status = "running"
job.save()
@ -362,7 +418,7 @@ def test_job_not_blocking_project_update(default_instance_group, job_template_fa
proj = objects.project
project_update = proj.create_project_update()
project_update.instance_group = default_instance_group
project_update.instance_group = controlplane_instance_group
project_update.status = "pending"
project_update.save()
assert not task_manager.job_blocked_by(project_update)
@ -373,10 +429,10 @@ def test_job_not_blocking_project_update(default_instance_group, job_template_fa
@pytest.mark.django_db
def test_job_not_blocking_inventory_update(default_instance_group, job_template_factory, inventory_source_factory):
def test_job_not_blocking_inventory_update(controlplane_instance_group, job_template_factory, inventory_source_factory):
objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job"])
job = objects.jobs["job"]
job.instance_group = default_instance_group
job.instance_group = controlplane_instance_group
job.status = "running"
job.save()
@ -389,7 +445,7 @@ def test_job_not_blocking_inventory_update(default_instance_group, job_template_
inv_source.source = "ec2"
inv.inventory_sources.add(inv_source)
inventory_update = inv_source.create_inventory_update()
inventory_update.instance_group = default_instance_group
inventory_update.instance_group = controlplane_instance_group
inventory_update.status = "pending"
inventory_update.save()

View File

@ -1,7 +1,7 @@
import pytest
from unittest import mock
from awx.main.models import AdHocCommand, InventoryUpdate, JobTemplate, ProjectUpdate
from awx.main.models import AdHocCommand, InventoryUpdate, JobTemplate
from awx.main.models.activity_stream import ActivityStream
from awx.main.models.ha import Instance, InstanceGroup
from awx.main.tasks.system import apply_cluster_membership_policies
@ -92,7 +92,9 @@ def test_instance_dup(org_admin, organization, project, instance_factory, instan
@pytest.mark.django_db
def test_policy_instance_few_instances(instance_factory, instance_group_factory):
i1 = instance_factory("i1")
# we need to use node_type=execution because node_type=hybrid will implicitly
# create the controlplane execution group if it doesn't already exist
i1 = instance_factory("i1", node_type='execution')
ig_1 = instance_group_factory("ig1", percentage=25)
ig_2 = instance_group_factory("ig2", percentage=25)
ig_3 = instance_group_factory("ig3", percentage=25)
@ -113,7 +115,7 @@ def test_policy_instance_few_instances(instance_factory, instance_group_factory)
assert len(ig_4.instances.all()) == 1
assert i1 in ig_4.instances.all()
i2 = instance_factory("i2")
i2 = instance_factory("i2", node_type='execution')
count += 1
apply_cluster_membership_policies()
assert ActivityStream.objects.count() == count
@ -334,13 +336,14 @@ def test_mixed_group_membership(instance_factory, instance_group_factory):
@pytest.mark.django_db
def test_instance_group_capacity(instance_factory, instance_group_factory):
i1 = instance_factory("i1")
i2 = instance_factory("i2")
i3 = instance_factory("i3")
node_capacity = 100
i1 = instance_factory("i1", capacity=node_capacity)
i2 = instance_factory("i2", capacity=node_capacity)
i3 = instance_factory("i3", capacity=node_capacity)
ig_all = instance_group_factory("all", instances=[i1, i2, i3])
assert ig_all.capacity == 300
assert ig_all.capacity == node_capacity * 3
ig_single = instance_group_factory("single", instances=[i1])
assert ig_single.capacity == 100
assert ig_single.capacity == node_capacity
@pytest.mark.django_db
@ -385,16 +388,6 @@ class TestInstanceGroupOrdering:
# API does not allow setting IGs on inventory source, so ignore those
assert iu.preferred_instance_groups == [ig_inv, ig_org]
def test_project_update_instance_groups(self, instance_group_factory, project, controlplane_instance_group):
pu = ProjectUpdate.objects.create(project=project, organization=project.organization)
assert pu.preferred_instance_groups == [controlplane_instance_group]
ig_org = instance_group_factory("OrgIstGrp", [controlplane_instance_group.instances.first()])
ig_tmp = instance_group_factory("TmpIstGrp", [controlplane_instance_group.instances.first()])
project.organization.instance_groups.add(ig_org)
assert pu.preferred_instance_groups == [ig_org, controlplane_instance_group]
project.instance_groups.add(ig_tmp)
assert pu.preferred_instance_groups == [ig_tmp, ig_org, controlplane_instance_group]
def test_job_instance_groups(self, instance_group_factory, inventory, project, default_instance_group):
jt = JobTemplate.objects.create(inventory=inventory, project=project)
job = jt.create_unified_job()

View File

@ -18,6 +18,8 @@ class FakeObject(object):
class Job(FakeObject):
task_impact = 43
is_container_group_task = False
controller_node = ''
execution_node = ''
def log_format(self):
return 'job 382 (fake)'

View File

@ -73,6 +73,9 @@ AWX_CONTAINER_GROUP_DEFAULT_NAMESPACE = os.getenv('MY_POD_NAMESPACE', 'default')
# Timeout when waiting for pod to enter running state. If the pod is still in pending state , it will be terminated. Valid time units are "s", "m", "h". Example : "5m" , "10s".
AWX_CONTAINER_GROUP_POD_PENDING_TIMEOUT = "2h"
# How much capacity controlling a task costs a hybrid or control node
AWX_CONTROL_NODE_TASK_IMPACT = 1
# Internationalization
# https://docs.djangoproject.com/en/dev/topics/i18n/
#

View File

@ -21,7 +21,6 @@ from split_settings.tools import optional, include
# Load default settings.
from .defaults import * # NOQA
# awx-manage shell_plus --notebook
NOTEBOOK_ARGUMENTS = ['--NotebookApp.token=', '--ip', '0.0.0.0', '--port', '8888', '--allow-root', '--no-browser']

View File

@ -77,15 +77,41 @@ When a job is made to run, AWX will add `1` to the number of forks selected to c
systems with a `forks` value of `5`, then the actual `forks` value from the perspective of Job Impact will be 6.
#### Impact of Job Types in AWX
Jobs have two types of impact. Task "execution" impact and task "control" impact.
For instances that are the "controller_node" for a task,
the impact is set by settings.AWX_CONTROL_NODE_TASK_IMPACT and it is the same no matter what type of job.
For instances that are the "execution_node" for a task, the impact is calculated as following:
Jobs and Ad-hoc jobs follow the above model, `forks + 1`.
Other job types have a fixed impact:
Other job types have a fixed execution impact:
* Inventory Updates: 1
* Project Updates: 1
* System Jobs: 5
For jobs that execute on the same node as they are controlled by, both settings.AWX_CONTROL_NODE_TASK_IMPACT and the job task execution impact apply.
Examples:
Given settings.AWX_CONTROL_NODE_TASK_IMPACT is 1:
- Project updates (where the execution_node is always the same as the controller_node), have a total impact of 2.
- Container group jobs (where the execution node is not a member of the cluster) only control impact applies, and the controller node has a total task impact of 1.
### Selecting the Right settings.AWX_CONTROL_NODE_TASK_IMPACT
This setting allows you to determine how much impact controlling jobs has. This
can be helpful if you notice symptoms of your control plane exceeding desired
CPU or memory usage, as it effectivly throttles how many jobs can be run
concurrently by your control plane. This is usually a concern with container
groups, which at this time effectively have infinite capacity, so it is easy to
end up with too many jobs running concurrently, overwhelming the control plane
pods with events and control processes.
If you want more throttling behavior, increase the setting.
If you want less throttling behavior, lower the setting.
### Selecting the Right Capacity
Selecting between a memory-focused capacity algorithm and a CPU-focused capacity for your AWX use means you'll be selecting between a minimum