diff --git a/awx/main/management/commands/cleanup_jobs.py b/awx/main/management/commands/cleanup_jobs.py index bd87e1e198..321d552503 100644 --- a/awx/main/management/commands/cleanup_jobs.py +++ b/awx/main/management/commands/cleanup_jobs.py @@ -195,14 +195,35 @@ class Command(BaseCommand): delete_meta.delete_jobs() return (delete_meta.jobs_no_delete_count, delete_meta.jobs_to_delete_count) - def _cascade_delete_job_events(self, model, pk_list): + def _handle_unpartitioned_events(self, model, pk_list): + """ + If unpartitioned job events remain, it will cascade those from jobs in pk_list + if the unpartitioned table is no longer necessary, it will drop the table + """ + tblname = unified_job_class_to_event_table_name(model) + rel_name = model().event_parent_key + with connection.cursor() as cursor: + cursor.execute(f"SELECT 1 FROM pg_tables WHERE tablename = '_unpartitioned_{tblname}';") + row = cursor.fetchone() + if row is None: + self.logger.debug(f'Unpartitioned table for {rel_name} does not exist, you are fully migrated') + return if pk_list: with connection.cursor() as cursor: - tblname = unified_job_class_to_event_table_name(model) - pk_list_csv = ','.join(map(str, pk_list)) - rel_name = model().event_parent_key cursor.execute(f"DELETE FROM _unpartitioned_{tblname} WHERE {rel_name} IN ({pk_list_csv})") + with connection.cursor() as cursor: + # same as UnpartitionedJobEvent.objects.aggregate(Max('created')) + cursor.execute(f'SELECT MAX("_unpartitioned_{tblname}"."created") FROM "_unpartitioned_{tblname}"') + row = cursor.fetchone() + last_created = row[0] + if last_created: + self.logger.info(f'Last event created in _unpartitioned_{tblname} was {last_created.isoformat()}') + else: + self.logger.info(f'Table _unpartitioned_{tblname} has no events in it') + if (last_created is None) or (last_created < self.cutoff): + self.logger.warning(f'Dropping table _unpartitioned_{tblname} since no records are newer than {self.cutoff}') + cursor.execute(f'DROP TABLE _unpartitioned_{tblname}') def cleanup_jobs(self): batch_size = 100000 @@ -227,7 +248,7 @@ class Command(BaseCommand): _, results = qs_batch.delete() deleted += results['main.Job'] - self._cascade_delete_job_events(Job, pk_list) + self._handle_unpartitioned_events(Job, pk_list) return skipped, deleted @@ -250,7 +271,7 @@ class Command(BaseCommand): deleted += 1 if not self.dry_run: - self._cascade_delete_job_events(AdHocCommand, pk_list) + self._handle_unpartitioned_events(AdHocCommand, pk_list) skipped += AdHocCommand.objects.filter(created__gte=self.cutoff).count() return skipped, deleted @@ -278,7 +299,7 @@ class Command(BaseCommand): deleted += 1 if not self.dry_run: - self._cascade_delete_job_events(ProjectUpdate, pk_list) + self._handle_unpartitioned_events(ProjectUpdate, pk_list) skipped += ProjectUpdate.objects.filter(created__gte=self.cutoff).count() return skipped, deleted @@ -306,7 +327,7 @@ class Command(BaseCommand): deleted += 1 if not self.dry_run: - self._cascade_delete_job_events(InventoryUpdate, pk_list) + self._handle_unpartitioned_events(InventoryUpdate, pk_list) skipped += InventoryUpdate.objects.filter(created__gte=self.cutoff).count() return skipped, deleted @@ -330,7 +351,7 @@ class Command(BaseCommand): deleted += 1 if not self.dry_run: - self._cascade_delete_job_events(SystemJob, pk_list) + self._handle_unpartitioned_events(SystemJob, pk_list) skipped += SystemJob.objects.filter(created__gte=self.cutoff).count() return skipped, deleted