move the partition data migration to be a post-upgrade async process
this copies the approach we took with the bigint migration
parent 67046513ae
commit c7ab3ea86e
@@ -2,6 +2,7 @@
 # All Rights Reserved.
 
 # Python
+import datetime
 import dateutil
 import functools
 import html
@@ -174,6 +175,9 @@ from awx.api.views.root import ( # noqa
 from awx.api.views.webhooks import WebhookKeyView, GithubWebhookReceiver, GitlabWebhookReceiver # noqa
 
 
+EPOCH = datetime.datetime.utcfromtimestamp(0)
+
+
 logger = logging.getLogger('awx.api.views')
 
 
@@ -887,7 +891,9 @@ class ProjectUpdateEventsList(SubListAPIView):
         job = self.get_parent_object()
         self.check_parent_access(job)
         qs = super(ProjectUpdateEventsList, self).get_queryset()
-        return qs.filter(job_created=job.created).order_by('start_line').all()
+        return qs.filter(
+            job_created__in=(job.created, EPOCH)
+        ).order_by('start_line').all()
 
 
 class SystemJobEventsList(SubListAPIView):
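Why these views now filter with job_created__in=(job.created, EPOCH) rather than an exact match: legacy rows copied out of the old, unpartitioned tables by the async task (migrate_legacy_event_data, further down) are stamped with a job_created of '1970-01-01' rather than joined back to main_unifiedjob for their real creation time, so the API has to match either value. A minimal sketch of the shared pattern; the events_for_job helper name is hypothetical (the views inline this logic):

import datetime

# Sentinel partition key stamped onto legacy rows by the async migration:
# the Unix epoch.
EPOCH = datetime.datetime.utcfromtimestamp(0)

def events_for_job(job, qs):
    # Match freshly written events (keyed by the job's real creation time)
    # as well as legacy events parked in the epoch partition.
    return qs.filter(job_created__in=(job.created, EPOCH)).order_by('start_line')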
@@ -905,7 +911,9 @@ class SystemJobEventsList(SubListAPIView):
     def get_queryset(self):
         job = self.get_parent_object()
         self.check_parent_access(job)
-        qs = job.system_job_events.select_related('host').filter(job_created=job.created).order_by('start_line')
+        qs = job.system_job_events.select_related('host').filter(
+            job_created__in=(job.created, EPOCH)
+        ).order_by('start_line')
         return qs.all()
 
 
 class ProjectUpdateCancel(RetrieveAPIView):
@@ -3809,7 +3817,7 @@ class HostJobEventsList(BaseJobEventsList):
     def get_queryset(self):
         parent_obj = self.get_parent_object()
         self.check_parent_access(parent_obj)
-        qs = self.request.user.get_queryset(self.model).filter(host=parent_obj, job_created=parent_obj.created)
+        qs = self.request.user.get_queryset(self.model).filter(host=parent_obj)
         return qs
 
 
@@ -3825,7 +3833,9 @@ class JobJobEventsList(BaseJobEventsList):
     def get_queryset(self):
         job = self.get_parent_object()
         self.check_parent_access(job)
-        qs = job.job_events.filter(job_created=job.created).select_related('host').order_by('start_line')
+        qs = job.job_events.filter(
+            job_created__in=(job.created, EPOCH)
+        ).select_related('host').order_by('start_line')
         return qs.all()
 
 
@@ -4008,7 +4018,9 @@ class BaseAdHocCommandEventsList(NoTruncateMixin, SubListAPIView):
     def get_queryset(self):
         job = self.get_parent_object()
         self.check_parent_access(job)
-        qs = job.ad_hoc_command_events.select_related('host').filter(job_created=job.created).order_by('start_line')
+        qs = job.ad_hoc_command_events.select_related('host').filter(
+            job_created__in=(job.created, EPOCH)
+        ).order_by('start_line')
         return qs.all()
 
 
@@ -30,11 +30,11 @@ def migrate_event_data(apps, schema_editor):
         'main_systemjobevent'
     ):
         with connection.cursor() as cursor:
-            # mark existing table as *_old;
+            # mark existing table as _unpartitioned_*
             # we will drop this table after its data
             # has been moved over
             cursor.execute(
-                f'ALTER TABLE {tblname} RENAME TO {tblname}_old'
+                f'ALTER TABLE {tblname} RENAME TO _unpartitioned_{tblname}'
             )
 
             # drop primary key constraint; in a partitioned table
@@ -42,16 +42,20 @@ def migrate_event_data(apps, schema_editor):
             # TODO: do more generic search for pkey constraints
             # instead of hardcoding this one that applies to main_jobevent
             cursor.execute(
-                f'ALTER TABLE {tblname}_old DROP CONSTRAINT {tblname}_pkey1'
+                f'ALTER TABLE _unpartitioned_{tblname} DROP CONSTRAINT {tblname}_pkey1'
             )
 
             # create parent table
             cursor.execute(
                 f'CREATE TABLE {tblname} '
-                f'(LIKE {tblname}_old INCLUDING ALL, job_created TIMESTAMP WITH TIME ZONE NOT NULL) '
+                f'(LIKE _unpartitioned_{tblname} INCLUDING ALL, job_created TIMESTAMP WITH TIME ZONE NOT NULL) '
                 f'PARTITION BY RANGE(job_created);'
             )
 
+            # let's go ahead and add and subtract a few indexes while we're here
+            cursor.execute(f'CREATE INDEX {tblname}_modified_idx ON {tblname} (modified);')
+            cursor.execute(f'DROP INDEX IF EXISTS {tblname}_job_id_brin_idx;')
+
             # recreate primary key constraint
             cursor.execute(
                 f'ALTER TABLE ONLY {tblname} '
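The "recreate primary key constraint" statement is cut off above. For context: PostgreSQL requires the partition key to be part of any primary key or unique constraint on a partitioned table, so the recreated constraint has to span both id and job_created. A hedged sketch of how the statement presumably continues, reusing the migration's cursor and tblname; the constraint name is a guess:

# Hypothetical completion of the truncated ALTER TABLE: the new primary
# key must be composite because job_created is the partition key.
cursor.execute(
    f'ALTER TABLE ONLY {tblname} '
    f'ADD CONSTRAINT {tblname}_pkey_new PRIMARY KEY (id, job_created);'
)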
@@ -61,33 +65,13 @@ def migrate_event_data(apps, schema_editor):
             current_time = now()
 
             # .. as well as initial partition containing all existing events
-            awx_epoch = datetime(2000, 1, 1, 0, 0)  # .. so to speak
-            create_partition(tblname, awx_epoch, current_time, 'old_events')
+            epoch = datetime.utcfromtimestamp(0)
+            create_partition(tblname, epoch, current_time, 'old_events')
 
             # .. and first partition
             # .. which is a special case, as it only covers remainder of current hour
             create_partition(tblname, current_time)
 
-            # copy over all job events into partitioned table
-            # TODO: bigint style migration (https://github.com/ansible/awx/issues/9257)
-            tblname_to_uj_fk = {'main_jobevent': 'job_id',
-                                'main_inventoryupdateevent': 'inventory_update_id',
-                                'main_projectupdateevent': 'project_update_id',
-                                'main_adhoccommandevent': 'ad_hoc_command_id',
-                                'main_systemjobevent': 'system_job_id'}
-            uj_fk_col = tblname_to_uj_fk[tblname]
-            cursor.execute(
-                f'INSERT INTO {tblname} '
-                f'SELECT {tblname}_old.*, main_unifiedjob.created '
-                f'FROM {tblname}_old '
-                f'INNER JOIN main_unifiedjob ON {tblname}_old.{uj_fk_col} = main_unifiedjob.id;'
-            )
-
-            # drop old table
-            cursor.execute(
-                f'DROP TABLE {tblname}_old'
-            )
-
 
 class FakeAddField(migrations.AddField):
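Two things changed here. First, the bulk INSERT ... INNER JOIN main_unifiedjob that used to copy every legacy event inside the migration is gone; that work now happens post-upgrade in the async task. Second, the lower bound of the catch-all 'old_events' partition moves from a year-2000 "awx_epoch" to the real Unix epoch, which matters because migrated rows are stamped with exactly '1970-01-01' and must route into this partition. create_partition's body is not part of this diff, but a helper like it presumably issues range-partition DDL along these lines (a sketch, not the actual implementation):

def create_partition_sketch(cursor, tblname, start, end, label):
    # One range partition per window; PostgreSQL routes rows whose
    # job_created falls in [start, end) into this child table.
    cursor.execute(
        f'CREATE TABLE IF NOT EXISTS {tblname}_{label} '
        f'PARTITION OF {tblname} '
        f"FOR VALUES FROM ('{start.isoformat()}') TO ('{end.isoformat()}');"
    )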
@@ -92,21 +92,10 @@ User.add_to_class('can_access_with_errors', check_user_access_with_errors)
 User.add_to_class('accessible_objects', user_accessible_objects)
 
 
-def enforce_bigint_pk_migration():
-    #
-    # NOTE: this function is not actually in use anymore,
-    # but has been intentionally kept for historical purposes,
-    # and to serve as an illustration if we ever need to perform
-    # bulk modification/migration of event data in the future.
-    #
-    # see: https://github.com/ansible/awx/issues/6010
-    # look at all the event tables and verify that they have been fully migrated
-    # from the *old* int primary key table to the replacement bigint table
-    # if not, attempt to migrate them in the background
-    #
+def migrate_events_to_partitions():
     for tblname in ('main_jobevent', 'main_inventoryupdateevent', 'main_projectupdateevent', 'main_adhoccommandevent', 'main_systemjobevent'):
         with connection.cursor() as cursor:
-            cursor.execute('SELECT 1 FROM information_schema.tables WHERE table_name=%s', (f'_old_{tblname}',))
+            cursor.execute('SELECT 1 FROM information_schema.tables WHERE table_name=%s', (f'_unpartitioned_{tblname}',))
             if bool(cursor.rowcount):
                 from awx.main.tasks import migrate_legacy_event_data
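migrate_events_to_partitions repurposes the old bigint-migration detector: if a leftover _unpartitioned_* table still shows up in information_schema.tables, the data move has not finished and the background task gets (re)dispatched. The diff truncates right after the import; presumably the tail queues the task per table, roughly like this (the apply_async call is an assumption, not shown in the diff):

if bool(cursor.rowcount):
    from awx.main.tasks import migrate_legacy_event_data
    # assumed dispatch; the hunk cuts off after the import
    migrate_legacy_event_data.apply_async([tblname])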
@@ -310,9 +310,8 @@ class TaskManager:
 
         def post_commit():
             if task.status != 'failed' and type(task) is not WorkflowJob:
-                # Ensure that job event partition exists
-                create_partition('main_jobevent')
-
+                # Before task is dispatched, ensure that job_event partitions exist
+                create_partition(task.event_class._meta.db_table, start=task.created)
                 task_cls = task._get_task_class()
                 task_cls.apply_async(
                     [task.pk],
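Rather than unconditionally touching main_jobevent, the task manager now ensures a partition exists for whichever event table the dispatched task writes to (task.event_class._meta.db_table), covering the window that contains task.created. Given the migration comment that the first partition "only covers remainder of current hour", partitions appear to be hourly; a sketch of how such a window might be derived (an assumption, since create_partition is not shown in this diff):

from datetime import timedelta

def hour_window(start):
    # Truncate to the containing hour; the partition would cover
    # [window_start, window_start + 1 hour).
    window_start = start.replace(minute=0, second=0, microsecond=0)
    return window_start, window_start + timedelta(hours=1)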
@@ -81,6 +81,7 @@ from awx.main.models import (
     AdHocCommandEvent,
     SystemJobEvent,
     build_safe_env,
+    migrate_events_to_partitions
 )
 from awx.main.constants import ACTIVE_STATES
 from awx.main.exceptions import AwxTaskError, PostRunError
@@ -173,6 +174,12 @@ def dispatch_startup():
     cluster_node_heartbeat()
     Metrics().clear_values()
 
+    # at process startup, detect the need to migrate old event records to
+    # partitions; at *some point* in the future, once certain versions of AWX
+    # and Tower fall out of use/support, we can probably just _assume_ that
+    # everybody has moved to partitions, and remove this code entirely
+    migrate_events_to_partitions()
+
     # Update Tower's rsyslog.conf file based on logging settings in the db
     reconfigure_rsyslog()
@@ -684,22 +691,16 @@ def update_host_smart_inventory_memberships():
 
 
 @task(queue=get_local_queuename)
 def migrate_legacy_event_data(tblname):
-    #
-    # NOTE: this function is not actually in use anymore,
-    # but has been intentionally kept for historical purposes,
-    # and to serve as an illustration if we ever need to perform
-    # bulk modification/migration of event data in the future.
-    #
     if 'event' not in tblname:
         return
-    with advisory_lock(f'bigint_migration_{tblname}', wait=False) as acquired:
+    with advisory_lock(f'partition_migration_{tblname}', wait=False) as acquired:
         if acquired is False:
             return
         chunk = settings.JOB_EVENT_MIGRATION_CHUNK_SIZE
 
         def _remaining():
             try:
-                cursor.execute(f'SELECT MAX(id) FROM _old_{tblname};')
+                cursor.execute(f'SELECT MAX(id) FROM _unpartitioned_{tblname};')
                 return cursor.fetchone()[0]
             except ProgrammingError:
                 # the table is gone (migration is unnecessary)
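_remaining() doubles as an existence probe: once the _unpartitioned_* table has been dropped, SELECT MAX(id) raises ProgrammingError (Django's wrapper for undefined-table errors), which the task treats as "nothing left to migrate". The except branch is truncated above; a sketch assuming it returns None, which is what the if _remaining() is None check in the next hunk relies on:

from django.db import ProgrammingError

def _remaining(cursor, tblname):
    try:
        cursor.execute(f'SELECT MAX(id) FROM _unpartitioned_{tblname};')
        return cursor.fetchone()[0]
    except ProgrammingError:
        # Undefined table: the source was already dropped, so migration is
        # complete (or was never needed). Returning None is an assumption
        # that matches the `if _remaining() is None` check below.
        return None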
@@ -709,19 +710,19 @@ def migrate_legacy_event_data(tblname):
         total_rows = _remaining()
         while total_rows:
             with transaction.atomic():
-                cursor.execute(f'INSERT INTO {tblname} SELECT * FROM _old_{tblname} ORDER BY id DESC LIMIT {chunk} RETURNING id;')
+                cursor.execute(f'''INSERT INTO {tblname} SELECT *, '1970-01-01' as job_created FROM _unpartitioned_{tblname} ORDER BY id DESC LIMIT {chunk} RETURNING id;''')
                 last_insert_pk = cursor.fetchone()
                 if last_insert_pk is None:
                     # this means that the SELECT from the old table was
                     # empty, and there was nothing to insert (so we're done)
                     break
                 last_insert_pk = last_insert_pk[0]
-                cursor.execute(f'DELETE FROM _old_{tblname} WHERE id IN (SELECT id FROM _old_{tblname} ORDER BY id DESC LIMIT {chunk});')
-                logger.warn(f'migrated int -> bigint rows to {tblname} from _old_{tblname}; # ({last_insert_pk} rows remaining)')
+                cursor.execute(f'DELETE FROM _unpartitioned_{tblname} WHERE id IN (SELECT id FROM _unpartitioned_{tblname} ORDER BY id DESC LIMIT {chunk});')
+                logger.warn(f'migrated rows to partitioned {tblname} from _unpartitioned_{tblname}; # ({last_insert_pk} rows remaining)')
 
         if _remaining() is None:
-            cursor.execute(f'DROP TABLE IF EXISTS _old_{tblname}')
-            logger.warn(f'{tblname} primary key migration to bigint has finished')
+            cursor.execute(f'DROP TABLE IF EXISTS _unpartitioned_{tblname}')
+            logger.warn(f'{tblname} migration to partitions has finished')
 
 
 @task(queue=get_local_queuename)
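Taken together, the loop drains the legacy table newest-first in JOB_EVENT_MIGRATION_CHUNK_SIZE batches: each transaction copies a chunk into the partitioned table (stamping job_created with '1970-01-01' so the rows route into the old_events partition), deletes the same chunk from the source, and repeats until the INSERT returns nothing, after which the _unpartitioned_* table is dropped. A self-contained distillation of the pattern, with the cursor and per-batch transaction plumbing assumed:

def drain_unpartitioned(cursor, tblname, chunk):
    # Chunked copy-then-delete, as in migrate_legacy_event_data above.
    while True:
        cursor.execute(
            f"INSERT INTO {tblname} "
            f"SELECT *, '1970-01-01' AS job_created FROM _unpartitioned_{tblname} "
            f"ORDER BY id DESC LIMIT {chunk} RETURNING id;"
        )
        if cursor.fetchone() is None:
            break  # nothing left to copy
        cursor.execute(
            f"DELETE FROM _unpartitioned_{tblname} WHERE id IN "
            f"(SELECT id FROM _unpartitioned_{tblname} ORDER BY id DESC LIMIT {chunk});"
        )
    cursor.execute(f'DROP TABLE IF EXISTS _unpartitioned_{tblname};')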