DROP unnecessary unpartioned event tables (#14055)

This commit is contained in:
Alan Rominger 2023-10-03 11:49:23 -04:00 committed by GitHub
parent 05582702c6
commit f5922f76fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -195,14 +195,35 @@ class Command(BaseCommand):
delete_meta.delete_jobs()
return (delete_meta.jobs_no_delete_count, delete_meta.jobs_to_delete_count)
def _cascade_delete_job_events(self, model, pk_list):
def _handle_unpartitioned_events(self, model, pk_list):
"""
If unpartitioned job events remain, it will cascade those from jobs in pk_list
if the unpartitioned table is no longer necessary, it will drop the table
"""
tblname = unified_job_class_to_event_table_name(model)
rel_name = model().event_parent_key
with connection.cursor() as cursor:
cursor.execute(f"SELECT 1 FROM pg_tables WHERE tablename = '_unpartitioned_{tblname}';")
row = cursor.fetchone()
if row is None:
self.logger.debug(f'Unpartitioned table for {rel_name} does not exist, you are fully migrated')
return
if pk_list:
with connection.cursor() as cursor:
tblname = unified_job_class_to_event_table_name(model)
pk_list_csv = ','.join(map(str, pk_list))
rel_name = model().event_parent_key
cursor.execute(f"DELETE FROM _unpartitioned_{tblname} WHERE {rel_name} IN ({pk_list_csv})")
with connection.cursor() as cursor:
# same as UnpartitionedJobEvent.objects.aggregate(Max('created'))
cursor.execute(f'SELECT MAX("_unpartitioned_{tblname}"."created") FROM "_unpartitioned_{tblname}"')
row = cursor.fetchone()
last_created = row[0]
if last_created:
self.logger.info(f'Last event created in _unpartitioned_{tblname} was {last_created.isoformat()}')
else:
self.logger.info(f'Table _unpartitioned_{tblname} has no events in it')
if (last_created is None) or (last_created < self.cutoff):
self.logger.warning(f'Dropping table _unpartitioned_{tblname} since no records are newer than {self.cutoff}')
cursor.execute(f'DROP TABLE _unpartitioned_{tblname}')
def cleanup_jobs(self):
batch_size = 100000
@ -227,7 +248,7 @@ class Command(BaseCommand):
_, results = qs_batch.delete()
deleted += results['main.Job']
self._cascade_delete_job_events(Job, pk_list)
self._handle_unpartitioned_events(Job, pk_list)
return skipped, deleted
@ -250,7 +271,7 @@ class Command(BaseCommand):
deleted += 1
if not self.dry_run:
self._cascade_delete_job_events(AdHocCommand, pk_list)
self._handle_unpartitioned_events(AdHocCommand, pk_list)
skipped += AdHocCommand.objects.filter(created__gte=self.cutoff).count()
return skipped, deleted
@ -278,7 +299,7 @@ class Command(BaseCommand):
deleted += 1
if not self.dry_run:
self._cascade_delete_job_events(ProjectUpdate, pk_list)
self._handle_unpartitioned_events(ProjectUpdate, pk_list)
skipped += ProjectUpdate.objects.filter(created__gte=self.cutoff).count()
return skipped, deleted
@ -306,7 +327,7 @@ class Command(BaseCommand):
deleted += 1
if not self.dry_run:
self._cascade_delete_job_events(InventoryUpdate, pk_list)
self._handle_unpartitioned_events(InventoryUpdate, pk_list)
skipped += InventoryUpdate.objects.filter(created__gte=self.cutoff).count()
return skipped, deleted
@ -330,7 +351,7 @@ class Command(BaseCommand):
deleted += 1
if not self.dry_run:
self._cascade_delete_job_events(SystemJob, pk_list)
self._handle_unpartitioned_events(SystemJob, pk_list)
skipped += SystemJob.objects.filter(created__gte=self.cutoff).count()
return skipped, deleted