diff --git a/awx/api/serializers.py b/awx/api/serializers.py index 9df4e0edf4..6eae0601c1 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -5652,13 +5652,19 @@ class InstanceSerializer(BaseSerializer): if self.instance and self.instance.receptor_addresses.filter(id__in=peers_ids).exists(): raise serializers.ValidationError(_("Instance cannot peer to its own address.")) - # cannot peer to an instance that is already peered to this instance if self.instance and self.instance.receptor_addresses.all().exists(): instance_addresses = set(self.instance.receptor_addresses.all()) + # cannot peer to an instance that is already peered to this instance for p in attrs.get('peers', []): if set(p.instance.peers.all()) & instance_addresses: raise serializers.ValidationError(_(f"Instance {p.instance.hostname} is already peered to this instance.")) + # cannot peer to instance more than once + # compare length of set to original list to check for duplicates + peers_instances = [p.instance for p in attrs.get('peers', [])] + if len(set(peers_instances)) != len(peers_instances): + raise serializers.ValidationError(_("Cannot peer to the same instance more than once.")) + return super().validate(attrs) def validate_node_type(self, value): diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index a5cf2846ca..8f606c1487 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -496,8 +496,6 @@ def schedule_write_receptor_config(broadcast=True): write_receptor_config() # just run locally -# TODO: don't use the receiver post save, just call this at the moment when we need it -# that way we don't call this multiple times unnecessarily @receiver(post_save, sender=ReceptorAddress) def receptor_address_saved(sender, instance, **kwargs): from awx.main.signals import disable_activity_stream @@ -506,16 +504,20 @@ def receptor_address_saved(sender, instance, **kwargs): control_instances = set(Instance.objects.filter(node_type__in=[Instance.Types.CONTROL, Instance.Types.HYBRID])) if address.peers_from_control_nodes: - address.peers_from.add(*control_instances) + if set(address.peers_from.all()) != control_instances: + address.peers_from.add(*control_instances) + schedule_write_receptor_config() else: - address.peers_from.remove(*control_instances) - - schedule_write_receptor_config() + if address.peers_from.exists(): + address.peers_from.remove(*control_instances) + schedule_write_receptor_config() @receiver(post_delete, sender=ReceptorAddress) def receptor_address_deleted(sender, instance, **kwargs): - schedule_write_receptor_config() + address = instance + if address.peers_from_control_nodes: + schedule_write_receptor_config() @receiver(post_save, sender=Instance) @@ -531,10 +533,11 @@ def on_instance_saved(sender, instance, created=False, raw=False, **kwargs): from awx.main.signals import disable_activity_stream if created and settings.IS_K8S and instance.node_type in [Instance.Types.CONTROL, Instance.Types.HYBRID]: - peers_address = ReceptorAddress.objects.filter(peers_from_control_nodes=True) - with disable_activity_stream(): - instance.peers.add(*peers_address) - schedule_write_receptor_config(broadcast=False) + peers_addresses = ReceptorAddress.objects.filter(peers_from_control_nodes=True) + if peers_addresses.exists(): + with disable_activity_stream(): + instance.peers.add(*peers_addresses) + schedule_write_receptor_config(broadcast=False) if settings.IS_K8S and instance.node_type in [Instance.Types.HOP, Instance.Types.EXECUTION]: if instance.node_state == Instance.States.DEPROVISIONING: diff --git a/awx/main/tasks/receptor.py b/awx/main/tasks/receptor.py index 17d4208c66..597c900eeb 100644 --- a/awx/main/tasks/receptor.py +++ b/awx/main/tasks/receptor.py @@ -703,7 +703,7 @@ def generate_config_data(): receptor_config = list(RECEPTOR_CONFIG_STARTER) for address in addresses: - if address.get_peer_type() and address.is_internal: + if address.get_peer_type(): peer = { f'{address.get_peer_type()}': { 'address': f'{address.get_full_address()}', diff --git a/awx/main/tests/functional/api/test_instance_peers.py b/awx/main/tests/functional/api/test_instance_peers.py index 93af6bf5a2..c73a2b4427 100644 --- a/awx/main/tests/functional/api/test_instance_peers.py +++ b/awx/main/tests/functional/api/test_instance_peers.py @@ -6,14 +6,14 @@ from unittest import mock from django.db.utils import IntegrityError from awx.api.versioning import reverse -from awx.main.models import Instance +from awx.main.models import Instance, ReceptorAddress from awx.api.views.instance_install_bundle import generate_group_vars_all_yml def has_peer(group_vars, peer): peers = group_vars.get('receptor_peers', []) for p in peers: - if f"{p['host']}:{p['port']}" == peer: + if p['address'] == peer: return True return False @@ -24,119 +24,143 @@ class TestPeers: def configure_settings(self, settings): settings.IS_K8S = True - @pytest.mark.parametrize('node_type', ['control', 'hybrid']) - def test_prevent_peering_to_self(self, node_type): + @pytest.mark.parametrize('node_type', ['hop', 'execution']) + def test_peering_to_self(self, node_type, admin_user, patch): """ cannot peer to self """ - control_instance = Instance.objects.create(hostname='abc', node_type=node_type) - with pytest.raises(IntegrityError): - control_instance.peers.add(control_instance) + instance = Instance.objects.create(hostname='abc', node_type=node_type) + addr = ReceptorAddress.objects.create(instance=instance, address='addr') + resp = patch( + url=reverse('api:instance_detail', kwargs={'pk': instance.pk}), + data={"hostname": "abc", "node_type": node_type, "peers": [addr.id]}, + user=admin_user, + expect=400, + ) + assert 'Instance cannot peer to its own address.' in str(resp.data) @pytest.mark.parametrize('node_type', ['control', 'hybrid', 'hop', 'execution']) def test_creating_node(self, node_type, admin_user, post): """ can only add hop and execution nodes via API """ - post( + resp = post( url=reverse('api:instance_list'), data={"hostname": "abc", "node_type": node_type}, user=admin_user, expect=400 if node_type in ['control', 'hybrid'] else 201, ) + if resp.status_code == 400: + assert 'Can only create execution or hop nodes.' in str(resp.data) def test_changing_node_type(self, admin_user, patch): """ cannot change node type """ hop = Instance.objects.create(hostname='abc', node_type="hop") - patch( + resp = patch( url=reverse('api:instance_detail', kwargs={'pk': hop.pk}), data={"node_type": "execution"}, user=admin_user, expect=400, ) + assert 'Cannot change node type.' in str(resp.data) - @pytest.mark.parametrize('node_type', ['hop', 'execution']) - def test_listener_port_null(self, node_type, admin_user, post): + def test_is_internal(self, admin_user, post): """ - listener_port can be None + cannot set is_internal to True """ - post( - url=reverse('api:instance_list'), - data={"hostname": "abc", "node_type": node_type, "listener_port": None}, - user=admin_user, - expect=201, - ) - - @pytest.mark.parametrize('node_type, allowed', [('control', False), ('hybrid', False), ('hop', True), ('execution', True)]) - def test_peers_from_control_nodes_allowed(self, node_type, allowed, post, admin_user): - """ - only hop and execution nodes can have peers_from_control_nodes set to True - """ - post( - url=reverse('api:instance_list'), - data={"hostname": "abc", "peers_from_control_nodes": True, "node_type": node_type, "listener_port": 6789}, - user=admin_user, - expect=201 if allowed else 400, - ) - - def test_listener_port_is_required(self, admin_user, post): - """ - if adding instance to peers list, that instance must have listener_port set - """ - Instance.objects.create(hostname='abc', node_type="hop", listener_port=None) - post( - url=reverse('api:instance_list'), - data={"hostname": "ex", "peers_from_control_nodes": False, "node_type": "execution", "listener_port": None, "peers": ["abc"]}, + hop = Instance.objects.create(hostname='abc', node_type="hop") + resp = post( + url=reverse('api:instance_receptor_addresses_list', kwargs={'pk': hop.pk}), + data={"address": "hopaddr", "is_internal": True}, user=admin_user, expect=400, ) + assert 'Only external addresses can be created.' in str(resp.data) - def test_peers_from_control_nodes_listener_port_enabled(self, admin_user, post): + def test_multiple_peers_from_control_nodes(self, admin_user, post): """ - if peers_from_control_nodes is True, listener_port must an integer - Assert that all other combinations are allowed + only one address can have peers_from_control_nodes set to True for a given instance """ - for index, item in enumerate(itertools.product(['hop', 'execution'], [True, False], [None, 6789])): - node_type, peers_from, listener_port = item - # only disallowed case is when peers_from is True and listener port is None - disallowed = peers_from and not listener_port - post( - url=reverse('api:instance_list'), - data={"hostname": f"abc{index}", "peers_from_control_nodes": peers_from, "node_type": node_type, "listener_port": listener_port}, - user=admin_user, - expect=400 if disallowed else 201, - ) + hop = Instance.objects.create(hostname='hop', node_type='hop') + hopaddr1 = ReceptorAddress.objects.create(instance=hop, address='hopaddr1', peers_from_control_nodes=True) + resp = post( + url=reverse('api:instance_receptor_addresses_list', kwargs={'pk': hop.pk}), + data={"address": "hopaddr2", "peers_from_control_nodes": True}, + user=admin_user, + expect=400, + ) + assert 'Only one address can set peers_from_control_nodes to True.' in str(resp.data) + + def test_bidirectional_peering(self, admin_user, patch): + """ + cannot peer to node that is already to peered to it + if A -> B, then disallow B -> A + """ + hop1 = Instance.objects.create(hostname='hop1', node_type='hop') + hop1addr = ReceptorAddress.objects.create(instance=hop1, address='hop1ddr') + hop2 = Instance.objects.create(hostname='hop2', node_type='hop') + hop2addr = ReceptorAddress.objects.create(instance=hop2, address='hop2addr') + hop1.peers.add(hop2addr) + resp = patch( + url=reverse('api:instance_detail', kwargs={'pk': hop2.pk}), + data={"peers": [hop1addr.id]}, + user=admin_user, + expect=400, + ) + assert 'Instance hop1 is already peered to this instance.' in str(resp.data) + + def test_multiple_peers_same_instance(self, admin_user, patch): + """ + cannot peer to more than one address of the same instance + """ + hop1 = Instance.objects.create(hostname='hop1', node_type='hop') + hop1addr1 = ReceptorAddress.objects.create(instance=hop1, address='hop1ddr1') + hop1addr2 = ReceptorAddress.objects.create(instance=hop1, address='hop1ddr2') + hop2 = Instance.objects.create(hostname='hop2', node_type='hop') + resp = patch( + url=reverse('api:instance_detail', kwargs={'pk': hop2.pk}), + data={"peers": [hop1addr1.id, hop1addr2.id]}, + user=admin_user, + expect=400, + ) + assert 'Cannot peer to the same instance more than once.' in str(resp.data) @pytest.mark.parametrize('node_type', ['control', 'hybrid']) - def test_disallow_modifying_peers_control_nodes(self, node_type, admin_user, patch): + def test_modifying_peers_control_nodes(self, node_type, admin_user, patch): """ for control nodes, peers field should not be modified directly via patch. """ control = Instance.objects.create(hostname='abc', node_type=node_type) - hop1 = Instance.objects.create(hostname='hop1', node_type='hop', peers_from_control_nodes=True, listener_port=6789) - hop2 = Instance.objects.create(hostname='hop2', node_type='hop', peers_from_control_nodes=False, listener_port=6789) - assert [hop1] == list(control.peers.all()) # only hop1 should be peered - patch( + hop1 = Instance.objects.create(hostname='hop1', node_type='hop') + hop1addr = ReceptorAddress.objects.create(instance=hop1, address='hop1addr', peers_from_control_nodes=True) + hop2 = Instance.objects.create(hostname='hop2', node_type='hop') + hop2addr = ReceptorAddress.objects.create(instance=hop2, address='hop2addr') + assert [hop1addr] == list(control.peers.all()) # only hop1addr should be peered + resp = patch( url=reverse('api:instance_detail', kwargs={'pk': control.pk}), - data={"peers": ["hop2"]}, + data={"peers": [hop2addr.id]}, user=admin_user, - expect=400, # cannot add peers directly + expect=400, # cannot add peers manually ) + assert 'Setting peers manually for control nodes is not allowed.' in str(resp.data) + patch( url=reverse('api:instance_detail', kwargs={'pk': control.pk}), - data={"peers": ["hop1"]}, + data={"peers": [hop1addr.id]}, user=admin_user, expect=200, # patching with current peers list should be okay ) - patch( + resp = patch( url=reverse('api:instance_detail', kwargs={'pk': control.pk}), data={"peers": []}, user=admin_user, expect=400, # cannot remove peers directly ) + assert 'Setting peers manually for control nodes is not allowed.' in str(resp.data) + patch( url=reverse('api:instance_detail', kwargs={'pk': control.pk}), data={}, @@ -145,26 +169,28 @@ class TestPeers: ) # patch hop2 patch( - url=reverse('api:instance_detail', kwargs={'pk': hop2.pk}), + url=reverse('api:receptor_address_detail', kwargs={'pk': hop2addr.pk}), data={"peers_from_control_nodes": True}, user=admin_user, expect=200, # patching without data should be fine too ) - assert {hop1, hop2} == set(control.peers.all()) # hop1 and hop2 should now be peered from control node + assert {hop1addr, hop2addr} == set(control.peers.all()) # hop1 and hop2 should now be peered from control node - def test_disallow_changing_hostname(self, admin_user, patch): + def test_changing_hostname(self, admin_user, patch): """ cannot change hostname """ hop = Instance.objects.create(hostname='hop', node_type='hop') - patch( + resp = patch( url=reverse('api:instance_detail', kwargs={'pk': hop.pk}), data={"hostname": "hop2"}, user=admin_user, expect=400, ) - def test_disallow_changing_node_state(self, admin_user, patch): + assert 'Cannot change hostname.' in str(resp.data) + + def test_changing_node_state(self, admin_user, patch): """ only allow setting to deprovisioning """ @@ -175,12 +201,13 @@ class TestPeers: user=admin_user, expect=200, ) - patch( + resp = patch( url=reverse('api:instance_detail', kwargs={'pk': hop.pk}), data={"node_state": "ready"}, user=admin_user, expect=400, ) + assert "Can only change instances to the 'deprovisioning' state." in str(resp.data) @pytest.mark.parametrize('node_type', ['control', 'hybrid']) def test_control_node_automatically_peers(self, node_type): @@ -191,9 +218,10 @@ class TestPeers: peer to hop should be removed if hop is deleted """ - hop = Instance.objects.create(hostname='hop', node_type='hop', peers_from_control_nodes=True, listener_port=6789) + hop = Instance.objects.create(hostname='hop', node_type='hop') + hopaddr = ReceptorAddress.objects.create(instance=hop, address='hopaddr', peers_from_control_nodes=True) control = Instance.objects.create(hostname='abc', node_type=node_type) - assert hop in control.peers.all() + assert hopaddr in control.peers.all() hop.delete() assert not control.peers.exists() @@ -203,26 +231,32 @@ class TestPeers: if a new node comes online, other peer relationships should remain intact """ - hop1 = Instance.objects.create(hostname='hop1', node_type='hop', listener_port=6789, peers_from_control_nodes=True) - hop2 = Instance.objects.create(hostname='hop2', node_type='hop', listener_port=6789, peers_from_control_nodes=False) - hop1.peers.add(hop2) + hop1 = Instance.objects.create(hostname='hop1', node_type='hop') + hop2 = Instance.objects.create(hostname='hop2', node_type='hop') + hop2addr = ReceptorAddress.objects.create(instance=hop2, address='hop2addr') + hop1.peers.add(hop2addr) # a control node is added - Instance.objects.create(hostname='control', node_type=node_type, listener_port=None) + Instance.objects.create(hostname='control', node_type=node_type) assert hop1.peers.exists() - def test_group_vars(self, get, admin_user): + def test_group_vars(self): """ control > hop1 > hop2 < execution """ - control = Instance.objects.create(hostname='control', node_type='control', listener_port=None) - hop1 = Instance.objects.create(hostname='hop1', node_type='hop', listener_port=6789, peers_from_control_nodes=True) - hop2 = Instance.objects.create(hostname='hop2', node_type='hop', listener_port=6789, peers_from_control_nodes=False) - execution = Instance.objects.create(hostname='execution', node_type='execution', listener_port=6789) + control = Instance.objects.create(hostname='control', node_type='control') + hop1 = Instance.objects.create(hostname='hop1', node_type='hop') + hop1addr = ReceptorAddress.objects.create(instance=hop1, address='hop1addr', peers_from_control_nodes=True, port=6789) - execution.peers.add(hop2) - hop1.peers.add(hop2) + hop2 = Instance.objects.create(hostname='hop2', node_type='hop') + hop2addr = ReceptorAddress.objects.create(instance=hop2, address='hop2addr', peers_from_control_nodes=False, port=6789) + + execution = Instance.objects.create(hostname='execution', node_type='execution') + executionaddr = ReceptorAddress.objects.create(instance=execution, address='executionaddr', peers_from_control_nodes=False, port=6789) + + execution.peers.add(hop2addr) + hop1.peers.add(hop2addr) control_vars = yaml.safe_load(generate_group_vars_all_yml(control)) hop1_vars = yaml.safe_load(generate_group_vars_all_yml(hop1)) @@ -230,25 +264,25 @@ class TestPeers: execution_vars = yaml.safe_load(generate_group_vars_all_yml(execution)) # control group vars assertions - assert has_peer(control_vars, 'hop1:6789') - assert not has_peer(control_vars, 'hop2:6789') - assert not has_peer(control_vars, 'execution:6789') + assert has_peer(control_vars, 'hop1addr:6789') + assert not has_peer(control_vars, 'hop2addr:6789') + assert not has_peer(control_vars, 'executionaddr:6789') assert not control_vars.get('receptor_listener', False) # hop1 group vars assertions - assert has_peer(hop1_vars, 'hop2:6789') - assert not has_peer(hop1_vars, 'execution:6789') + assert has_peer(hop1_vars, 'hop2addr:6789') + assert not has_peer(hop1_vars, 'executionaddr:6789') assert hop1_vars.get('receptor_listener', False) # hop2 group vars assertions - assert not has_peer(hop2_vars, 'hop1:6789') - assert not has_peer(hop2_vars, 'execution:6789') + assert not has_peer(hop2_vars, 'hop1addr:6789') + assert not has_peer(hop2_vars, 'executionaddr:6789') assert hop2_vars.get('receptor_listener', False) assert hop2_vars.get('receptor_peers', []) == [] # execution group vars assertions - assert has_peer(execution_vars, 'hop2:6789') - assert not has_peer(execution_vars, 'hop1:6789') + assert has_peer(execution_vars, 'hop2addr:6789') + assert not has_peer(execution_vars, 'hop1addr:6789') assert execution_vars.get('receptor_listener', False) def test_write_receptor_config_called(self): @@ -265,13 +299,15 @@ class TestPeers: control = Instance.objects.create(hostname='control1', node_type='control') write_method.assert_not_called() - # new hop node with peers_from_control_nodes False (no) - hop1 = Instance.objects.create(hostname='hop1', node_type='hop', listener_port=6789, peers_from_control_nodes=False) + # new address with peers_from_control_nodes False (no) + hop1 = Instance.objects.create(hostname='hop1', node_type='hop') + hop1addr = ReceptorAddress.objects.create(instance=hop1, address='hop1addr', peers_from_control_nodes=False) hop1.delete() write_method.assert_not_called() - # new hop node with peers_from_control_nodes True (yes) - hop1 = Instance.objects.create(hostname='hop1', node_type='hop', listener_port=6789, peers_from_control_nodes=True) + # new address with peers_from_control_nodes True (yes) + hop1 = Instance.objects.create(hostname='hop1', node_type='hop') + hop1addr = ReceptorAddress.objects.create(instance=hop1, address='hop1addr', peers_from_control_nodes=True) write_method.assert_called() write_method.reset_mock() @@ -280,20 +316,21 @@ class TestPeers: write_method.assert_called() write_method.reset_mock() - # new hop node with peers_from_control_nodes False and peered to another hop node (no) - hop2 = Instance.objects.create(hostname='hop2', node_type='hop', listener_port=6789, peers_from_control_nodes=False) - hop2.peers.add(hop1) + # new address with peers_from_control_nodes False and peered to another hop node (no) + hop2 = Instance.objects.create(hostname='hop2', node_type='hop') + hop2addr = ReceptorAddress.objects.create(instance=hop2, address='hop2addr', peers_from_control_nodes=False) + hop2.peers.add(hop1addr) hop2.delete() write_method.assert_not_called() # changing peers_from_control_nodes to False (yes) - hop1.peers_from_control_nodes = False - hop1.save() + hop1addr.peers_from_control_nodes = False + hop1addr.save() write_method.assert_called() write_method.reset_mock() - # deleting hop node that has peers_from_control_nodes to False (no) - hop1.delete() + # deleting address that has peers_from_control_nodes to False (no) + hop1.delete() # cascade deletes to hop1addr write_method.assert_not_called() # deleting control nodes (no) @@ -315,20 +352,22 @@ class TestPeers: # not peered, so config file should not be updated for i in range(3): - Instance.objects.create(hostname=f"exNo-{i}", node_type='execution', listener_port=6789, peers_from_control_nodes=False) - + inst = Instance.objects.create(hostname=f"exNo-{i}", node_type='execution') + ReceptorAddress.objects.create(instance=inst, address=f"exNo-{i}addr", port=6789, peers_from_control_nodes=False) _, should_update = generate_config_data() assert not should_update # peered, so config file should be updated expected_peers = [] for i in range(3): - expected_peers.append(f"hop-{i}:6789") - Instance.objects.create(hostname=f"hop-{i}", node_type='hop', listener_port=6789, peers_from_control_nodes=True) + expected_peers.append(f"hop-{i}addr:6789") + inst = Instance.objects.create(hostname=f"hop-{i}", node_type='hop') + ReceptorAddress.objects.create(instance=inst, address=f"hop-{i}addr", port=6789, peers_from_control_nodes=True) for i in range(3): - expected_peers.append(f"exYes-{i}:6789") - Instance.objects.create(hostname=f"exYes-{i}", node_type='execution', listener_port=6789, peers_from_control_nodes=True) + expected_peers.append(f"exYes-{i}addr:6789") + inst = Instance.objects.create(hostname=f"exYes-{i}", node_type='execution') + ReceptorAddress.objects.create(instance=inst, address=f"exYes-{i}addr", port=6789, peers_from_control_nodes=True) new_config, should_update = generate_config_data() assert should_update diff --git a/awx/main/tests/unit/models/test_receptor_address.py b/awx/main/tests/unit/models/test_receptor_address.py new file mode 100644 index 0000000000..f18e1a9018 --- /dev/null +++ b/awx/main/tests/unit/models/test_receptor_address.py @@ -0,0 +1,32 @@ +from awx.main.models import ReceptorAddress +import pytest + +ReceptorAddress() + + +@pytest.mark.parametrize( + 'address, protocol, port, websocket_path, expected', + [ + ('foo', 'tcp', 27199, '', 'foo:27199'), + ('bar', 'ws', 6789, '', 'wss://bar:6789'), + ('mal', 'ws', 6789, 'path', 'wss://mal:6789/path'), + ('example.com', 'ws', 443, 'path', 'wss://example.com:443/path'), + ], +) +def test_get_full_address(address, protocol, port, websocket_path, expected): + receptor_address = ReceptorAddress(address=address, protocol=protocol, port=port, websocket_path=websocket_path) + assert receptor_address.get_full_address() == expected + + +@pytest.mark.parametrize( + 'protocol, expected', + [ + ('tcp', 'tcp-peer'), + ('ws', 'ws-peer'), + ('wss', 'ws-peer'), + ('foo', None), + ], +) +def test_get_peer_type(protocol, expected): + receptor_address = ReceptorAddress(protocol=protocol) + assert receptor_address.get_peer_type() == expected