AAP-16926 Delete unpartitioned tables in a separate transaction (#14572)

This commit is contained in:
Alan Rominger
2023-10-13 15:50:51 -04:00
committed by GitHub
parent 376993077a
commit 07f49f5925

View File

@@ -9,6 +9,7 @@ import re
# Django # Django
from django.apps import apps
from django.core.management.base import BaseCommand, CommandError from django.core.management.base import BaseCommand, CommandError
from django.db import transaction, connection from django.db import transaction, connection
from django.db.models import Min, Max from django.db.models import Min, Max
@@ -198,35 +199,56 @@ class Command(BaseCommand):
delete_meta.delete_jobs() delete_meta.delete_jobs()
return (delete_meta.jobs_no_delete_count, delete_meta.jobs_to_delete_count) return (delete_meta.jobs_no_delete_count, delete_meta.jobs_to_delete_count)
def _handle_unpartitioned_events(self, model, pk_list): def has_unpartitioned_table(self, model):
"""
If unpartitioned job events remain, it will cascade those from jobs in pk_list
if the unpartitioned table is no longer necessary, it will drop the table
"""
tblname = unified_job_class_to_event_table_name(model) tblname = unified_job_class_to_event_table_name(model)
rel_name = model().event_parent_key
with connection.cursor() as cursor: with connection.cursor() as cursor:
cursor.execute(f"SELECT 1 FROM pg_tables WHERE tablename = '_unpartitioned_{tblname}';") cursor.execute(f"SELECT 1 FROM pg_tables WHERE tablename = '_unpartitioned_{tblname}';")
row = cursor.fetchone() row = cursor.fetchone()
if row is None: if row is None:
self.logger.debug(f'Unpartitioned table for {rel_name} does not exist, you are fully migrated') return False
return return True
if pk_list:
with connection.cursor() as cursor: def _delete_unpartitioned_table(self, model):
pk_list_csv = ','.join(map(str, pk_list)) "If the unpartitioned table is no longer necessary, it will drop the table"
cursor.execute(f"DELETE FROM _unpartitioned_{tblname} WHERE {rel_name} IN ({pk_list_csv})") tblname = unified_job_class_to_event_table_name(model)
if not self.has_unpartitioned_table(model):
self.logger.debug(f'Table _unpartitioned_{tblname} does not exist, you are fully migrated.')
return
with connection.cursor() as cursor: with connection.cursor() as cursor:
# same as UnpartitionedJobEvent.objects.aggregate(Max('created')) # same as UnpartitionedJobEvent.objects.aggregate(Max('created'))
cursor.execute(f'SELECT MAX("_unpartitioned_{tblname}"."created") FROM "_unpartitioned_{tblname}"') cursor.execute(f'SELECT MAX("_unpartitioned_{tblname}"."created") FROM "_unpartitioned_{tblname}";')
row = cursor.fetchone() row = cursor.fetchone()
last_created = row[0] last_created = row[0]
if last_created:
self.logger.info(f'Last event created in _unpartitioned_{tblname} was {last_created.isoformat()}') if last_created:
else: self.logger.info(f'Last event created in _unpartitioned_{tblname} was {last_created.isoformat()}')
self.logger.info(f'Table _unpartitioned_{tblname} has no events in it') else:
if (last_created is None) or (last_created < self.cutoff): self.logger.info(f'Table _unpartitioned_{tblname} has no events in it')
self.logger.warning(f'Dropping table _unpartitioned_{tblname} since no records are newer than {self.cutoff}')
cursor.execute(f'DROP TABLE _unpartitioned_{tblname}') if (last_created is None) or (last_created < self.cutoff):
self.logger.warning(
f'Dropping table _unpartitioned_{tblname} since no records are newer than {self.cutoff}\n'
'WARNING - this will happen in a separate transaction so a failure will not roll back prior cleanup'
)
with connection.cursor() as cursor:
cursor.execute(f'DROP TABLE _unpartitioned_{tblname};')
def _delete_unpartitioned_events(self, model, pk_list):
    """Remove legacy unpartitioned event rows that belong to the given jobs.

    Args:
        model: unified job class; its event table name and its
            ``event_parent_key`` select the table and the column to match on.
        pk_list: primary keys of the jobs whose events should be removed.

    Returns:
        None. Nothing is executed when the ``_unpartitioned_<table>`` table no
        longer exists or when ``pk_list`` is empty.
    """
    tblname = unified_job_class_to_event_table_name(model)
    rel_name = model().event_parent_key

    # Guard 1: once the unpartitioned table is gone there is nothing to cascade.
    table_exists = self.has_unpartitioned_table(model)
    if not table_exists:
        return

    # Guard 2: the table is still around, but no jobs were handed to us.
    if not pk_list:
        return

    # The table name cannot be a bound parameter, so the statement is assembled
    # as text; the keys are rendered with str() and joined into the IN (...) list.
    pk_list_csv = ','.join(str(pk) for pk in pk_list)
    with connection.cursor() as cursor:
        self.logger.debug(f'Deleting {len(pk_list)} events from _unpartitioned_{tblname}, use a longer cleanup window to delete the table.')
        cursor.execute(f"DELETE FROM _unpartitioned_{tblname} WHERE {rel_name} IN ({pk_list_csv});")
def cleanup_jobs(self): def cleanup_jobs(self):
# Hack to avoid doing N+1 queries as each item in the Job query set does # Hack to avoid doing N+1 queries as each item in the Job query set does
@@ -249,7 +271,8 @@ class Command(BaseCommand):
_, results = qs_batch.delete() _, results = qs_batch.delete()
deleted += results['main.Job'] deleted += results['main.Job']
self._handle_unpartitioned_events(Job, pk_list) # Avoid dropping the job event table in case we have interacted with it already
self._delete_unpartitioned_events(Job, pk_list)
return skipped, deleted return skipped, deleted
@@ -272,7 +295,7 @@ class Command(BaseCommand):
deleted += 1 deleted += 1
if not self.dry_run: if not self.dry_run:
self._handle_unpartitioned_events(AdHocCommand, pk_list) self._delete_unpartitioned_events(AdHocCommand, pk_list)
skipped += AdHocCommand.objects.filter(created__gte=self.cutoff).count() skipped += AdHocCommand.objects.filter(created__gte=self.cutoff).count()
return skipped, deleted return skipped, deleted
@@ -300,7 +323,7 @@ class Command(BaseCommand):
deleted += 1 deleted += 1
if not self.dry_run: if not self.dry_run:
self._handle_unpartitioned_events(ProjectUpdate, pk_list) self._delete_unpartitioned_events(ProjectUpdate, pk_list)
skipped += ProjectUpdate.objects.filter(created__gte=self.cutoff).count() skipped += ProjectUpdate.objects.filter(created__gte=self.cutoff).count()
return skipped, deleted return skipped, deleted
@@ -328,7 +351,7 @@ class Command(BaseCommand):
deleted += 1 deleted += 1
if not self.dry_run: if not self.dry_run:
self._handle_unpartitioned_events(InventoryUpdate, pk_list) self._delete_unpartitioned_events(InventoryUpdate, pk_list)
skipped += InventoryUpdate.objects.filter(created__gte=self.cutoff).count() skipped += InventoryUpdate.objects.filter(created__gte=self.cutoff).count()
return skipped, deleted return skipped, deleted
@@ -352,7 +375,7 @@ class Command(BaseCommand):
deleted += 1 deleted += 1
if not self.dry_run: if not self.dry_run:
self._handle_unpartitioned_events(SystemJob, pk_list) self._delete_unpartitioned_events(SystemJob, pk_list)
skipped += SystemJob.objects.filter(created__gte=self.cutoff).count() skipped += SystemJob.objects.filter(created__gte=self.cutoff).count()
return skipped, deleted return skipped, deleted
@@ -397,7 +420,6 @@ class Command(BaseCommand):
skipped += Notification.objects.filter(created__gte=self.cutoff).count() skipped += Notification.objects.filter(created__gte=self.cutoff).count()
return skipped, deleted return skipped, deleted
@transaction.atomic
def handle(self, *args, **options): def handle(self, *args, **options):
self.verbosity = int(options.get('verbosity', 1)) self.verbosity = int(options.get('verbosity', 1))
self.init_logging() self.init_logging()
@@ -425,19 +447,29 @@ class Command(BaseCommand):
del s.receivers[:] del s.receivers[:]
s.sender_receivers_cache.clear() s.sender_receivers_cache.clear()
for m in model_names: with transaction.atomic():
if m not in models_to_cleanup: for m in models_to_cleanup:
continue skipped, deleted = getattr(self, 'cleanup_%s' % m)()
skipped, deleted = getattr(self, 'cleanup_%s' % m)() func = getattr(self, 'cleanup_%s_partition' % m, None)
if func:
skipped_partition, deleted_partition = func()
skipped += skipped_partition
deleted += deleted_partition
func = getattr(self, 'cleanup_%s_partition' % m, None) if self.dry_run:
if func: self.logger.log(99, '%s: %d would be deleted, %d would be skipped.', m.replace('_', ' '), deleted, skipped)
skipped_partition, deleted_partition = func() else:
skipped += skipped_partition self.logger.log(99, '%s: %d deleted, %d skipped.', m.replace('_', ' '), deleted, skipped)
deleted += deleted_partition
if self.dry_run: # Deleting unpartitioned tables cannot be done in same transaction as updates to related tables
self.logger.log(99, '%s: %d would be deleted, %d would be skipped.', m.replace('_', ' '), deleted, skipped) if not self.dry_run:
else: with transaction.atomic():
self.logger.log(99, '%s: %d deleted, %d skipped.', m.replace('_', ' '), deleted, skipped) for m in models_to_cleanup:
unified_job_class_name = m[:-1].title().replace('Management', 'System').replace('_', '')
unified_job_class = apps.get_model('main', unified_job_class_name)
try:
unified_job_class().event_class
except (NotImplementedError, AttributeError):
continue # no need to run this for models without events
self._delete_unpartitioned_table(unified_job_class)