diff --git a/Makefile b/Makefile index ae70de0583..db670a123e 100644 --- a/Makefile +++ b/Makefile @@ -173,7 +173,7 @@ init: . $(VENV_BASE)/awx/bin/activate; \ fi; \ $(MANAGEMENT_COMMAND) provision_instance --hostname=$(COMPOSE_HOST); \ - $(MANAGEMENT_COMMAND) register_queue --queuename=tower --instance_percent=100; + $(MANAGEMENT_COMMAND) register_queue --queuename=controlplane --instance_percent=100; # Refresh development environment after pulling new code. refresh: clean requirements_dev version_file develop migrate diff --git a/awx/api/permissions.py b/awx/api/permissions.py index 7740c3c003..6613ca5858 100644 --- a/awx/api/permissions.py +++ b/awx/api/permissions.py @@ -247,7 +247,7 @@ class IsSuperUser(permissions.BasePermission): class InstanceGroupTowerPermission(ModelAccessPermission): def has_object_permission(self, request, view, obj): - if request.method == 'DELETE' and obj.name == settings.DEFAULT_QUEUE_NAME: + if request.method == 'DELETE' and obj.name in [settings.DEFAULT_EXECUTION_QUEUE_NAME, settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME]: return False return super(InstanceGroupTowerPermission, self).has_object_permission(request, view, obj) diff --git a/awx/api/serializers.py b/awx/api/serializers.py index cda97a5e8d..733b1f6925 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -4918,8 +4918,12 @@ class InstanceGroupSerializer(BaseSerializer): return value def validate_name(self, value): - if self.instance and self.instance.name == settings.DEFAULT_QUEUE_NAME and value != settings.DEFAULT_QUEUE_NAME: - raise serializers.ValidationError(_('%s instance group name may not be changed.' % settings.DEFAULT_QUEUE_NAME)) + if self.instance and self.instance.name == settings.DEFAULT_EXECUTION_QUEUE_NAME and value != settings.DEFAULT_EXECUTION_QUEUE_NAME: + raise serializers.ValidationError(_('%s instance group name may not be changed.' % settings.DEFAULT_EXECUTION_QUEUE_NAME)) + + if self.instance and self.instance.name == settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME and value != settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME: + raise serializers.ValidationError(_('%s instance group name may not be changed.' % settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME)) + return value def validate_credential(self, value): diff --git a/awx/main/managers.py b/awx/main/managers.py index 3359a6d7e3..955d730213 100644 --- a/awx/main/managers.py +++ b/awx/main/managers.py @@ -33,7 +33,7 @@ class HostManager(models.Manager): - Only consider results that are unique - Return the count of this query """ - return self.order_by().exclude(inventory_sources__source=settings.DEFAULT_QUEUE_NAME).values('name').distinct().count() + return self.order_by().exclude(inventory_sources__source='tower').values('name').distinct().count() def org_active_count(self, org_id): """Return count of active, unique hosts used by an organization. @@ -146,8 +146,8 @@ class InstanceManager(models.Manager): pod_ip = os.environ.get('MY_POD_IP') registered = self.register(ip_address=pod_ip) - is_container_group = settings.IS_K8S - RegisterQueue('tower', 100, 0, [], is_container_group).register() + RegisterQueue(settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME, 100, 0, [], is_container_group=False).register() + RegisterQueue(settings.DEFAULT_EXECUTION_QUEUE_NAME, 100, 0, [], is_container_group=True).register() return registered else: return (False, self.me()) @@ -156,10 +156,6 @@ class InstanceManager(models.Manager): """Return count of active Tower nodes for licensing.""" return self.all().count() - def my_role(self): - # NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing - return "tower" - class InstanceGroupManager(models.Manager): """A custom manager class for the Instance model. diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index fd5ec56596..a132a2ae6c 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -130,12 +130,6 @@ class Instance(HasPolicyEditsMixin, BaseModel): return self.modified < ref_time - timedelta(seconds=grace_period) def refresh_capacity(self): - if settings.IS_K8S: - self.capacity = self.cpu = self.memory = self.cpu_capacity = self.mem_capacity = 0 # noqa - self.version = awx_application_version - self.save(update_fields=['capacity', 'version', 'modified', 'cpu', 'memory', 'cpu_capacity', 'mem_capacity']) - return - cpu = get_cpu_capacity() mem = get_mem_capacity() if self.enabled: diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index 38d7ebd805..43842c1346 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -1265,10 +1265,12 @@ class SystemJob(UnifiedJob, SystemJobOptions, JobNotificationMixin): return UnpartitionedSystemJobEvent return SystemJobEvent + @property + def can_run_on_control_plane(self): + return True + @property def task_impact(self): - if settings.IS_K8S: - return 0 return 5 @property diff --git a/awx/main/models/projects.py b/awx/main/models/projects.py index 7192265412..d2b62eac7e 100644 --- a/awx/main/models/projects.py +++ b/awx/main/models/projects.py @@ -553,6 +553,10 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin, TaskManage websocket_data.update(dict(project_id=self.project.id)) return websocket_data + @property + def can_run_on_control_plane(self): + return True + @property def event_class(self): if self.has_unpartitioned_events: @@ -561,8 +565,6 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin, TaskManage @property def task_impact(self): - if settings.IS_K8S: - return 0 return 0 if self.job_type == 'run' else 1 @property @@ -623,6 +625,8 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin, TaskManage organization_groups = [] template_groups = [x for x in super(ProjectUpdate, self).preferred_instance_groups] selected_groups = template_groups + organization_groups + if not any([not group.is_container_group for group in selected_groups]): + selected_groups = selected_groups + list(self.control_plane_instance_group) if not selected_groups: return self.global_instance_groups return selected_groups diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 273f894135..2b2f05a51a 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -736,6 +736,13 @@ class UnifiedJob( def _get_task_class(cls): raise NotImplementedError # Implement in subclasses. + @property + def can_run_on_control_plane(self): + if settings.IS_K8S: + return False + + return True + @property def can_run_containerized(self): return False @@ -1415,14 +1422,21 @@ class UnifiedJob( return [] return list(self.unified_job_template.instance_groups.all()) + @property + def control_plane_instance_group(self): + from awx.main.models.ha import InstanceGroup + + control_plane_instance_group = InstanceGroup.objects.filter(name=settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME) + + return list(control_plane_instance_group) + @property def global_instance_groups(self): from awx.main.models.ha import InstanceGroup - default_instance_group = InstanceGroup.objects.filter(name=settings.DEFAULT_QUEUE_NAME) - if default_instance_group.exists(): - return [default_instance_group.first()] - return [] + default_instance_groups = InstanceGroup.objects.filter(name__in=[settings.DEFAULT_EXECUTION_QUEUE_NAME, settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME]) + + return list(default_instance_groups) def awx_meta_vars(self): """ diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 36c0d879b8..867a4d649a 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -474,6 +474,7 @@ class TaskManager: tasks_to_update_job_explanation.append(task) continue preferred_instance_groups = task.preferred_instance_groups + found_acceptable_queue = False if isinstance(task, WorkflowJob): if task.unified_job_template_id in running_workflow_templates: @@ -484,6 +485,7 @@ class TaskManager: running_workflow_templates.add(task.unified_job_template_id) self.start_task(task, None, task.get_jobs_fail_chain(), None) continue + for rampart_group in preferred_instance_groups: if task.can_run_containerized and rampart_group.is_container_group: self.graph[rampart_group.name]['graph'].add_job(task) @@ -491,12 +493,12 @@ class TaskManager: found_acceptable_queue = True break + if not task.can_run_on_control_plane: + logger.debug("Skipping group {}, task cannot run on control plane".format(rampart_group.name)) + continue + remaining_capacity = self.get_remaining_capacity(rampart_group.name) - if ( - task.task_impact > 0 - and not rampart_group.is_container_group # project updates have a cost of zero - and self.get_remaining_capacity(rampart_group.name) <= 0 - ): + if task.task_impact > 0 and self.get_remaining_capacity(rampart_group.name) <= 0: logger.debug("Skipping group {}, remaining_capacity {} <= 0".format(rampart_group.name, remaining_capacity)) continue diff --git a/awx/main/tests/functional/models/test_inventory.py b/awx/main/tests/functional/models/test_inventory.py index 31bf62a168..eab7e02895 100644 --- a/awx/main/tests/functional/models/test_inventory.py +++ b/awx/main/tests/functional/models/test_inventory.py @@ -104,7 +104,7 @@ class TestActiveCount: def test_active_count_minus_tower(self, inventory): inventory.hosts.create(name='locally-managed-host') - source = inventory.inventory_sources.create(name='tower-source', source='default') + source = inventory.inventory_sources.create(name='tower-source', source='tower') source.hosts.create(name='remotely-managed-host', inventory=inventory) assert Host.objects.active_count() == 1 diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 67d9a39005..98561c98fa 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -946,5 +946,7 @@ BROADCAST_WEBSOCKET_STATS_POLL_RATE_SECONDS = 5 DJANGO_GUID = {'GUID_HEADER_NAME': 'X-API-Request-Id'} -# Default name of the task queue -DEFAULT_QUEUE_NAME = 'default' +# Name of the default task queue +DEFAULT_EXECUTION_QUEUE_NAME = 'default' +# Name of the default controlplane queue +DEFAULT_CONTROL_PLANE_QUEUE_NAME = 'controlplane'