mirror of
https://github.com/ansible/awx.git
synced 2026-04-11 04:59:22 -02:30
Add in an async task to migrate the data over
This commit is contained in:
@@ -3,6 +3,7 @@
|
|||||||
|
|
||||||
# Django
|
# Django
|
||||||
from django.conf import settings # noqa
|
from django.conf import settings # noqa
|
||||||
|
from django.db import connection
|
||||||
from django.db.models.signals import pre_delete # noqa
|
from django.db.models.signals import pre_delete # noqa
|
||||||
|
|
||||||
# AWX
|
# AWX
|
||||||
@@ -99,6 +100,58 @@ User.add_to_class('can_access_with_errors', check_user_access_with_errors)
|
|||||||
User.add_to_class('accessible_objects', user_accessible_objects)
|
User.add_to_class('accessible_objects', user_accessible_objects)
|
||||||
|
|
||||||
|
|
||||||
|
def convert_jsonfields():
    """Dispatch async tasks to migrate legacy json text columns to jsonb.

    For each listed table, check whether the renamed ``<column>_old``
    columns left behind by the schema migration still exist.  If they do,
    queue the ``migrate_jsonfield`` task to cast and copy the data over.

    Only runs against PostgreSQL; other database vendors are skipped
    because the ``::jsonb`` cast used by the migration task is
    PostgreSQL-specific.
    """
    if connection.vendor != 'postgresql':
        return

    # Each entry is (table name, primary-key column, json columns to convert).
    # fmt: off
    fields = [
        ('main_activitystream', 'id', (
            'deleted_actor',
            'setting',
        )),
        ('main_job', 'unifiedjob_ptr_id', (
            'survey_passwords',
        )),
        ('main_joblaunchconfig', 'id', (
            'char_prompts',
            'survey_passwords',
        )),
        ('main_notification', 'id', (
            'body',
        )),
        ('main_unifiedjob', 'id', (
            'job_env',
        )),
        ('main_workflowjob', 'unifiedjob_ptr_id', (
            'char_prompts',
            'survey_passwords',
        )),
        ('main_workflowjobnode', 'id', (
            'char_prompts',
            'survey_passwords',
        )),
    ]
    # fmt: on

    with connection.cursor() as cursor:
        for table, pkfield, columns in fields:
            # Do the renamed old columns still exist? If so, run the task.
            # Column names are hardcoded literals above, so interpolating
            # them into the query is safe (no untrusted input).
            old_columns = ','.join(f"'{column}_old'" for column in columns)
            cursor.execute(
                f"""
                select count(1) from information_schema.columns
                where
                table_name = %s and column_name in ({old_columns});
                """,
                (table,),
            )
            if cursor.fetchone()[0]:
                # Deferred import — presumably to avoid a circular import
                # between models and tasks at module load time; confirm
                # against the module layout.
                from awx.main.tasks.system import migrate_jsonfield

                migrate_jsonfield.apply_async([table, pkfield, columns])
|
||||||
|
|
||||||
|
|
||||||
def cleanup_created_modified_by(sender, **kwargs):
|
def cleanup_created_modified_by(sender, **kwargs):
|
||||||
# work around a bug in django-polymorphic that doesn't properly
|
# work around a bug in django-polymorphic that doesn't properly
|
||||||
# handle cascades for reverse foreign keys on the polymorphic base model
|
# handle cascades for reverse foreign keys on the polymorphic base model
|
||||||
|
|||||||
@@ -2,6 +2,7 @@
|
|||||||
from collections import namedtuple
|
from collections import namedtuple
|
||||||
import functools
|
import functools
|
||||||
import importlib
|
import importlib
|
||||||
|
import itertools
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
@@ -14,7 +15,7 @@ from datetime import datetime
|
|||||||
|
|
||||||
# Django
|
# Django
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.db import transaction, DatabaseError, IntegrityError
|
from django.db import connection, transaction, DatabaseError, IntegrityError
|
||||||
from django.db.models.fields.related import ForeignKey
|
from django.db.models.fields.related import ForeignKey
|
||||||
from django.utils.timezone import now, timedelta
|
from django.utils.timezone import now, timedelta
|
||||||
from django.utils.encoding import smart_str
|
from django.utils.encoding import smart_str
|
||||||
@@ -48,6 +49,7 @@ from awx.main.models import (
|
|||||||
SmartInventoryMembership,
|
SmartInventoryMembership,
|
||||||
Job,
|
Job,
|
||||||
HostMetric,
|
HostMetric,
|
||||||
|
convert_jsonfields,
|
||||||
)
|
)
|
||||||
from awx.main.constants import ACTIVE_STATES
|
from awx.main.constants import ACTIVE_STATES
|
||||||
from awx.main.dispatch.publish import task
|
from awx.main.dispatch.publish import task
|
||||||
@@ -86,6 +88,11 @@ def dispatch_startup():
|
|||||||
if settings.IS_K8S:
|
if settings.IS_K8S:
|
||||||
write_receptor_config()
|
write_receptor_config()
|
||||||
|
|
||||||
|
try:
|
||||||
|
convert_jsonfields()
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Failed json field conversion, skipping.")
|
||||||
|
|
||||||
startup_logger.debug("Syncing Schedules")
|
startup_logger.debug("Syncing Schedules")
|
||||||
for sch in Schedule.objects.all():
|
for sch in Schedule.objects.all():
|
||||||
try:
|
try:
|
||||||
@@ -129,6 +136,52 @@ def inform_cluster_of_shutdown():
|
|||||||
logger.exception('Encountered problem with normal shutdown signal.')
|
logger.exception('Encountered problem with normal shutdown signal.')
|
||||||
|
|
||||||
|
|
||||||
|
@task(queue=get_task_queuename)
def migrate_jsonfield(table, pkfield, columns):
    """Copy data from renamed ``<column>_old`` columns into the new jsonb
    columns of *table*, in batches keyed on *pkfield*, then drop the old
    columns.

    :param table:   database table name (trusted, from convert_jsonfields)
    :param pkfield: integer primary-key column used for batch windowing —
                    assumes a numeric pk; TODO confirm for all listed tables
    :param columns: iterable of column base names; each has a ``<name>_old``
                    text column to cast to jsonb
    """
    batchsize = 10000
    # Advisory lock ensures only one worker migrates a given table at a
    # time; a second invocation simply returns without doing anything.
    with advisory_lock(f'json_migration_{table}', wait=False) as acquired:
        if not acquired:
            return

        from django.db.migrations.executor import MigrationExecutor

        # If Django is currently running migrations, wait until it is done.
        # An empty migration plan for the leaf nodes means the schema is
        # fully up to date.
        while True:
            executor = MigrationExecutor(connection)
            if not executor.migration_plan(executor.loader.graph.leaf_nodes()):
                break
            time.sleep(120)

        logger.warning(f"Migrating json fields for {table}: {', '.join(columns)}")

        with connection.cursor() as cursor:
            # Walk the pk space in fixed-size windows: 0..batchsize,
            # batchsize..2*batchsize, ...
            for i in itertools.count(0, batchsize):
                # Are there even any rows in the table beyond this point?
                cursor.execute(f"select count(1) from {table} where {pkfield} >= %s limit 1;", (i,))
                if not cursor.fetchone()[0]:
                    break

                column_expr = ', '.join(f"{colname} = {colname}_old::jsonb" for colname in columns)
                # If any of the old columns have non-null values, the data needs to be cast and copied over.
                empty_expr = ' or '.join(f"{colname}_old is not null" for colname in columns)
                cursor.execute(  # Only clobber the new fields if there is non-null data in the old ones.
                    f"""
                    update {table}
                    set {column_expr}
                    where {pkfield} >= %s and {pkfield} < %s
                    and {empty_expr};
                    """,
                    (i, i + batchsize),
                )
                rows = cursor.rowcount
                logger.debug(f"Batch {i} to {i + batchsize} copied on {table}, {rows} rows affected.")

            # All batches copied; drop the leftover *_old columns in a
            # single ALTER TABLE statement.
            column_expr = ', '.join(f"DROP COLUMN {column}_old" for column in columns)
            cursor.execute(f"ALTER TABLE {table} {column_expr};")

        logger.warning(f"Migration of {table} to jsonb is finished.")
|
||||||
|
|
||||||
|
|
||||||
@task(queue=get_task_queuename)
|
@task(queue=get_task_queuename)
|
||||||
def apply_cluster_membership_policies():
|
def apply_cluster_membership_policies():
|
||||||
from awx.main.signals import disable_activity_stream
|
from awx.main.signals import disable_activity_stream
|
||||||
|
|||||||
Reference in New Issue
Block a user