From f691bb2c1d1af97f51240c8b2bcd6eeca24cc1ca Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Tue, 19 Oct 2021 14:37:48 -0400 Subject: [PATCH 1/4] Enable ActivityStream capture for Instances --- awx/api/serializers.py | 1 + awx/main/models/__init__.py | 1 + awx/main/signals.py | 1 + awx/main/tasks.py | 3 +++ 4 files changed, 6 insertions(+) diff --git a/awx/api/serializers.py b/awx/api/serializers.py index c3e2a427f9..e8df13ab3e 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -5003,6 +5003,7 @@ class ActivityStreamSerializer(BaseSerializer): ('credential_type', ('id', 'name', 'description', 'kind', 'managed')), ('ad_hoc_command', ('id', 'name', 'status', 'limit')), ('workflow_approval', ('id', 'name', 'unified_job_id')), + ('instance', ('id', 'hostname')), ] return field_list diff --git a/awx/main/models/__init__.py b/awx/main/models/__init__.py index 0fab2cd4f6..29b64a0758 100644 --- a/awx/main/models/__init__.py +++ b/awx/main/models/__init__.py @@ -201,6 +201,7 @@ activity_stream_registrar.connect(Organization) activity_stream_registrar.connect(Inventory) activity_stream_registrar.connect(Host) activity_stream_registrar.connect(Group) +activity_stream_registrar.connect(Instance) activity_stream_registrar.connect(InventorySource) # activity_stream_registrar.connect(InventoryUpdate) activity_stream_registrar.connect(Credential) diff --git a/awx/main/signals.py b/awx/main/signals.py index 5caf7b45a8..d8931dc266 100644 --- a/awx/main/signals.py +++ b/awx/main/signals.py @@ -377,6 +377,7 @@ def model_serializer_mapping(): models.Inventory: serializers.InventorySerializer, models.Host: serializers.HostSerializer, models.Group: serializers.GroupSerializer, + models.Instance: serializers.InstanceSerializer, models.InstanceGroup: serializers.InstanceGroupSerializer, models.InventorySource: serializers.InventorySourceSerializer, models.Credential: serializers.CredentialSerializer, diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 2ec7659c00..0a9068bf33 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -288,6 +288,9 @@ def apply_cluster_membership_policies(): continue instances_to_add = set(g.instances) - set(g.prior_instances) instances_to_remove = set(g.prior_instances) - set(g.instances) + # The following writes to the db don't spam the activity stream, because + # InstanceGroup is special-cased in signals.py to connect to only the non-m2m + # signal handlers. if instances_to_add: logger.debug('Adding instances {} to group {}'.format(list(instances_to_add), g.obj.name)) g.obj.instances.add(*instances_to_add) From d9b4f142b88427f38357d3b54bdb330522fa2e91 Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Tue, 19 Oct 2021 16:56:46 -0400 Subject: [PATCH 2/4] Update a couple of the existing tests --- awx/main/tests/functional/api/test_instance.py | 5 +++++ .../functional/models/test_activity_stream.py | 2 +- awx/main/tests/functional/test_ha.py | 2 +- awx/main/tests/functional/test_instances.py | 14 +++++++++++++- 4 files changed, 20 insertions(+), 3 deletions(-) diff --git a/awx/main/tests/functional/api/test_instance.py b/awx/main/tests/functional/api/test_instance.py index b94b860b01..f3bdbf3092 100644 --- a/awx/main/tests/functional/api/test_instance.py +++ b/awx/main/tests/functional/api/test_instance.py @@ -3,6 +3,7 @@ import pytest from unittest import mock from awx.api.versioning import reverse +from awx.main.models.activity_stream import ActivityStream from awx.main.models.ha import Instance import redis @@ -17,6 +18,7 @@ INSTANCE_KWARGS = dict(hostname='example-host', cpu=6, memory=36000000000, cpu_c @pytest.mark.django_db def test_disabled_zeros_capacity(patch, admin_user): instance = Instance.objects.create(**INSTANCE_KWARGS) + assert ActivityStream.objects.filter(instance=instance).count() == 1 url = reverse('api:instance_detail', kwargs={'pk': instance.pk}) @@ -25,12 +27,14 @@ def test_disabled_zeros_capacity(patch, admin_user): instance.refresh_from_db() assert instance.capacity == 0 + assert ActivityStream.objects.filter(instance=instance).count() == 2 @pytest.mark.django_db def test_enabled_sets_capacity(patch, admin_user): instance = Instance.objects.create(enabled=False, capacity=0, **INSTANCE_KWARGS) assert instance.capacity == 0 + assert ActivityStream.objects.filter(instance=instance).count() == 1 url = reverse('api:instance_detail', kwargs={'pk': instance.pk}) @@ -39,6 +43,7 @@ def test_enabled_sets_capacity(patch, admin_user): instance.refresh_from_db() assert instance.capacity > 0 + assert ActivityStream.objects.filter(instance=instance).count() == 2 @pytest.mark.django_db diff --git a/awx/main/tests/functional/models/test_activity_stream.py b/awx/main/tests/functional/models/test_activity_stream.py index bc6c3e8c51..f8ae40b540 100644 --- a/awx/main/tests/functional/models/test_activity_stream.py +++ b/awx/main/tests/functional/models/test_activity_stream.py @@ -170,7 +170,7 @@ def test_activity_stream_actor(admin_user): @pytest.mark.django_db -def test_annon_user_action(): +def test_anon_user_action(): with mock.patch('awx.main.signals.get_current_user') as u_mock: u_mock.return_value = AnonymousUser() inv = Inventory.objects.create(name='ainventory') diff --git a/awx/main/tests/functional/test_ha.py b/awx/main/tests/functional/test_ha.py index bda4da35a5..0b4ced53c2 100644 --- a/awx/main/tests/functional/test_ha.py +++ b/awx/main/tests/functional/test_ha.py @@ -14,6 +14,6 @@ def test_multiple_instances(): @pytest.mark.django_db def test_db_localhost(): - Instance.objects.create(hostname='foo', node_type='hybrd') + Instance.objects.create(hostname='foo', node_type='hybrid') Instance.objects.create(hostname='bar', node_type='execution') assert is_ha_environment() is False diff --git a/awx/main/tests/functional/test_instances.py b/awx/main/tests/functional/test_instances.py index a0a06b4ae5..21a17ff2b5 100644 --- a/awx/main/tests/functional/test_instances.py +++ b/awx/main/tests/functional/test_instances.py @@ -2,6 +2,7 @@ import pytest from unittest import mock from awx.main.models import AdHocCommand, InventoryUpdate, JobTemplate, ProjectUpdate +from awx.main.models.activity_stream import ActivityStream from awx.main.models.ha import Instance, InstanceGroup from awx.main.tasks import apply_cluster_membership_policies from awx.api.versioning import reverse @@ -72,6 +73,7 @@ def test_instance_dup(org_admin, organization, project, instance_factory, instan i1 = instance_factory("i1") i2 = instance_factory("i2") i3 = instance_factory("i3") + ig_all = instance_group_factory("all", instances=[i1, i2, i3]) ig_dup = instance_group_factory("duplicates", instances=[i1]) project.organization.instance_groups.add(ig_all, ig_dup) @@ -83,7 +85,7 @@ def test_instance_dup(org_admin, organization, project, instance_factory, instan api_num_instances_oa = list(list_response2.data.items())[0][1] assert actual_num_instances == api_num_instances_auditor - # Note: The org_admin will not see the default 'tower' node (instance fixture) because it is not in it's group, as expected + # Note: The org_admin will not see the default 'tower' node (instance fixture) because it is not in its group, as expected assert api_num_instances_oa == (actual_num_instances - 1) @@ -94,7 +96,13 @@ def test_policy_instance_few_instances(instance_factory, instance_group_factory) ig_2 = instance_group_factory("ig2", percentage=25) ig_3 = instance_group_factory("ig3", percentage=25) ig_4 = instance_group_factory("ig4", percentage=25) + + count = ActivityStream.objects.count() + apply_cluster_membership_policies() + # running apply_cluster_membership_policies shouldn't spam the activity stream + assert ActivityStream.objects.count() == count + assert len(ig_1.instances.all()) == 1 assert i1 in ig_1.instances.all() assert len(ig_2.instances.all()) == 1 @@ -103,8 +111,12 @@ def test_policy_instance_few_instances(instance_factory, instance_group_factory) assert i1 in ig_3.instances.all() assert len(ig_4.instances.all()) == 1 assert i1 in ig_4.instances.all() + i2 = instance_factory("i2") + count += 1 apply_cluster_membership_policies() + assert ActivityStream.objects.count() == count + assert len(ig_1.instances.all()) == 1 assert i1 in ig_1.instances.all() assert len(ig_2.instances.all()) == 1 From 7a1b0786f593cbefc6e0057c3b3e397f831c7633 Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Thu, 21 Oct 2021 10:08:42 -0400 Subject: [PATCH 3/4] Change the ActivityStream registration for InstanceGroups to include the m2m fields. Also to avoid spamminess, disable the activity stream on the apply_cluster_membership_policies task. --- awx/main/models/__init__.py | 1 + awx/main/signals.py | 6 ------ awx/main/tasks.py | 30 +++++++++++++++--------------- 3 files changed, 16 insertions(+), 21 deletions(-) diff --git a/awx/main/models/__init__.py b/awx/main/models/__init__.py index 29b64a0758..f439a692fb 100644 --- a/awx/main/models/__init__.py +++ b/awx/main/models/__init__.py @@ -202,6 +202,7 @@ activity_stream_registrar.connect(Inventory) activity_stream_registrar.connect(Host) activity_stream_registrar.connect(Group) activity_stream_registrar.connect(Instance) +activity_stream_registrar.connect(InstanceGroup) activity_stream_registrar.connect(InventorySource) # activity_stream_registrar.connect(InventoryUpdate) activity_stream_registrar.connect(Credential) diff --git a/awx/main/signals.py b/awx/main/signals.py index d8931dc266..3cc57abb6a 100644 --- a/awx/main/signals.py +++ b/awx/main/signals.py @@ -676,9 +676,3 @@ def create_access_token_user_if_missing(sender, **kwargs): post_save.disconnect(create_access_token_user_if_missing, sender=OAuth2AccessToken) obj.save() post_save.connect(create_access_token_user_if_missing, sender=OAuth2AccessToken) - - -# Connect the Instance Group to Activity Stream receivers. -post_save.connect(activity_stream_create, sender=InstanceGroup, dispatch_uid=str(InstanceGroup) + "_create") -pre_save.connect(activity_stream_update, sender=InstanceGroup, dispatch_uid=str(InstanceGroup) + "_update") -pre_delete.connect(activity_stream_delete, sender=InstanceGroup, dispatch_uid=str(InstanceGroup) + "_delete") diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 0a9068bf33..6039083cb6 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -191,6 +191,8 @@ def inform_cluster_of_shutdown(): @task(queue=get_local_queuename) def apply_cluster_membership_policies(): + from awx.main.signals import disable_activity_stream + started_waiting = time.time() with advisory_lock('cluster_policy_lock', wait=True): lock_time = time.time() - started_waiting @@ -282,21 +284,19 @@ def apply_cluster_membership_policies(): # On a differential basis, apply instances to groups with transaction.atomic(): - for g in actual_groups: - if g.obj.is_container_group: - logger.debug('Skipping containerized group {} for policy calculation'.format(g.obj.name)) - continue - instances_to_add = set(g.instances) - set(g.prior_instances) - instances_to_remove = set(g.prior_instances) - set(g.instances) - # The following writes to the db don't spam the activity stream, because - # InstanceGroup is special-cased in signals.py to connect to only the non-m2m - # signal handlers. - if instances_to_add: - logger.debug('Adding instances {} to group {}'.format(list(instances_to_add), g.obj.name)) - g.obj.instances.add(*instances_to_add) - if instances_to_remove: - logger.debug('Removing instances {} from group {}'.format(list(instances_to_remove), g.obj.name)) - g.obj.instances.remove(*instances_to_remove) + with disable_activity_stream(): + for g in actual_groups: + if g.obj.is_container_group: + logger.debug('Skipping containerized group {} for policy calculation'.format(g.obj.name)) + continue + instances_to_add = set(g.instances) - set(g.prior_instances) + instances_to_remove = set(g.prior_instances) - set(g.instances) + if instances_to_add: + logger.debug('Adding instances {} to group {}'.format(list(instances_to_add), g.obj.name)) + g.obj.instances.add(*instances_to_add) + if instances_to_remove: + logger.debug('Removing instances {} from group {}'.format(list(instances_to_remove), g.obj.name)) + g.obj.instances.remove(*instances_to_remove) logger.debug('Cluster policy computation finished in {} seconds'.format(time.time() - started_compute)) From c8f65bafde388a465d980a458b4f7b16d9539b5b Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Thu, 21 Oct 2021 10:51:44 -0400 Subject: [PATCH 4/4] Adjust Instance-InstanceGroup tests to show that the ActivityStream is captured --- awx/main/signals.py | 1 - .../functional/api/test_instance_group.py | 57 +++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/awx/main/signals.py b/awx/main/signals.py index 3cc57abb6a..8dde65342d 100644 --- a/awx/main/signals.py +++ b/awx/main/signals.py @@ -34,7 +34,6 @@ from awx.main.models import ( ExecutionEnvironment, Group, Host, - InstanceGroup, Inventory, InventorySource, Job, diff --git a/awx/main/tests/functional/api/test_instance_group.py b/awx/main/tests/functional/api/test_instance_group.py index 5a787b6607..97b8abcff2 100644 --- a/awx/main/tests/functional/api/test_instance_group.py +++ b/awx/main/tests/functional/api/test_instance_group.py @@ -4,6 +4,7 @@ import pytest from awx.api.versioning import reverse from awx.main.models import ( + ActivityStream, Instance, InstanceGroup, ProjectUpdate, @@ -213,9 +214,23 @@ def test_containerized_group_default_fields(instance_group, kube_credential): def test_instance_attach_to_instance_group(post, instance_group, node_type_instance, admin, node_type): instance = node_type_instance(hostname=node_type, node_type=node_type) + count = ActivityStream.objects.count() + url = reverse(f'api:instance_group_instance_list', kwargs={'pk': instance_group.pk}) post(url, {'associate': True, 'id': instance.id}, admin, expect=204 if node_type != 'control' else 400) + new_activity = ActivityStream.objects.all()[count:] + if node_type != 'control': + assert len(new_activity) == 2 # the second is an update of the instance group policy + new_activity = new_activity[0] + assert new_activity.operation == 'associate' + assert new_activity.object1 == 'instance_group' + assert new_activity.object2 == 'instance' + assert new_activity.instance.first() == instance + assert new_activity.instance_group.first() == instance_group + else: + assert not new_activity + @pytest.mark.django_db @pytest.mark.parametrize('node_type', ['control', 'hybrid', 'execution']) @@ -223,18 +238,46 @@ def test_instance_unattach_from_instance_group(post, instance_group, node_type_i instance = node_type_instance(hostname=node_type, node_type=node_type) instance_group.instances.add(instance) + count = ActivityStream.objects.count() + url = reverse(f'api:instance_group_instance_list', kwargs={'pk': instance_group.pk}) post(url, {'disassociate': True, 'id': instance.id}, admin, expect=204 if node_type != 'control' else 400) + new_activity = ActivityStream.objects.all()[count:] + if node_type != 'control': + assert len(new_activity) == 1 + new_activity = new_activity[0] + assert new_activity.operation == 'disassociate' + assert new_activity.object1 == 'instance_group' + assert new_activity.object2 == 'instance' + assert new_activity.instance.first() == instance + assert new_activity.instance_group.first() == instance_group + else: + assert not new_activity + @pytest.mark.django_db @pytest.mark.parametrize('node_type', ['control', 'hybrid', 'execution']) def test_instance_group_attach_to_instance(post, instance_group, node_type_instance, admin, node_type): instance = node_type_instance(hostname=node_type, node_type=node_type) + count = ActivityStream.objects.count() + url = reverse(f'api:instance_instance_groups_list', kwargs={'pk': instance.pk}) post(url, {'associate': True, 'id': instance_group.id}, admin, expect=204 if node_type != 'control' else 400) + new_activity = ActivityStream.objects.all()[count:] + if node_type != 'control': + assert len(new_activity) == 2 # the second is an update of the instance group policy + new_activity = new_activity[0] + assert new_activity.operation == 'associate' + assert new_activity.object1 == 'instance' + assert new_activity.object2 == 'instance_group' + assert new_activity.instance.first() == instance + assert new_activity.instance_group.first() == instance_group + else: + assert not new_activity + @pytest.mark.django_db @pytest.mark.parametrize('node_type', ['control', 'hybrid', 'execution']) @@ -242,5 +285,19 @@ def test_instance_group_unattach_from_instance(post, instance_group, node_type_i instance = node_type_instance(hostname=node_type, node_type=node_type) instance_group.instances.add(instance) + count = ActivityStream.objects.count() + url = reverse(f'api:instance_instance_groups_list', kwargs={'pk': instance.pk}) post(url, {'disassociate': True, 'id': instance_group.id}, admin, expect=204 if node_type != 'control' else 400) + + new_activity = ActivityStream.objects.all()[count:] + if node_type != 'control': + assert len(new_activity) == 1 + new_activity = new_activity[0] + assert new_activity.operation == 'disassociate' + assert new_activity.object1 == 'instance' + assert new_activity.object2 == 'instance_group' + assert new_activity.instance.first() == instance + assert new_activity.instance_group.first() == instance_group + else: + assert not new_activity