Merge pull request #11431 from jbradberry/receptor-mesh-models

Modify Instance and introduce InstanceLink
This commit is contained in:
Jeff Bradberry 2022-01-11 10:55:54 -05:00 committed by GitHub
commit db999b82ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 210 additions and 67 deletions

View File

@ -11,7 +11,6 @@ COLLECTION_VERSION := $(shell $(PYTHON) setup.py --version | cut -d . -f 1-3)
# NOTE: This defaults the container image version to the branch that's active
COMPOSE_TAG ?= $(GIT_BRANCH)
COMPOSE_HOST ?= $(shell hostname)
MAIN_NODE_TYPE ?= hybrid
# If set to true docker-compose will also start a keycloak instance
KEYCLOAK ?= false
@ -147,24 +146,6 @@ version_file:
fi; \
$(PYTHON) -c "import awx; print(awx.__version__)" > /var/lib/awx/.awx_version; \
# Do any one-time init tasks.
comma := ,
init:
if [ "$(VENV_BASE)" ]; then \
. $(VENV_BASE)/awx/bin/activate; \
fi; \
$(MANAGEMENT_COMMAND) provision_instance --hostname=$(COMPOSE_HOST) --node_type=$(MAIN_NODE_TYPE); \
$(MANAGEMENT_COMMAND) register_queue --queuename=controlplane --instance_percent=100;\
$(MANAGEMENT_COMMAND) register_queue --queuename=default --instance_percent=100;
if [ ! -f /etc/receptor/certs/awx.key ]; then \
rm -f /etc/receptor/certs/*; \
receptor --cert-init commonname="AWX Test CA" bits=2048 outcert=/etc/receptor/certs/ca.crt outkey=/etc/receptor/certs/ca.key; \
for node in $(RECEPTOR_MUTUAL_TLS); do \
receptor --cert-makereq bits=2048 commonname="$$node test cert" dnsname=$$node nodeid=$$node outreq=/etc/receptor/certs/$$node.csr outkey=/etc/receptor/certs/$$node.key; \
receptor --cert-signreq req=/etc/receptor/certs/$$node.csr cacert=/etc/receptor/certs/ca.crt cakey=/etc/receptor/certs/ca.key outcert=/etc/receptor/certs/$$node.crt verify=yes; \
done; \
fi
# Refresh development environment after pulling new code.
refresh: clean requirements_dev version_file develop migrate

View File

@ -364,11 +364,14 @@ class InstanceList(ListAPIView):
serializer_class = serializers.InstanceSerializer
search_fields = ('hostname',)
def get_queryset(self):
return super().get_queryset().exclude(node_type='hop')
class InstanceDetail(RetrieveUpdateAPIView):
name = _("Instance Detail")
model = models.Instance
queryset = models.Instance.objects.exclude(node_type='hop')
serializer_class = serializers.InstanceSerializer
def update(self, request, *args, **kwargs):
@ -406,13 +409,15 @@ class InstanceInstanceGroupsList(InstanceGroupMembershipMixin, SubListCreateAtta
def is_valid_relation(self, parent, sub, created=False):
if parent.node_type == 'control':
return {'msg': _(f"Cannot change instance group membership of control-only node: {parent.hostname}.")}
if parent.node_type == 'hop':
return {'msg': _(f"Cannot change instance group membership of hop node: {parent.hostname}.")}
return None
class InstanceHealthCheck(GenericAPIView):
name = _('Instance Health Check')
model = models.Instance
queryset = models.Instance.objects.exclude(node_type='hop')
serializer_class = serializers.InstanceHealthCheckSerializer
permission_classes = (IsSystemAdminOrAuditor,)
@ -503,6 +508,8 @@ class InstanceGroupInstanceList(InstanceGroupMembershipMixin, SubListAttachDetac
def is_valid_relation(self, parent, sub, created=False):
if sub.node_type == 'control':
return {'msg': _(f"Cannot change instance group membership of control-only node: {sub.hostname}.")}
if sub.node_type == 'hop':
return {'msg': _(f"Cannot change instance group membership of hop node: {sub.hostname}.")}
return None

View File

@ -149,7 +149,7 @@ class ApiV2PingView(APIView):
response = {'ha': is_ha_environment(), 'version': get_awx_version(), 'active_node': settings.CLUSTER_HOST_ID, 'install_uuid': settings.INSTALL_UUID}
response['instances'] = []
for instance in Instance.objects.all():
for instance in Instance.objects.exclude(node_type='hop'):
response['instances'].append(
dict(
node=instance.hostname,

View File

@ -13,7 +13,7 @@ class Ungrouped(object):
@property
def instances(self):
return Instance.objects.filter(rampart_groups__isnull=True)
return Instance.objects.filter(rampart_groups__isnull=True).exclude(node_type='hop')
@property
def capacity(self):

View File

@ -13,19 +13,19 @@ class Command(BaseCommand):
Register this instance with the database for HA tracking.
"""
help = 'Add instance to the database. ' 'Specify `--hostname` to use this command.'
help = "Add instance to the database. Specify `--hostname` to use this command."
def add_arguments(self, parser):
parser.add_argument('--hostname', dest='hostname', type=str, help='Hostname used during provisioning')
parser.add_argument('--node_type', type=str, default="hybrid", choices=["control", "execution", "hybrid"], help='Instance Node type')
parser.add_argument('--uuid', type=str, help='Instance UUID')
parser.add_argument('--hostname', dest='hostname', type=str, help="Hostname used during provisioning")
parser.add_argument('--node_type', type=str, default='hybrid', choices=['control', 'execution', 'hop', 'hybrid'], help="Instance Node type")
parser.add_argument('--uuid', type=str, help="Instance UUID")
def _register_hostname(self, hostname, node_type, uuid):
if not hostname:
return
(changed, instance) = Instance.objects.register(hostname=hostname, node_type=node_type, uuid=uuid)
if changed:
print('Successfully registered instance {}'.format(hostname))
print("Successfully registered instance {}".format(hostname))
else:
print("Instance already registered {}".format(instance.hostname))
self.changed = changed
@ -37,4 +37,4 @@ class Command(BaseCommand):
self.changed = False
self._register_hostname(options.get('hostname'), options.get('node_type'), options.get('uuid'))
if self.changed:
print('(changed: True)')
print("(changed: True)")

View File

@ -0,0 +1,85 @@
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction
from awx.main.models import Instance, InstanceLink
class Command(BaseCommand):
"""
Internal tower command.
Register the peers of a receptor node.
"""
help = "Register or remove links between Receptor nodes."
def add_arguments(self, parser):
parser.add_argument('source', type=str, help="Receptor node opening the connections.")
parser.add_argument('--peers', type=str, nargs='+', required=False, help="Nodes that the source node connects out to.")
parser.add_argument('--disconnect', type=str, nargs='+', required=False, help="Nodes that should no longer be connected to by the source node.")
parser.add_argument(
'--exact',
type=str,
nargs='+',
required=False,
help="The exact set of nodes the source node should connect out to. Any existing links registered in the database that do not match will be removed.",
)
def handle(self, **options):
nodes = Instance.objects.in_bulk(field_name='hostname')
if options['source'] not in nodes:
raise CommandError(f"Host {options['source']} is not a registered instance.")
if not (options['peers'] or options['disconnect'] or options['exact']):
raise CommandError("One of the options --peers, --disconnect, or --exact is required.")
if options['exact'] and options['peers']:
raise CommandError("The option --peers may not be used with --exact.")
if options['exact'] and options['disconnect']:
raise CommandError("The option --disconnect may not be used with --exact.")
# No 1-cycles
for collection in ('peers', 'disconnect', 'exact'):
if options['source'] in options[collection]:
raise CommandError(f"Source node {options['source']} may not also be in --{collection}.")
# No 2-cycles
if options['peers'] or options['exact']:
peers = set(options['peers'] or options['exact'])
incoming = set(InstanceLink.objects.filter(target=nodes[options['source']]).values_list('source__hostname', flat=True))
if peers & incoming:
raise CommandError(f"Source node {options['source']} cannot link to nodes already peering to it: {peers & incoming}.")
if options['peers']:
missing_peers = set(options['peers']) - set(nodes)
if missing_peers:
missing = ' '.join(missing_peers)
raise CommandError(f"Peers not currently registered as instances: {missing}")
results = 0
for target in options['peers']:
_, created = InstanceLink.objects.get_or_create(source=nodes[options['source']], target=nodes[target])
if created:
results += 1
print(f"{results} new peer links added to the database.")
if options['disconnect']:
results = 0
for target in options['disconnect']:
if target not in nodes: # Be permissive, the node might have already been de-registered.
continue
n, _ = InstanceLink.objects.filter(source=nodes[options['source']], target=nodes[target]).delete()
results += n
print(f"{results} peer links removed from the database.")
if options['exact']:
additions = 0
with transaction.atomic():
peers = set(options['exact'])
links = set(InstanceLink.objects.filter(source=nodes[options['source']]).values_list('target__hostname', flat=True))
removals, _ = InstanceLink.objects.filter(source=nodes[options['source']], target__hostname__in=peers - links).delete()
for peer in peers - links:
_, created = InstanceLink.objects.get_or_create(source=nodes[options['source']], target=nodes[target])
if created:
additions += 1
print(f"{additions} peer links added and {removals} deleted from the database.")

View File

@ -188,7 +188,7 @@ class InstanceManager(models.Manager):
def active_count(self):
"""Return count of active Tower nodes for licensing."""
return self.all().count()
return self.exclude(node_type='hop').count()
class InstanceGroupManager(models.Manager):

View File

@ -0,0 +1,44 @@
# Generated by Django 2.2.20 on 2021-12-17 19:26
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('main', '0155_improved_health_check'),
]
operations = [
migrations.AlterField(
model_name='instance',
name='node_type',
field=models.CharField(
choices=[
('control', 'Control plane node'),
('execution', 'Execution plane node'),
('hybrid', 'Controller and execution'),
('hop', 'Message-passing node, no execution capability'),
],
default='hybrid',
max_length=16,
),
),
migrations.CreateModel(
name='InstanceLink',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('source', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='+', to='main.Instance')),
('target', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='reverse_peers', to='main.Instance')),
],
options={
'unique_together': {('source', 'target')},
},
),
migrations.AddField(
model_name='instance',
name='peers',
field=models.ManyToManyField(through='main.InstanceLink', to='main.Instance'),
),
]

View File

@ -47,6 +47,7 @@ from awx.main.models.execution_environments import ExecutionEnvironment # noqa
from awx.main.models.activity_stream import ActivityStream # noqa
from awx.main.models.ha import ( # noqa
Instance,
InstanceLink,
InstanceGroup,
TowerScheduleState,
)

View File

@ -29,7 +29,7 @@ from awx.main.models.mixins import RelatedJobsMixin
# ansible-runner
from ansible_runner.utils.capacity import get_cpu_count, get_mem_in_bytes
__all__ = ('Instance', 'InstanceGroup', 'TowerScheduleState')
__all__ = ('Instance', 'InstanceGroup', 'InstanceLink', 'TowerScheduleState')
logger = logging.getLogger('awx.main.models.ha')
@ -54,6 +54,14 @@ class HasPolicyEditsMixin(HasEditsMixin):
return self._values_have_edits(new_values)
class InstanceLink(BaseModel):
source = models.ForeignKey('Instance', on_delete=models.CASCADE, related_name='+')
target = models.ForeignKey('Instance', on_delete=models.CASCADE, related_name='reverse_peers')
class Meta:
unique_together = ('source', 'target')
class Instance(HasPolicyEditsMixin, BaseModel):
"""A model representing an AWX instance running against this database."""
@ -116,9 +124,16 @@ class Instance(HasPolicyEditsMixin, BaseModel):
default=0,
editable=False,
)
NODE_TYPE_CHOICES = [("control", "Control plane node"), ("execution", "Execution plane node"), ("hybrid", "Controller and execution")]
NODE_TYPE_CHOICES = [
("control", "Control plane node"),
("execution", "Execution plane node"),
("hybrid", "Controller and execution"),
("hop", "Message-passing node, no execution capability"),
]
node_type = models.CharField(default='hybrid', choices=NODE_TYPE_CHOICES, max_length=16)
peers = models.ManyToManyField('self', symmetrical=False, through=InstanceLink, through_fields=('source', 'target'))
class Meta:
app_label = 'main'
ordering = ("hostname",)
@ -200,7 +215,7 @@ class Instance(HasPolicyEditsMixin, BaseModel):
def set_capacity_value(self):
"""Sets capacity according to capacity adjustment rule (no save)"""
if self.enabled:
if self.enabled and self.node_type != 'hop':
lower_cap = min(self.mem_capacity, self.cpu_capacity)
higher_cap = max(self.mem_capacity, self.cpu_capacity)
self.capacity = lower_cap + (higher_cap - lower_cap) * self.capacity_adjustment
@ -305,7 +320,7 @@ class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin):
@property
def capacity(self):
return sum([inst.capacity for inst in self.instances.all()])
return sum(inst.capacity for inst in self.instances.all())
@property
def jobs_running(self):

View File

@ -13,7 +13,6 @@ from django.db import transaction, connection
from django.utils.translation import ugettext_lazy as _, gettext_noop
from django.utils.timezone import now as tz_now
from django.conf import settings
from django.db.models import Q
# AWX
from awx.main.dispatch.reaper import reap_job
@ -69,7 +68,7 @@ class TaskManager:
"""
Init AFTER we know this instance of the task manager will run because the lock is acquired.
"""
instances = Instance.objects.filter(~Q(hostname=None), enabled=True)
instances = Instance.objects.filter(hostname__isnull=False, enabled=True).exclude(node_type='hop')
self.real_instances = {i.hostname: i for i in instances}
instances_partial = [
@ -484,7 +483,7 @@ class TaskManager:
return created_dependencies
def process_pending_tasks(self, pending_tasks):
running_workflow_templates = set([wf.unified_job_template_id for wf in self.get_running_workflow_jobs()])
running_workflow_templates = {wf.unified_job_template_id for wf in self.get_running_workflow_jobs()}
tasks_to_update_job_explanation = []
for task in pending_tasks:
if self.start_task_limit <= 0:
@ -593,7 +592,7 @@ class TaskManager:
# elsewhere
for j in UnifiedJob.objects.filter(
status__in=['pending', 'waiting', 'running'],
).exclude(execution_node__in=Instance.objects.values_list('hostname', flat=True)):
).exclude(execution_node__in=Instance.objects.exclude(node_type='hop').values_list('hostname', flat=True)):
if j.execution_node and not j.is_container_group_task:
logger.error(f'{j.execution_node} is not a registered instance; reaping {j.log_format}')
reap_job(j, 'failed')

View File

@ -202,7 +202,8 @@ def apply_cluster_membership_policies():
to_log = logger.debug
to_log('Waited {} seconds to obtain lock name: cluster_policy_lock'.format(lock_time))
started_compute = time.time()
all_instances = list(Instance.objects.order_by('id'))
# Hop nodes should never get assigned to an InstanceGroup.
all_instances = list(Instance.objects.exclude(node_type='hop').order_by('id'))
all_groups = list(InstanceGroup.objects.prefetch_related('instances'))
total_instances = len(all_instances)
@ -253,7 +254,7 @@ def apply_cluster_membership_policies():
# Finally, process instance policy percentages
for g in sorted(actual_groups, key=lambda x: len(x.instances)):
exclude_type = 'execution' if g.obj.name == settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME else 'control'
candidate_pool_ct = len([i for i in actual_instances if i.obj.node_type != exclude_type])
candidate_pool_ct = sum(1 for i in actual_instances if i.obj.node_type != exclude_type)
if not candidate_pool_ct:
continue
policy_per_added = []
@ -494,29 +495,21 @@ def execution_node_health_check(node):
def inspect_execution_nodes(instance_list):
with advisory_lock('inspect_execution_nodes_lock', wait=False):
node_lookup = {}
for inst in instance_list:
if inst.node_type == 'execution':
node_lookup[inst.hostname] = inst
node_lookup = {inst.hostname: inst for inst in instance_list}
ctl = get_receptor_ctl()
connections = ctl.simple_command('status')['Advertisements']
mesh_status = ctl.simple_command('status')
nowtime = now()
for ad in connections:
workers = mesh_status['Advertisements']
for ad in workers:
hostname = ad['NodeID']
commands = ad.get('WorkCommands') or []
worktypes = []
for c in commands:
worktypes.append(c["WorkType"])
if 'ansible-runner' not in worktypes:
if not any(cmd['WorkType'] == 'ansible-runner' for cmd in ad['WorkCommands'] or []):
continue
changed = False
if hostname in node_lookup:
instance = node_lookup[hostname]
elif settings.MESH_AUTODISCOVERY_ENABLED:
defaults = dict(enabled=False)
(changed, instance) = Instance.objects.register(hostname=hostname, node_type='execution', defaults=defaults)
logger.warn(f"Registered execution node '{hostname}' (marked disabled by default)")
else:
logger.warn(f"Unrecognized node on mesh advertising ansible-runner work type: {hostname}")
continue

View File

@ -84,8 +84,9 @@ def test_instance_dup(org_admin, organization, project, instance_factory, instan
list_response2 = get(reverse('api:instance_list'), user=org_admin)
api_num_instances_oa = list(list_response2.data.items())[0][1]
assert actual_num_instances == api_num_instances_auditor
# Note: The org_admin will not see the default 'tower' node (instance fixture) because it is not in its group, as expected
assert api_num_instances_auditor == actual_num_instances
# Note: The org_admin will not see the default 'tower' node
# (instance fixture) because it is not in its group, as expected
assert api_num_instances_oa == (actual_num_instances - 1)

View File

@ -35,6 +35,7 @@ def get_broadcast_hosts():
instances = (
Instance.objects.exclude(hostname=Instance.objects.me().hostname)
.exclude(node_type='execution')
.exclude(node_type='hop')
.order_by('hostname')
.values('hostname', 'ip_address')
.distinct()

View File

@ -262,10 +262,6 @@ CSRF_COOKIE_SECURE = True
# Limit CSRF cookies to browser sessions
CSRF_COOKIE_AGE = None
# Auto-discover new instances that appear on receptor mesh
# used for docker-compose environment, unsupported
MESH_AUTODISCOVERY_ENABLED = False
TEMPLATES = [
{
'NAME': 'default',

View File

@ -55,9 +55,6 @@ template['OPTIONS']['loaders'] = ('django.template.loaders.filesystem.Loader', '
PENDO_TRACKING_STATE = "off"
INSIGHTS_TRACKING_STATE = False
# auto-discover receptor-* execution nodes
MESH_AUTODISCOVERY_ENABLED = True
# debug toolbar and swagger assume that requirements/requirements_dev.txt are installed
INSTALLED_APPS += ['rest_framework_swagger', 'debug_toolbar'] # NOQA

View File

@ -19,6 +19,8 @@ services:
AWX_GROUP_QUEUES: tower
MAIN_NODE_TYPE: "${MAIN_NODE_TYPE:-hybrid}"
RECEPTORCTL_SOCKET: {{ receptor_socket_file }}
CONTROL_PLANE_NODE_COUNT: {{ control_plane_node_count|int }}
EXECUTION_NODE_COUNT: {{ execution_node_count|int }}
{% if loop.index == 1 %}
RUN_MIGRATIONS: 1
{% endif %}

View File

@ -10,3 +10,6 @@
- tcp-listener:
port: 5555
- control-service:
service: control

View File

@ -19,9 +19,6 @@ else
wait-for-migrations
fi
make init
if output=$(awx-manage createsuperuser --noinput --username=admin --email=admin@localhost 2> /dev/null); then
echo $output
admin_password=$(openssl rand -base64 12)
@ -35,6 +32,27 @@ mkdir -p /awx_devel/awx/public/static
mkdir -p /awx_devel/awx/ui/static
mkdir -p /awx_devel/awx/ui/build/static
awx-manage provision_instance --hostname="$(hostname)" --node_type="$MAIN_NODE_TYPE"
awx-manage register_queue --queuename=controlplane --instance_percent=100
awx-manage register_queue --queuename=default --instance_percent=100
if [[ -n "$RUN_MIGRATIONS" ]]; then
for (( i=1; i<$CONTROL_PLANE_NODE_COUNT; i++ )); do
for (( j=i + 1; j<=$CONTROL_PLANE_NODE_COUNT; j++ )); do
awx-manage register_peers "awx_$i" --peers "awx_$j"
done
done
if [[ $EXECUTION_NODE_COUNT > 0 ]]; then
awx-manage provision_instance --hostname="receptor-hop" --node_type="hop"
awx-manage register_peers "receptor-hop" --peers "awx_1"
for (( e=1; e<=$EXECUTION_NODE_COUNT; e++ )); do
awx-manage provision_instance --hostname="receptor-$e" --node_type="execution"
awx-manage register_peers "receptor-$e" --peers "receptor-hop"
done
fi
fi
# Create resource entries when using Minikube
if [[ -n "$MINIKUBE_CONTAINER_GROUP" ]]; then
awx-manage shell < /awx_devel/tools/docker-compose-minikube/_sources/bootstrap_minikube.py