upgrade to partitions without a costly bulk data migration

keep pre-upgrade events in an old table (instead of a partition)

- instead of creating a default partition, keep all events in special
"unpartitioned" tables
- track these tables via distinct proxy=true models
- when generating the queryset for a UnifiedJob's events, look at the
  creation date of the job; if it's before the date of the migration,
  query on the old unpartitioned table, otherwise use the more modern table
  that provides auto-partitioning
This commit is contained in:
Ryan Petrello 2021-03-18 23:40:14 -04:00 committed by Jim Ladd
parent 0eddd5ce7f
commit 200901e53b
13 changed files with 146 additions and 136 deletions

View File

@ -21,7 +21,7 @@ from urllib3.exceptions import ConnectTimeoutError
from django.conf import settings
from django.core.exceptions import FieldError, ObjectDoesNotExist
from django.db.models import Q, Sum
from django.db import IntegrityError, transaction, connection
from django.db import IntegrityError, ProgrammingError, transaction, connection
from django.shortcuts import get_object_or_404
from django.utils.safestring import mark_safe
from django.utils.timezone import now
@ -177,6 +177,15 @@ from awx.api.views.webhooks import WebhookKeyView, GithubWebhookReceiver, Gitlab
logger = logging.getLogger('awx.api.views')
def unpartitioned_event_horizon(cls):
    """
    Return the highest event id stored in the legacy (unpartitioned)
    table for the given event model.

    Event rows with an id at or below this horizon predate the partition
    migration and must be looked up in the ``_unpartitioned_*`` table.
    Returns 0 when the legacy table has been dropped (or never existed),
    so callers can always compare the result against an int.
    """
    with connection.cursor() as cursor:
        try:
            cursor.execute(f'SELECT MAX(id) FROM _unpartitioned_{cls._meta.db_table}')
            # MAX(id) is NULL when the table exists but holds no rows;
            # coerce to 0 so callers never compare an int against None
            return cursor.fetchone()[0] or 0
        except ProgrammingError:
            # the legacy table does not exist; every event is partitioned
            return 0
def api_exception_handler(exc, context):
"""
Override default API exception handler to catch IntegrityError exceptions.
@ -884,9 +893,9 @@ class ProjectUpdateEventsList(SubListAPIView):
return super(ProjectUpdateEventsList, self).finalize_response(request, response, *args, **kwargs)
def get_queryset(self):
return super(ProjectUpdateEventsList, self).get_queryset().filter(
job_created=self.get_parent_object().created_or_epoch
)
pu = self.get_parent_object()
self.check_parent_access(pu)
return pu.get_event_queryset()
class SystemJobEventsList(SubListAPIView):
@ -903,9 +912,9 @@ class SystemJobEventsList(SubListAPIView):
return super(SystemJobEventsList, self).finalize_response(request, response, *args, **kwargs)
def get_queryset(self):
return super(SystemJobEventsList, self).get_queryset().filter(
job_created=self.get_parent_object().created_or_epoch
)
job = self.get_parent_object()
self.check_parent_access(job)
return job.get_event_queryset()
class ProjectUpdateCancel(RetrieveAPIView):
@ -3741,9 +3750,18 @@ class JobHostSummaryDetail(RetrieveAPIView):
class JobEventDetail(RetrieveAPIView):
model = models.JobEvent
serializer_class = serializers.JobEventSerializer
@property
def is_partitioned(self):
    # events created after the partition migration have ids above the
    # legacy-table horizon; those live in the partitioned table
    horizon = unpartitioned_event_horizon(models.JobEvent)
    return int(self.kwargs['pk']) > horizon
@property
def model(self):
    # route lookups for legacy event ids through the proxy model that is
    # bound to the _unpartitioned_ table
    if not self.is_partitioned:
        return models.UnpartitionedJobEvent
    return models.JobEvent
def get_serializer_context(self):
context = super().get_serializer_context()
context.update(no_truncate=True)
@ -3752,37 +3770,31 @@ class JobEventDetail(RetrieveAPIView):
class JobEventChildrenList(NoTruncateMixin, SubListAPIView):
model = models.JobEvent
serializer_class = serializers.JobEventSerializer
parent_model = models.JobEvent
relationship = 'children'
name = _('Job Event Children List')
search_fields = ('stdout',)
@property
def is_partitioned(self):
    # compare the requested event id against the highest id left in the
    # legacy (pre-partition) table
    horizon = unpartitioned_event_horizon(models.JobEvent)
    return int(self.kwargs['pk']) > horizon
@property
def model(self):
    # legacy rows (id at or below the horizon) are served by the proxy
    # model pointing at the _unpartitioned_ table
    if not self.is_partitioned:
        return models.UnpartitionedJobEvent
    return models.JobEvent
@property
def parent_model(self):
    # the parent event lives in the same (partitioned or legacy) table
    # as the children being listed, so reuse the dynamic model property
    return self.model
def get_queryset(self):
parent_event = self.get_parent_object()
self.check_parent_access(parent_event)
qs = self.request.user.get_queryset(self.model).filter(
return parent_event.job.get_event_queryset().filter(
parent_uuid=parent_event.uuid
).filter(
job_created=parent_event.job.created_or_epoch
)
return qs
class JobEventHostsList(HostRelatedSearchMixin, SubListAPIView):
model = models.Host
serializer_class = serializers.HostSerializer
parent_model = models.JobEvent
relationship = 'hosts'
name = _('Job Event Hosts List')
def get_queryset(self):
    """Hosts for which this job event was the primary host, limited to
    hosts visible to the requesting user."""
    event = self.get_parent_object()
    self.check_parent_access(event)
    return self.request.user.get_queryset(self.model).filter(
        job_events_as_primary_host=event
    )
class BaseJobEventsList(NoTruncateMixin, SubListAPIView):
@ -3822,10 +3834,7 @@ class JobJobEventsList(BaseJobEventsList):
def get_queryset(self):
job = self.get_parent_object()
self.check_parent_access(job)
qs = job.job_events.filter(
job_created=self.get_parent_object().created_or_epoch
).select_related('host').order_by('start_line')
return qs.all()
return job.get_event_queryset().select_related('host').order_by('start_line')
class AdHocCommandList(ListCreateAPIView):
@ -3983,6 +3992,11 @@ class AdHocCommandEventList(NoTruncateMixin, ListAPIView):
serializer_class = serializers.AdHocCommandEventSerializer
search_fields = ('stdout',)
def get_queryset(self):
    """Events for the parent ad hoc command; the job model itself picks
    the partitioned or legacy event table."""
    command = self.get_parent_object()
    self.check_parent_access(command)
    return command.get_event_queryset()
class AdHocCommandEventDetail(RetrieveAPIView):
@ -4005,9 +4019,9 @@ class BaseAdHocCommandEventsList(NoTruncateMixin, SubListAPIView):
search_fields = ('stdout',)
def get_queryset(self):
return super(BaseAdHocCommandEventsList, self).get_queryset().filter(
job_created=self.get_parent_object().created_or_epoch
)
adhoc = self.get_parent_object()
self.check_parent_access(adhoc)
return adhoc.get_event_queryset()
class HostAdHocCommandEventsList(BaseAdHocCommandEventsList):

View File

@ -51,9 +51,9 @@ class InventoryUpdateEventsList(SubListAPIView):
search_fields = ('stdout',)
def get_queryset(self):
return super(InventoryUpdateEventsList, self).get_queryset().filter(
job_created=self.get_parent_object().created_or_epoch
)
iu = self.get_parent_object()
self.check_parent_access(iu)
return iu.get_event_queryset()
def finalize_response(self, request, response, *args, **kwargs):
response['X-UI-Max-Events'] = settings.MAX_UI_JOB_EVENTS

View File

@ -45,6 +45,7 @@ from awx.main.models import (
InventoryUpdateEvent,
Job,
JobEvent,
UnpartitionedJobEvent,
JobHostSummary,
JobLaunchConfig,
JobTemplate,
@ -2352,6 +2353,11 @@ class JobEventAccess(BaseAccess):
return False
# Access rules for legacy (pre-partition-migration) job events are the same
# as for partitioned ones; only the underlying model/table differs.
class UnpartitionedJobEventAccess(JobEventAccess):
    model = UnpartitionedJobEvent
class ProjectUpdateEventAccess(BaseAccess):
"""
I can see project update event records whenever I can access the project update
@ -2895,3 +2901,4 @@ class WorkflowApprovalTemplateAccess(BaseAccess):
for cls in BaseAccess.__subclasses__():
access_registry[cls.model] = cls
access_registry[UnpartitionedJobEvent] = UnpartitionedJobEventAccess

View File

@ -11,11 +11,16 @@ from django.conf import settings
from awx.main.utils.filters import SmartFilter
from awx.main.utils.pglock import advisory_lock
___all__ = ['HostManager', 'InstanceManager', 'InstanceGroupManager']
# NOTE(review): this was spelled "___all__" (three leading underscores),
# which Python treats as an ordinary module variable, so the export list
# was silently ignored by `from ... import *`.
__all__ = ['HostManager', 'InstanceManager', 'InstanceGroupManager', 'DeferJobCreatedManager']
logger = logging.getLogger('awx.main.managers')
class DeferJobCreatedManager(models.Manager):
    """Manager whose default queryset defers loading of job_created.

    job_created is part of the (id, job_created) partitioning key; by
    deferring it, the column is only fetched when explicitly accessed.
    """

    def get_queryset(self):
        # Python 3 zero-argument super() — same MRO lookup as the
        # explicit super(DeferJobCreatedManager, self) form
        return super().get_queryset().defer('job_created')
class HostManager(models.Manager):
"""Custom manager class for Hosts model."""

View File

@ -1,9 +1,4 @@
from datetime import datetime
from django.db import migrations, models, connection
from django.utils.timezone import now
from awx.main.utils.common import create_partition
def migrate_event_data(apps, schema_editor):
@ -73,16 +68,6 @@ def migrate_event_data(apps, schema_editor):
f'ADD CONSTRAINT {tblname}_pkey_new PRIMARY KEY (id, job_created);'
)
current_time = now()
# create initial partition containing all existing events
epoch = datetime.utcfromtimestamp(0)
create_partition(tblname, epoch, current_time, 'old_events')
# .. and first partition
# .. which is a special case, as it only covers remainder of current hour
create_partition(tblname, current_time)
class FakeAddField(migrations.AddField):

View File

@ -3,7 +3,6 @@
# Django
from django.conf import settings # noqa
from django.db import connection
from django.db.models.signals import pre_delete # noqa
# AWX
@ -36,6 +35,11 @@ from awx.main.models.events import ( # noqa
JobEvent,
ProjectUpdateEvent,
SystemJobEvent,
UnpartitionedAdHocCommandEvent,
UnpartitionedInventoryUpdateEvent,
UnpartitionedJobEvent,
UnpartitionedProjectUpdateEvent,
UnpartitionedSystemJobEvent,
)
from awx.main.models.ad_hoc_commands import AdHocCommand # noqa
from awx.main.models.schedules import Schedule # noqa
@ -92,16 +96,6 @@ User.add_to_class('can_access_with_errors', check_user_access_with_errors)
User.add_to_class('accessible_objects', user_accessible_objects)
def migrate_events_to_partitions():
    # For each legacy event table, check whether a pre-partition
    # "_unpartitioned_" copy still exists; if so, dispatch a background
    # task to move its rows into the new partitioned table.
    for tblname in ('main_jobevent', 'main_inventoryupdateevent', 'main_projectupdateevent', 'main_adhoccommandevent', 'main_systemjobevent'):
        with connection.cursor() as cursor:
            # information_schema lookup: does _unpartitioned_<tbl> exist?
            cursor.execute('SELECT 1 FROM information_schema.tables WHERE table_name=%s', (f'_unpartitioned_{tblname}',))
            if bool(cursor.rowcount):
                # imported locally to avoid a circular import at module load
                from awx.main.tasks import migrate_legacy_event_data
                migrate_legacy_event_data.apply_async([tblname])
def cleanup_created_modified_by(sender, **kwargs):
# work around a bug in django-polymorphic that doesn't properly
# handle cascades for reverse foreign keys on the polymorphic base model

View File

@ -15,7 +15,7 @@ from django.core.exceptions import ValidationError
# AWX
from awx.api.versioning import reverse
from awx.main.models.base import prevent_search, AD_HOC_JOB_TYPE_CHOICES, VERBOSITY_CHOICES, VarsDictProperty
from awx.main.models.events import AdHocCommandEvent
from awx.main.models.events import AdHocCommandEvent, UnpartitionedAdHocCommandEvent
from awx.main.models.unified_jobs import UnifiedJob
from awx.main.models.notifications import JobNotificationMixin, NotificationTemplate
@ -127,6 +127,8 @@ class AdHocCommand(UnifiedJob, JobNotificationMixin):
@property
def event_class(self):
    # commands that predate the partition migration keep their events in
    # the legacy table, reachable via the Unpartitioned* proxy model
    if self.has_unpartitioned_events:
        return UnpartitionedAdHocCommandEvent
    return AdHocCommandEvent
@property

View File

@ -15,6 +15,7 @@ from django.utils.encoding import force_text
from awx.api.versioning import reverse
from awx.main import consumers
from awx.main.managers import DeferJobCreatedManager
from awx.main.fields import JSONField
from awx.main.models.base import CreatedModifiedModel
from awx.main.utils import ignore_inventory_computed_fields, camelcase_to_underscore
@ -461,6 +462,8 @@ class JobEvent(BasePlaybookEvent):
VALID_KEYS = BasePlaybookEvent.VALID_KEYS + ['job_id', 'workflow_job_id', 'job_created']
objects = DeferJobCreatedManager()
class Meta:
app_label = 'main'
ordering = ('pk',)
@ -581,10 +584,18 @@ class JobEvent(BasePlaybookEvent):
return self.job.verbosity
class UnpartitionedJobEvent(JobEvent):
    # proxy model: identical fields and behavior to JobEvent, but bound
    # to the legacy pre-partition table (db_table is patched below)
    class Meta:
        proxy = True


# point the proxy at the legacy table left in place by the partition migration
UnpartitionedJobEvent._meta.db_table = '_unpartitioned_' + JobEvent._meta.db_table  # noqa
class ProjectUpdateEvent(BasePlaybookEvent):
VALID_KEYS = BasePlaybookEvent.VALID_KEYS + ['project_update_id', 'workflow_job_id', 'job_created']
objects = DeferJobCreatedManager()
class Meta:
app_label = 'main'
ordering = ('pk',)
@ -612,6 +623,12 @@ class ProjectUpdateEvent(BasePlaybookEvent):
return 'localhost'
class UnpartitionedProjectUpdateEvent(ProjectUpdateEvent):
    # proxy model bound to the legacy pre-partition table (patched below)
    class Meta:
        proxy = True


# point the proxy at the legacy table left in place by the partition migration
UnpartitionedProjectUpdateEvent._meta.db_table = '_unpartitioned_' + ProjectUpdateEvent._meta.db_table  # noqa
class BaseCommandEvent(CreatedModifiedModel):
"""
An event/message logged from a command for each host.
@ -707,6 +724,8 @@ class AdHocCommandEvent(BaseCommandEvent):
VALID_KEYS = BaseCommandEvent.VALID_KEYS + ['ad_hoc_command_id', 'event', 'host_name', 'host_id', 'workflow_job_id', 'job_created']
objects = DeferJobCreatedManager()
class Meta:
app_label = 'main'
ordering = ('-pk',)
@ -796,10 +815,18 @@ class AdHocCommandEvent(BaseCommandEvent):
analytics_logger.info('Event data saved.', extra=dict(python_objects=dict(job_event=self)))
class UnpartitionedAdHocCommandEvent(AdHocCommandEvent):
    # proxy model bound to the legacy pre-partition table (patched below)
    class Meta:
        proxy = True


# point the proxy at the legacy table left in place by the partition migration
UnpartitionedAdHocCommandEvent._meta.db_table = '_unpartitioned_' + AdHocCommandEvent._meta.db_table  # noqa
class InventoryUpdateEvent(BaseCommandEvent):
VALID_KEYS = BaseCommandEvent.VALID_KEYS + ['inventory_update_id', 'workflow_job_id', 'job_created']
objects = DeferJobCreatedManager()
class Meta:
app_label = 'main'
ordering = ('-pk',)
@ -834,10 +861,18 @@ class InventoryUpdateEvent(BaseCommandEvent):
return False
class UnpartitionedInventoryUpdateEvent(InventoryUpdateEvent):
    # proxy model bound to the legacy pre-partition table (patched below)
    class Meta:
        proxy = True


# point the proxy at the legacy table left in place by the partition migration
UnpartitionedInventoryUpdateEvent._meta.db_table = '_unpartitioned_' + InventoryUpdateEvent._meta.db_table  # noqa
class SystemJobEvent(BaseCommandEvent):
VALID_KEYS = BaseCommandEvent.VALID_KEYS + ['system_job_id', 'job_created']
objects = DeferJobCreatedManager()
class Meta:
app_label = 'main'
ordering = ('-pk',)
@ -870,3 +905,9 @@ class SystemJobEvent(BaseCommandEvent):
@property
def changed(self):
return False
class UnpartitionedSystemJobEvent(SystemJobEvent):
    # proxy model bound to the legacy pre-partition table (patched below)
    class Meta:
        proxy = True


# point the proxy at the legacy table left in place by the partition migration
UnpartitionedSystemJobEvent._meta.db_table = '_unpartitioned_' + SystemJobEvent._meta.db_table  # noqa

View File

@ -35,7 +35,7 @@ from awx.main.fields import (
)
from awx.main.managers import HostManager
from awx.main.models.base import BaseModel, CommonModelNameNotUnique, VarsDictProperty, CLOUD_INVENTORY_SOURCES, prevent_search, accepts_json
from awx.main.models.events import InventoryUpdateEvent
from awx.main.models.events import InventoryUpdateEvent, UnpartitionedInventoryUpdateEvent
from awx.main.models.unified_jobs import UnifiedJob, UnifiedJobTemplate
from awx.main.models.mixins import (
ResourceMixin,
@ -1265,6 +1265,8 @@ class InventoryUpdate(UnifiedJob, InventorySourceOptions, JobNotificationMixin,
@property
def event_class(self):
    # updates that predate the partition migration keep their events in
    # the legacy table, reachable via the Unpartitioned* proxy model
    if self.has_unpartitioned_events:
        return UnpartitionedInventoryUpdateEvent
    return InventoryUpdateEvent
@property

View File

@ -37,7 +37,7 @@ from awx.main.models.base import (
VERBOSITY_CHOICES,
VarsDictProperty,
)
from awx.main.models.events import JobEvent, SystemJobEvent
from awx.main.models.events import JobEvent, UnpartitionedJobEvent, UnpartitionedSystemJobEvent, SystemJobEvent
from awx.main.models.unified_jobs import UnifiedJobTemplate, UnifiedJob
from awx.main.models.notifications import (
NotificationTemplate,
@ -614,6 +614,8 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskMana
@property
def event_class(self):
    # pick the legacy proxy model for jobs that predate event partitioning
    return UnpartitionedJobEvent if self.has_unpartitioned_events else JobEvent
def copy_unified_job(self, **new_prompts):
@ -1259,6 +1261,8 @@ class SystemJob(UnifiedJob, SystemJobOptions, JobNotificationMixin):
@property
def event_class(self):
    # jobs that predate the partition migration keep their events in the
    # legacy table, reachable via the Unpartitioned* proxy model
    if self.has_unpartitioned_events:
        return UnpartitionedSystemJobEvent
    return SystemJobEvent
@property

View File

@ -19,7 +19,7 @@ from django.utils.timezone import now, make_aware, get_default_timezone
# AWX
from awx.api.versioning import reverse
from awx.main.models.base import PROJECT_UPDATE_JOB_TYPE_CHOICES, PERM_INVENTORY_DEPLOY
from awx.main.models.events import ProjectUpdateEvent
from awx.main.models.events import ProjectUpdateEvent, UnpartitionedProjectUpdateEvent
from awx.main.models.notifications import (
NotificationTemplate,
JobNotificationMixin,
@ -555,6 +555,8 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin, TaskManage
@property
def event_class(self):
    # updates that predate the partition migration keep their events in
    # the legacy table, reachable via the Unpartitioned* proxy model
    if self.has_unpartitioned_events:
        return UnpartitionedProjectUpdateEvent
    return ProjectUpdateEvent
@property

View File

@ -736,18 +736,6 @@ class UnifiedJob(
def _get_task_class(cls):
raise NotImplementedError # Implement in subclasses.
@property
def created_or_epoch(self):
    # returns self.created *unless* the job was created *prior*
    # to the datetime the event partition migration is applied
    # (in that case, it returns the epoch, which is the date
    # which is automatically applied to all events rows that predate
    # that migration)
    applied = get_event_partition_epoch()
    if applied and self.created < applied:
        # naive UTC epoch; matches the '1970-01-01' job_created value
        # stamped onto legacy event rows during migration
        return datetime.datetime.utcfromtimestamp(0)
    return self.created
@property
def can_run_containerized(self):
return False
@ -1003,11 +991,18 @@ class UnifiedJob(
'main_systemjob': 'system_job_id',
}[tablename]
@property
def has_unpartitioned_events(self):
    """
    True when this job was created before the event-partitioning
    migration was applied, i.e. its events live in the legacy
    ``_unpartitioned_*`` table rather than in a partition.
    """
    applied = get_event_partition_epoch()
    # bool() so callers always get True/False; the bare `and` expression
    # would leak a falsy non-boolean when no epoch has been recorded
    return bool(applied and self.created < applied)
def get_event_queryset(self):
return self.event_class.objects.filter(**{
kwargs = {
self.event_parent_key: self.id,
'job_created': self.created
})
}
if not self.has_unpartitioned_events:
kwargs['job_created'] = self.created
return self.event_class.objects.filter(**kwargs)
@property
def event_processing_finished(self):
@ -1093,13 +1088,15 @@ class UnifiedJob(
# .write() calls on the fly to maintain this interface
_write = fd.write
fd.write = lambda s: _write(smart_text(s))
tbl = self._meta.db_table + 'event'
created_by_cond = ''
if self.has_unpartitioned_events:
tbl = f'_unpartitioned_{tbl}'
else:
created_by_cond = f"job_created='{self.created.isoformat()}' AND "
cursor.copy_expert(
"copy (select stdout from {} where {}={} and stdout != '' order by start_line) to stdout".format(
self._meta.db_table + 'event', self.event_parent_key, self.id
),
fd,
)
sql = f"copy (select stdout from {tbl} where {created_by_cond}{self.event_parent_key}={self.id} and stdout != '' order by start_line) to stdout" # nosql
cursor.copy_expert(sql, fd)
if hasattr(fd, 'name'):
# If we're dealing with a physical file, use `sed` to clean

View File

@ -32,7 +32,7 @@ import sys
# Django
from django.conf import settings
from django.db import transaction, DatabaseError, IntegrityError, ProgrammingError, connection
from django.db import transaction, DatabaseError, IntegrityError
from django.db.models.fields.related import ForeignKey
from django.utils.timezone import now
from django.utils.encoding import smart_str
@ -80,8 +80,7 @@ from awx.main.models import (
InventoryUpdateEvent,
AdHocCommandEvent,
SystemJobEvent,
build_safe_env,
migrate_events_to_partitions
build_safe_env
)
from awx.main.constants import ACTIVE_STATES
from awx.main.exceptions import AwxTaskError, PostRunError
@ -174,12 +173,6 @@ def dispatch_startup():
cluster_node_heartbeat()
Metrics().clear_values()
# at process startup, detect the need to migrate old event records to
# partitions; at *some point* in the future, once certain versions of AWX
# and Tower fall out of use/support, we can probably just _assume_ that
# everybody has moved to partitions, and remove this code entirely
migrate_events_to_partitions()
# Update Tower's rsyslog.conf file based on loggins settings in the db
reconfigure_rsyslog()
@ -689,42 +682,6 @@ def update_host_smart_inventory_memberships():
smart_inventory.update_computed_fields()
@task(queue=get_local_queuename)
def migrate_legacy_event_data(tblname):
    """
    Incrementally move rows from _unpartitioned_<tblname> into the new
    partitioned <tblname> table (deleting each migrated chunk from the
    legacy table), then drop the legacy table once it is empty.
    """
    # only event tables participate in the partition migration
    if 'event' not in tblname:
        return
    # advisory lock: at most one worker migrates a given table at a time;
    # wait=False means a second caller just returns immediately
    with advisory_lock(f'partition_migration_{tblname}', wait=False) as acquired:
        if acquired is False:
            return
        chunk = settings.JOB_EVENT_MIGRATION_CHUNK_SIZE

        def _remaining():
            # Highest id still present in the legacy table; None when the
            # table is empty (MAX over no rows is NULL) or no longer
            # exists.  Closes over `cursor`, which is bound below before
            # the first call.
            try:
                cursor.execute(f'SELECT MAX(id) FROM _unpartitioned_{tblname};')
                return cursor.fetchone()[0]
            except ProgrammingError:
                # the table is gone (migration is unnecessary)
                return None

        with connection.cursor() as cursor:
            # total_rows only gates loop entry; the loop itself exits via
            # the break below once the INSERT..RETURNING comes back empty
            total_rows = _remaining()
            while total_rows:
                with transaction.atomic():
                    # copy a chunk (highest ids first), stamping the epoch
                    # date as the job_created partition key
                    cursor.execute(f'''INSERT INTO {tblname} SELECT *, '1970-01-01' as job_created FROM _unpartitioned_{tblname} ORDER BY id DESC LIMIT {chunk} RETURNING id;''')
                    last_insert_pk = cursor.fetchone()
                    if last_insert_pk is None:
                        # this means that the SELECT from the old table was
                        # empty, and there was nothing to insert (so we're done)
                        break
                    last_insert_pk = last_insert_pk[0]
                    # remove the just-migrated chunk from the legacy table
                    cursor.execute(f'DELETE FROM _unpartitioned_{tblname} WHERE id IN (SELECT id FROM _unpartitioned_{tblname} ORDER BY id DESC LIMIT {chunk});')
                # NOTE(review): last_insert_pk is an id, not a count, so the
                # "rows remaining" wording is misleading; logger.warn is
                # also deprecated in favor of logger.warning
                logger.warn(f'migrated rows to partitioned {tblname} from _unpartitioned_{tblname}; # ({last_insert_pk} rows remaining)')
            if _remaining() is None:
                # legacy table is empty (or already gone): drop it and log
                cursor.execute(f'DROP TABLE IF EXISTS _unpartitioned_{tblname}')
                logger.warn(f'{tblname} migration to partitions has finished')
@task(queue=get_local_queuename)
def delete_inventory(inventory_id, user_id, retries=5):
# Delete inventory as user