Implement an out-of-band migration to change the json fields

This commit is contained in:
Jeff Bradberry
2022-02-14 15:13:10 -05:00
parent 028f09002f
commit 676b8f6d8f
2 changed files with 212 additions and 1 deletions

View File

@@ -3,6 +3,7 @@
# Django # Django
from django.conf import settings # noqa from django.conf import settings # noqa
from django.db import connection
from django.db.models.signals import pre_delete # noqa from django.db.models.signals import pre_delete # noqa
# AWX # AWX
@@ -97,6 +98,93 @@ User.add_to_class('can_access_with_errors', check_user_access_with_errors)
User.add_to_class('accessible_objects', user_accessible_objects) User.add_to_class('accessible_objects', user_accessible_objects)
def convert_jsonfields_to_jsonb():
    """Kick off out-of-band conversion of legacy json columns to jsonb.

    A no-op on any backend other than PostgreSQL.  For every listed table,
    check ``information_schema`` for columns that are still plain ``json``;
    if any remain, dispatch a ``migrate_json_fields`` task to convert them
    in the background.  Tables flagged as expensive are handled with the
    batched/triggered strategy rather than a single ``ALTER TABLE``.
    """
    if connection.vendor != 'postgresql':
        return

    # fmt: off
    # (table name, expensive-to-convert flag, column names)
    # Columns in the "expensive" tables are NOT NULL in the current schema.
    candidates = [
        ('conf_setting', False, ('value',)),
        ('main_instancegroup', False, ('policy_instance_list',)),
        ('main_jobtemplate', False, ('survey_spec',)),
        ('main_notificationtemplate', False, ('notification_configuration', 'messages')),
        ('main_project', False, ('playbook_files', 'inventory_files')),
        ('main_schedule', False, ('extra_data', 'char_prompts', 'survey_passwords')),
        ('main_workflowjobtemplate', False, ('survey_spec', 'char_prompts')),
        ('main_workflowjobtemplatenode', False, ('char_prompts', 'extra_data', 'survey_passwords')),
        ('main_activitystream', True, ('setting', 'deleted_actor')),
        ('main_job', True, ('survey_passwords', 'artifacts')),
        ('main_joblaunchconfig', True, ('extra_data', 'survey_passwords', 'char_prompts')),
        ('main_notification', True, ('body',)),
        ('main_unifiedjob', True, ('job_env',)),
        ('main_workflowjob', True, ('survey_passwords', 'char_prompts')),
        ('main_workflowjobnode', True, ('char_prompts', 'ancestor_artifacts', 'extra_data', 'survey_passwords')),
    ]
    # fmt: on

    with connection.cursor() as cursor:
        for tablename, is_expensive, colnames in candidates:
            # psycopg2 adapts the tuple parameter into a parenthesized list
            # for the ``in`` clause.
            cursor.execute(
                """
                select count(1) from information_schema.columns
                where
                    table_name = %s and
                    column_name in %s and
                    data_type != 'jsonb';
                """,
                (tablename, colnames),
            )
            (pending,) = cursor.fetchone()
            if not pending:
                continue
            # Deferred import: avoids a circular import between the models
            # module and the task system at load time.
            from awx.main.tasks.system import migrate_json_fields

            migrate_json_fields.apply_async([tablename, is_expensive, colnames])
def cleanup_created_modified_by(sender, **kwargs): def cleanup_created_modified_by(sender, **kwargs):
# work around a bug in django-polymorphic that doesn't properly # work around a bug in django-polymorphic that doesn't properly
# handle cascades for reverse foreign keys on the polymorphic base model # handle cascades for reverse foreign keys on the polymorphic base model

View File

@@ -1,5 +1,6 @@
# Python # Python
from collections import namedtuple from collections import namedtuple
import itertools
import functools import functools
import importlib import importlib
import json import json
@@ -13,7 +14,7 @@ from distutils.version import LooseVersion as Version
# Django # Django
from django.conf import settings from django.conf import settings
from django.db import transaction, DatabaseError, IntegrityError from django.db import connection, transaction, DatabaseError, IntegrityError
from django.db.models.fields.related import ForeignKey from django.db.models.fields.related import ForeignKey
from django.utils.timezone import now from django.utils.timezone import now
from django.utils.encoding import smart_str from django.utils.encoding import smart_str
@@ -22,6 +23,7 @@ from django.utils.translation import gettext_lazy as _
from django.utils.translation import gettext_noop from django.utils.translation import gettext_noop
from django.core.cache import cache from django.core.cache import cache
from django.core.exceptions import ObjectDoesNotExist from django.core.exceptions import ObjectDoesNotExist
from django.contrib.contenttypes.models import ContentType
# Django-CRUM # Django-CRUM
from crum import impersonate from crum import impersonate
@@ -46,6 +48,7 @@ from awx.main.models import (
Inventory, Inventory,
SmartInventoryMembership, SmartInventoryMembership,
Job, Job,
convert_jsonfields_to_jsonb,
) )
from awx.main.constants import ACTIVE_STATES from awx.main.constants import ACTIVE_STATES
from awx.main.dispatch.publish import task from awx.main.dispatch.publish import task
@@ -78,6 +81,9 @@ Try upgrading OpenSSH or providing your private key in an different format. \
def dispatch_startup(): def dispatch_startup():
startup_logger = logging.getLogger('awx.main.tasks') startup_logger = logging.getLogger('awx.main.tasks')
convert_jsonfields_to_jsonb()
startup_logger.debug("Syncing Schedules") startup_logger.debug("Syncing Schedules")
for sch in Schedule.objects.all(): for sch in Schedule.objects.all():
try: try:
@@ -121,6 +127,123 @@ def inform_cluster_of_shutdown():
logger.exception('Encountered problem with normal shutdown signal.') logger.exception('Encountered problem with normal shutdown signal.')
def migrate_json_fields_expensive(table, columns):
    """Convert json columns to jsonb on a large table without a long lock.

    Strategy, per column:
      1. Add a nullable shadow column ``_<col>`` (cheap; no table rewrite)
         plus a trigger that keeps it in sync for rows written mid-copy.
      2. Copy data over in id-ranged batches so no single UPDATE holds
         locks for long.
      3. Drop the trigger and the old column, then rename the shadow
         column into place.

    :param table: database table name, e.g. ``main_job``
    :param columns: iterable of column names to convert
    """
    batchsize = 50000
    # Table names are "<app>_<model>"; resolve back to the model class so
    # Django's schema editor can operate on real field objects.
    ct = ContentType.objects.get_by_natural_key(*table.split('_', 1))
    model = ct.model_class()

    # Phase 1: add the new columns, making them nullable to avoid populating them
    with connection.schema_editor() as schema_editor:
        # See: https://docs.djangoproject.com/en/3.1/ref/schema-editor/
        for colname in columns:
            f = model._meta.get_field(colname)
            _, _, args, kwargs = f.deconstruct()
            kwargs['null'] = True
            new_f = f.__class__(*args, **kwargs)
            new_f.set_attributes_from_name(f'_{colname}')
            schema_editor.add_field(model, new_f)

            # Create a trigger to make sure new data automatically gets put
            # in both fields.  (This duplicates the Phase 2 copy for rows
            # touched after the trigger exists, which is harmless.)
            with connection.cursor() as cursor:
                # NOTE: plpgsql statements must be ';'-terminated, and the
                # assignment operator is ':='.
                cursor.execute(
                    f"""
                    create or replace function update_{table}_{colname}()
                    returns trigger as $body$
                    begin
                        new._{colname} := new.{colname}::jsonb;
                        return new;
                    end;
                    $body$ language plpgsql;
                    """
                )
                # EXECUTE PROCEDURE requires the call parentheses.
                cursor.execute(
                    f"""
                    create trigger {table}_{colname}_trigger
                    before insert or update
                    on {table}
                    for each row
                    execute procedure update_{table}_{colname}();
                    """
                )

    # Phase 2: copy over the data in id-ranged batches.
    with connection.cursor() as cursor:
        rows = 0
        for i in itertools.count(0, batchsize):
            # Stop once no rows remain at or above this id.
            cursor.execute(f"select count(1) from {table} where id >= %s;", (i,))
            if not cursor.fetchone()[0]:
                break

            column_expr = ', '.join(f"_{colname} = {colname}::jsonb" for colname in columns)
            cursor.execute(
                f"""
                update {table}
                set {column_expr}
                where id >= %s and id < %s;
                """,
                (i, i + batchsize),
            )
            rows += cursor.rowcount
            logger.debug(f"Batch {i} to {i + batchsize} copied on {table}.")

        logger.warning(f"Data copied for {rows} rows on {table}.")

    # Phase 3: drop the old column and rename the new one
    with connection.schema_editor() as schema_editor:
        # FIXME: Grab a lock explicitly here?
        for colname in columns:
            with connection.cursor() as cursor:
                # DROP TRIGGER requires the owning table to be named.
                cursor.execute(f"drop trigger {table}_{colname}_trigger on {table};")
                cursor.execute(f"drop function update_{table}_{colname};")

            f = model._meta.get_field(colname)
            _, _, args, kwargs = f.deconstruct()
            kwargs['null'] = True
            new_f = f.__class__(*args, **kwargs)
            new_f.set_attributes_from_name(f'_{colname}')
            schema_editor.remove_field(model, f)

            # Rename the shadow column into place.  NOTE(review): the final
            # column stays nullable even where the original was NOT NULL;
            # restoring the constraint would force a full-table scan.
            _, _, args, kwargs = new_f.deconstruct()
            f = new_f.__class__(*args, **kwargs)
            f.set_attributes_from_name(colname)
            schema_editor.alter_field(model, new_f, f)
@task(queue=get_local_queuename)
def migrate_json_fields(table, expensive, columns):
    """Background task: convert a table's json columns to jsonb.

    Guarded by a per-table advisory lock so only one node in the cluster
    performs the conversion; waits for any in-flight Django migrations to
    finish before touching the schema.

    :param table: database table name
    :param expensive: if True, use the batched/triggered strategy from
        ``migrate_json_fields_expensive``; otherwise a direct ALTER TABLE
    :param columns: iterable of column names to convert
    """
    logger.warning(f"Migrating json fields: {table} {columns}")
    with advisory_lock(f'json_migration_{table}', wait=False) as acquired:
        if not acquired:
            # Another node already holds the lock for this table.
            return

        from django.db.migrations.executor import MigrationExecutor

        # If Django is currently running migrations, wait until it is done.
        while True:
            executor = MigrationExecutor(connection)
            if not executor.migration_plan(executor.loader.graph.leaf_nodes()):
                break
            time.sleep(60)

        if expensive:
            migrate_json_fields_expensive(table, columns)
        else:
            with connection.cursor() as cursor:
                # Multiple actions in one ALTER TABLE must be comma-separated;
                # a space-joined list is a SQL syntax error for >1 column.
                column_expr = ", ".join(f"ALTER {colname} TYPE jsonb" for colname in columns)
                cursor.execute(f"ALTER TABLE {table} {column_expr};")
        logger.warning(f"Migration of {table} to jsonb is finished")
@task(queue=get_local_queuename) @task(queue=get_local_queuename)
def apply_cluster_membership_policies(): def apply_cluster_membership_policies():
from awx.main.signals import disable_activity_stream from awx.main.signals import disable_activity_stream