Merge pull request #5376 from ansible/enable-instance-activitystream

[4.1] ActivityStream capture for Instances
This commit is contained in:
Jeff Bradberry
2021-10-21 15:41:02 -04:00
committed by GitHub
9 changed files with 96 additions and 22 deletions

View File

@@ -5003,6 +5003,7 @@ class ActivityStreamSerializer(BaseSerializer):
('credential_type', ('id', 'name', 'description', 'kind', 'managed')),
('ad_hoc_command', ('id', 'name', 'status', 'limit')),
('workflow_approval', ('id', 'name', 'unified_job_id')),
('instance', ('id', 'hostname')),
]
return field_list

View File

@@ -201,6 +201,8 @@ activity_stream_registrar.connect(Organization)
activity_stream_registrar.connect(Inventory)
activity_stream_registrar.connect(Host)
activity_stream_registrar.connect(Group)
activity_stream_registrar.connect(Instance)
activity_stream_registrar.connect(InstanceGroup)
activity_stream_registrar.connect(InventorySource)
# activity_stream_registrar.connect(InventoryUpdate)
activity_stream_registrar.connect(Credential)

View File

@@ -34,7 +34,6 @@ from awx.main.models import (
ExecutionEnvironment,
Group,
Host,
InstanceGroup,
Inventory,
InventorySource,
Job,
@@ -377,6 +376,7 @@ def model_serializer_mapping():
models.Inventory: serializers.InventorySerializer,
models.Host: serializers.HostSerializer,
models.Group: serializers.GroupSerializer,
models.Instance: serializers.InstanceSerializer,
models.InstanceGroup: serializers.InstanceGroupSerializer,
models.InventorySource: serializers.InventorySourceSerializer,
models.Credential: serializers.CredentialSerializer,
@@ -675,9 +675,3 @@ def create_access_token_user_if_missing(sender, **kwargs):
post_save.disconnect(create_access_token_user_if_missing, sender=OAuth2AccessToken)
obj.save()
post_save.connect(create_access_token_user_if_missing, sender=OAuth2AccessToken)
# Connect the Instance Group to Activity Stream receivers.
post_save.connect(activity_stream_create, sender=InstanceGroup, dispatch_uid=str(InstanceGroup) + "_create")
pre_save.connect(activity_stream_update, sender=InstanceGroup, dispatch_uid=str(InstanceGroup) + "_update")
pre_delete.connect(activity_stream_delete, sender=InstanceGroup, dispatch_uid=str(InstanceGroup) + "_delete")

View File

@@ -191,6 +191,8 @@ def inform_cluster_of_shutdown():
@task(queue=get_local_queuename)
def apply_cluster_membership_policies():
from awx.main.signals import disable_activity_stream
started_waiting = time.time()
with advisory_lock('cluster_policy_lock', wait=True):
lock_time = time.time() - started_waiting
@@ -282,18 +284,19 @@ def apply_cluster_membership_policies():
# On a differential basis, apply instances to groups
with transaction.atomic():
for g in actual_groups:
if g.obj.is_container_group:
logger.debug('Skipping containerized group {} for policy calculation'.format(g.obj.name))
continue
instances_to_add = set(g.instances) - set(g.prior_instances)
instances_to_remove = set(g.prior_instances) - set(g.instances)
if instances_to_add:
logger.debug('Adding instances {} to group {}'.format(list(instances_to_add), g.obj.name))
g.obj.instances.add(*instances_to_add)
if instances_to_remove:
logger.debug('Removing instances {} from group {}'.format(list(instances_to_remove), g.obj.name))
g.obj.instances.remove(*instances_to_remove)
with disable_activity_stream():
for g in actual_groups:
if g.obj.is_container_group:
logger.debug('Skipping containerized group {} for policy calculation'.format(g.obj.name))
continue
instances_to_add = set(g.instances) - set(g.prior_instances)
instances_to_remove = set(g.prior_instances) - set(g.instances)
if instances_to_add:
logger.debug('Adding instances {} to group {}'.format(list(instances_to_add), g.obj.name))
g.obj.instances.add(*instances_to_add)
if instances_to_remove:
logger.debug('Removing instances {} from group {}'.format(list(instances_to_remove), g.obj.name))
g.obj.instances.remove(*instances_to_remove)
logger.debug('Cluster policy computation finished in {} seconds'.format(time.time() - started_compute))

View File

@@ -3,6 +3,7 @@ import pytest
from unittest import mock
from awx.api.versioning import reverse
from awx.main.models.activity_stream import ActivityStream
from awx.main.models.ha import Instance
import redis
@@ -17,6 +18,7 @@ INSTANCE_KWARGS = dict(hostname='example-host', cpu=6, memory=36000000000, cpu_c
@pytest.mark.django_db
def test_disabled_zeros_capacity(patch, admin_user):
instance = Instance.objects.create(**INSTANCE_KWARGS)
assert ActivityStream.objects.filter(instance=instance).count() == 1
url = reverse('api:instance_detail', kwargs={'pk': instance.pk})
@@ -25,12 +27,14 @@ def test_disabled_zeros_capacity(patch, admin_user):
instance.refresh_from_db()
assert instance.capacity == 0
assert ActivityStream.objects.filter(instance=instance).count() == 2
@pytest.mark.django_db
def test_enabled_sets_capacity(patch, admin_user):
instance = Instance.objects.create(enabled=False, capacity=0, **INSTANCE_KWARGS)
assert instance.capacity == 0
assert ActivityStream.objects.filter(instance=instance).count() == 1
url = reverse('api:instance_detail', kwargs={'pk': instance.pk})
@@ -39,6 +43,7 @@ def test_enabled_sets_capacity(patch, admin_user):
instance.refresh_from_db()
assert instance.capacity > 0
assert ActivityStream.objects.filter(instance=instance).count() == 2
@pytest.mark.django_db

View File

@@ -4,6 +4,7 @@ import pytest
from awx.api.versioning import reverse
from awx.main.models import (
ActivityStream,
Instance,
InstanceGroup,
ProjectUpdate,
@@ -213,9 +214,23 @@ def test_containerized_group_default_fields(instance_group, kube_credential):
def test_instance_attach_to_instance_group(post, instance_group, node_type_instance, admin, node_type):
instance = node_type_instance(hostname=node_type, node_type=node_type)
count = ActivityStream.objects.count()
url = reverse(f'api:instance_group_instance_list', kwargs={'pk': instance_group.pk})
post(url, {'associate': True, 'id': instance.id}, admin, expect=204 if node_type != 'control' else 400)
new_activity = ActivityStream.objects.all()[count:]
if node_type != 'control':
assert len(new_activity) == 2 # the second is an update of the instance group policy
new_activity = new_activity[0]
assert new_activity.operation == 'associate'
assert new_activity.object1 == 'instance_group'
assert new_activity.object2 == 'instance'
assert new_activity.instance.first() == instance
assert new_activity.instance_group.first() == instance_group
else:
assert not new_activity
@pytest.mark.django_db
@pytest.mark.parametrize('node_type', ['control', 'hybrid', 'execution'])
@@ -223,18 +238,46 @@ def test_instance_unattach_from_instance_group(post, instance_group, node_type_i
instance = node_type_instance(hostname=node_type, node_type=node_type)
instance_group.instances.add(instance)
count = ActivityStream.objects.count()
url = reverse(f'api:instance_group_instance_list', kwargs={'pk': instance_group.pk})
post(url, {'disassociate': True, 'id': instance.id}, admin, expect=204 if node_type != 'control' else 400)
new_activity = ActivityStream.objects.all()[count:]
if node_type != 'control':
assert len(new_activity) == 1
new_activity = new_activity[0]
assert new_activity.operation == 'disassociate'
assert new_activity.object1 == 'instance_group'
assert new_activity.object2 == 'instance'
assert new_activity.instance.first() == instance
assert new_activity.instance_group.first() == instance_group
else:
assert not new_activity
@pytest.mark.django_db
@pytest.mark.parametrize('node_type', ['control', 'hybrid', 'execution'])
def test_instance_group_attach_to_instance(post, instance_group, node_type_instance, admin, node_type):
instance = node_type_instance(hostname=node_type, node_type=node_type)
count = ActivityStream.objects.count()
url = reverse(f'api:instance_instance_groups_list', kwargs={'pk': instance.pk})
post(url, {'associate': True, 'id': instance_group.id}, admin, expect=204 if node_type != 'control' else 400)
new_activity = ActivityStream.objects.all()[count:]
if node_type != 'control':
assert len(new_activity) == 2 # the second is an update of the instance group policy
new_activity = new_activity[0]
assert new_activity.operation == 'associate'
assert new_activity.object1 == 'instance'
assert new_activity.object2 == 'instance_group'
assert new_activity.instance.first() == instance
assert new_activity.instance_group.first() == instance_group
else:
assert not new_activity
@pytest.mark.django_db
@pytest.mark.parametrize('node_type', ['control', 'hybrid', 'execution'])
@@ -242,5 +285,19 @@ def test_instance_group_unattach_from_instance(post, instance_group, node_type_i
instance = node_type_instance(hostname=node_type, node_type=node_type)
instance_group.instances.add(instance)
count = ActivityStream.objects.count()
url = reverse(f'api:instance_instance_groups_list', kwargs={'pk': instance.pk})
post(url, {'disassociate': True, 'id': instance_group.id}, admin, expect=204 if node_type != 'control' else 400)
new_activity = ActivityStream.objects.all()[count:]
if node_type != 'control':
assert len(new_activity) == 1
new_activity = new_activity[0]
assert new_activity.operation == 'disassociate'
assert new_activity.object1 == 'instance'
assert new_activity.object2 == 'instance_group'
assert new_activity.instance.first() == instance
assert new_activity.instance_group.first() == instance_group
else:
assert not new_activity

View File

@@ -170,7 +170,7 @@ def test_activity_stream_actor(admin_user):
@pytest.mark.django_db
def test_annon_user_action():
def test_anon_user_action():
with mock.patch('awx.main.signals.get_current_user') as u_mock:
u_mock.return_value = AnonymousUser()
inv = Inventory.objects.create(name='ainventory')

View File

@@ -14,6 +14,6 @@ def test_multiple_instances():
@pytest.mark.django_db
def test_db_localhost():
Instance.objects.create(hostname='foo', node_type='hybrd')
Instance.objects.create(hostname='foo', node_type='hybrid')
Instance.objects.create(hostname='bar', node_type='execution')
assert is_ha_environment() is False

View File

@@ -2,6 +2,7 @@ import pytest
from unittest import mock
from awx.main.models import AdHocCommand, InventoryUpdate, JobTemplate, ProjectUpdate
from awx.main.models.activity_stream import ActivityStream
from awx.main.models.ha import Instance, InstanceGroup
from awx.main.tasks import apply_cluster_membership_policies
from awx.api.versioning import reverse
@@ -72,6 +73,7 @@ def test_instance_dup(org_admin, organization, project, instance_factory, instan
i1 = instance_factory("i1")
i2 = instance_factory("i2")
i3 = instance_factory("i3")
ig_all = instance_group_factory("all", instances=[i1, i2, i3])
ig_dup = instance_group_factory("duplicates", instances=[i1])
project.organization.instance_groups.add(ig_all, ig_dup)
@@ -83,7 +85,7 @@ def test_instance_dup(org_admin, organization, project, instance_factory, instan
api_num_instances_oa = list(list_response2.data.items())[0][1]
assert actual_num_instances == api_num_instances_auditor
# Note: The org_admin will not see the default 'tower' node (instance fixture) because it is not in it's group, as expected
# Note: The org_admin will not see the default 'tower' node (instance fixture) because it is not in its group, as expected
assert api_num_instances_oa == (actual_num_instances - 1)
@@ -94,7 +96,13 @@ def test_policy_instance_few_instances(instance_factory, instance_group_factory)
ig_2 = instance_group_factory("ig2", percentage=25)
ig_3 = instance_group_factory("ig3", percentage=25)
ig_4 = instance_group_factory("ig4", percentage=25)
count = ActivityStream.objects.count()
apply_cluster_membership_policies()
# running apply_cluster_membership_policies shouldn't spam the activity stream
assert ActivityStream.objects.count() == count
assert len(ig_1.instances.all()) == 1
assert i1 in ig_1.instances.all()
assert len(ig_2.instances.all()) == 1
@@ -103,8 +111,12 @@ def test_policy_instance_few_instances(instance_factory, instance_group_factory)
assert i1 in ig_3.instances.all()
assert len(ig_4.instances.all()) == 1
assert i1 in ig_4.instances.all()
i2 = instance_factory("i2")
count += 1
apply_cluster_membership_policies()
assert ActivityStream.objects.count() == count
assert len(ig_1.instances.all()) == 1
assert i1 in ig_1.instances.all()
assert len(ig_2.instances.all()) == 1