Remove the out-of-band migration

that was converting all of the old JSONField columns to the jsonb database type.
The use of JSONBlob makes this unnecessary.
Jeff Bradberry 2022-03-23 17:33:23 -04:00
parent e3f3ab224a
commit d54838cd94
2 changed files with 1 addition and 211 deletions
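Editor's note: the message refers to JSONBlob, whose definition is not part of this diff. The idea is that a field keeping the legacy text column type makes an in-place conversion to jsonb unnecessary. A speculative sketch only, assuming JSONBlob is a JSONField subclass that pins db_type to text:

    # Speculative sketch: JSONBlob is not defined in this diff. The
    # assumption is a JSONField subclass that keeps the legacy text
    # column, so old JSONField columns never need converting to jsonb.
    from django.db import models

    class JSONBlob(models.JSONField):
        def db_type(self, connection):
            return 'text'  # keep the old column type; no migration needed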

awx/main/models/__init__.py View File

@@ -3,7 +3,6 @@
# Django
from django.conf import settings # noqa
from django.db import connection
from django.db.models.signals import pre_delete # noqa
# AWX
@@ -98,93 +97,6 @@ User.add_to_class('can_access_with_errors', check_user_access_with_errors)
User.add_to_class('accessible_objects', user_accessible_objects)
def convert_jsonfields_to_jsonb():
    if connection.vendor != 'postgresql':
        return

    # fmt: off
    fields = [  # Table name, expensive or not, tuple of column names
        ('conf_setting', False, (
            'value',
        )),
        ('main_instancegroup', False, (
            'policy_instance_list',
        )),
        ('main_jobtemplate', False, (
            'survey_spec',
        )),
        ('main_notificationtemplate', False, (
            'notification_configuration',
            'messages',
        )),
        ('main_project', False, (
            'playbook_files',
            'inventory_files',
        )),
        ('main_schedule', False, (
            'extra_data',
            'char_prompts',
            'survey_passwords',
        )),
        ('main_workflowjobtemplate', False, (
            'survey_spec',
            'char_prompts',
        )),
        ('main_workflowjobtemplatenode', False, (
            'char_prompts',
            'extra_data',
            'survey_passwords',
        )),
        ('main_activitystream', True, (
            'setting',  # NN = NOT NULL
            'deleted_actor',
        )),
        ('main_job', True, (
            'survey_passwords',  # NN
            'artifacts',  # NN
        )),
        ('main_joblaunchconfig', True, (
            'extra_data',  # NN
            'survey_passwords',  # NN
            'char_prompts',  # NN
        )),
        ('main_notification', True, (
            'body',  # NN
        )),
        ('main_unifiedjob', True, (
            'job_env',  # NN
        )),
        ('main_workflowjob', True, (
            'survey_passwords',  # NN
            'char_prompts',  # NN
        )),
        ('main_workflowjobnode', True, (
            'char_prompts',  # NN
            'ancestor_artifacts',  # NN
            'extra_data',  # NN
            'survey_passwords',  # NN
        )),
    ]
    # fmt: on

    with connection.cursor() as cursor:
        for table, expensive, columns in fields:
            cursor.execute(
                """
                select count(1) from information_schema.columns
                where
                    table_name = %s and
                    column_name in %s and
                    data_type != 'jsonb';
                """,
                (table, columns),
            )
            if cursor.fetchone()[0]:
                from awx.main.tasks.system import migrate_json_fields

                migrate_json_fields.apply_async([table, expensive, columns])
def cleanup_created_modified_by(sender, **kwargs):
    # work around a bug in django-polymorphic that doesn't properly
    # handle cascades for reverse foreign keys on the polymorphic base model

awx/main/tasks/system.py View File

@@ -1,6 +1,5 @@
# Python
from collections import namedtuple
import itertools
import functools
import importlib
import json
@@ -14,7 +13,7 @@ from distutils.version import LooseVersion as Version
# Django
from django.conf import settings
from django.db import connection, transaction, DatabaseError, IntegrityError
from django.db import transaction, DatabaseError, IntegrityError
from django.db.models.fields.related import ForeignKey
from django.utils.timezone import now
from django.utils.encoding import smart_str
@@ -23,7 +22,6 @@ from django.utils.translation import gettext_lazy as _
from django.utils.translation import gettext_noop
from django.core.cache import cache
from django.core.exceptions import ObjectDoesNotExist
from django.contrib.contenttypes.models import ContentType
# Django-CRUM
from crum import impersonate
@@ -48,7 +46,6 @@ from awx.main.models import (
    Inventory,
    SmartInventoryMembership,
    Job,
    convert_jsonfields_to_jsonb,
)
from awx.main.constants import ACTIVE_STATES
from awx.main.dispatch.publish import task
@@ -82,8 +79,6 @@ Try upgrading OpenSSH or providing your private key in a different format. \
def dispatch_startup():
    startup_logger = logging.getLogger('awx.main.tasks')

    convert_jsonfields_to_jsonb()

    startup_logger.debug("Syncing Schedules")
    for sch in Schedule.objects.all():
        try:
@@ -127,123 +122,6 @@ def inform_cluster_of_shutdown():
        logger.exception('Encountered problem with normal shutdown signal.')
def migrate_json_fields_expensive(table, columns):
    batchsize = 50000
    ct = ContentType.objects.get_by_natural_key(*table.split('_', 1))
    model = ct.model_class()

    # Phase 1: add the new columns, making them nullable to avoid populating them
    with connection.schema_editor() as schema_editor:
        # See: https://docs.djangoproject.com/en/3.1/ref/schema-editor/
        for colname in columns:
            f = model._meta.get_field(colname)
            _, _, args, kwargs = f.deconstruct()
            kwargs['null'] = True
            new_f = f.__class__(*args, **kwargs)
            new_f.set_attributes_from_name(f'_{colname}')
            schema_editor.add_field(model, new_f)

            # Create a trigger to make sure new data automatically gets put in both fields.
            with connection.cursor() as cursor:
                # It's a little annoying, I think this trigger will re-do
                # the same work as the update query in Phase 2
                cursor.execute(
                    f"""
                    create or replace function update_{table}_{colname}()
                    returns trigger as $body$
                    begin
                        new._{colname} = new.{colname}::jsonb;
                        return new;
                    end
                    $body$ language plpgsql;
                    """
                )
                cursor.execute(
                    f"""
                    create trigger {table}_{colname}_trigger
                    before insert or update
                    on {table}
                    for each row
                    execute procedure update_{table}_{colname}();
                    """
                )

    # Phase 2: copy over the data
    with connection.cursor() as cursor:
        rows = 0
        for i in itertools.count(0, batchsize):
            cursor.execute(f"select count(1) from {table} where id >= %s;", (i,))
            if not cursor.fetchone()[0]:
                break

            column_expr = ', '.join(f"_{colname} = {colname}::jsonb" for colname in columns)
            cursor.execute(
                f"""
                update {table}
                set {column_expr}
                where id >= %s and id < %s;
                """,
                (i, i + batchsize),
            )
            rows += cursor.rowcount
            logger.debug(f"Batch {i} to {i + batchsize} copied on {table}.")

        logger.warning(f"Data copied for {rows} rows on {table}.")

    # Phase 3: drop the old column and rename the new one
    with connection.schema_editor() as schema_editor:
        # FIXME: Grab a lock explicitly here?
        for colname in columns:
            with connection.cursor() as cursor:
                cursor.execute(f"drop trigger {table}_{colname}_trigger on {table};")
                cursor.execute(f"drop function update_{table}_{colname}();")

            f = model._meta.get_field(colname)
            _, _, args, kwargs = f.deconstruct()
            kwargs['null'] = True
            new_f = f.__class__(*args, **kwargs)
            new_f.set_attributes_from_name(f'_{colname}')
            schema_editor.remove_field(model, f)

            _, _, args, kwargs = new_f.deconstruct()
            f = new_f.__class__(*args, **kwargs)
            f.set_attributes_from_name(colname)
            schema_editor.alter_field(model, new_f, f)
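Editor's note: the function above is a classic online column-type change: add a nullable shadow column, dual-write via a trigger, backfill in batches, then swap. Distilled onto a hypothetical table 'demo' with a single text column 'data', the sequence looks like this (sketch, not awx code):

    # Sketch of the three-phase pattern on a hypothetical table.
    from django.db import connection

    with connection.cursor() as cursor:
        # Phase 1: shadow column plus a dual-write trigger, so rows
        # written during the copy are not lost.
        cursor.execute("alter table demo add column _data jsonb;")
        cursor.execute(
            """
            create or replace function update_demo_data() returns trigger as $body$
            begin
                new._data = new.data::jsonb;
                return new;
            end
            $body$ language plpgsql;
            """
        )
        cursor.execute(
            """
            create trigger demo_data_trigger before insert or update on demo
            for each row execute procedure update_demo_data();
            """
        )
        # Phase 2: backfill existing rows (batched by id range in the real task).
        cursor.execute("update demo set _data = data::jsonb;")
        # Phase 3: drop the trigger and old column, then rename the shadow.
        cursor.execute("drop trigger demo_data_trigger on demo;")
        cursor.execute("drop function update_demo_data();")
        cursor.execute("alter table demo drop column data;")
        cursor.execute("alter table demo rename column _data to data;")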
@task(queue=get_local_queuename)
def migrate_json_fields(table, expensive, columns):
    logger.warning(f"Migrating json fields: {table} {columns}")

    with advisory_lock(f'json_migration_{table}', wait=False) as acquired:
        if not acquired:
            return

        from django.db.migrations.executor import MigrationExecutor

        # If Django is currently running migrations, wait until it is done.
        while True:
            executor = MigrationExecutor(connection)
            if not executor.migration_plan(executor.loader.graph.leaf_nodes()):
                break
            time.sleep(60)

        if expensive:
            migrate_json_fields_expensive(table, columns)
        else:
            with connection.cursor() as cursor:
                column_expr = ", ".join(f"ALTER {colname} TYPE jsonb USING {colname}::jsonb" for colname in columns)
                cursor.execute(f"ALTER TABLE {table} {column_expr};")

        logger.warning(f"Migration of {table} to jsonb is finished")
@task(queue=get_local_queuename)
def apply_cluster_membership_policies():
    from awx.main.signals import disable_activity_stream