From 200901e53b9efd6df7eb4b9483a9204f5d5c0ae1 Mon Sep 17 00:00:00 2001 From: Ryan Petrello Date: Thu, 18 Mar 2021 23:40:14 -0400 Subject: [PATCH] upgrade to partitions without a costly bulk data migration keep pre-upgrade events in an old table (instead of a partition) - instead of creating a default partition, keep all events in special "unpartitioned" tables - track these tables via distinct proxy=true models - when generating the queryset for a UnifiedJob's events, look at the creation date of the job; if it's before the date of the migration, query on the old unpartitioned table, otherwise use the more modern table that provides auto-partitioning --- awx/api/views/__init__.py | 86 ++++++++++++-------- awx/api/views/inventory.py | 6 +- awx/main/access.py | 7 ++ awx/main/managers.py | 7 +- awx/main/migrations/0132_event_partitions.py | 15 ---- awx/main/models/__init__.py | 16 ++-- awx/main/models/ad_hoc_commands.py | 4 +- awx/main/models/events.py | 41 ++++++++++ awx/main/models/inventory.py | 4 +- awx/main/models/jobs.py | 6 +- awx/main/models/projects.py | 4 +- awx/main/models/unified_jobs.py | 39 ++++----- awx/main/tasks.py | 47 +---------- 13 files changed, 146 insertions(+), 136 deletions(-) diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index 12690379c1..9ee3e8dc31 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -21,7 +21,7 @@ from urllib3.exceptions import ConnectTimeoutError from django.conf import settings from django.core.exceptions import FieldError, ObjectDoesNotExist from django.db.models import Q, Sum -from django.db import IntegrityError, transaction, connection +from django.db import IntegrityError, ProgrammingError, transaction, connection from django.shortcuts import get_object_or_404 from django.utils.safestring import mark_safe from django.utils.timezone import now @@ -177,6 +177,15 @@ from awx.api.views.webhooks import WebhookKeyView, GithubWebhookReceiver, Gitlab logger = logging.getLogger('awx.api.views') +def unpartitioned_event_horizon(cls): + with connection.cursor() as cursor: + try: + cursor.execute(f'SELECT MAX(id) FROM _unpartitioned_{cls._meta.db_table}') + return cursor.fetchone()[0] + except ProgrammingError: + return 0 + + def api_exception_handler(exc, context): """ Override default API exception handler to catch IntegrityError exceptions. @@ -884,9 +893,9 @@ class ProjectUpdateEventsList(SubListAPIView): return super(ProjectUpdateEventsList, self).finalize_response(request, response, *args, **kwargs) def get_queryset(self): - return super(ProjectUpdateEventsList, self).get_queryset().filter( - job_created=self.get_parent_object().created_or_epoch - ) + pu = self.get_parent_object() + self.check_parent_access(pu) + return pu.get_event_queryset() class SystemJobEventsList(SubListAPIView): @@ -903,9 +912,9 @@ class SystemJobEventsList(SubListAPIView): return super(SystemJobEventsList, self).finalize_response(request, response, *args, **kwargs) def get_queryset(self): - return super(SystemJobEventsList, self).get_queryset().filter( - job_created=self.get_parent_object().created_or_epoch - ) + job = self.get_parent_object() + self.check_parent_access(job) + return job.get_event_queryset() class ProjectUpdateCancel(RetrieveAPIView): @@ -3741,9 +3750,18 @@ class JobHostSummaryDetail(RetrieveAPIView): class JobEventDetail(RetrieveAPIView): - model = models.JobEvent serializer_class = serializers.JobEventSerializer + @property + def is_partitioned(self): + return int(self.kwargs['pk']) > unpartitioned_event_horizon(models.JobEvent) + + @property + def model(self): + if self.is_partitioned: + return models.JobEvent + return models.UnpartitionedJobEvent + def get_serializer_context(self): context = super().get_serializer_context() context.update(no_truncate=True) @@ -3752,37 +3770,31 @@ class JobEventDetail(RetrieveAPIView): class JobEventChildrenList(NoTruncateMixin, SubListAPIView): - model = models.JobEvent serializer_class = serializers.JobEventSerializer - parent_model = models.JobEvent relationship = 'children' name = _('Job Event Children List') search_fields = ('stdout',) + @property + def is_partitioned(self): + return int(self.kwargs['pk']) > unpartitioned_event_horizon(models.JobEvent) + + @property + def model(self): + if self.is_partitioned: + return models.JobEvent + return models.UnpartitionedJobEvent + + @property + def parent_model(self): + return self.model + def get_queryset(self): parent_event = self.get_parent_object() self.check_parent_access(parent_event) - qs = self.request.user.get_queryset(self.model).filter( + return parent_event.job.get_event_queryset().filter( parent_uuid=parent_event.uuid - ).filter( - job_created=parent_event.job.created_or_epoch ) - return qs - - -class JobEventHostsList(HostRelatedSearchMixin, SubListAPIView): - - model = models.Host - serializer_class = serializers.HostSerializer - parent_model = models.JobEvent - relationship = 'hosts' - name = _('Job Event Hosts List') - - def get_queryset(self): - parent_event = self.get_parent_object() - self.check_parent_access(parent_event) - qs = self.request.user.get_queryset(self.model).filter(job_events_as_primary_host=parent_event) - return qs class BaseJobEventsList(NoTruncateMixin, SubListAPIView): @@ -3822,10 +3834,7 @@ class JobJobEventsList(BaseJobEventsList): def get_queryset(self): job = self.get_parent_object() self.check_parent_access(job) - qs = job.job_events.filter( - job_created=self.get_parent_object().created_or_epoch - ).select_related('host').order_by('start_line') - return qs.all() + return job.get_event_queryset().select_related('host').order_by('start_line') class AdHocCommandList(ListCreateAPIView): @@ -3983,6 +3992,11 @@ class AdHocCommandEventList(NoTruncateMixin, ListAPIView): serializer_class = serializers.AdHocCommandEventSerializer search_fields = ('stdout',) + def get_queryset(self): + adhoc = self.get_parent_object() + self.check_parent_access(adhoc) + return adhoc.get_event_queryset() + class AdHocCommandEventDetail(RetrieveAPIView): @@ -4005,9 +4019,9 @@ class BaseAdHocCommandEventsList(NoTruncateMixin, SubListAPIView): search_fields = ('stdout',) def get_queryset(self): - return super(BaseAdHocCommandEventsList, self).get_queryset().filter( - job_created=self.get_parent_object().created_or_epoch - ) + adhoc = self.get_parent_object() + self.check_parent_access(adhoc) + return adhoc.get_event_queryset() class HostAdHocCommandEventsList(BaseAdHocCommandEventsList): diff --git a/awx/api/views/inventory.py b/awx/api/views/inventory.py index d255a8110b..f179424ccc 100644 --- a/awx/api/views/inventory.py +++ b/awx/api/views/inventory.py @@ -51,9 +51,9 @@ class InventoryUpdateEventsList(SubListAPIView): search_fields = ('stdout',) def get_queryset(self): - return super(InventoryUpdateEventsList, self).get_queryset().filter( - job_created=self.get_parent_object().created_or_epoch - ) + iu = self.get_parent_object() + self.check_parent_access(iu) + return iu.get_event_queryset() def finalize_response(self, request, response, *args, **kwargs): response['X-UI-Max-Events'] = settings.MAX_UI_JOB_EVENTS diff --git a/awx/main/access.py b/awx/main/access.py index 5fd06b105f..f9a6983b5b 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -45,6 +45,7 @@ from awx.main.models import ( InventoryUpdateEvent, Job, JobEvent, + UnpartitionedJobEvent, JobHostSummary, JobLaunchConfig, JobTemplate, @@ -2352,6 +2353,11 @@ class JobEventAccess(BaseAccess): return False +class UnpartitionedJobEventAccess(JobEventAccess): + + model = UnpartitionedJobEvent + + class ProjectUpdateEventAccess(BaseAccess): """ I can see project update event records whenever I can access the project update @@ -2895,3 +2901,4 @@ class WorkflowApprovalTemplateAccess(BaseAccess): for cls in BaseAccess.__subclasses__(): access_registry[cls.model] = cls +access_registry[UnpartitionedJobEvent] = UnpartitionedJobEventAccess diff --git a/awx/main/managers.py b/awx/main/managers.py index ada38ddd18..3355b4e8e4 100644 --- a/awx/main/managers.py +++ b/awx/main/managers.py @@ -11,11 +11,16 @@ from django.conf import settings from awx.main.utils.filters import SmartFilter from awx.main.utils.pglock import advisory_lock -___all__ = ['HostManager', 'InstanceManager', 'InstanceGroupManager'] +___all__ = ['HostManager', 'InstanceManager', 'InstanceGroupManager', 'DeferJobCreatedManager'] logger = logging.getLogger('awx.main.managers') +class DeferJobCreatedManager(models.Manager): + def get_queryset(self): + return super(DeferJobCreatedManager, self).get_queryset().defer('job_created') + + class HostManager(models.Manager): """Custom manager class for Hosts model.""" diff --git a/awx/main/migrations/0132_event_partitions.py b/awx/main/migrations/0132_event_partitions.py index 9867748c4a..7b8a5413e7 100644 --- a/awx/main/migrations/0132_event_partitions.py +++ b/awx/main/migrations/0132_event_partitions.py @@ -1,9 +1,4 @@ -from datetime import datetime - from django.db import migrations, models, connection -from django.utils.timezone import now - -from awx.main.utils.common import create_partition def migrate_event_data(apps, schema_editor): @@ -73,16 +68,6 @@ def migrate_event_data(apps, schema_editor): f'ADD CONSTRAINT {tblname}_pkey_new PRIMARY KEY (id, job_created);' ) - current_time = now() - - # create initial partition containing all existing events - epoch = datetime.utcfromtimestamp(0) - create_partition(tblname, epoch, current_time, 'old_events') - - # .. and first partition - # .. which is a special case, as it only covers remainder of current hour - create_partition(tblname, current_time) - class FakeAddField(migrations.AddField): diff --git a/awx/main/models/__init__.py b/awx/main/models/__init__.py index f67e192f0a..0fab2cd4f6 100644 --- a/awx/main/models/__init__.py +++ b/awx/main/models/__init__.py @@ -3,7 +3,6 @@ # Django from django.conf import settings # noqa -from django.db import connection from django.db.models.signals import pre_delete # noqa # AWX @@ -36,6 +35,11 @@ from awx.main.models.events import ( # noqa JobEvent, ProjectUpdateEvent, SystemJobEvent, + UnpartitionedAdHocCommandEvent, + UnpartitionedInventoryUpdateEvent, + UnpartitionedJobEvent, + UnpartitionedProjectUpdateEvent, + UnpartitionedSystemJobEvent, ) from awx.main.models.ad_hoc_commands import AdHocCommand # noqa from awx.main.models.schedules import Schedule # noqa @@ -92,16 +96,6 @@ User.add_to_class('can_access_with_errors', check_user_access_with_errors) User.add_to_class('accessible_objects', user_accessible_objects) -def migrate_events_to_partitions(): - for tblname in ('main_jobevent', 'main_inventoryupdateevent', 'main_projectupdateevent', 'main_adhoccommandevent', 'main_systemjobevent'): - with connection.cursor() as cursor: - cursor.execute('SELECT 1 FROM information_schema.tables WHERE table_name=%s', (f'_unpartitioned_{tblname}',)) - if bool(cursor.rowcount): - from awx.main.tasks import migrate_legacy_event_data - - migrate_legacy_event_data.apply_async([tblname]) - - def cleanup_created_modified_by(sender, **kwargs): # work around a bug in django-polymorphic that doesn't properly # handle cascades for reverse foreign keys on the polymorphic base model diff --git a/awx/main/models/ad_hoc_commands.py b/awx/main/models/ad_hoc_commands.py index 94318a17da..f15af65f61 100644 --- a/awx/main/models/ad_hoc_commands.py +++ b/awx/main/models/ad_hoc_commands.py @@ -15,7 +15,7 @@ from django.core.exceptions import ValidationError # AWX from awx.api.versioning import reverse from awx.main.models.base import prevent_search, AD_HOC_JOB_TYPE_CHOICES, VERBOSITY_CHOICES, VarsDictProperty -from awx.main.models.events import AdHocCommandEvent +from awx.main.models.events import AdHocCommandEvent, UnpartitionedAdHocCommandEvent from awx.main.models.unified_jobs import UnifiedJob from awx.main.models.notifications import JobNotificationMixin, NotificationTemplate @@ -127,6 +127,8 @@ class AdHocCommand(UnifiedJob, JobNotificationMixin): @property def event_class(self): + if self.has_unpartitioned_events: + return UnpartitionedAdHocCommandEvent return AdHocCommandEvent @property diff --git a/awx/main/models/events.py b/awx/main/models/events.py index a453aa7cb0..ef83a1b6c7 100644 --- a/awx/main/models/events.py +++ b/awx/main/models/events.py @@ -15,6 +15,7 @@ from django.utils.encoding import force_text from awx.api.versioning import reverse from awx.main import consumers +from awx.main.managers import DeferJobCreatedManager from awx.main.fields import JSONField from awx.main.models.base import CreatedModifiedModel from awx.main.utils import ignore_inventory_computed_fields, camelcase_to_underscore @@ -461,6 +462,8 @@ class JobEvent(BasePlaybookEvent): VALID_KEYS = BasePlaybookEvent.VALID_KEYS + ['job_id', 'workflow_job_id', 'job_created'] + objects = DeferJobCreatedManager() + class Meta: app_label = 'main' ordering = ('pk',) @@ -581,10 +584,18 @@ class JobEvent(BasePlaybookEvent): return self.job.verbosity +class UnpartitionedJobEvent(JobEvent): + class Meta: + proxy = True +UnpartitionedJobEvent._meta.db_table = '_unpartitioned_' + JobEvent._meta.db_table # noqa + + class ProjectUpdateEvent(BasePlaybookEvent): VALID_KEYS = BasePlaybookEvent.VALID_KEYS + ['project_update_id', 'workflow_job_id', 'job_created'] + objects = DeferJobCreatedManager() + class Meta: app_label = 'main' ordering = ('pk',) @@ -612,6 +623,12 @@ class ProjectUpdateEvent(BasePlaybookEvent): return 'localhost' +class UnpartitionedProjectUpdateEvent(ProjectUpdateEvent): + class Meta: + proxy = True +UnpartitionedProjectUpdateEvent._meta.db_table = '_unpartitioned_' + ProjectUpdateEvent._meta.db_table # noqa + + class BaseCommandEvent(CreatedModifiedModel): """ An event/message logged from a command for each host. @@ -707,6 +724,8 @@ class AdHocCommandEvent(BaseCommandEvent): VALID_KEYS = BaseCommandEvent.VALID_KEYS + ['ad_hoc_command_id', 'event', 'host_name', 'host_id', 'workflow_job_id', 'job_created'] + objects = DeferJobCreatedManager() + class Meta: app_label = 'main' ordering = ('-pk',) @@ -796,10 +815,18 @@ class AdHocCommandEvent(BaseCommandEvent): analytics_logger.info('Event data saved.', extra=dict(python_objects=dict(job_event=self))) +class UnpartitionedAdHocCommandEvent(AdHocCommandEvent): + class Meta: + proxy = True +UnpartitionedAdHocCommandEvent._meta.db_table = '_unpartitioned_' + AdHocCommandEvent._meta.db_table # noqa + + class InventoryUpdateEvent(BaseCommandEvent): VALID_KEYS = BaseCommandEvent.VALID_KEYS + ['inventory_update_id', 'workflow_job_id', 'job_created'] + objects = DeferJobCreatedManager() + class Meta: app_label = 'main' ordering = ('-pk',) @@ -834,10 +861,18 @@ class InventoryUpdateEvent(BaseCommandEvent): return False +class UnpartitionedInventoryUpdateEvent(InventoryUpdateEvent): + class Meta: + proxy = True +UnpartitionedInventoryUpdateEvent._meta.db_table = '_unpartitioned_' + InventoryUpdateEvent._meta.db_table # noqa + + class SystemJobEvent(BaseCommandEvent): VALID_KEYS = BaseCommandEvent.VALID_KEYS + ['system_job_id', 'job_created'] + objects = DeferJobCreatedManager() + class Meta: app_label = 'main' ordering = ('-pk',) @@ -870,3 +905,9 @@ class SystemJobEvent(BaseCommandEvent): @property def changed(self): return False + + +class UnpartitionedSystemJobEvent(SystemJobEvent): + class Meta: + proxy = True +UnpartitionedSystemJobEvent._meta.db_table = '_unpartitioned_' + SystemJobEvent._meta.db_table # noqa diff --git a/awx/main/models/inventory.py b/awx/main/models/inventory.py index a48c2f0a62..2325e1d34c 100644 --- a/awx/main/models/inventory.py +++ b/awx/main/models/inventory.py @@ -35,7 +35,7 @@ from awx.main.fields import ( ) from awx.main.managers import HostManager from awx.main.models.base import BaseModel, CommonModelNameNotUnique, VarsDictProperty, CLOUD_INVENTORY_SOURCES, prevent_search, accepts_json -from awx.main.models.events import InventoryUpdateEvent +from awx.main.models.events import InventoryUpdateEvent, UnpartitionedInventoryUpdateEvent from awx.main.models.unified_jobs import UnifiedJob, UnifiedJobTemplate from awx.main.models.mixins import ( ResourceMixin, @@ -1265,6 +1265,8 @@ class InventoryUpdate(UnifiedJob, InventorySourceOptions, JobNotificationMixin, @property def event_class(self): + if self.has_unpartitioned_events: + return UnpartitionedInventoryUpdateEvent return InventoryUpdateEvent @property diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index 735623142c..38d7ebd805 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -37,7 +37,7 @@ from awx.main.models.base import ( VERBOSITY_CHOICES, VarsDictProperty, ) -from awx.main.models.events import JobEvent, SystemJobEvent +from awx.main.models.events import JobEvent, UnpartitionedJobEvent, UnpartitionedSystemJobEvent, SystemJobEvent from awx.main.models.unified_jobs import UnifiedJobTemplate, UnifiedJob from awx.main.models.notifications import ( NotificationTemplate, @@ -614,6 +614,8 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskMana @property def event_class(self): + if self.has_unpartitioned_events: + return UnpartitionedJobEvent return JobEvent def copy_unified_job(self, **new_prompts): @@ -1259,6 +1261,8 @@ class SystemJob(UnifiedJob, SystemJobOptions, JobNotificationMixin): @property def event_class(self): + if self.has_unpartitioned_events: + return UnpartitionedSystemJobEvent return SystemJobEvent @property diff --git a/awx/main/models/projects.py b/awx/main/models/projects.py index 17ed982b1d..7192265412 100644 --- a/awx/main/models/projects.py +++ b/awx/main/models/projects.py @@ -19,7 +19,7 @@ from django.utils.timezone import now, make_aware, get_default_timezone # AWX from awx.api.versioning import reverse from awx.main.models.base import PROJECT_UPDATE_JOB_TYPE_CHOICES, PERM_INVENTORY_DEPLOY -from awx.main.models.events import ProjectUpdateEvent +from awx.main.models.events import ProjectUpdateEvent, UnpartitionedProjectUpdateEvent from awx.main.models.notifications import ( NotificationTemplate, JobNotificationMixin, @@ -555,6 +555,8 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin, TaskManage @property def event_class(self): + if self.has_unpartitioned_events: + return UnpartitionedProjectUpdateEvent return ProjectUpdateEvent @property diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 2df9e44fc3..ec303710ea 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -736,18 +736,6 @@ class UnifiedJob( def _get_task_class(cls): raise NotImplementedError # Implement in subclasses. - @property - def created_or_epoch(self): - # returns self.created *unless* the job was created *prior* - # to the datetime the event partition migration is applied - # (in that case, it returns the epoch, which is the date - # which is automatically applied to all events rows that predate - # that migration) - applied = get_event_partition_epoch() - if applied and self.created < applied: - return datetime.datetime.utcfromtimestamp(0) - return self.created - @property def can_run_containerized(self): return False @@ -1003,11 +991,18 @@ class UnifiedJob( 'main_systemjob': 'system_job_id', }[tablename] + @property + def has_unpartitioned_events(self): + applied = get_event_partition_epoch() + return applied and self.created < applied + def get_event_queryset(self): - return self.event_class.objects.filter(**{ + kwargs = { self.event_parent_key: self.id, - 'job_created': self.created - }) + } + if not self.has_unpartitioned_events: + kwargs['job_created'] = self.created + return self.event_class.objects.filter(**kwargs) @property def event_processing_finished(self): @@ -1093,13 +1088,15 @@ class UnifiedJob( # .write() calls on the fly to maintain this interface _write = fd.write fd.write = lambda s: _write(smart_text(s)) + tbl = self._meta.db_table + 'event' + created_by_cond = '' + if self.has_unpartitioned_events: + tbl = f'_unpartitioned_{tbl}' + else: + created_by_cond = f"job_created='{self.created.isoformat()}' AND " - cursor.copy_expert( - "copy (select stdout from {} where {}={} and stdout != '' order by start_line) to stdout".format( - self._meta.db_table + 'event', self.event_parent_key, self.id - ), - fd, - ) + sql = f"copy (select stdout from {tbl} where {created_by_cond}{self.event_parent_key}={self.id} and stdout != '' order by start_line) to stdout" # nosql + cursor.copy_expert(sql, fd) if hasattr(fd, 'name'): # If we're dealing with a physical file, use `sed` to clean diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 84411b7c96..98635f19bb 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -32,7 +32,7 @@ import sys # Django from django.conf import settings -from django.db import transaction, DatabaseError, IntegrityError, ProgrammingError, connection +from django.db import transaction, DatabaseError, IntegrityError from django.db.models.fields.related import ForeignKey from django.utils.timezone import now from django.utils.encoding import smart_str @@ -80,8 +80,7 @@ from awx.main.models import ( InventoryUpdateEvent, AdHocCommandEvent, SystemJobEvent, - build_safe_env, - migrate_events_to_partitions + build_safe_env ) from awx.main.constants import ACTIVE_STATES from awx.main.exceptions import AwxTaskError, PostRunError @@ -174,12 +173,6 @@ def dispatch_startup(): cluster_node_heartbeat() Metrics().clear_values() - # at process startup, detect the need to migrate old event records to - # partitions; at *some point* in the future, once certain versions of AWX - # and Tower fall out of use/support, we can probably just _assume_ that - # everybody has moved to partitions, and remove this code entirely - migrate_events_to_partitions() - # Update Tower's rsyslog.conf file based on loggins settings in the db reconfigure_rsyslog() @@ -689,42 +682,6 @@ def update_host_smart_inventory_memberships(): smart_inventory.update_computed_fields() -@task(queue=get_local_queuename) -def migrate_legacy_event_data(tblname): - if 'event' not in tblname: - return - with advisory_lock(f'partition_migration_{tblname}', wait=False) as acquired: - if acquired is False: - return - chunk = settings.JOB_EVENT_MIGRATION_CHUNK_SIZE - - def _remaining(): - try: - cursor.execute(f'SELECT MAX(id) FROM _unpartitioned_{tblname};') - return cursor.fetchone()[0] - except ProgrammingError: - # the table is gone (migration is unnecessary) - return None - - with connection.cursor() as cursor: - total_rows = _remaining() - while total_rows: - with transaction.atomic(): - cursor.execute(f'''INSERT INTO {tblname} SELECT *, '1970-01-01' as job_created FROM _unpartitioned_{tblname} ORDER BY id DESC LIMIT {chunk} RETURNING id;''') - last_insert_pk = cursor.fetchone() - if last_insert_pk is None: - # this means that the SELECT from the old table was - # empty, and there was nothing to insert (so we're done) - break - last_insert_pk = last_insert_pk[0] - cursor.execute(f'DELETE FROM _unpartitioned_{tblname} WHERE id IN (SELECT id FROM _unpartitioned_{tblname} ORDER BY id DESC LIMIT {chunk});') - logger.warn(f'migrated rows to partitioned {tblname} from _unpartitioned_{tblname}; # ({last_insert_pk} rows remaining)') - - if _remaining() is None: - cursor.execute(f'DROP TABLE IF EXISTS _unpartitioned_{tblname}') - logger.warn(f'{tblname} migration to partitions has finished') - - @task(queue=get_local_queuename) def delete_inventory(inventory_id, user_id, retries=5): # Delete inventory as user