Start on refactor to hook in inventory programatic use

This commit is contained in:
Alan Rominger 2020-10-05 12:09:03 -04:00
parent a45f586599
commit 39fa70c58b
No known key found for this signature in database
GPG Key ID: C2D7EAAA12B63559

View File

@ -33,9 +33,6 @@ from awx.main.utils.safe_yaml import sanitize_jinja
from awx.main.models.rbac import batch_role_ancestor_rebuilding
from awx.main.utils import (
ignore_inventory_computed_fields,
check_proot_installed,
wrap_args_with_proot,
build_proot_temp_dir,
get_licenser
)
from awx.main.signals import disable_activity_stream
@ -75,12 +72,8 @@ class AnsibleInventoryLoader(object):
/usr/bin/ansible/ansible-inventory -i hosts --list
'''
def __init__(self, source, is_custom=False, venv_path=None, verbosity=0):
def __init__(self, source, venv_path=None, verbosity=0):
self.source = source
self.source_dir = functioning_dir(self.source)
self.is_custom = is_custom
self.tmp_private_dir = None
self.method = 'ansible-inventory'
self.verbosity = verbosity
if venv_path:
self.venv_path = venv_path
@ -134,61 +127,26 @@ class AnsibleInventoryLoader(object):
# inside of /venv/ansible, so we override the specified interpreter
# https://github.com/ansible/ansible/issues/50714
bargs = ['python', ansible_inventory_path, '-i', self.source]
bargs.extend(['--playbook-dir', self.source_dir])
bargs.extend(['--playbook-dir', functioning_dir(self.source)])
if self.verbosity:
# INFO: -vvv, DEBUG: -vvvvv, for inventory, any more than 3 makes little difference
bargs.append('-{}'.format('v' * min(5, self.verbosity * 2 + 1)))
logger.debug('Using base command: {}'.format(' '.join(bargs)))
return bargs
def get_proot_args(self, cmd, env):
cwd = os.getcwd()
if not check_proot_installed():
raise RuntimeError("proot is not installed but is configured for use")
kwargs = {}
if self.is_custom:
# use source's tmp dir for proot, task manager will delete folder
logger.debug("Using provided directory '{}' for isolation.".format(self.source_dir))
kwargs['proot_temp_dir'] = self.source_dir
cwd = self.source_dir
else:
# we cannot safely store tmp data in source dir or trust script contents
if env['AWX_PRIVATE_DATA_DIR']:
# If this is non-blank, file credentials are being used and we need access
private_data_dir = functioning_dir(env['AWX_PRIVATE_DATA_DIR'])
logger.debug("Using private credential data in '{}'.".format(private_data_dir))
kwargs['private_data_dir'] = private_data_dir
self.tmp_private_dir = build_proot_temp_dir()
logger.debug("Using fresh temporary directory '{}' for isolation.".format(self.tmp_private_dir))
kwargs['proot_temp_dir'] = self.tmp_private_dir
kwargs['proot_show_paths'] = [functioning_dir(self.source), settings.AWX_ANSIBLE_COLLECTIONS_PATHS]
logger.debug("Running from `{}` working directory.".format(cwd))
if self.venv_path != settings.ANSIBLE_VENV_PATH:
kwargs['proot_custom_virtualenv'] = self.venv_path
return wrap_args_with_proot(cmd, cwd, **kwargs)
def command_to_json(self, cmd):
data = {}
stdout, stderr = '', ''
env = self.build_env()
if ((self.is_custom or 'AWX_PRIVATE_DATA_DIR' in env) and
getattr(settings, 'AWX_PROOT_ENABLED', False)):
cmd = self.get_proot_args(cmd, env)
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
stdout, stderr = proc.communicate()
stdout = smart_text(stdout)
stderr = smart_text(stderr)
if self.tmp_private_dir:
shutil.rmtree(self.tmp_private_dir, True)
if proc.returncode != 0:
raise RuntimeError('%s failed (rc=%d) with stdout:\n%s\nstderr:\n%s' % (
self.method, proc.returncode, stdout, stderr))
'ansible-inventory', proc.returncode, stdout, stderr))
for line in stderr.splitlines():
logger.error(line)
@ -231,9 +189,9 @@ class Command(BaseCommand):
action='store_true', default=False,
help='overwrite (rather than merge) variables')
parser.add_argument('--keep-vars', dest='keep_vars', action='store_true', default=False,
help='use database variables if set')
help='DEPRECATED legacy option, has no effect')
parser.add_argument('--custom', dest='custom', action='store_true', default=False,
help='this is a custom inventory script')
help='DEPRECATED indicates a custom inventory script, no longer used')
parser.add_argument('--source', dest='source', type=str, default=None,
metavar='s', help='inventory directory, file, or script to load')
parser.add_argument('--enabled-var', dest='enabled_var', type=str,
@ -322,7 +280,8 @@ class Command(BaseCommand):
else:
raise NotImplementedError('Value of enabled {} not understood.'.format(enabled))
def get_source_absolute_path(self, source):
@staticmethod
def get_source_absolute_path(source):
if not os.path.exists(source):
raise IOError('Source does not exist: %s' % source)
source = os.path.join(os.getcwd(), os.path.dirname(source),
@ -330,61 +289,6 @@ class Command(BaseCommand):
source = os.path.normpath(os.path.abspath(source))
return source
def load_inventory_from_database(self):
'''
Load inventory and related objects from the database.
'''
# Load inventory object based on name or ID.
if self.inventory_id:
q = dict(id=self.inventory_id)
else:
q = dict(name=self.inventory_name)
try:
self.inventory = Inventory.objects.get(**q)
except Inventory.DoesNotExist:
raise CommandError('Inventory with %s = %s cannot be found' % list(q.items())[0])
except Inventory.MultipleObjectsReturned:
raise CommandError('Inventory with %s = %s returned multiple results' % list(q.items())[0])
logger.info('Updating inventory %d: %s' % (self.inventory.pk,
self.inventory.name))
# Load inventory source if specified via environment variable (when
# inventory_import is called from an InventoryUpdate task).
inventory_source_id = os.getenv('INVENTORY_SOURCE_ID', None)
inventory_update_id = os.getenv('INVENTORY_UPDATE_ID', None)
if inventory_source_id:
try:
self.inventory_source = InventorySource.objects.get(pk=inventory_source_id,
inventory=self.inventory)
except InventorySource.DoesNotExist:
raise CommandError('Inventory source with id=%s not found' %
inventory_source_id)
try:
self.inventory_update = InventoryUpdate.objects.get(pk=inventory_update_id)
except InventoryUpdate.DoesNotExist:
raise CommandError('Inventory update with id=%s not found' %
inventory_update_id)
# Otherwise, create a new inventory source to capture this invocation
# via command line.
else:
with ignore_inventory_computed_fields():
self.inventory_source, created = InventorySource.objects.get_or_create(
inventory=self.inventory,
source='file',
source_path=os.path.abspath(self.source),
overwrite=self.overwrite,
overwrite_vars=self.overwrite_vars,
)
self.inventory_update = self.inventory_source.create_inventory_update(
_eager_fields=dict(
job_args=json.dumps(sys.argv),
job_env=dict(os.environ.items()),
job_cwd=os.getcwd())
)
# FIXME: Wait or raise error if inventory is being updated by another
# source.
def _batch_add_m2m(self, related_manager, *objs, **kwargs):
key = (related_manager.instance.pk, related_manager.through._meta.db_table)
flush = bool(kwargs.get('flush', False))
@ -959,16 +863,91 @@ class Command(BaseCommand):
self.inventory_update.save(update_fields=['org_host_limit_error'])
def handle(self, *args, **options):
# Load inventory and related objects from database.
inventory_name = options.get('inventory_name', None)
inventory_id = options.get('inventory_id', None)
if inventory_name and inventory_id:
raise CommandError('--inventory-name and --inventory-id are mutually exclusive')
elif not inventory_name and not inventory_id:
raise CommandError('--inventory-name or --inventory-id is required')
# Load inventory object based on name or ID.
if inventory_id:
q = dict(id=inventory_id)
else:
q = dict(name=inventory_name)
try:
inventory = Inventory.objects.get(**q)
except Inventory.DoesNotExist:
raise CommandError('Inventory with %s = %s cannot be found' % list(q.items())[0])
except Inventory.MultipleObjectsReturned:
raise CommandError('Inventory with %s = %s returned multiple results' % list(q.items())[0])
logger.info('Updating inventory %d: %s' % (inventory.pk, inventory.name))
# Obtain rest of the options needed to run update
raw_source = options.get('source', None)
if not raw_source:
raise CommandError('--source is required')
source = Command.get_source_absolute_path(raw_source)
verbosity = int(options.get('verbosity', 1))
venv_path = options.get('venv', None)
# Create ad-hoc inventory source and inventory update objects
with ignore_inventory_computed_fields():
inventory_source, created = InventorySource.objects.get_or_create(
inventory=inventory,
source='file',
source_path=os.path.abspath(source),
overwrite=bool(options.get('overwrite', False)),
overwrite_vars=bool(options.get('overwrite_vars', False)),
)
inventory_update = inventory_source.create_inventory_update(
_eager_fields=dict(
job_args=json.dumps(sys.argv),
job_env=dict(os.environ.items()),
job_cwd=os.getcwd())
)
data = AnsibleInventoryLoader(
source=source, venv_path=venv_path, verbosity=verbosity
).load()
logger.debug('Finished loading from source: %s', source)
status, tb, exc = self.perform_update(options, data, inventory_update)
with ignore_inventory_computed_fields():
inventory_update = InventoryUpdate.objects.get(pk=inventory_update.pk)
inventory_update.result_traceback = tb
inventory_update.status = status
inventory_update.save(update_fields=['status', 'result_traceback'])
inventory_source.status = status
inventory_source.save(update_fields=['status'])
if exc:
logger.error(str(exc))
if exc:
if isinstance(exc, CommandError):
sys.exit(1)
raise exc
def perform_update(self, options, data, inventory_update):
"""Shared method for both awx-manage CLI updates and inventory updates
from the tasks system.
This saves the inventory data to the database, calling load_into_database
but also wraps that method in a host of options processing
"""
# outside of normal options, these are needed as part of programatic interface
self.inventory = inventory_update.inventory
self.inventory_source = inventory_update.inventory_source
self.inventory_update = inventory_update
# the update options, could be parser object or dict
self.verbosity = int(options.get('verbosity', 1))
self.set_logging_level()
self.inventory_name = options.get('inventory_name', None)
self.inventory_id = options.get('inventory_id', None)
venv_path = options.get('venv', None)
self.overwrite = bool(options.get('overwrite', False))
self.overwrite_vars = bool(options.get('overwrite_vars', False))
self.keep_vars = bool(options.get('keep_vars', False))
self.is_custom = bool(options.get('custom', False))
self.source = options.get('source', None)
self.enabled_var = options.get('enabled_var', None)
self.enabled_value = options.get('enabled_value', None)
self.group_filter = options.get('group_filter', None) or r'^.+$'
@ -976,17 +955,6 @@ class Command(BaseCommand):
self.exclude_empty_groups = bool(options.get('exclude_empty_groups', False))
self.instance_id_var = options.get('instance_id_var', None)
self.invoked_from_dispatcher = False if os.getenv('INVENTORY_SOURCE_ID', None) is None else True
# Load inventory and related objects from database.
if self.inventory_name and self.inventory_id:
raise CommandError('--inventory-name and --inventory-id are mutually exclusive')
elif not self.inventory_name and not self.inventory_id:
raise CommandError('--inventory-name or --inventory-id is required')
if (self.overwrite or self.overwrite_vars) and self.keep_vars:
raise CommandError('--overwrite/--overwrite-vars and --keep-vars are mutually exclusive')
if not self.source:
raise CommandError('--source is required')
try:
self.group_filter_re = re.compile(self.group_filter)
except re.error:
@ -997,7 +965,7 @@ class Command(BaseCommand):
raise CommandError('invalid regular expression for --host-filter')
begin = time.time()
with advisory_lock('inventory_{}_update'.format(self.inventory_id)):
with advisory_lock('inventory_{}_update'.format(self.inventory.id)):
self.load_inventory_from_database()
try:
@ -1026,19 +994,11 @@ class Command(BaseCommand):
self.inventory_update.status = 'running'
self.inventory_update.save()
source = self.get_source_absolute_path(self.source)
data = AnsibleInventoryLoader(source=source, is_custom=self.is_custom,
venv_path=venv_path, verbosity=self.verbosity).load()
logger.debug('Finished loading from source: %s', source)
logger.info('Processing JSON output...')
inventory = MemInventory(
group_filter_re=self.group_filter_re, host_filter_re=self.host_filter_re)
inventory = dict_to_mem_data(data, inventory=inventory)
del data # forget dict from import, could be large
logger.info('Loaded %d groups, %d hosts', len(inventory.all_group.all_groups),
len(inventory.all_group.all_hosts))
@ -1067,6 +1027,17 @@ class Command(BaseCommand):
# and thus properly rolled back if there is an issue.
with transaction.atomic():
# Merge/overwrite inventory into database.
if settings.SQL_DEBUG:
logger.warning('loading into database...')
with ignore_inventory_computed_fields():
if getattr(settings, 'ACTIVITY_STREAM_ENABLED_FOR_INVENTORY_SYNC', True):
self.load_into_database()
else:
with disable_activity_stream():
self.load_into_database()
if settings.SQL_DEBUG:
queries_before2 = len(connection.queries)
self.inventory.update_computed_fields()
if settings.SQL_DEBUG:
logger.warning('loading into database...')
with ignore_inventory_computed_fields():
@ -1124,19 +1095,4 @@ class Command(BaseCommand):
tb = traceback.format_exc()
exc = e
if not self.invoked_from_dispatcher:
with ignore_inventory_computed_fields():
self.inventory_update = InventoryUpdate.objects.get(pk=self.inventory_update.pk)
self.inventory_update.result_traceback = tb
self.inventory_update.status = status
self.inventory_update.save(update_fields=['status', 'result_traceback'])
self.inventory_source.status = status
self.inventory_source.save(update_fields=['status'])
if exc:
logger.error(str(exc))
if exc:
if isinstance(exc, CommandError):
sys.exit(1)
raise exc
return status, tb, exc