diff --git a/awx/main/models/__init__.py b/awx/main/models/__init__.py
index ed49b98083..107c7a9418 100644
--- a/awx/main/models/__init__.py
+++ b/awx/main/models/__init__.py
@@ -3,6 +3,7 @@
 # Django
 from django.conf import settings  # noqa
+from django.db import connection
 from django.db.models.signals import pre_delete  # noqa

 # AWX
@@ -97,6 +98,93 @@ User.add_to_class('can_access_with_errors', check_user_access_with_errors)
 User.add_to_class('accessible_objects', user_accessible_objects)


+def convert_jsonfields_to_jsonb():
+    if connection.vendor != 'postgresql':
+        return
+
+    # fmt: off
+    fields = [  # Table name, expensive or not, tuple of column names
+        ('conf_setting', False, (
+            'value',
+        )),
+        ('main_instancegroup', False, (
+            'policy_instance_list',
+        )),
+        ('main_jobtemplate', False, (
+            'survey_spec',
+        )),
+        ('main_notificationtemplate', False, (
+            'notification_configuration',
+            'messages',
+        )),
+        ('main_project', False, (
+            'playbook_files',
+            'inventory_files',
+        )),
+        ('main_schedule', False, (
+            'extra_data',
+            'char_prompts',
+            'survey_passwords',
+        )),
+        ('main_workflowjobtemplate', False, (
+            'survey_spec',
+            'char_prompts',
+        )),
+        ('main_workflowjobtemplatenode', False, (
+            'char_prompts',
+            'extra_data',
+            'survey_passwords',
+        )),
+        ('main_activitystream', True, (
+            'setting',  # NN = NOT NULL
+            'deleted_actor',
+        )),
+        ('main_job', True, (
+            'survey_passwords',  # NN
+            'artifacts',  # NN
+        )),
+        ('main_joblaunchconfig', True, (
+            'extra_data',  # NN
+            'survey_passwords',  # NN
+            'char_prompts',  # NN
+        )),
+        ('main_notification', True, (
+            'body',  # NN
+        )),
+        ('main_unifiedjob', True, (
+            'job_env',  # NN
+        )),
+        ('main_workflowjob', True, (
+            'survey_passwords',  # NN
+            'char_prompts',  # NN
+        )),
+        ('main_workflowjobnode', True, (
+            'char_prompts',  # NN
+            'ancestor_artifacts',  # NN
+            'extra_data',  # NN
+            'survey_passwords',  # NN
+        )),
+    ]
+    # fmt: on
+
+    with connection.cursor() as cursor:
+        for table, expensive, columns in fields:
+            cursor.execute(
+                """
+                select count(1) from information_schema.columns
+                where
+                    table_name = %s and
+                    column_name in %s and
+                    data_type != 'jsonb';
+                """,
+                (table, columns),
+            )
+            if cursor.fetchone()[0]:
+                from awx.main.tasks.system import migrate_json_fields
+
+                migrate_json_fields.apply_async([table, expensive, columns])
+
+
 def cleanup_created_modified_by(sender, **kwargs):
     # work around a bug in django-polymorphic that doesn't properly
     # handle cascades for reverse foreign keys on the polymorphic base model
diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py
index 4e1b9eceed..fd47d63a3b 100644
--- a/awx/main/tasks/system.py
+++ b/awx/main/tasks/system.py
@@ -1,5 +1,6 @@
 # Python
 from collections import namedtuple
+import itertools
 import functools
 import importlib
 import json
@@ -13,7 +14,7 @@ from distutils.version import LooseVersion as Version

 # Django
 from django.conf import settings
-from django.db import transaction, DatabaseError, IntegrityError
+from django.db import connection, transaction, DatabaseError, IntegrityError
 from django.db.models.fields.related import ForeignKey
 from django.utils.timezone import now
 from django.utils.encoding import smart_str
@@ -22,6 +23,7 @@ from django.utils.translation import gettext_lazy as _
 from django.utils.translation import gettext_noop
 from django.core.cache import cache
 from django.core.exceptions import ObjectDoesNotExist
+from django.contrib.contenttypes.models import ContentType

 # Django-CRUM
 from crum import impersonate
@@ -46,6 +48,7 @@ from awx.main.models import (
     Inventory,
     SmartInventoryMembership,
     Job,
+    convert_jsonfields_to_jsonb,
 )
 from awx.main.constants import ACTIVE_STATES
 from awx.main.dispatch.publish import task
@@ -78,6 +81,9 @@ Try upgrading OpenSSH or providing your private key in an different format. \

 def dispatch_startup():
     startup_logger = logging.getLogger('awx.main.tasks')
+
+    convert_jsonfields_to_jsonb()
+
     startup_logger.debug("Syncing Schedules")
     for sch in Schedule.objects.all():
         try:
@@ -121,6 +127,123 @@ def inform_cluster_of_shutdown():
         logger.exception('Encountered problem with normal shutdown signal.')


+def migrate_json_fields_expensive(table, columns):
+    batchsize = 50000
+
+    ct = ContentType.objects.get_by_natural_key(*table.split('_', 1))
+    model = ct.model_class()
+
+    # Phase 1: add the new columns as nullable, so the ADD COLUMN does not
+    # have to populate them up front
+    with connection.schema_editor() as schema_editor:
+        # See: https://docs.djangoproject.com/en/3.1/ref/schema-editor/
+
+        for colname in columns:
+            f = model._meta.get_field(colname)
+            _, _, args, kwargs = f.deconstruct()
+            kwargs['null'] = True
+            new_f = f.__class__(*args, **kwargs)
+            new_f.set_attributes_from_name(f'_{colname}')
+
+            schema_editor.add_field(model, new_f)
+
+            # Create a trigger to make sure new data automatically gets put in both fields.
+            with connection.cursor() as cursor:
+                # It's a little annoying: this trigger will redo the same work
+                # as the update query in Phase 2
+                cursor.execute(
+                    f"""
+                    create or replace function update_{table}_{colname}()
+                    returns trigger as $body$
+                    begin
+                        new._{colname} = new.{colname}::jsonb;
+                        return new;
+                    end
+                    $body$ language plpgsql;
+                    """
+                )
+                cursor.execute(
+                    f"""
+                    create trigger {table}_{colname}_trigger
+                    before insert or update
+                    on {table}
+                    for each row
+                    execute procedure update_{table}_{colname}();
+                    """
+                )
+
+    # Phase 2: copy over the data
+    with connection.cursor() as cursor:
+        rows = 0
+        for i in itertools.count(0, batchsize):
+            cursor.execute(f"select count(1) from {table} where id >= %s;", (i,))
+            if not cursor.fetchone()[0]:
+                break
+
+            column_expr = ', '.join(f"_{colname} = {colname}::jsonb" for colname in columns)
+            cursor.execute(
+                f"""
+                update {table}
+                set {column_expr}
+                where id >= %s and id < %s;
+                """,
+                (i, i + batchsize),
+            )
+            rows += cursor.rowcount
+            logger.debug(f"Batch {i} to {i + batchsize} copied on {table}.")
+
+        logger.warning(f"Data copied for {rows} rows on {table}.")
+
+    # Phase 3: drop the old column and rename the new one
+    with connection.schema_editor() as schema_editor:
+
+        # FIXME: Grab a lock explicitly here?
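+        # For each column: drop the sync trigger and its function, drop the
+        # original text column, then move the jsonb shadow column into place
+        # under the original name.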
+        for colname in columns:
+            with connection.cursor() as cursor:
+                cursor.execute(f"drop trigger {table}_{colname}_trigger on {table};")
+                cursor.execute(f"drop function update_{table}_{colname};")
+
+            f = model._meta.get_field(colname)
+            _, _, args, kwargs = f.deconstruct()
+            kwargs['null'] = True
+            new_f = f.__class__(*args, **kwargs)
+            new_f.set_attributes_from_name(f'_{colname}')
+
+            schema_editor.remove_field(model, f)
+
+            _, _, args, kwargs = new_f.deconstruct()
+            f = new_f.__class__(*args, **kwargs)
+            f.set_attributes_from_name(colname)
+
+            schema_editor.alter_field(model, new_f, f)
+
+
+@task(queue=get_local_queuename)
+def migrate_json_fields(table, expensive, columns):
+    logger.warning(f"Migrating json fields: {table} {columns}")
+
+    with advisory_lock(f'json_migration_{table}', wait=False) as acquired:
+        if not acquired:
+            return
+
+        from django.db.migrations.executor import MigrationExecutor
+
+        # If Django is currently running migrations, wait until it is done.
+        while True:
+            executor = MigrationExecutor(connection)
+            if not executor.migration_plan(executor.loader.graph.leaf_nodes()):
+                break
+            time.sleep(60)
+
+        if expensive:
+            migrate_json_fields_expensive(table, columns)
+        else:
+            with connection.cursor() as cursor:
+                column_expr = ", ".join(f"ALTER {colname} TYPE jsonb USING {colname}::jsonb" for colname in columns)
+                cursor.execute(f"ALTER TABLE {table} {column_expr};")
+
+        logger.warning(f"Migration of {table} to jsonb is finished")
+
+
 @task(queue=get_local_queuename)
 def apply_cluster_membership_policies():
     from awx.main.signals import disable_activity_stream
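
Taken together, the expensive path amounts to roughly the following SQL for a single column, illustrated here with main_unifiedjob.job_env from the table list above. The Phase 1 ADD COLUMN and the Phase 3 DROP/RENAME statements are an approximation of what Django's schema editor would emit for add_field/remove_field/alter_field, not literal output of this patch:

    -- Phase 1: nullable shadow column (nullable by default) plus a trigger
    -- that keeps it in sync with new writes
    ALTER TABLE main_unifiedjob ADD COLUMN _job_env jsonb;
    CREATE OR REPLACE FUNCTION update_main_unifiedjob_job_env()
    RETURNS trigger AS $body$
    BEGIN
        new._job_env = new.job_env::jsonb;
        RETURN new;
    END
    $body$ LANGUAGE plpgsql;
    CREATE TRIGGER main_unifiedjob_job_env_trigger
        BEFORE INSERT OR UPDATE ON main_unifiedjob
        FOR EACH ROW EXECUTE PROCEDURE update_main_unifiedjob_job_env();

    -- Phase 2: batched backfill, one 50000-id window at a time
    -- (first window shown)
    UPDATE main_unifiedjob SET _job_env = job_env::jsonb
    WHERE id >= 0 AND id < 50000;

    -- Phase 3: drop the trigger and the old column, then rename the shadow
    -- column into place
    DROP TRIGGER main_unifiedjob_job_env_trigger ON main_unifiedjob;
    DROP FUNCTION update_main_unifiedjob_job_env;
    ALTER TABLE main_unifiedjob DROP COLUMN job_env;
    ALTER TABLE main_unifiedjob RENAME COLUMN _job_env TO job_env;

The fast path for the small tables is a single statement per table along the lines of ALTER TABLE main_jobtemplate ALTER survey_spec TYPE jsonb USING survey_spec::jsonb; (the USING clause is needed because PostgreSQL has no automatic cast from text to jsonb).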