diff --git a/Makefile b/Makefile
index c603e6239d..9d2e4e040b 100644
--- a/Makefile
+++ b/Makefile
@@ -268,13 +268,13 @@ awx-link:
 	cp -f /tmp/awx.egg-link /var/lib/awx/venv/awx/lib/$(PYTHON)/site-packages/awx.egg-link

 TEST_DIRS ?= awx/main/tests/unit awx/main/tests/functional awx/conf/tests awx/sso/tests
-
+PYTEST_ARGS ?= -n auto
 # Run all API unit tests.
 test:
 	if [ "$(VENV_BASE)" ]; then \
 		. $(VENV_BASE)/awx/bin/activate; \
 	fi; \
-	PYTHONDONTWRITEBYTECODE=1 py.test -p no:cacheprovider -n auto $(TEST_DIRS)
+	PYTHONDONTWRITEBYTECODE=1 py.test -p no:cacheprovider $(PYTEST_ARGS) $(TEST_DIRS)
 	cd awxkit && $(VENV_BASE)/awx/bin/tox -re py3
 	awx-manage check_migrations --dry-run --check -n 'missing_migration_file'
diff --git a/awx/main/managers.py b/awx/main/managers.py
index fd8ab44f81..2614193fe1 100644
--- a/awx/main/managers.py
+++ b/awx/main/managers.py
@@ -243,7 +243,13 @@ class InstanceGroupManager(models.Manager):
         for t in tasks:
             # TODO: dock capacity for isolated job management tasks running in queue
             impact = t.task_impact
-            if t.status == 'waiting' or not t.execution_node:
+            control_groups = []
+            if t.controller_node:
+                control_groups = instance_ig_mapping.get(t.controller_node, [])
+                if not control_groups:
+                    logger.warning(f"No instance group found for {t.controller_node}, capacity consumed may be inaccurate.")
+
+            if t.status == 'waiting' or (not t.execution_node and not t.is_container_group_task):
                 # Subtract capacity from any peer groups that share instances
                 if not t.instance_group:
                     impacted_groups = []
@@ -260,6 +266,12 @@ class InstanceGroupManager(models.Manager):
                     graph[group_name][f'consumed_{capacity_type}_capacity'] += impact
                     if breakdown:
                         graph[group_name]['committed_capacity'] += impact
+                for group_name in control_groups:
+                    if group_name not in graph:
+                        self.zero_out_group(graph, group_name, breakdown)
+                    graph[group_name][f'consumed_control_capacity'] += settings.AWX_CONTROL_NODE_TASK_IMPACT
+                    if breakdown:
+                        graph[group_name]['committed_capacity'] += settings.AWX_CONTROL_NODE_TASK_IMPACT
             elif t.status == 'running':
                 # Subtract capacity from all groups that contain the instance
                 if t.execution_node not in instance_ig_mapping:
@@ -271,6 +283,7 @@ class InstanceGroupManager(models.Manager):
                         impacted_groups = []
                 else:
                     impacted_groups = instance_ig_mapping[t.execution_node]
+
                 for group_name in impacted_groups:
                     if group_name not in graph:
                         self.zero_out_group(graph, group_name, breakdown)
@@ -279,6 +292,12 @@ class InstanceGroupManager(models.Manager):
                     graph[group_name][f'consumed_{capacity_type}_capacity'] += impact
                     if breakdown:
                         graph[group_name]['running_capacity'] += impact
+                for group_name in control_groups:
+                    if group_name not in graph:
+                        self.zero_out_group(graph, group_name, breakdown)
+                    graph[group_name][f'consumed_control_capacity'] += settings.AWX_CONTROL_NODE_TASK_IMPACT
+                    if breakdown:
+                        graph[group_name]['running_capacity'] += settings.AWX_CONTROL_NODE_TASK_IMPACT
             else:
                 logger.error('Programming error, %s not in ["running", "waiting"]', t.log_format)
         return graph
diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py
index 7f749f01ea..43c1567119 100644
--- a/awx/main/models/ha.py
+++ b/awx/main/models/ha.py
@@ -145,7 +145,14 @@ class Instance(HasPolicyEditsMixin, BaseModel):

     @property
     def consumed_capacity(self):
-        return sum(x.task_impact for x in UnifiedJob.objects.filter(execution_node=self.hostname, status__in=('running', 'waiting')))
+        capacity_consumed = 0
+        if self.node_type in ('hybrid', 'execution'):
+            capacity_consumed += sum(x.task_impact for x in 
UnifiedJob.objects.filter(execution_node=self.hostname, status__in=('running', 'waiting'))) + if self.node_type in ('hybrid', 'control'): + capacity_consumed += sum( + settings.AWX_CONTROL_NODE_TASK_IMPACT for x in UnifiedJob.objects.filter(controller_node=self.hostname, status__in=('running', 'waiting')) + ) + return capacity_consumed @property def remaining_capacity(self): @@ -345,15 +352,21 @@ class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin): app_label = 'main' @staticmethod - def fit_task_to_most_remaining_capacity_instance(task, instances): + def fit_task_to_most_remaining_capacity_instance(task, instances, impact=None, capacity_type=None, add_hybrid_control_cost=False): + impact = impact if impact else task.task_impact + capacity_type = capacity_type if capacity_type else task.capacity_type instance_most_capacity = None + most_remaining_capacity = -1 for i in instances: - if i.node_type not in (task.capacity_type, 'hybrid'): + if i.node_type not in (capacity_type, 'hybrid'): continue - if i.remaining_capacity >= task.task_impact and ( - instance_most_capacity is None or i.remaining_capacity > instance_most_capacity.remaining_capacity - ): + would_be_remaining = i.remaining_capacity - impact + # hybrid nodes _always_ control their own tasks + if add_hybrid_control_cost and i.node_type == 'hybrid': + would_be_remaining -= settings.AWX_CONTROL_NODE_TASK_IMPACT + if would_be_remaining >= 0 and (instance_most_capacity is None or would_be_remaining > most_remaining_capacity): instance_most_capacity = i + most_remaining_capacity = would_be_remaining return instance_most_capacity @staticmethod diff --git a/awx/main/models/projects.py b/awx/main/models/projects.py index 3fdd98b2b7..a2de97e34f 100644 --- a/awx/main/models/projects.py +++ b/awx/main/models/projects.py @@ -613,26 +613,6 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin, TaskManage def get_notification_friendly_name(self): return "Project Update" - @property - def preferred_instance_groups(self): - ''' - Project updates should pretty much always run on the control plane - however, we are not yet saying no to custom groupings within the control plane - Thus, we return custom groups and then unconditionally add the control plane - ''' - if self.organization is not None: - organization_groups = [x for x in self.organization.instance_groups.all()] - else: - organization_groups = [] - template_groups = [x for x in super(ProjectUpdate, self).preferred_instance_groups] - selected_groups = template_groups + organization_groups - - controlplane_ig = self.control_plane_instance_group - if controlplane_ig and controlplane_ig[0] and controlplane_ig[0] not in selected_groups: - selected_groups += controlplane_ig - - return selected_groups - def save(self, *args, **kwargs): added_update_fields = [] if not self.job_tags: diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 3be2d4cfc7..8765c2871b 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -70,6 +70,7 @@ class TaskManager: """ instances = Instance.objects.filter(hostname__isnull=False, enabled=True).exclude(node_type='hop') self.real_instances = {i.hostname: i for i in instances} + self.controlplane_ig = None instances_partial = [ SimpleNamespace( @@ -86,6 +87,8 @@ class TaskManager: instances_by_hostname = {i.hostname: i for i in instances_partial} for rampart_group in InstanceGroup.objects.prefetch_related('instances'): + if rampart_group.name == 
settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME: + self.controlplane_ig = rampart_group self.graph[rampart_group.name] = dict( graph=DependencyGraph(), execution_capacity=0, @@ -283,39 +286,13 @@ class TaskManager: task.send_notification_templates('running') logger.debug('Transitioning %s to running status.', task.log_format) schedule_task_manager() - elif rampart_group.is_container_group: - task.instance_group = rampart_group - if task.capacity_type == 'execution': - # find one real, non-containerized instance with capacity to - # act as the controller for k8s API interaction - try: - task.controller_node = Instance.choose_online_control_plane_node() - task.log_lifecycle("controller_node_chosen") - except IndexError: - logger.warning("No control plane nodes available to run containerized job {}".format(task.log_format)) - return - else: - # project updates and system jobs don't *actually* run in pods, so - # just pick *any* non-containerized host and use it as the execution node - task.execution_node = Instance.choose_online_control_plane_node() - task.log_lifecycle("execution_node_chosen") - logger.debug('Submitting containerized {} to queue {}.'.format(task.log_format, task.execution_node)) + # at this point we already have control/execution nodes selected for the following cases else: task.instance_group = rampart_group - task.execution_node = instance.hostname - task.log_lifecycle("execution_node_chosen") - if instance.node_type == 'execution': - try: - task.controller_node = Instance.choose_online_control_plane_node() - task.log_lifecycle("controller_node_chosen") - except IndexError: - logger.warning("No control plane nodes available to manage {}".format(task.log_format)) - return - else: - # control plane nodes will manage jobs locally for performance and resilience - task.controller_node = task.execution_node - task.log_lifecycle("controller_node_chosen") - logger.debug('Submitting job {} to queue {} controlled by {}.'.format(task.log_format, task.execution_node, task.controller_node)) + execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else '' + logger.debug( + f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {rampart_group.name}{execution_node_msg}.' 
+ ) with disable_activity_stream(): task.celery_task_id = str(uuid.uuid4()) task.save() @@ -323,6 +300,13 @@ class TaskManager: if rampart_group is not None: self.consume_capacity(task, rampart_group.name, instance=instance) + if task.controller_node: + self.consume_capacity( + task, + settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME, + instance=self.real_instances[task.controller_node], + impact=settings.AWX_CONTROL_NODE_TASK_IMPACT, + ) def post_commit(): if task.status != 'failed' and type(task) is not WorkflowJob: @@ -497,9 +481,10 @@ class TaskManager: task.job_explanation = job_explanation tasks_to_update_job_explanation.append(task) continue - preferred_instance_groups = task.preferred_instance_groups found_acceptable_queue = False + preferred_instance_groups = task.preferred_instance_groups + if isinstance(task, WorkflowJob): if task.unified_job_template_id in running_workflow_templates: if not task.allow_simultaneous: @@ -510,9 +495,36 @@ class TaskManager: self.start_task(task, None, task.get_jobs_fail_chain(), None) continue + # Determine if there is control capacity for the task + if task.capacity_type == 'control': + control_impact = task.task_impact + settings.AWX_CONTROL_NODE_TASK_IMPACT + else: + control_impact = settings.AWX_CONTROL_NODE_TASK_IMPACT + control_instance = InstanceGroup.fit_task_to_most_remaining_capacity_instance( + task, self.graph[settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME]['instances'], impact=control_impact, capacity_type='control' + ) + if not control_instance: + self.task_needs_capacity(task, tasks_to_update_job_explanation) + logger.debug(f"Skipping task {task.log_format} in pending, not enough capacity left on controlplane to control new tasks") + continue + + task.controller_node = control_instance.hostname + + # All task.capacity_type == 'control' jobs should run on control plane, no need to loop over instance groups + if task.capacity_type == 'control': + task.execution_node = control_instance.hostname + control_instance.remaining_capacity = max(0, control_instance.remaining_capacity - control_impact) + control_instance.jobs_running += 1 + self.graph[settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME]['graph'].add_job(task) + execution_instance = self.real_instances[control_instance.hostname] + self.start_task(task, self.controlplane_ig, task.get_jobs_fail_chain(), execution_instance) + found_acceptable_queue = True + continue + for rampart_group in preferred_instance_groups: - if task.capacity_type == 'execution' and rampart_group.is_container_group: - self.graph[rampart_group.name]['graph'].add_job(task) + if rampart_group.is_container_group: + control_instance.jobs_running += 1 + self.graph[settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME]['graph'].add_job(task) self.start_task(task, rampart_group, task.get_jobs_fail_chain(), None) found_acceptable_queue = True break @@ -521,28 +533,32 @@ class TaskManager: if settings.IS_K8S and task.capacity_type == 'execution': logger.debug("Skipping group {}, task cannot run on control plane".format(rampart_group.name)) continue - - remaining_capacity = self.get_remaining_capacity(rampart_group.name, capacity_type=task.capacity_type) - if task.task_impact > 0 and remaining_capacity <= 0: - logger.debug("Skipping group {}, remaining_capacity {} <= 0".format(rampart_group.name, remaining_capacity)) - continue - + # at this point we know the instance group is NOT a container group + # because if it was, it would have started the task and broke out of the loop. 
execution_instance = InstanceGroup.fit_task_to_most_remaining_capacity_instance( - task, self.graph[rampart_group.name]['instances'] + task, self.graph[rampart_group.name]['instances'], add_hybrid_control_cost=True ) or InstanceGroup.find_largest_idle_instance(self.graph[rampart_group.name]['instances'], capacity_type=task.capacity_type) - if execution_instance or rampart_group.is_container_group: - if not rampart_group.is_container_group: - execution_instance.remaining_capacity = max(0, execution_instance.remaining_capacity - task.task_impact) - execution_instance.jobs_running += 1 - logger.debug( - "Starting {} in group {} instance {} (remaining_capacity={})".format( - task.log_format, rampart_group.name, execution_instance.hostname, remaining_capacity - ) - ) + if execution_instance: + task.execution_node = execution_instance.hostname + # If our execution instance is a hybrid, prefer to do control tasks there as well. + if execution_instance.node_type == 'hybrid': + control_instance = execution_instance + task.controller_node = execution_instance.hostname - if execution_instance: - execution_instance = self.real_instances[execution_instance.hostname] + control_instance.remaining_capacity = max(0, control_instance.remaining_capacity - settings.AWX_CONTROL_NODE_TASK_IMPACT) + task.log_lifecycle("controller_node_chosen") + if control_instance != execution_instance: + control_instance.jobs_running += 1 + execution_instance.remaining_capacity = max(0, execution_instance.remaining_capacity - task.task_impact) + execution_instance.jobs_running += 1 + task.log_lifecycle("execution_node_chosen") + logger.debug( + "Starting {} in group {} instance {} (remaining_capacity={})".format( + task.log_format, rampart_group.name, execution_instance.hostname, execution_instance.remaining_capacity + ) + ) + execution_instance = self.real_instances[execution_instance.hostname] self.graph[rampart_group.name]['graph'].add_job(task) self.start_task(task, rampart_group, task.get_jobs_fail_chain(), execution_instance) found_acceptable_queue = True @@ -554,18 +570,21 @@ class TaskManager: ) ) if not found_acceptable_queue: - task.log_lifecycle("needs_capacity") - job_explanation = gettext_noop("This job is not ready to start because there is not enough available capacity.") - if task.job_explanation != job_explanation: - if task.created < (tz_now() - self.time_delta_job_explanation): - # Many launched jobs are immediately blocked, but most blocks will resolve in a few seconds. - # Therefore we should only update the job_explanation after some time has elapsed to - # prevent excessive task saves. - task.job_explanation = job_explanation - tasks_to_update_job_explanation.append(task) - logger.debug("{} couldn't be scheduled on graph, waiting for next cycle".format(task.log_format)) + self.task_needs_capacity(task, tasks_to_update_job_explanation) UnifiedJob.objects.bulk_update(tasks_to_update_job_explanation, ['job_explanation']) + def task_needs_capacity(self, task, tasks_to_update_job_explanation): + task.log_lifecycle("needs_capacity") + job_explanation = gettext_noop("This job is not ready to start because there is not enough available capacity.") + if task.job_explanation != job_explanation: + if task.created < (tz_now() - self.time_delta_job_explanation): + # Many launched jobs are immediately blocked, but most blocks will resolve in a few seconds. + # Therefore we should only update the job_explanation after some time has elapsed to + # prevent excessive task saves. 
+ task.job_explanation = job_explanation + tasks_to_update_job_explanation.append(task) + logger.debug("{} couldn't be scheduled on graph, waiting for next cycle".format(task.log_format)) + def timeout_approval_node(self): workflow_approvals = WorkflowApproval.objects.filter(status='pending') now = tz_now() @@ -600,16 +619,17 @@ class TaskManager: def calculate_capacity_consumed(self, tasks): self.graph = InstanceGroup.objects.capacity_values(tasks=tasks, graph=self.graph) - def consume_capacity(self, task, instance_group, instance=None): + def consume_capacity(self, task, instance_group, instance=None, impact=None): + impact = impact if impact else task.task_impact logger.debug( '{} consumed {} capacity units from {} with prior total of {}'.format( - task.log_format, task.task_impact, instance_group, self.graph[instance_group]['consumed_capacity'] + task.log_format, impact, instance_group, self.graph[instance_group]['consumed_capacity'] ) ) - self.graph[instance_group]['consumed_capacity'] += task.task_impact + self.graph[instance_group]['consumed_capacity'] += impact for capacity_type in ('control', 'execution'): if instance is None or instance.node_type in ('hybrid', capacity_type): - self.graph[instance_group][f'consumed_{capacity_type}_capacity'] += task.task_impact + self.graph[instance_group][f'consumed_{capacity_type}_capacity'] += impact def get_remaining_capacity(self, instance_group, capacity_type='execution'): return self.graph[instance_group][f'{capacity_type}_capacity'] - self.graph[instance_group][f'consumed_{capacity_type}_capacity'] diff --git a/awx/main/tests/conftest.py b/awx/main/tests/conftest.py index 79044b868e..0400f025d2 100644 --- a/awx/main/tests/conftest.py +++ b/awx/main/tests/conftest.py @@ -15,6 +15,7 @@ from awx.main.tests.factories import ( ) from django.core.cache import cache +from django.conf import settings def pytest_addoption(parser): @@ -80,13 +81,44 @@ def instance_group_factory(): @pytest.fixture -def default_instance_group(instance_factory, instance_group_factory): - return create_instance_group("default", instances=[create_instance("hostA")]) +def controlplane_instance_group(instance_factory, instance_group_factory): + """There always has to be a controlplane instancegroup and at least one instance in it""" + return create_instance_group(settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME, create_instance('hybrid-1', node_type='hybrid', capacity=500)) @pytest.fixture -def controlplane_instance_group(instance_factory, instance_group_factory): - return create_instance_group("controlplane", instances=[create_instance("hostA")]) +def default_instance_group(instance_factory, instance_group_factory): + return create_instance_group("default", instances=[create_instance("hostA", node_type='execution')]) + + +@pytest.fixture +def control_instance(): + '''Control instance in the controlplane automatic IG''' + inst = create_instance('control-1', node_type='control', capacity=500) + return inst + + +@pytest.fixture +def control_instance_low_capacity(): + '''Control instance in the controlplane automatic IG that has low capacity''' + inst = create_instance('control-1', node_type='control', capacity=5) + return inst + + +@pytest.fixture +def execution_instance(): + '''Execution node in the automatic default IG''' + ig = create_instance_group('default') + inst = create_instance('receptor-1', node_type='execution', capacity=500) + ig.instances.add(inst) + return inst + + +@pytest.fixture +def hybrid_instance(): + '''Hybrid node in the default controlplane IG''' + inst = 
create_instance('hybrid-1', node_type='hybrid', capacity=500) + return inst @pytest.fixture diff --git a/awx/main/tests/factories/fixtures.py b/awx/main/tests/factories/fixtures.py index 267e5717b3..574916a84f 100644 --- a/awx/main/tests/factories/fixtures.py +++ b/awx/main/tests/factories/fixtures.py @@ -28,12 +28,15 @@ from awx.main.models import ( # -def mk_instance(persisted=True, hostname='instance.example.org'): +def mk_instance(persisted=True, hostname='instance.example.org', node_type='hybrid', capacity=100): if not persisted: raise RuntimeError('creating an Instance requires persisted=True') from django.conf import settings - return Instance.objects.get_or_create(uuid=settings.SYSTEM_UUID, hostname=hostname)[0] + instance = Instance.objects.get_or_create(uuid=settings.SYSTEM_UUID, hostname=hostname, node_type=node_type, capacity=capacity)[0] + if node_type in ('control', 'hybrid'): + mk_instance_group(name=settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME, instance=instance) + return instance def mk_instance_group(name='default', instance=None, minimum=0, percentage=0): @@ -52,7 +55,9 @@ def mk_organization(name, description=None, persisted=True): description = description or '{}-description'.format(name) org = Organization(name=name, description=description) if persisted: - mk_instance(persisted) + instances = Instance.objects.all() + if not instances: + mk_instance(persisted) org.save() return org diff --git a/awx/main/tests/factories/tower.py b/awx/main/tests/factories/tower.py index 6cf632f40b..2688dbde19 100644 --- a/awx/main/tests/factories/tower.py +++ b/awx/main/tests/factories/tower.py @@ -132,8 +132,8 @@ def generate_teams(organization, persisted, **kwargs): return teams -def create_instance(name, instance_groups=None): - return mk_instance(hostname=name) +def create_instance(name, instance_groups=None, node_type='hybrid', capacity=200): + return mk_instance(hostname=name, node_type=node_type, capacity=capacity) def create_instance_group(name, instances=None, minimum=0, percentage=0): diff --git a/awx/main/tests/functional/api/test_workflow_node.py b/awx/main/tests/functional/api/test_workflow_node.py index ecbe284b6a..74ab92fd7b 100644 --- a/awx/main/tests/functional/api/test_workflow_node.py +++ b/awx/main/tests/functional/api/test_workflow_node.py @@ -127,7 +127,7 @@ class TestApprovalNodes: ] @pytest.mark.django_db - def test_approval_node_approve(self, post, admin_user, job_template): + def test_approval_node_approve(self, post, admin_user, job_template, controlplane_instance_group): # This test ensures that a user (with permissions to do so) can APPROVE # workflow approvals. Also asserts that trying to APPROVE approvals # that have already been dealt with will throw an error. @@ -152,7 +152,7 @@ class TestApprovalNodes: post(reverse('api:workflow_approval_approve', kwargs={'pk': approval.pk}), user=admin_user, expect=400) @pytest.mark.django_db - def test_approval_node_deny(self, post, admin_user, job_template): + def test_approval_node_deny(self, post, admin_user, job_template, controlplane_instance_group): # This test ensures that a user (with permissions to do so) can DENY # workflow approvals. Also asserts that trying to DENY approvals # that have already been dealt with will throw an error. 
diff --git a/awx/main/tests/functional/task_management/test_rampart_groups.py b/awx/main/tests/functional/task_management/test_rampart_groups.py index db39d30e22..728c60f92d 100644 --- a/awx/main/tests/functional/task_management/test_rampart_groups.py +++ b/awx/main/tests/functional/task_management/test_rampart_groups.py @@ -7,7 +7,7 @@ from awx.main.tasks.system import apply_cluster_membership_policies @pytest.mark.django_db -def test_multi_group_basic_job_launch(instance_factory, default_instance_group, mocker, instance_group_factory, job_template_factory): +def test_multi_group_basic_job_launch(instance_factory, controlplane_instance_group, mocker, instance_group_factory, job_template_factory): i1 = instance_factory("i1") i2 = instance_factory("i2") ig1 = instance_group_factory("ig1", instances=[i1]) @@ -67,7 +67,7 @@ def test_multi_group_with_shared_dependency(instance_factory, controlplane_insta @pytest.mark.django_db -def test_workflow_job_no_instancegroup(workflow_job_template_factory, default_instance_group, mocker): +def test_workflow_job_no_instancegroup(workflow_job_template_factory, controlplane_instance_group, mocker): wfjt = workflow_job_template_factory('anicedayforawalk').workflow_job_template wfj = WorkflowJob.objects.create(workflow_job_template=wfjt) wfj.status = "pending" @@ -79,9 +79,10 @@ def test_workflow_job_no_instancegroup(workflow_job_template_factory, default_in @pytest.mark.django_db -def test_overcapacity_blocking_other_groups_unaffected(instance_factory, default_instance_group, mocker, instance_group_factory, job_template_factory): +def test_overcapacity_blocking_other_groups_unaffected(instance_factory, controlplane_instance_group, mocker, instance_group_factory, job_template_factory): i1 = instance_factory("i1") - i1.capacity = 1000 + # need to account a little extra for controller node capacity impact + i1.capacity = 1020 i1.save() i2 = instance_factory("i2") ig1 = instance_group_factory("ig1", instances=[i1]) @@ -120,7 +121,7 @@ def test_overcapacity_blocking_other_groups_unaffected(instance_factory, default @pytest.mark.django_db -def test_failover_group_run(instance_factory, default_instance_group, mocker, instance_group_factory, job_template_factory): +def test_failover_group_run(instance_factory, controlplane_instance_group, mocker, instance_group_factory, job_template_factory): i1 = instance_factory("i1") i2 = instance_factory("i2") ig1 = instance_group_factory("ig1", instances=[i1]) diff --git a/awx/main/tests/functional/task_management/test_scheduler.py b/awx/main/tests/functional/task_management/test_scheduler.py index 8b4db09a75..9ceda70eed 100644 --- a/awx/main/tests/functional/task_management/test_scheduler.py +++ b/awx/main/tests/functional/task_management/test_scheduler.py @@ -7,19 +7,20 @@ from awx.main.scheduler import TaskManager from awx.main.scheduler.dependency_graph import DependencyGraph from awx.main.utils import encrypt_field from awx.main.models import WorkflowJobTemplate, JobTemplate, Job -from awx.main.models.ha import Instance, InstanceGroup +from awx.main.models.ha import Instance +from django.conf import settings @pytest.mark.django_db -def test_single_job_scheduler_launch(default_instance_group, job_template_factory, mocker): - instance = default_instance_group.instances.all()[0] +def test_single_job_scheduler_launch(hybrid_instance, controlplane_instance_group, job_template_factory, mocker): + instance = controlplane_instance_group.instances.all()[0] objects = job_template_factory('jt', organization='org1', project='proj', 
inventory='inv', credential='cred', jobs=["job_should_start"]) j = objects.jobs["job_should_start"] j.status = 'pending' j.save() with mocker.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() - TaskManager.start_task.assert_called_once_with(j, default_instance_group, [], instance) + TaskManager.start_task.assert_called_once_with(j, controlplane_instance_group, [], instance) @pytest.mark.django_db @@ -47,7 +48,7 @@ class TestJobLifeCycle: if expect_commit is not None: assert mock_commit.mock_calls == expect_commit - def test_task_manager_workflow_rescheduling(self, job_template_factory, inventory, project, default_instance_group): + def test_task_manager_workflow_rescheduling(self, job_template_factory, inventory, project, controlplane_instance_group): jt = JobTemplate.objects.create(allow_simultaneous=True, inventory=inventory, project=project, playbook='helloworld.yml') wfjt = WorkflowJobTemplate.objects.create(name='foo') for i in range(2): @@ -80,7 +81,7 @@ class TestJobLifeCycle: # no further action is necessary, so rescheduling should not happen self.run_tm(tm, [mock.call('successful')], []) - def test_task_manager_workflow_workflow_rescheduling(self): + def test_task_manager_workflow_workflow_rescheduling(self, controlplane_instance_group): wfjts = [WorkflowJobTemplate.objects.create(name='foo')] for i in range(5): wfjt = WorkflowJobTemplate.objects.create(name='foo{}'.format(i)) @@ -100,22 +101,6 @@ class TestJobLifeCycle: self.run_tm(tm, expect_schedule=[mock.call()]) wfjts[0].refresh_from_db() - @pytest.fixture - def control_instance(self): - '''Control instance in the controlplane automatic IG''' - ig = InstanceGroup.objects.create(name='controlplane') - inst = Instance.objects.create(hostname='control-1', node_type='control', capacity=500) - ig.instances.add(inst) - return inst - - @pytest.fixture - def execution_instance(self): - '''Execution node in the automatic default IG''' - ig = InstanceGroup.objects.create(name='default') - inst = Instance.objects.create(hostname='receptor-1', node_type='execution', capacity=500) - ig.instances.add(inst) - return inst - def test_control_and_execution_instance(self, project, system_job_template, job_template, inventory_source, control_instance, execution_instance): assert Instance.objects.count() == 2 @@ -142,10 +127,78 @@ class TestJobLifeCycle: assert uj.capacity_type == 'execution' assert [uj.execution_node, uj.controller_node] == [execution_instance.hostname, control_instance.hostname], uj + @pytest.mark.django_db + def test_job_fails_to_launch_when_no_control_capacity(self, job_template, control_instance_low_capacity, execution_instance): + enough_capacity = job_template.create_unified_job() + insufficient_capacity = job_template.create_unified_job() + all_ujs = [enough_capacity, insufficient_capacity] + for uj in all_ujs: + uj.signal_start() + + # There is only enough control capacity to run one of the jobs so one should end up in pending and the other in waiting + tm = TaskManager() + self.run_tm(tm) + + for uj in all_ujs: + uj.refresh_from_db() + assert enough_capacity.status == 'waiting' + assert insufficient_capacity.status == 'pending' + assert [enough_capacity.execution_node, enough_capacity.controller_node] == [ + execution_instance.hostname, + control_instance_low_capacity.hostname, + ], enough_capacity + + @pytest.mark.django_db + def test_hybrid_capacity(self, job_template, hybrid_instance): + enough_capacity = job_template.create_unified_job() + insufficient_capacity = 
job_template.create_unified_job() + expected_task_impact = enough_capacity.task_impact + settings.AWX_CONTROL_NODE_TASK_IMPACT + all_ujs = [enough_capacity, insufficient_capacity] + for uj in all_ujs: + uj.signal_start() + + # There is only enough control capacity to run one of the jobs so one should end up in pending and the other in waiting + tm = TaskManager() + self.run_tm(tm) + + for uj in all_ujs: + uj.refresh_from_db() + assert enough_capacity.status == 'waiting' + assert insufficient_capacity.status == 'pending' + assert [enough_capacity.execution_node, enough_capacity.controller_node] == [ + hybrid_instance.hostname, + hybrid_instance.hostname, + ], enough_capacity + assert expected_task_impact == hybrid_instance.consumed_capacity + + @pytest.mark.django_db + def test_project_update_capacity(self, project, hybrid_instance, instance_group_factory, controlplane_instance_group): + pu = project.create_unified_job() + instance_group_factory(name='second_ig', instances=[hybrid_instance]) + expected_task_impact = pu.task_impact + settings.AWX_CONTROL_NODE_TASK_IMPACT + pu.signal_start() + + tm = TaskManager() + self.run_tm(tm) + + pu.refresh_from_db() + assert pu.status == 'waiting' + assert [pu.execution_node, pu.controller_node] == [ + hybrid_instance.hostname, + hybrid_instance.hostname, + ], pu + assert expected_task_impact == hybrid_instance.consumed_capacity + # The hybrid node is in both instance groups, but the project update should + # always get assigned to the controlplane + assert pu.instance_group.name == settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME + pu.status = 'successful' + pu.save() + assert hybrid_instance.consumed_capacity == 0 + @pytest.mark.django_db -def test_single_jt_multi_job_launch_blocks_last(default_instance_group, job_template_factory, mocker): - instance = default_instance_group.instances.all()[0] +def test_single_jt_multi_job_launch_blocks_last(controlplane_instance_group, job_template_factory, mocker): + instance = controlplane_instance_group.instances.all()[0] objects = job_template_factory( 'jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start", "job_should_not_start"] ) @@ -157,17 +210,17 @@ def test_single_jt_multi_job_launch_blocks_last(default_instance_group, job_temp j2.save() with mock.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() - TaskManager.start_task.assert_called_once_with(j1, default_instance_group, [], instance) + TaskManager.start_task.assert_called_once_with(j1, controlplane_instance_group, [], instance) j1.status = "successful" j1.save() with mocker.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() - TaskManager.start_task.assert_called_once_with(j2, default_instance_group, [], instance) + TaskManager.start_task.assert_called_once_with(j2, controlplane_instance_group, [], instance) @pytest.mark.django_db -def test_single_jt_multi_job_launch_allow_simul_allowed(default_instance_group, job_template_factory, mocker): - instance = default_instance_group.instances.all()[0] +def test_single_jt_multi_job_launch_allow_simul_allowed(controlplane_instance_group, job_template_factory, mocker): + instance = controlplane_instance_group.instances.all()[0] objects = job_template_factory( 'jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start", "job_should_not_start"] ) @@ -184,12 +237,15 @@ def test_single_jt_multi_job_launch_allow_simul_allowed(default_instance_group, j2.save() with 
mock.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() - TaskManager.start_task.assert_has_calls([mock.call(j1, default_instance_group, [], instance), mock.call(j2, default_instance_group, [], instance)]) + TaskManager.start_task.assert_has_calls( + [mock.call(j1, controlplane_instance_group, [], instance), mock.call(j2, controlplane_instance_group, [], instance)] + ) @pytest.mark.django_db -def test_multi_jt_capacity_blocking(default_instance_group, job_template_factory, mocker): - instance = default_instance_group.instances.all()[0] +def test_multi_jt_capacity_blocking(hybrid_instance, job_template_factory, mocker): + instance = hybrid_instance + controlplane_instance_group = instance.rampart_groups.first() objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1', jobs=["job_should_start"]) objects2 = job_template_factory('jt2', organization='org2', project='proj2', inventory='inv2', credential='cred2', jobs=["job_should_not_start"]) j1 = objects1.jobs["job_should_start"] @@ -200,15 +256,15 @@ def test_multi_jt_capacity_blocking(default_instance_group, job_template_factory j2.save() tm = TaskManager() with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact: - mock_task_impact.return_value = 500 + mock_task_impact.return_value = 505 with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job: tm.schedule() - mock_job.assert_called_once_with(j1, default_instance_group, [], instance) + mock_job.assert_called_once_with(j1, controlplane_instance_group, [], instance) j1.status = "successful" j1.save() with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job: tm.schedule() - mock_job.assert_called_once_with(j2, default_instance_group, [], instance) + mock_job.assert_called_once_with(j2, controlplane_instance_group, [], instance) @pytest.mark.django_db @@ -240,9 +296,9 @@ def test_single_job_dependencies_project_launch(controlplane_instance_group, job @pytest.mark.django_db -def test_single_job_dependencies_inventory_update_launch(default_instance_group, job_template_factory, mocker, inventory_source_factory): +def test_single_job_dependencies_inventory_update_launch(controlplane_instance_group, job_template_factory, mocker, inventory_source_factory): objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start"]) - instance = default_instance_group.instances.all()[0] + instance = controlplane_instance_group.instances.all()[0] j = objects.jobs["job_should_start"] j.status = 'pending' j.save() @@ -260,18 +316,18 @@ def test_single_job_dependencies_inventory_update_launch(default_instance_group, mock_iu.assert_called_once_with(j, ii) iu = [x for x in ii.inventory_updates.all()] assert len(iu) == 1 - TaskManager.start_task.assert_called_once_with(iu[0], default_instance_group, [j], instance) + TaskManager.start_task.assert_called_once_with(iu[0], controlplane_instance_group, [j], instance) iu[0].status = "successful" iu[0].save() with mock.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() - TaskManager.start_task.assert_called_once_with(j, default_instance_group, [], instance) + TaskManager.start_task.assert_called_once_with(j, controlplane_instance_group, [], instance) @pytest.mark.django_db -def test_job_dependency_with_already_updated(default_instance_group, job_template_factory, mocker, inventory_source_factory): +def 
test_job_dependency_with_already_updated(controlplane_instance_group, job_template_factory, mocker, inventory_source_factory): objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start"]) - instance = default_instance_group.instances.all()[0] + instance = controlplane_instance_group.instances.all()[0] j = objects.jobs["job_should_start"] j.status = 'pending' j.save() @@ -293,7 +349,7 @@ def test_job_dependency_with_already_updated(default_instance_group, job_templat mock_iu.assert_not_called() with mock.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() - TaskManager.start_task.assert_called_once_with(j, default_instance_group, [], instance) + TaskManager.start_task.assert_called_once_with(j, controlplane_instance_group, [], instance) @pytest.mark.django_db @@ -349,10 +405,10 @@ def test_shared_dependencies_launch(controlplane_instance_group, job_template_fa @pytest.mark.django_db -def test_job_not_blocking_project_update(default_instance_group, job_template_factory): +def test_job_not_blocking_project_update(controlplane_instance_group, job_template_factory): objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job"]) job = objects.jobs["job"] - job.instance_group = default_instance_group + job.instance_group = controlplane_instance_group job.status = "running" job.save() @@ -362,7 +418,7 @@ def test_job_not_blocking_project_update(default_instance_group, job_template_fa proj = objects.project project_update = proj.create_project_update() - project_update.instance_group = default_instance_group + project_update.instance_group = controlplane_instance_group project_update.status = "pending" project_update.save() assert not task_manager.job_blocked_by(project_update) @@ -373,10 +429,10 @@ def test_job_not_blocking_project_update(default_instance_group, job_template_fa @pytest.mark.django_db -def test_job_not_blocking_inventory_update(default_instance_group, job_template_factory, inventory_source_factory): +def test_job_not_blocking_inventory_update(controlplane_instance_group, job_template_factory, inventory_source_factory): objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job"]) job = objects.jobs["job"] - job.instance_group = default_instance_group + job.instance_group = controlplane_instance_group job.status = "running" job.save() @@ -389,7 +445,7 @@ def test_job_not_blocking_inventory_update(default_instance_group, job_template_ inv_source.source = "ec2" inv.inventory_sources.add(inv_source) inventory_update = inv_source.create_inventory_update() - inventory_update.instance_group = default_instance_group + inventory_update.instance_group = controlplane_instance_group inventory_update.status = "pending" inventory_update.save() diff --git a/awx/main/tests/functional/test_instances.py b/awx/main/tests/functional/test_instances.py index c8efd8b39e..81771a7253 100644 --- a/awx/main/tests/functional/test_instances.py +++ b/awx/main/tests/functional/test_instances.py @@ -1,7 +1,7 @@ import pytest from unittest import mock -from awx.main.models import AdHocCommand, InventoryUpdate, JobTemplate, ProjectUpdate +from awx.main.models import AdHocCommand, InventoryUpdate, JobTemplate from awx.main.models.activity_stream import ActivityStream from awx.main.models.ha import Instance, InstanceGroup from awx.main.tasks.system import apply_cluster_membership_policies @@ -92,7 
+92,9 @@ def test_instance_dup(org_admin, organization, project, instance_factory, instan @pytest.mark.django_db def test_policy_instance_few_instances(instance_factory, instance_group_factory): - i1 = instance_factory("i1") + # we need to use node_type=execution because node_type=hybrid will implicitly + # create the controlplane execution group if it doesn't already exist + i1 = instance_factory("i1", node_type='execution') ig_1 = instance_group_factory("ig1", percentage=25) ig_2 = instance_group_factory("ig2", percentage=25) ig_3 = instance_group_factory("ig3", percentage=25) @@ -113,7 +115,7 @@ def test_policy_instance_few_instances(instance_factory, instance_group_factory) assert len(ig_4.instances.all()) == 1 assert i1 in ig_4.instances.all() - i2 = instance_factory("i2") + i2 = instance_factory("i2", node_type='execution') count += 1 apply_cluster_membership_policies() assert ActivityStream.objects.count() == count @@ -334,13 +336,14 @@ def test_mixed_group_membership(instance_factory, instance_group_factory): @pytest.mark.django_db def test_instance_group_capacity(instance_factory, instance_group_factory): - i1 = instance_factory("i1") - i2 = instance_factory("i2") - i3 = instance_factory("i3") + node_capacity = 100 + i1 = instance_factory("i1", capacity=node_capacity) + i2 = instance_factory("i2", capacity=node_capacity) + i3 = instance_factory("i3", capacity=node_capacity) ig_all = instance_group_factory("all", instances=[i1, i2, i3]) - assert ig_all.capacity == 300 + assert ig_all.capacity == node_capacity * 3 ig_single = instance_group_factory("single", instances=[i1]) - assert ig_single.capacity == 100 + assert ig_single.capacity == node_capacity @pytest.mark.django_db @@ -385,16 +388,6 @@ class TestInstanceGroupOrdering: # API does not allow setting IGs on inventory source, so ignore those assert iu.preferred_instance_groups == [ig_inv, ig_org] - def test_project_update_instance_groups(self, instance_group_factory, project, controlplane_instance_group): - pu = ProjectUpdate.objects.create(project=project, organization=project.organization) - assert pu.preferred_instance_groups == [controlplane_instance_group] - ig_org = instance_group_factory("OrgIstGrp", [controlplane_instance_group.instances.first()]) - ig_tmp = instance_group_factory("TmpIstGrp", [controlplane_instance_group.instances.first()]) - project.organization.instance_groups.add(ig_org) - assert pu.preferred_instance_groups == [ig_org, controlplane_instance_group] - project.instance_groups.add(ig_tmp) - assert pu.preferred_instance_groups == [ig_tmp, ig_org, controlplane_instance_group] - def test_job_instance_groups(self, instance_group_factory, inventory, project, default_instance_group): jt = JobTemplate.objects.create(inventory=inventory, project=project) job = jt.create_unified_job() diff --git a/awx/main/tests/unit/test_capacity.py b/awx/main/tests/unit/test_capacity.py index fab27c6c76..2b7cc2399b 100644 --- a/awx/main/tests/unit/test_capacity.py +++ b/awx/main/tests/unit/test_capacity.py @@ -18,6 +18,8 @@ class FakeObject(object): class Job(FakeObject): task_impact = 43 is_container_group_task = False + controller_node = '' + execution_node = '' def log_format(self): return 'job 382 (fake)' diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 31c0957987..5ddefb66bd 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -73,6 +73,9 @@ AWX_CONTAINER_GROUP_DEFAULT_NAMESPACE = os.getenv('MY_POD_NAMESPACE', 'default') # Timeout when waiting for pod to enter running state. 
If the pod is still in pending state , it will be terminated. Valid time units are "s", "m", "h". Example : "5m" , "10s".
 AWX_CONTAINER_GROUP_POD_PENDING_TIMEOUT = "2h"

+# How much capacity controlling a task costs a hybrid or control node
+AWX_CONTROL_NODE_TASK_IMPACT = 1
+
 # Internationalization
 # https://docs.djangoproject.com/en/dev/topics/i18n/
 #
diff --git a/awx/settings/development.py b/awx/settings/development.py
index 8fb1c8c1c6..70b64643dd 100644
--- a/awx/settings/development.py
+++ b/awx/settings/development.py
@@ -21,7 +21,6 @@ from split_settings.tools import optional, include

 # Load default settings.
 from .defaults import *  # NOQA
-
 # awx-manage shell_plus --notebook
 NOTEBOOK_ARGUMENTS = ['--NotebookApp.token=', '--ip', '0.0.0.0', '--port', '8888', '--allow-root', '--no-browser']
diff --git a/docs/capacity.md b/docs/capacity.md
index c4d75d4de4..2b40b46da7 100644
--- a/docs/capacity.md
+++ b/docs/capacity.md
@@ -77,15 +77,41 @@ When a job is made to run, AWX will add `1` to the number of forks selected to c
 systems with a `forks` value of `5`, then the actual `forks` value from the perspective of Job Impact will be 6.

 #### Impact of Job Types in AWX
+Jobs have two types of impact: task "execution" impact and task "control" impact.
+
+For instances that are the "controller_node" for a task,
+the impact is set by settings.AWX_CONTROL_NODE_TASK_IMPACT and is the same regardless of job type.
+
+For instances that are the "execution_node" for a task, the impact is calculated as follows:

 Jobs and Ad-hoc jobs follow the above model, `forks + 1`.

-Other job types have a fixed impact:
+Other job types have a fixed execution impact:

 * Inventory Updates: 1
 * Project Updates: 1
 * System Jobs: 5

+For jobs that execute on the same node that controls them, both settings.AWX_CONTROL_NODE_TASK_IMPACT and the job's execution impact apply.
+
+Examples:
+Given settings.AWX_CONTROL_NODE_TASK_IMPACT is 1:
+ - Project updates (where the execution_node is always the same as the controller_node) have a total impact of 2.
+ - For container group jobs (where the execution node is not a member of the cluster), only the control impact applies, so the controller node has a total task impact of 1.
+
+### Selecting the Right settings.AWX_CONTROL_NODE_TASK_IMPACT
+
+This setting allows you to determine how much impact controlling jobs has. This
+can be helpful if you notice symptoms of your control plane exceeding desired
+CPU or memory usage, as it effectively throttles how many jobs can be run
+concurrently by your control plane. This is usually a concern with container
+groups, which at this time effectively have infinite capacity, so it is easy to
+end up with too many jobs running concurrently, overwhelming the control plane
+pods with events and control processes.
+
+If you want more throttling behavior, increase the setting.
+If you want less throttling behavior, lower the setting.
+
 ### Selecting the Right Capacity

 Selecting between a memory-focused capacity algorithm and a CPU-focused capacity for your AWX use means you'll be selecting between a minimum
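To make the capacity arithmetic described in the docs/capacity.md changes concrete, here is a minimal, illustrative Python sketch (not part of the patch) of how a single node's consumed capacity combines execution and control impact, mirroring the `Instance.consumed_capacity` change in `awx/main/models/ha.py`. The helper name and the standalone constant below are assumptions for illustration only.

```python
# Illustrative sketch of per-node consumed capacity under this model.
# Assumes the default settings.AWX_CONTROL_NODE_TASK_IMPACT = 1.
AWX_CONTROL_NODE_TASK_IMPACT = 1


def consumed_capacity(node_type, execution_impacts, controlled_task_count):
    """Approximate the capacity consumed on one instance.

    execution_impacts: task_impact values (forks + 1, or the fixed impacts listed
        above) for running/waiting tasks whose execution_node is this instance.
    controlled_task_count: number of running/waiting tasks whose controller_node
        is this instance.
    """
    consumed = 0
    if node_type in ('hybrid', 'execution'):
        consumed += sum(execution_impacts)
    if node_type in ('hybrid', 'control'):
        consumed += AWX_CONTROL_NODE_TASK_IMPACT * controlled_task_count
    return consumed


# A hybrid node that both controls and executes a project update (execution impact 1):
assert consumed_capacity('hybrid', [1], 1) == 2
# A control node managing one container group job (execution happens in the pod):
assert consumed_capacity('control', [], 1) == 1
```

These figures match the examples above: with AWX_CONTROL_NODE_TASK_IMPACT set to 1, a project update consumes a total of 2 on its node, and a container group job costs its controller node 1.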