Fix the cleanup_jobs management command

It previously depended on a private Django internal class (the
deletion Collector that AWXCollector subclassed), which changed in
Django 3.1.

Instead, I've switched to disabling the django-polymorphic accessors
that fetch the underlying UnifiedJob object for each Job; the way
django-polymorphic implements those accessors was producing N+1 query
behavior on deletes. This gets us back most of the way to the
performance gains we achieved with the custom collector class. See
https://github.com/django-polymorphic/django-polymorphic/issues/198.
Jeff Bradberry 2022-02-10 14:00:11 -05:00
parent 0500512c3c
commit 028f09002f
5 changed files with 62 additions and 410 deletions
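
For orientation, here is a minimal sketch of the approach the cleanup_jobs
diff below takes. It assumes a configured AWX/Django environment; the flag it
sets is a django-polymorphic internal rather than a public API, and the
30-day cutoff is purely illustrative.

import datetime

from django.utils.timezone import now

from awx.main.models import Job

# django-polymorphic normally installs accessor properties that run one
# extra query per Job to fetch the underlying UnifiedJob row; that is the
# N+1 behavior described above. Pre-setting this internal flag keeps those
# accessors from being installed (see the linked issue).
Job.polymorphic_super_sub_accessors_replaced = True

cutoff = now() - datetime.timedelta(days=30)  # illustrative retention window

# Pull the UnifiedJob parent row in the same query instead of issuing one
# query per Job, then let the ORM cascade the delete.
old_jobs = (
    Job.objects.select_related('unifiedjob_ptr')
    .filter(created__lt=cutoff)
    .exclude(status__in=['pending', 'waiting', 'running'])
)
old_jobs.delete()

The command below layers id-range batching and explicit job event cleanup on
top of this, so no single DELETE ever has to cover the whole table.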


@@ -11,13 +11,12 @@ import re
# Django
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction, connection
from django.db.models import Min, Max
from django.db.models.signals import pre_save, post_save, pre_delete, post_delete, m2m_changed
from django.utils.timezone import now
# AWX
from awx.main.models import Job, AdHocCommand, ProjectUpdate, InventoryUpdate, SystemJob, WorkflowJob, Notification
from awx.main.signals import disable_activity_stream, disable_computed_fields
from awx.main.utils.deletion import AWXCollector, pre_delete
def unified_job_class_to_event_table_name(job_class):
@@ -80,7 +79,6 @@ class DeleteMeta:
).count()
def identify_excluded_partitions(self):
part_drop = {}
for pk, status, created in self.jobs_qs:
@@ -94,7 +92,7 @@ class DeleteMeta:
# Note that parts_no_drop _may_ contain the names of partitions that don't exist
# This can happen when the cleanup of _unpartitioned_* logic leaves behind jobs with status pending, waiting, running. The find_jobs_to_delete() will
# pick these jobs up.
self.parts_no_drop = set([k for k, v in part_drop.items() if v is False])
self.parts_no_drop = {k for k, v in part_drop.items() if v is False}
def delete_jobs(self):
if not self.dry_run:
@@ -116,7 +114,7 @@ class DeleteMeta:
partitions_dt = [p for p in partitions_dt if p is not None]
# convert datetime partition back to string partition
partitions_maybe_drop = set([dt_to_partition_name(tbl_name, dt) for dt in partitions_dt])
partitions_maybe_drop = {dt_to_partition_name(tbl_name, dt) for dt in partitions_dt}
# Do not drop partition if there is a job that will not be deleted pointing at it
self.parts_to_drop = partitions_maybe_drop - self.parts_no_drop
@@ -164,6 +162,15 @@ class Command(BaseCommand):
parser.add_argument('--notifications', dest='only_notifications', action='store_true', default=False, help='Remove notifications')
parser.add_argument('--workflow-jobs', default=False, action='store_true', dest='only_workflow_jobs', help='Remove workflow jobs')
def init_logging(self):
log_levels = dict(enumerate([logging.ERROR, logging.INFO, logging.DEBUG, 0]))
self.logger = logging.getLogger('awx.main.commands.cleanup_jobs')
self.logger.setLevel(log_levels.get(self.verbosity, 0))
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(message)s'))
self.logger.addHandler(handler)
self.logger.propagate = False
def cleanup(self, job_class):
delete_meta = DeleteMeta(self.logger, job_class, self.cutoff, self.dry_run)
skipped, deleted = delete_meta.delete()
@@ -193,7 +200,7 @@ class Command(BaseCommand):
return (delete_meta.jobs_no_delete_count, delete_meta.jobs_to_delete_count)
def _cascade_delete_job_events(self, model, pk_list):
if len(pk_list) > 0:
if pk_list:
with connection.cursor() as cursor:
tblname = unified_job_class_to_event_table_name(model)
@@ -202,37 +209,30 @@ class Command(BaseCommand):
cursor.execute(f"DELETE FROM _unpartitioned_{tblname} WHERE {rel_name} IN ({pk_list_csv})")
def cleanup_jobs(self):
skipped, deleted = 0, 0
batch_size = 100000
batch_size = 1000000
# Hack to avoid doing N+1 queries as each item in the Job query set does
# an individual query to get the underlying UnifiedJob.
Job.polymorphic_super_sub_accessors_replaced = True
while True:
# get queryset for available jobs to remove
qs = Job.objects.filter(created__lt=self.cutoff).exclude(status__in=['pending', 'waiting', 'running'])
# get pk list for the first N (batch_size) objects
pk_list = qs[0:batch_size].values_list('pk', flat=True)
# You cannot delete queries with sql LIMIT set, so we must
# create a new query from this pk_list
qs_batch = Job.objects.filter(pk__in=pk_list)
just_deleted = 0
if not self.dry_run:
skipped = (Job.objects.filter(created__gte=self.cutoff) | Job.objects.filter(status__in=['pending', 'waiting', 'running'])).count()
qs = Job.objects.select_related('unifiedjob_ptr').filter(created__lt=self.cutoff).exclude(status__in=['pending', 'waiting', 'running'])
if self.dry_run:
deleted = qs.count()
return skipped, deleted
deleted = 0
info = qs.aggregate(min=Min('id'), max=Max('id'))
if info['min'] is not None:
for start in range(info['min'], info['max'] + 1, batch_size):
qs_batch = qs.filter(id__gte=start, id__lte=start + batch_size)
pk_list = qs_batch.values_list('id', flat=True)
_, results = qs_batch.delete()
deleted += results['main.Job']
self._cascade_delete_job_events(Job, pk_list)
del_query = pre_delete(qs_batch)
collector = AWXCollector(del_query.db)
collector.collect(del_query)
_, models_deleted = collector.delete()
if models_deleted:
just_deleted = models_deleted['main.Job']
deleted += just_deleted
else:
just_deleted = 0 # break from loop, this is dry run
deleted = qs.count()
if just_deleted == 0:
break
skipped += (Job.objects.filter(created__gte=self.cutoff) | Job.objects.filter(status__in=['pending', 'waiting', 'running'])).count()
return skipped, deleted
def cleanup_ad_hoc_commands(self):
@@ -339,15 +339,6 @@ class Command(BaseCommand):
skipped += SystemJob.objects.filter(created__gte=self.cutoff).count()
return skipped, deleted
def init_logging(self):
log_levels = dict(enumerate([logging.ERROR, logging.INFO, logging.DEBUG, 0]))
self.logger = logging.getLogger('awx.main.commands.cleanup_jobs')
self.logger.setLevel(log_levels.get(self.verbosity, 0))
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(message)s'))
self.logger.addHandler(handler)
self.logger.propagate = False
def cleanup_workflow_jobs(self):
skipped, deleted = 0, 0
workflow_jobs = WorkflowJob.objects.filter(created__lt=self.cutoff)
@@ -398,6 +389,7 @@ class Command(BaseCommand):
self.cutoff = now() - datetime.timedelta(days=self.days)
except OverflowError:
raise CommandError('--days specified is too large. Try something less than 99999 (about 270 years).')
model_names = ('jobs', 'ad_hoc_commands', 'project_updates', 'inventory_updates', 'management_jobs', 'workflow_jobs', 'notifications')
models_to_cleanup = set()
for m in model_names:
@@ -405,18 +397,28 @@ class Command(BaseCommand):
models_to_cleanup.add(m)
if not models_to_cleanup:
models_to_cleanup.update(model_names)
with disable_activity_stream(), disable_computed_fields():
for m in model_names:
if m in models_to_cleanup:
skipped, deleted = getattr(self, 'cleanup_%s' % m)()
func = getattr(self, 'cleanup_%s_partition' % m, None)
if func:
skipped_partition, deleted_partition = func()
skipped += skipped_partition
deleted += deleted_partition
# Completely disconnect all signal handlers. This is very aggressive,
# but it will be ok since this command is run in its own process. The
# core of the logic is borrowed from Signal.disconnect().
for s in (pre_save, post_save, pre_delete, post_delete, m2m_changed):
with s.lock:
del s.receivers[:]
s.sender_receivers_cache.clear()
if self.dry_run:
self.logger.log(99, '%s: %d would be deleted, %d would be skipped.', m.replace('_', ' '), deleted, skipped)
else:
self.logger.log(99, '%s: %d deleted, %d skipped.', m.replace('_', ' '), deleted, skipped)
for m in model_names:
if m not in models_to_cleanup:
continue
skipped, deleted = getattr(self, 'cleanup_%s' % m)()
func = getattr(self, 'cleanup_%s_partition' % m, None)
if func:
skipped_partition, deleted_partition = func()
skipped += skipped_partition
deleted += deleted_partition
if self.dry_run:
self.logger.log(99, '%s: %d would be deleted, %d would be skipped.', m.replace('_', ' '), deleted, skipped)
else:
self.logger.log(99, '%s: %d deleted, %d skipped.', m.replace('_', ' '), deleted, skipped)


@@ -32,7 +32,7 @@ class ActivityStreamRegistrar(object):
post_save.disconnect(dispatch_uid=str(self.__class__) + str(model) + "_create")
pre_save.disconnect(dispatch_uid=str(self.__class__) + str(model) + "_update")
pre_delete.disconnect(dispatch_uid=str(self.__class__) + str(model) + "_delete")
self.models.pop(model)
self.models.remove(model)
for m2mfield in model._meta.many_to_many:
m2m_attr = getattr(model, m2mfield.name)


@@ -1,178 +0,0 @@
import pytest
from datetime import datetime, timedelta
from pytz import timezone
from collections import OrderedDict
from unittest import mock
from django.db.models.deletion import Collector, SET_NULL, CASCADE
from django.core.management import call_command
from awx.main.management.commands import cleanup_jobs
from awx.main.utils.deletion import AWXCollector
from awx.main.models import JobTemplate, User, Job, Notification, WorkflowJobNode, JobHostSummary
@pytest.fixture
def setup_environment(inventory, project, machine_credential, host, notification_template, label):
"""
Create old jobs and new jobs, with various other objects to hit the
related fields of Jobs. This makes sure on_delete() effects are tested
properly.
"""
old_jobs = []
new_jobs = []
days = 10
days_str = str(days)
jt = JobTemplate.objects.create(name='testjt', inventory=inventory, project=project)
jt.credentials.add(machine_credential)
jt_user = User.objects.create(username='jobtemplateuser')
jt.execute_role.members.add(jt_user)
notification = Notification()
notification.notification_template = notification_template
notification.save()
for i in range(3):
# create jobs with current time
job1 = jt.create_job()
job1.created = datetime.now(tz=timezone('UTC'))
job1.save()
# sqlite does not support partitioning so we cannot test partition-based jobevent cleanup
# JobEvent.create_from_data(job_id=job1.pk, uuid='abc123', event='runner_on_start', stdout='a' * 1025).save()
new_jobs.append(job1)
# create jobs 10 days ago
job2 = jt.create_job()
job2.created = datetime.now(tz=timezone('UTC')) - timedelta(days=days)
job2.save()
job2.dependent_jobs.add(job1)
# JobEvent.create_from_data(job_id=job2.pk, uuid='abc123', event='runner_on_start', stdout='a' * 1025).save()
old_jobs.append(job2)
jt.last_job = job2
jt.current_job = job2
jt.save()
host.last_job = job2
host.save()
notification.unifiedjob_notifications.add(job2)
label.unifiedjob_labels.add(job2)
jn = WorkflowJobNode.objects.create(job=job2)
jn.save()
jh = JobHostSummary.objects.create(job=job2)
jh.save()
return (old_jobs, new_jobs, days_str)
# sqlite does not support table partitioning so we mock out the methods responsible for pruning
# job event partitions during the job cleanup task
# https://github.com/ansible/awx/issues/9039
@pytest.mark.django_db
@mock.patch.object(cleanup_jobs.DeleteMeta, 'identify_excluded_partitions', mock.MagicMock())
@mock.patch.object(cleanup_jobs.DeleteMeta, 'find_partitions_to_drop', mock.MagicMock())
@mock.patch.object(cleanup_jobs.DeleteMeta, 'drop_partitions', mock.MagicMock())
def test_cleanup_jobs(setup_environment):
(old_jobs, new_jobs, days_str) = setup_environment
# related_fields
related = [f for f in Job._meta.get_fields(include_hidden=True) if f.auto_created and not f.concrete and (f.one_to_one or f.one_to_many)]
job = old_jobs[-1] # last job
# gather related objects for job
related_should_be_removed = {}
related_should_be_null = {}
for r in related:
qs = r.related_model._base_manager.using('default').filter(**{"%s__in" % r.field.name: [job.pk]})
if qs.exists():
if r.field.remote_field.on_delete == CASCADE:
related_should_be_removed[qs.model] = set(qs.values_list('pk', flat=True))
if r.field.remote_field.on_delete == SET_NULL:
related_should_be_null[(qs.model, r.field.name)] = set(qs.values_list('pk', flat=True))
assert related_should_be_removed
assert related_should_be_null
call_command('cleanup_jobs', '--days', days_str)
# make sure old jobs are removed
assert not Job.objects.filter(pk__in=[obj.pk for obj in old_jobs]).exists()
# make sure new jobs are untouched
assert len(new_jobs) == Job.objects.filter(pk__in=[obj.pk for obj in new_jobs]).count()
# make sure related objects are destroyed or set to NULL (none)
for model, values in related_should_be_removed.items():
assert not model.objects.filter(pk__in=values).exists()
for (model, fieldname), values in related_should_be_null.items():
for v in values:
assert not getattr(model.objects.get(pk=v), fieldname)
@pytest.mark.django_db
def test_awxcollector(setup_environment):
"""
Efforts to improve the performance of cleanup_jobs involved
sub-classing the django Collector class. This unit test will
check for parity between the django Collector and the modified
AWXCollector class. AWXCollector is used in cleanup_jobs to
bulk-delete old jobs from the database.
Specifically, Collector has four dictionaries to check:
.dependencies, .data, .fast_deletes, and .field_updates
These tests will convert each dictionary from AWXCollector
(after running .collect on jobs), from querysets to sets of
objects. The final result should be a dictionary that is
equivalent to django's Collector.
"""
(old_jobs, new_jobs, days_str) = setup_environment
collector = Collector('default')
collector.collect(old_jobs)
awx_col = AWXCollector('default')
# awx_col accepts a queryset as input
awx_col.collect(Job.objects.filter(pk__in=[obj.pk for obj in old_jobs]))
# check that dependencies are the same
assert awx_col.dependencies == collector.dependencies
# check that objects to delete are the same
awx_del_dict = OrderedDict()
for model, instances in awx_col.data.items():
awx_del_dict.setdefault(model, set())
for inst in instances:
# .update() will put each object in a queryset into the set
awx_del_dict[model].update(inst)
assert awx_del_dict == collector.data
# check that field updates are the same
awx_del_dict = OrderedDict()
for model, instances_for_fieldvalues in awx_col.field_updates.items():
awx_del_dict.setdefault(model, {})
for (field, value), instances in instances_for_fieldvalues.items():
awx_del_dict[model].setdefault((field, value), set())
for inst in instances:
awx_del_dict[model][(field, value)].update(inst)
# collector field updates don't use the base (polymorphic parent) model, e.g.
# it will use JobTemplate instead of UnifiedJobTemplate. Therefore,
# we need to rebuild the dictionary and grab the model from the field
collector_del_dict = OrderedDict()
for model, instances_for_fieldvalues in collector.field_updates.items():
for (field, value), instances in instances_for_fieldvalues.items():
collector_del_dict.setdefault(field.model, {})
collector_del_dict[field.model][(field, value)] = collector.field_updates[model][(field, value)]
assert awx_del_dict == collector_del_dict
# check that fast deletes are the same
collector_fast_deletes = set()
for q in collector.fast_deletes:
collector_fast_deletes.update(q)
awx_col_fast_deletes = set()
for q in awx_col.fast_deletes:
awx_col_fast_deletes.update(q)
assert collector_fast_deletes == awx_col_fast_deletes


@@ -1,173 +0,0 @@
from django.contrib.contenttypes.models import ContentType
from django.db.models.deletion import (
DO_NOTHING,
Collector,
get_candidate_relations_to_delete,
)
from collections import Counter, OrderedDict
from django.db import transaction
from django.db.models import sql
def bulk_related_objects(field, objs, using):
# This overrides the method in django.contrib.contenttypes.fields.py
"""
Return all objects related to ``objs`` via this ``GenericRelation``.
"""
return field.remote_field.model._base_manager.db_manager(using).filter(
**{
"%s__pk"
% field.content_type_field_name: ContentType.objects.db_manager(using).get_for_model(field.model, for_concrete_model=field.for_concrete_model).pk,
"%s__in" % field.object_id_field_name: list(objs.values_list('pk', flat=True)),
}
)
def pre_delete(qs):
# taken from .delete method in django.db.models.query.py
assert qs.query.can_filter(), "Cannot use 'limit' or 'offset' with delete."
if qs._fields is not None:
raise TypeError("Cannot call delete() after .values() or .values_list()")
del_query = qs._chain()
# The delete is actually 2 queries - one to find related objects,
# and one to delete. Make sure that the discovery of related
# objects is performed on the same database as the deletion.
del_query._for_write = True
# Disable non-supported fields.
del_query.query.select_for_update = False
del_query.query.select_related = False
del_query.query.clear_ordering(force_empty=True)
return del_query
class AWXCollector(Collector):
def add(self, objs, source=None, nullable=False, reverse_dependency=False):
"""
Add 'objs' to the collection of objects to be deleted. If the call is
the result of a cascade, 'source' should be the model that caused it,
and 'nullable' should be set to True if the relation can be null.
Return a list of all objects that were not already collected.
"""
if not objs.exists():
return objs
model = objs.model
self.data.setdefault(model, [])
self.data[model].append(objs)
# Nullable relationships can be ignored -- they are nulled out before
# deleting, and therefore do not affect the order in which objects have
# to be deleted.
if source is not None and not nullable:
if reverse_dependency:
source, model = model, source
self.dependencies.setdefault(source._meta.concrete_model, set()).add(model._meta.concrete_model)
return objs
def add_field_update(self, field, value, objs):
"""
Schedule a field update. 'objs' must be a homogeneous iterable
collection of model instances (e.g. a QuerySet).
"""
if not objs.exists():
return
model = objs.model
self.field_updates.setdefault(model, {})
self.field_updates[model].setdefault((field, value), [])
self.field_updates[model][(field, value)].append(objs)
def collect(self, objs, source=None, nullable=False, collect_related=True, source_attr=None, reverse_dependency=False, keep_parents=False):
"""
Add 'objs' to the collection of objects to be deleted as well as all
parent instances. 'objs' must be a homogeneous iterable collection of
model instances (e.g. a QuerySet). If 'collect_related' is True,
related objects will be handled by their respective on_delete handler.
If the call is the result of a cascade, 'source' should be the model
that caused it and 'nullable' should be set to True, if the relation
can be null.
If 'reverse_dependency' is True, 'source' will be deleted before the
current model, rather than after. (Needed for cascading to parent
models, the one case in which the cascade follows the forwards
direction of an FK rather than the reverse direction.)
If 'keep_parents' is True, data of parent model's will be not deleted.
"""
if hasattr(objs, 'polymorphic_disabled'):
objs.polymorphic_disabled = True
if self.can_fast_delete(objs):
self.fast_deletes.append(objs)
return
new_objs = self.add(objs, source, nullable, reverse_dependency=reverse_dependency)
if not new_objs.exists():
return
model = new_objs.model
if not keep_parents:
# Recursively collect concrete model's parent models, but not their
# related objects. These will be found by meta.get_fields()
concrete_model = model._meta.concrete_model
for ptr in concrete_model._meta.parents.keys():
if ptr:
parent_objs = ptr.objects.filter(pk__in=new_objs.values_list('pk', flat=True))
self.collect(parent_objs, source=model, collect_related=False, reverse_dependency=True)
if collect_related:
parents = model._meta.parents
for related in get_candidate_relations_to_delete(model._meta):
# Preserve parent reverse relationships if keep_parents=True.
if keep_parents and related.model in parents:
continue
field = related.field
if field.remote_field.on_delete == DO_NOTHING:
continue
related_qs = self.related_objects(related, new_objs)
if self.can_fast_delete(related_qs, from_field=field):
self.fast_deletes.append(related_qs)
elif related_qs:
field.remote_field.on_delete(self, field, related_qs, self.using)
for field in model._meta.private_fields:
if hasattr(field, 'bulk_related_objects'):
# It's something like generic foreign key.
sub_objs = bulk_related_objects(field, new_objs, self.using)
self.collect(sub_objs, source=model, nullable=True)
def delete(self):
self.sort()
# collect pk_list before deletion (once things start to delete
# queries might not be able to retrieve pk list)
del_dict = OrderedDict()
for model, instances in self.data.items():
del_dict.setdefault(model, [])
for inst in instances:
del_dict[model] += list(inst.values_list('pk', flat=True))
deleted_counter = Counter()
with transaction.atomic(using=self.using, savepoint=False):
# update fields
for model, instances_for_fieldvalues in self.field_updates.items():
for (field, value), instances in instances_for_fieldvalues.items():
for inst in instances:
query = sql.UpdateQuery(model)
query.update_batch(inst.values_list('pk', flat=True), {field.name: value}, self.using)
# fast deletes
for qs in self.fast_deletes:
count = qs._raw_delete(using=self.using)
deleted_counter[qs.model._meta.label] += count
# delete instances
for model, pk_list in del_dict.items():
query = sql.DeleteQuery(model)
count = query.delete_batch(pk_list, self.using)
deleted_counter[model._meta.label] += count
return sum(deleted_counter.values()), dict(deleted_counter)


@@ -318,8 +318,9 @@ if __name__ == '__main__':
for j_hour in range(24):
time_delta = datetime.timedelta(days=i_day, hours=j_hour, seconds=0)
created_job_ids = generate_jobs(jobs, batch_size=batch_size, time_delta=time_delta)
for k_id in created_job_ids:
generate_events(events, str(k_id), time_delta)
if events > 0:
for k_id in created_job_ids:
generate_events(events, str(k_id), time_delta)
print(datetime.datetime.utcnow().isoformat())
conn.close()