diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py
index 9445d1c64f..e7784a81d8 100644
--- a/awx/api/views/__init__.py
+++ b/awx/api/views/__init__.py
@@ -2,6 +2,7 @@
 # All Rights Reserved.
 
 # Python
+import datetime
 import dateutil
 import functools
 import html
@@ -174,6 +175,9 @@ from awx.api.views.root import (  # noqa
 from awx.api.views.webhooks import WebhookKeyView, GithubWebhookReceiver, GitlabWebhookReceiver  # noqa
 
 
+EPOCH = datetime.datetime.utcfromtimestamp(0)
+
+
 logger = logging.getLogger('awx.api.views')
 
 
@@ -887,7 +891,9 @@ class ProjectUpdateEventsList(SubListAPIView):
         job = self.get_parent_object()
         self.check_parent_access(job)
         qs = super(ProjectUpdateEventsList, self).get_queryset()
-        return qs.filter(job_created=job.created).order_by('start_line').all()
+        return qs.filter(
+            job_created__in=(job.created, EPOCH)
+        ).order_by('start_line').all()
 
 
 class SystemJobEventsList(SubListAPIView):
@@ -905,7 +911,9 @@ class SystemJobEventsList(SubListAPIView):
     def get_queryset(self):
         job = self.get_parent_object()
         self.check_parent_access(job)
-        qs = job.system_job_events.select_related('host').filter(job_created=job.created).order_by('start_line')
+        qs = job.system_job_events.select_related('host').filter(
+            job_created__in=(job.created, EPOCH)
+        ).order_by('start_line')
         return qs.all()
 
 
 class ProjectUpdateCancel(RetrieveAPIView):
@@ -3809,7 +3817,7 @@ class HostJobEventsList(BaseJobEventsList):
     def get_queryset(self):
         parent_obj = self.get_parent_object()
         self.check_parent_access(parent_obj)
-        qs = self.request.user.get_queryset(self.model).filter(host=parent_obj, job_created=parent_obj.created)
+        qs = self.request.user.get_queryset(self.model).filter(host=parent_obj)
         return qs
 
 
@@ -3825,7 +3833,9 @@ class JobJobEventsList(BaseJobEventsList):
     def get_queryset(self):
         job = self.get_parent_object()
         self.check_parent_access(job)
-        qs = job.job_events.filter(job_created=job.created).select_related('host').order_by('start_line')
+        qs = job.job_events.filter(
+            job_created__in=(job.created, EPOCH)
+        ).select_related('host').order_by('start_line')
         return qs.all()
 
 
@@ -4008,7 +4018,9 @@ class BaseAdHocCommandEventsList(NoTruncateMixin, SubListAPIView):
     def get_queryset(self):
         job = self.get_parent_object()
         self.check_parent_access(job)
-        qs = job.ad_hoc_command_events.select_related('host').filter(job_created=job.created).order_by('start_line')
+        qs = job.ad_hoc_command_events.select_related('host').filter(
+            job_created__in=(job.created, EPOCH)
+        ).order_by('start_line')
         return qs.all()
 
 
diff --git a/awx/main/migrations/0124_event_partitions.py b/awx/main/migrations/0124_event_partitions.py
index 138424c816..214aacfce6 100644
--- a/awx/main/migrations/0124_event_partitions.py
+++ b/awx/main/migrations/0124_event_partitions.py
@@ -30,11 +30,11 @@ def migrate_event_data(apps, schema_editor):
         'main_systemjobevent'
     ):
         with connection.cursor() as cursor:
-            # mark existing table as *_old;
+            # mark existing table as _unpartitioned_*
             # we will drop this table after its data
             # has been moved over
             cursor.execute(
-                f'ALTER TABLE {tblname} RENAME TO {tblname}_old'
+                f'ALTER TABLE {tblname} RENAME TO _unpartitioned_{tblname}'
             )
 
             # drop primary key constraint; in a partioned table
@@ -42,16 +42,20 @@ def migrate_event_data(apps, schema_editor):
             # TODO: do more generic search for pkey constraints
             # instead of hardcoding this one that applies to main_jobevent
             cursor.execute(
-                f'ALTER TABLE {tblname}_old DROP CONSTRAINT {tblname}_pkey1'
+                f'ALTER TABLE _unpartitioned_{tblname} DROP CONSTRAINT {tblname}_pkey1'
             )
 
             # create parent table
             cursor.execute(
                 f'CREATE TABLE {tblname} '
-                f'(LIKE {tblname}_old INCLUDING ALL, job_created TIMESTAMP WITH TIME ZONE NOT NULL) '
+                f'(LIKE _unpartitioned_{tblname} INCLUDING ALL, job_created TIMESTAMP WITH TIME ZONE NOT NULL) '
                 f'PARTITION BY RANGE(job_created);'
             )
 
+            # let's go ahead and add and subtract a few indexes while we're here
+            cursor.execute(f'CREATE INDEX {tblname}_modified_idx ON {tblname} (modified);')
+            cursor.execute(f'DROP INDEX IF EXISTS {tblname}_job_id_brin_idx;')
+
             # recreate primary key constraint
             cursor.execute(
                 f'ALTER TABLE ONLY {tblname} '
@@ -61,33 +65,13 @@ def migrate_event_data(apps, schema_editor):
             current_time = now()
 
             # .. as well as initial partition containing all existing events
-            awx_epoch = datetime(2000, 1, 1, 0, 0)  # .. so to speak
-            create_partition(tblname, awx_epoch, current_time, 'old_events')
+            epoch = datetime.utcfromtimestamp(0)
+            create_partition(tblname, epoch, current_time, 'old_events')
 
             # .. and first partition
             # .. which is a special case, as it only covers remainder of current hour
             create_partition(tblname, current_time)
 
-            # copy over all job events into partitioned table
-            # TODO: bigint style migration (https://github.com/ansible/awx/issues/9257)
-            tblname_to_uj_fk = {'main_jobevent': 'job_id',
-                                'main_inventoryupdateevent': 'inventory_update_id',
-                                'main_projectupdateevent': 'project_update_id',
-                                'main_adhoccommandevent': 'ad_hoc_command_id',
-                                'main_systemjobevent': 'system_job_id'}
-            uj_fk_col = tblname_to_uj_fk[tblname]
-            cursor.execute(
-                f'INSERT INTO {tblname} '
-                f'SELECT {tblname}_old.*, main_unifiedjob.created '
-                f'FROM {tblname}_old '
-                f'INNER JOIN main_unifiedjob ON {tblname}_old.{uj_fk_col} = main_unifiedjob.id;'
-            )
-
-            # drop old table
-            cursor.execute(
-                f'DROP TABLE {tblname}_old'
-            )
-
 
 class FakeAddField(migrations.AddField):
diff --git a/awx/main/models/__init__.py b/awx/main/models/__init__.py
index 479cecb988..f67e192f0a 100644
--- a/awx/main/models/__init__.py
+++ b/awx/main/models/__init__.py
@@ -92,21 +92,10 @@ User.add_to_class('can_access_with_errors', check_user_access_with_errors)
 User.add_to_class('accessible_objects', user_accessible_objects)
 
 
-def enforce_bigint_pk_migration():
-    #
-    # NOTE: this function is not actually in use anymore,
-    # but has been intentionally kept for historical purposes,
-    # and to serve as an illustration if we ever need to perform
-    # bulk modification/migration of event data in the future.
-    #
-    # see: https://github.com/ansible/awx/issues/6010
-    # look at all the event tables and verify that they have been fully migrated
-    # from the *old* int primary key table to the replacement bigint table
-    # if not, attempt to migrate them in the background
-    #
+def migrate_events_to_partitions():
     for tblname in ('main_jobevent', 'main_inventoryupdateevent', 'main_projectupdateevent', 'main_adhoccommandevent', 'main_systemjobevent'):
         with connection.cursor() as cursor:
-            cursor.execute('SELECT 1 FROM information_schema.tables WHERE table_name=%s', (f'_old_{tblname}',))
+            cursor.execute('SELECT 1 FROM information_schema.tables WHERE table_name=%s', (f'_unpartitioned_{tblname}',))
             if bool(cursor.rowcount):
                 from awx.main.tasks import migrate_legacy_event_data
diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py
index 325710b11a..65dffc457c 100644
--- a/awx/main/scheduler/task_manager.py
+++ b/awx/main/scheduler/task_manager.py
@@ -310,9 +310,8 @@ class TaskManager:
 
         def post_commit():
             if task.status != 'failed' and type(task) is not WorkflowJob:
-                # Ensure that job event partition exists
-                create_partition('main_jobevent')
-
+                # Before task is dispatched, ensure that job_event partitions exist
+                create_partition(task.event_class._meta.db_table, start=task.created)
                 task_cls = task._get_task_class()
                 task_cls.apply_async(
                     [task.pk],
diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index 13cdfdbe37..84411b7c96 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -81,6 +81,7 @@ from awx.main.models import (
     AdHocCommandEvent,
     SystemJobEvent,
     build_safe_env,
+    migrate_events_to_partitions
 )
 from awx.main.constants import ACTIVE_STATES
 from awx.main.exceptions import AwxTaskError, PostRunError
@@ -173,6 +174,12 @@ def dispatch_startup():
     cluster_node_heartbeat()
     Metrics().clear_values()
 
+    # at process startup, detect the need to migrate old event records to
+    # partitions; at *some point* in the future, once certain versions of AWX
+    # and Tower fall out of use/support, we can probably just _assume_ that
+    # everybody has moved to partitions, and remove this code entirely
+    migrate_events_to_partitions()
+
     # Update Tower's rsyslog.conf file based on loggins settings in the db
     reconfigure_rsyslog()
 
@@ -684,22 +691,16 @@ def update_host_smart_inventory_memberships():
 
 @task(queue=get_local_queuename)
 def migrate_legacy_event_data(tblname):
-    #
-    # NOTE: this function is not actually in use anymore,
-    # but has been intentionally kept for historical purposes,
-    # and to serve as an illustration if we ever need to perform
-    # bulk modification/migration of event data in the future.
-    #
     if 'event' not in tblname:
         return
-    with advisory_lock(f'bigint_migration_{tblname}', wait=False) as acquired:
+    with advisory_lock(f'partition_migration_{tblname}', wait=False) as acquired:
         if acquired is False:
             return
         chunk = settings.JOB_EVENT_MIGRATION_CHUNK_SIZE
 
         def _remaining():
             try:
-                cursor.execute(f'SELECT MAX(id) FROM _old_{tblname};')
+                cursor.execute(f'SELECT MAX(id) FROM _unpartitioned_{tblname};')
                 return cursor.fetchone()[0]
             except ProgrammingError:
                 # the table is gone (migration is unnecessary)
@@ -709,19 +710,19 @@ def migrate_legacy_event_data(tblname):
             total_rows = _remaining()
             while total_rows:
                 with transaction.atomic():
-                    cursor.execute(f'INSERT INTO {tblname} SELECT * FROM _old_{tblname} ORDER BY id DESC LIMIT {chunk} RETURNING id;')
+                    cursor.execute(f'''INSERT INTO {tblname} SELECT *, '1970-01-01' as job_created FROM _unpartitioned_{tblname} ORDER BY id DESC LIMIT {chunk} RETURNING id;''')
                     last_insert_pk = cursor.fetchone()
                     if last_insert_pk is None:
                         # this means that the SELECT from the old table was
                         # empty, and there was nothing to insert (so we're done)
                         break
                     last_insert_pk = last_insert_pk[0]
-                    cursor.execute(f'DELETE FROM _old_{tblname} WHERE id IN (SELECT id FROM _old_{tblname} ORDER BY id DESC LIMIT {chunk});')
-                    logger.warn(f'migrated int -> bigint rows to {tblname} from _old_{tblname}; # ({last_insert_pk} rows remaining)')
+                    cursor.execute(f'DELETE FROM _unpartitioned_{tblname} WHERE id IN (SELECT id FROM _unpartitioned_{tblname} ORDER BY id DESC LIMIT {chunk});')
+                    logger.warn(f'migrated rows to partitioned {tblname} from _unpartitioned_{tblname}; # ({last_insert_pk} rows remaining)')
 
             if _remaining() is None:
-                cursor.execute(f'DROP TABLE IF EXISTS _old_{tblname}')
-                logger.warn(f'{tblname} primary key migration to bigint has finished')
+                cursor.execute(f'DROP TABLE IF EXISTS _unpartitioned_{tblname}')
+                logger.warn(f'{tblname} migration to partitions has finished')
 
 
 @task(queue=get_local_queuename)
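Note (illustrative, not part of the patch): the views above match events on job_created__in=(job.created, EPOCH) because rows bulk-copied out of the _unpartitioned_* tables by migrate_legacy_event_data are stamped with the sentinel job_created value '1970-01-01' (the epoch), while events written after the migration carry the parent job's created timestamp. The create_partition() helper that the migration and the task manager call is referenced but not shown in this diff; purely as a sketch of the assumed behavior (idempotent, hourly range partitions keyed on job_created), it might look roughly like the following. The signature and body here are guesses for illustration, not the code from this PR.

from datetime import timedelta

from django.db import connection
from django.utils.timezone import now


def create_partition(tblname, start=None, end=None, partition_label=None):
    # Assumed sketch: ensure a range partition of ``tblname`` (partitioned on
    # ``job_created``) exists that covers [start, end).
    if start is None:
        start = now()
    if end is None:
        # default: cover the remainder of the hour that ``start`` falls in
        end = (start + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
    if partition_label is None:
        partition_label = start.strftime('%Y%m%d_%H')
    with connection.cursor() as cursor:
        # IF NOT EXISTS keeps repeated calls for the same hour idempotent
        cursor.execute(
            f'CREATE TABLE IF NOT EXISTS {tblname}_{partition_label} '
            f'PARTITION OF {tblname} '
            f"FOR VALUES FROM ('{start.isoformat()}') TO ('{end.isoformat()}');"
        )

With a helper along those lines, the calls in the diff line up: the migration creates one 'old_events' partition spanning from the epoch up to the migration time plus a partition for the remainder of the current hour, and the task manager ensures the partition covering task.created exists before each job is dispatched.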