diff --git a/awx/main/management/commands/cleanup_jobs.py b/awx/main/management/commands/cleanup_jobs.py
index c9c508c6e8..dec5ca6e50 100644
--- a/awx/main/management/commands/cleanup_jobs.py
+++ b/awx/main/management/commands/cleanup_jobs.py
@@ -11,13 +11,12 @@ import re
 # Django
 from django.core.management.base import BaseCommand, CommandError
 from django.db import transaction, connection
+from django.db.models import Min, Max
+from django.db.models.signals import pre_save, post_save, pre_delete, post_delete, m2m_changed
 from django.utils.timezone import now
 
 # AWX
 from awx.main.models import Job, AdHocCommand, ProjectUpdate, InventoryUpdate, SystemJob, WorkflowJob, Notification
-from awx.main.signals import disable_activity_stream, disable_computed_fields
-
-from awx.main.utils.deletion import AWXCollector, pre_delete
 
 
 def unified_job_class_to_event_table_name(job_class):
@@ -80,7 +79,6 @@ class DeleteMeta:
         ).count()
 
     def identify_excluded_partitions(self):
-
         part_drop = {}
 
         for pk, status, created in self.jobs_qs:
@@ -94,7 +92,7 @@ class DeleteMeta:
         # Note that parts_no_drop _may_ contain the names of partitions that don't exist
         # This can happen when the cleanup of _unpartitioned_* logic leaves behind jobs with status pending, waiting, running. The find_jobs_to_delete() will
         # pick these jobs up.
-        self.parts_no_drop = set([k for k, v in part_drop.items() if v is False])
+        self.parts_no_drop = {k for k, v in part_drop.items() if v is False}
 
     def delete_jobs(self):
         if not self.dry_run:
@@ -116,7 +114,7 @@ class DeleteMeta:
         partitions_dt = [p for p in partitions_dt if not None]
 
         # convert datetime partition back to string partition
-        partitions_maybe_drop = set([dt_to_partition_name(tbl_name, dt) for dt in partitions_dt])
+        partitions_maybe_drop = {dt_to_partition_name(tbl_name, dt) for dt in partitions_dt}
 
         # Do not drop partition if there is a job that will not be deleted pointing at it
         self.parts_to_drop = partitions_maybe_drop - self.parts_no_drop
@@ -164,6 +162,15 @@ class Command(BaseCommand):
         parser.add_argument('--notifications', dest='only_notifications', action='store_true', default=False, help='Remove notifications')
         parser.add_argument('--workflow-jobs', default=False, action='store_true', dest='only_workflow_jobs', help='Remove workflow jobs')
 
+    def init_logging(self):
+        log_levels = dict(enumerate([logging.ERROR, logging.INFO, logging.DEBUG, 0]))
+        self.logger = logging.getLogger('awx.main.commands.cleanup_jobs')
+        self.logger.setLevel(log_levels.get(self.verbosity, 0))
+        handler = logging.StreamHandler()
+        handler.setFormatter(logging.Formatter('%(message)s'))
+        self.logger.addHandler(handler)
+        self.logger.propagate = False
+
     def cleanup(self, job_class):
         delete_meta = DeleteMeta(self.logger, job_class, self.cutoff, self.dry_run)
         skipped, deleted = delete_meta.delete()
@@ -193,7 +200,7 @@ class Command(BaseCommand):
         return (delete_meta.jobs_no_delete_count, delete_meta.jobs_to_delete_count)
 
     def _cascade_delete_job_events(self, model, pk_list):
-        if len(pk_list) > 0:
+        if pk_list:
             with connection.cursor() as cursor:
                 tblname = unified_job_class_to_event_table_name(model)
 
@@ -202,37 +209,30 @@ class Command(BaseCommand):
                 cursor.execute(f"DELETE FROM _unpartitioned_{tblname} WHERE {rel_name} IN ({pk_list_csv})")
 
     def cleanup_jobs(self):
-        skipped, deleted = 0, 0
+        batch_size = 100000
 
-        batch_size = 1000000
+        # Hack to avoid doing N+1 queries as each item in the Job query set does
+        # an individual query to get the underlying UnifiedJob.
+        Job.polymorphic_super_sub_accessors_replaced = True
 
-        while True:
-            # get queryset for available jobs to remove
-            qs = Job.objects.filter(created__lt=self.cutoff).exclude(status__in=['pending', 'waiting', 'running'])
-            # get pk list for the first N (batch_size) objects
-            pk_list = qs[0:batch_size].values_list('pk', flat=True)
-            # You cannot delete queries with sql LIMIT set, so we must
-            # create a new query from this pk_list
-            qs_batch = Job.objects.filter(pk__in=pk_list)
-            just_deleted = 0
-            if not self.dry_run:
+        skipped = (Job.objects.filter(created__gte=self.cutoff) | Job.objects.filter(status__in=['pending', 'waiting', 'running'])).count()
+
+        qs = Job.objects.select_related('unifiedjob_ptr').filter(created__lt=self.cutoff).exclude(status__in=['pending', 'waiting', 'running'])
+        if self.dry_run:
+            deleted = qs.count()
+            return skipped, deleted
+
+        deleted = 0
+        info = qs.aggregate(min=Min('id'), max=Max('id'))
+        if info['min'] is not None:
+            for start in range(info['min'], info['max'] + 1, batch_size):
+                qs_batch = qs.filter(id__gte=start, id__lte=start + batch_size)
+                pk_list = qs_batch.values_list('id', flat=True)
+
+                _, results = qs_batch.delete()
+                deleted += results['main.Job']
+
                 self._cascade_delete_job_events(Job, pk_list)
-                del_query = pre_delete(qs_batch)
-                collector = AWXCollector(del_query.db)
-                collector.collect(del_query)
-                _, models_deleted = collector.delete()
-                if models_deleted:
-                    just_deleted = models_deleted['main.Job']
-                deleted += just_deleted
-            else:
-                just_deleted = 0  # break from loop, this is dry run
-                deleted = qs.count()
-
-            if just_deleted == 0:
-                break
-
-        skipped += (Job.objects.filter(created__gte=self.cutoff) | Job.objects.filter(status__in=['pending', 'waiting', 'running'])).count()
         return skipped, deleted
 
     def cleanup_ad_hoc_commands(self):
@@ -339,15 +339,6 @@ class Command(BaseCommand):
         skipped += SystemJob.objects.filter(created__gte=self.cutoff).count()
         return skipped, deleted
 
-    def init_logging(self):
-        log_levels = dict(enumerate([logging.ERROR, logging.INFO, logging.DEBUG, 0]))
-        self.logger = logging.getLogger('awx.main.commands.cleanup_jobs')
-        self.logger.setLevel(log_levels.get(self.verbosity, 0))
-        handler = logging.StreamHandler()
-        handler.setFormatter(logging.Formatter('%(message)s'))
-        self.logger.addHandler(handler)
-        self.logger.propagate = False
-
     def cleanup_workflow_jobs(self):
         skipped, deleted = 0, 0
         workflow_jobs = WorkflowJob.objects.filter(created__lt=self.cutoff)
@@ -398,6 +389,7 @@ class Command(BaseCommand):
             self.cutoff = now() - datetime.timedelta(days=self.days)
         except OverflowError:
             raise CommandError('--days specified is too large. Try something less than 99999 (about 270 years).')
+
         model_names = ('jobs', 'ad_hoc_commands', 'project_updates', 'inventory_updates', 'management_jobs', 'workflow_jobs', 'notifications')
         models_to_cleanup = set()
         for m in model_names:
@@ -405,18 +397,28 @@ class Command(BaseCommand):
                 models_to_cleanup.add(m)
         if not models_to_cleanup:
             models_to_cleanup.update(model_names)
-        with disable_activity_stream(), disable_computed_fields():
-            for m in model_names:
-                if m in models_to_cleanup:
-                    skipped, deleted = getattr(self, 'cleanup_%s' % m)()
-
-                    func = getattr(self, 'cleanup_%s_partition' % m, None)
-                    if func:
-                        skipped_partition, deleted_partition = func()
-                        skipped += skipped_partition
-                        deleted += deleted_partition
+
+        # Completely disconnect all signal handlers. This is very aggressive,
+        # but it will be ok since this command is run in its own process. The
+        # core of the logic is borrowed from Signal.disconnect().
+        for s in (pre_save, post_save, pre_delete, post_delete, m2m_changed):
+            with s.lock:
+                del s.receivers[:]
+                s.sender_receivers_cache.clear()
 
-                    if self.dry_run:
-                        self.logger.log(99, '%s: %d would be deleted, %d would be skipped.', m.replace('_', ' '), deleted, skipped)
-                    else:
-                        self.logger.log(99, '%s: %d deleted, %d skipped.', m.replace('_', ' '), deleted, skipped)
+        for m in model_names:
+            if m not in models_to_cleanup:
+                continue
+
+            skipped, deleted = getattr(self, 'cleanup_%s' % m)()
+
+            func = getattr(self, 'cleanup_%s_partition' % m, None)
+            if func:
+                skipped_partition, deleted_partition = func()
+                skipped += skipped_partition
+                deleted += deleted_partition
+
+            if self.dry_run:
+                self.logger.log(99, '%s: %d would be deleted, %d would be skipped.', m.replace('_', ' '), deleted, skipped)
+            else:
+                self.logger.log(99, '%s: %d deleted, %d skipped.', m.replace('_', ' '), deleted, skipped)
diff --git a/awx/main/registrar.py b/awx/main/registrar.py
index 07e721a953..31133f936b 100644
--- a/awx/main/registrar.py
+++ b/awx/main/registrar.py
@@ -32,7 +32,7 @@ class ActivityStreamRegistrar(object):
             post_save.disconnect(dispatch_uid=str(self.__class__) + str(model) + "_create")
             pre_save.disconnect(dispatch_uid=str(self.__class__) + str(model) + "_update")
             pre_delete.disconnect(dispatch_uid=str(self.__class__) + str(model) + "_delete")
-            self.models.pop(model)
+            self.models.remove(model)
 
             for m2mfield in model._meta.many_to_many:
                 m2m_attr = getattr(model, m2mfield.name)
diff --git a/awx/main/tests/functional/commands/test_cleanup_jobs.py b/awx/main/tests/functional/commands/test_cleanup_jobs.py
deleted file mode 100644
index 612895559a..0000000000
--- a/awx/main/tests/functional/commands/test_cleanup_jobs.py
+++ /dev/null
@@ -1,178 +0,0 @@
-import pytest
-from datetime import datetime, timedelta
-from pytz import timezone
-from collections import OrderedDict
-from unittest import mock
-
-from django.db.models.deletion import Collector, SET_NULL, CASCADE
-from django.core.management import call_command
-
-from awx.main.management.commands import cleanup_jobs
-from awx.main.utils.deletion import AWXCollector
-from awx.main.models import JobTemplate, User, Job, Notification, WorkflowJobNode, JobHostSummary
-
-
-@pytest.fixture
-def setup_environment(inventory, project, machine_credential, host, notification_template, label):
-    """
-    Create old jobs and new jobs, with various other objects to hit the
-    related fields of Jobs. This makes sure on_delete() effects are tested
-    properly.
-    """
-    old_jobs = []
-    new_jobs = []
-    days = 10
-    days_str = str(days)
-
-    jt = JobTemplate.objects.create(name='testjt', inventory=inventory, project=project)
-    jt.credentials.add(machine_credential)
-    jt_user = User.objects.create(username='jobtemplateuser')
-    jt.execute_role.members.add(jt_user)
-
-    notification = Notification()
-    notification.notification_template = notification_template
-    notification.save()
-
-    for i in range(3):
-        # create jobs with current time
-        job1 = jt.create_job()
-        job1.created = datetime.now(tz=timezone('UTC'))
-        job1.save()
-        # sqlite does not support partitioning so we cannot test partition-based jobevent cleanup
-        # JobEvent.create_from_data(job_id=job1.pk, uuid='abc123', event='runner_on_start', stdout='a' * 1025).save()
-        new_jobs.append(job1)
-
-        # create jobs 10 days ago
-        job2 = jt.create_job()
-        job2.created = datetime.now(tz=timezone('UTC')) - timedelta(days=days)
-        job2.save()
-        job2.dependent_jobs.add(job1)
-        # JobEvent.create_from_data(job_id=job2.pk, uuid='abc123', event='runner_on_start', stdout='a' * 1025).save()
-        old_jobs.append(job2)
-
-    jt.last_job = job2
-    jt.current_job = job2
-    jt.save()
-    host.last_job = job2
-    host.save()
-    notification.unifiedjob_notifications.add(job2)
-    label.unifiedjob_labels.add(job2)
-    jn = WorkflowJobNode.objects.create(job=job2)
-    jn.save()
-    jh = JobHostSummary.objects.create(job=job2)
-    jh.save()
-
-    return (old_jobs, new_jobs, days_str)
-
-
-# sqlite does not support table partitioning so we mock out the methods responsible for pruning
-# job event partitions during the job cleanup task
-# https://github.com/ansible/awx/issues/9039
-@pytest.mark.django_db
-@mock.patch.object(cleanup_jobs.DeleteMeta, 'identify_excluded_partitions', mock.MagicMock())
-@mock.patch.object(cleanup_jobs.DeleteMeta, 'find_partitions_to_drop', mock.MagicMock())
-@mock.patch.object(cleanup_jobs.DeleteMeta, 'drop_partitions', mock.MagicMock())
-def test_cleanup_jobs(setup_environment):
-    (old_jobs, new_jobs, days_str) = setup_environment
-
-    # related_fields
-    related = [f for f in Job._meta.get_fields(include_hidden=True) if f.auto_created and not f.concrete and (f.one_to_one or f.one_to_many)]
-
-    job = old_jobs[-1]  # last job
-
-    # gather related objects for job
-    related_should_be_removed = {}
-    related_should_be_null = {}
-    for r in related:
-        qs = r.related_model._base_manager.using('default').filter(**{"%s__in" % r.field.name: [job.pk]})
-        if qs.exists():
-            if r.field.remote_field.on_delete == CASCADE:
-                related_should_be_removed[qs.model] = set(qs.values_list('pk', flat=True))
-            if r.field.remote_field.on_delete == SET_NULL:
-                related_should_be_null[(qs.model, r.field.name)] = set(qs.values_list('pk', flat=True))
-
-    assert related_should_be_removed
-    assert related_should_be_null
-
-    call_command('cleanup_jobs', '--days', days_str)
-    # make sure old jobs are removed
-    assert not Job.objects.filter(pk__in=[obj.pk for obj in old_jobs]).exists()
-
-    # make sure new jobs are untouched
-    assert len(new_jobs) == Job.objects.filter(pk__in=[obj.pk for obj in new_jobs]).count()
-
-    # make sure related objects are destroyed or set to NULL (none)
-    for model, values in related_should_be_removed.items():
-        assert not model.objects.filter(pk__in=values).exists()
-
-    for (model, fieldname), values in related_should_be_null.items():
-        for v in values:
-            assert not getattr(model.objects.get(pk=v), fieldname)
-
-
-@pytest.mark.django_db
-def test_awxcollector(setup_environment):
-    """
-    Efforts to improve the performance of cleanup_jobs involved
-    sub-classing the django Collector class. This unit test will
-    check for parity between the django Collector and the modified
-    AWXCollector class. AWXCollector is used in cleanup_jobs to
-    bulk-delete old jobs from the database.
-
-    Specifically, Collector has four dictionaries to check:
-    .dependencies, .data, .fast_deletes, and .field_updates
-
-    These tests will convert each dictionary from AWXCollector
-    (after running .collect on jobs), from querysets to sets of
-    objects. The final result should be a dictionary that is
-    equivalent to django's Collector.
-    """
-
-    (old_jobs, new_jobs, days_str) = setup_environment
-    collector = Collector('default')
-    collector.collect(old_jobs)
-
-    awx_col = AWXCollector('default')
-    # awx_col accepts a queryset as input
-    awx_col.collect(Job.objects.filter(pk__in=[obj.pk for obj in old_jobs]))
-
-    # check that dependencies are the same
-    assert awx_col.dependencies == collector.dependencies
-
-    # check that objects to delete are the same
-    awx_del_dict = OrderedDict()
-    for model, instances in awx_col.data.items():
-        awx_del_dict.setdefault(model, set())
-        for inst in instances:
-            # .update() will put each object in a queryset into the set
-            awx_del_dict[model].update(inst)
-    assert awx_del_dict == collector.data
-
-    # check that field updates are the same
-    awx_del_dict = OrderedDict()
-    for model, instances_for_fieldvalues in awx_col.field_updates.items():
-        awx_del_dict.setdefault(model, {})
-        for (field, value), instances in instances_for_fieldvalues.items():
-            awx_del_dict[model].setdefault((field, value), set())
-            for inst in instances:
-                awx_del_dict[model][(field, value)].update(inst)
-
-    # collector field updates don't use the base (polymorphic parent) model, e.g.
-    # it will use JobTemplate instead of UnifiedJobTemplate. Therefore,
-    # we need to rebuild the dictionary and grab the model from the field
-    collector_del_dict = OrderedDict()
-    for model, instances_for_fieldvalues in collector.field_updates.items():
-        for (field, value), instances in instances_for_fieldvalues.items():
-            collector_del_dict.setdefault(field.model, {})
-            collector_del_dict[field.model][(field, value)] = collector.field_updates[model][(field, value)]
-    assert awx_del_dict == collector_del_dict
-
-    # check that fast deletes are the same
-    collector_fast_deletes = set()
-    for q in collector.fast_deletes:
-        collector_fast_deletes.update(q)
-
-    awx_col_fast_deletes = set()
-    for q in awx_col.fast_deletes:
-        awx_col_fast_deletes.update(q)
-    assert collector_fast_deletes == awx_col_fast_deletes
diff --git a/awx/main/utils/deletion.py b/awx/main/utils/deletion.py
deleted file mode 100644
index d17bc0b710..0000000000
--- a/awx/main/utils/deletion.py
+++ /dev/null
@@ -1,173 +0,0 @@
-from django.contrib.contenttypes.models import ContentType
-from django.db.models.deletion import (
-    DO_NOTHING,
-    Collector,
-    get_candidate_relations_to_delete,
-)
-from collections import Counter, OrderedDict
-from django.db import transaction
-from django.db.models import sql
-
-
-def bulk_related_objects(field, objs, using):
-    # This overrides the method in django.contrib.contenttypes.fields.py
-    """
-    Return all objects related to ``objs`` via this ``GenericRelation``.
-    """
-    return field.remote_field.model._base_manager.db_manager(using).filter(
-        **{
-            "%s__pk"
-            % field.content_type_field_name: ContentType.objects.db_manager(using).get_for_model(field.model, for_concrete_model=field.for_concrete_model).pk,
-            "%s__in" % field.object_id_field_name: list(objs.values_list('pk', flat=True)),
-        }
-    )
-
-
-def pre_delete(qs):
-    # taken from .delete method in django.db.models.query.py
-    assert qs.query.can_filter(), "Cannot use 'limit' or 'offset' with delete."
-
-    if qs._fields is not None:
-        raise TypeError("Cannot call delete() after .values() or .values_list()")
-
-    del_query = qs._chain()
-
-    # The delete is actually 2 queries - one to find related objects,
-    # and one to delete. Make sure that the discovery of related
-    # objects is performed on the same database as the deletion.
-    del_query._for_write = True
-
-    # Disable non-supported fields.
-    del_query.query.select_for_update = False
-    del_query.query.select_related = False
-    del_query.query.clear_ordering(force_empty=True)
-    return del_query
-
-
-class AWXCollector(Collector):
-    def add(self, objs, source=None, nullable=False, reverse_dependency=False):
-        """
-        Add 'objs' to the collection of objects to be deleted. If the call is
-        the result of a cascade, 'source' should be the model that caused it,
-        and 'nullable' should be set to True if the relation can be null.
-
-        Return a list of all objects that were not already collected.
-        """
-        if not objs.exists():
-            return objs
-        model = objs.model
-        self.data.setdefault(model, [])
-        self.data[model].append(objs)
-        # Nullable relationships can be ignored -- they are nulled out before
-        # deleting, and therefore do not affect the order in which objects have
-        # to be deleted.
-        if source is not None and not nullable:
-            if reverse_dependency:
-                source, model = model, source
-            self.dependencies.setdefault(source._meta.concrete_model, set()).add(model._meta.concrete_model)
-        return objs
-
-    def add_field_update(self, field, value, objs):
-        """
-        Schedule a field update. 'objs' must be a homogeneous iterable
-        collection of model instances (e.g. a QuerySet).
-        """
-        if not objs.exists():
-            return
-        model = objs.model
-        self.field_updates.setdefault(model, {})
-        self.field_updates[model].setdefault((field, value), [])
-        self.field_updates[model][(field, value)].append(objs)
-
-    def collect(self, objs, source=None, nullable=False, collect_related=True, source_attr=None, reverse_dependency=False, keep_parents=False):
-        """
-        Add 'objs' to the collection of objects to be deleted as well as all
-        parent instances. 'objs' must be a homogeneous iterable collection of
-        model instances (e.g. a QuerySet). If 'collect_related' is True,
-        related objects will be handled by their respective on_delete handler.
-
-        If the call is the result of a cascade, 'source' should be the model
-        that caused it and 'nullable' should be set to True, if the relation
-        can be null.
-
-        If 'reverse_dependency' is True, 'source' will be deleted before the
-        current model, rather than after. (Needed for cascading to parent
-        models, the one case in which the cascade follows the forwards
-        direction of an FK rather than the reverse direction.)
-
-        If 'keep_parents' is True, data of parent model's will be not deleted.
-        """
-
-        if hasattr(objs, 'polymorphic_disabled'):
-            objs.polymorphic_disabled = True
-
-        if self.can_fast_delete(objs):
-            self.fast_deletes.append(objs)
-            return
-        new_objs = self.add(objs, source, nullable, reverse_dependency=reverse_dependency)
-        if not new_objs.exists():
-            return
-
-        model = new_objs.model
-
-        if not keep_parents:
-            # Recursively collect concrete model's parent models, but not their
-            # related objects. These will be found by meta.get_fields()
-            concrete_model = model._meta.concrete_model
-            for ptr in concrete_model._meta.parents.keys():
-                if ptr:
-                    parent_objs = ptr.objects.filter(pk__in=new_objs.values_list('pk', flat=True))
-                    self.collect(parent_objs, source=model, collect_related=False, reverse_dependency=True)
-        if collect_related:
-            parents = model._meta.parents
-            for related in get_candidate_relations_to_delete(model._meta):
-                # Preserve parent reverse relationships if keep_parents=True.
-                if keep_parents and related.model in parents:
-                    continue
-                field = related.field
-                if field.remote_field.on_delete == DO_NOTHING:
-                    continue
-                related_qs = self.related_objects(related, new_objs)
-                if self.can_fast_delete(related_qs, from_field=field):
-                    self.fast_deletes.append(related_qs)
-                elif related_qs:
-                    field.remote_field.on_delete(self, field, related_qs, self.using)
-            for field in model._meta.private_fields:
-                if hasattr(field, 'bulk_related_objects'):
-                    # It's something like generic foreign key.
-                    sub_objs = bulk_related_objects(field, new_objs, self.using)
-                    self.collect(sub_objs, source=model, nullable=True)
-
-    def delete(self):
-        self.sort()
-
-        # collect pk_list before deletion (once things start to delete
-        # queries might not be able to retreive pk list)
-        del_dict = OrderedDict()
-        for model, instances in self.data.items():
-            del_dict.setdefault(model, [])
-            for inst in instances:
-                del_dict[model] += list(inst.values_list('pk', flat=True))
-
-        deleted_counter = Counter()
-
-        with transaction.atomic(using=self.using, savepoint=False):
-
-            # update fields
-            for model, instances_for_fieldvalues in self.field_updates.items():
-                for (field, value), instances in instances_for_fieldvalues.items():
-                    for inst in instances:
-                        query = sql.UpdateQuery(model)
-                        query.update_batch(inst.values_list('pk', flat=True), {field.name: value}, self.using)
-            # fast deletes
-            for qs in self.fast_deletes:
-                count = qs._raw_delete(using=self.using)
-                deleted_counter[qs.model._meta.label] += count
-
-            # delete instances
-            for model, pk_list in del_dict.items():
-                query = sql.DeleteQuery(model)
-                count = query.delete_batch(pk_list, self.using)
-                deleted_counter[model._meta.label] += count
-
-        return sum(deleted_counter.values()), dict(deleted_counter)
diff --git a/tools/scripts/firehose.py b/tools/scripts/firehose.py
index cd5930315a..2eeeb5da7b 100755
--- a/tools/scripts/firehose.py
+++ b/tools/scripts/firehose.py
@@ -318,8 +318,9 @@ if __name__ == '__main__':
         for j_hour in range(24):
             time_delta = datetime.timedelta(days=i_day, hours=j_hour, seconds=0)
             created_job_ids = generate_jobs(jobs, batch_size=batch_size, time_delta=time_delta)
-            for k_id in created_job_ids:
-                generate_events(events, str(k_id), time_delta)
+            if events > 0:
+                for k_id in created_job_ids:
+                    generate_events(events, str(k_id), time_delta)
     print(datetime.datetime.utcnow().isoformat())
 
     conn.close()