Merge pull request #10324 from shanemcd/default_queue_name

Introduce distinct controlplane instance group

Reviewed-by: Alan Rominger <arominge@redhat.com>
Reviewed-by: Shane McDonald <me@shanemcd.com>
Reviewed-by: Matthew Jones <bsdmatburt@gmail.com>
Reviewed-by: Yanis Guenane <None>
Reviewed-by: Bianca Henderson <beeankha@gmail.com>
This commit is contained in:
softwarefactory-project-zuul[bot]
2021-06-08 19:18:35 +00:00
committed by GitHub
15 changed files with 98 additions and 57 deletions

View File

@@ -173,7 +173,7 @@ init:
. $(VENV_BASE)/awx/bin/activate; \ . $(VENV_BASE)/awx/bin/activate; \
fi; \ fi; \
$(MANAGEMENT_COMMAND) provision_instance --hostname=$(COMPOSE_HOST); \ $(MANAGEMENT_COMMAND) provision_instance --hostname=$(COMPOSE_HOST); \
$(MANAGEMENT_COMMAND) register_queue --queuename=tower --instance_percent=100; $(MANAGEMENT_COMMAND) register_queue --queuename=controlplane --instance_percent=100;
# Refresh development environment after pulling new code. # Refresh development environment after pulling new code.
refresh: clean requirements_dev version_file develop migrate refresh: clean requirements_dev version_file develop migrate

View File

@@ -4,6 +4,8 @@
# Python # Python
import logging import logging
from django.conf import settings
# Django REST Framework # Django REST Framework
from rest_framework.exceptions import MethodNotAllowed, PermissionDenied from rest_framework.exceptions import MethodNotAllowed, PermissionDenied
from rest_framework import permissions from rest_framework import permissions
@@ -245,7 +247,7 @@ class IsSuperUser(permissions.BasePermission):
class InstanceGroupTowerPermission(ModelAccessPermission): class InstanceGroupTowerPermission(ModelAccessPermission):
def has_object_permission(self, request, view, obj): def has_object_permission(self, request, view, obj):
if request.method == 'DELETE' and obj.name == "tower": if request.method == 'DELETE' and obj.name in [settings.DEFAULT_EXECUTION_QUEUE_NAME, settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME]:
return False return False
return super(InstanceGroupTowerPermission, self).has_object_permission(request, view, obj) return super(InstanceGroupTowerPermission, self).has_object_permission(request, view, obj)

View File

@@ -4918,8 +4918,12 @@ class InstanceGroupSerializer(BaseSerializer):
return value return value
def validate_name(self, value): def validate_name(self, value):
if self.instance and self.instance.name == 'tower' and value != 'tower': if self.instance and self.instance.name == settings.DEFAULT_EXECUTION_QUEUE_NAME and value != settings.DEFAULT_EXECUTION_QUEUE_NAME:
raise serializers.ValidationError(_('tower instance group name may not be changed.')) raise serializers.ValidationError(_('%s instance group name may not be changed.' % settings.DEFAULT_EXECUTION_QUEUE_NAME))
if self.instance and self.instance.name == settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME and value != settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME:
raise serializers.ValidationError(_('%s instance group name may not be changed.' % settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME))
return value return value
def validate_credential(self, value): def validate_credential(self, value):

View File

@@ -177,6 +177,24 @@ register(
read_only=True, read_only=True,
) )
register(
'DEFAULT_CONTROL_PLANE_QUEUE_NAME',
field_class=fields.CharField,
label=_('The instance group where control plane tasks run'),
category=_('System'),
category_slug='system',
read_only=True,
)
register(
'DEFAULT_EXECUTION_QUEUE_NAME',
field_class=fields.CharField,
label=_('The instance group where user jobs run (currently only on non-VM installs)'),
category=_('System'),
category_slug='system',
read_only=True,
)
register( register(
'DEFAULT_EXECUTION_ENVIRONMENT', 'DEFAULT_EXECUTION_ENVIRONMENT',
field_class=fields.PrimaryKeyRelatedField, field_class=fields.PrimaryKeyRelatedField,

View File

@@ -146,8 +146,8 @@ class InstanceManager(models.Manager):
pod_ip = os.environ.get('MY_POD_IP') pod_ip = os.environ.get('MY_POD_IP')
registered = self.register(ip_address=pod_ip) registered = self.register(ip_address=pod_ip)
is_container_group = settings.IS_K8S RegisterQueue(settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME, 100, 0, [], is_container_group=False).register()
RegisterQueue('tower', 100, 0, [], is_container_group).register() RegisterQueue(settings.DEFAULT_EXECUTION_QUEUE_NAME, 100, 0, [], is_container_group=True).register()
return registered return registered
else: else:
return (False, self.me()) return (False, self.me())
@@ -156,10 +156,6 @@ class InstanceManager(models.Manager):
"""Return count of active Tower nodes for licensing.""" """Return count of active Tower nodes for licensing."""
return self.all().count() return self.all().count()
def my_role(self):
# NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing
return "tower"
class InstanceGroupManager(models.Manager): class InstanceGroupManager(models.Manager):
"""A custom manager class for the Instance model. """A custom manager class for the Instance model.

View File

@@ -130,12 +130,6 @@ class Instance(HasPolicyEditsMixin, BaseModel):
return self.modified < ref_time - timedelta(seconds=grace_period) return self.modified < ref_time - timedelta(seconds=grace_period)
def refresh_capacity(self): def refresh_capacity(self):
if settings.IS_K8S:
self.capacity = self.cpu = self.memory = self.cpu_capacity = self.mem_capacity = 0 # noqa
self.version = awx_application_version
self.save(update_fields=['capacity', 'version', 'modified', 'cpu', 'memory', 'cpu_capacity', 'mem_capacity'])
return
cpu = get_cpu_capacity() cpu = get_cpu_capacity()
mem = get_mem_capacity() mem = get_mem_capacity()
if self.enabled: if self.enabled:

View File

@@ -1253,10 +1253,12 @@ class SystemJob(UnifiedJob, SystemJobOptions, JobNotificationMixin):
return UnpartitionedSystemJobEvent return UnpartitionedSystemJobEvent
return SystemJobEvent return SystemJobEvent
@property
def can_run_on_control_plane(self):
return True
@property @property
def task_impact(self): def task_impact(self):
if settings.IS_K8S:
return 0
return 5 return 5
@property @property

View File

@@ -553,6 +553,10 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin, TaskManage
websocket_data.update(dict(project_id=self.project.id)) websocket_data.update(dict(project_id=self.project.id))
return websocket_data return websocket_data
@property
def can_run_on_control_plane(self):
return True
@property @property
def event_class(self): def event_class(self):
if self.has_unpartitioned_events: if self.has_unpartitioned_events:
@@ -561,8 +565,6 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin, TaskManage
@property @property
def task_impact(self): def task_impact(self):
if settings.IS_K8S:
return 0
return 0 if self.job_type == 'run' else 1 return 0 if self.job_type == 'run' else 1
@property @property
@@ -623,6 +625,8 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin, TaskManage
organization_groups = [] organization_groups = []
template_groups = [x for x in super(ProjectUpdate, self).preferred_instance_groups] template_groups = [x for x in super(ProjectUpdate, self).preferred_instance_groups]
selected_groups = template_groups + organization_groups selected_groups = template_groups + organization_groups
if not any([not group.is_container_group for group in selected_groups]):
selected_groups = selected_groups + list(self.control_plane_instance_group)
if not selected_groups: if not selected_groups:
return self.global_instance_groups return self.global_instance_groups
return selected_groups return selected_groups

View File

@@ -736,6 +736,13 @@ class UnifiedJob(
def _get_task_class(cls): def _get_task_class(cls):
raise NotImplementedError # Implement in subclasses. raise NotImplementedError # Implement in subclasses.
@property
def can_run_on_control_plane(self):
if settings.IS_K8S:
return False
return True
@property @property
def can_run_containerized(self): def can_run_containerized(self):
return False return False
@@ -1415,14 +1422,21 @@ class UnifiedJob(
return [] return []
return list(self.unified_job_template.instance_groups.all()) return list(self.unified_job_template.instance_groups.all())
@property
def control_plane_instance_group(self):
from awx.main.models.ha import InstanceGroup
control_plane_instance_group = InstanceGroup.objects.filter(name=settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME)
return list(control_plane_instance_group)
@property @property
def global_instance_groups(self): def global_instance_groups(self):
from awx.main.models.ha import InstanceGroup from awx.main.models.ha import InstanceGroup
default_instance_group = InstanceGroup.objects.filter(name='tower') default_instance_groups = InstanceGroup.objects.filter(name__in=[settings.DEFAULT_EXECUTION_QUEUE_NAME, settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME])
if default_instance_group.exists():
return [default_instance_group.first()] return list(default_instance_groups)
return []
def awx_meta_vars(self): def awx_meta_vars(self):
""" """

View File

@@ -474,6 +474,7 @@ class TaskManager:
tasks_to_update_job_explanation.append(task) tasks_to_update_job_explanation.append(task)
continue continue
preferred_instance_groups = task.preferred_instance_groups preferred_instance_groups = task.preferred_instance_groups
found_acceptable_queue = False found_acceptable_queue = False
if isinstance(task, WorkflowJob): if isinstance(task, WorkflowJob):
if task.unified_job_template_id in running_workflow_templates: if task.unified_job_template_id in running_workflow_templates:
@@ -484,6 +485,7 @@ class TaskManager:
running_workflow_templates.add(task.unified_job_template_id) running_workflow_templates.add(task.unified_job_template_id)
self.start_task(task, None, task.get_jobs_fail_chain(), None) self.start_task(task, None, task.get_jobs_fail_chain(), None)
continue continue
for rampart_group in preferred_instance_groups: for rampart_group in preferred_instance_groups:
if task.can_run_containerized and rampart_group.is_container_group: if task.can_run_containerized and rampart_group.is_container_group:
self.graph[rampart_group.name]['graph'].add_job(task) self.graph[rampart_group.name]['graph'].add_job(task)
@@ -491,12 +493,12 @@ class TaskManager:
found_acceptable_queue = True found_acceptable_queue = True
break break
if not task.can_run_on_control_plane:
logger.debug("Skipping group {}, task cannot run on control plane".format(rampart_group.name))
continue
remaining_capacity = self.get_remaining_capacity(rampart_group.name) remaining_capacity = self.get_remaining_capacity(rampart_group.name)
if ( if task.task_impact > 0 and self.get_remaining_capacity(rampart_group.name) <= 0:
task.task_impact > 0
and not rampart_group.is_container_group # project updates have a cost of zero
and self.get_remaining_capacity(rampart_group.name) <= 0
):
logger.debug("Skipping group {}, remaining_capacity {} <= 0".format(rampart_group.name, remaining_capacity)) logger.debug("Skipping group {}, remaining_capacity {} <= 0".format(rampart_group.name, remaining_capacity))
continue continue

View File

@@ -81,7 +81,7 @@ def instance_group_factory():
@pytest.fixture @pytest.fixture
def default_instance_group(instance_factory, instance_group_factory): def default_instance_group(instance_factory, instance_group_factory):
return create_instance_group("tower", instances=[create_instance("hostA")]) return create_instance_group("default", instances=[create_instance("hostA")])
@pytest.fixture @pytest.fixture

View File

@@ -13,7 +13,7 @@ from awx.main.utils import camelcase_to_underscore
@pytest.fixture @pytest.fixture
def tower_instance_group(): def tower_instance_group():
ig = InstanceGroup(name='tower') ig = InstanceGroup(name='default')
ig.save() ig.save()
return ig return ig
@@ -117,8 +117,8 @@ def test_delete_rename_tower_instance_group_prevented(delete, options, tower_ins
assert 'GET' in resp.data['actions'] assert 'GET' in resp.data['actions']
assert 'PUT' in resp.data['actions'] assert 'PUT' in resp.data['actions']
# Rename 'tower' instance group denied # Rename 'default' instance group denied
patch(url, {'name': 'tower_prime'}, super_user, expect=400) patch(url, {'name': 'default_prime'}, super_user, expect=400)
# Rename, other instance group OK # Rename, other instance group OK
url = reverse("api:instance_group_detail", kwargs={'pk': instance_group.pk}) url = reverse("api:instance_group_detail", kwargs={'pk': instance_group.pk})

View File

@@ -10,21 +10,21 @@ class TestCapacityMapping(TransactionTestCase):
def sample_cluster(self): def sample_cluster(self):
ig_small = InstanceGroup.objects.create(name='ig_small') ig_small = InstanceGroup.objects.create(name='ig_small')
ig_large = InstanceGroup.objects.create(name='ig_large') ig_large = InstanceGroup.objects.create(name='ig_large')
tower = InstanceGroup.objects.create(name='tower') default = InstanceGroup.objects.create(name='default')
i1 = Instance.objects.create(hostname='i1', capacity=200) i1 = Instance.objects.create(hostname='i1', capacity=200)
i2 = Instance.objects.create(hostname='i2', capacity=200) i2 = Instance.objects.create(hostname='i2', capacity=200)
i3 = Instance.objects.create(hostname='i3', capacity=200) i3 = Instance.objects.create(hostname='i3', capacity=200)
ig_small.instances.add(i1) ig_small.instances.add(i1)
ig_large.instances.add(i2, i3) ig_large.instances.add(i2, i3)
tower.instances.add(i2) default.instances.add(i2)
return [tower, ig_large, ig_small] return [default, ig_large, ig_small]
def test_mapping(self): def test_mapping(self):
self.sample_cluster() self.sample_cluster()
with self.assertNumQueries(2): with self.assertNumQueries(2):
inst_map, ig_map = InstanceGroup.objects.capacity_mapping() inst_map, ig_map = InstanceGroup.objects.capacity_mapping()
assert inst_map['i1'] == set(['ig_small']) assert inst_map['i1'] == set(['ig_small'])
assert inst_map['i2'] == set(['ig_large', 'tower']) assert inst_map['i2'] == set(['ig_large', 'default'])
assert ig_map['ig_small'] == set(['ig_small']) assert ig_map['ig_small'] == set(['ig_small'])
assert ig_map['ig_large'] == set(['ig_large', 'tower']) assert ig_map['ig_large'] == set(['ig_large', 'default'])
assert ig_map['tower'] == set(['ig_large', 'tower']) assert ig_map['default'] == set(['ig_large', 'default'])

View File

@@ -43,34 +43,34 @@ def sample_cluster():
ig_small = InstanceGroup(name='ig_small') ig_small = InstanceGroup(name='ig_small')
ig_large = InstanceGroup(name='ig_large') ig_large = InstanceGroup(name='ig_large')
tower = InstanceGroup(name='tower') default = InstanceGroup(name='default')
i1 = Instance(hostname='i1', capacity=200) i1 = Instance(hostname='i1', capacity=200)
i2 = Instance(hostname='i2', capacity=200) i2 = Instance(hostname='i2', capacity=200)
i3 = Instance(hostname='i3', capacity=200) i3 = Instance(hostname='i3', capacity=200)
ig_small.instances.add(i1) ig_small.instances.add(i1)
ig_large.instances.add(i2, i3) ig_large.instances.add(i2, i3)
tower.instances.add(i2) default.instances.add(i2)
return [tower, ig_large, ig_small] return [default, ig_large, ig_small]
return stand_up_cluster return stand_up_cluster
def test_committed_capacity(sample_cluster): def test_committed_capacity(sample_cluster):
tower, ig_large, ig_small = sample_cluster() default, ig_large, ig_small = sample_cluster()
tasks = [Job(status='waiting', instance_group=tower), Job(status='waiting', instance_group=ig_large), Job(status='waiting', instance_group=ig_small)] tasks = [Job(status='waiting', instance_group=default), Job(status='waiting', instance_group=ig_large), Job(status='waiting', instance_group=ig_small)]
capacities = InstanceGroup.objects.capacity_values(qs=[tower, ig_large, ig_small], tasks=tasks, breakdown=True) capacities = InstanceGroup.objects.capacity_values(qs=[default, ig_large, ig_small], tasks=tasks, breakdown=True)
# Jobs submitted to either tower or ig_large must count toward both # Jobs submitted to either tower or ig_large must count toward both
assert capacities['tower']['committed_capacity'] == 43 * 2 assert capacities['default']['committed_capacity'] == 43 * 2
assert capacities['ig_large']['committed_capacity'] == 43 * 2 assert capacities['ig_large']['committed_capacity'] == 43 * 2
assert capacities['ig_small']['committed_capacity'] == 43 assert capacities['ig_small']['committed_capacity'] == 43
def test_running_capacity(sample_cluster): def test_running_capacity(sample_cluster):
tower, ig_large, ig_small = sample_cluster() default, ig_large, ig_small = sample_cluster()
tasks = [Job(status='running', execution_node='i1'), Job(status='running', execution_node='i2'), Job(status='running', execution_node='i3')] tasks = [Job(status='running', execution_node='i1'), Job(status='running', execution_node='i2'), Job(status='running', execution_node='i3')]
capacities = InstanceGroup.objects.capacity_values(qs=[tower, ig_large, ig_small], tasks=tasks, breakdown=True) capacities = InstanceGroup.objects.capacity_values(qs=[default, ig_large, ig_small], tasks=tasks, breakdown=True)
# Tower is only given 1 instance # Tower is only given 1 instance
assert capacities['tower']['running_capacity'] == 43 assert capacities['default']['running_capacity'] == 43
# Large IG has 2 instances # Large IG has 2 instances
assert capacities['ig_large']['running_capacity'] == 43 * 2 assert capacities['ig_large']['running_capacity'] == 43 * 2
assert capacities['ig_small']['running_capacity'] == 43 assert capacities['ig_small']['running_capacity'] == 43
@@ -81,10 +81,10 @@ def test_offline_node_running(sample_cluster):
Assure that algorithm doesn't explode if a job is marked running Assure that algorithm doesn't explode if a job is marked running
in an offline node in an offline node
""" """
tower, ig_large, ig_small = sample_cluster() default, ig_large, ig_small = sample_cluster()
ig_small.instance_list[0].capacity = 0 ig_small.instance_list[0].capacity = 0
tasks = [Job(status='running', execution_node='i1', instance_group=ig_small)] tasks = [Job(status='running', execution_node='i1', instance_group=ig_small)]
capacities = InstanceGroup.objects.capacity_values(qs=[tower, ig_large, ig_small], tasks=tasks) capacities = InstanceGroup.objects.capacity_values(qs=[default, ig_large, ig_small], tasks=tasks)
assert capacities['ig_small']['consumed_capacity'] == 43 assert capacities['ig_small']['consumed_capacity'] == 43
@@ -92,10 +92,10 @@ def test_offline_node_waiting(sample_cluster):
""" """
Same but for a waiting job Same but for a waiting job
""" """
tower, ig_large, ig_small = sample_cluster() default, ig_large, ig_small = sample_cluster()
ig_small.instance_list[0].capacity = 0 ig_small.instance_list[0].capacity = 0
tasks = [Job(status='waiting', instance_group=ig_small)] tasks = [Job(status='waiting', instance_group=ig_small)]
capacities = InstanceGroup.objects.capacity_values(qs=[tower, ig_large, ig_small], tasks=tasks) capacities = InstanceGroup.objects.capacity_values(qs=[default, ig_large, ig_small], tasks=tasks)
assert capacities['ig_small']['consumed_capacity'] == 43 assert capacities['ig_small']['consumed_capacity'] == 43
@@ -105,9 +105,9 @@ def test_RBAC_reduced_filter(sample_cluster):
but user does not have permission to see those actual instance groups. but user does not have permission to see those actual instance groups.
Verify that this does not blow everything up. Verify that this does not blow everything up.
""" """
tower, ig_large, ig_small = sample_cluster() default, ig_large, ig_small = sample_cluster()
tasks = [Job(status='waiting', instance_group=tower), Job(status='waiting', instance_group=ig_large), Job(status='waiting', instance_group=ig_small)] tasks = [Job(status='waiting', instance_group=default), Job(status='waiting', instance_group=ig_large), Job(status='waiting', instance_group=ig_small)]
capacities = InstanceGroup.objects.capacity_values(qs=[tower], tasks=tasks, breakdown=True) capacities = InstanceGroup.objects.capacity_values(qs=[default], tasks=tasks, breakdown=True)
# Cross-links between groups not visible to current user, # Cross-links between groups not visible to current user,
# so a naive accounting of capacities is returned instead # so a naive accounting of capacities is returned instead
assert capacities['tower']['committed_capacity'] == 43 assert capacities['default']['committed_capacity'] == 43

View File

@@ -945,3 +945,8 @@ BROADCAST_WEBSOCKET_NEW_INSTANCE_POLL_RATE_SECONDS = 10
BROADCAST_WEBSOCKET_STATS_POLL_RATE_SECONDS = 5 BROADCAST_WEBSOCKET_STATS_POLL_RATE_SECONDS = 5
DJANGO_GUID = {'GUID_HEADER_NAME': 'X-API-Request-Id'} DJANGO_GUID = {'GUID_HEADER_NAME': 'X-API-Request-Id'}
# Name of the default task queue
DEFAULT_EXECUTION_QUEUE_NAME = 'default'
# Name of the default controlplane queue
DEFAULT_CONTROL_PLANE_QUEUE_NAME = 'controlplane'