diff --git a/awx/api/serializers.py b/awx/api/serializers.py index c3e2a427f9..e8df13ab3e 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -5003,6 +5003,7 @@ class ActivityStreamSerializer(BaseSerializer): ('credential_type', ('id', 'name', 'description', 'kind', 'managed')), ('ad_hoc_command', ('id', 'name', 'status', 'limit')), ('workflow_approval', ('id', 'name', 'unified_job_id')), + ('instance', ('id', 'hostname')), ] return field_list diff --git a/awx/main/models/__init__.py b/awx/main/models/__init__.py index 0fab2cd4f6..f439a692fb 100644 --- a/awx/main/models/__init__.py +++ b/awx/main/models/__init__.py @@ -201,6 +201,8 @@ activity_stream_registrar.connect(Organization) activity_stream_registrar.connect(Inventory) activity_stream_registrar.connect(Host) activity_stream_registrar.connect(Group) +activity_stream_registrar.connect(Instance) +activity_stream_registrar.connect(InstanceGroup) activity_stream_registrar.connect(InventorySource) # activity_stream_registrar.connect(InventoryUpdate) activity_stream_registrar.connect(Credential) diff --git a/awx/main/signals.py b/awx/main/signals.py index 5caf7b45a8..8dde65342d 100644 --- a/awx/main/signals.py +++ b/awx/main/signals.py @@ -34,7 +34,6 @@ from awx.main.models import ( ExecutionEnvironment, Group, Host, - InstanceGroup, Inventory, InventorySource, Job, @@ -377,6 +376,7 @@ def model_serializer_mapping(): models.Inventory: serializers.InventorySerializer, models.Host: serializers.HostSerializer, models.Group: serializers.GroupSerializer, + models.Instance: serializers.InstanceSerializer, models.InstanceGroup: serializers.InstanceGroupSerializer, models.InventorySource: serializers.InventorySourceSerializer, models.Credential: serializers.CredentialSerializer, @@ -675,9 +675,3 @@ def create_access_token_user_if_missing(sender, **kwargs): post_save.disconnect(create_access_token_user_if_missing, sender=OAuth2AccessToken) obj.save() post_save.connect(create_access_token_user_if_missing, sender=OAuth2AccessToken) - - -# Connect the Instance Group to Activity Stream receivers. -post_save.connect(activity_stream_create, sender=InstanceGroup, dispatch_uid=str(InstanceGroup) + "_create") -pre_save.connect(activity_stream_update, sender=InstanceGroup, dispatch_uid=str(InstanceGroup) + "_update") -pre_delete.connect(activity_stream_delete, sender=InstanceGroup, dispatch_uid=str(InstanceGroup) + "_delete") diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 73344b066d..ac7dffc6e9 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -191,6 +191,8 @@ def inform_cluster_of_shutdown(): @task(queue=get_local_queuename) def apply_cluster_membership_policies(): + from awx.main.signals import disable_activity_stream + started_waiting = time.time() with advisory_lock('cluster_policy_lock', wait=True): lock_time = time.time() - started_waiting @@ -282,18 +284,19 @@ def apply_cluster_membership_policies(): # On a differential basis, apply instances to groups with transaction.atomic(): - for g in actual_groups: - if g.obj.is_container_group: - logger.debug('Skipping containerized group {} for policy calculation'.format(g.obj.name)) - continue - instances_to_add = set(g.instances) - set(g.prior_instances) - instances_to_remove = set(g.prior_instances) - set(g.instances) - if instances_to_add: - logger.debug('Adding instances {} to group {}'.format(list(instances_to_add), g.obj.name)) - g.obj.instances.add(*instances_to_add) - if instances_to_remove: - logger.debug('Removing instances {} from group {}'.format(list(instances_to_remove), g.obj.name)) - g.obj.instances.remove(*instances_to_remove) + with disable_activity_stream(): + for g in actual_groups: + if g.obj.is_container_group: + logger.debug('Skipping containerized group {} for policy calculation'.format(g.obj.name)) + continue + instances_to_add = set(g.instances) - set(g.prior_instances) + instances_to_remove = set(g.prior_instances) - set(g.instances) + if instances_to_add: + logger.debug('Adding instances {} to group {}'.format(list(instances_to_add), g.obj.name)) + g.obj.instances.add(*instances_to_add) + if instances_to_remove: + logger.debug('Removing instances {} from group {}'.format(list(instances_to_remove), g.obj.name)) + g.obj.instances.remove(*instances_to_remove) logger.debug('Cluster policy computation finished in {} seconds'.format(time.time() - started_compute)) diff --git a/awx/main/tests/functional/api/test_instance.py b/awx/main/tests/functional/api/test_instance.py index b840ba7f29..c65cea0c01 100644 --- a/awx/main/tests/functional/api/test_instance.py +++ b/awx/main/tests/functional/api/test_instance.py @@ -3,6 +3,7 @@ import pytest from unittest import mock from awx.api.versioning import reverse +from awx.main.models.activity_stream import ActivityStream from awx.main.models.ha import Instance import redis @@ -17,6 +18,7 @@ INSTANCE_KWARGS = dict(hostname='example-host', cpu=6, memory=36000000000, cpu_c @pytest.mark.django_db def test_disabled_zeros_capacity(patch, admin_user): instance = Instance.objects.create(**INSTANCE_KWARGS) + assert ActivityStream.objects.filter(instance=instance).count() == 1 url = reverse('api:instance_detail', kwargs={'pk': instance.pk}) @@ -25,12 +27,14 @@ def test_disabled_zeros_capacity(patch, admin_user): instance.refresh_from_db() assert instance.capacity == 0 + assert ActivityStream.objects.filter(instance=instance).count() == 2 @pytest.mark.django_db def test_enabled_sets_capacity(patch, admin_user): instance = Instance.objects.create(enabled=False, capacity=0, **INSTANCE_KWARGS) assert instance.capacity == 0 + assert ActivityStream.objects.filter(instance=instance).count() == 1 url = reverse('api:instance_detail', kwargs={'pk': instance.pk}) @@ -39,6 +43,7 @@ def test_enabled_sets_capacity(patch, admin_user): instance.refresh_from_db() assert instance.capacity > 0 + assert ActivityStream.objects.filter(instance=instance).count() == 2 @pytest.mark.django_db diff --git a/awx/main/tests/functional/api/test_instance_group.py b/awx/main/tests/functional/api/test_instance_group.py index 5a787b6607..97b8abcff2 100644 --- a/awx/main/tests/functional/api/test_instance_group.py +++ b/awx/main/tests/functional/api/test_instance_group.py @@ -4,6 +4,7 @@ import pytest from awx.api.versioning import reverse from awx.main.models import ( + ActivityStream, Instance, InstanceGroup, ProjectUpdate, @@ -213,9 +214,23 @@ def test_containerized_group_default_fields(instance_group, kube_credential): def test_instance_attach_to_instance_group(post, instance_group, node_type_instance, admin, node_type): instance = node_type_instance(hostname=node_type, node_type=node_type) + count = ActivityStream.objects.count() + url = reverse(f'api:instance_group_instance_list', kwargs={'pk': instance_group.pk}) post(url, {'associate': True, 'id': instance.id}, admin, expect=204 if node_type != 'control' else 400) + new_activity = ActivityStream.objects.all()[count:] + if node_type != 'control': + assert len(new_activity) == 2 # the second is an update of the instance group policy + new_activity = new_activity[0] + assert new_activity.operation == 'associate' + assert new_activity.object1 == 'instance_group' + assert new_activity.object2 == 'instance' + assert new_activity.instance.first() == instance + assert new_activity.instance_group.first() == instance_group + else: + assert not new_activity + @pytest.mark.django_db @pytest.mark.parametrize('node_type', ['control', 'hybrid', 'execution']) @@ -223,18 +238,46 @@ def test_instance_unattach_from_instance_group(post, instance_group, node_type_i instance = node_type_instance(hostname=node_type, node_type=node_type) instance_group.instances.add(instance) + count = ActivityStream.objects.count() + url = reverse(f'api:instance_group_instance_list', kwargs={'pk': instance_group.pk}) post(url, {'disassociate': True, 'id': instance.id}, admin, expect=204 if node_type != 'control' else 400) + new_activity = ActivityStream.objects.all()[count:] + if node_type != 'control': + assert len(new_activity) == 1 + new_activity = new_activity[0] + assert new_activity.operation == 'disassociate' + assert new_activity.object1 == 'instance_group' + assert new_activity.object2 == 'instance' + assert new_activity.instance.first() == instance + assert new_activity.instance_group.first() == instance_group + else: + assert not new_activity + @pytest.mark.django_db @pytest.mark.parametrize('node_type', ['control', 'hybrid', 'execution']) def test_instance_group_attach_to_instance(post, instance_group, node_type_instance, admin, node_type): instance = node_type_instance(hostname=node_type, node_type=node_type) + count = ActivityStream.objects.count() + url = reverse(f'api:instance_instance_groups_list', kwargs={'pk': instance.pk}) post(url, {'associate': True, 'id': instance_group.id}, admin, expect=204 if node_type != 'control' else 400) + new_activity = ActivityStream.objects.all()[count:] + if node_type != 'control': + assert len(new_activity) == 2 # the second is an update of the instance group policy + new_activity = new_activity[0] + assert new_activity.operation == 'associate' + assert new_activity.object1 == 'instance' + assert new_activity.object2 == 'instance_group' + assert new_activity.instance.first() == instance + assert new_activity.instance_group.first() == instance_group + else: + assert not new_activity + @pytest.mark.django_db @pytest.mark.parametrize('node_type', ['control', 'hybrid', 'execution']) @@ -242,5 +285,19 @@ def test_instance_group_unattach_from_instance(post, instance_group, node_type_i instance = node_type_instance(hostname=node_type, node_type=node_type) instance_group.instances.add(instance) + count = ActivityStream.objects.count() + url = reverse(f'api:instance_instance_groups_list', kwargs={'pk': instance.pk}) post(url, {'disassociate': True, 'id': instance_group.id}, admin, expect=204 if node_type != 'control' else 400) + + new_activity = ActivityStream.objects.all()[count:] + if node_type != 'control': + assert len(new_activity) == 1 + new_activity = new_activity[0] + assert new_activity.operation == 'disassociate' + assert new_activity.object1 == 'instance' + assert new_activity.object2 == 'instance_group' + assert new_activity.instance.first() == instance + assert new_activity.instance_group.first() == instance_group + else: + assert not new_activity diff --git a/awx/main/tests/functional/models/test_activity_stream.py b/awx/main/tests/functional/models/test_activity_stream.py index bc6c3e8c51..f8ae40b540 100644 --- a/awx/main/tests/functional/models/test_activity_stream.py +++ b/awx/main/tests/functional/models/test_activity_stream.py @@ -170,7 +170,7 @@ def test_activity_stream_actor(admin_user): @pytest.mark.django_db -def test_annon_user_action(): +def test_anon_user_action(): with mock.patch('awx.main.signals.get_current_user') as u_mock: u_mock.return_value = AnonymousUser() inv = Inventory.objects.create(name='ainventory') diff --git a/awx/main/tests/functional/test_ha.py b/awx/main/tests/functional/test_ha.py index bda4da35a5..0b4ced53c2 100644 --- a/awx/main/tests/functional/test_ha.py +++ b/awx/main/tests/functional/test_ha.py @@ -14,6 +14,6 @@ def test_multiple_instances(): @pytest.mark.django_db def test_db_localhost(): - Instance.objects.create(hostname='foo', node_type='hybrd') + Instance.objects.create(hostname='foo', node_type='hybrid') Instance.objects.create(hostname='bar', node_type='execution') assert is_ha_environment() is False diff --git a/awx/main/tests/functional/test_instances.py b/awx/main/tests/functional/test_instances.py index a0a06b4ae5..21a17ff2b5 100644 --- a/awx/main/tests/functional/test_instances.py +++ b/awx/main/tests/functional/test_instances.py @@ -2,6 +2,7 @@ import pytest from unittest import mock from awx.main.models import AdHocCommand, InventoryUpdate, JobTemplate, ProjectUpdate +from awx.main.models.activity_stream import ActivityStream from awx.main.models.ha import Instance, InstanceGroup from awx.main.tasks import apply_cluster_membership_policies from awx.api.versioning import reverse @@ -72,6 +73,7 @@ def test_instance_dup(org_admin, organization, project, instance_factory, instan i1 = instance_factory("i1") i2 = instance_factory("i2") i3 = instance_factory("i3") + ig_all = instance_group_factory("all", instances=[i1, i2, i3]) ig_dup = instance_group_factory("duplicates", instances=[i1]) project.organization.instance_groups.add(ig_all, ig_dup) @@ -83,7 +85,7 @@ def test_instance_dup(org_admin, organization, project, instance_factory, instan api_num_instances_oa = list(list_response2.data.items())[0][1] assert actual_num_instances == api_num_instances_auditor - # Note: The org_admin will not see the default 'tower' node (instance fixture) because it is not in it's group, as expected + # Note: The org_admin will not see the default 'tower' node (instance fixture) because it is not in its group, as expected assert api_num_instances_oa == (actual_num_instances - 1) @@ -94,7 +96,13 @@ def test_policy_instance_few_instances(instance_factory, instance_group_factory) ig_2 = instance_group_factory("ig2", percentage=25) ig_3 = instance_group_factory("ig3", percentage=25) ig_4 = instance_group_factory("ig4", percentage=25) + + count = ActivityStream.objects.count() + apply_cluster_membership_policies() + # running apply_cluster_membership_policies shouldn't spam the activity stream + assert ActivityStream.objects.count() == count + assert len(ig_1.instances.all()) == 1 assert i1 in ig_1.instances.all() assert len(ig_2.instances.all()) == 1 @@ -103,8 +111,12 @@ def test_policy_instance_few_instances(instance_factory, instance_group_factory) assert i1 in ig_3.instances.all() assert len(ig_4.instances.all()) == 1 assert i1 in ig_4.instances.all() + i2 = instance_factory("i2") + count += 1 apply_cluster_membership_policies() + assert ActivityStream.objects.count() == count + assert len(ig_1.instances.all()) == 1 assert i1 in ig_1.instances.all() assert len(ig_2.instances.all()) == 1