Files
awx/awx/api/views/mixin.py
Dirk Julich 33d18f5e5e Fix cartesian product in organization user/admin count queries
The organizations list and detail endpoints annotated each org with user and admin counts using two Count() calls that traverse the Role.members M2M. Django generated two LEFT JOINs on the same through table, crossing every member row with every admin row before COUNT(DISTINCT) reduced the product.

At scale (2,617 members × 46,233 admins), this produced roughly 121M intermediate rows and 96-second query times, causing 504 (gateway timeout) responses.

Replace them with independent Subquery expressions, each querying main_rbac_roles_members separately, so no cross product is formed.

Fixes: AAP-72817
Fixes: AAP-72480
2026-06-16 15:21:18 +02:00

229 lines
10 KiB
Python

# Copyright (c) 2018 Red Hat, Inc.
# All Rights Reserved.
import dateutil
import logging
from django.db.models import Count, IntegerField, OuterRef, Subquery
from django.db.models.functions import Coalesce
from django.db import transaction
from django.shortcuts import get_object_or_404
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from rest_framework.exceptions import PermissionDenied
from rest_framework.response import Response
from rest_framework import status
from awx.main.constants import ACTIVE_STATES
from awx.main.models import Organization, Role
from awx.main.utils import get_object_or_400
from awx.main.models.ha import Instance, InstanceGroup, schedule_policy_task
from awx.main.models.organization import Team
from awx.main.models.projects import Project
from awx.main.models.inventory import Inventory
from awx.main.models.jobs import JobTemplate
from awx.api.exceptions import ActiveJobConflict
# Module logger; UnifiedJobDeletionMixin.destroy warns through it when a job
# is deleted before all of its events were processed.
logger = logging.getLogger(name='awx.api.views.mixin')
class UnifiedJobDeletionMixin(object):
    """
    Special handling when deleting a running unified job object.

    ``destroy`` refuses deletion (PermissionDenied) when the user lacks delete
    access, when the job's workflow job is in ``ACTIVE_STATES``, or when the
    job itself is in ``ACTIVE_STATES`` (jobs in the ``new`` state excepted).
    If the job's events are not fully processed and it finished less than a
    minute ago, HTTP 400 is returned instead of deleting.
    """

    def destroy(self, request, *args, **kwargs):
        job = self.get_object()
        if not request.user.can_access(self.model, 'delete', job):
            raise PermissionDenied()

        try:
            workflow_status = job.unified_job_node.workflow_job.status
        except self.model.unified_job_node.RelatedObjectDoesNotExist:
            # The job has no workflow node, so there is no workflow to check.
            pass
        else:
            if workflow_status in ACTIVE_STATES:
                raise PermissionDenied(detail=_('Cannot delete job resource when associated workflow job is running.'))

        # Jobs in the 'new' state stay deletable, because these can be manually created.
        if job.status != 'new' and job.status in ACTIVE_STATES:
            raise PermissionDenied(detail=_("Cannot delete running job resource."))

        if not job.event_processing_finished:
            # Events may still be arriving if the job finished under a minute ago.
            finished_recently = bool(job.finished) and now() < job.finished + dateutil.relativedelta.relativedelta(minutes=1)
            if finished_recently:
                return Response({"error": _("Job has not finished processing events.")}, status=status.HTTP_400_BAD_REQUEST)
            # More than a minute has passed (or no finish time): events are probably lost.
            logger.warning('Allowing deletion of {} through the API without all events processed.'.format(job.log_format))

        # Manually cascade delete events if unpartitioned job
        if job.has_unpartitioned_events:
            job.get_event_queryset().delete()
        job.delete()
        return Response(status=status.HTTP_204_NO_CONTENT)
class OrganizationInstanceGroupMembershipMixin(object):
    """
    Override ``unattach`` so the parent ``Organization`` is saved afterwards,
    ensuring instance group updates are persisted.

    Inside one transaction the organization row is locked with
    ``select_for_update()``, the next ``unattach`` in the MRO runs, and the
    organization is saved. The save happens whatever the response status.
    """

    def unattach(self, request, *args, **kwargs):
        with transaction.atomic():
            parent_pk = self.get_parent_object().id
            locked_org = Organization.objects.select_for_update().get(pk=parent_pk)
            result = super().unattach(request, *args, **kwargs)
            locked_org.save()
        return result
class InstanceGroupMembershipMixin(object):
    """
    Keep ``InstanceGroup.policy_instance_list`` in step with attach/unattach.

    After a successful ``attach``, the instance hostname is appended to the
    instance group's ``policy_instance_list``. After a successful ``unattach``,
    the hostname is removed and ``schedule_policy_task()`` is called. The group
    is written with ``save(update_fields=['policy_instance_list'])``; that save
    is expected to trigger a background recalculation of policy-based
    membership (NOTE(review): the save hook is not visible here; confirm).

    The view may be nested either way round. When ``parent_model`` is
    ``Instance``, the sub-object (``request.data['id']``) is the instance group.
    Otherwise the parent is the instance group and the sub-object
    (``self.model``) is the instance.
    """

    def _policy_instance_hostname(self, sub_id):
        """Return the hostname of the instance involved in this (un)attach request."""
        if self.parent_model is Instance:
            return self.get_parent_object().hostname
        return get_object_or_400(self.model, pk=sub_id).hostname

    def _lock_instance_group(self, sub_id):
        """
        Fetch the instance group involved in this request using ``select_for_update()``.

        Call this inside ``transaction.atomic()`` so the row lock is held
        while ``policy_instance_list`` is read and rewritten.
        """
        locked_groups = InstanceGroup.objects.select_for_update()
        if self.parent_model is Instance:
            return get_object_or_400(locked_groups, pk=sub_id)
        # similar to get_parent_object, but selected for update
        parent_filter = {self.lookup_field: self.kwargs.get(self.lookup_field, None)}
        return get_object_or_404(locked_groups, **parent_filter)

    def attach(self, request, *args, **kwargs):
        response = super(InstanceGroupMembershipMixin, self).attach(request, *args, **kwargs)
        if status.is_success(response.status_code):
            sub_id = request.data.get('id', None)
            inst_name = self._policy_instance_hostname(sub_id)
            with transaction.atomic():
                ig_obj = self._lock_instance_group(sub_id)
                if inst_name not in ig_obj.policy_instance_list:
                    ig_obj.policy_instance_list.append(inst_name)
                    ig_obj.save(update_fields=['policy_instance_list'])
        return response

    def unattach(self, request, *args, **kwargs):
        response = super(InstanceGroupMembershipMixin, self).unattach(request, *args, **kwargs)
        if status.is_success(response.status_code):
            sub_id = request.data.get('id', None)
            inst_name = self._policy_instance_hostname(sub_id)
            with transaction.atomic():
                ig_obj = self._lock_instance_group(sub_id)
                if inst_name in ig_obj.policy_instance_list:
                    ig_obj.policy_instance_list.remove(inst_name)
                    ig_obj.save(update_fields=['policy_instance_list'])
            # sometimes removing an instance has a non-obvious consequence
            # this is almost always true if policy_instance_percentage or _minimum is non-zero
            # after removing a single instance, the other memberships need to be re-balanced
            schedule_policy_task()
        return response
class RelatedJobsPreventDeleteMixin(object):
    """
    Block deletion while related jobs are active or, for jobs that finished
    within the last minute, still processing events.
    """

    def perform_destroy(self, obj):
        self.check_related_active_jobs(obj)
        return super().perform_destroy(obj)

    def check_related_active_jobs(self, obj):
        """
        Raise if ``obj`` has related jobs that make deletion unsafe.

        :raises ActiveJobConflict: when ``obj.get_active_jobs()`` is non-empty.
        :raises PermissionDenied: when a related job finished less than a
            minute ago and has not finished processing its events.
        """
        running_jobs = obj.get_active_jobs()
        if len(running_jobs) > 0:
            raise ActiveJobConflict(running_jobs)

        one_minute_ago = now() - dateutil.relativedelta.relativedelta(minutes=1)
        just_finished = obj._get_related_jobs().filter(finished__gte=one_minute_ago)
        # First recently finished job whose events are still arriving, if any.
        unprocessed = next(
            (job for job in just_finished.get_real_instances() if not job.event_processing_finished),
            None,
        )
        if unprocessed is not None:
            raise PermissionDenied(_('Related job {} is still processing events.').format(unprocessed.log_format))
class OrganizationCountsMixin(object):
    """
    Add per-organization related-object counts to the serializer context.

    The counts are stored in ``related_field_counts`` as a dict keyed by
    organization id, covering every organization the requesting user can read.
    Each entry holds:

    - ``inventories``, ``teams``, ``job_templates``, ``projects``: rows of that
      model, readable by the user, whose ``organization`` points at the org.
    - ``users`` / ``admins``: users linked to the org's member / admin role
      through the ``Role.members`` table. Other members and admins of an
      organization are always viewable, so these counts are not filtered by user.
    """

    @staticmethod
    def _organization_fk_count_querysets(user):
        """Return ``{resource: queryset}`` yielding ``organization`` / ``organization__count`` rows."""
        readable = {
            'inventories': Inventory.accessible_objects(user, 'read_role'),
            'teams': Team.accessible_objects(user, 'read_role'),
            'job_templates': JobTemplate.accessible_objects(user, 'read_role'),
            'projects': Project.accessible_objects(user, 'read_role'),
        }
        # Group by the organization FK. The explicit order_by keeps the ordering
        # on the grouped column rather than any model default ordering.
        return {
            name: qs.values('organization').annotate(Count('organization')).order_by('organization')
            for name, qs in readable.items()
        }

    @staticmethod
    def _role_member_count_subquery(role_field):
        """
        Return a scalar subquery counting users linked to the role whose id is
        in the outer organization row's ``role_field`` column.

        Each role gets its own independent subquery against the Role.members
        through table. Counting both roles via joins in one query would cross
        every member row with every admin row (a cartesian product).
        """
        RoleMember = Role.members.through
        return Subquery(
            RoleMember.objects.filter(role_id=OuterRef(role_field))
            .values('role_id')
            .annotate(cnt=Count('user_id', distinct=True))
            .values('cnt'),
            output_field=IntegerField(),
        )

    @classmethod
    def _organization_user_admin_counts(cls, org_qs):
        """Return ``org_qs`` as ``id`` / ``users`` / ``admins`` rows."""
        return org_qs.annotate(
            # A role with no members yields no subquery row (NULL); report 0 instead.
            users=Coalesce(cls._role_member_count_subquery('member_role_id'), 0),
            admins=Coalesce(cls._role_member_count_subquery('admin_role_id'), 0),
        ).values('id', 'users', 'admins')

    def get_serializer_context(self, *args, **kwargs):
        """
        Return the parent serializer context plus ``related_field_counts``.

        Nothing is added when there is no request. When the user can read no
        organizations, the key is only added (as ``{}``) for POST requests.
        """
        full_context = super(OrganizationCountsMixin, self).get_serializer_context(*args, **kwargs)
        if self.request is None:
            return full_context

        user = self.request.user
        org_qs = self.model.accessible_objects(user, 'read_role')
        org_id_list = org_qs.values('id')
        # Evaluating the queryset here caches its rows; they are iterated again below.
        if not org_id_list:
            if self.request.method == 'POST':
                full_context['related_field_counts'] = {}
            return full_context

        count_context = {
            org['id']: {'inventories': 0, 'teams': 0, 'users': 0, 'job_templates': 0, 'admins': 0, 'projects': 0}
            for org in org_id_list
        }

        for res, count_qs in self._organization_fk_count_querysets(user).items():
            for entry in count_qs:
                org_counts = count_context.get(entry['organization'])
                # Skip organizations outside the readable set (including a NULL organization).
                if org_counts is not None:
                    org_counts[res] = entry['organization__count']

        for entry in self._organization_user_admin_counts(org_qs):
            org_counts = count_context.get(entry['id'])
            if org_counts is not None:
                org_counts['admins'] = entry['admins']
                org_counts['users'] = entry['users']

        full_context['related_field_counts'] = count_context
        return full_context
class NoTruncateMixin(object):
    """Set ``no_truncate=True`` in the serializer context when the ``no_truncate`` query parameter is truthy."""

    def get_serializer_context(self):
        context = super().get_serializer_context()
        wants_full_output = self.request.query_params.get('no_truncate')
        if wants_full_output:
            context['no_truncate'] = True
        return context