diff --git a/awx/api/serializers.py b/awx/api/serializers.py
index 51224488a3..94797182ca 100644
--- a/awx/api/serializers.py
+++ b/awx/api/serializers.py
@@ -5506,6 +5506,9 @@ class InstanceSerializer(BaseSerializer):
         return super().validate(attrs)
 
     def validate_node_type(self, value):
+        if not self.instance and value not in [Instance.Types.HOP, Instance.Types.EXECUTION]:
+            raise serializers.ValidationError("Can only create hop or execution nodes.")
+
         if self.instance and self.instance.node_type != value:
             raise serializers.ValidationError(_("Cannot change node type."))
 
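A quick sketch of what the new validator enforces (illustrative only, not part of the diff; assumes an initialized AWX Django environment):

    from awx.api.serializers import InstanceSerializer

    # Creating a control node through the API is now rejected on create...
    serializer = InstanceSerializer(data={"hostname": "new-node", "node_type": "control"})
    assert not serializer.is_valid()
    assert "node_type" in serializer.errors

    # ...while hop and execution nodes pass this check as before.
    serializer = InstanceSerializer(data={"hostname": "new-node", "node_type": "hop"})
    serializer.is_valid()
    assert "node_type" not in serializer.errors
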
diff --git a/awx/main/management/commands/host_metric.py b/awx/main/management/commands/host_metric.py
index b778545148..c0862ea1a3 100644
--- a/awx/main/management/commands/host_metric.py
+++ b/awx/main/management/commands/host_metric.py
@@ -35,7 +35,15 @@ class Command(BaseCommand):
 
     def host_metric_summary_monthly_queryset(self, result, offset=0, limit=BATCHED_FETCH_COUNT):
         list_of_queryset = list(
-            result.values('id', 'date', 'license_consumed', 'license_capacity', 'hosts_added', 'hosts_deleted', 'indirectly_managed_hosts',).order_by(
+            result.values(
+                'id',
+                'date',
+                'license_consumed',
+                'license_capacity',
+                'hosts_added',
+                'hosts_deleted',
+                'indirectly_managed_hosts',
+            ).order_by(
                 'date'
             )[offset : offset + limit]
         )
diff --git a/awx/main/managers.py b/awx/main/managers.py
index 86ab328121..41096ef3fe 100644
--- a/awx/main/managers.py
+++ b/awx/main/managers.py
@@ -167,7 +167,6 @@ class InstanceManager(models.Manager):
         create_defaults = {
             'node_state': Instance.States.INSTALLED,
             'capacity': 0,
-            'listener_port': 27199,
         }
         if defaults is not None:
             create_defaults.update(defaults)
diff --git a/awx/main/migrations/0185_hop_nodes.py b/awx/main/migrations/0186_hop_nodes.py
similarity index 97%
rename from awx/main/migrations/0185_hop_nodes.py
rename to awx/main/migrations/0186_hop_nodes.py
index e3b1549195..c157b2c952 100644
--- a/awx/main/migrations/0185_hop_nodes.py
+++ b/awx/main/migrations/0186_hop_nodes.py
@@ -11,7 +11,7 @@ def set_peers_from_control_nodes_true(apps, schema_editor):
 
 class Migration(migrations.Migration):
     dependencies = [
-        ('main', '0184_django_indexes'),
+        ('main', '0185_move_JSONBlob_to_JSONField'),
     ]
 
     operations = [
diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py
index 78f782edd3..7676e08c2a 100644
--- a/awx/main/models/ha.py
+++ b/awx/main/models/ha.py
@@ -19,6 +19,7 @@ from solo.models import SingletonModel
 
 # AWX
 from awx import __version__ as awx_application_version
+from awx.main.utils import is_testing
 from awx.api.versioning import reverse
 from awx.main.fields import ImplicitRoleField
 from awx.main.managers import InstanceManager, UUID_DEFAULT
@@ -184,7 +185,7 @@ class Instance(HasPolicyEditsMixin, BaseModel):
         help_text=_("Port that Receptor will listen for incoming connections on."),
     )
 
-    peers = models.ManyToManyField('self', symmetrical=False, through=InstanceLink, through_fields=('source', 'target'))
+    peers = models.ManyToManyField('self', symmetrical=False, through=InstanceLink, through_fields=('source', 'target'), related_name='peers_from')
     peers_from_control_nodes = models.BooleanField(default=False, help_text=_("If True, control plane cluster nodes should automatically peer to it."))
 
     class Meta:
@@ -477,20 +478,50 @@ def on_instance_group_saved(sender, instance, created=False, raw=False, **kwargs
         instance.set_default_policy_fields()
 
 
+def schedule_write_receptor_config(broadcast=True):
+    from awx.main.tasks.receptor import write_receptor_config  # prevents circular import
+
+    # broadcast to all control instances to update their receptor configs
+    if broadcast:
+        connection.on_commit(lambda: write_receptor_config.apply_async(queue='tower_broadcast_all'))
+    else:
+        if not is_testing():
+            write_receptor_config()  # just run locally
+
+
 @receiver(post_save, sender=Instance)
 def on_instance_saved(sender, instance, created=False, raw=False, **kwargs):
-    if settings.IS_K8S and instance.node_type in (Instance.Types.EXECUTION, Instance.Types.HOP):
+    '''
+    Here we link control nodes to hop or execution nodes based on the
+    peers_from_control_nodes field.
+    write_receptor_config should be called on each control node when:
+    1. a new node is created with peers_from_control_nodes enabled
+    2. a node changes its value of peers_from_control_nodes
+    3. a new control node comes online and has instances to peer to
+    '''
+    if created and settings.IS_K8S and instance.node_type in [Instance.Types.CONTROL, Instance.Types.HYBRID]:
+        inst = Instance.objects.filter(peers_from_control_nodes=True)
+        if set(instance.peers.all()) != set(inst):
+            instance.peers.set(inst)
+            schedule_write_receptor_config(broadcast=False)
+
+    if settings.IS_K8S and instance.node_type in [Instance.Types.HOP, Instance.Types.EXECUTION]:
         if instance.node_state == Instance.States.DEPROVISIONING:
             from awx.main.tasks.receptor import remove_deprovisioned_node  # prevents circular import
 
             # wait for jobs on the node to complete, then delete the
             # node and kick off write_receptor_config
             connection.on_commit(lambda: remove_deprovisioned_node.apply_async([instance.hostname]))
-
-        from awx.main.tasks.receptor import write_receptor_config  # prevents circular import
-
-        # broadcast to all control instances to update their receptor configs
-        connection.on_commit(lambda: write_receptor_config.apply_async(queue='tower_broadcast_all'))
+        else:
+            if instance.peers_from_control_nodes:
+                control_instances = Instance.objects.filter(node_type__in=[Instance.Types.CONTROL, Instance.Types.HYBRID])
+                if set(instance.peers_from.all()) != set(control_instances):
+                    instance.peers_from.set(control_instances)
+                    schedule_write_receptor_config()  # keep method separate to make pytest mocking easier
+            else:
+                if instance.peers_from.exists():
+                    instance.peers_from.clear()
+                    schedule_write_receptor_config()
 
     if created or instance.has_policy_changes():
         schedule_policy_task()
@@ -506,10 +537,12 @@ def on_instance_group_deleted(sender, instance, using, **kwargs):
 def on_instance_deleted(sender, instance, using, **kwargs):
     schedule_policy_task()
     if settings.IS_K8S and instance.node_type in (Instance.Types.EXECUTION, Instance.Types.HOP) and instance.peers_from_control_nodes:
-        from awx.main.tasks.receptor import write_receptor_config  # prevents circular import
+        schedule_write_receptor_config()
 
-        # broadcast to all control instances to update their receptor configs
-        connection.on_commit(lambda: write_receptor_config.apply_async(kwargs=dict(force=True), queue='tower_broadcast_all'))
+
+@receiver(post_save, sender=InstanceLink)
+def on_instancelink_save(sender, instance, using, **kwargs):
+    logger.debug("InstanceLink saved: %s -> %s", instance.source, instance.target)
 
 
 class UnifiedJobTemplateInstanceGroupMembership(models.Model):
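To make the new signal wiring concrete, here is a minimal sketch of the resulting ORM behavior (illustrative, not part of the diff; assumes settings.IS_K8S is True and a configured database, and the hostnames are made up):

    from awx.main.models import Instance

    hop = Instance.objects.create(hostname='hop1', node_type='hop', listener_port=6789, peers_from_control_nodes=True)
    control = Instance.objects.create(hostname='ctl1', node_type='control')

    # on_instance_saved peered the new control node to every instance
    # that has peers_from_control_nodes=True
    assert hop in control.peers.all()

    # flipping the flag clears the reverse relation (peers_from) and
    # schedules a receptor config rewrite on the control nodes
    hop.peers_from_control_nodes = False
    hop.save()
    assert not control.peers.exists()
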
diff --git a/awx/main/tasks/receptor.py b/awx/main/tasks/receptor.py
index 14c0a42786..877b842ded 100644
--- a/awx/main/tasks/receptor.py
+++ b/awx/main/tasks/receptor.py
@@ -30,6 +30,7 @@ from awx.main.tasks.signals import signal_state, signal_callback, SignalExit
 from awx.main.models import Instance, InstanceLink, UnifiedJob
 from awx.main.dispatch import get_task_queuename
 from awx.main.dispatch.publish import task
+from awx.main.utils.pglock import advisory_lock
 
 # Receptorctl
 from receptorctl.socket_interface import ReceptorControl
@@ -675,55 +676,80 @@ RECEPTOR_CONFIG_STARTER = (
 )
 
 
+def should_update_config(instances):
+    '''
+    checks that the list of instances matches the list of
+    tcp-peers in the config
+    '''
+    current_config = read_receptor_config()  # this gets receptor conf lock
+    current_peers = []
+    for config_entry in current_config:
+        for key, value in config_entry.items():
+            if key.endswith('-peer'):
+                current_peers.append(value['address'])
+    intended_peers = [f"{i.hostname}:{i.listener_port}" for i in instances]
+    logger.debug(f"current peers {current_peers}, intended peers {intended_peers}")
+    if set(current_peers) == set(intended_peers):
+        return False  # config file is already up to date
+
+    return True
+
+
+def generate_config_data():
+    # returns two values:
+    #   receptor_config - based on current database peers
+    #   should_update - if True, receptor_config differs from the receptor conf file on disk
+    instances = Instance.objects.filter(node_type__in=(Instance.Types.EXECUTION, Instance.Types.HOP), peers_from_control_nodes=True)
+
+    receptor_config = list(RECEPTOR_CONFIG_STARTER)
+    for instance in instances:
+        peer = {'tcp-peer': {'address': f'{instance.hostname}:{instance.listener_port}', 'tls': 'tlsclient'}}
+        receptor_config.append(peer)
+    should_update = should_update_config(instances)
+    return receptor_config, should_update
+
+
+def reload_receptor():
+    logger.info("Receptor config changed, reloading receptor")
+
+    # This needs to be outside of the lock because this function itself will acquire the lock.
+    receptor_ctl = get_receptor_ctl()
+
+    attempts = 10
+    for backoff in range(1, attempts + 1):
+        try:
+            receptor_ctl.simple_command("reload")
+            break
+        except ValueError:
+            logger.warning(f"Unable to reload Receptor configuration. {attempts-backoff} attempts left.")
+            time.sleep(backoff)
+    else:
+        raise RuntimeError("Receptor reload failed")
+
+
 @task()
-def write_receptor_config(force=False):
+def write_receptor_config():
     """
-    only control nodes will run this
-    force=True means to call receptorctl reload
+    This task runs async on each control node, K8S only.
+    It is triggered whenever a remote node is added or removed, or when
+    peers_from_control_nodes is flipped.
+    write_receptor_config may be called multiple times in quick succession,
+    for example when several instances are added back to back. To serialize
+    those calls, each control node first grabs a DB advisory lock specific
+    to just that control node (i.e. multiple control nodes can run this function
+    at the same time, since it only writes the local receptor config file).
     """
-    lock = FileLock(__RECEPTOR_CONF_LOCKFILE)
-    with lock:
-        receptor_config = list(RECEPTOR_CONFIG_STARTER)
+    with advisory_lock(f"{settings.CLUSTER_HOST_ID}_write_receptor_config", wait=True):
+        receptor_config, should_update = generate_config_data()
+        if should_update:
+            # config file is out of date; rewrite it and reload receptor
+            lock = FileLock(__RECEPTOR_CONF_LOCKFILE)
+            with lock:
+                with open(__RECEPTOR_CONF, 'w') as file:
+                    yaml.dump(receptor_config, file, default_flow_style=False)
 
-        this_inst = Instance.objects.me()
-        instances = Instance.objects.filter(node_type__in=(Instance.Types.EXECUTION, Instance.Types.HOP))
-        existing_peers = this_inst.peers.all()
-
-        links_added = []
-        links_removed = False
-        for instance in instances:
-            if not instance.peers_from_control_nodes and instance in existing_peers:
-                this_inst.peers.remove(instance)
-                links_removed = True
-            if instance.peers_from_control_nodes:
-                peer = {'tcp-peer': {'address': f'{instance.hostname}:{instance.listener_port}', 'tls': 'tlsclient'}}
-                receptor_config.append(peer)
-                if instance not in existing_peers:
-                    links_added.append(InstanceLink(source=this_inst, target=instance, link_state=InstanceLink.States.ADDING))
-
-        InstanceLink.objects.bulk_create(links_added)
-
-        with open(__RECEPTOR_CONF, 'w') as file:
-            yaml.dump(receptor_config, file, default_flow_style=False)
-
-        if force or links_removed or links_added:
-            logger.debug("Receptor config changed, reloading receptor")
-            # This needs to be outside of the lock because this function itself will acquire the lock.
-            receptor_ctl = get_receptor_ctl()
-
-            attempts = 10
-            for backoff in range(1, attempts + 1):
-                try:
-                    receptor_ctl.simple_command("reload")
-                    break
-                except ValueError:
-                    logger.warning(f"Unable to reload Receptor configuration. {attempts-backoff} attempts left.")
-                    time.sleep(backoff)
-            else:
-                raise RuntimeError("Receptor reload failed")
-
-        links = InstanceLink.objects.filter(source=this_inst, target__in=instances, link_state=InstanceLink.States.ADDING)
-        links.update(link_state=InstanceLink.States.ESTABLISHED)
+            reload_receptor()
 
 
 @task(queue=get_task_queuename)
@@ -743,6 +769,3 @@ def remove_deprovisioned_node(hostname):
 
     # This will as a side effect also delete the InstanceLinks that are tied to it.
     Instance.objects.filter(hostname=hostname).delete()
-
-    # Update the receptor configs for all of the control-plane.
-    write_receptor_config.apply_async(queue='tower_broadcast_all')
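For reference, a sketch of the document that write_receptor_config now dumps to receptor.conf (illustrative only; the starter entries below are placeholders, since the real ones come from RECEPTOR_CONFIG_STARTER, which this diff does not change):

    import yaml

    receptor_config = [
        {'node': {'id': 'awx-task-0'}},  # placeholder starter entry
        {'log-level': 'info'},           # placeholder starter entry
        {'tcp-peer': {'address': 'hop1:6789', 'tls': 'tlsclient'}},
        {'tcp-peer': {'address': 'execution1:6789', 'tls': 'tlsclient'}},
    ]
    print(yaml.dump(receptor_config, default_flow_style=False))

should_update_config() compares only the '*-peer' addresses in the existing file against hostname:listener_port of the peers_from_control_nodes instances, so the file is rewritten, and receptor reloaded, only when the peer set actually changes.
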
diff --git a/awx/main/tests/functional/api/test_instance_peers.py b/awx/main/tests/functional/api/test_instance_peers.py
new file mode 100644
index 0000000000..1ae5abf369
--- /dev/null
+++ b/awx/main/tests/functional/api/test_instance_peers.py
@@ -0,0 +1,331 @@
+import pytest
+import yaml
+from unittest import mock
+
+from django.db.utils import IntegrityError
+
+from awx.api.versioning import reverse
+from awx.main.models import Instance
+from awx.api.views.instance_install_bundle import generate_group_vars_all_yml
+
+
+@pytest.mark.django_db
+class TestPeers:
+    @pytest.fixture(autouse=True)
+    def configure_settings(self, settings):
+        settings.IS_K8S = True
+
+    def test_prevent_peering_to_self(self):
+        '''
+        cannot peer to self
+        '''
+        control_instance = Instance.objects.create(hostname='abc', node_type="control")
+        with pytest.raises(IntegrityError):
+            control_instance.peers.add(control_instance)
+
+    @pytest.mark.parametrize('node_type', ['control', 'hop', 'execution'])
+    def test_creating_node(self, node_type, admin_user, post):
+        '''
+        can only add hop and execution nodes via API
+        '''
+        post(
+            url=reverse('api:instance_list'),
+            data={"hostname": "abc", "node_type": node_type},
+            user=admin_user,
+            expect=400 if node_type == 'control' else 201,
+        )
+
+    def test_changing_node_type(self, admin_user, patch):
+        '''
+        cannot change node type
+        '''
+        hop = Instance.objects.create(hostname='abc', node_type="hop")
+        patch(
+            url=reverse('api:instance_detail', kwargs={'pk': hop.pk}),
+            data={"node_type": "execution"},
+            user=admin_user,
+            expect=400,
+        )
+
+    @pytest.mark.parametrize('node_type', ['hop', 'execution'])
+    def test_listener_port_null(self, node_type, admin_user, post):
+        '''
+        listener_port can be None
+        '''
+        post(
+            url=reverse('api:instance_list'),
+            data={"hostname": "abc", "node_type": node_type, "listener_port": None},
+            user=admin_user,
+            expect=201,
+        )
+
+    @pytest.mark.parametrize('node_type, allowed', [('control', False), ('hop', True), ('execution', True)])
+    def test_peers_from_control_nodes_allowed(self, node_type, allowed, post, admin_user):
+        '''
+        only hop and execution nodes can have peers_from_control_nodes set to True
+        '''
+        post(
+            url=reverse('api:instance_list'),
+            data={"hostname": "abc", "peers_from_control_nodes": True, "node_type": node_type, "listener_port": 6789},
+            user=admin_user,
+            expect=201 if allowed else 400,
+        )
+
+    def test_listener_port_is_required(self, admin_user, post):
+        '''
+        if adding an instance to a peers list, that instance must have listener_port set
+        '''
+        Instance.objects.create(hostname='abc', node_type="hop", listener_port=None)
+        post(
+            url=reverse('api:instance_list'),
+            data={"hostname": "ex", "peers_from_control_nodes": False, "node_type": "execution", "listener_port": None, "peers": ["abc"]},
+            user=admin_user,
+            expect=400,
+        )
+
+    def test_peers_from_control_nodes_listener_port_enabled(self, admin_user, post):
+        '''
+        if peers_from_control_nodes is True, listener_port must be an integer
+        Assert that all other combinations are allowed
+        '''
+        Instance.objects.create(hostname='abc', node_type="control")
+        i = 0
+        for node_type in ['hop', 'execution']:
+            for peers_from in [True, False]:
+                for listener_port in [None, 6789]:
+                    # the only disallowed case is when peers_from is True and listener_port is None
+                    disallowed = peers_from and not listener_port
+                    post(
+                        url=reverse('api:instance_list'),
+                        data={"hostname": f"abc{i}", "peers_from_control_nodes": peers_from, "node_type": node_type, "listener_port": listener_port},
+                        user=admin_user,
+                        expect=400 if disallowed else 201,
+                    )
+                    i += 1
+
+    def test_disallow_modify_peers_control_nodes(self, admin_user, patch):
+        '''
+        for control nodes, the peers field should not be
+        modified directly via patch
+        '''
+        control = Instance.objects.create(hostname='abc', node_type='control')
+        hop1 = Instance.objects.create(hostname='hop1', node_type='hop', peers_from_control_nodes=True, listener_port=6789)
+        hop2 = Instance.objects.create(hostname='hop2', node_type='hop', peers_from_control_nodes=False, listener_port=6789)
+        assert [hop1] == list(control.peers.all())  # only hop1 should be peered
+        patch(
+            url=reverse('api:instance_detail', kwargs={'pk': control.pk}),
+            data={"peers": ["hop2"]},
+            user=admin_user,
+            expect=400,  # cannot add peers directly
+        )
+        patch(
+            url=reverse('api:instance_detail', kwargs={'pk': control.pk}),
+            data={"peers": ["hop1"]},
+            user=admin_user,
+            expect=200,  # patching with current peers list should be okay
+        )
+        patch(
+            url=reverse('api:instance_detail', kwargs={'pk': control.pk}),
+            data={"peers": []},
+            user=admin_user,
+            expect=400,  # cannot remove peers directly
+        )
+        patch(
+            url=reverse('api:instance_detail', kwargs={'pk': control.pk}),
+            data={},
+            user=admin_user,
+            expect=200,  # patching without data should be fine too
+        )
+        # patch hop2
+        patch(
+            url=reverse('api:instance_detail', kwargs={'pk': hop2.pk}),
+            data={"peers_from_control_nodes": True},
+            user=admin_user,
+            expect=200,  # flipping peers_from_control_nodes is allowed
+        )
+        control.refresh_from_db()
+        assert {hop1, hop2} == set(control.peers.all())  # hop1 and hop2 should now be peered from control node
+
+    def test_disallow_changing_hostname(self, admin_user, patch):
+        '''
+        cannot change hostname
+        '''
+        hop = Instance.objects.create(hostname='hop', node_type='hop')
+        patch(
+            url=reverse('api:instance_detail', kwargs={'pk': hop.pk}),
+            data={"hostname": "hop2"},
+            user=admin_user,
+            expect=400,
+        )
+
+    def test_disallow_changing_node_state(self, admin_user, patch):
+        '''
+        only allow setting node_state to deprovisioning
+        '''
+        hop = Instance.objects.create(hostname='hop', node_type='hop', node_state='installed')
+        patch(
+            url=reverse('api:instance_detail', kwargs={'pk': hop.pk}),
+            data={"node_state": "deprovisioning"},
+            user=admin_user,
+            expect=200,
+        )
+        patch(
+            url=reverse('api:instance_detail', kwargs={'pk': hop.pk}),
+            data={"node_state": "ready"},
+            user=admin_user,
+            expect=400,
+        )
+
+    def test_control_node_automatically_peers(self):
+        '''
+        a new control node should automatically
+        peer to hop
+
+        peer to hop should be removed if hop is deleted
+        '''
+        hop = Instance.objects.create(hostname='hop', node_type='hop', peers_from_control_nodes=True, listener_port=6789)
+        control = Instance.objects.create(hostname='abc', node_type='control')
+        assert hop in control.peers.all()
+        hop.delete()
+        assert not control.peers.exists()
+
+    def test_group_vars(self, get, admin_user):
+        '''
+        control > hop1 > hop2 < execution
+        '''
+        control = Instance.objects.create(hostname='control', node_type='control', listener_port=None)
+        hop1 = Instance.objects.create(hostname='hop1', node_type='hop', listener_port=6789, peers_from_control_nodes=True)
+        hop2 = Instance.objects.create(hostname='hop2', node_type='hop', listener_port=6789, peers_from_control_nodes=False)
+        execution = Instance.objects.create(hostname='execution', node_type='execution', listener_port=6789)
+
+        execution.peers.add(hop2)
+        hop1.peers.add(hop2)
+
+        control_vars = yaml.safe_load(generate_group_vars_all_yml(control))
+        hop1_vars = yaml.safe_load(generate_group_vars_all_yml(hop1))
+        hop2_vars = yaml.safe_load(generate_group_vars_all_yml(hop2))
+        execution_vars = yaml.safe_load(generate_group_vars_all_yml(execution))
+
+        def has_peer(group_vars, peer):
+            peers = group_vars.get('receptor_peers', [])
+            for p in peers:
+                if f"{p['host']}:{p['port']}" == peer:
+                    return True
+            return False
+
+        # control group vars assertions
+        assert control_vars.get('receptor_host_identifier', '') == 'control'
+        assert has_peer(control_vars, 'hop1:6789')
+        assert not has_peer(control_vars, 'hop2:6789')
+        assert not has_peer(control_vars, 'execution:6789')
+        assert not control_vars.get('receptor_listener', False)
+
+        # hop1 group vars assertions
+        assert hop1_vars.get('receptor_host_identifier', '') == 'hop1'
+        assert has_peer(hop1_vars, 'hop2:6789')
+        assert not has_peer(hop1_vars, 'execution:6789')
+        assert hop1_vars.get('receptor_listener', False)
+
+        # hop2 group vars assertions
+        assert hop2_vars.get('receptor_host_identifier', '') == 'hop2'
+        assert not has_peer(hop2_vars, 'hop1:6789')
+        assert not has_peer(hop2_vars, 'execution:6789')
+        assert hop2_vars.get('receptor_listener', False)
+        assert hop2_vars.get('receptor_peers', []) == []
+
+        # execution group vars assertions
+        assert execution_vars.get('receptor_host_identifier', '') == 'execution'
+        assert has_peer(execution_vars, 'hop2:6789')
+        assert not has_peer(execution_vars, 'hop1:6789')
+        assert execution_vars.get('receptor_listener', False)
+
+    def test_write_receptor_config_called(self):
+        '''
+        Assert that write_receptor_config is called
+        when certain instances are created, or if
+        peers_from_control_nodes changes.
+        In general, write_receptor_config should only
+        be called when necessary, as it will reload
+        receptor backend connections which is not trivial.
+        '''
+        with mock.patch('awx.main.models.ha.schedule_write_receptor_config') as write_method:
+            # new control instance but nothing to peer to (no)
+            control = Instance.objects.create(hostname='control1', node_type='control')
+            write_method.assert_not_called()
+
+            # new hop node with peers_from_control_nodes False (no)
+            hop1 = Instance.objects.create(hostname='hop1', node_type='hop', listener_port=6789, peers_from_control_nodes=False)
+            hop1.delete()
+            write_method.assert_not_called()
+
+            # new hop node with peers_from_control_nodes True (yes)
+            hop1 = Instance.objects.create(hostname='hop1', node_type='hop', listener_port=6789, peers_from_control_nodes=True)
+            write_method.assert_called()
+            write_method.reset_mock()
+
+            # new control instance but with something to peer to (yes)
+            Instance.objects.create(hostname='control2', node_type='control')
+            write_method.assert_called()
+            write_method.reset_mock()
+
+            # new hop node with peers_from_control_nodes False and peered to another hop node (no)
+            hop2 = Instance.objects.create(hostname='hop2', node_type='hop', listener_port=6789, peers_from_control_nodes=False)
+            hop2.peers.add(hop1)
+            hop2.delete()
+            write_method.assert_not_called()
+
+            # changing peers_from_control_nodes to False (yes)
+            hop1.peers_from_control_nodes = False
+            hop1.save()
+            write_method.assert_called()
+            write_method.reset_mock()
+
+            # deleting hop node with peers_from_control_nodes set to False (no)
+            hop1.delete()
+            write_method.assert_not_called()
+
+            # deleting control nodes (no)
+            control.delete()
+            write_method.assert_not_called()
+
+    def test_write_receptor_config_data(self):
+        '''
+        Assert the correct peers are included in data that will
+        be written to receptor.conf
+        '''
+        from awx.main.tasks.receptor import RECEPTOR_CONFIG_STARTER
+
+        with mock.patch('awx.main.tasks.receptor.read_receptor_config', return_value=list(RECEPTOR_CONFIG_STARTER)):
+            from awx.main.tasks.receptor import generate_config_data
+
+            _, should_update = generate_config_data()
+            assert not should_update
+
+            # not peered, so config file should not be updated
+            for i in range(3):
+                Instance.objects.create(hostname=f"exNo-{i}", node_type='execution', listener_port=6789, peers_from_control_nodes=False)
+
+            _, should_update = generate_config_data()
+            assert not should_update
+
+            # peered, so config file should be updated
+            expected_peers = []
+            for i in range(3):
+                expected_peers.append(f"hop-{i}:6789")
+                Instance.objects.create(hostname=f"hop-{i}", node_type='hop', listener_port=6789, peers_from_control_nodes=True)
+
+            for i in range(3):
+                expected_peers.append(f"exYes-{i}:6789")
+                Instance.objects.create(hostname=f"exYes-{i}", node_type='execution', listener_port=6789, peers_from_control_nodes=True)
+
+            new_config, should_update = generate_config_data()
+            assert should_update
+
+            peers = []
+            for entry in new_config:
+                for key, value in entry.items():
+                    if key == "tcp-peer":
+                        peers.append(value['address'])
+
+            assert set(expected_peers) == set(peers)
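
Finally, a standalone sketch of the locking pattern the new write_receptor_config relies on (illustrative; the function name serialized_config_write is hypothetical, but the lock name and advisory_lock call mirror the diff above):

    from django.conf import settings
    from awx.main.utils.pglock import advisory_lock

    def serialized_config_write():
        # The lock name embeds CLUSTER_HOST_ID, so writers on the same control
        # node queue up behind each other, while different control nodes can
        # write their local receptor.conf in parallel.
        with advisory_lock(f"{settings.CLUSTER_HOST_ID}_write_receptor_config", wait=True):
            # a task that lost the race re-checks should_update via
            # generate_config_data() and becomes a no-op
            ...

The new tests can be run in a development environment with, for example, pytest awx/main/tests/functional/api/test_instance_peers.py (adjust the invocation to your local tooling).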