AC-1235 Fix large inventory import tests by querying in batches.

commit 6be2e07cbc
parent 3086ec4930
Author: Chris Church
Date:   2014-05-19 00:45:21 -04:00

2 changed files with 198 additions and 116 deletions
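
The failing tests come from SQLite's cap on bound parameters per statement (SQLITE_MAX_VARIABLE_NUMBER, 999 by default on SQLite builds of this era): a single filter such as pk__in=<several thousand pks> raises "too many SQL variables" against the test database even though PostgreSQL handles it. Every hunk below applies the same fix: slice the value list and issue one query per slice. A minimal sketch of the idiom, with illustrative names not taken from the source:

    # Sketch only: `qs` is any Django queryset, `field` the lookup to batch.
    # One query per slice keeps the bound-parameter count under the limit.
    def iter_in_batches(qs, field, values, batch_size=500):
        values = list(values)
        for offset in xrange(0, len(values), batch_size):
            batch = values[offset:(offset + batch_size)]
            for obj in qs.filter(**{'%s__in' % field: batch}):
                yield obj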


@@ -585,7 +585,7 @@ class Command(NoArgsCommand):
             self._batch_add_m2m_cache = {}
         cached_objs = self._batch_add_m2m_cache.setdefault(key, [])
         cached_objs.extend(objs)
-        if len(cached_objs) > 100 or flush:
+        if len(cached_objs) > self._batch_size or flush:
             if len(cached_objs):
                 related_manager.add(*cached_objs)
             self._batch_add_m2m_cache[key] = []
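
_batch_add_m2m() accumulates objects per related manager and only calls related_manager.add() once the cache passes the threshold, so M2M rows are inserted a few hundred at a time; the change above simply swaps the hard-coded 100 for the shared self._batch_size set once before the import runs (see the last hunk of this file). The caller pattern, as used later in this diff, is queue-then-flush:

    # Queue objects one at a time; add() fires in batches internally.
    for db_child in db_children_qs.exclude(children__id=db_group.id):
        self._batch_add_m2m(db_group.children, db_child)
    # Force a final flush so no queued objects are left behind.
    self._batch_add_m2m(db_group.children, flush=True)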
@@ -635,25 +635,39 @@ class Command(NoArgsCommand):
         if settings.SQL_DEBUG:
             queries_before = len(connection.queries)
         if self.inventory_source.group:
-            del_hosts = self.inventory_source.group.all_hosts
+            hosts_qs = self.inventory_source.group.all_hosts
             # FIXME: Also include hosts from inventory_source.managed_hosts?
         else:
-            del_hosts = self.inventory.hosts.filter(active=True)
+            hosts_qs = self.inventory.hosts.filter(active=True)
+        # Build list of all host pks, remove all that should not be deleted.
+        del_host_pks = set(hosts_qs.values_list('pk', flat=True))
         if self.instance_id_var:
-            instance_ids = set(self.mem_instance_id_map.keys())
-            host_pks = set([v for k,v in self.db_instance_id_map.items() if k in instance_ids])
-            host_names = set(self.mem_instance_id_map.values()) - set(self.all_group.all_hosts.keys())
-            del_hosts = del_hosts.exclude(Q(name__in=host_names) | Q(instance_id__in=instance_ids) | Q(pk__in=host_pks))
+            all_instance_ids = self.mem_instance_id_map.keys()
+            for offset in xrange(0, len(all_instance_ids), self._batch_size):
+                instance_ids = all_instance_ids[offset:(offset + self._batch_size)]
+                for host_pk in hosts_qs.filter(instance_id__in=instance_ids).values_list('pk', flat=True):
+                    del_host_pks.discard(host_pk)
+                for host_pk in set([v for k,v in self.db_instance_id_map.items() if k in instance_ids]):
+                    del_host_pks.discard(host_pk)
+            all_host_names = list(set(self.mem_instance_id_map.values()) - set(self.all_group.all_hosts.keys()))
         else:
-            del_hosts = del_hosts.exclude(name__in=self.all_group.all_hosts.keys())
-        for host in del_hosts:
-            host_name = host.name
-            host.mark_inactive()#from_inventory_import=True)
-            self.logger.info('Deleted host "%s"', host_name)
+            all_host_names = self.all_group.all_hosts.keys()
+        for offset in xrange(0, len(all_host_names), self._batch_size):
+            host_names = all_host_names[offset:(offset + self._batch_size)]
+            for host_pk in hosts_qs.filter(name__in=host_names).values_list('pk', flat=True):
+                del_host_pks.discard(host_pk)
+        # Now delete all remaining hosts in batches.
+        all_del_pks = list(del_host_pks)
+        for offset in xrange(0, len(all_del_pks), self._batch_size):
+            del_pks = all_del_pks[offset:(offset + self._batch_size)]
+            for host in hosts_qs.filter(pk__in=del_pks):
+                host_name = host.name
+                host.mark_inactive()
+                self.logger.info('Deleted host "%s"', host_name)
         if settings.SQL_DEBUG:
             self.logger.warning('host deletions took %d queries for %d hosts',
                                 len(connection.queries) - queries_before,
-                                del_hosts.count())
+                                len(all_del_pks))
 
     def _delete_groups(self):
         '''
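
The rewrite inverts the old single-query approach: instead of one queryset with a compound .exclude(...) whose __in lists could each exceed the parameter limit, it materializes every candidate pk up front, discards the pks matched by a keep-filter batch by batch, then deletes whatever remains, again in pk batches. Reduced to its skeleton (a sketch, with keep_names and batch_size assumed in scope):

    # Candidates: everything currently in the database.
    del_pks = set(hosts_qs.values_list('pk', flat=True))
    # Survivors: discard any pk matched by a batched keep-filter.
    keep_names = list(keep_names)
    for offset in xrange(0, len(keep_names), batch_size):
        names = keep_names[offset:(offset + batch_size)]
        for pk in hosts_qs.filter(name__in=names).values_list('pk', flat=True):
            del_pks.discard(pk)
    # Delete the remainder, also in batches.
    remaining = list(del_pks)
    for offset in xrange(0, len(remaining), batch_size):
        for host in hosts_qs.filter(pk__in=remaining[offset:(offset + batch_size)]):
            host.mark_inactive()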
@@ -665,20 +679,29 @@ class Command(NoArgsCommand):
         if settings.SQL_DEBUG:
             queries_before = len(connection.queries)
         if self.inventory_source.group:
-            del_groups = self.inventory_source.group.all_children
+            groups_qs = self.inventory_source.group.all_children
             # FIXME: Also include groups from inventory_source.managed_groups?
         else:
-            del_groups = self.inventory.groups.filter(active=True)
-        group_names = set(self.all_group.all_groups.keys())
-        del_groups = del_groups.exclude(name__in=group_names)
-        for group in del_groups:
-            group_name = group.name
-            group.mark_inactive(recompute=False)#from_inventory_import=True)
-            self.logger.info('Group "%s" deleted', group_name)
+            groups_qs = self.inventory.groups.filter(active=True)
+        # Build list of all group pks, remove those that should not be deleted.
+        del_group_pks = set(groups_qs.values_list('pk', flat=True))
+        all_group_names = self.all_group.all_groups.keys()
+        for offset in xrange(0, len(all_group_names), self._batch_size):
+            group_names = all_group_names[offset:(offset + self._batch_size)]
+            for group_pk in groups_qs.filter(name__in=group_names).values_list('pk', flat=True):
+                del_group_pks.discard(group_pk)
+        # Now delete all remaining groups in batches.
+        all_del_pks = list(del_group_pks)
+        for offset in xrange(0, len(all_del_pks), self._batch_size):
+            del_pks = all_del_pks[offset:(offset + self._batch_size)]
+            for group in groups_qs.filter(pk__in=del_pks):
+                group_name = group.name
+                group.mark_inactive(recompute=False)
+                self.logger.info('Group "%s" deleted', group_name)
         if settings.SQL_DEBUG:
             self.logger.warning('group deletions took %d queries for %d groups',
                                 len(connection.queries) - queries_before,
-                                del_groups.count())
+                                len(all_del_pks))
 
     def _delete_group_children_and_hosts(self):
         '''
@@ -697,28 +720,47 @@ class Command(NoArgsCommand):
         else:
             db_groups = self.inventory.groups.filter(active=True)
         for db_group in db_groups:
+            # Delete child group relationships not present in imported data.
             db_children = db_group.children.filter(active=True)
+            db_children_name_pk_map = dict(db_children.values_list('name', 'pk'))
             mem_children = self.all_group.all_groups[db_group.name].children
-            mem_children_names = [g.name for g in mem_children]
-            for db_child in db_children.exclude(name__in=mem_children_names):
-                group_group_count += 1
-                if db_child not in db_group.children.filter(active=True):
-                    continue
-                db_group.children.remove(db_child)
-                self.logger.info('Group "%s" removed from group "%s"',
-                                 db_child.name, db_group.name)
+            for mem_group in mem_children:
+                db_children_name_pk_map.pop(mem_group.name, None)
+            del_child_group_pks = list(set(db_children_name_pk_map.values()))
+            for offset in xrange(0, len(del_child_group_pks), self._batch_size):
+                child_group_pks = del_child_group_pks[offset:(offset + self._batch_size)]
+                for db_child in db_children.filter(pk__in=child_group_pks):
+                    group_group_count += 1
+                    db_group.children.remove(db_child)
+                    self.logger.info('Group "%s" removed from group "%s"',
+                                     db_child.name, db_group.name)
+            # Delete group/host relationships not present in imported data.
             db_hosts = db_group.hosts.filter(active=True)
+            del_host_pks = set(db_hosts.values_list('pk', flat=True))
             mem_hosts = self.all_group.all_groups[db_group.name].hosts
-            mem_host_names = set([h.name for h in mem_hosts if not h.instance_id])
-            mem_instance_ids = set([h.instance_id for h in mem_hosts if h.instance_id])
-            db_host_pks = set([v for k,v in self.db_instance_id_map.items() if k in mem_instance_ids])
-            for db_host in db_hosts.exclude(Q(name__in=mem_host_names) | Q(instance_id__in=mem_instance_ids) | Q(pk__in=db_host_pks)):
-                group_host_count += 1
-                if db_host not in db_group.hosts.filter(active=True):
-                    continue
-                db_group.hosts.remove(db_host)
-                self.logger.info('Host "%s" removed from group "%s"',
-                                 db_host.name, db_group.name)
+            all_mem_host_names = [h.name for h in mem_hosts if not h.instance_id]
+            for offset in xrange(0, len(all_mem_host_names), self._batch_size):
+                mem_host_names = all_mem_host_names[offset:(offset + self._batch_size)]
+                for db_host_pk in db_hosts.filter(name__in=mem_host_names).values_list('pk', flat=True):
+                    del_host_pks.discard(db_host_pk)
+            all_mem_instance_ids = [h.instance_id for h in mem_hosts if h.instance_id]
+            for offset in xrange(0, len(all_mem_instance_ids), self._batch_size):
+                mem_instance_ids = all_mem_instance_ids[offset:(offset + self._batch_size)]
+                for db_host_pk in db_hosts.filter(instance_id__in=mem_instance_ids).values_list('pk', flat=True):
+                    del_host_pks.discard(db_host_pk)
+            all_db_host_pks = [v for k,v in self.db_instance_id_map.items() if k in mem_instance_ids]
+            for db_host_pk in all_db_host_pks:
+                del_host_pks.discard(db_host_pk)
+            del_host_pks = list(del_host_pks)
+            for offset in xrange(0, len(del_host_pks), self._batch_size):
+                del_pks = del_host_pks[offset:(offset + self._batch_size)]
+                for db_host in db_hosts.filter(pk__in=del_pks):
+                    group_host_count += 1
+                    if db_host not in db_group.hosts.filter(active=True):
+                        continue
+                    db_group.hosts.remove(db_host)
+                    self.logger.info('Host "%s" removed from group "%s"',
+                                     db_host.name, db_group.name)
         if settings.SQL_DEBUG:
             self.logger.warning('group-group and group-host deletions took %d queries for %d relationships',
                                 len(connection.queries) - queries_before,
@@ -762,28 +804,30 @@ class Command(NoArgsCommand):
         if settings.SQL_DEBUG:
             queries_before = len(connection.queries)
         inv_src_group = self.inventory_source.group
-        group_names = set(self.all_group.all_groups.keys())
-        for group in self.inventory.groups.filter(name__in=group_names):
-            mem_group = self.all_group.all_groups[group.name]
-            db_variables = group.variables_dict
-            if self.overwrite_vars or self.overwrite:
-                db_variables = mem_group.variables
-            else:
-                db_variables.update(mem_group.variables)
-            if db_variables != group.variables_dict:
-                group.variables = json.dumps(db_variables)
-                group.save(update_fields=['variables'])
-                if self.overwrite_vars or self.overwrite:
-                    self.logger.info('Group "%s" variables replaced', group.name)
-                else:
-                    self.logger.info('Group "%s" variables updated', group.name)
-            else:
-                self.logger.info('Group "%s" variables unmodified', group.name)
-            group_names.remove(group.name)
-            if inv_src_group and inv_src_group != group:
-                self._batch_add_m2m(inv_src_group.children, group)
-            self._batch_add_m2m(self.inventory_source.groups, group)
-        for group_name in group_names:
+        all_group_names = self.all_group.all_groups.keys()
+        for offset in xrange(0, len(all_group_names), self._batch_size):
+            group_names = all_group_names[offset:(offset + self._batch_size)]
+            for group in self.inventory.groups.filter(name__in=group_names):
+                mem_group = self.all_group.all_groups[group.name]
+                db_variables = group.variables_dict
+                if self.overwrite_vars or self.overwrite:
+                    db_variables = mem_group.variables
+                else:
+                    db_variables.update(mem_group.variables)
+                if db_variables != group.variables_dict:
+                    group.variables = json.dumps(db_variables)
+                    group.save(update_fields=['variables'])
+                    if self.overwrite_vars or self.overwrite:
+                        self.logger.info('Group "%s" variables replaced', group.name)
+                    else:
+                        self.logger.info('Group "%s" variables updated', group.name)
+                else:
+                    self.logger.info('Group "%s" variables unmodified', group.name)
+                all_group_names.remove(group.name)
+                if inv_src_group and inv_src_group != group:
+                    self._batch_add_m2m(inv_src_group.children, group)
+                self._batch_add_m2m(self.inventory_source.groups, group)
+        for group_name in all_group_names:
             mem_group = self.all_group.all_groups[group_name]
             group = self.inventory.groups.create(name=group_name, variables=json.dumps(mem_group.variables), description='imported')
             # Access auto one-to-one attribute to create related object.
@@ -890,25 +934,40 @@ class Command(NoArgsCommand):
                 mem_host_name_map[k] = v
 
         # Update all existing hosts where we know the PK based on instance_id.
-        for db_host in self.inventory.hosts.filter(active=True, pk__in=mem_host_pk_map.keys()):
-            mem_host = mem_host_pk_map[db_host.pk]
-            self._update_db_host_from_mem_host(db_host, mem_host)
-            host_pks_updated.add(db_host.pk)
-            mem_host_names_to_update.discard(mem_host.name)
+        all_host_pks = mem_host_pk_map.keys()
+        for offset in xrange(0, len(all_host_pks), self._batch_size):
+            host_pks = all_host_pks[offset:(offset + self._batch_size)]
+            for db_host in self.inventory.hosts.filter(active=True, pk__in=host_pks):
+                if db_host.pk in host_pks_updated:
+                    continue
+                mem_host = mem_host_pk_map[db_host.pk]
+                self._update_db_host_from_mem_host(db_host, mem_host)
+                host_pks_updated.add(db_host.pk)
+                mem_host_names_to_update.discard(mem_host.name)
 
         # Update all existing hosts where we know the instance_id.
-        for db_host in self.inventory.hosts.filter(active=True, instance_id__in=mem_host_instance_id_map.keys()).exclude(pk__in=host_pks_updated):
-            mem_host = mem_host_instance_id_map[db_host.instance_id]
-            self._update_db_host_from_mem_host(db_host, mem_host)
-            host_pks_updated.add(db_host.pk)
-            mem_host_names_to_update.discard(mem_host.name)
+        all_instance_ids = mem_host_instance_id_map.keys()
+        for offset in xrange(0, len(all_instance_ids), self._batch_size):
+            instance_ids = all_instance_ids[offset:(offset + self._batch_size)]
+            for db_host in self.inventory.hosts.filter(active=True, instance_id__in=instance_ids):
+                if db_host.pk in host_pks_updated:
+                    continue
+                mem_host = mem_host_instance_id_map[db_host.instance_id]
+                self._update_db_host_from_mem_host(db_host, mem_host)
+                host_pks_updated.add(db_host.pk)
+                mem_host_names_to_update.discard(mem_host.name)
 
         # Update all existing hosts by name.
-        for db_host in self.inventory.hosts.filter(active=True, name__in=mem_host_name_map.keys()).exclude(pk__in=host_pks_updated):
-            mem_host = mem_host_name_map[db_host.name]
-            self._update_db_host_from_mem_host(db_host, mem_host)
-            host_pks_updated.add(db_host.pk)
-            mem_host_names_to_update.discard(mem_host.name)
+        all_host_names = mem_host_name_map.keys()
+        for offset in xrange(0, len(all_host_names), self._batch_size):
+            host_names = all_host_names[offset:(offset + self._batch_size)]
+            for db_host in self.inventory.hosts.filter(active=True, name__in=host_names):
+                if db_host.pk in host_pks_updated:
+                    continue
+                mem_host = mem_host_name_map[db_host.name]
+                self._update_db_host_from_mem_host(db_host, mem_host)
+                host_pks_updated.add(db_host.pk)
+                mem_host_names_to_update.discard(mem_host.name)
 
         # Create any new hosts.
         for mem_host_name in mem_host_names_to_update:
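
One detail worth noting in this hunk: the old queries chained .exclude(pk__in=host_pks_updated) to skip hosts already handled, but host_pks_updated grows with the inventory, so that exclusion would itself become an oversized parameter list. The new code keeps each query constant-size and moves the membership test into Python, which is cheap because host_pks_updated is a set:

    # Before: the exclusion list grows without bound.
    #   ...filter(active=True, name__in=...).exclude(pk__in=host_pks_updated)
    # After: constant-size query; skip already-updated rows in Python.
    if db_host.pk in host_pks_updated:
        continue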
@@ -951,19 +1010,22 @@ class Command(NoArgsCommand):
         '''
         if settings.SQL_DEBUG:
             queries_before = len(connection.queries)
-        group_names = [k for k,v in self.all_group.all_groups.iteritems() if v.children]
+        all_group_names = [k for k,v in self.all_group.all_groups.iteritems() if v.children]
         group_group_count = 0
-        for db_group in self.inventory.groups.filter(name__in=group_names):
-            mem_group = self.all_group.all_groups[db_group.name]
-            group_group_count += len(mem_group.children)
-            child_names = set([g.name for g in mem_group.children])
-            db_children_qs = self.inventory.groups.filter(name__in=child_names)
-            for db_child in db_children_qs.filter(children__id=db_group.id):
-                self.logger.info('Group "%s" already child of group "%s"', db_child.name, db_group.name)
-            for db_child in db_children_qs.exclude(children__id=db_group.id):
-                self._batch_add_m2m(db_group.children, db_child)
-                self.logger.info('Group "%s" added as child of "%s"', db_child.name, db_group.name)
-            self._batch_add_m2m(db_group.children, flush=True)
+        for offset in xrange(0, len(all_group_names), self._batch_size):
+            group_names = all_group_names[offset:(offset + self._batch_size)]
+            for db_group in self.inventory.groups.filter(name__in=group_names):
+                mem_group = self.all_group.all_groups[db_group.name]
+                group_group_count += len(mem_group.children)
+                child_names = set([g.name for g in mem_group.children])
+                db_children_qs = self.inventory.groups.filter(name__in=child_names)
+                # FIXME: May fail unit tests when len(child_names) > 1000.
+                for db_child in db_children_qs.filter(children__id=db_group.id):
+                    self.logger.info('Group "%s" already child of group "%s"', db_child.name, db_group.name)
+                for db_child in db_children_qs.exclude(children__id=db_group.id):
+                    self._batch_add_m2m(db_group.children, db_child)
+                    self.logger.info('Group "%s" added as child of "%s"', db_child.name, db_group.name)
+                self._batch_add_m2m(db_group.children, flush=True)
         if settings.SQL_DEBUG:
             self.logger.warning('Group-group updates took %d queries for %d group-group relationships',
                                 len(connection.queries) - queries_before, group_group_count)
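
The new FIXME flags the one __in filter this hunk still leaves unbatched: child_names comes straight from the in-memory group, so a single group with more than ~1000 children would again exceed the SQLite limit. A hypothetical follow-up (not part of this commit) would batch it the same way the group names are batched above:

    # Hypothetical fix for the FIXME: slice child_names as well.
    all_child_names = list(child_names)
    for offset2 in xrange(0, len(all_child_names), self._batch_size):
        names = all_child_names[offset2:(offset2 + self._batch_size)]
        db_children_qs = self.inventory.groups.filter(name__in=names)
        for db_child in db_children_qs.exclude(children__id=db_group.id):
            self._batch_add_m2m(db_group.children, db_child)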
@@ -973,20 +1035,32 @@ class Command(NoArgsCommand):
         # belongs.
         if settings.SQL_DEBUG:
             queries_before = len(connection.queries)
-        group_names = [k for k,v in self.all_group.all_groups.iteritems() if v.hosts]
+        all_group_names = [k for k,v in self.all_group.all_groups.iteritems() if v.hosts]
         group_host_count = 0
-        for db_group in self.inventory.groups.filter(name__in=group_names):
-            mem_group = self.all_group.all_groups[db_group.name]
-            group_host_count += len(mem_group.hosts)
-            host_names = set([h.name for h in mem_group.hosts if not h.instance_id])
-            host_instance_ids = set([h.instance_id for h in mem_group.hosts if h.instance_id])
-            db_hosts_qs = self.inventory.hosts.filter(Q(name__in=host_names) | Q(instance_id__in=host_instance_ids))
-            for db_host in db_hosts_qs.filter(groups__id=db_group.id):
-                self.logger.info('Host "%s" already in group "%s"', db_host.name, db_group.name)
-            for db_host in db_hosts_qs.exclude(groups__id=db_group.id):
-                self._batch_add_m2m(db_group.hosts, db_host)
-                self.logger.info('Host "%s" added to group "%s"', db_host.name, db_group.name)
-            self._batch_add_m2m(db_group.hosts, flush=True)
+        for offset in xrange(0, len(all_group_names), self._batch_size):
+            group_names = all_group_names[offset:(offset + self._batch_size)]
+            for db_group in self.inventory.groups.filter(name__in=group_names):
+                mem_group = self.all_group.all_groups[db_group.name]
+                group_host_count += len(mem_group.hosts)
+                all_host_names = [h.name for h in mem_group.hosts if not h.instance_id]
+                for offset2 in xrange(0, len(all_host_names), self._batch_size):
+                    host_names = all_host_names[offset2:(offset2 + self._batch_size)]
+                    db_hosts_qs = self.inventory.hosts.filter(name__in=host_names)
+                    for db_host in db_hosts_qs.filter(groups__id=db_group.id):
+                        self.logger.info('Host "%s" already in group "%s"', db_host.name, db_group.name)
+                    for db_host in db_hosts_qs.exclude(groups__id=db_group.id):
+                        self._batch_add_m2m(db_group.hosts, db_host)
+                        self.logger.info('Host "%s" added to group "%s"', db_host.name, db_group.name)
+                all_instance_ids = [h.instance_id for h in mem_group.hosts if h.instance_id]
+                for offset2 in xrange(0, len(all_instance_ids), self._batch_size):
+                    instance_ids = all_instance_ids[offset2:(offset2 + self._batch_size)]
+                    db_hosts_qs = self.inventory.hosts.filter(instance_id__in=instance_ids)
+                    for db_host in db_hosts_qs.filter(groups__id=db_group.id):
+                        self.logger.info('Host "%s" already in group "%s"', db_host.name, db_group.name)
+                    for db_host in db_hosts_qs.exclude(groups__id=db_group.id):
+                        self._batch_add_m2m(db_group.hosts, db_host)
+                        self.logger.info('Host "%s" added to group "%s"', db_host.name, db_group.name)
+                self._batch_add_m2m(db_group.hosts, flush=True)
         if settings.SQL_DEBUG:
             self.logger.warning('Group-host updates took %d queries for %d group-host relationships',
                                 len(connection.queries) - queries_before, group_host_count)
@@ -997,6 +1071,8 @@ class Command(NoArgsCommand):
         merging as appropriate.
         '''
         # FIXME: Attribute changes to superuser?
+        # Perform __in queries in batches (mainly for unit tests using SQLite).
+        self._batch_size = 500
         self._build_db_instance_id_map()
         self._build_mem_instance_id_map()
         if self.overwrite:
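
A batch size of 500 sits comfortably under SQLITE_MAX_VARIABLE_NUMBER, which defaulted to 999 on SQLite builds current at the time. A standalone demonstration of the failure mode the unit tests were hitting (output depends on your SQLite build; newer builds raised the default limit):

    import sqlite3

    conn = sqlite3.connect(':memory:')
    conn.execute('CREATE TABLE hosts (id INTEGER PRIMARY KEY)')
    pks = list(range(2000))
    placeholders = ','.join(['?'] * len(pks))
    try:
        conn.execute('SELECT id FROM hosts WHERE id IN (%s)' % placeholders, pks)
        print('query succeeded; this build allows more variables')
    except sqlite3.OperationalError as e:
        print(e)  # "too many SQL variables" on builds with the 999 default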


@@ -202,12 +202,15 @@ class Inventory(CommonModel):
         for host_pk in hosts_to_clear.values_list('pk', flat=True):
             host_updates = hosts_to_update.setdefault(host_pk, {})
             host_updates['has_inventory_sources'] = False
-        # Now apply updates to hosts where needed.
-        for host in hosts_qs.filter(pk__in=hosts_to_update.keys()):
-            host_updates = hosts_to_update[host.pk]
-            for field, value in host_updates.items():
-                setattr(host, field, value)
-            host.save(update_fields=host_updates.keys())
+        # Now apply updates to hosts where needed (in batches).
+        all_update_pks = hosts_to_update.keys()
+        for offset in xrange(0, len(all_update_pks), 500):
+            update_pks = all_update_pks[offset:(offset + 500)]
+            for host in hosts_qs.filter(pk__in=update_pks):
+                host_updates = hosts_to_update[host.pk]
+                for field, value in host_updates.items():
+                    setattr(host, field, value)
+                host.save(update_fields=host_updates.keys())
 
     def update_group_computed_fields(self):
         '''
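
Note that the model code hard-codes 500 here rather than sharing the command's _batch_size, since the two live in different modules. A hypothetical helper (not in this commit) could factor the repeated slice loop out:

    # Hypothetical helper: yield fixed-size slices so every __in filter
    # stays under the SQLite bound-parameter limit.
    def batched(items, size=500):
        items = list(items)
        for offset in xrange(0, len(items), size):
            yield items[offset:(offset + size)]

    # Usage: for update_pks in batched(hosts_to_update.keys()): ...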
@@ -264,16 +267,19 @@ class Inventory(CommonModel):
             if group_updates['has_active_failures']:
                 failed_group_pks.add(group_pk)
-        # Now apply updates to each group as needed.
-        for group in self.groups.filter(pk__in=groups_to_update.keys()):
-            group_updates = groups_to_update[group.pk]
-            for field, value in group_updates.items():
-                if getattr(group, field) != value:
-                    setattr(group, field, value)
-                else:
-                    group_updates.pop(field)
-            if group_updates:
-                group.save(update_fields=group_updates.keys())
+        # Now apply updates to each group as needed (in batches).
+        all_update_pks = groups_to_update.keys()
+        for offset in xrange(0, len(all_update_pks), 500):
+            update_pks = all_update_pks[offset:(offset + 500)]
+            for group in self.groups.filter(pk__in=update_pks):
+                group_updates = groups_to_update[group.pk]
+                for field, value in group_updates.items():
+                    if getattr(group, field) != value:
+                        setattr(group, field, value)
+                    else:
+                        group_updates.pop(field)
+                if group_updates:
+                    group.save(update_fields=group_updates.keys())
 
     def update_computed_fields(self, update_groups=True, update_hosts=True):
         '''