From d1cacf64de6c44d78799a5f211372fea8e589d4b Mon Sep 17 00:00:00 2001 From: Seth Foster Date: Tue, 14 Nov 2023 13:39:24 -0500 Subject: [PATCH] Add functional and unit tests Updated existing tests to support the ReceptorAddress model - cannot peer to self - cannot peer to node that is already peered to me - cannot peer to node more than once (via 2+ addresses) - cannot set is_internal True Other changes: Change post save signal to only call schedule_write_receptor_config() when an actual change is detected. Make functional tests more robust by checking for specific validation error in the response. I.e. instead of just checking for 400, just for 400 and that the error message corresponds to the validation we are testing for. Signed-off-by: Seth Foster --- awx/api/serializers.py | 8 +- awx/main/models/ha.py | 25 +- awx/main/tasks/receptor.py | 2 +- .../functional/api/test_instance_peers.py | 251 ++++++++++-------- .../unit/models/test_receptor_address.py | 32 +++ 5 files changed, 199 insertions(+), 119 deletions(-) create mode 100644 awx/main/tests/unit/models/test_receptor_address.py diff --git a/awx/api/serializers.py b/awx/api/serializers.py index 9df4e0edf4..6eae0601c1 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -5652,13 +5652,19 @@ class InstanceSerializer(BaseSerializer): if self.instance and self.instance.receptor_addresses.filter(id__in=peers_ids).exists(): raise serializers.ValidationError(_("Instance cannot peer to its own address.")) - # cannot peer to an instance that is already peered to this instance if self.instance and self.instance.receptor_addresses.all().exists(): instance_addresses = set(self.instance.receptor_addresses.all()) + # cannot peer to an instance that is already peered to this instance for p in attrs.get('peers', []): if set(p.instance.peers.all()) & instance_addresses: raise serializers.ValidationError(_(f"Instance {p.instance.hostname} is already peered to this instance.")) + # cannot peer to instance more than once + # compare length of set to original list to check for duplicates + peers_instances = [p.instance for p in attrs.get('peers', [])] + if len(set(peers_instances)) != len(peers_instances): + raise serializers.ValidationError(_("Cannot peer to the same instance more than once.")) + return super().validate(attrs) def validate_node_type(self, value): diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index a5cf2846ca..8f606c1487 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -496,8 +496,6 @@ def schedule_write_receptor_config(broadcast=True): write_receptor_config() # just run locally -# TODO: don't use the receiver post save, just call this at the moment when we need it -# that way we don't call this multiple times unnecessarily @receiver(post_save, sender=ReceptorAddress) def receptor_address_saved(sender, instance, **kwargs): from awx.main.signals import disable_activity_stream @@ -506,16 +504,20 @@ def receptor_address_saved(sender, instance, **kwargs): control_instances = set(Instance.objects.filter(node_type__in=[Instance.Types.CONTROL, Instance.Types.HYBRID])) if address.peers_from_control_nodes: - address.peers_from.add(*control_instances) + if set(address.peers_from.all()) != control_instances: + address.peers_from.add(*control_instances) + schedule_write_receptor_config() else: - address.peers_from.remove(*control_instances) - - schedule_write_receptor_config() + if address.peers_from.exists(): + address.peers_from.remove(*control_instances) + schedule_write_receptor_config() @receiver(post_delete, sender=ReceptorAddress) def receptor_address_deleted(sender, instance, **kwargs): - schedule_write_receptor_config() + address = instance + if address.peers_from_control_nodes: + schedule_write_receptor_config() @receiver(post_save, sender=Instance) @@ -531,10 +533,11 @@ def on_instance_saved(sender, instance, created=False, raw=False, **kwargs): from awx.main.signals import disable_activity_stream if created and settings.IS_K8S and instance.node_type in [Instance.Types.CONTROL, Instance.Types.HYBRID]: - peers_address = ReceptorAddress.objects.filter(peers_from_control_nodes=True) - with disable_activity_stream(): - instance.peers.add(*peers_address) - schedule_write_receptor_config(broadcast=False) + peers_addresses = ReceptorAddress.objects.filter(peers_from_control_nodes=True) + if peers_addresses.exists(): + with disable_activity_stream(): + instance.peers.add(*peers_addresses) + schedule_write_receptor_config(broadcast=False) if settings.IS_K8S and instance.node_type in [Instance.Types.HOP, Instance.Types.EXECUTION]: if instance.node_state == Instance.States.DEPROVISIONING: diff --git a/awx/main/tasks/receptor.py b/awx/main/tasks/receptor.py index 17d4208c66..597c900eeb 100644 --- a/awx/main/tasks/receptor.py +++ b/awx/main/tasks/receptor.py @@ -703,7 +703,7 @@ def generate_config_data(): receptor_config = list(RECEPTOR_CONFIG_STARTER) for address in addresses: - if address.get_peer_type() and address.is_internal: + if address.get_peer_type(): peer = { f'{address.get_peer_type()}': { 'address': f'{address.get_full_address()}', diff --git a/awx/main/tests/functional/api/test_instance_peers.py b/awx/main/tests/functional/api/test_instance_peers.py index 93af6bf5a2..c73a2b4427 100644 --- a/awx/main/tests/functional/api/test_instance_peers.py +++ b/awx/main/tests/functional/api/test_instance_peers.py @@ -6,14 +6,14 @@ from unittest import mock from django.db.utils import IntegrityError from awx.api.versioning import reverse -from awx.main.models import Instance +from awx.main.models import Instance, ReceptorAddress from awx.api.views.instance_install_bundle import generate_group_vars_all_yml def has_peer(group_vars, peer): peers = group_vars.get('receptor_peers', []) for p in peers: - if f"{p['host']}:{p['port']}" == peer: + if p['address'] == peer: return True return False @@ -24,119 +24,143 @@ class TestPeers: def configure_settings(self, settings): settings.IS_K8S = True - @pytest.mark.parametrize('node_type', ['control', 'hybrid']) - def test_prevent_peering_to_self(self, node_type): + @pytest.mark.parametrize('node_type', ['hop', 'execution']) + def test_peering_to_self(self, node_type, admin_user, patch): """ cannot peer to self """ - control_instance = Instance.objects.create(hostname='abc', node_type=node_type) - with pytest.raises(IntegrityError): - control_instance.peers.add(control_instance) + instance = Instance.objects.create(hostname='abc', node_type=node_type) + addr = ReceptorAddress.objects.create(instance=instance, address='addr') + resp = patch( + url=reverse('api:instance_detail', kwargs={'pk': instance.pk}), + data={"hostname": "abc", "node_type": node_type, "peers": [addr.id]}, + user=admin_user, + expect=400, + ) + assert 'Instance cannot peer to its own address.' in str(resp.data) @pytest.mark.parametrize('node_type', ['control', 'hybrid', 'hop', 'execution']) def test_creating_node(self, node_type, admin_user, post): """ can only add hop and execution nodes via API """ - post( + resp = post( url=reverse('api:instance_list'), data={"hostname": "abc", "node_type": node_type}, user=admin_user, expect=400 if node_type in ['control', 'hybrid'] else 201, ) + if resp.status_code == 400: + assert 'Can only create execution or hop nodes.' in str(resp.data) def test_changing_node_type(self, admin_user, patch): """ cannot change node type """ hop = Instance.objects.create(hostname='abc', node_type="hop") - patch( + resp = patch( url=reverse('api:instance_detail', kwargs={'pk': hop.pk}), data={"node_type": "execution"}, user=admin_user, expect=400, ) + assert 'Cannot change node type.' in str(resp.data) - @pytest.mark.parametrize('node_type', ['hop', 'execution']) - def test_listener_port_null(self, node_type, admin_user, post): + def test_is_internal(self, admin_user, post): """ - listener_port can be None + cannot set is_internal to True """ - post( - url=reverse('api:instance_list'), - data={"hostname": "abc", "node_type": node_type, "listener_port": None}, - user=admin_user, - expect=201, - ) - - @pytest.mark.parametrize('node_type, allowed', [('control', False), ('hybrid', False), ('hop', True), ('execution', True)]) - def test_peers_from_control_nodes_allowed(self, node_type, allowed, post, admin_user): - """ - only hop and execution nodes can have peers_from_control_nodes set to True - """ - post( - url=reverse('api:instance_list'), - data={"hostname": "abc", "peers_from_control_nodes": True, "node_type": node_type, "listener_port": 6789}, - user=admin_user, - expect=201 if allowed else 400, - ) - - def test_listener_port_is_required(self, admin_user, post): - """ - if adding instance to peers list, that instance must have listener_port set - """ - Instance.objects.create(hostname='abc', node_type="hop", listener_port=None) - post( - url=reverse('api:instance_list'), - data={"hostname": "ex", "peers_from_control_nodes": False, "node_type": "execution", "listener_port": None, "peers": ["abc"]}, + hop = Instance.objects.create(hostname='abc', node_type="hop") + resp = post( + url=reverse('api:instance_receptor_addresses_list', kwargs={'pk': hop.pk}), + data={"address": "hopaddr", "is_internal": True}, user=admin_user, expect=400, ) + assert 'Only external addresses can be created.' in str(resp.data) - def test_peers_from_control_nodes_listener_port_enabled(self, admin_user, post): + def test_multiple_peers_from_control_nodes(self, admin_user, post): """ - if peers_from_control_nodes is True, listener_port must an integer - Assert that all other combinations are allowed + only one address can have peers_from_control_nodes set to True for a given instance """ - for index, item in enumerate(itertools.product(['hop', 'execution'], [True, False], [None, 6789])): - node_type, peers_from, listener_port = item - # only disallowed case is when peers_from is True and listener port is None - disallowed = peers_from and not listener_port - post( - url=reverse('api:instance_list'), - data={"hostname": f"abc{index}", "peers_from_control_nodes": peers_from, "node_type": node_type, "listener_port": listener_port}, - user=admin_user, - expect=400 if disallowed else 201, - ) + hop = Instance.objects.create(hostname='hop', node_type='hop') + hopaddr1 = ReceptorAddress.objects.create(instance=hop, address='hopaddr1', peers_from_control_nodes=True) + resp = post( + url=reverse('api:instance_receptor_addresses_list', kwargs={'pk': hop.pk}), + data={"address": "hopaddr2", "peers_from_control_nodes": True}, + user=admin_user, + expect=400, + ) + assert 'Only one address can set peers_from_control_nodes to True.' in str(resp.data) + + def test_bidirectional_peering(self, admin_user, patch): + """ + cannot peer to node that is already to peered to it + if A -> B, then disallow B -> A + """ + hop1 = Instance.objects.create(hostname='hop1', node_type='hop') + hop1addr = ReceptorAddress.objects.create(instance=hop1, address='hop1ddr') + hop2 = Instance.objects.create(hostname='hop2', node_type='hop') + hop2addr = ReceptorAddress.objects.create(instance=hop2, address='hop2addr') + hop1.peers.add(hop2addr) + resp = patch( + url=reverse('api:instance_detail', kwargs={'pk': hop2.pk}), + data={"peers": [hop1addr.id]}, + user=admin_user, + expect=400, + ) + assert 'Instance hop1 is already peered to this instance.' in str(resp.data) + + def test_multiple_peers_same_instance(self, admin_user, patch): + """ + cannot peer to more than one address of the same instance + """ + hop1 = Instance.objects.create(hostname='hop1', node_type='hop') + hop1addr1 = ReceptorAddress.objects.create(instance=hop1, address='hop1ddr1') + hop1addr2 = ReceptorAddress.objects.create(instance=hop1, address='hop1ddr2') + hop2 = Instance.objects.create(hostname='hop2', node_type='hop') + resp = patch( + url=reverse('api:instance_detail', kwargs={'pk': hop2.pk}), + data={"peers": [hop1addr1.id, hop1addr2.id]}, + user=admin_user, + expect=400, + ) + assert 'Cannot peer to the same instance more than once.' in str(resp.data) @pytest.mark.parametrize('node_type', ['control', 'hybrid']) - def test_disallow_modifying_peers_control_nodes(self, node_type, admin_user, patch): + def test_modifying_peers_control_nodes(self, node_type, admin_user, patch): """ for control nodes, peers field should not be modified directly via patch. """ control = Instance.objects.create(hostname='abc', node_type=node_type) - hop1 = Instance.objects.create(hostname='hop1', node_type='hop', peers_from_control_nodes=True, listener_port=6789) - hop2 = Instance.objects.create(hostname='hop2', node_type='hop', peers_from_control_nodes=False, listener_port=6789) - assert [hop1] == list(control.peers.all()) # only hop1 should be peered - patch( + hop1 = Instance.objects.create(hostname='hop1', node_type='hop') + hop1addr = ReceptorAddress.objects.create(instance=hop1, address='hop1addr', peers_from_control_nodes=True) + hop2 = Instance.objects.create(hostname='hop2', node_type='hop') + hop2addr = ReceptorAddress.objects.create(instance=hop2, address='hop2addr') + assert [hop1addr] == list(control.peers.all()) # only hop1addr should be peered + resp = patch( url=reverse('api:instance_detail', kwargs={'pk': control.pk}), - data={"peers": ["hop2"]}, + data={"peers": [hop2addr.id]}, user=admin_user, - expect=400, # cannot add peers directly + expect=400, # cannot add peers manually ) + assert 'Setting peers manually for control nodes is not allowed.' in str(resp.data) + patch( url=reverse('api:instance_detail', kwargs={'pk': control.pk}), - data={"peers": ["hop1"]}, + data={"peers": [hop1addr.id]}, user=admin_user, expect=200, # patching with current peers list should be okay ) - patch( + resp = patch( url=reverse('api:instance_detail', kwargs={'pk': control.pk}), data={"peers": []}, user=admin_user, expect=400, # cannot remove peers directly ) + assert 'Setting peers manually for control nodes is not allowed.' in str(resp.data) + patch( url=reverse('api:instance_detail', kwargs={'pk': control.pk}), data={}, @@ -145,26 +169,28 @@ class TestPeers: ) # patch hop2 patch( - url=reverse('api:instance_detail', kwargs={'pk': hop2.pk}), + url=reverse('api:receptor_address_detail', kwargs={'pk': hop2addr.pk}), data={"peers_from_control_nodes": True}, user=admin_user, expect=200, # patching without data should be fine too ) - assert {hop1, hop2} == set(control.peers.all()) # hop1 and hop2 should now be peered from control node + assert {hop1addr, hop2addr} == set(control.peers.all()) # hop1 and hop2 should now be peered from control node - def test_disallow_changing_hostname(self, admin_user, patch): + def test_changing_hostname(self, admin_user, patch): """ cannot change hostname """ hop = Instance.objects.create(hostname='hop', node_type='hop') - patch( + resp = patch( url=reverse('api:instance_detail', kwargs={'pk': hop.pk}), data={"hostname": "hop2"}, user=admin_user, expect=400, ) - def test_disallow_changing_node_state(self, admin_user, patch): + assert 'Cannot change hostname.' in str(resp.data) + + def test_changing_node_state(self, admin_user, patch): """ only allow setting to deprovisioning """ @@ -175,12 +201,13 @@ class TestPeers: user=admin_user, expect=200, ) - patch( + resp = patch( url=reverse('api:instance_detail', kwargs={'pk': hop.pk}), data={"node_state": "ready"}, user=admin_user, expect=400, ) + assert "Can only change instances to the 'deprovisioning' state." in str(resp.data) @pytest.mark.parametrize('node_type', ['control', 'hybrid']) def test_control_node_automatically_peers(self, node_type): @@ -191,9 +218,10 @@ class TestPeers: peer to hop should be removed if hop is deleted """ - hop = Instance.objects.create(hostname='hop', node_type='hop', peers_from_control_nodes=True, listener_port=6789) + hop = Instance.objects.create(hostname='hop', node_type='hop') + hopaddr = ReceptorAddress.objects.create(instance=hop, address='hopaddr', peers_from_control_nodes=True) control = Instance.objects.create(hostname='abc', node_type=node_type) - assert hop in control.peers.all() + assert hopaddr in control.peers.all() hop.delete() assert not control.peers.exists() @@ -203,26 +231,32 @@ class TestPeers: if a new node comes online, other peer relationships should remain intact """ - hop1 = Instance.objects.create(hostname='hop1', node_type='hop', listener_port=6789, peers_from_control_nodes=True) - hop2 = Instance.objects.create(hostname='hop2', node_type='hop', listener_port=6789, peers_from_control_nodes=False) - hop1.peers.add(hop2) + hop1 = Instance.objects.create(hostname='hop1', node_type='hop') + hop2 = Instance.objects.create(hostname='hop2', node_type='hop') + hop2addr = ReceptorAddress.objects.create(instance=hop2, address='hop2addr') + hop1.peers.add(hop2addr) # a control node is added - Instance.objects.create(hostname='control', node_type=node_type, listener_port=None) + Instance.objects.create(hostname='control', node_type=node_type) assert hop1.peers.exists() - def test_group_vars(self, get, admin_user): + def test_group_vars(self): """ control > hop1 > hop2 < execution """ - control = Instance.objects.create(hostname='control', node_type='control', listener_port=None) - hop1 = Instance.objects.create(hostname='hop1', node_type='hop', listener_port=6789, peers_from_control_nodes=True) - hop2 = Instance.objects.create(hostname='hop2', node_type='hop', listener_port=6789, peers_from_control_nodes=False) - execution = Instance.objects.create(hostname='execution', node_type='execution', listener_port=6789) + control = Instance.objects.create(hostname='control', node_type='control') + hop1 = Instance.objects.create(hostname='hop1', node_type='hop') + hop1addr = ReceptorAddress.objects.create(instance=hop1, address='hop1addr', peers_from_control_nodes=True, port=6789) - execution.peers.add(hop2) - hop1.peers.add(hop2) + hop2 = Instance.objects.create(hostname='hop2', node_type='hop') + hop2addr = ReceptorAddress.objects.create(instance=hop2, address='hop2addr', peers_from_control_nodes=False, port=6789) + + execution = Instance.objects.create(hostname='execution', node_type='execution') + executionaddr = ReceptorAddress.objects.create(instance=execution, address='executionaddr', peers_from_control_nodes=False, port=6789) + + execution.peers.add(hop2addr) + hop1.peers.add(hop2addr) control_vars = yaml.safe_load(generate_group_vars_all_yml(control)) hop1_vars = yaml.safe_load(generate_group_vars_all_yml(hop1)) @@ -230,25 +264,25 @@ class TestPeers: execution_vars = yaml.safe_load(generate_group_vars_all_yml(execution)) # control group vars assertions - assert has_peer(control_vars, 'hop1:6789') - assert not has_peer(control_vars, 'hop2:6789') - assert not has_peer(control_vars, 'execution:6789') + assert has_peer(control_vars, 'hop1addr:6789') + assert not has_peer(control_vars, 'hop2addr:6789') + assert not has_peer(control_vars, 'executionaddr:6789') assert not control_vars.get('receptor_listener', False) # hop1 group vars assertions - assert has_peer(hop1_vars, 'hop2:6789') - assert not has_peer(hop1_vars, 'execution:6789') + assert has_peer(hop1_vars, 'hop2addr:6789') + assert not has_peer(hop1_vars, 'executionaddr:6789') assert hop1_vars.get('receptor_listener', False) # hop2 group vars assertions - assert not has_peer(hop2_vars, 'hop1:6789') - assert not has_peer(hop2_vars, 'execution:6789') + assert not has_peer(hop2_vars, 'hop1addr:6789') + assert not has_peer(hop2_vars, 'executionaddr:6789') assert hop2_vars.get('receptor_listener', False) assert hop2_vars.get('receptor_peers', []) == [] # execution group vars assertions - assert has_peer(execution_vars, 'hop2:6789') - assert not has_peer(execution_vars, 'hop1:6789') + assert has_peer(execution_vars, 'hop2addr:6789') + assert not has_peer(execution_vars, 'hop1addr:6789') assert execution_vars.get('receptor_listener', False) def test_write_receptor_config_called(self): @@ -265,13 +299,15 @@ class TestPeers: control = Instance.objects.create(hostname='control1', node_type='control') write_method.assert_not_called() - # new hop node with peers_from_control_nodes False (no) - hop1 = Instance.objects.create(hostname='hop1', node_type='hop', listener_port=6789, peers_from_control_nodes=False) + # new address with peers_from_control_nodes False (no) + hop1 = Instance.objects.create(hostname='hop1', node_type='hop') + hop1addr = ReceptorAddress.objects.create(instance=hop1, address='hop1addr', peers_from_control_nodes=False) hop1.delete() write_method.assert_not_called() - # new hop node with peers_from_control_nodes True (yes) - hop1 = Instance.objects.create(hostname='hop1', node_type='hop', listener_port=6789, peers_from_control_nodes=True) + # new address with peers_from_control_nodes True (yes) + hop1 = Instance.objects.create(hostname='hop1', node_type='hop') + hop1addr = ReceptorAddress.objects.create(instance=hop1, address='hop1addr', peers_from_control_nodes=True) write_method.assert_called() write_method.reset_mock() @@ -280,20 +316,21 @@ class TestPeers: write_method.assert_called() write_method.reset_mock() - # new hop node with peers_from_control_nodes False and peered to another hop node (no) - hop2 = Instance.objects.create(hostname='hop2', node_type='hop', listener_port=6789, peers_from_control_nodes=False) - hop2.peers.add(hop1) + # new address with peers_from_control_nodes False and peered to another hop node (no) + hop2 = Instance.objects.create(hostname='hop2', node_type='hop') + hop2addr = ReceptorAddress.objects.create(instance=hop2, address='hop2addr', peers_from_control_nodes=False) + hop2.peers.add(hop1addr) hop2.delete() write_method.assert_not_called() # changing peers_from_control_nodes to False (yes) - hop1.peers_from_control_nodes = False - hop1.save() + hop1addr.peers_from_control_nodes = False + hop1addr.save() write_method.assert_called() write_method.reset_mock() - # deleting hop node that has peers_from_control_nodes to False (no) - hop1.delete() + # deleting address that has peers_from_control_nodes to False (no) + hop1.delete() # cascade deletes to hop1addr write_method.assert_not_called() # deleting control nodes (no) @@ -315,20 +352,22 @@ class TestPeers: # not peered, so config file should not be updated for i in range(3): - Instance.objects.create(hostname=f"exNo-{i}", node_type='execution', listener_port=6789, peers_from_control_nodes=False) - + inst = Instance.objects.create(hostname=f"exNo-{i}", node_type='execution') + ReceptorAddress.objects.create(instance=inst, address=f"exNo-{i}addr", port=6789, peers_from_control_nodes=False) _, should_update = generate_config_data() assert not should_update # peered, so config file should be updated expected_peers = [] for i in range(3): - expected_peers.append(f"hop-{i}:6789") - Instance.objects.create(hostname=f"hop-{i}", node_type='hop', listener_port=6789, peers_from_control_nodes=True) + expected_peers.append(f"hop-{i}addr:6789") + inst = Instance.objects.create(hostname=f"hop-{i}", node_type='hop') + ReceptorAddress.objects.create(instance=inst, address=f"hop-{i}addr", port=6789, peers_from_control_nodes=True) for i in range(3): - expected_peers.append(f"exYes-{i}:6789") - Instance.objects.create(hostname=f"exYes-{i}", node_type='execution', listener_port=6789, peers_from_control_nodes=True) + expected_peers.append(f"exYes-{i}addr:6789") + inst = Instance.objects.create(hostname=f"exYes-{i}", node_type='execution') + ReceptorAddress.objects.create(instance=inst, address=f"exYes-{i}addr", port=6789, peers_from_control_nodes=True) new_config, should_update = generate_config_data() assert should_update diff --git a/awx/main/tests/unit/models/test_receptor_address.py b/awx/main/tests/unit/models/test_receptor_address.py new file mode 100644 index 0000000000..f18e1a9018 --- /dev/null +++ b/awx/main/tests/unit/models/test_receptor_address.py @@ -0,0 +1,32 @@ +from awx.main.models import ReceptorAddress +import pytest + +ReceptorAddress() + + +@pytest.mark.parametrize( + 'address, protocol, port, websocket_path, expected', + [ + ('foo', 'tcp', 27199, '', 'foo:27199'), + ('bar', 'ws', 6789, '', 'wss://bar:6789'), + ('mal', 'ws', 6789, 'path', 'wss://mal:6789/path'), + ('example.com', 'ws', 443, 'path', 'wss://example.com:443/path'), + ], +) +def test_get_full_address(address, protocol, port, websocket_path, expected): + receptor_address = ReceptorAddress(address=address, protocol=protocol, port=port, websocket_path=websocket_path) + assert receptor_address.get_full_address() == expected + + +@pytest.mark.parametrize( + 'protocol, expected', + [ + ('tcp', 'tcp-peer'), + ('ws', 'ws-peer'), + ('wss', 'ws-peer'), + ('foo', None), + ], +) +def test_get_peer_type(protocol, expected): + receptor_address = ReceptorAddress(protocol=protocol) + assert receptor_address.get_peer_type() == expected