mirror of
https://github.com/ansible/awx.git
synced 2026-04-13 22:19:27 -02:30
Updates to a given inventory should happen serially
This commit is contained in:
@@ -874,21 +874,20 @@ class Command(BaseCommand):
|
|||||||
Load inventory from in-memory groups to the database, overwriting or
|
Load inventory from in-memory groups to the database, overwriting or
|
||||||
merging as appropriate.
|
merging as appropriate.
|
||||||
'''
|
'''
|
||||||
with advisory_lock('inventory_{}_update'.format(self.inventory.id)):
|
# FIXME: Attribute changes to superuser?
|
||||||
# FIXME: Attribute changes to superuser?
|
# Perform __in queries in batches (mainly for unit tests using SQLite).
|
||||||
# Perform __in queries in batches (mainly for unit tests using SQLite).
|
self._batch_size = 500
|
||||||
self._batch_size = 500
|
self._build_db_instance_id_map()
|
||||||
self._build_db_instance_id_map()
|
self._build_mem_instance_id_map()
|
||||||
self._build_mem_instance_id_map()
|
if self.overwrite:
|
||||||
if self.overwrite:
|
self._delete_hosts()
|
||||||
self._delete_hosts()
|
self._delete_groups()
|
||||||
self._delete_groups()
|
self._delete_group_children_and_hosts()
|
||||||
self._delete_group_children_and_hosts()
|
self._update_inventory()
|
||||||
self._update_inventory()
|
self._create_update_groups()
|
||||||
self._create_update_groups()
|
self._create_update_hosts()
|
||||||
self._create_update_hosts()
|
self._create_update_group_children()
|
||||||
self._create_update_group_children()
|
self._create_update_group_hosts()
|
||||||
self._create_update_group_hosts()
|
|
||||||
|
|
||||||
def remote_tower_license_compare(self, local_license_type):
|
def remote_tower_license_compare(self, local_license_type):
|
||||||
# this requires https://github.com/ansible/ansible/pull/52747
|
# this requires https://github.com/ansible/ansible/pull/52747
|
||||||
@@ -998,143 +997,144 @@ class Command(BaseCommand):
|
|||||||
raise CommandError('invalid regular expression for --host-filter')
|
raise CommandError('invalid regular expression for --host-filter')
|
||||||
|
|
||||||
begin = time.time()
|
begin = time.time()
|
||||||
self.load_inventory_from_database()
|
with advisory_lock('inventory_{}_update'.format(self.inventory_id)):
|
||||||
|
self.load_inventory_from_database()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.check_license()
|
self.check_license()
|
||||||
except CommandError as e:
|
except CommandError as e:
|
||||||
self.mark_license_failure(save=True)
|
self.mark_license_failure(save=True)
|
||||||
raise e
|
raise e
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Check the per-org host limits
|
# Check the per-org host limits
|
||||||
self.check_org_host_limit()
|
self.check_org_host_limit()
|
||||||
except CommandError as e:
|
except CommandError as e:
|
||||||
self.mark_org_limits_failure(save=True)
|
self.mark_org_limits_failure(save=True)
|
||||||
raise e
|
raise e
|
||||||
|
|
||||||
status, tb, exc = 'error', '', None
|
|
||||||
try:
|
|
||||||
if settings.SQL_DEBUG:
|
|
||||||
queries_before = len(connection.queries)
|
|
||||||
|
|
||||||
# Update inventory update for this command line invocation.
|
|
||||||
with ignore_inventory_computed_fields():
|
|
||||||
iu = self.inventory_update
|
|
||||||
if iu.status != 'running':
|
|
||||||
with transaction.atomic():
|
|
||||||
self.inventory_update.status = 'running'
|
|
||||||
self.inventory_update.save()
|
|
||||||
|
|
||||||
source = self.get_source_absolute_path(self.source)
|
|
||||||
|
|
||||||
data = AnsibleInventoryLoader(source=source, is_custom=self.is_custom,
|
|
||||||
venv_path=venv_path, verbosity=self.verbosity).load()
|
|
||||||
|
|
||||||
logger.debug('Finished loading from source: %s', source)
|
|
||||||
logger.info('Processing JSON output...')
|
|
||||||
inventory = MemInventory(
|
|
||||||
group_filter_re=self.group_filter_re, host_filter_re=self.host_filter_re)
|
|
||||||
inventory = dict_to_mem_data(data, inventory=inventory)
|
|
||||||
|
|
||||||
del data # forget dict from import, could be large
|
|
||||||
|
|
||||||
logger.info('Loaded %d groups, %d hosts', len(inventory.all_group.all_groups),
|
|
||||||
len(inventory.all_group.all_hosts))
|
|
||||||
|
|
||||||
if self.exclude_empty_groups:
|
|
||||||
inventory.delete_empty_groups()
|
|
||||||
|
|
||||||
self.all_group = inventory.all_group
|
|
||||||
|
|
||||||
if settings.DEBUG:
|
|
||||||
# depending on inventory source, this output can be
|
|
||||||
# *exceedingly* verbose - crawling a deeply nested
|
|
||||||
# inventory/group data structure and printing metadata about
|
|
||||||
# each host and its memberships
|
|
||||||
#
|
|
||||||
# it's easy for this scale of data to overwhelm pexpect,
|
|
||||||
# (and it's likely only useful for purposes of debugging the
|
|
||||||
# actual inventory import code), so only print it if we have to:
|
|
||||||
# https://github.com/ansible/ansible-tower/issues/7414#issuecomment-321615104
|
|
||||||
self.all_group.debug_tree()
|
|
||||||
|
|
||||||
with batch_role_ancestor_rebuilding():
|
|
||||||
# If using with transaction.atomic() with try ... catch,
|
|
||||||
# with transaction.atomic() must be inside the try section of the code as per Django docs
|
|
||||||
try:
|
|
||||||
# Ensure that this is managed as an atomic SQL transaction,
|
|
||||||
# and thus properly rolled back if there is an issue.
|
|
||||||
with transaction.atomic():
|
|
||||||
# Merge/overwrite inventory into database.
|
|
||||||
if settings.SQL_DEBUG:
|
|
||||||
logger.warning('loading into database...')
|
|
||||||
with ignore_inventory_computed_fields():
|
|
||||||
if getattr(settings, 'ACTIVITY_STREAM_ENABLED_FOR_INVENTORY_SYNC', True):
|
|
||||||
self.load_into_database()
|
|
||||||
else:
|
|
||||||
with disable_activity_stream():
|
|
||||||
self.load_into_database()
|
|
||||||
if settings.SQL_DEBUG:
|
|
||||||
queries_before2 = len(connection.queries)
|
|
||||||
self.inventory.update_computed_fields()
|
|
||||||
if settings.SQL_DEBUG:
|
|
||||||
logger.warning('update computed fields took %d queries',
|
|
||||||
len(connection.queries) - queries_before2)
|
|
||||||
# Check if the license is valid.
|
|
||||||
# If the license is not valid, a CommandError will be thrown,
|
|
||||||
# and inventory update will be marked as invalid.
|
|
||||||
# with transaction.atomic() will roll back the changes.
|
|
||||||
license_fail = True
|
|
||||||
self.check_license()
|
|
||||||
|
|
||||||
# Check the per-org host limits
|
|
||||||
license_fail = False
|
|
||||||
self.check_org_host_limit()
|
|
||||||
except CommandError as e:
|
|
||||||
if license_fail:
|
|
||||||
self.mark_license_failure()
|
|
||||||
else:
|
|
||||||
self.mark_org_limits_failure()
|
|
||||||
raise e
|
|
||||||
|
|
||||||
|
status, tb, exc = 'error', '', None
|
||||||
|
try:
|
||||||
if settings.SQL_DEBUG:
|
if settings.SQL_DEBUG:
|
||||||
logger.warning('Inventory import completed for %s in %0.1fs',
|
queries_before = len(connection.queries)
|
||||||
self.inventory_source.name, time.time() - begin)
|
|
||||||
|
# Update inventory update for this command line invocation.
|
||||||
|
with ignore_inventory_computed_fields():
|
||||||
|
iu = self.inventory_update
|
||||||
|
if iu.status != 'running':
|
||||||
|
with transaction.atomic():
|
||||||
|
self.inventory_update.status = 'running'
|
||||||
|
self.inventory_update.save()
|
||||||
|
|
||||||
|
source = self.get_source_absolute_path(self.source)
|
||||||
|
|
||||||
|
data = AnsibleInventoryLoader(source=source, is_custom=self.is_custom,
|
||||||
|
venv_path=venv_path, verbosity=self.verbosity).load()
|
||||||
|
|
||||||
|
logger.debug('Finished loading from source: %s', source)
|
||||||
|
logger.info('Processing JSON output...')
|
||||||
|
inventory = MemInventory(
|
||||||
|
group_filter_re=self.group_filter_re, host_filter_re=self.host_filter_re)
|
||||||
|
inventory = dict_to_mem_data(data, inventory=inventory)
|
||||||
|
|
||||||
|
del data # forget dict from import, could be large
|
||||||
|
|
||||||
|
logger.info('Loaded %d groups, %d hosts', len(inventory.all_group.all_groups),
|
||||||
|
len(inventory.all_group.all_hosts))
|
||||||
|
|
||||||
|
if self.exclude_empty_groups:
|
||||||
|
inventory.delete_empty_groups()
|
||||||
|
|
||||||
|
self.all_group = inventory.all_group
|
||||||
|
|
||||||
|
if settings.DEBUG:
|
||||||
|
# depending on inventory source, this output can be
|
||||||
|
# *exceedingly* verbose - crawling a deeply nested
|
||||||
|
# inventory/group data structure and printing metadata about
|
||||||
|
# each host and its memberships
|
||||||
|
#
|
||||||
|
# it's easy for this scale of data to overwhelm pexpect,
|
||||||
|
# (and it's likely only useful for purposes of debugging the
|
||||||
|
# actual inventory import code), so only print it if we have to:
|
||||||
|
# https://github.com/ansible/ansible-tower/issues/7414#issuecomment-321615104
|
||||||
|
self.all_group.debug_tree()
|
||||||
|
|
||||||
|
with batch_role_ancestor_rebuilding():
|
||||||
|
# If using with transaction.atomic() with try ... catch,
|
||||||
|
# with transaction.atomic() must be inside the try section of the code as per Django docs
|
||||||
|
try:
|
||||||
|
# Ensure that this is managed as an atomic SQL transaction,
|
||||||
|
# and thus properly rolled back if there is an issue.
|
||||||
|
with transaction.atomic():
|
||||||
|
# Merge/overwrite inventory into database.
|
||||||
|
if settings.SQL_DEBUG:
|
||||||
|
logger.warning('loading into database...')
|
||||||
|
with ignore_inventory_computed_fields():
|
||||||
|
if getattr(settings, 'ACTIVITY_STREAM_ENABLED_FOR_INVENTORY_SYNC', True):
|
||||||
|
self.load_into_database()
|
||||||
|
else:
|
||||||
|
with disable_activity_stream():
|
||||||
|
self.load_into_database()
|
||||||
|
if settings.SQL_DEBUG:
|
||||||
|
queries_before2 = len(connection.queries)
|
||||||
|
self.inventory.update_computed_fields()
|
||||||
|
if settings.SQL_DEBUG:
|
||||||
|
logger.warning('update computed fields took %d queries',
|
||||||
|
len(connection.queries) - queries_before2)
|
||||||
|
# Check if the license is valid.
|
||||||
|
# If the license is not valid, a CommandError will be thrown,
|
||||||
|
# and inventory update will be marked as invalid.
|
||||||
|
# with transaction.atomic() will roll back the changes.
|
||||||
|
license_fail = True
|
||||||
|
self.check_license()
|
||||||
|
|
||||||
|
# Check the per-org host limits
|
||||||
|
license_fail = False
|
||||||
|
self.check_org_host_limit()
|
||||||
|
except CommandError as e:
|
||||||
|
if license_fail:
|
||||||
|
self.mark_license_failure()
|
||||||
|
else:
|
||||||
|
self.mark_org_limits_failure()
|
||||||
|
raise e
|
||||||
|
|
||||||
|
if settings.SQL_DEBUG:
|
||||||
|
logger.warning('Inventory import completed for %s in %0.1fs',
|
||||||
|
self.inventory_source.name, time.time() - begin)
|
||||||
|
else:
|
||||||
|
logger.info('Inventory import completed for %s in %0.1fs',
|
||||||
|
self.inventory_source.name, time.time() - begin)
|
||||||
|
status = 'successful'
|
||||||
|
|
||||||
|
# If we're in debug mode, then log the queries and time
|
||||||
|
# used to do the operation.
|
||||||
|
if settings.SQL_DEBUG:
|
||||||
|
queries_this_import = connection.queries[queries_before:]
|
||||||
|
sqltime = sum(float(x['time']) for x in queries_this_import)
|
||||||
|
logger.warning('Inventory import required %d queries '
|
||||||
|
'taking %0.3fs', len(queries_this_import),
|
||||||
|
sqltime)
|
||||||
|
except Exception as e:
|
||||||
|
if isinstance(e, KeyboardInterrupt):
|
||||||
|
status = 'canceled'
|
||||||
|
exc = e
|
||||||
|
elif isinstance(e, CommandError):
|
||||||
|
exc = e
|
||||||
else:
|
else:
|
||||||
logger.info('Inventory import completed for %s in %0.1fs',
|
tb = traceback.format_exc()
|
||||||
self.inventory_source.name, time.time() - begin)
|
exc = e
|
||||||
status = 'successful'
|
|
||||||
|
|
||||||
# If we're in debug mode, then log the queries and time
|
if not self.invoked_from_dispatcher:
|
||||||
# used to do the operation.
|
with ignore_inventory_computed_fields():
|
||||||
if settings.SQL_DEBUG:
|
self.inventory_update = InventoryUpdate.objects.get(pk=self.inventory_update.pk)
|
||||||
queries_this_import = connection.queries[queries_before:]
|
self.inventory_update.result_traceback = tb
|
||||||
sqltime = sum(float(x['time']) for x in queries_this_import)
|
self.inventory_update.status = status
|
||||||
logger.warning('Inventory import required %d queries '
|
self.inventory_update.save(update_fields=['status', 'result_traceback'])
|
||||||
'taking %0.3fs', len(queries_this_import),
|
self.inventory_source.status = status
|
||||||
sqltime)
|
self.inventory_source.save(update_fields=['status'])
|
||||||
except Exception as e:
|
|
||||||
if isinstance(e, KeyboardInterrupt):
|
|
||||||
status = 'canceled'
|
|
||||||
exc = e
|
|
||||||
elif isinstance(e, CommandError):
|
|
||||||
exc = e
|
|
||||||
else:
|
|
||||||
tb = traceback.format_exc()
|
|
||||||
exc = e
|
|
||||||
|
|
||||||
if not self.invoked_from_dispatcher:
|
if exc:
|
||||||
with ignore_inventory_computed_fields():
|
logger.error(str(exc))
|
||||||
self.inventory_update = InventoryUpdate.objects.get(pk=self.inventory_update.pk)
|
|
||||||
self.inventory_update.result_traceback = tb
|
|
||||||
self.inventory_update.status = status
|
|
||||||
self.inventory_update.save(update_fields=['status', 'result_traceback'])
|
|
||||||
self.inventory_source.status = status
|
|
||||||
self.inventory_source.save(update_fields=['status'])
|
|
||||||
|
|
||||||
if exc:
|
|
||||||
logger.error(str(exc))
|
|
||||||
|
|
||||||
if exc:
|
if exc:
|
||||||
if isinstance(exc, CommandError):
|
if isinstance(exc, CommandError):
|
||||||
|
|||||||
Reference in New Issue
Block a user