diff --git a/Makefile b/Makefile index ae70de0583..db670a123e 100644 --- a/Makefile +++ b/Makefile @@ -173,7 +173,7 @@ init: . $(VENV_BASE)/awx/bin/activate; \ fi; \ $(MANAGEMENT_COMMAND) provision_instance --hostname=$(COMPOSE_HOST); \ - $(MANAGEMENT_COMMAND) register_queue --queuename=tower --instance_percent=100; + $(MANAGEMENT_COMMAND) register_queue --queuename=controlplane --instance_percent=100; # Refresh development environment after pulling new code. refresh: clean requirements_dev version_file develop migrate diff --git a/awx/api/permissions.py b/awx/api/permissions.py index 96cd3a5c1d..6613ca5858 100644 --- a/awx/api/permissions.py +++ b/awx/api/permissions.py @@ -4,6 +4,8 @@ # Python import logging +from django.conf import settings + # Django REST Framework from rest_framework.exceptions import MethodNotAllowed, PermissionDenied from rest_framework import permissions @@ -245,7 +247,7 @@ class IsSuperUser(permissions.BasePermission): class InstanceGroupTowerPermission(ModelAccessPermission): def has_object_permission(self, request, view, obj): - if request.method == 'DELETE' and obj.name == "tower": + if request.method == 'DELETE' and obj.name in [settings.DEFAULT_EXECUTION_QUEUE_NAME, settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME]: return False return super(InstanceGroupTowerPermission, self).has_object_permission(request, view, obj) diff --git a/awx/api/serializers.py b/awx/api/serializers.py index dc50b72237..733b1f6925 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -4918,8 +4918,12 @@ class InstanceGroupSerializer(BaseSerializer): return value def validate_name(self, value): - if self.instance and self.instance.name == 'tower' and value != 'tower': - raise serializers.ValidationError(_('tower instance group name may not be changed.')) + if self.instance and self.instance.name == settings.DEFAULT_EXECUTION_QUEUE_NAME and value != settings.DEFAULT_EXECUTION_QUEUE_NAME: + raise serializers.ValidationError(_('%s instance group name may not be changed.' % settings.DEFAULT_EXECUTION_QUEUE_NAME)) + + if self.instance and self.instance.name == settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME and value != settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME: + raise serializers.ValidationError(_('%s instance group name may not be changed.' % settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME)) + return value def validate_credential(self, value): diff --git a/awx/main/conf.py b/awx/main/conf.py index 09360fc54f..c121dcda51 100644 --- a/awx/main/conf.py +++ b/awx/main/conf.py @@ -177,6 +177,24 @@ register( read_only=True, ) +register( + 'DEFAULT_CONTROL_PLANE_QUEUE_NAME', + field_class=fields.CharField, + label=_('The instance group where control plane tasks run'), + category=_('System'), + category_slug='system', + read_only=True, +) + +register( + 'DEFAULT_EXECUTION_QUEUE_NAME', + field_class=fields.CharField, + label=_('The instance group where user jobs run (currently only on non-VM installs)'), + category=_('System'), + category_slug='system', + read_only=True, +) + register( 'DEFAULT_EXECUTION_ENVIRONMENT', field_class=fields.PrimaryKeyRelatedField, diff --git a/awx/main/managers.py b/awx/main/managers.py index 3355b4e8e4..955d730213 100644 --- a/awx/main/managers.py +++ b/awx/main/managers.py @@ -146,8 +146,8 @@ class InstanceManager(models.Manager): pod_ip = os.environ.get('MY_POD_IP') registered = self.register(ip_address=pod_ip) - is_container_group = settings.IS_K8S - RegisterQueue('tower', 100, 0, [], is_container_group).register() + RegisterQueue(settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME, 100, 0, [], is_container_group=False).register() + RegisterQueue(settings.DEFAULT_EXECUTION_QUEUE_NAME, 100, 0, [], is_container_group=True).register() return registered else: return (False, self.me()) @@ -156,10 +156,6 @@ class InstanceManager(models.Manager): """Return count of active Tower nodes for licensing.""" return self.all().count() - def my_role(self): - # NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing - return "tower" - class InstanceGroupManager(models.Manager): """A custom manager class for the Instance model. diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index fd5ec56596..a132a2ae6c 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -130,12 +130,6 @@ class Instance(HasPolicyEditsMixin, BaseModel): return self.modified < ref_time - timedelta(seconds=grace_period) def refresh_capacity(self): - if settings.IS_K8S: - self.capacity = self.cpu = self.memory = self.cpu_capacity = self.mem_capacity = 0 # noqa - self.version = awx_application_version - self.save(update_fields=['capacity', 'version', 'modified', 'cpu', 'memory', 'cpu_capacity', 'mem_capacity']) - return - cpu = get_cpu_capacity() mem = get_mem_capacity() if self.enabled: diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index 0bdd61a397..70e204c796 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -1253,10 +1253,12 @@ class SystemJob(UnifiedJob, SystemJobOptions, JobNotificationMixin): return UnpartitionedSystemJobEvent return SystemJobEvent + @property + def can_run_on_control_plane(self): + return True + @property def task_impact(self): - if settings.IS_K8S: - return 0 return 5 @property diff --git a/awx/main/models/projects.py b/awx/main/models/projects.py index 7192265412..d2b62eac7e 100644 --- a/awx/main/models/projects.py +++ b/awx/main/models/projects.py @@ -553,6 +553,10 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin, TaskManage websocket_data.update(dict(project_id=self.project.id)) return websocket_data + @property + def can_run_on_control_plane(self): + return True + @property def event_class(self): if self.has_unpartitioned_events: @@ -561,8 +565,6 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin, TaskManage @property def task_impact(self): - if settings.IS_K8S: - return 0 return 0 if self.job_type == 'run' else 1 @property @@ -623,6 +625,8 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin, TaskManage organization_groups = [] template_groups = [x for x in super(ProjectUpdate, self).preferred_instance_groups] selected_groups = template_groups + organization_groups + if not any([not group.is_container_group for group in selected_groups]): + selected_groups = selected_groups + list(self.control_plane_instance_group) if not selected_groups: return self.global_instance_groups return selected_groups diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 1407f8418a..2b2f05a51a 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -736,6 +736,13 @@ class UnifiedJob( def _get_task_class(cls): raise NotImplementedError # Implement in subclasses. + @property + def can_run_on_control_plane(self): + if settings.IS_K8S: + return False + + return True + @property def can_run_containerized(self): return False @@ -1415,14 +1422,21 @@ class UnifiedJob( return [] return list(self.unified_job_template.instance_groups.all()) + @property + def control_plane_instance_group(self): + from awx.main.models.ha import InstanceGroup + + control_plane_instance_group = InstanceGroup.objects.filter(name=settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME) + + return list(control_plane_instance_group) + @property def global_instance_groups(self): from awx.main.models.ha import InstanceGroup - default_instance_group = InstanceGroup.objects.filter(name='tower') - if default_instance_group.exists(): - return [default_instance_group.first()] - return [] + default_instance_groups = InstanceGroup.objects.filter(name__in=[settings.DEFAULT_EXECUTION_QUEUE_NAME, settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME]) + + return list(default_instance_groups) def awx_meta_vars(self): """ diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 36c0d879b8..867a4d649a 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -474,6 +474,7 @@ class TaskManager: tasks_to_update_job_explanation.append(task) continue preferred_instance_groups = task.preferred_instance_groups + found_acceptable_queue = False if isinstance(task, WorkflowJob): if task.unified_job_template_id in running_workflow_templates: @@ -484,6 +485,7 @@ class TaskManager: running_workflow_templates.add(task.unified_job_template_id) self.start_task(task, None, task.get_jobs_fail_chain(), None) continue + for rampart_group in preferred_instance_groups: if task.can_run_containerized and rampart_group.is_container_group: self.graph[rampart_group.name]['graph'].add_job(task) @@ -491,12 +493,12 @@ class TaskManager: found_acceptable_queue = True break + if not task.can_run_on_control_plane: + logger.debug("Skipping group {}, task cannot run on control plane".format(rampart_group.name)) + continue + remaining_capacity = self.get_remaining_capacity(rampart_group.name) - if ( - task.task_impact > 0 - and not rampart_group.is_container_group # project updates have a cost of zero - and self.get_remaining_capacity(rampart_group.name) <= 0 - ): + if task.task_impact > 0 and self.get_remaining_capacity(rampart_group.name) <= 0: logger.debug("Skipping group {}, remaining_capacity {} <= 0".format(rampart_group.name, remaining_capacity)) continue diff --git a/awx/main/tests/conftest.py b/awx/main/tests/conftest.py index 11c88823e5..012a8f1c93 100644 --- a/awx/main/tests/conftest.py +++ b/awx/main/tests/conftest.py @@ -81,7 +81,7 @@ def instance_group_factory(): @pytest.fixture def default_instance_group(instance_factory, instance_group_factory): - return create_instance_group("tower", instances=[create_instance("hostA")]) + return create_instance_group("default", instances=[create_instance("hostA")]) @pytest.fixture diff --git a/awx/main/tests/functional/api/test_instance_group.py b/awx/main/tests/functional/api/test_instance_group.py index 884856eed0..22309df142 100644 --- a/awx/main/tests/functional/api/test_instance_group.py +++ b/awx/main/tests/functional/api/test_instance_group.py @@ -13,7 +13,7 @@ from awx.main.utils import camelcase_to_underscore @pytest.fixture def tower_instance_group(): - ig = InstanceGroup(name='tower') + ig = InstanceGroup(name='default') ig.save() return ig @@ -117,8 +117,8 @@ def test_delete_rename_tower_instance_group_prevented(delete, options, tower_ins assert 'GET' in resp.data['actions'] assert 'PUT' in resp.data['actions'] - # Rename 'tower' instance group denied - patch(url, {'name': 'tower_prime'}, super_user, expect=400) + # Rename 'default' instance group denied + patch(url, {'name': 'default_prime'}, super_user, expect=400) # Rename, other instance group OK url = reverse("api:instance_group_detail", kwargs={'pk': instance_group.pk}) diff --git a/awx/main/tests/functional/task_management/test_capacity.py b/awx/main/tests/functional/task_management/test_capacity.py index ee05150255..d50833fd8e 100644 --- a/awx/main/tests/functional/task_management/test_capacity.py +++ b/awx/main/tests/functional/task_management/test_capacity.py @@ -10,21 +10,21 @@ class TestCapacityMapping(TransactionTestCase): def sample_cluster(self): ig_small = InstanceGroup.objects.create(name='ig_small') ig_large = InstanceGroup.objects.create(name='ig_large') - tower = InstanceGroup.objects.create(name='tower') + default = InstanceGroup.objects.create(name='default') i1 = Instance.objects.create(hostname='i1', capacity=200) i2 = Instance.objects.create(hostname='i2', capacity=200) i3 = Instance.objects.create(hostname='i3', capacity=200) ig_small.instances.add(i1) ig_large.instances.add(i2, i3) - tower.instances.add(i2) - return [tower, ig_large, ig_small] + default.instances.add(i2) + return [default, ig_large, ig_small] def test_mapping(self): self.sample_cluster() with self.assertNumQueries(2): inst_map, ig_map = InstanceGroup.objects.capacity_mapping() assert inst_map['i1'] == set(['ig_small']) - assert inst_map['i2'] == set(['ig_large', 'tower']) + assert inst_map['i2'] == set(['ig_large', 'default']) assert ig_map['ig_small'] == set(['ig_small']) - assert ig_map['ig_large'] == set(['ig_large', 'tower']) - assert ig_map['tower'] == set(['ig_large', 'tower']) + assert ig_map['ig_large'] == set(['ig_large', 'default']) + assert ig_map['default'] == set(['ig_large', 'default']) diff --git a/awx/main/tests/unit/test_capacity.py b/awx/main/tests/unit/test_capacity.py index cbb4124b52..8f35210088 100644 --- a/awx/main/tests/unit/test_capacity.py +++ b/awx/main/tests/unit/test_capacity.py @@ -43,34 +43,34 @@ def sample_cluster(): ig_small = InstanceGroup(name='ig_small') ig_large = InstanceGroup(name='ig_large') - tower = InstanceGroup(name='tower') + default = InstanceGroup(name='default') i1 = Instance(hostname='i1', capacity=200) i2 = Instance(hostname='i2', capacity=200) i3 = Instance(hostname='i3', capacity=200) ig_small.instances.add(i1) ig_large.instances.add(i2, i3) - tower.instances.add(i2) - return [tower, ig_large, ig_small] + default.instances.add(i2) + return [default, ig_large, ig_small] return stand_up_cluster def test_committed_capacity(sample_cluster): - tower, ig_large, ig_small = sample_cluster() - tasks = [Job(status='waiting', instance_group=tower), Job(status='waiting', instance_group=ig_large), Job(status='waiting', instance_group=ig_small)] - capacities = InstanceGroup.objects.capacity_values(qs=[tower, ig_large, ig_small], tasks=tasks, breakdown=True) + default, ig_large, ig_small = sample_cluster() + tasks = [Job(status='waiting', instance_group=default), Job(status='waiting', instance_group=ig_large), Job(status='waiting', instance_group=ig_small)] + capacities = InstanceGroup.objects.capacity_values(qs=[default, ig_large, ig_small], tasks=tasks, breakdown=True) # Jobs submitted to either tower or ig_larg must count toward both - assert capacities['tower']['committed_capacity'] == 43 * 2 + assert capacities['default']['committed_capacity'] == 43 * 2 assert capacities['ig_large']['committed_capacity'] == 43 * 2 assert capacities['ig_small']['committed_capacity'] == 43 def test_running_capacity(sample_cluster): - tower, ig_large, ig_small = sample_cluster() + default, ig_large, ig_small = sample_cluster() tasks = [Job(status='running', execution_node='i1'), Job(status='running', execution_node='i2'), Job(status='running', execution_node='i3')] - capacities = InstanceGroup.objects.capacity_values(qs=[tower, ig_large, ig_small], tasks=tasks, breakdown=True) + capacities = InstanceGroup.objects.capacity_values(qs=[default, ig_large, ig_small], tasks=tasks, breakdown=True) # Tower is only given 1 instance - assert capacities['tower']['running_capacity'] == 43 + assert capacities['default']['running_capacity'] == 43 # Large IG has 2 instances assert capacities['ig_large']['running_capacity'] == 43 * 2 assert capacities['ig_small']['running_capacity'] == 43 @@ -81,10 +81,10 @@ def test_offline_node_running(sample_cluster): Assure that algorithm doesn't explode if a job is marked running in an offline node """ - tower, ig_large, ig_small = sample_cluster() + default, ig_large, ig_small = sample_cluster() ig_small.instance_list[0].capacity = 0 tasks = [Job(status='running', execution_node='i1', instance_group=ig_small)] - capacities = InstanceGroup.objects.capacity_values(qs=[tower, ig_large, ig_small], tasks=tasks) + capacities = InstanceGroup.objects.capacity_values(qs=[default, ig_large, ig_small], tasks=tasks) assert capacities['ig_small']['consumed_capacity'] == 43 @@ -92,10 +92,10 @@ def test_offline_node_waiting(sample_cluster): """ Same but for a waiting job """ - tower, ig_large, ig_small = sample_cluster() + default, ig_large, ig_small = sample_cluster() ig_small.instance_list[0].capacity = 0 tasks = [Job(status='waiting', instance_group=ig_small)] - capacities = InstanceGroup.objects.capacity_values(qs=[tower, ig_large, ig_small], tasks=tasks) + capacities = InstanceGroup.objects.capacity_values(qs=[default, ig_large, ig_small], tasks=tasks) assert capacities['ig_small']['consumed_capacity'] == 43 @@ -105,9 +105,9 @@ def test_RBAC_reduced_filter(sample_cluster): but user does not have permission to see those actual instance groups. Verify that this does not blow everything up. """ - tower, ig_large, ig_small = sample_cluster() - tasks = [Job(status='waiting', instance_group=tower), Job(status='waiting', instance_group=ig_large), Job(status='waiting', instance_group=ig_small)] - capacities = InstanceGroup.objects.capacity_values(qs=[tower], tasks=tasks, breakdown=True) + default, ig_large, ig_small = sample_cluster() + tasks = [Job(status='waiting', instance_group=default), Job(status='waiting', instance_group=ig_large), Job(status='waiting', instance_group=ig_small)] + capacities = InstanceGroup.objects.capacity_values(qs=[default], tasks=tasks, breakdown=True) # Cross-links between groups not visible to current user, # so a naieve accounting of capacities is returned instead - assert capacities['tower']['committed_capacity'] == 43 + assert capacities['default']['committed_capacity'] == 43 diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 1f4853b11d..98561c98fa 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -945,3 +945,8 @@ BROADCAST_WEBSOCKET_NEW_INSTANCE_POLL_RATE_SECONDS = 10 BROADCAST_WEBSOCKET_STATS_POLL_RATE_SECONDS = 5 DJANGO_GUID = {'GUID_HEADER_NAME': 'X-API-Request-Id'} + +# Name of the default task queue +DEFAULT_EXECUTION_QUEUE_NAME = 'default' +# Name of the default controlplane queue +DEFAULT_CONTROL_PLANE_QUEUE_NAME = 'controlplane'