diff --git a/awx/api/serializers.py b/awx/api/serializers.py index ab84ca3bb0..522a4d8942 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -5459,17 +5459,25 @@ class ScheduleSerializer(LaunchConfigurationBaseSerializer, SchedulePreviewSeria class InstanceLinkSerializer(BaseSerializer): class Meta: model = InstanceLink - fields = ('id', 'url', 'related', 'source', 'target', 'link_state') + fields = ('id', 'related', 'source', 'target', 'target_full_address', 'link_state') source = serializers.SlugRelatedField(slug_field="hostname", queryset=Instance.objects.all()) - target = serializers.SlugRelatedField(slug_field="hostname", queryset=Instance.objects.all()) + + target = serializers.SerializerMethodField() + target_full_address = serializers.SerializerMethodField() def get_related(self, obj): res = super(InstanceLinkSerializer, self).get_related(obj) res['source_instance'] = self.reverse('api:instance_detail', kwargs={'pk': obj.source.id}) - res['target_instance'] = self.reverse('api:instance_detail', kwargs={'pk': obj.target.id}) + res['target_address'] = self.reverse('api:receptor_address_detail', kwargs={'pk': obj.target.id}) return res + def get_target(self, obj): + return obj.target.instance.hostname + + def get_target_full_address(self, obj): + return obj.target.get_full_address() + class InstanceNodeSerializer(BaseSerializer): class Meta: @@ -5482,7 +5490,7 @@ class ReceptorAddressSerializer(BaseSerializer): class Meta: model = ReceptorAddress - fields = ('id', 'url', 'address', 'port', 'protocol', 'websocket_path', 'is_internal', 'instance', 'full_address') + fields = ('id', 'url', 'address', 'port', 'protocol', 'websocket_path', 'is_internal', 'instance', 'peers_from_control_nodes', 'full_address') read_only = 'full_address' def get_full_address(self, obj): @@ -5497,7 +5505,6 @@ class InstanceSerializer(BaseSerializer): jobs_running = serializers.IntegerField(help_text=_('Count of jobs in the running or waiting state that are targeted for this instance'), read_only=True) jobs_total = serializers.IntegerField(help_text=_('Count of all jobs that target this instance'), read_only=True) health_check_pending = serializers.SerializerMethodField() - peers = serializers.SlugRelatedField(many=True, required=False, slug_field="hostname", queryset=Instance.objects.all()) class Meta: model = Instance @@ -5534,7 +5541,6 @@ class InstanceSerializer(BaseSerializer): 'node_state', 'ip_address', 'listener_port', - 'peers', 'peers_from_control_nodes', ) extra_kwargs = { @@ -5562,7 +5568,6 @@ class InstanceSerializer(BaseSerializer): res['instance_groups'] = self.reverse('api:instance_instance_groups_list', kwargs={'pk': obj.pk}) if obj.node_type in [Instance.Types.EXECUTION, Instance.Types.HOP]: res['install_bundle'] = self.reverse('api:instance_install_bundle', kwargs={'pk': obj.pk}) - res['peers'] = self.reverse('api:instance_peers_list', kwargs={"pk": obj.pk}) if self.context['request'].user.is_superuser or self.context['request'].user.is_system_auditor: if obj.node_type == 'execution': res['health_check'] = self.reverse('api:instance_health_check', kwargs={'pk': obj.pk}) @@ -5607,30 +5612,30 @@ class InstanceSerializer(BaseSerializer): node_type = get_field_from_model_or_attrs("node_type") peers_from_control_nodes = get_field_from_model_or_attrs("peers_from_control_nodes") listener_port = get_field_from_model_or_attrs("listener_port") - peers = attrs.get('peers', []) + # peers = attrs.get('peers', []) if peers_from_control_nodes and node_type not in (Instance.Types.EXECUTION, Instance.Types.HOP): raise serializers.ValidationError(_("peers_from_control_nodes can only be enabled for execution or hop nodes.")) - if node_type in [Instance.Types.CONTROL, Instance.Types.HYBRID]: - if check_peers_changed(): - raise serializers.ValidationError( - _("Setting peers manually for control nodes is not allowed. Enable peers_from_control_nodes on the hop and execution nodes instead.") - ) + # if node_type in [Instance.Types.CONTROL, Instance.Types.HYBRID]: + # if check_peers_changed(): + # raise serializers.ValidationError( + # _("Setting peers manually for control nodes is not allowed. Enable peers_from_control_nodes on the hop and execution nodes instead.") + # ) - if not listener_port and peers_from_control_nodes: - raise serializers.ValidationError(_("Field listener_port must be a valid integer when peers_from_control_nodes is enabled.")) + # if not listener_port and peers_from_control_nodes: + # raise serializers.ValidationError(_("Field listener_port must be a valid integer when peers_from_control_nodes is enabled.")) - if not listener_port and self.instance and self.instance.peers_from.exists(): - raise serializers.ValidationError(_("Field listener_port must be a valid integer when other nodes peer to it.")) + # if not listener_port and self.instance and self.instance.peers_from.exists(): + # raise serializers.ValidationError(_("Field listener_port must be a valid integer when other nodes peer to it.")) - for peer in peers: - if peer.listener_port is None: - raise serializers.ValidationError(_("Field listener_port must be set on peer ") + peer.hostname + ".") + # for peer in peers: + # if peer.listener_port is None: + # raise serializers.ValidationError(_("Field listener_port must be set on peer ") + peer.hostname + ".") - if not settings.IS_K8S: - if check_peers_changed(): - raise serializers.ValidationError(_("Cannot change peers.")) + # if not settings.IS_K8S: + # if check_peers_changed(): + # raise serializers.ValidationError(_("Cannot change peers.")) return super().validate(attrs) diff --git a/awx/main/management/commands/add_receptor_address.py b/awx/main/management/commands/add_receptor_address.py index 5ad1a044dc..d9b66eec9b 100644 --- a/awx/main/management/commands/add_receptor_address.py +++ b/awx/main/management/commands/add_receptor_address.py @@ -22,6 +22,7 @@ class Command(BaseCommand): parser.add_argument('--protocol', dest='protocol', type=str, default='tcp', choices=['tcp', 'ws'], help="Protocol of the backend connection") parser.add_argument('--websocket_path', dest='websocket_path', type=str, default="", help="Path for websockets") parser.add_argument('--is_internal', action='store_true', help="If true, address only resolvable within the Kubernetes cluster") + parser.add_argument('--peers_from_control_nodes', action='store_true', help="If true, control nodes will peer to this address") def _add_address(self, **kwargs): try: @@ -34,10 +35,9 @@ class Command(BaseCommand): self.changed = False print(f"Error adding receptor address: {e}") - @transaction.atomic def handle(self, **options): self.changed = False - address_options = {k: options[k] for k in ('hostname', 'address', 'port', 'protocol', 'websocket_path', 'is_internal')} + address_options = {k: options[k] for k in ('hostname', 'address', 'port', 'protocol', 'websocket_path', 'is_internal', 'peers_from_control_nodes')} self._add_address(**address_options) if self.changed: print("(changed: True)") diff --git a/awx/main/management/commands/provision_instance.py b/awx/main/management/commands/provision_instance.py index be9a1a8102..e7f8063d61 100644 --- a/awx/main/management/commands/provision_instance.py +++ b/awx/main/management/commands/provision_instance.py @@ -28,9 +28,8 @@ class Command(BaseCommand): parser.add_argument('--listener_port', dest='listener_port', type=int, help="Receptor listener port") parser.add_argument('--node_type', type=str, default='hybrid', choices=['control', 'execution', 'hop', 'hybrid'], help="Instance Node type") parser.add_argument('--uuid', type=str, help="Instance UUID") - parser.add_argument('--peers_from_control_nodes', action='store_true', help="If set, will automatically peer from control nodes") - def _register_hostname(self, hostname, node_type, uuid, listener_port, peers_from_control_nodes): + def _register_hostname(self, hostname, node_type, uuid, listener_port): if not hostname: if not settings.AWX_AUTO_DEPROVISION_INSTANCES: raise CommandError('Registering with values from settings only intended for use in K8s installs') @@ -52,9 +51,7 @@ class Command(BaseCommand): max_concurrent_jobs=settings.DEFAULT_EXECUTION_QUEUE_MAX_CONCURRENT_JOBS, ).register() else: - (changed, instance) = Instance.objects.register( - hostname=hostname, node_type=node_type, node_uuid=uuid, listener_port=listener_port, peers_from_control_nodes=peers_from_control_nodes - ) + (changed, instance) = Instance.objects.register(hostname=hostname, node_type=node_type, node_uuid=uuid, listener_port=listener_port) if changed: print("Successfully registered instance {}".format(hostname)) else: @@ -64,8 +61,6 @@ class Command(BaseCommand): @transaction.atomic def handle(self, **options): self.changed = False - self._register_hostname( - options.get('hostname'), options.get('node_type'), options.get('uuid'), options.get('listener_port'), options.get('peers_from_control_nodes') - ) + self._register_hostname(options.get('hostname'), options.get('node_type'), options.get('uuid'), options.get('listener_port')) if self.changed: print("(changed: True)") diff --git a/awx/main/managers.py b/awx/main/managers.py index ce5bef7e60..747f9d4467 100644 --- a/awx/main/managers.py +++ b/awx/main/managers.py @@ -115,7 +115,7 @@ class InstanceManager(models.Manager): return node[0] raise RuntimeError("No instance found with the current cluster host id") - def register(self, node_uuid=None, hostname=None, ip_address="", listener_port=None, node_type='hybrid', peers_from_control_nodes=False, defaults=None): + def register(self, node_uuid=None, hostname=None, ip_address="", listener_port=None, node_type='hybrid', defaults=None): if not hostname: hostname = settings.CLUSTER_HOST_ID @@ -180,13 +180,5 @@ class InstanceManager(models.Manager): uuid_option = {'uuid': node_uuid if node_uuid is not None else uuid.uuid4()} if node_type == 'execution' and 'version' not in create_defaults: create_defaults['version'] = RECEPTOR_PENDING - instance = self.create( - hostname=hostname, - ip_address=ip_address, - listener_port=listener_port, - node_type=node_type, - peers_from_control_nodes=peers_from_control_nodes, - **create_defaults, - **uuid_option - ) + instance = self.create(hostname=hostname, ip_address=ip_address, listener_port=listener_port, node_type=node_type, **create_defaults, **uuid_option) return (True, instance) diff --git a/awx/main/migrations/0188_inbound_hop_nodes.py b/awx/main/migrations/0188_inbound_hop_nodes.py new file mode 100644 index 0000000000..77570c2ef9 --- /dev/null +++ b/awx/main/migrations/0188_inbound_hop_nodes.py @@ -0,0 +1,59 @@ +# Generated by Django 4.2.5 on 2023-10-10 00:26 + +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + dependencies = [ + ('main', '0187_hop_nodes'), + ] + + operations = [ + migrations.CreateModel( + name='ReceptorAddress', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('address', models.CharField(max_length=255)), + ('port', models.IntegerField()), + ('protocol', models.CharField(max_length=10)), + ('websocket_path', models.CharField(blank=True, default='', max_length=255)), + ('is_internal', models.BooleanField(default=False)), + ('peers_from_control_nodes', models.BooleanField(default=False)), + ], + ), + migrations.RemoveConstraint( + model_name='instancelink', + name='source_and_target_can_not_be_equal', + ), + migrations.AlterUniqueTogether( + name='instancelink', + unique_together=set(), + ), + migrations.AlterField( + model_name='instancelink', + name='source', + field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='main.instance'), + ), + migrations.AddField( + model_name='receptoraddress', + name='instance', + field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='receptor_addresses', to='main.instance'), + ), + migrations.AlterField( + model_name='instance', + name='peers', + field=models.ManyToManyField(related_name='peers_from', through='main.InstanceLink', to='main.receptoraddress'), + ), + migrations.AlterField( + model_name='instancelink', + name='target', + field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='main.receptoraddress'), + ), + migrations.AddConstraint( + model_name='receptoraddress', + constraint=models.UniqueConstraint( + fields=('address', 'protocol'), name='unique_receptor_address', violation_error_message='Receptor address + protocol must be unique.' + ), + ), + ] diff --git a/awx/main/migrations/0188_inbound_ingress.py b/awx/main/migrations/0188_inbound_ingress.py deleted file mode 100644 index 8ed17864ea..0000000000 --- a/awx/main/migrations/0188_inbound_ingress.py +++ /dev/null @@ -1,43 +0,0 @@ -# Generated by Django 4.2.5 on 2023-10-04 06:51 - -from django.db import migrations, models -import django.db.models.deletion - - -class Migration(migrations.Migration): - dependencies = [ - ('main', '0187_hop_nodes'), - ] - - operations = [ - migrations.CreateModel( - name='ReceptorAddress', - fields=[ - ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), - ('address', models.CharField(max_length=255)), - ('port', models.IntegerField(null=True)), - ('protocol', models.CharField(max_length=10)), - ('websocket_path', models.CharField(blank=True, default='', max_length=255)), - ('is_internal', models.BooleanField(default=False)), - ('instance', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='receptor_addresses', to='main.instance')), - ], - ), - migrations.AddConstraint( - model_name='receptoraddress', - constraint=models.UniqueConstraint( - condition=models.Q(('port', None)), - fields=('address', 'protocol', 'websocket_path'), - name='unique_receptor_address_no_port', - violation_error_message='Receptor address must be unique.', - ), - ), - migrations.AddConstraint( - model_name='receptoraddress', - constraint=models.UniqueConstraint( - condition=models.Q(('port', None), _negated=True), - fields=('address', 'port', 'protocol', 'websocket_path'), - name='unique_receptor_address_with_port', - violation_error_message='Receptor address must be unique.', - ), - ), - ] diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index 5ec92e26f2..142bf19dd6 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -34,6 +34,7 @@ from awx.main.models.rbac import ( from awx.main.models.unified_jobs import UnifiedJob from awx.main.utils.common import get_corrected_cpu, get_cpu_effective_capacity, get_corrected_memory, get_mem_effective_capacity from awx.main.models.mixins import RelatedJobsMixin, ResourceMixin +from awx.main.models.receptor_address import ReceptorAddress # ansible-runner from ansible_runner.utils.capacity import get_cpu_count, get_mem_in_bytes @@ -64,8 +65,11 @@ class HasPolicyEditsMixin(HasEditsMixin): class InstanceLink(BaseModel): - source = models.ForeignKey('Instance', on_delete=models.CASCADE, related_name='+') - target = models.ForeignKey('Instance', on_delete=models.CASCADE, related_name='reverse_peers') + class Meta: + ordering = ("id",) + + source = models.ForeignKey('Instance', on_delete=models.CASCADE) + target = models.ForeignKey('ReceptorAddress', on_delete=models.CASCADE) class States(models.TextChoices): ADDING = 'adding', _('Adding') @@ -76,11 +80,6 @@ class InstanceLink(BaseModel): choices=States.choices, default=States.ADDING, max_length=16, help_text=_("Indicates the current life cycle stage of this peer link.") ) - class Meta: - unique_together = ('source', 'target') - ordering = ("id",) - constraints = [models.CheckConstraint(check=~models.Q(source=models.F('target')), name='source_and_target_can_not_be_equal')] - class Instance(HasPolicyEditsMixin, BaseModel): """A model representing an AWX instance running against this database.""" @@ -194,7 +193,7 @@ class Instance(HasPolicyEditsMixin, BaseModel): help_text=_("Port that Receptor will listen for incoming connections on."), ) - peers = models.ManyToManyField('self', symmetrical=False, through=InstanceLink, through_fields=('source', 'target'), related_name='peers_from') + peers = models.ManyToManyField('ReceptorAddress', through=InstanceLink, through_fields=('source', 'target'), related_name='peers_from') peers_from_control_nodes = models.BooleanField(default=False, help_text=_("If True, control plane cluster nodes should automatically peer to it.")) POLICY_FIELDS = frozenset(('managed_by_policy', 'hostname', 'capacity_adjustment')) @@ -502,6 +501,29 @@ def schedule_write_receptor_config(broadcast=True): write_receptor_config() # just run locally +# TODO: don't use the receiver post save, just call this at the moment when we need it +# that way we don't call this multiple times unnecessarily +@receiver(post_save, sender=ReceptorAddress) +def receptor_address_saved(sender, instance, **kwargs): + from awx.main.signals import disable_activity_stream + + address = instance + + with disable_activity_stream(): + control_instances = set(Instance.objects.filter(node_type__in=[Instance.Types.CONTROL, Instance.Types.HYBRID])) + if address.peers_from_control_nodes: + address.peers_from.add(*control_instances) + else: + address.peers_from.remove(*control_instances) + + schedule_write_receptor_config() + + +@receiver(post_delete, sender=ReceptorAddress) +def receptor_address_deleted(sender, instance, **kwargs): + schedule_write_receptor_config() + + @receiver(post_save, sender=Instance) def on_instance_saved(sender, instance, created=False, raw=False, **kwargs): ''' @@ -512,10 +534,12 @@ def on_instance_saved(sender, instance, created=False, raw=False, **kwargs): 2. a node changes its value of peers_from_control_nodes 3. a new control node comes online and has instances to peer to ''' + from awx.main.signals import disable_activity_stream + if created and settings.IS_K8S and instance.node_type in [Instance.Types.CONTROL, Instance.Types.HYBRID]: - inst = Instance.objects.filter(peers_from_control_nodes=True) - if set(instance.peers.all()) != set(inst): - instance.peers.set(inst) + peers_address = ReceptorAddress.objects.filter(peers_from_control_nodes=True) + with disable_activity_stream(): + instance.peers.add(*peers_address) schedule_write_receptor_config(broadcast=False) if settings.IS_K8S and instance.node_type in [Instance.Types.HOP, Instance.Types.EXECUTION]: @@ -525,16 +549,6 @@ def on_instance_saved(sender, instance, created=False, raw=False, **kwargs): # wait for jobs on the node to complete, then delete the # node and kick off write_receptor_config connection.on_commit(lambda: remove_deprovisioned_node.apply_async([instance.hostname])) - else: - control_instances = set(Instance.objects.filter(node_type__in=[Instance.Types.CONTROL, Instance.Types.HYBRID])) - if instance.peers_from_control_nodes: - if (control_instances & set(instance.peers_from.all())) != set(control_instances): - instance.peers_from.add(*control_instances) - schedule_write_receptor_config() # keep method separate to make pytest mocking easier - else: - if set(control_instances) & set(instance.peers_from.all()): - instance.peers_from.remove(*control_instances) - schedule_write_receptor_config() if created or instance.has_policy_changes(): schedule_policy_task() diff --git a/awx/main/models/receptor_address.py b/awx/main/models/receptor_address.py index dd0ff4b411..c47d229912 100644 --- a/awx/main/models/receptor_address.py +++ b/awx/main/models/receptor_address.py @@ -5,32 +5,24 @@ from django.dispatch import receiver from awx.api.versioning import reverse from django.db.models import Sum, Q -from awx.main.models.ha import schedule_write_receptor_config - class ReceptorAddress(models.Model): class Meta: app_label = 'main' constraints = [ models.UniqueConstraint( - fields=["address", "protocol", "websocket_path"], - condition=Q(port=None), - name="unique_receptor_address_no_port", - violation_error_message=_("Receptor address must be unique."), - ), - models.UniqueConstraint( - fields=["address", "port", "protocol", "websocket_path"], - condition=~Q(port=None), - name="unique_receptor_address_with_port", - violation_error_message=_("Receptor address must be unique."), - ), + fields=["address", "protocol"], + name="unique_receptor_address", + violation_error_message=_("Receptor address + protocol must be unique."), + ) ] address = models.CharField(max_length=255) - port = models.IntegerField(null=True) + port = models.IntegerField(blank=False) protocol = models.CharField(max_length=10) websocket_path = models.CharField(max_length=255, default="", blank=True) is_internal = models.BooleanField(default=False) + peers_from_control_nodes = models.BooleanField(default=False) instance = models.ForeignKey( 'Instance', related_name='receptor_addresses', @@ -62,13 +54,3 @@ class ReceptorAddress(models.Model): def get_absolute_url(self, request=None): return reverse('api:receptor_address_detail', kwargs={'pk': self.pk}, request=request) - - -@receiver(post_save, sender=ReceptorAddress) -def receptor_address_saved(sender, instance, **kwargs): - schedule_write_receptor_config(True) - - -@receiver(post_delete, sender=ReceptorAddress) -def receptor_address_deleted(sender, instance, **kwargs): - schedule_write_receptor_config(True) diff --git a/awx/main/tasks/receptor.py b/awx/main/tasks/receptor.py index b1bf657f32..17d4208c66 100644 --- a/awx/main/tasks/receptor.py +++ b/awx/main/tasks/receptor.py @@ -27,7 +27,7 @@ from awx.main.utils.common import ( ) from awx.main.constants import MAX_ISOLATED_PATH_COLON_DELIMITER from awx.main.tasks.signals import signal_state, signal_callback, SignalExit -from awx.main.models import Instance, InstanceLink, UnifiedJob +from awx.main.models import Instance, InstanceLink, UnifiedJob, ReceptorAddress from awx.main.dispatch import get_task_queuename from awx.main.dispatch.publish import task from awx.main.utils.pglock import advisory_lock @@ -676,49 +676,44 @@ RECEPTOR_CONFIG_STARTER = ( ) -def should_update_config(instances): +def should_update_config(new_config): ''' checks that the list of instances matches the list of tcp-peers in the config ''' - current_config = read_receptor_config() # this gets receptor conf lock - current_peers = [] - for config_entry in current_config: - for key, value in config_entry.items(): - if key.endswith('-peer'): - current_peers.append(value['address']) - intended_peers = [f"{i.hostname}:{i.listener_port}" for i in instances] - logger.debug(f"Peers current {current_peers} intended {intended_peers}") - if set(current_peers) == set(intended_peers): - return False # config file is already update to date - return True + current_config = read_receptor_config() # this gets receptor conf lock + for config_entry in current_config: + if config_entry not in new_config: + logger.warning(f"{config_entry} should not be in receptor config. Updating.") + return True + for config_entry in new_config: + if config_entry not in current_config: + logger.warning(f"{config_entry} missing from receptor config. Updating.") + return True + + return False def generate_config_data(): # returns two values # receptor config - based on current database peers # should_update - If True, receptor_config differs from the receptor conf file on disk - instances = Instance.objects.filter(node_type__in=(Instance.Types.EXECUTION, Instance.Types.HOP), peers_from_control_nodes=True) + addresses = ReceptorAddress.objects.filter(peers_from_control_nodes=True) receptor_config = list(RECEPTOR_CONFIG_STARTER) - for instance in instances: - # receptor_addresses = instance.receptor_addresses.all() - # if not receptor_addresses.exists() and instance.listener_port: - # peer = {'tcp-peer': {'address': f'{instance.hostname}:{instance.listener_port}', 'tls': 'tlsclient'}} - # receptor_config.append(peer) - for address in instance.receptor_addresses.all(): - if address.get_peer_type() and address.is_internal: - peer = { - f'{address.get_peer_type()}': { - 'address': f'{address.get_full_address()}', - 'tls': 'tlsclient', - } + for address in addresses: + if address.get_peer_type() and address.is_internal: + peer = { + f'{address.get_peer_type()}': { + 'address': f'{address.get_full_address()}', + 'tls': 'tlsclient', } - receptor_config.append(peer) - else: - logger.warning(f"Receptor address {address} has unsupported peer type, skipping.") - should_update = should_update_config(instances) + } + receptor_config.append(peer) + else: + logger.warning(f"Receptor address {address} has unsupported peer type, skipping.") + should_update = should_update_config(receptor_config) return receptor_config, should_update @@ -760,14 +755,13 @@ def write_receptor_config(): with lock: with open(__RECEPTOR_CONF, 'w') as file: yaml.dump(receptor_config, file, default_flow_style=False) - reload_receptor() @task(queue=get_task_queuename) def remove_deprovisioned_node(hostname): InstanceLink.objects.filter(source__hostname=hostname).update(link_state=InstanceLink.States.REMOVING) - InstanceLink.objects.filter(target__hostname=hostname).update(link_state=InstanceLink.States.REMOVING) + InstanceLink.objects.filter(target__instance__hostname=hostname).update(link_state=InstanceLink.States.REMOVING) node_jobs = UnifiedJob.objects.filter( execution_node=hostname, diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py index b32ae7b5e5..4614b7b4a4 100644 --- a/awx/main/tasks/system.py +++ b/awx/main/tasks/system.py @@ -495,7 +495,7 @@ def inspect_established_receptor_connections(mesh_status): update_links = [] for link in all_links: if link.link_state != InstanceLink.States.REMOVING: - if link.target.hostname in active_receptor_conns.get(link.source.hostname, {}): + if link.target.instance.hostname in active_receptor_conns.get(link.source.hostname, {}): if link.link_state is not InstanceLink.States.ESTABLISHED: link.link_state = InstanceLink.States.ESTABLISHED update_links.append(link)