diff --git a/awx/main/models/__init__.py b/awx/main/models/__init__.py
index 19a422740c..fc779f9cf5 100644
--- a/awx/main/models/__init__.py
+++ b/awx/main/models/__init__.py
@@ -3,6 +3,7 @@
 
 # Django
 from django.conf import settings  # noqa
+from django.db import connection
 from django.db.models.signals import pre_delete  # noqa
 
 # AWX
@@ -99,6 +100,62 @@ User.add_to_class('can_access_with_errors', check_user_access_with_errors)
 User.add_to_class('accessible_objects', user_accessible_objects)
 
 
+def convert_jsonfields():
+    """Dispatch a migrate_jsonfield task for every listed table that still has ``*_old`` columns.
+
+    Does nothing when the database vendor is not PostgreSQL.
+    """
+    if connection.vendor != 'postgresql':
+        return
+
+    # fmt: off
+    fields = [
+        ('main_activitystream', 'id', (
+            'deleted_actor',
+            'setting',
+        )),
+        ('main_job', 'unifiedjob_ptr_id', (
+            'survey_passwords',
+        )),
+        ('main_joblaunchconfig', 'id', (
+            'char_prompts',
+            'survey_passwords',
+        )),
+        ('main_notification', 'id', (
+            'body',
+        )),
+        ('main_unifiedjob', 'id', (
+            'job_env',
+        )),
+        ('main_workflowjob', 'unifiedjob_ptr_id', (
+            'char_prompts',
+            'survey_passwords',
+        )),
+        ('main_workflowjobnode', 'id', (
+            'char_prompts',
+            'survey_passwords',
+        )),
+    ]
+    # fmt: on
+
+    with connection.cursor() as cursor:
+        for table, pkfield, columns in fields:
+            # Do the renamed old columns still exist? If so, run the task.
+            old_columns = ','.join(f"'{column}_old'" for column in columns)
+            cursor.execute(
+                f"""
+                select count(1) from information_schema.columns
+                where
+                  table_name = %s and column_name in ({old_columns});
+                """,
+                (table,),
+            )
+            if cursor.fetchone()[0]:
+                from awx.main.tasks.system import migrate_jsonfield
+
+                migrate_jsonfield.apply_async([table, pkfield, columns])
+
+
 def cleanup_created_modified_by(sender, **kwargs):
     # work around a bug in django-polymorphic that doesn't properly
     # handle cascades for reverse foreign keys on the polymorphic base model
diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py
index 83b66c959b..9e834f273e 100644
--- a/awx/main/tasks/system.py
+++ b/awx/main/tasks/system.py
@@ -2,6 +2,7 @@
 from collections import namedtuple
 import functools
 import importlib
+import itertools
 import json
 import logging
 import os
@@ -14,7 +15,7 @@ from datetime import datetime
 
 # Django
 from django.conf import settings
-from django.db import transaction, DatabaseError, IntegrityError
+from django.db import connection, transaction, DatabaseError, IntegrityError
 from django.db.models.fields.related import ForeignKey
 from django.utils.timezone import now, timedelta
 from django.utils.encoding import smart_str
@@ -48,6 +49,7 @@ from awx.main.models import (
     SmartInventoryMembership,
     Job,
     HostMetric,
+    convert_jsonfields,
 )
 from awx.main.constants import ACTIVE_STATES
 from awx.main.dispatch.publish import task
@@ -86,6 +88,11 @@ def dispatch_startup():
     if settings.IS_K8S:
         write_receptor_config()
 
+    try:
+        convert_jsonfields()
+    except Exception:
+        logger.exception("Failed json field conversion, skipping.")
+
     startup_logger.debug("Syncing Schedules")
     for sch in Schedule.objects.all():
         try:
@@ -129,6 +136,66 @@ def inform_cluster_of_shutdown():
         logger.exception('Encountered problem with normal shutdown signal.')
 
 
+@task(queue=get_task_queuename)
+def migrate_jsonfield(table, pkfield, columns):
+    """Copy each ``<column>_old`` value of ``table`` into ``<column>`` cast to jsonb, then drop the old columns.
+
+    :param table: name of the database table to convert.
+    :param pkfield: name of the key column used to walk the table in ranges of ``batchsize`` values.
+    :param columns: names of the new columns; each one is filled from ``<name>_old``.
+
+    Returns without doing anything when the per-table advisory lock is not acquired.
+    """
+    batchsize = 10000
+    with advisory_lock(f'json_migration_{table}', wait=False) as acquired:
+        if not acquired:
+            return
+
+        from django.db.migrations.executor import MigrationExecutor
+
+        # If Django is currently running migrations, wait until it is done.
+        while True:
+            executor = MigrationExecutor(connection)
+            if not executor.migration_plan(executor.loader.graph.leaf_nodes()):
+                break
+            time.sleep(120)
+
+        logger.warning(f"Migrating json fields for {table}: {', '.join(columns)}")
+
+        # These expressions depend only on the column names, so build them once instead of per batch.
+        column_expr = ', '.join(f"{colname} = {colname}_old::jsonb" for colname in columns)
+        # If any of the old columns have non-null values, the data needs to be cast and copied over.
+        empty_expr = ' or '.join(f"{colname}_old is not null" for colname in columns)
+        drop_expr = ', '.join(f"DROP COLUMN {column}_old" for column in columns)
+
+        with connection.cursor() as cursor:
+            for i in itertools.count(0, batchsize):
+                # Are there even any rows in the table beyond this point?
+                # EXISTS stops at the first match; "count(1) ... limit 1" scans every remaining row,
+                # because LIMIT only applies to the single row the aggregate returns.
+                cursor.execute(f"select exists(select 1 from {table} where {pkfield} >= %s);", (i,))
+                if not cursor.fetchone()[0]:
+                    break
+
+                # The parentheses are required: AND binds tighter than OR, so without them every
+                # "<col>_old is not null" term after the first would ignore the key range.
+                cursor.execute(  # Only clobber the new fields if there is non-null data in the old ones.
+                    f"""
+                    update {table}
+                      set {column_expr}
+                      where {pkfield} >= %s and {pkfield} < %s
+                        and ({empty_expr});
+                    """,
+                    (i, i + batchsize),
+                )
+                rows = cursor.rowcount
+                logger.debug(f"Batch {i} to {i + batchsize} copied on {table}, {rows} rows affected.")
+
+            cursor.execute(f"ALTER TABLE {table} {drop_expr};")
+
+        logger.warning(f"Migration of {table} to jsonb is finished.")
+
+
 @task(queue=get_task_queuename)
 def apply_cluster_membership_policies():
     from awx.main.signals import disable_activity_stream