From 07f49f59251fa1add716974e095aa00dfd7e6069 Mon Sep 17 00:00:00 2001
From: Alan Rominger
Date: Fri, 13 Oct 2023 15:50:51 -0400
Subject: [PATCH] AAP-16926 Delete unpartitioned tables in a separate
 transaction (#14572)

---
 awx/main/management/commands/cleanup_jobs.py | 110 ++++++++++++-------
 1 file changed, 71 insertions(+), 39 deletions(-)

diff --git a/awx/main/management/commands/cleanup_jobs.py b/awx/main/management/commands/cleanup_jobs.py
index 455e72ad90..ddd871330c 100644
--- a/awx/main/management/commands/cleanup_jobs.py
+++ b/awx/main/management/commands/cleanup_jobs.py
@@ -9,6 +9,7 @@
 import re
 
 # Django
+from django.apps import apps
 from django.core.management.base import BaseCommand, CommandError
 from django.db import transaction, connection
 from django.db.models import Min, Max
@@ -198,35 +199,56 @@ class Command(BaseCommand):
         delete_meta.delete_jobs()
         return (delete_meta.jobs_no_delete_count, delete_meta.jobs_to_delete_count)
 
-    def _handle_unpartitioned_events(self, model, pk_list):
-        """
-        If unpartitioned job events remain, it will cascade those from jobs in pk_list
-        if the unpartitioned table is no longer necessary, it will drop the table
-        """
+    def has_unpartitioned_table(self, model):
         tblname = unified_job_class_to_event_table_name(model)
-        rel_name = model().event_parent_key
         with connection.cursor() as cursor:
             cursor.execute(f"SELECT 1 FROM pg_tables WHERE tablename = '_unpartitioned_{tblname}';")
             row = cursor.fetchone()
             if row is None:
-                self.logger.debug(f'Unpartitioned table for {rel_name} does not exist, you are fully migrated')
-                return
-            if pk_list:
-                with connection.cursor() as cursor:
-                    pk_list_csv = ','.join(map(str, pk_list))
-                    cursor.execute(f"DELETE FROM _unpartitioned_{tblname} WHERE {rel_name} IN ({pk_list_csv})")
-
+                return False
+        return True
+
+    def _delete_unpartitioned_table(self, model):
+        "If the unpartitioned table is no longer necessary, it will drop the table"
+        tblname = unified_job_class_to_event_table_name(model)
+        if not self.has_unpartitioned_table(model):
+            self.logger.debug(f'Table _unpartitioned_{tblname} does not exist, you are fully migrated.')
+            return
+
+        with connection.cursor() as cursor:
             # same as UnpartitionedJobEvent.objects.aggregate(Max('created'))
-            cursor.execute(f'SELECT MAX("_unpartitioned_{tblname}"."created") FROM "_unpartitioned_{tblname}"')
+            cursor.execute(f'SELECT MAX("_unpartitioned_{tblname}"."created") FROM "_unpartitioned_{tblname}";')
             row = cursor.fetchone()
             last_created = row[0]
-            if last_created:
-                self.logger.info(f'Last event created in _unpartitioned_{tblname} was {last_created.isoformat()}')
-            else:
-                self.logger.info(f'Table _unpartitioned_{tblname} has no events in it')
-            if (last_created is None) or (last_created < self.cutoff):
-                self.logger.warning(f'Dropping table _unpartitioned_{tblname} since no records are newer than {self.cutoff}')
-                cursor.execute(f'DROP TABLE _unpartitioned_{tblname}')
+
+        if last_created:
+            self.logger.info(f'Last event created in _unpartitioned_{tblname} was {last_created.isoformat()}')
+        else:
+            self.logger.info(f'Table _unpartitioned_{tblname} has no events in it')
+
+        if (last_created is None) or (last_created < self.cutoff):
+            self.logger.warning(
+                f'Dropping table _unpartitioned_{tblname} since no records are newer than {self.cutoff}\n'
+                'WARNING - this will happen in a separate transaction so a failure will not roll back prior cleanup'
+            )
+            with connection.cursor() as cursor:
+                cursor.execute(f'DROP TABLE _unpartitioned_{tblname};')
+
+    def _delete_unpartitioned_events(self, model, pk_list):
+        "If unpartitioned job events remain, it will cascade those from jobs in pk_list"
+        tblname = unified_job_class_to_event_table_name(model)
+        rel_name = model().event_parent_key
+
+        # Bail if the unpartitioned table does not exist anymore
+        if not self.has_unpartitioned_table(model):
+            return
+
+        # Table still exists, delete individual unpartitioned events
+        if pk_list:
+            with connection.cursor() as cursor:
+                self.logger.debug(f'Deleting {len(pk_list)} events from _unpartitioned_{tblname}, use a longer cleanup window to delete the table.')
+                pk_list_csv = ','.join(map(str, pk_list))
+                cursor.execute(f"DELETE FROM _unpartitioned_{tblname} WHERE {rel_name} IN ({pk_list_csv});")
 
     def cleanup_jobs(self):
         # Hack to avoid doing N+1 queries as each item in the Job query set does
@@ -249,7 +271,8 @@ class Command(BaseCommand):
             _, results = qs_batch.delete()
             deleted += results['main.Job']
 
-        self._handle_unpartitioned_events(Job, pk_list)
+        # Avoid dropping the job event table in case we have interacted with it already
+        self._delete_unpartitioned_events(Job, pk_list)
 
         return skipped, deleted
 
@@ -272,7 +295,7 @@ class Command(BaseCommand):
                 deleted += 1
 
         if not self.dry_run:
-            self._handle_unpartitioned_events(AdHocCommand, pk_list)
+            self._delete_unpartitioned_events(AdHocCommand, pk_list)
 
         skipped += AdHocCommand.objects.filter(created__gte=self.cutoff).count()
         return skipped, deleted
@@ -300,7 +323,7 @@ class Command(BaseCommand):
                 deleted += 1
 
         if not self.dry_run:
-            self._handle_unpartitioned_events(ProjectUpdate, pk_list)
+            self._delete_unpartitioned_events(ProjectUpdate, pk_list)
 
         skipped += ProjectUpdate.objects.filter(created__gte=self.cutoff).count()
         return skipped, deleted
@@ -328,7 +351,7 @@ class Command(BaseCommand):
                 deleted += 1
 
         if not self.dry_run:
-            self._handle_unpartitioned_events(InventoryUpdate, pk_list)
+            self._delete_unpartitioned_events(InventoryUpdate, pk_list)
 
         skipped += InventoryUpdate.objects.filter(created__gte=self.cutoff).count()
         return skipped, deleted
@@ -352,7 +375,7 @@ class Command(BaseCommand):
                 deleted += 1
 
         if not self.dry_run:
-            self._handle_unpartitioned_events(SystemJob, pk_list)
+            self._delete_unpartitioned_events(SystemJob, pk_list)
 
         skipped += SystemJob.objects.filter(created__gte=self.cutoff).count()
         return skipped, deleted
@@ -397,7 +420,6 @@ class Command(BaseCommand):
         skipped += Notification.objects.filter(created__gte=self.cutoff).count()
         return skipped, deleted
 
-    @transaction.atomic
     def handle(self, *args, **options):
         self.verbosity = int(options.get('verbosity', 1))
         self.init_logging()
@@ -425,19 +447,29 @@ class Command(BaseCommand):
             del s.receivers[:]
             s.sender_receivers_cache.clear()
 
-        for m in model_names:
-            if m not in models_to_cleanup:
-                continue
+        with transaction.atomic():
+            for m in models_to_cleanup:
+                skipped, deleted = getattr(self, 'cleanup_%s' % m)()
 
-            skipped, deleted = getattr(self, 'cleanup_%s' % m)()
+                func = getattr(self, 'cleanup_%s_partition' % m, None)
+                if func:
+                    skipped_partition, deleted_partition = func()
+                    skipped += skipped_partition
+                    deleted += deleted_partition
 
-            func = getattr(self, 'cleanup_%s_partition' % m, None)
-            if func:
-                skipped_partition, deleted_partition = func()
-                skipped += skipped_partition
-                deleted += deleted_partition
+                if self.dry_run:
+                    self.logger.log(99, '%s: %d would be deleted, %d would be skipped.', m.replace('_', ' '), deleted, skipped)
+                else:
+                    self.logger.log(99, '%s: %d deleted, %d skipped.', m.replace('_', ' '), deleted, skipped)
 
-            if self.dry_run:
-                self.logger.log(99, '%s: %d would be deleted, %d would be skipped.', m.replace('_', ' '), deleted, skipped)
-            else:
-                self.logger.log(99, '%s: %d deleted, %d skipped.', m.replace('_', ' '), deleted, skipped)
+        # Deleting unpartitioned tables cannot be done in same transaction as updates to related tables
+        if not self.dry_run:
+            with transaction.atomic():
+                for m in models_to_cleanup:
+                    unified_job_class_name = m[:-1].title().replace('Management', 'System').replace('_', '')
+                    unified_job_class = apps.get_model('main', unified_job_class_name)
+                    try:
+                        unified_job_class().event_class
+                    except (NotImplementedError, AttributeError):
                        continue  # no need to run this for models without events
+                    self._delete_unpartitioned_table(unified_job_class)
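
For context on the change above: handle() now runs all row-level cleanup inside one transaction.atomic() block and then drops any leftover _unpartitioned_* tables in a second, separate transaction, so a failed DROP TABLE cannot roll back cleanup work that already committed. The following is a minimal standalone sketch of that two-transaction pattern, assuming a configured Django project and a hypothetical _unpartitioned_example_event table; it is not code from the patch itself.

    # Minimal sketch (not from the patch) of the two-transaction pattern the commit
    # introduces; the table name _unpartitioned_example_event and the cutoff handling
    # are hypothetical placeholders, and a configured Django project is assumed.
    from django.db import connection, transaction

    def cleanup_then_drop(cutoff):
        # First transaction: delete old rows; this commits when the block exits.
        with transaction.atomic():
            with connection.cursor() as cursor:
                cursor.execute("DELETE FROM _unpartitioned_example_event WHERE created < %s;", [cutoff])

        # Second transaction: drop the legacy table. If this fails, the deletes
        # above are already committed and are not rolled back.
        with transaction.atomic():
            with connection.cursor() as cursor:
                cursor.execute("DROP TABLE IF EXISTS _unpartitioned_example_event;")

Because each atomic() block commits independently when it exits, the DELETE statements are durable before the DDL statement is ever attempted, which is what the new in-code WARNING message describes.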
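The final loop also maps each cleanup name in models_to_cleanup to its unified job class before calling _delete_unpartitioned_table(). A small illustrative sketch of that name mapping follows; the sample names are assumptions about typical entries in models_to_cleanup, not values taken from this diff.

    # Illustrative sketch (not from the patch) of the name mapping used in the
    # final loop of handle(); sample inputs below are assumed, not quoted.
    def to_unified_job_class_name(m):
        # 'jobs' -> 'Job', 'ad_hoc_commands' -> 'AdHocCommand', 'management_jobs' -> 'SystemJob'
        return m[:-1].title().replace('Management', 'System').replace('_', '')

    assert to_unified_job_class_name('jobs') == 'Job'
    assert to_unified_job_class_name('ad_hoc_commands') == 'AdHocCommand'
    assert to_unified_job_class_name('management_jobs') == 'SystemJob'

The resulting class name is resolved with apps.get_model('main', ...), and models whose instances raise NotImplementedError or AttributeError on event_class are skipped, since they have no event tables to drop.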