From c8f1e714e174b484f08f26d276c70dec86b0a84a Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Mon, 13 Dec 2021 15:40:36 -0500 Subject: [PATCH 01/11] Capture hop nodes and the peer links between nodes --- .../migrations/0156_capture_mesh_topology.py | 44 +++++++++++++++++++ awx/main/models/__init__.py | 1 + awx/main/models/ha.py | 17 ++++++- 3 files changed, 61 insertions(+), 1 deletion(-) create mode 100644 awx/main/migrations/0156_capture_mesh_topology.py diff --git a/awx/main/migrations/0156_capture_mesh_topology.py b/awx/main/migrations/0156_capture_mesh_topology.py new file mode 100644 index 0000000000..90f5a5e0a2 --- /dev/null +++ b/awx/main/migrations/0156_capture_mesh_topology.py @@ -0,0 +1,44 @@ +# Generated by Django 2.2.20 on 2021-12-17 19:26 + +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0155_improved_health_check'), + ] + + operations = [ + migrations.AlterField( + model_name='instance', + name='node_type', + field=models.CharField( + choices=[ + ('control', 'Control plane node'), + ('execution', 'Execution plane node'), + ('hybrid', 'Controller and execution'), + ('hop', 'Message-passing node, no execution capability'), + ], + default='hybrid', + max_length=16, + ), + ), + migrations.CreateModel( + name='InstanceLink', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('source', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='+', to='main.Instance')), + ('target', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='reverse_peers', to='main.Instance')), + ], + options={ + 'unique_together': {('source', 'target')}, + }, + ), + migrations.AddField( + model_name='instance', + name='peers', + field=models.ManyToManyField(through='main.InstanceLink', to='main.Instance'), + ), + ] diff --git a/awx/main/models/__init__.py b/awx/main/models/__init__.py index f439a692fb..ed49b98083 100644 --- a/awx/main/models/__init__.py +++ b/awx/main/models/__init__.py @@ -47,6 +47,7 @@ from awx.main.models.execution_environments import ExecutionEnvironment # noqa from awx.main.models.activity_stream import ActivityStream # noqa from awx.main.models.ha import ( # noqa Instance, + InstanceLink, InstanceGroup, TowerScheduleState, ) diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index ea9f7d8d0e..c21c66b8df 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -54,6 +54,14 @@ class HasPolicyEditsMixin(HasEditsMixin): return self._values_have_edits(new_values) +class InstanceLink(BaseModel): + source = models.ForeignKey('Instance', on_delete=models.CASCADE, related_name='+') + target = models.ForeignKey('Instance', on_delete=models.CASCADE, related_name='reverse_peers') + + class Meta: + unique_together = ('source', 'target') + + class Instance(HasPolicyEditsMixin, BaseModel): """A model representing an AWX instance running against this database.""" @@ -116,9 +124,16 @@ class Instance(HasPolicyEditsMixin, BaseModel): default=0, editable=False, ) - NODE_TYPE_CHOICES = [("control", "Control plane node"), ("execution", "Execution plane node"), ("hybrid", "Controller and execution")] + NODE_TYPE_CHOICES = [ + ("control", "Control plane node"), + ("execution", "Execution plane node"), + ("hybrid", "Controller and execution"), + ("hop", "Message-passing node, no execution capability"), + ] node_type = models.CharField(default='hybrid', choices=NODE_TYPE_CHOICES, max_length=16) + peers = models.ManyToManyField('self', symmetrical=False, through=InstanceLink, through_fields=('source', 'target')) + class Meta: app_label = 'main' ordering = ("hostname",) From f340f491dc1ad61c6ecfaafce02ce351f6142086 Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Thu, 16 Dec 2021 15:53:15 -0500 Subject: [PATCH 02/11] Control the visibility and use of hop node Instances - the list, detail, and health check API views should not include them - the Instance-InstanceGroup association views should not allow them to be changed - the ping view excludes them - list_instances management command excludes them - Instance.set_capacity_value sets hop nodes to 0 capacity - TaskManager will exclude them from the nodes available for job execution - TaskManager.reap_jobs_from_orphaned_instances will consider hop nodes to be an orphaned instance - The apply_cluster_membership_policies task will not manipulate hop nodes - get_broadcast_hosts will ignore hop nodes - active_count also will ignore hop nodes --- awx/api/views/__init__.py | 11 +++++++++-- awx/api/views/root.py | 2 +- awx/main/management/commands/list_instances.py | 2 +- awx/main/managers.py | 2 +- awx/main/models/ha.py | 6 +++--- awx/main/scheduler/task_manager.py | 7 +++---- awx/main/tasks.py | 5 +++-- awx/main/tests/functional/test_instances.py | 5 +++-- awx/main/wsbroadcast.py | 1 + 9 files changed, 25 insertions(+), 16 deletions(-) diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index e9077f96f7..3dc6881333 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -364,11 +364,14 @@ class InstanceList(ListAPIView): serializer_class = serializers.InstanceSerializer search_fields = ('hostname',) + def get_queryset(self): + return super().get_queryset().exclude(node_type='hop') + class InstanceDetail(RetrieveUpdateAPIView): name = _("Instance Detail") - model = models.Instance + queryset = models.Instance.objects.exclude(node_type='hop') serializer_class = serializers.InstanceSerializer def update(self, request, *args, **kwargs): @@ -406,13 +409,15 @@ class InstanceInstanceGroupsList(InstanceGroupMembershipMixin, SubListCreateAtta def is_valid_relation(self, parent, sub, created=False): if parent.node_type == 'control': return {'msg': _(f"Cannot change instance group membership of control-only node: {parent.hostname}.")} + if parent.node_type == 'hop': + return {'msg': _(f"Cannot change instance group membership of hop node: {parent.hostname}.")} return None class InstanceHealthCheck(GenericAPIView): name = _('Instance Health Check') - model = models.Instance + queryset = models.Instance.objects.exclude(node_type='hop') serializer_class = serializers.InstanceHealthCheckSerializer permission_classes = (IsSystemAdminOrAuditor,) @@ -503,6 +508,8 @@ class InstanceGroupInstanceList(InstanceGroupMembershipMixin, SubListAttachDetac def is_valid_relation(self, parent, sub, created=False): if sub.node_type == 'control': return {'msg': _(f"Cannot change instance group membership of control-only node: {sub.hostname}.")} + if sub.node_type == 'hop': + return {'msg': _(f"Cannot change instance group membership of hop node: {sub.hostname}.")} return None diff --git a/awx/api/views/root.py b/awx/api/views/root.py index 110403ed5e..2f7f49a122 100644 --- a/awx/api/views/root.py +++ b/awx/api/views/root.py @@ -149,7 +149,7 @@ class ApiV2PingView(APIView): response = {'ha': is_ha_environment(), 'version': get_awx_version(), 'active_node': settings.CLUSTER_HOST_ID, 'install_uuid': settings.INSTALL_UUID} response['instances'] = [] - for instance in Instance.objects.all(): + for instance in Instance.objects.exclude(node_type='hop'): response['instances'].append( dict( node=instance.hostname, diff --git a/awx/main/management/commands/list_instances.py b/awx/main/management/commands/list_instances.py index bef9034774..15a8df4cb0 100644 --- a/awx/main/management/commands/list_instances.py +++ b/awx/main/management/commands/list_instances.py @@ -13,7 +13,7 @@ class Ungrouped(object): @property def instances(self): - return Instance.objects.filter(rampart_groups__isnull=True) + return Instance.objects.filter(rampart_groups__isnull=True).exclude(node_type='hop') @property def capacity(self): diff --git a/awx/main/managers.py b/awx/main/managers.py index 2091c36562..0bc30f649c 100644 --- a/awx/main/managers.py +++ b/awx/main/managers.py @@ -188,7 +188,7 @@ class InstanceManager(models.Manager): def active_count(self): """Return count of active Tower nodes for licensing.""" - return self.all().count() + return self.exclude(node_type='hop').count() class InstanceGroupManager(models.Manager): diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index c21c66b8df..78f07861d2 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -29,7 +29,7 @@ from awx.main.models.mixins import RelatedJobsMixin # ansible-runner from ansible_runner.utils.capacity import get_cpu_count, get_mem_in_bytes -__all__ = ('Instance', 'InstanceGroup', 'TowerScheduleState') +__all__ = ('Instance', 'InstanceGroup', 'InstanceLink', 'TowerScheduleState') logger = logging.getLogger('awx.main.models.ha') @@ -215,7 +215,7 @@ class Instance(HasPolicyEditsMixin, BaseModel): def set_capacity_value(self): """Sets capacity according to capacity adjustment rule (no save)""" - if self.enabled: + if self.enabled and self.node_type != 'hop': lower_cap = min(self.mem_capacity, self.cpu_capacity) higher_cap = max(self.mem_capacity, self.cpu_capacity) self.capacity = lower_cap + (higher_cap - lower_cap) * self.capacity_adjustment @@ -320,7 +320,7 @@ class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin): @property def capacity(self): - return sum([inst.capacity for inst in self.instances.all()]) + return sum(inst.capacity for inst in self.instances.all()) @property def jobs_running(self): diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 6576d99829..2544a062db 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -13,7 +13,6 @@ from django.db import transaction, connection from django.utils.translation import ugettext_lazy as _, gettext_noop from django.utils.timezone import now as tz_now from django.conf import settings -from django.db.models import Q # AWX from awx.main.dispatch.reaper import reap_job @@ -69,7 +68,7 @@ class TaskManager: """ Init AFTER we know this instance of the task manager will run because the lock is acquired. """ - instances = Instance.objects.filter(~Q(hostname=None), enabled=True) + instances = Instance.objects.filter(hostname__isnull=False, enabled=True).exclude(node_type='hop') self.real_instances = {i.hostname: i for i in instances} instances_partial = [ @@ -484,7 +483,7 @@ class TaskManager: return created_dependencies def process_pending_tasks(self, pending_tasks): - running_workflow_templates = set([wf.unified_job_template_id for wf in self.get_running_workflow_jobs()]) + running_workflow_templates = {wf.unified_job_template_id for wf in self.get_running_workflow_jobs()} tasks_to_update_job_explanation = [] for task in pending_tasks: if self.start_task_limit <= 0: @@ -593,7 +592,7 @@ class TaskManager: # elsewhere for j in UnifiedJob.objects.filter( status__in=['pending', 'waiting', 'running'], - ).exclude(execution_node__in=Instance.objects.values_list('hostname', flat=True)): + ).exclude(execution_node__in=Instance.objects.exclude(node_type='hop').values_list('hostname', flat=True)): if j.execution_node and not j.is_container_group_task: logger.error(f'{j.execution_node} is not a registered instance; reaping {j.log_format}') reap_job(j, 'failed') diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 05d01ed5e6..477cb1a47a 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -202,7 +202,8 @@ def apply_cluster_membership_policies(): to_log = logger.debug to_log('Waited {} seconds to obtain lock name: cluster_policy_lock'.format(lock_time)) started_compute = time.time() - all_instances = list(Instance.objects.order_by('id')) + # Hop nodes should never get assigned to an InstanceGroup. + all_instances = list(Instance.objects.exclude(node_type='hop').order_by('id')) all_groups = list(InstanceGroup.objects.prefetch_related('instances')) total_instances = len(all_instances) @@ -253,7 +254,7 @@ def apply_cluster_membership_policies(): # Finally, process instance policy percentages for g in sorted(actual_groups, key=lambda x: len(x.instances)): exclude_type = 'execution' if g.obj.name == settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME else 'control' - candidate_pool_ct = len([i for i in actual_instances if i.obj.node_type != exclude_type]) + candidate_pool_ct = sum(1 for i in actual_instances if i.obj.node_type != exclude_type) if not candidate_pool_ct: continue policy_per_added = [] diff --git a/awx/main/tests/functional/test_instances.py b/awx/main/tests/functional/test_instances.py index 21a17ff2b5..95bbd15014 100644 --- a/awx/main/tests/functional/test_instances.py +++ b/awx/main/tests/functional/test_instances.py @@ -84,8 +84,9 @@ def test_instance_dup(org_admin, organization, project, instance_factory, instan list_response2 = get(reverse('api:instance_list'), user=org_admin) api_num_instances_oa = list(list_response2.data.items())[0][1] - assert actual_num_instances == api_num_instances_auditor - # Note: The org_admin will not see the default 'tower' node (instance fixture) because it is not in its group, as expected + assert api_num_instances_auditor == actual_num_instances + # Note: The org_admin will not see the default 'tower' node + # (instance fixture) because it is not in its group, as expected assert api_num_instances_oa == (actual_num_instances - 1) diff --git a/awx/main/wsbroadcast.py b/awx/main/wsbroadcast.py index 1ec4fa1256..ec2fae5e89 100644 --- a/awx/main/wsbroadcast.py +++ b/awx/main/wsbroadcast.py @@ -35,6 +35,7 @@ def get_broadcast_hosts(): instances = ( Instance.objects.exclude(hostname=Instance.objects.me().hostname) .exclude(node_type='execution') + .exclude(node_type='hop') .order_by('hostname') .values('hostname', 'ip_address') .distinct() From 4449555abe6d1921d80efe7be7593267c3078593 Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Fri, 17 Dec 2021 14:28:33 -0500 Subject: [PATCH 03/11] Add a new register_peers management command and alter provision_instance to accept hop nodes. --- .../management/commands/provision_instance.py | 12 ++++---- .../management/commands/register_peers.py | 30 +++++++++++++++++++ 2 files changed, 36 insertions(+), 6 deletions(-) create mode 100644 awx/main/management/commands/register_peers.py diff --git a/awx/main/management/commands/provision_instance.py b/awx/main/management/commands/provision_instance.py index 3056a09b9c..c3c8f188ab 100644 --- a/awx/main/management/commands/provision_instance.py +++ b/awx/main/management/commands/provision_instance.py @@ -13,19 +13,19 @@ class Command(BaseCommand): Register this instance with the database for HA tracking. """ - help = 'Add instance to the database. ' 'Specify `--hostname` to use this command.' + help = "Add instance to the database. Specify `--hostname` to use this command." def add_arguments(self, parser): - parser.add_argument('--hostname', dest='hostname', type=str, help='Hostname used during provisioning') - parser.add_argument('--node_type', type=str, default="hybrid", choices=["control", "execution", "hybrid"], help='Instance Node type') - parser.add_argument('--uuid', type=str, help='Instance UUID') + parser.add_argument('--hostname', dest='hostname', type=str, help="Hostname used during provisioning") + parser.add_argument('--node_type', type=str, default='hybrid', choices=['control', 'execution', 'hop', 'hybrid'], help="Instance Node type") + parser.add_argument('--uuid', type=str, help="Instance UUID") def _register_hostname(self, hostname, node_type, uuid): if not hostname: return (changed, instance) = Instance.objects.register(hostname=hostname, node_type=node_type, uuid=uuid) if changed: - print('Successfully registered instance {}'.format(hostname)) + print("Successfully registered instance {}".format(hostname)) else: print("Instance already registered {}".format(instance.hostname)) self.changed = changed @@ -37,4 +37,4 @@ class Command(BaseCommand): self.changed = False self._register_hostname(options.get('hostname'), options.get('node_type'), options.get('uuid')) if self.changed: - print('(changed: True)') + print("(changed: True)") diff --git a/awx/main/management/commands/register_peers.py b/awx/main/management/commands/register_peers.py new file mode 100644 index 0000000000..3a9af7947f --- /dev/null +++ b/awx/main/management/commands/register_peers.py @@ -0,0 +1,30 @@ +from django.core.management.base import BaseCommand, CommandError + +from awx.main.models import Instance, InstanceLink + + +class Command(BaseCommand): + """ + Internal tower command. + Register the peers of a receptor node. + """ + + def add_arguments(self, parser): + parser.add_argument('source', type=str, help="") + parser.add_argument('--peers', type=str, nargs='+', required=True, help="") + + def handle(self, **options): + nodes = Instance.objects.in_bulk(field_name='hostname') + if options['source'] not in nodes: + raise CommandError() + missing_peers = set(options['peers']) - set(nodes) + if missing_peers: + raise CommandError() + + results = 0 + for target in options['peers']: + instance, created = InstanceLink.objects.get_or_create(source=nodes[options['source']], target=nodes[target]) + if created: + results += 1 + + print(f"{results} new peer links added to the database.") From ce5aefd3d8953fc528575d60e736b0e48c5ee7ee Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Mon, 20 Dec 2021 15:23:32 -0500 Subject: [PATCH 04/11] Capture hop nodes and links in the automatic discovery machinery Also, make sure that the control service is turned on in the dev environment's hop node, so that it shows up in the Advertisements list. --- awx/main/tasks.py | 37 +++++++++++++------ .../sources/templates/receptor-hop.conf.j2 | 3 ++ 2 files changed, 28 insertions(+), 12 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 477cb1a47a..d936d3e6ae 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -67,6 +67,7 @@ from awx.main.models import ( TowerScheduleState, Instance, InstanceGroup, + InstanceLink, UnifiedJob, Notification, Inventory, @@ -495,28 +496,29 @@ def execution_node_health_check(node): def inspect_execution_nodes(instance_list): with advisory_lock('inspect_execution_nodes_lock', wait=False): - node_lookup = {} - for inst in instance_list: - if inst.node_type == 'execution': - node_lookup[inst.hostname] = inst + node_lookup = {inst.hostname: inst for inst in instance_list} ctl = get_receptor_ctl() - connections = ctl.simple_command('status')['Advertisements'] + mesh_status = ctl.simple_command('status') + nowtime = now() - for ad in connections: + workers = mesh_status['Advertisements'] + for ad in workers: hostname = ad['NodeID'] - commands = ad.get('WorkCommands') or [] - worktypes = [] - for c in commands: - worktypes.append(c["WorkType"]) - if 'ansible-runner' not in worktypes: + if ad.get('WorkCommands') is None: + node_type = 'hop' + elif any(cmd['WorkType'] == 'ansible-runner' for cmd in ad['WorkCommands'] or []): + node_type = 'execution' + else: continue + changed = False if hostname in node_lookup: instance = node_lookup[hostname] elif settings.MESH_AUTODISCOVERY_ENABLED: defaults = dict(enabled=False) - (changed, instance) = Instance.objects.register(hostname=hostname, node_type='execution', defaults=defaults) + (changed, instance) = Instance.objects.register(hostname=hostname, node_type=node_type, defaults=defaults) + node_lookup[hostname] = instance logger.warn(f"Registered execution node '{hostname}' (marked disabled by default)") else: logger.warn(f"Unrecognized node on mesh advertising ansible-runner work type: {hostname}") @@ -546,6 +548,17 @@ def inspect_execution_nodes(instance_list): logger.debug(f'Restarting health check for execution node {hostname} with known errors.') execution_node_health_check.apply_async([hostname]) + links = {tuple(sorted((node, peer))) for node, peers in mesh_status['KnownConnectionCosts'].items() for peer in peers} + for a, b in links: + if a not in node_lookup: + logger.warn(f"Cannot link {a} to {b}: {a} not registered.") + continue + if b not in node_lookup: + logger.warn(f"Cannot link {a} to {b}: {b} not registered.") + continue + a_obj, b_obj = node_lookup[a], node_lookup[b] + InstanceLink.objects.get_or_create(source=a_obj, target=b_obj) + @task(queue=get_local_queuename) def cluster_node_heartbeat(): diff --git a/tools/docker-compose/ansible/roles/sources/templates/receptor-hop.conf.j2 b/tools/docker-compose/ansible/roles/sources/templates/receptor-hop.conf.j2 index 4f055fa1f3..f0bd884f04 100644 --- a/tools/docker-compose/ansible/roles/sources/templates/receptor-hop.conf.j2 +++ b/tools/docker-compose/ansible/roles/sources/templates/receptor-hop.conf.j2 @@ -10,3 +10,6 @@ - tcp-listener: port: 5555 + +- control-service: + service: control From fc2a5224efa01376218e4dbfa1789c56a8be6345 Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Thu, 6 Jan 2022 15:39:16 -0500 Subject: [PATCH 05/11] Add error messages to the new register_peers command --- awx/main/management/commands/register_peers.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/awx/main/management/commands/register_peers.py b/awx/main/management/commands/register_peers.py index 3a9af7947f..19b40c0c59 100644 --- a/awx/main/management/commands/register_peers.py +++ b/awx/main/management/commands/register_peers.py @@ -16,10 +16,11 @@ class Command(BaseCommand): def handle(self, **options): nodes = Instance.objects.in_bulk(field_name='hostname') if options['source'] not in nodes: - raise CommandError() + raise CommandError(f"Host {options['source']} is not a registered instance.") missing_peers = set(options['peers']) - set(nodes) if missing_peers: - raise CommandError() + missing = ' '.join(missing_peers) + raise CommandError(f"Peers not currently registered as instances: {missing}") results = 0 for target in options['peers']: From f1c5da7026a24118e34542970f03e03f41d9bbb6 Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Fri, 7 Jan 2022 14:27:17 -0500 Subject: [PATCH 06/11] Remove the auto-discovery feature --- awx/main/tasks.py | 23 +---------------------- awx/settings/defaults.py | 4 ---- awx/settings/development.py | 3 --- 3 files changed, 1 insertion(+), 29 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index d936d3e6ae..c46fa1ade6 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -67,7 +67,6 @@ from awx.main.models import ( TowerScheduleState, Instance, InstanceGroup, - InstanceLink, UnifiedJob, Notification, Inventory, @@ -505,21 +504,12 @@ def inspect_execution_nodes(instance_list): workers = mesh_status['Advertisements'] for ad in workers: hostname = ad['NodeID'] - if ad.get('WorkCommands') is None: - node_type = 'hop' - elif any(cmd['WorkType'] == 'ansible-runner' for cmd in ad['WorkCommands'] or []): - node_type = 'execution' - else: + if not any(cmd['WorkType'] == 'ansible-runner' for cmd in ad['WorkCommands'] or []): continue changed = False if hostname in node_lookup: instance = node_lookup[hostname] - elif settings.MESH_AUTODISCOVERY_ENABLED: - defaults = dict(enabled=False) - (changed, instance) = Instance.objects.register(hostname=hostname, node_type=node_type, defaults=defaults) - node_lookup[hostname] = instance - logger.warn(f"Registered execution node '{hostname}' (marked disabled by default)") else: logger.warn(f"Unrecognized node on mesh advertising ansible-runner work type: {hostname}") continue @@ -548,17 +538,6 @@ def inspect_execution_nodes(instance_list): logger.debug(f'Restarting health check for execution node {hostname} with known errors.') execution_node_health_check.apply_async([hostname]) - links = {tuple(sorted((node, peer))) for node, peers in mesh_status['KnownConnectionCosts'].items() for peer in peers} - for a, b in links: - if a not in node_lookup: - logger.warn(f"Cannot link {a} to {b}: {a} not registered.") - continue - if b not in node_lookup: - logger.warn(f"Cannot link {a} to {b}: {b} not registered.") - continue - a_obj, b_obj = node_lookup[a], node_lookup[b] - InstanceLink.objects.get_or_create(source=a_obj, target=b_obj) - @task(queue=get_local_queuename) def cluster_node_heartbeat(): diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 5fbc0dfd32..4bc0b56511 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -262,10 +262,6 @@ CSRF_COOKIE_SECURE = True # Limit CSRF cookies to browser sessions CSRF_COOKIE_AGE = None -# Auto-discover new instances that appear on receptor mesh -# used for docker-compose environment, unsupported -MESH_AUTODISCOVERY_ENABLED = False - TEMPLATES = [ { 'NAME': 'default', diff --git a/awx/settings/development.py b/awx/settings/development.py index 015cda16f1..8fb1c8c1c6 100644 --- a/awx/settings/development.py +++ b/awx/settings/development.py @@ -55,9 +55,6 @@ template['OPTIONS']['loaders'] = ('django.template.loaders.filesystem.Loader', ' PENDO_TRACKING_STATE = "off" INSIGHTS_TRACKING_STATE = False -# auto-discover receptor-* execution nodes -MESH_AUTODISCOVERY_ENABLED = True - # debug toolbar and swagger assume that requirements/requirements_dev.txt are installed INSTALLED_APPS += ['rest_framework_swagger', 'debug_toolbar'] # NOQA From 386aa898ecf3864fa7363f1b32c8069368831880 Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Fri, 7 Jan 2022 16:10:27 -0500 Subject: [PATCH 07/11] Remove the make init target we want to fold that in to bootstrap_environment.sh. --- Makefile | 19 ------------------- tools/docker-compose/bootstrap_development.sh | 7 ++++--- 2 files changed, 4 insertions(+), 22 deletions(-) diff --git a/Makefile b/Makefile index ce82634869..bbc7cbc374 100644 --- a/Makefile +++ b/Makefile @@ -11,7 +11,6 @@ COLLECTION_VERSION := $(shell $(PYTHON) setup.py --version | cut -d . -f 1-3) # NOTE: This defaults the container image version to the branch that's active COMPOSE_TAG ?= $(GIT_BRANCH) -COMPOSE_HOST ?= $(shell hostname) MAIN_NODE_TYPE ?= hybrid VENV_BASE ?= /var/lib/awx/venv @@ -145,24 +144,6 @@ version_file: fi; \ $(PYTHON) -c "import awx; print(awx.__version__)" > /var/lib/awx/.awx_version; \ -# Do any one-time init tasks. -comma := , -init: - if [ "$(VENV_BASE)" ]; then \ - . $(VENV_BASE)/awx/bin/activate; \ - fi; \ - $(MANAGEMENT_COMMAND) provision_instance --hostname=$(COMPOSE_HOST) --node_type=$(MAIN_NODE_TYPE); \ - $(MANAGEMENT_COMMAND) register_queue --queuename=controlplane --instance_percent=100;\ - $(MANAGEMENT_COMMAND) register_queue --queuename=default --instance_percent=100; - if [ ! -f /etc/receptor/certs/awx.key ]; then \ - rm -f /etc/receptor/certs/*; \ - receptor --cert-init commonname="AWX Test CA" bits=2048 outcert=/etc/receptor/certs/ca.crt outkey=/etc/receptor/certs/ca.key; \ - for node in $(RECEPTOR_MUTUAL_TLS); do \ - receptor --cert-makereq bits=2048 commonname="$$node test cert" dnsname=$$node nodeid=$$node outreq=/etc/receptor/certs/$$node.csr outkey=/etc/receptor/certs/$$node.key; \ - receptor --cert-signreq req=/etc/receptor/certs/$$node.csr cacert=/etc/receptor/certs/ca.crt cakey=/etc/receptor/certs/ca.key outcert=/etc/receptor/certs/$$node.crt verify=yes; \ - done; \ - fi - # Refresh development environment after pulling new code. refresh: clean requirements_dev version_file develop migrate diff --git a/tools/docker-compose/bootstrap_development.sh b/tools/docker-compose/bootstrap_development.sh index c33559c155..5d4f2c34a3 100755 --- a/tools/docker-compose/bootstrap_development.sh +++ b/tools/docker-compose/bootstrap_development.sh @@ -19,9 +19,6 @@ else wait-for-migrations fi -make init - - if output=$(awx-manage createsuperuser --noinput --username=admin --email=admin@localhost 2> /dev/null); then echo $output admin_password=$(openssl rand -base64 12) @@ -35,6 +32,10 @@ mkdir -p /awx_devel/awx/public/static mkdir -p /awx_devel/awx/ui/static mkdir -p /awx_devel/awx/ui/build/static +awx-manage provision_instance --hostname="$(hostname)" --node_type="$MAIN_NODE_TYPE" +awx-manage register_queue --queuename=controlplane --instance_percent=100 +awx-manage register_queue --queuename=default --instance_percent=100 + # Create resource entries when using Minikube if [[ -n "$MINIKUBE_CONTAINER_GROUP" ]]; then awx-manage shell < /awx_devel/tools/docker-compose-minikube/_sources/bootstrap_minikube.py From 37907ad34838522f94b5f49b5a9074727048a554 Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Fri, 7 Jan 2022 16:43:59 -0500 Subject: [PATCH 08/11] Register the hop & execution nodes and all node links --- .../sources/templates/docker-compose.yml.j2 | 2 ++ tools/docker-compose/bootstrap_development.sh | 17 +++++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/tools/docker-compose/ansible/roles/sources/templates/docker-compose.yml.j2 b/tools/docker-compose/ansible/roles/sources/templates/docker-compose.yml.j2 index 3c6f71d9a5..bcc03b9a6c 100644 --- a/tools/docker-compose/ansible/roles/sources/templates/docker-compose.yml.j2 +++ b/tools/docker-compose/ansible/roles/sources/templates/docker-compose.yml.j2 @@ -19,6 +19,8 @@ services: AWX_GROUP_QUEUES: tower MAIN_NODE_TYPE: "${MAIN_NODE_TYPE:-hybrid}" RECEPTORCTL_SOCKET: {{ receptor_socket_file }} + CONTROL_PLANE_NODE_COUNT: {{ control_plane_node_count|int }} + EXECUTION_NODE_COUNT: {{ execution_node_count|int }} {% if loop.index == 1 %} RUN_MIGRATIONS: 1 {% endif %} diff --git a/tools/docker-compose/bootstrap_development.sh b/tools/docker-compose/bootstrap_development.sh index 5d4f2c34a3..d97d5fdfca 100755 --- a/tools/docker-compose/bootstrap_development.sh +++ b/tools/docker-compose/bootstrap_development.sh @@ -36,6 +36,23 @@ awx-manage provision_instance --hostname="$(hostname)" --node_type="$MAIN_NODE_T awx-manage register_queue --queuename=controlplane --instance_percent=100 awx-manage register_queue --queuename=default --instance_percent=100 +if [[ -n "$RUN_MIGRATIONS" ]]; then + for (( i=1; i<$CONTROL_PLANE_NODE_COUNT; i++ )); do + for (( j=i + 1; j<=$CONTROL_PLANE_NODE_COUNT; j++ )); do + awx-manage register_peers "awx_$i" --peers "awx_$j" + done + done + + if [[ $EXECUTION_NODE_COUNT > 0 ]]; then + awx-manage provision_instance --hostname="receptor-hop" --node_type="hop" + awx-manage register_peers "receptor-hop" --peers "awx_1" + for (( e=1; e<=$EXECUTION_NODE_COUNT; e++ )); do + awx-manage provision_instance --hostname="receptor-$e" --node_type="execution" + awx-manage register_peers "receptor-$e" --peers "receptor-hop" + done + fi +fi + # Create resource entries when using Minikube if [[ -n "$MINIKUBE_CONTAINER_GROUP" ]]; then awx-manage shell < /awx_devel/tools/docker-compose-minikube/_sources/bootstrap_minikube.py From 63867518eefd27abf9a18f24d07c00a65267a9f8 Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Mon, 10 Jan 2022 14:15:58 -0500 Subject: [PATCH 09/11] Add a new parameter --disconnect to register_peers To allow links between Receptor nodes to be removed from the database. --- .../management/commands/register_peers.py | 41 +++++++++++++------ 1 file changed, 29 insertions(+), 12 deletions(-) diff --git a/awx/main/management/commands/register_peers.py b/awx/main/management/commands/register_peers.py index 19b40c0c59..dd11efd6d1 100644 --- a/awx/main/management/commands/register_peers.py +++ b/awx/main/management/commands/register_peers.py @@ -9,23 +9,40 @@ class Command(BaseCommand): Register the peers of a receptor node. """ + help = "Register or remove links between Receptor nodes." + def add_arguments(self, parser): - parser.add_argument('source', type=str, help="") - parser.add_argument('--peers', type=str, nargs='+', required=True, help="") + parser.add_argument('source', type=str, help="Receptor node opening the connections.") + parser.add_argument('--peers', type=str, nargs='+', required=False, help="Nodes that the source node connects out to.") + parser.add_argument('--disconnect', type=str, nargs='+', required=False, help="Nodes that should no longer be connected to by the source node.") def handle(self, **options): nodes = Instance.objects.in_bulk(field_name='hostname') if options['source'] not in nodes: raise CommandError(f"Host {options['source']} is not a registered instance.") - missing_peers = set(options['peers']) - set(nodes) - if missing_peers: - missing = ' '.join(missing_peers) - raise CommandError(f"Peers not currently registered as instances: {missing}") + if not options['peers'] and not options['disconnect']: + raise CommandError("One of the options --peers or --disconnect is required.") - results = 0 - for target in options['peers']: - instance, created = InstanceLink.objects.get_or_create(source=nodes[options['source']], target=nodes[target]) - if created: - results += 1 + if options['peers']: + missing_peers = set(options['peers']) - set(nodes) + if missing_peers: + missing = ' '.join(missing_peers) + raise CommandError(f"Peers not currently registered as instances: {missing}") - print(f"{results} new peer links added to the database.") + results = 0 + for target in options['peers']: + instance, created = InstanceLink.objects.get_or_create(source=nodes[options['source']], target=nodes[target]) + if created: + results += 1 + + print(f"{results} new peer links added to the database.") + + if options['disconnect']: + results = 0 + for target in options['disconnect']: + if target not in nodes: # Be permissive, the node might have already been de-registered. + continue + n, _ = InstanceLink.objects.filter(source=nodes[options['source']], target=nodes[target]).delete() + results += n + + print(f"{results} peer links removed from the database.") From 5ffe91f069991597e882c1db52f0ae748788bd4d Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Mon, 10 Jan 2022 15:12:04 -0500 Subject: [PATCH 10/11] Add a new --exact parameter to register_peers --- .../management/commands/register_peers.py | 31 +++++++++++++++++-- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/awx/main/management/commands/register_peers.py b/awx/main/management/commands/register_peers.py index dd11efd6d1..35a05ff195 100644 --- a/awx/main/management/commands/register_peers.py +++ b/awx/main/management/commands/register_peers.py @@ -1,4 +1,5 @@ from django.core.management.base import BaseCommand, CommandError +from django.db import transaction from awx.main.models import Instance, InstanceLink @@ -15,13 +16,24 @@ class Command(BaseCommand): parser.add_argument('source', type=str, help="Receptor node opening the connections.") parser.add_argument('--peers', type=str, nargs='+', required=False, help="Nodes that the source node connects out to.") parser.add_argument('--disconnect', type=str, nargs='+', required=False, help="Nodes that should no longer be connected to by the source node.") + parser.add_argument( + '--exact', + type=str, + nargs='+', + required=False, + help="The exact set of nodes the source node should connect out to. Any existing links registered in the database that do not match will be removed.", + ) def handle(self, **options): nodes = Instance.objects.in_bulk(field_name='hostname') if options['source'] not in nodes: raise CommandError(f"Host {options['source']} is not a registered instance.") - if not options['peers'] and not options['disconnect']: - raise CommandError("One of the options --peers or --disconnect is required.") + if not (options['peers'] or options['disconnect'] or options['exact']): + raise CommandError("One of the options --peers, --disconnect, or --exact is required.") + if options['exact'] and options['peers']: + raise CommandError("The option --peers may not be used with --exact.") + if options['exact'] and options['disconnect']: + raise CommandError("The option --disconnect may not be used with --exact.") if options['peers']: missing_peers = set(options['peers']) - set(nodes) @@ -31,7 +43,7 @@ class Command(BaseCommand): results = 0 for target in options['peers']: - instance, created = InstanceLink.objects.get_or_create(source=nodes[options['source']], target=nodes[target]) + _, created = InstanceLink.objects.get_or_create(source=nodes[options['source']], target=nodes[target]) if created: results += 1 @@ -46,3 +58,16 @@ class Command(BaseCommand): results += n print(f"{results} peer links removed from the database.") + + if options['exact']: + additions = 0 + with transaction.atomic(): + peers = set(options['exact']) + links = set(InstanceLink.objects.filter(source=nodes[options['source']]).values_list('target__hostname', flat=True)) + removals, _ = InstanceLink.objects.filter(source=nodes[options['source']], target__hostname__in=peers - links).delete() + for peer in peers - links: + _, created = InstanceLink.objects.get_or_create(source=nodes[options['source']], target=nodes[target]) + if created: + additions += 1 + + print(f"{additions} peer links added and {removals} deleted from the database.") From 9c9c1b4d3b65eef66e91269a2f5d95a43c1bdbb2 Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Mon, 10 Jan 2022 15:48:17 -0500 Subject: [PATCH 11/11] register_peers will now raise errors if you attempt to reverse or loop --- awx/main/management/commands/register_peers.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/awx/main/management/commands/register_peers.py b/awx/main/management/commands/register_peers.py index 35a05ff195..1e7f770f80 100644 --- a/awx/main/management/commands/register_peers.py +++ b/awx/main/management/commands/register_peers.py @@ -35,6 +35,18 @@ class Command(BaseCommand): if options['exact'] and options['disconnect']: raise CommandError("The option --disconnect may not be used with --exact.") + # No 1-cycles + for collection in ('peers', 'disconnect', 'exact'): + if options['source'] in options[collection]: + raise CommandError(f"Source node {options['source']} may not also be in --{collection}.") + + # No 2-cycles + if options['peers'] or options['exact']: + peers = set(options['peers'] or options['exact']) + incoming = set(InstanceLink.objects.filter(target=nodes[options['source']]).values_list('source__hostname', flat=True)) + if peers & incoming: + raise CommandError(f"Source node {options['source']} cannot link to nodes already peering to it: {peers & incoming}.") + if options['peers']: missing_peers = set(options['peers']) - set(nodes) if missing_peers: