Change the ActivityStream registration for InstanceGroups
to include the m2m fields. Also, to avoid spamminess, disable the
activity stream on the apply_cluster_membership_policies task.
This commit is contained in:
Jeff Bradberry
2021-10-21 10:08:42 -04:00
committed by Shane McDonald
parent 62d50d27be
commit 7010015e8a
3 changed files with 16 additions and 21 deletions

View File

@@ -202,6 +202,7 @@ activity_stream_registrar.connect(Inventory)
 activity_stream_registrar.connect(Host)
 activity_stream_registrar.connect(Group)
 activity_stream_registrar.connect(Instance)
+activity_stream_registrar.connect(InstanceGroup)
 activity_stream_registrar.connect(InventorySource)
 # activity_stream_registrar.connect(InventoryUpdate)
 activity_stream_registrar.connect(Credential)

View File

@@ -676,9 +676,3 @@ def create_access_token_user_if_missing(sender, **kwargs):
         post_save.disconnect(create_access_token_user_if_missing, sender=OAuth2AccessToken)
         obj.save()
         post_save.connect(create_access_token_user_if_missing, sender=OAuth2AccessToken)
-
-# Connect the Instance Group to Activity Stream receivers.
-post_save.connect(activity_stream_create, sender=InstanceGroup, dispatch_uid=str(InstanceGroup) + "_create")
-pre_save.connect(activity_stream_update, sender=InstanceGroup, dispatch_uid=str(InstanceGroup) + "_update")
-pre_delete.connect(activity_stream_delete, sender=InstanceGroup, dispatch_uid=str(InstanceGroup) + "_delete")
-

View File

@@ -191,6 +191,8 @@ def inform_cluster_of_shutdown():
 @task(queue=get_local_queuename)
 def apply_cluster_membership_policies():
+    from awx.main.signals import disable_activity_stream
+
     started_waiting = time.time()
     with advisory_lock('cluster_policy_lock', wait=True):
         lock_time = time.time() - started_waiting
@@ -282,21 +284,19 @@ def apply_cluster_membership_policies():
         # On a differential basis, apply instances to groups
         with transaction.atomic():
-            for g in actual_groups:
-                if g.obj.is_container_group:
-                    logger.debug('Skipping containerized group {} for policy calculation'.format(g.obj.name))
-                    continue
-                instances_to_add = set(g.instances) - set(g.prior_instances)
-                instances_to_remove = set(g.prior_instances) - set(g.instances)
-                # The following writes to the db don't spam the activity stream, because
-                # InstanceGroup is special-cased in signals.py to connect to only the non-m2m
-                # signal handlers.
-                if instances_to_add:
-                    logger.debug('Adding instances {} to group {}'.format(list(instances_to_add), g.obj.name))
-                    g.obj.instances.add(*instances_to_add)
-                if instances_to_remove:
-                    logger.debug('Removing instances {} from group {}'.format(list(instances_to_remove), g.obj.name))
-                    g.obj.instances.remove(*instances_to_remove)
+            with disable_activity_stream():
+                for g in actual_groups:
+                    if g.obj.is_container_group:
+                        logger.debug('Skipping containerized group {} for policy calculation'.format(g.obj.name))
+                        continue
+                    instances_to_add = set(g.instances) - set(g.prior_instances)
+                    instances_to_remove = set(g.prior_instances) - set(g.instances)
+                    if instances_to_add:
+                        logger.debug('Adding instances {} to group {}'.format(list(instances_to_add), g.obj.name))
+                        g.obj.instances.add(*instances_to_add)
+                    if instances_to_remove:
+                        logger.debug('Removing instances {} from group {}'.format(list(instances_to_remove), g.obj.name))
+                        g.obj.instances.remove(*instances_to_remove)
         logger.debug('Cluster policy computation finished in {} seconds'.format(time.time() - started_compute))