diff --git a/awx/main/management/commands/inventory_import.py b/awx/main/management/commands/inventory_import.py index 53a1660c4f..f4431b2705 100644 --- a/awx/main/management/commands/inventory_import.py +++ b/awx/main/management/commands/inventory_import.py @@ -874,21 +874,20 @@ class Command(BaseCommand): Load inventory from in-memory groups to the database, overwriting or merging as appropriate. ''' - with advisory_lock('inventory_{}_update'.format(self.inventory.id)): - # FIXME: Attribute changes to superuser? - # Perform __in queries in batches (mainly for unit tests using SQLite). - self._batch_size = 500 - self._build_db_instance_id_map() - self._build_mem_instance_id_map() - if self.overwrite: - self._delete_hosts() - self._delete_groups() - self._delete_group_children_and_hosts() - self._update_inventory() - self._create_update_groups() - self._create_update_hosts() - self._create_update_group_children() - self._create_update_group_hosts() + # FIXME: Attribute changes to superuser? + # Perform __in queries in batches (mainly for unit tests using SQLite). + self._batch_size = 500 + self._build_db_instance_id_map() + self._build_mem_instance_id_map() + if self.overwrite: + self._delete_hosts() + self._delete_groups() + self._delete_group_children_and_hosts() + self._update_inventory() + self._create_update_groups() + self._create_update_hosts() + self._create_update_group_children() + self._create_update_group_hosts() def remote_tower_license_compare(self, local_license_type): # this requires https://github.com/ansible/ansible/pull/52747 @@ -998,143 +997,144 @@ class Command(BaseCommand): raise CommandError('invalid regular expression for --host-filter') begin = time.time() - self.load_inventory_from_database() + with advisory_lock('inventory_{}_update'.format(self.inventory_id)): + self.load_inventory_from_database() - try: - self.check_license() - except CommandError as e: - self.mark_license_failure(save=True) - raise e + try: + self.check_license() + except CommandError as e: + self.mark_license_failure(save=True) + raise e - try: - # Check the per-org host limits - self.check_org_host_limit() - except CommandError as e: - self.mark_org_limits_failure(save=True) - raise e - - status, tb, exc = 'error', '', None - try: - if settings.SQL_DEBUG: - queries_before = len(connection.queries) - - # Update inventory update for this command line invocation. - with ignore_inventory_computed_fields(): - iu = self.inventory_update - if iu.status != 'running': - with transaction.atomic(): - self.inventory_update.status = 'running' - self.inventory_update.save() - - source = self.get_source_absolute_path(self.source) - - data = AnsibleInventoryLoader(source=source, is_custom=self.is_custom, - venv_path=venv_path, verbosity=self.verbosity).load() - - logger.debug('Finished loading from source: %s', source) - logger.info('Processing JSON output...') - inventory = MemInventory( - group_filter_re=self.group_filter_re, host_filter_re=self.host_filter_re) - inventory = dict_to_mem_data(data, inventory=inventory) - - del data # forget dict from import, could be large - - logger.info('Loaded %d groups, %d hosts', len(inventory.all_group.all_groups), - len(inventory.all_group.all_hosts)) - - if self.exclude_empty_groups: - inventory.delete_empty_groups() - - self.all_group = inventory.all_group - - if settings.DEBUG: - # depending on inventory source, this output can be - # *exceedingly* verbose - crawling a deeply nested - # inventory/group data structure and printing metadata about - # each host and its memberships - # - # it's easy for this scale of data to overwhelm pexpect, - # (and it's likely only useful for purposes of debugging the - # actual inventory import code), so only print it if we have to: - # https://github.com/ansible/ansible-tower/issues/7414#issuecomment-321615104 - self.all_group.debug_tree() - - with batch_role_ancestor_rebuilding(): - # If using with transaction.atomic() with try ... catch, - # with transaction.atomic() must be inside the try section of the code as per Django docs - try: - # Ensure that this is managed as an atomic SQL transaction, - # and thus properly rolled back if there is an issue. - with transaction.atomic(): - # Merge/overwrite inventory into database. - if settings.SQL_DEBUG: - logger.warning('loading into database...') - with ignore_inventory_computed_fields(): - if getattr(settings, 'ACTIVITY_STREAM_ENABLED_FOR_INVENTORY_SYNC', True): - self.load_into_database() - else: - with disable_activity_stream(): - self.load_into_database() - if settings.SQL_DEBUG: - queries_before2 = len(connection.queries) - self.inventory.update_computed_fields() - if settings.SQL_DEBUG: - logger.warning('update computed fields took %d queries', - len(connection.queries) - queries_before2) - # Check if the license is valid. - # If the license is not valid, a CommandError will be thrown, - # and inventory update will be marked as invalid. - # with transaction.atomic() will roll back the changes. - license_fail = True - self.check_license() - - # Check the per-org host limits - license_fail = False - self.check_org_host_limit() - except CommandError as e: - if license_fail: - self.mark_license_failure() - else: - self.mark_org_limits_failure() - raise e + try: + # Check the per-org host limits + self.check_org_host_limit() + except CommandError as e: + self.mark_org_limits_failure(save=True) + raise e + status, tb, exc = 'error', '', None + try: if settings.SQL_DEBUG: - logger.warning('Inventory import completed for %s in %0.1fs', - self.inventory_source.name, time.time() - begin) + queries_before = len(connection.queries) + + # Update inventory update for this command line invocation. + with ignore_inventory_computed_fields(): + iu = self.inventory_update + if iu.status != 'running': + with transaction.atomic(): + self.inventory_update.status = 'running' + self.inventory_update.save() + + source = self.get_source_absolute_path(self.source) + + data = AnsibleInventoryLoader(source=source, is_custom=self.is_custom, + venv_path=venv_path, verbosity=self.verbosity).load() + + logger.debug('Finished loading from source: %s', source) + logger.info('Processing JSON output...') + inventory = MemInventory( + group_filter_re=self.group_filter_re, host_filter_re=self.host_filter_re) + inventory = dict_to_mem_data(data, inventory=inventory) + + del data # forget dict from import, could be large + + logger.info('Loaded %d groups, %d hosts', len(inventory.all_group.all_groups), + len(inventory.all_group.all_hosts)) + + if self.exclude_empty_groups: + inventory.delete_empty_groups() + + self.all_group = inventory.all_group + + if settings.DEBUG: + # depending on inventory source, this output can be + # *exceedingly* verbose - crawling a deeply nested + # inventory/group data structure and printing metadata about + # each host and its memberships + # + # it's easy for this scale of data to overwhelm pexpect, + # (and it's likely only useful for purposes of debugging the + # actual inventory import code), so only print it if we have to: + # https://github.com/ansible/ansible-tower/issues/7414#issuecomment-321615104 + self.all_group.debug_tree() + + with batch_role_ancestor_rebuilding(): + # If using with transaction.atomic() with try ... catch, + # with transaction.atomic() must be inside the try section of the code as per Django docs + try: + # Ensure that this is managed as an atomic SQL transaction, + # and thus properly rolled back if there is an issue. + with transaction.atomic(): + # Merge/overwrite inventory into database. + if settings.SQL_DEBUG: + logger.warning('loading into database...') + with ignore_inventory_computed_fields(): + if getattr(settings, 'ACTIVITY_STREAM_ENABLED_FOR_INVENTORY_SYNC', True): + self.load_into_database() + else: + with disable_activity_stream(): + self.load_into_database() + if settings.SQL_DEBUG: + queries_before2 = len(connection.queries) + self.inventory.update_computed_fields() + if settings.SQL_DEBUG: + logger.warning('update computed fields took %d queries', + len(connection.queries) - queries_before2) + # Check if the license is valid. + # If the license is not valid, a CommandError will be thrown, + # and inventory update will be marked as invalid. + # with transaction.atomic() will roll back the changes. + license_fail = True + self.check_license() + + # Check the per-org host limits + license_fail = False + self.check_org_host_limit() + except CommandError as e: + if license_fail: + self.mark_license_failure() + else: + self.mark_org_limits_failure() + raise e + + if settings.SQL_DEBUG: + logger.warning('Inventory import completed for %s in %0.1fs', + self.inventory_source.name, time.time() - begin) + else: + logger.info('Inventory import completed for %s in %0.1fs', + self.inventory_source.name, time.time() - begin) + status = 'successful' + + # If we're in debug mode, then log the queries and time + # used to do the operation. + if settings.SQL_DEBUG: + queries_this_import = connection.queries[queries_before:] + sqltime = sum(float(x['time']) for x in queries_this_import) + logger.warning('Inventory import required %d queries ' + 'taking %0.3fs', len(queries_this_import), + sqltime) + except Exception as e: + if isinstance(e, KeyboardInterrupt): + status = 'canceled' + exc = e + elif isinstance(e, CommandError): + exc = e else: - logger.info('Inventory import completed for %s in %0.1fs', - self.inventory_source.name, time.time() - begin) - status = 'successful' + tb = traceback.format_exc() + exc = e - # If we're in debug mode, then log the queries and time - # used to do the operation. - if settings.SQL_DEBUG: - queries_this_import = connection.queries[queries_before:] - sqltime = sum(float(x['time']) for x in queries_this_import) - logger.warning('Inventory import required %d queries ' - 'taking %0.3fs', len(queries_this_import), - sqltime) - except Exception as e: - if isinstance(e, KeyboardInterrupt): - status = 'canceled' - exc = e - elif isinstance(e, CommandError): - exc = e - else: - tb = traceback.format_exc() - exc = e + if not self.invoked_from_dispatcher: + with ignore_inventory_computed_fields(): + self.inventory_update = InventoryUpdate.objects.get(pk=self.inventory_update.pk) + self.inventory_update.result_traceback = tb + self.inventory_update.status = status + self.inventory_update.save(update_fields=['status', 'result_traceback']) + self.inventory_source.status = status + self.inventory_source.save(update_fields=['status']) - if not self.invoked_from_dispatcher: - with ignore_inventory_computed_fields(): - self.inventory_update = InventoryUpdate.objects.get(pk=self.inventory_update.pk) - self.inventory_update.result_traceback = tb - self.inventory_update.status = status - self.inventory_update.save(update_fields=['status', 'result_traceback']) - self.inventory_source.status = status - self.inventory_source.save(update_fields=['status']) - - if exc: - logger.error(str(exc)) + if exc: + logger.error(str(exc)) if exc: if isinstance(exc, CommandError):