mirror of
https://github.com/ansible/awx.git
synced 2026-05-07 17:37:37 -02:30
Delete jobs without loading objects first
The commit is intended to speed up the cleanup_jobs command in awx. Old methods takes 7+ hours to delete 1 million old jobs. New method takes around 6 minutes. Leverages a sub-classed Collector, called AWXCollector, that does not load in objects before deleting them. Instead querysets, which are lazily evaluated, are used in places where Collector normally keeps a list of objects. Finally, a couple of tests to ensure parity between old Collector and AWXCollector. That is, any object that is updated/removed from the database using Collector should be have identical operations using AWXCollector. tower issue 1103
This commit is contained in:
@@ -21,6 +21,8 @@ from awx.main.signals import (
|
|||||||
disable_computed_fields
|
disable_computed_fields
|
||||||
)
|
)
|
||||||
|
|
||||||
|
from awx.main.management.commands.deletion import AWXCollector, pre_delete
|
||||||
|
|
||||||
|
|
||||||
class Command(BaseCommand):
|
class Command(BaseCommand):
|
||||||
'''
|
'''
|
||||||
@@ -57,27 +59,37 @@ class Command(BaseCommand):
|
|||||||
action='store_true', dest='only_workflow_jobs',
|
action='store_true', dest='only_workflow_jobs',
|
||||||
help='Remove workflow jobs')
|
help='Remove workflow jobs')
|
||||||
|
|
||||||
def cleanup_jobs(self):
|
|
||||||
#jobs_qs = Job.objects.exclude(status__in=('pending', 'running'))
|
|
||||||
#jobs_qs = jobs_qs.filter(created__lte=self.cutoff)
|
|
||||||
skipped, deleted = 0, 0
|
|
||||||
jobs = Job.objects.filter(created__lt=self.cutoff)
|
|
||||||
for job in jobs.iterator():
|
|
||||||
job_display = '"%s" (%d host summaries, %d events)' % \
|
|
||||||
(str(job),
|
|
||||||
job.job_host_summaries.count(), job.job_events.count())
|
|
||||||
if job.status in ('pending', 'waiting', 'running'):
|
|
||||||
action_text = 'would skip' if self.dry_run else 'skipping'
|
|
||||||
self.logger.debug('%s %s job %s', action_text, job.status, job_display)
|
|
||||||
skipped += 1
|
|
||||||
else:
|
|
||||||
action_text = 'would delete' if self.dry_run else 'deleting'
|
|
||||||
self.logger.info('%s %s', action_text, job_display)
|
|
||||||
if not self.dry_run:
|
|
||||||
job.delete()
|
|
||||||
deleted += 1
|
|
||||||
|
|
||||||
skipped += Job.objects.filter(created__gte=self.cutoff).count()
|
def cleanup_jobs(self):
|
||||||
|
skipped, deleted = 0, 0
|
||||||
|
|
||||||
|
batch_size = 1000000
|
||||||
|
|
||||||
|
while True:
|
||||||
|
# get queryset for available jobs to remove
|
||||||
|
qs = Job.objects.filter(created__lt=self.cutoff).exclude(status__in=['pending', 'waiting', 'running'])
|
||||||
|
# get pk list for the first N (batch_size) objects
|
||||||
|
pk_list = qs[0:batch_size].values_list('pk')
|
||||||
|
# You cannot delete queries with sql LIMIT set, so we must
|
||||||
|
# create a new query from this pk_list
|
||||||
|
qs_batch = Job.objects.filter(pk__in=pk_list)
|
||||||
|
just_deleted = 0
|
||||||
|
if not self.dry_run:
|
||||||
|
del_query = pre_delete(qs_batch)
|
||||||
|
collector = AWXCollector(del_query.db)
|
||||||
|
collector.collect(del_query)
|
||||||
|
_, models_deleted = collector.delete()
|
||||||
|
if models_deleted:
|
||||||
|
just_deleted = models_deleted['main.Job']
|
||||||
|
deleted += just_deleted
|
||||||
|
else:
|
||||||
|
just_deleted = 0 # break from loop, this is dry run
|
||||||
|
deleted = qs.count()
|
||||||
|
|
||||||
|
if just_deleted == 0:
|
||||||
|
break
|
||||||
|
|
||||||
|
skipped += (Job.objects.filter(created__gte=self.cutoff) | Job.objects.filter(status__in=['pending', 'waiting', 'running'])).count()
|
||||||
return skipped, deleted
|
return skipped, deleted
|
||||||
|
|
||||||
def cleanup_ad_hoc_commands(self):
|
def cleanup_ad_hoc_commands(self):
|
||||||
|
|||||||
177
awx/main/management/commands/deletion.py
Normal file
177
awx/main/management/commands/deletion.py
Normal file
@@ -0,0 +1,177 @@
|
|||||||
|
from django.contrib.contenttypes.models import ContentType
|
||||||
|
from django.db.models.deletion import (
|
||||||
|
DO_NOTHING, Collector, get_candidate_relations_to_delete,
|
||||||
|
)
|
||||||
|
from collections import Counter, OrderedDict
|
||||||
|
from django.db import transaction
|
||||||
|
from django.db.models import sql
|
||||||
|
|
||||||
|
|
||||||
|
def bulk_related_objects(field, objs, using):
|
||||||
|
# This overrides the method in django.contrib.contenttypes.fields.py
|
||||||
|
"""
|
||||||
|
Return all objects related to ``objs`` via this ``GenericRelation``.
|
||||||
|
"""
|
||||||
|
return field.remote_field.model._base_manager.db_manager(using).filter(**{
|
||||||
|
"%s__pk" % field.content_type_field_name: ContentType.objects.db_manager(using).get_for_model(
|
||||||
|
field.model, for_concrete_model=field.for_concrete_model).pk,
|
||||||
|
"%s__in" % field.object_id_field_name: list(objs.values_list('pk', flat=True))
|
||||||
|
})
|
||||||
|
|
||||||
|
|
||||||
|
def pre_delete(qs):
|
||||||
|
# taken from .delete method in django.db.models.query.py
|
||||||
|
assert qs.query.can_filter(), \
|
||||||
|
"Cannot use 'limit' or 'offset' with delete."
|
||||||
|
|
||||||
|
if qs._fields is not None:
|
||||||
|
raise TypeError("Cannot call delete() after .values() or .values_list()")
|
||||||
|
|
||||||
|
del_query = qs._chain()
|
||||||
|
|
||||||
|
# The delete is actually 2 queries - one to find related objects,
|
||||||
|
# and one to delete. Make sure that the discovery of related
|
||||||
|
# objects is performed on the same database as the deletion.
|
||||||
|
del_query._for_write = True
|
||||||
|
|
||||||
|
# Disable non-supported fields.
|
||||||
|
del_query.query.select_for_update = False
|
||||||
|
del_query.query.select_related = False
|
||||||
|
del_query.query.clear_ordering(force_empty=True)
|
||||||
|
return del_query
|
||||||
|
|
||||||
|
|
||||||
|
class AWXCollector(Collector):
|
||||||
|
|
||||||
|
def add(self, objs, source=None, nullable=False, reverse_dependency=False):
|
||||||
|
"""
|
||||||
|
Add 'objs' to the collection of objects to be deleted. If the call is
|
||||||
|
the result of a cascade, 'source' should be the model that caused it,
|
||||||
|
and 'nullable' should be set to True if the relation can be null.
|
||||||
|
|
||||||
|
Return a list of all objects that were not already collected.
|
||||||
|
"""
|
||||||
|
if not objs.exists():
|
||||||
|
return objs
|
||||||
|
model = objs.model
|
||||||
|
self.data.setdefault(model, [])
|
||||||
|
self.data[model].append(objs)
|
||||||
|
# Nullable relationships can be ignored -- they are nulled out before
|
||||||
|
# deleting, and therefore do not affect the order in which objects have
|
||||||
|
# to be deleted.
|
||||||
|
if source is not None and not nullable:
|
||||||
|
if reverse_dependency:
|
||||||
|
source, model = model, source
|
||||||
|
self.dependencies.setdefault(
|
||||||
|
source._meta.concrete_model, set()).add(model._meta.concrete_model)
|
||||||
|
return objs
|
||||||
|
|
||||||
|
def add_field_update(self, field, value, objs):
|
||||||
|
"""
|
||||||
|
Schedule a field update. 'objs' must be a homogeneous iterable
|
||||||
|
collection of model instances (e.g. a QuerySet).
|
||||||
|
"""
|
||||||
|
if not objs.exists():
|
||||||
|
return
|
||||||
|
model = objs.model
|
||||||
|
self.field_updates.setdefault(model, {})
|
||||||
|
self.field_updates[model].setdefault((field, value), [])
|
||||||
|
self.field_updates[model][(field, value)].append(objs)
|
||||||
|
|
||||||
|
def collect(self, objs, source=None, nullable=False, collect_related=True,
|
||||||
|
source_attr=None, reverse_dependency=False, keep_parents=False):
|
||||||
|
"""
|
||||||
|
Add 'objs' to the collection of objects to be deleted as well as all
|
||||||
|
parent instances. 'objs' must be a homogeneous iterable collection of
|
||||||
|
model instances (e.g. a QuerySet). If 'collect_related' is True,
|
||||||
|
related objects will be handled by their respective on_delete handler.
|
||||||
|
|
||||||
|
If the call is the result of a cascade, 'source' should be the model
|
||||||
|
that caused it and 'nullable' should be set to True, if the relation
|
||||||
|
can be null.
|
||||||
|
|
||||||
|
If 'reverse_dependency' is True, 'source' will be deleted before the
|
||||||
|
current model, rather than after. (Needed for cascading to parent
|
||||||
|
models, the one case in which the cascade follows the forwards
|
||||||
|
direction of an FK rather than the reverse direction.)
|
||||||
|
|
||||||
|
If 'keep_parents' is True, data of parent model's will be not deleted.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if hasattr(objs, 'polymorphic_disabled'):
|
||||||
|
objs.polymorphic_disabled = True
|
||||||
|
|
||||||
|
if self.can_fast_delete(objs):
|
||||||
|
self.fast_deletes.append(objs)
|
||||||
|
return
|
||||||
|
new_objs = self.add(objs, source, nullable,
|
||||||
|
reverse_dependency=reverse_dependency)
|
||||||
|
if not new_objs.exists():
|
||||||
|
return
|
||||||
|
|
||||||
|
model = new_objs.model
|
||||||
|
|
||||||
|
if not keep_parents:
|
||||||
|
# Recursively collect concrete model's parent models, but not their
|
||||||
|
# related objects. These will be found by meta.get_fields()
|
||||||
|
concrete_model = model._meta.concrete_model
|
||||||
|
for ptr in concrete_model._meta.parents.keys():
|
||||||
|
if ptr:
|
||||||
|
parent_objs = ptr.objects.filter(pk__in = new_objs.values_list('pk', flat=True))
|
||||||
|
self.collect(parent_objs, source=model,
|
||||||
|
collect_related=False,
|
||||||
|
reverse_dependency=True)
|
||||||
|
if collect_related:
|
||||||
|
parents = model._meta.parents
|
||||||
|
for related in get_candidate_relations_to_delete(model._meta):
|
||||||
|
# Preserve parent reverse relationships if keep_parents=True.
|
||||||
|
if keep_parents and related.model in parents:
|
||||||
|
continue
|
||||||
|
field = related.field
|
||||||
|
if field.remote_field.on_delete == DO_NOTHING:
|
||||||
|
continue
|
||||||
|
related_qs = self.related_objects(related, new_objs)
|
||||||
|
if self.can_fast_delete(related_qs, from_field=field):
|
||||||
|
self.fast_deletes.append(related_qs)
|
||||||
|
elif related_qs:
|
||||||
|
field.remote_field.on_delete(self, field, related_qs, self.using)
|
||||||
|
for field in model._meta.private_fields:
|
||||||
|
if hasattr(field, 'bulk_related_objects'):
|
||||||
|
# It's something like generic foreign key.
|
||||||
|
sub_objs = bulk_related_objects(field, new_objs, self.using)
|
||||||
|
self.collect(sub_objs, source=model, nullable=True)
|
||||||
|
|
||||||
|
def delete(self):
|
||||||
|
self.sort()
|
||||||
|
|
||||||
|
# collect pk_list before deletion (once things start to delete
|
||||||
|
# queries might not be able to retreive pk list)
|
||||||
|
del_dict = OrderedDict()
|
||||||
|
for model, instances in self.data.items():
|
||||||
|
del_dict.setdefault(model, [])
|
||||||
|
for inst in instances:
|
||||||
|
del_dict[model] += list(inst.values_list('pk', flat=True))
|
||||||
|
|
||||||
|
deleted_counter = Counter()
|
||||||
|
|
||||||
|
with transaction.atomic(using=self.using, savepoint=False):
|
||||||
|
|
||||||
|
# update fields
|
||||||
|
for model, instances_for_fieldvalues in self.field_updates.items():
|
||||||
|
for (field, value), instances in instances_for_fieldvalues.items():
|
||||||
|
for inst in instances:
|
||||||
|
query = sql.UpdateQuery(model)
|
||||||
|
query.update_batch(inst.values_list('pk', flat=True),
|
||||||
|
{field.name: value}, self.using)
|
||||||
|
# fast deletes
|
||||||
|
for qs in self.fast_deletes:
|
||||||
|
count = qs._raw_delete(using=self.using)
|
||||||
|
deleted_counter[qs.model._meta.label] += count
|
||||||
|
|
||||||
|
# delete instances
|
||||||
|
for model, pk_list in del_dict.items():
|
||||||
|
query = sql.DeleteQuery(model)
|
||||||
|
count = query.delete_batch(pk_list, self.using)
|
||||||
|
deleted_counter[model._meta.label] += count
|
||||||
|
|
||||||
|
return sum(deleted_counter.values()), dict(deleted_counter)
|
||||||
179
awx/main/tests/functional/commands/test_cleanup_jobs.py
Normal file
179
awx/main/tests/functional/commands/test_cleanup_jobs.py
Normal file
@@ -0,0 +1,179 @@
|
|||||||
|
import pytest
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from pytz import timezone
|
||||||
|
from collections import OrderedDict
|
||||||
|
|
||||||
|
from django.db.models.deletion import Collector, SET_NULL, CASCADE
|
||||||
|
from django.core.management import call_command
|
||||||
|
|
||||||
|
from awx.main.management.commands.deletion import AWXCollector
|
||||||
|
from awx.main.models import (
|
||||||
|
JobTemplate, User, Job, JobEvent, Notification,
|
||||||
|
WorkflowJobNode, JobHostSummary
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def setup_environment(inventory, project, machine_credential, host, notification_template, label):
|
||||||
|
'''
|
||||||
|
Create old jobs and new jobs, with various other objects to hit the
|
||||||
|
related fields of Jobs. This makes sure on_delete() effects are tested
|
||||||
|
properly.
|
||||||
|
'''
|
||||||
|
old_jobs = []
|
||||||
|
new_jobs = []
|
||||||
|
days = 10
|
||||||
|
days_str = str(days)
|
||||||
|
|
||||||
|
jt = JobTemplate.objects.create(name='testjt', inventory=inventory, project=project)
|
||||||
|
jt.credentials.add(machine_credential)
|
||||||
|
jt_user = User.objects.create(username='jobtemplateuser')
|
||||||
|
jt.execute_role.members.add(jt_user)
|
||||||
|
|
||||||
|
notification = Notification()
|
||||||
|
notification.notification_template = notification_template
|
||||||
|
notification.save()
|
||||||
|
|
||||||
|
for i in range(3):
|
||||||
|
job1 = jt.create_job()
|
||||||
|
job1.created =datetime.now(tz=timezone('UTC'))
|
||||||
|
job1.save()
|
||||||
|
# create jobs with current time
|
||||||
|
JobEvent.create_from_data(job_id=job1.pk, uuid='abc123', event='runner_on_start',
|
||||||
|
stdout='a' * 1025).save()
|
||||||
|
new_jobs.append(job1)
|
||||||
|
|
||||||
|
job2 = jt.create_job()
|
||||||
|
# create jobs 10 days ago
|
||||||
|
job2.created = datetime.now(tz=timezone('UTC')) - timedelta(days=days)
|
||||||
|
job2.save()
|
||||||
|
job2.dependent_jobs.add(job1)
|
||||||
|
JobEvent.create_from_data(job_id=job2.pk, uuid='abc123', event='runner_on_start',
|
||||||
|
stdout='a' * 1025).save()
|
||||||
|
old_jobs.append(job2)
|
||||||
|
|
||||||
|
jt.last_job = job2
|
||||||
|
jt.current_job = job2
|
||||||
|
jt.save()
|
||||||
|
host.last_job = job2
|
||||||
|
host.save()
|
||||||
|
notification.unifiedjob_notifications.add(job2)
|
||||||
|
label.unifiedjob_labels.add(job2)
|
||||||
|
jn = WorkflowJobNode.objects.create(job=job2)
|
||||||
|
jn.save()
|
||||||
|
jh = JobHostSummary.objects.create(job=job2)
|
||||||
|
jh.save()
|
||||||
|
|
||||||
|
return (old_jobs, new_jobs, days_str)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_cleanup_jobs(setup_environment):
|
||||||
|
(old_jobs, new_jobs, days_str) = setup_environment
|
||||||
|
|
||||||
|
# related_fields
|
||||||
|
related = [f for f in Job._meta.get_fields(include_hidden=True)
|
||||||
|
if f.auto_created and not
|
||||||
|
f.concrete and
|
||||||
|
(f.one_to_one or f.one_to_many)]
|
||||||
|
|
||||||
|
job = old_jobs[-1] # last job
|
||||||
|
|
||||||
|
# gather related objects for job
|
||||||
|
related_should_be_removed = {}
|
||||||
|
related_should_be_null = {}
|
||||||
|
for r in related:
|
||||||
|
qs = r.related_model._base_manager.using('default').filter(
|
||||||
|
**{"%s__in" % r.field.name: [job.pk]}
|
||||||
|
)
|
||||||
|
if qs.exists():
|
||||||
|
if r.field.remote_field.on_delete == CASCADE:
|
||||||
|
related_should_be_removed[qs.model] = set(qs.values_list('pk', flat=True))
|
||||||
|
if r.field.remote_field.on_delete == SET_NULL:
|
||||||
|
related_should_be_null[(qs.model,r.field.name)] = set(qs.values_list('pk', flat=True))
|
||||||
|
|
||||||
|
assert related_should_be_removed
|
||||||
|
assert related_should_be_null
|
||||||
|
|
||||||
|
call_command('cleanup_jobs', '--days', days_str)
|
||||||
|
# make sure old jobs are removed
|
||||||
|
assert not Job.objects.filter(pk__in=[obj.pk for obj in old_jobs]).exists()
|
||||||
|
|
||||||
|
# make sure new jobs are untouched
|
||||||
|
assert len(new_jobs) == Job.objects.filter(pk__in=[obj.pk for obj in new_jobs]).count()
|
||||||
|
|
||||||
|
# make sure related objects are destroyed or set to NULL (none)
|
||||||
|
for model, values in related_should_be_removed.items():
|
||||||
|
assert not model.objects.filter(pk__in=values).exists()
|
||||||
|
|
||||||
|
for (model,fieldname), values in related_should_be_null.items():
|
||||||
|
for v in values:
|
||||||
|
assert not getattr(model.objects.get(pk=v), fieldname)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_awxcollector(setup_environment):
|
||||||
|
'''
|
||||||
|
Efforts to improve the performance of cleanup_jobs involved
|
||||||
|
sub-classing the django Collector class. This unit test will
|
||||||
|
check for parity between the django Collector and the modified
|
||||||
|
AWXCollector class. AWXCollector is used in cleanup_jobs to
|
||||||
|
bulk-delete old jobs from the database.
|
||||||
|
|
||||||
|
Specifically, Collector has four dictionaries to check:
|
||||||
|
.dependencies, .data, .fast_deletes, and .field_updates
|
||||||
|
|
||||||
|
These tests will convert each dictionary from AWXCollector
|
||||||
|
(after running .collect on jobs), from querysets to sets of
|
||||||
|
objects. The final result should be a dictionary that is
|
||||||
|
equivalent to django's Collector.
|
||||||
|
'''
|
||||||
|
|
||||||
|
(old_jobs, new_jobs, days_str) = setup_environment
|
||||||
|
collector = Collector('default')
|
||||||
|
collector.collect(old_jobs)
|
||||||
|
|
||||||
|
awx_col = AWXCollector('default')
|
||||||
|
# awx_col accepts a queryset as input
|
||||||
|
awx_col.collect(Job.objects.filter(pk__in=[obj.pk for obj in old_jobs]))
|
||||||
|
|
||||||
|
# check that dependencies are the same
|
||||||
|
assert awx_col.dependencies == collector.dependencies
|
||||||
|
|
||||||
|
# check that objects to delete are the same
|
||||||
|
awx_del_dict = OrderedDict()
|
||||||
|
for model, instances in awx_col.data.items():
|
||||||
|
awx_del_dict.setdefault(model, set())
|
||||||
|
for inst in instances:
|
||||||
|
# .update() will put each object in a queryset into the set
|
||||||
|
awx_del_dict[model].update(inst)
|
||||||
|
assert awx_del_dict == collector.data
|
||||||
|
|
||||||
|
# check that field updates are the same
|
||||||
|
awx_del_dict = OrderedDict()
|
||||||
|
for model, instances_for_fieldvalues in awx_col.field_updates.items():
|
||||||
|
awx_del_dict.setdefault(model, {})
|
||||||
|
for (field, value), instances in instances_for_fieldvalues.items():
|
||||||
|
awx_del_dict[model].setdefault((field,value), set())
|
||||||
|
for inst in instances:
|
||||||
|
awx_del_dict[model][(field,value)].update(inst)
|
||||||
|
|
||||||
|
# collector field updates don't use the base (polymorphic parent) model, e.g.
|
||||||
|
# it will use JobTemplate instead of UnifiedJobTemplate. Therefore,
|
||||||
|
# we need to rebuild the dictionary and grab the model from the field
|
||||||
|
collector_del_dict = OrderedDict()
|
||||||
|
for model, instances_for_fieldvalues in collector.field_updates.items():
|
||||||
|
for (field,value), instances in instances_for_fieldvalues.items():
|
||||||
|
collector_del_dict.setdefault(field.model, {})
|
||||||
|
collector_del_dict[field.model][(field, value)] = collector.field_updates[model][(field,value)]
|
||||||
|
assert awx_del_dict == collector_del_dict
|
||||||
|
|
||||||
|
# check that fast deletes are the same
|
||||||
|
collector_fast_deletes = set()
|
||||||
|
for q in collector.fast_deletes:
|
||||||
|
collector_fast_deletes.update(q)
|
||||||
|
|
||||||
|
awx_col_fast_deletes = set()
|
||||||
|
for q in awx_col.fast_deletes:
|
||||||
|
awx_col_fast_deletes.update(q)
|
||||||
|
assert collector_fast_deletes == awx_col_fast_deletes
|
||||||
Reference in New Issue
Block a user