Merge pull request #8323 from AlanCoding/prepare_your_containers

Refactor to hook in programmatic use of inventory import saving-to-DB code

Reviewed-by: Jim Ladd
             https://github.com/jladdjr
This commit is contained in:
softwarefactory-project-zuul[bot] 2020-11-30 20:41:58 +00:00 committed by GitHub
commit b65d9ede81
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 419 additions and 310 deletions

View File

@ -30,3 +30,10 @@ class _AwxTaskError():
AwxTaskError = _AwxTaskError()
class PostRunError(Exception):
def __init__(self, msg, status='failed', tb=''):
self.status = status
self.tb = tb
super(PostRunError, self).__init__(msg)

View File

@ -19,6 +19,9 @@ from django.core.management.base import BaseCommand, CommandError
from django.db import connection, transaction
from django.utils.encoding import smart_text
# DRF error class to distinguish license exceptions
from rest_framework.exceptions import PermissionDenied
# AWX inventory imports
from awx.main.models.inventory import (
Inventory,
@ -31,11 +34,12 @@ from awx.main.utils.safe_yaml import sanitize_jinja
# other AWX imports
from awx.main.models.rbac import batch_role_ancestor_rebuilding
# TODO: remove proot utils once we move to running inv. updates in containers
from awx.main.utils import (
ignore_inventory_computed_fields,
check_proot_installed,
wrap_args_with_proot,
build_proot_temp_dir,
ignore_inventory_computed_fields,
get_licenser
)
from awx.main.signals import disable_activity_stream
@ -75,13 +79,11 @@ class AnsibleInventoryLoader(object):
/usr/bin/ansible/ansible-inventory -i hosts --list
'''
def __init__(self, source, is_custom=False, venv_path=None, verbosity=0):
def __init__(self, source, venv_path=None, verbosity=0):
self.source = source
self.source_dir = functioning_dir(self.source)
self.is_custom = is_custom
self.tmp_private_dir = None
self.method = 'ansible-inventory'
self.verbosity = verbosity
# TODO: remove once proot has been removed
self.tmp_private_dir = None
if venv_path:
self.venv_path = venv_path
else:
@ -134,35 +136,31 @@ class AnsibleInventoryLoader(object):
# inside of /venv/ansible, so we override the specified interpreter
# https://github.com/ansible/ansible/issues/50714
bargs = ['python', ansible_inventory_path, '-i', self.source]
bargs.extend(['--playbook-dir', self.source_dir])
bargs.extend(['--playbook-dir', functioning_dir(self.source)])
if self.verbosity:
# INFO: -vvv, DEBUG: -vvvvv, for inventory, any more than 3 makes little difference
bargs.append('-{}'.format('v' * min(5, self.verbosity * 2 + 1)))
logger.debug('Using base command: {}'.format(' '.join(bargs)))
return bargs
# TODO: Remove this once we move to running ansible-inventory in containers
# and don't need proot for process isolation anymore
def get_proot_args(self, cmd, env):
cwd = os.getcwd()
if not check_proot_installed():
raise RuntimeError("proot is not installed but is configured for use")
kwargs = {}
if self.is_custom:
# use source's tmp dir for proot, task manager will delete folder
logger.debug("Using provided directory '{}' for isolation.".format(self.source_dir))
kwargs['proot_temp_dir'] = self.source_dir
cwd = self.source_dir
else:
# we cannot safely store tmp data in source dir or trust script contents
if env['AWX_PRIVATE_DATA_DIR']:
# If this is non-blank, file credentials are being used and we need access
private_data_dir = functioning_dir(env['AWX_PRIVATE_DATA_DIR'])
logger.debug("Using private credential data in '{}'.".format(private_data_dir))
kwargs['private_data_dir'] = private_data_dir
self.tmp_private_dir = build_proot_temp_dir()
logger.debug("Using fresh temporary directory '{}' for isolation.".format(self.tmp_private_dir))
kwargs['proot_temp_dir'] = self.tmp_private_dir
kwargs['proot_show_paths'] = [functioning_dir(self.source), settings.AWX_ANSIBLE_COLLECTIONS_PATHS]
# we cannot safely store tmp data in source dir or trust script contents
if env['AWX_PRIVATE_DATA_DIR']:
# If this is non-blank, file credentials are being used and we need access
private_data_dir = functioning_dir(env['AWX_PRIVATE_DATA_DIR'])
logger.debug("Using private credential data in '{}'.".format(private_data_dir))
kwargs['private_data_dir'] = private_data_dir
self.tmp_private_dir = build_proot_temp_dir()
logger.debug("Using fresh temporary directory '{}' for isolation.".format(self.tmp_private_dir))
kwargs['proot_temp_dir'] = self.tmp_private_dir
kwargs['proot_show_paths'] = [functioning_dir(self.source), settings.AWX_ANSIBLE_COLLECTIONS_PATHS]
logger.debug("Running from `{}` working directory.".format(cwd))
if self.venv_path != settings.ANSIBLE_VENV_PATH:
@ -170,12 +168,14 @@ class AnsibleInventoryLoader(object):
return wrap_args_with_proot(cmd, cwd, **kwargs)
def command_to_json(self, cmd):
data = {}
stdout, stderr = '', ''
env = self.build_env()
if ((self.is_custom or 'AWX_PRIVATE_DATA_DIR' in env) and
# TODO: remove proot args once inv. updates run in containers
if (('AWX_PRIVATE_DATA_DIR' in env) and
getattr(settings, 'AWX_PROOT_ENABLED', False)):
cmd = self.get_proot_args(cmd, env)
@ -184,11 +184,13 @@ class AnsibleInventoryLoader(object):
stdout = smart_text(stdout)
stderr = smart_text(stderr)
# TODO: can be removed when proot is removed
if self.tmp_private_dir:
shutil.rmtree(self.tmp_private_dir, True)
if proc.returncode != 0:
raise RuntimeError('%s failed (rc=%d) with stdout:\n%s\nstderr:\n%s' % (
self.method, proc.returncode, stdout, stderr))
'ansible-inventory', proc.returncode, stdout, stderr))
for line in stderr.splitlines():
logger.error(line)
@ -231,9 +233,9 @@ class Command(BaseCommand):
action='store_true', default=False,
help='overwrite (rather than merge) variables')
parser.add_argument('--keep-vars', dest='keep_vars', action='store_true', default=False,
help='use database variables if set')
help='DEPRECATED legacy option, has no effect')
parser.add_argument('--custom', dest='custom', action='store_true', default=False,
help='this is a custom inventory script')
help='DEPRECATED indicates a custom inventory script, no longer used')
parser.add_argument('--source', dest='source', type=str, default=None,
metavar='s', help='inventory directory, file, or script to load')
parser.add_argument('--enabled-var', dest='enabled_var', type=str,
@ -259,10 +261,10 @@ class Command(BaseCommand):
'specifies the unique, immutable instance ID, may be '
'specified as "foo.bar" to traverse nested dicts.')
def set_logging_level(self):
def set_logging_level(self, verbosity):
log_levels = dict(enumerate([logging.WARNING, logging.INFO,
logging.DEBUG, 0]))
logger.setLevel(log_levels.get(self.verbosity, 0))
logger.setLevel(log_levels.get(verbosity, 0))
def _get_instance_id(self, variables, default=''):
'''
@ -322,7 +324,8 @@ class Command(BaseCommand):
else:
raise NotImplementedError('Value of enabled {} not understood.'.format(enabled))
def get_source_absolute_path(self, source):
@staticmethod
def get_source_absolute_path(source):
if not os.path.exists(source):
raise IOError('Source does not exist: %s' % source)
source = os.path.join(os.getcwd(), os.path.dirname(source),
@ -330,61 +333,6 @@ class Command(BaseCommand):
source = os.path.normpath(os.path.abspath(source))
return source
def load_inventory_from_database(self):
'''
Load inventory and related objects from the database.
'''
# Load inventory object based on name or ID.
if self.inventory_id:
q = dict(id=self.inventory_id)
else:
q = dict(name=self.inventory_name)
try:
self.inventory = Inventory.objects.get(**q)
except Inventory.DoesNotExist:
raise CommandError('Inventory with %s = %s cannot be found' % list(q.items())[0])
except Inventory.MultipleObjectsReturned:
raise CommandError('Inventory with %s = %s returned multiple results' % list(q.items())[0])
logger.info('Updating inventory %d: %s' % (self.inventory.pk,
self.inventory.name))
# Load inventory source if specified via environment variable (when
# inventory_import is called from an InventoryUpdate task).
inventory_source_id = os.getenv('INVENTORY_SOURCE_ID', None)
inventory_update_id = os.getenv('INVENTORY_UPDATE_ID', None)
if inventory_source_id:
try:
self.inventory_source = InventorySource.objects.get(pk=inventory_source_id,
inventory=self.inventory)
except InventorySource.DoesNotExist:
raise CommandError('Inventory source with id=%s not found' %
inventory_source_id)
try:
self.inventory_update = InventoryUpdate.objects.get(pk=inventory_update_id)
except InventoryUpdate.DoesNotExist:
raise CommandError('Inventory update with id=%s not found' %
inventory_update_id)
# Otherwise, create a new inventory source to capture this invocation
# via command line.
else:
with ignore_inventory_computed_fields():
self.inventory_source, created = InventorySource.objects.get_or_create(
inventory=self.inventory,
source='file',
source_path=os.path.abspath(self.source),
overwrite=self.overwrite,
overwrite_vars=self.overwrite_vars,
)
self.inventory_update = self.inventory_source.create_inventory_update(
_eager_fields=dict(
job_args=json.dumps(sys.argv),
job_env=dict(os.environ.items()),
job_cwd=os.getcwd())
)
# FIXME: Wait or raise error if inventory is being updated by another
# source.
def _batch_add_m2m(self, related_manager, *objs, **kwargs):
key = (related_manager.instance.pk, related_manager.through._meta.db_table)
flush = bool(kwargs.get('flush', False))
@ -894,9 +842,9 @@ class Command(BaseCommand):
source_vars = self.all_group.variables
remote_license_type = source_vars.get('tower_metadata', {}).get('license_type', None)
if remote_license_type is None:
raise CommandError('Unexpected Error: Tower inventory plugin missing needed metadata!')
raise PermissionDenied('Unexpected Error: Tower inventory plugin missing needed metadata!')
if local_license_type != remote_license_type:
raise CommandError('Tower server licenses must match: source: {} local: {}'.format(
raise PermissionDenied('Tower server licenses must match: source: {} local: {}'.format(
remote_license_type, local_license_type
))
@ -905,7 +853,7 @@ class Command(BaseCommand):
local_license_type = license_info.get('license_type', 'UNLICENSED')
if local_license_type == 'UNLICENSED':
logger.error(LICENSE_NON_EXISTANT_MESSAGE)
raise CommandError('No license found!')
raise PermissionDenied('No license found!')
elif local_license_type == 'open':
return
available_instances = license_info.get('available_instances', 0)
@ -916,13 +864,13 @@ class Command(BaseCommand):
if time_remaining <= 0:
if hard_error:
logger.error(LICENSE_EXPIRED_MESSAGE)
raise CommandError("License has expired!")
raise PermissionDenied("License has expired!")
else:
logger.warning(LICENSE_EXPIRED_MESSAGE)
# special check for tower-type inventory sources
# but only if running the plugin
TOWER_SOURCE_FILES = ['tower.yml', 'tower.yaml']
if self.inventory_source.source == 'tower' and any(f in self.source for f in TOWER_SOURCE_FILES):
if self.inventory_source.source == 'tower' and any(f in self.inventory_source.source_path for f in TOWER_SOURCE_FILES):
# only if this is the 2nd call to license check, we cannot compare before running plugin
if hasattr(self, 'all_group'):
self.remote_tower_license_compare(local_license_type)
@ -933,7 +881,7 @@ class Command(BaseCommand):
}
if hard_error:
logger.error(LICENSE_MESSAGE % d)
raise CommandError('License count exceeded!')
raise PermissionDenied('License count exceeded!')
else:
logger.warning(LICENSE_MESSAGE % d)
@ -948,7 +896,7 @@ class Command(BaseCommand):
active_count = Host.objects.org_active_count(org.id)
if active_count > org.max_hosts:
raise CommandError('Host limit for organization exceeded!')
raise PermissionDenied('Host limit for organization exceeded!')
def mark_license_failure(self, save=True):
self.inventory_update.license_error = True
@ -959,16 +907,103 @@ class Command(BaseCommand):
self.inventory_update.save(update_fields=['org_host_limit_error'])
def handle(self, *args, **options):
self.verbosity = int(options.get('verbosity', 1))
self.set_logging_level()
self.inventory_name = options.get('inventory_name', None)
self.inventory_id = options.get('inventory_id', None)
venv_path = options.get('venv', None)
# Load inventory and related objects from database.
inventory_name = options.get('inventory_name', None)
inventory_id = options.get('inventory_id', None)
if inventory_name and inventory_id:
raise CommandError('--inventory-name and --inventory-id are mutually exclusive')
elif not inventory_name and not inventory_id:
raise CommandError('--inventory-name or --inventory-id is required')
with advisory_lock('inventory_{}_import'.format(inventory_id)):
# Obtain rest of the options needed to run update
raw_source = options.get('source', None)
if not raw_source:
raise CommandError('--source is required')
verbosity = int(options.get('verbosity', 1))
self.set_logging_level(verbosity)
venv_path = options.get('venv', None)
# Load inventory object based on name or ID.
if inventory_id:
q = dict(id=inventory_id)
else:
q = dict(name=inventory_name)
try:
inventory = Inventory.objects.get(**q)
except Inventory.DoesNotExist:
raise CommandError('Inventory with %s = %s cannot be found' % list(q.items())[0])
except Inventory.MultipleObjectsReturned:
raise CommandError('Inventory with %s = %s returned multiple results' % list(q.items())[0])
logger.info('Updating inventory %d: %s' % (inventory.pk, inventory.name))
# Create ad-hoc inventory source and inventory update objects
with ignore_inventory_computed_fields():
source = Command.get_source_absolute_path(raw_source)
inventory_source, created = InventorySource.objects.get_or_create(
inventory=inventory,
source='file',
source_path=os.path.abspath(source),
overwrite=bool(options.get('overwrite', False)),
overwrite_vars=bool(options.get('overwrite_vars', False)),
)
inventory_update = inventory_source.create_inventory_update(
_eager_fields=dict(
job_args=json.dumps(sys.argv),
job_env=dict(os.environ.items()),
job_cwd=os.getcwd())
)
data = AnsibleInventoryLoader(
source=source, venv_path=venv_path, verbosity=verbosity
).load()
logger.debug('Finished loading from source: %s', source)
status, tb, exc = 'error', '', None
try:
self.perform_update(options, data, inventory_update)
status = 'successful'
except Exception as e:
exc = e
if isinstance(e, KeyboardInterrupt):
status = 'canceled'
else:
tb = traceback.format_exc()
with ignore_inventory_computed_fields():
inventory_update = InventoryUpdate.objects.get(pk=inventory_update.pk)
inventory_update.result_traceback = tb
inventory_update.status = status
inventory_update.save(update_fields=['status', 'result_traceback'])
inventory_source.status = status
inventory_source.save(update_fields=['status'])
if exc:
logger.error(str(exc))
if exc:
if isinstance(exc, CommandError):
sys.exit(1)
raise exc
def perform_update(self, options, data, inventory_update):
"""Shared method for both awx-manage CLI updates and inventory updates
from the tasks system.
This saves the inventory data to the database, calling load_into_database
but also wraps that method in a host of options processing
"""
# outside of normal options, these are needed as part of programatic interface
self.inventory = inventory_update.inventory
self.inventory_source = inventory_update.inventory_source
self.inventory_update = inventory_update
# the update options, could be parser object or dict
self.overwrite = bool(options.get('overwrite', False))
self.overwrite_vars = bool(options.get('overwrite_vars', False))
self.keep_vars = bool(options.get('keep_vars', False))
self.is_custom = bool(options.get('custom', False))
self.source = options.get('source', None)
self.enabled_var = options.get('enabled_var', None)
self.enabled_value = options.get('enabled_value', None)
self.group_filter = options.get('group_filter', None) or r'^.+$'
@ -976,17 +1011,6 @@ class Command(BaseCommand):
self.exclude_empty_groups = bool(options.get('exclude_empty_groups', False))
self.instance_id_var = options.get('instance_id_var', None)
self.invoked_from_dispatcher = False if os.getenv('INVENTORY_SOURCE_ID', None) is None else True
# Load inventory and related objects from database.
if self.inventory_name and self.inventory_id:
raise CommandError('--inventory-name and --inventory-id are mutually exclusive')
elif not self.inventory_name and not self.inventory_id:
raise CommandError('--inventory-name or --inventory-id is required')
if (self.overwrite or self.overwrite_vars) and self.keep_vars:
raise CommandError('--overwrite/--overwrite-vars and --keep-vars are mutually exclusive')
if not self.source:
raise CommandError('--source is required')
try:
self.group_filter_re = re.compile(self.group_filter)
except re.error:
@ -997,146 +1021,115 @@ class Command(BaseCommand):
raise CommandError('invalid regular expression for --host-filter')
begin = time.time()
with advisory_lock('inventory_{}_update'.format(self.inventory_id)):
self.load_inventory_from_database()
# Since perform_update can be invoked either through the awx-manage CLI
# or from the task system, we need to create a new lock at this level
# (even though inventory_import.Command.handle -- which calls
# perform_update -- has its own lock, inventory_ID_import)
with advisory_lock('inventory_{}_perform_update'.format(self.inventory.id)):
try:
self.check_license()
except CommandError as e:
except PermissionDenied as e:
self.mark_license_failure(save=True)
raise e
try:
# Check the per-org host limits
self.check_org_host_limit()
except CommandError as e:
except PermissionDenied as e:
self.mark_org_limits_failure(save=True)
raise e
status, tb, exc = 'error', '', None
try:
if settings.SQL_DEBUG:
queries_before = len(connection.queries)
if settings.SQL_DEBUG:
queries_before = len(connection.queries)
# Update inventory update for this command line invocation.
with ignore_inventory_computed_fields():
iu = self.inventory_update
if iu.status != 'running':
with transaction.atomic():
self.inventory_update.status = 'running'
self.inventory_update.save()
# Update inventory update for this command line invocation.
with ignore_inventory_computed_fields():
# TODO: move this to before perform_update
iu = self.inventory_update
if iu.status != 'running':
with transaction.atomic():
self.inventory_update.status = 'running'
self.inventory_update.save()
source = self.get_source_absolute_path(self.source)
logger.info('Processing JSON output...')
inventory = MemInventory(
group_filter_re=self.group_filter_re, host_filter_re=self.host_filter_re)
inventory = dict_to_mem_data(data, inventory=inventory)
data = AnsibleInventoryLoader(source=source, is_custom=self.is_custom,
venv_path=venv_path, verbosity=self.verbosity).load()
logger.info('Loaded %d groups, %d hosts', len(inventory.all_group.all_groups),
len(inventory.all_group.all_hosts))
logger.debug('Finished loading from source: %s', source)
logger.info('Processing JSON output...')
inventory = MemInventory(
group_filter_re=self.group_filter_re, host_filter_re=self.host_filter_re)
inventory = dict_to_mem_data(data, inventory=inventory)
if self.exclude_empty_groups:
inventory.delete_empty_groups()
del data # forget dict from import, could be large
self.all_group = inventory.all_group
logger.info('Loaded %d groups, %d hosts', len(inventory.all_group.all_groups),
len(inventory.all_group.all_hosts))
if settings.DEBUG:
# depending on inventory source, this output can be
# *exceedingly* verbose - crawling a deeply nested
# inventory/group data structure and printing metadata about
# each host and its memberships
#
# it's easy for this scale of data to overwhelm pexpect,
# (and it's likely only useful for purposes of debugging the
# actual inventory import code), so only print it if we have to:
# https://github.com/ansible/ansible-tower/issues/7414#issuecomment-321615104
self.all_group.debug_tree()
if self.exclude_empty_groups:
inventory.delete_empty_groups()
self.all_group = inventory.all_group
if settings.DEBUG:
# depending on inventory source, this output can be
# *exceedingly* verbose - crawling a deeply nested
# inventory/group data structure and printing metadata about
# each host and its memberships
#
# it's easy for this scale of data to overwhelm pexpect,
# (and it's likely only useful for purposes of debugging the
# actual inventory import code), so only print it if we have to:
# https://github.com/ansible/ansible-tower/issues/7414#issuecomment-321615104
self.all_group.debug_tree()
with batch_role_ancestor_rebuilding():
# If using with transaction.atomic() with try ... catch,
# with transaction.atomic() must be inside the try section of the code as per Django docs
try:
# Ensure that this is managed as an atomic SQL transaction,
# and thus properly rolled back if there is an issue.
with transaction.atomic():
# Merge/overwrite inventory into database.
if settings.SQL_DEBUG:
logger.warning('loading into database...')
with ignore_inventory_computed_fields():
if getattr(settings, 'ACTIVITY_STREAM_ENABLED_FOR_INVENTORY_SYNC', True):
with batch_role_ancestor_rebuilding():
# If using with transaction.atomic() with try ... catch,
# with transaction.atomic() must be inside the try section of the code as per Django docs
try:
# Ensure that this is managed as an atomic SQL transaction,
# and thus properly rolled back if there is an issue.
with transaction.atomic():
# Merge/overwrite inventory into database.
if settings.SQL_DEBUG:
logger.warning('loading into database...')
with ignore_inventory_computed_fields():
if getattr(settings, 'ACTIVITY_STREAM_ENABLED_FOR_INVENTORY_SYNC', True):
self.load_into_database()
else:
with disable_activity_stream():
self.load_into_database()
else:
with disable_activity_stream():
self.load_into_database()
if settings.SQL_DEBUG:
queries_before2 = len(connection.queries)
self.inventory.update_computed_fields()
if settings.SQL_DEBUG:
logger.warning('update computed fields took %d queries',
len(connection.queries) - queries_before2)
# Check if the license is valid.
# If the license is not valid, a CommandError will be thrown,
# and inventory update will be marked as invalid.
# with transaction.atomic() will roll back the changes.
license_fail = True
self.check_license()
if settings.SQL_DEBUG:
queries_before2 = len(connection.queries)
self.inventory.update_computed_fields()
if settings.SQL_DEBUG:
logger.warning('update computed fields took %d queries',
len(connection.queries) - queries_before2)
# Check the per-org host limits
license_fail = False
self.check_org_host_limit()
except CommandError as e:
if license_fail:
self.mark_license_failure()
else:
self.mark_org_limits_failure()
raise e
# Check if the license is valid.
# If the license is not valid, a CommandError will be thrown,
# and inventory update will be marked as invalid.
# with transaction.atomic() will roll back the changes.
license_fail = True
self.check_license()
if settings.SQL_DEBUG:
logger.warning('Inventory import completed for %s in %0.1fs',
self.inventory_source.name, time.time() - begin)
# Check the per-org host limits
license_fail = False
self.check_org_host_limit()
except PermissionDenied as e:
if license_fail:
self.mark_license_failure(save=True)
else:
logger.info('Inventory import completed for %s in %0.1fs',
self.inventory_source.name, time.time() - begin)
status = 'successful'
self.mark_org_limits_failure(save=True)
raise e
# If we're in debug mode, then log the queries and time
# used to do the operation.
if settings.SQL_DEBUG:
queries_this_import = connection.queries[queries_before:]
sqltime = sum(float(x['time']) for x in queries_this_import)
logger.warning('Inventory import required %d queries '
'taking %0.3fs', len(queries_this_import),
sqltime)
except Exception as e:
if isinstance(e, KeyboardInterrupt):
status = 'canceled'
exc = e
elif isinstance(e, CommandError):
exc = e
logger.warning('Inventory import completed for %s in %0.1fs',
self.inventory_source.name, time.time() - begin)
else:
tb = traceback.format_exc()
exc = e
logger.info('Inventory import completed for %s in %0.1fs',
self.inventory_source.name, time.time() - begin)
if not self.invoked_from_dispatcher:
with ignore_inventory_computed_fields():
self.inventory_update = InventoryUpdate.objects.get(pk=self.inventory_update.pk)
self.inventory_update.result_traceback = tb
self.inventory_update.status = status
self.inventory_update.save(update_fields=['status', 'result_traceback'])
self.inventory_source.status = status
self.inventory_source.save(update_fields=['status'])
if exc:
logger.error(str(exc))
if exc:
if isinstance(exc, CommandError):
sys.exit(1)
raise exc
# If we're in debug mode, then log the queries and time
# used to do the operation.
if settings.SQL_DEBUG:
queries_this_import = connection.queries[queries_before:]
sqltime = sum(float(x['time']) for x in queries_this_import)
logger.warning('Inventory import required %d queries '
'taking %0.3fs', len(queries_this_import),
sqltime)

View File

@ -23,7 +23,6 @@ import fcntl
from pathlib import Path
from uuid import uuid4
import urllib.parse as urlparse
import shlex
# Django
from django.conf import settings
@ -64,7 +63,7 @@ from awx.main.models import (
build_safe_env, enforce_bigint_pk_migration
)
from awx.main.constants import ACTIVE_STATES
from awx.main.exceptions import AwxTaskError
from awx.main.exceptions import AwxTaskError, PostRunError
from awx.main.queue import CallbackQueueDispatcher
from awx.main.isolated import manager as isolated_manager
from awx.main.dispatch.publish import task
@ -79,6 +78,7 @@ from awx.main.utils.external_logging import reconfigure_rsyslog
from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja
from awx.main.utils.reload import stop_local_services
from awx.main.utils.pglock import advisory_lock
from awx.main.utils.handlers import SpecialInventoryHandler
from awx.main.consumers import emit_channel_notification
from awx.main import analytics
from awx.conf import settings_registry
@ -1225,6 +1225,13 @@ class BaseTask(object):
Ansible runner puts a parent_uuid on each event, no matter what the type.
AWX only saves the parent_uuid if the event is for a Job.
'''
# cache end_line locally for RunInventoryUpdate tasks
# which generate job events from two 'streams':
# ansible-inventory and the awx.main.commands.inventory_import
# logger
if isinstance(self, RunInventoryUpdate):
self.end_line = event_data['end_line']
if event_data.get(self.event_data_key, None):
if self.event_data_key != 'job_id':
event_data.pop('parent_uuid', None)
@ -1521,6 +1528,12 @@ class BaseTask(object):
try:
self.post_run_hook(self.instance, status)
except PostRunError as exc:
if status == 'successful':
status = exc.status
extra_update_fields['job_explanation'] = exc.args[0]
if exc.tb:
extra_update_fields['result_traceback'] = exc.tb
except Exception:
logger.exception('{} Post run hook errored.'.format(self.instance.log_format))
@ -2462,6 +2475,14 @@ class RunInventoryUpdate(BaseTask):
event_model = InventoryUpdateEvent
event_data_key = 'inventory_update_id'
# TODO: remove once inv updates run in containers
def should_use_proot(self, inventory_update):
'''
Return whether this task should use proot.
'''
return getattr(settings, 'AWX_PROOT_ENABLED', False)
# TODO: remove once inv updates run in containers
@property
def proot_show_paths(self):
return [settings.AWX_ANSIBLE_COLLECTIONS_PATHS]
@ -2486,15 +2507,11 @@ class RunInventoryUpdate(BaseTask):
return injector.build_private_data(inventory_update, private_data_dir)
def build_env(self, inventory_update, private_data_dir, isolated, private_data_files=None):
"""Build environment dictionary for inventory import.
"""Build environment dictionary for ansible-inventory.
This used to be the mechanism by which any data that needs to be passed
to the inventory update script is set up. In particular, this is how
inventory update is aware of its proper credentials.
Most environment injection is now accomplished by the credential
injectors. The primary purpose this still serves is to
still point to the inventory update INI or config file.
Most environment variables related to credentials or configuration
are accomplished by the inventory source injectors (in this method)
or custom credential type injectors (in main run method).
"""
env = super(RunInventoryUpdate, self).build_env(inventory_update,
private_data_dir,
@ -2502,8 +2519,11 @@ class RunInventoryUpdate(BaseTask):
private_data_files=private_data_files)
if private_data_files is None:
private_data_files = {}
self.add_awx_venv(env)
# Pass inventory source ID to inventory script.
# TODO: remove once containers replace custom venvs
self.add_ansible_venv(inventory_update.ansible_virtualenv_path, env, isolated=isolated)
# Legacy environment variables, were used as signal to awx-manage command
# now they are provided in case some scripts may be relying on them
env['INVENTORY_SOURCE_ID'] = str(inventory_update.inventory_source_id)
env['INVENTORY_UPDATE_ID'] = str(inventory_update.pk)
env.update(STANDARD_INVENTORY_UPDATE_ENV)
@ -2566,47 +2586,25 @@ class RunInventoryUpdate(BaseTask):
if inventory is None:
raise RuntimeError('Inventory Source is not associated with an Inventory.')
# Piece together the initial command to run via. the shell.
args = ['awx-manage', 'inventory_import']
args.extend(['--inventory-id', str(inventory.pk)])
args = ['ansible-inventory', '--list', '--export']
# Add appropriate arguments for overwrite if the inventory_update
# object calls for it.
if inventory_update.overwrite:
args.append('--overwrite')
if inventory_update.overwrite_vars:
args.append('--overwrite-vars')
# Add arguments for the source inventory file/script/thing
source_location = self.pseudo_build_inventory(inventory_update, private_data_dir)
args.append('-i')
args.append(source_location)
# Declare the virtualenv the management command should activate
# as it calls ansible-inventory
args.extend(['--venv', inventory_update.ansible_virtualenv_path])
args.append('--output')
args.append(os.path.join(private_data_dir, 'artifacts', 'output.json'))
src = inventory_update.source
if inventory_update.enabled_var:
args.extend(['--enabled-var', shlex.quote(inventory_update.enabled_var)])
args.extend(['--enabled-value', shlex.quote(inventory_update.enabled_value)])
if os.path.isdir(source_location):
playbook_dir = source_location
else:
if getattr(settings, '%s_ENABLED_VAR' % src.upper(), False):
args.extend(['--enabled-var',
getattr(settings, '%s_ENABLED_VAR' % src.upper())])
if getattr(settings, '%s_ENABLED_VALUE' % src.upper(), False):
args.extend(['--enabled-value',
getattr(settings, '%s_ENABLED_VALUE' % src.upper())])
if inventory_update.host_filter:
args.extend(['--host-filter', shlex.quote(inventory_update.host_filter)])
if getattr(settings, '%s_EXCLUDE_EMPTY_GROUPS' % src.upper()):
args.append('--exclude-empty-groups')
if getattr(settings, '%s_INSTANCE_ID_VAR' % src.upper(), False):
args.extend(['--instance-id-var',
"'{}'".format(getattr(settings, '%s_INSTANCE_ID_VAR' % src.upper())),])
# Add arguments for the source inventory script
args.append('--source')
args.append(self.pseudo_build_inventory(inventory_update, private_data_dir))
if src == 'custom':
args.append("--custom")
args.append('-v%d' % inventory_update.verbosity)
if settings.DEBUG:
args.append('--traceback')
playbook_dir = os.path.dirname(source_location)
args.extend(['--playbook-dir', playbook_dir])
if inventory_update.verbosity:
args.append('-' + 'v' * min(5, inventory_update.verbosity * 2 + 1))
return args
def build_inventory(self, inventory_update, private_data_dir):
@ -2646,11 +2644,9 @@ class RunInventoryUpdate(BaseTask):
def build_cwd(self, inventory_update, private_data_dir):
'''
There are two cases where the inventory "source" is in a different
There is one case where the inventory "source" is in a different
location from the private data:
- deprecated vendored inventory scripts in awx/plugins/inventory
- SCM, where source needs to live in the project folder
in these cases, the inventory does not exist in the standard tempdir
'''
src = inventory_update.source
if src == 'scm' and inventory_update.source_project_update:
@ -2708,6 +2704,75 @@ class RunInventoryUpdate(BaseTask):
# This follows update, not sync, so make copy here
RunProjectUpdate.make_local_copy(source_project, private_data_dir)
def post_run_hook(self, inventory_update, status):
if status != 'successful':
return # nothing to save, step out of the way to allow error reporting
private_data_dir = inventory_update.job_env['AWX_PRIVATE_DATA_DIR']
expected_output = os.path.join(private_data_dir, 'artifacts', 'output.json')
with open(expected_output) as f:
data = json.load(f)
# build inventory save options
options = dict(
overwrite=inventory_update.overwrite,
overwrite_vars=inventory_update.overwrite_vars,
)
src = inventory_update.source
if inventory_update.enabled_var:
options['enabled_var'] = inventory_update.enabled_var
options['enabled_value'] = inventory_update.enabled_value
else:
if getattr(settings, '%s_ENABLED_VAR' % src.upper(), False):
options['enabled_var'] = getattr(settings, '%s_ENABLED_VAR' % src.upper())
if getattr(settings, '%s_ENABLED_VALUE' % src.upper(), False):
options['enabled_value'] = getattr(settings, '%s_ENABLED_VALUE' % src.upper())
if inventory_update.host_filter:
options['host_filter'] = inventory_update.host_filter
if getattr(settings, '%s_EXCLUDE_EMPTY_GROUPS' % src.upper()):
options['exclude_empty_groups'] = True
if getattr(settings, '%s_INSTANCE_ID_VAR' % src.upper(), False):
options['instance_id_var'] = getattr(settings, '%s_INSTANCE_ID_VAR' % src.upper())
# Verbosity is applied to saving process, as well as ansible-inventory CLI option
if inventory_update.verbosity:
options['verbosity'] = inventory_update.verbosity
handler = SpecialInventoryHandler(
self.event_handler, self.cancel_callback,
verbosity=inventory_update.verbosity,
job_timeout=self.get_instance_timeout(self.instance),
start_time=inventory_update.started,
counter=self.event_ct, initial_line=self.end_line
)
inv_logger = logging.getLogger('awx.main.commands.inventory_import')
formatter = inv_logger.handlers[0].formatter
formatter.job_start = inventory_update.started
handler.formatter = formatter
inv_logger.handlers[0] = handler
from awx.main.management.commands.inventory_import import Command as InventoryImportCommand
cmd = InventoryImportCommand()
try:
# save the inventory data to database.
# canceling exceptions will be handled in the global post_run_hook
cmd.perform_update(options, data, inventory_update)
except PermissionDenied as exc:
logger.exception('License error saving {} content'.format(inventory_update.log_format))
raise PostRunError(str(exc), status='error')
except PostRunError:
logger.exception('Error saving {} content, rolling back changes'.format(inventory_update.log_format))
raise
except Exception:
logger.exception('Exception saving {} content, rolling back changes.'.format(
inventory_update.log_format))
raise PostRunError(
'Error occured while saving inventory data, see traceback or server logs',
status='error', tb=traceback.format_exc())
@task(queue=get_local_queuename)
class RunAdHocCommand(BaseTask):

View File

@ -9,6 +9,9 @@ import os
# Django
from django.core.management.base import CommandError
# for license errors
from rest_framework.exceptions import PermissionDenied
# AWX
from awx.main.management.commands import inventory_import
from awx.main.models import Inventory, Host, Group, InventorySource
@ -83,7 +86,7 @@ class MockLoader:
return self._data
def mock_logging(self):
def mock_logging(self, level):
pass
@ -322,6 +325,6 @@ def test_tower_version_compare():
"version": "2.0.1-1068-g09684e2c41"
}
}
with pytest.raises(CommandError):
with pytest.raises(PermissionDenied):
cmd.remote_tower_license_compare('very_supported')
cmd.remote_tower_license_compare('open')

View File

@ -214,6 +214,9 @@ def test_inventory_update_injected_content(this_kind, inventory, fake_credential
f"'{inventory_filename}' file not found in inventory update runtime files {content.keys()}"
env.pop('ANSIBLE_COLLECTIONS_PATHS', None) # collection paths not relevant to this test
env.pop('PYTHONPATH')
env.pop('VIRTUAL_ENV')
env.pop('PROOT_TMP_DIR')
base_dir = os.path.join(DATA, 'plugins')
if not os.path.exists(base_dir):
os.mkdir(base_dir)

View File

@ -33,32 +33,6 @@ class TestInvalidOptions:
assert 'inventory-id' in str(err.value)
assert 'exclusive' in str(err.value)
def test_invalid_options_id_and_keep_vars(self):
# You can't overwrite and keep_vars at the same time, that wouldn't make sense
cmd = Command()
with pytest.raises(CommandError) as err:
cmd.handle(
inventory_id=42, overwrite=True, keep_vars=True
)
assert 'overwrite-vars' in str(err.value)
assert 'exclusive' in str(err.value)
def test_invalid_options_id_but_no_source(self):
# Need a source to import
cmd = Command()
with pytest.raises(CommandError) as err:
cmd.handle(
inventory_id=42, overwrite=True, keep_vars=True
)
assert 'overwrite-vars' in str(err.value)
assert 'exclusive' in str(err.value)
with pytest.raises(CommandError) as err:
cmd.handle(
inventory_id=42, overwrite_vars=True, keep_vars=True
)
assert 'overwrite-vars' in str(err.value)
assert 'exclusive' in str(err.value)
def test_invalid_options_missing_source(self):
cmd = Command()
with pytest.raises(CommandError) as err:

View File

@ -2061,8 +2061,8 @@ class TestInventoryUpdateCredentials(TestJobExecution):
credential, env, {}, [], private_data_dir
)
assert '--custom' in ' '.join(args)
script = args[args.index('--source') + 1]
assert '-i' in ' '.join(args)
script = args[args.index('-i') + 1]
with open(script, 'r') as f:
assert f.read() == inventory_update.source_script.script
assert env['FOO'] == 'BAR'

View File

@ -9,6 +9,7 @@ import socket
from datetime import datetime
from dateutil.tz import tzutc
from django.utils.timezone import now
from django.core.serializers.json import DjangoJSONEncoder
from django.conf import settings
@ -17,8 +18,15 @@ class TimeFormatter(logging.Formatter):
'''
Custom log formatter used for inventory imports
'''
def __init__(self, start_time=None, **kwargs):
if start_time is None:
self.job_start = now()
else:
self.job_start = start_time
super(TimeFormatter, self).__init__(**kwargs)
def format(self, record):
record.relativeSeconds = record.relativeCreated / 1000.0
record.relativeSeconds = (now() - self.job_start).total_seconds()
return logging.Formatter.format(self, record)

View File

@ -7,6 +7,10 @@ import os.path
# Django
from django.conf import settings
from django.utils.timezone import now
# AWX
from awx.main.exceptions import PostRunError
class RSysLogHandler(logging.handlers.SysLogHandler):
@ -40,6 +44,58 @@ class RSysLogHandler(logging.handlers.SysLogHandler):
pass
class SpecialInventoryHandler(logging.Handler):
"""Logging handler used for the saving-to-database part of inventory updates
ran by the task system
this dispatches events directly to be processed by the callback receiver,
as opposed to ansible-runner
"""
def __init__(self, event_handler, cancel_callback, job_timeout, verbosity,
start_time=None, counter=0, initial_line=0, **kwargs):
self.event_handler = event_handler
self.cancel_callback = cancel_callback
self.job_timeout = job_timeout
if start_time is None:
self.job_start = now()
else:
self.job_start = start_time
self.last_check = self.job_start
self.counter = counter
self.skip_level = [logging.WARNING, logging.INFO, logging.DEBUG, 0][verbosity]
self._current_line = initial_line
super(SpecialInventoryHandler, self).__init__(**kwargs)
def emit(self, record):
# check cancel and timeout status regardless of log level
this_time = now()
if (this_time - self.last_check).total_seconds() > 0.5: # cancel callback is expensive
self.last_check = this_time
if self.cancel_callback():
raise PostRunError('Inventory update has been canceled', status='canceled')
if self.job_timeout and ((this_time - self.job_start).total_seconds() > self.job_timeout):
raise PostRunError('Inventory update has timed out', status='canceled')
# skip logging for low severity logs
if record.levelno < self.skip_level:
return
self.counter += 1
msg = self.format(record)
n_lines = len(msg.strip().split('\n')) # don't count line breaks at boundry of text
dispatch_data = dict(
created=now().isoformat(),
event='verbose',
counter=self.counter,
stdout=msg,
start_line=self._current_line,
end_line=self._current_line + n_lines
)
self._current_line += n_lines
self.event_handler(dispatch_data)
ColorHandler = logging.StreamHandler
if settings.COLOR_LOGS is True: