From 39fa70c58b44be5506d67273fff5a180300cae1d Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Mon, 5 Oct 2020 12:09:03 -0400 Subject: [PATCH 01/20] Start on refactor to hook in inventory programatic use --- .../management/commands/inventory_import.py | 246 +++++++----------- 1 file changed, 101 insertions(+), 145 deletions(-) diff --git a/awx/main/management/commands/inventory_import.py b/awx/main/management/commands/inventory_import.py index c92215560e..3434869c42 100644 --- a/awx/main/management/commands/inventory_import.py +++ b/awx/main/management/commands/inventory_import.py @@ -33,9 +33,6 @@ from awx.main.utils.safe_yaml import sanitize_jinja from awx.main.models.rbac import batch_role_ancestor_rebuilding from awx.main.utils import ( ignore_inventory_computed_fields, - check_proot_installed, - wrap_args_with_proot, - build_proot_temp_dir, get_licenser ) from awx.main.signals import disable_activity_stream @@ -75,12 +72,8 @@ class AnsibleInventoryLoader(object): /usr/bin/ansible/ansible-inventory -i hosts --list ''' - def __init__(self, source, is_custom=False, venv_path=None, verbosity=0): + def __init__(self, source, venv_path=None, verbosity=0): self.source = source - self.source_dir = functioning_dir(self.source) - self.is_custom = is_custom - self.tmp_private_dir = None - self.method = 'ansible-inventory' self.verbosity = verbosity if venv_path: self.venv_path = venv_path @@ -134,61 +127,26 @@ class AnsibleInventoryLoader(object): # inside of /venv/ansible, so we override the specified interpreter # https://github.com/ansible/ansible/issues/50714 bargs = ['python', ansible_inventory_path, '-i', self.source] - bargs.extend(['--playbook-dir', self.source_dir]) + bargs.extend(['--playbook-dir', functioning_dir(self.source)]) if self.verbosity: # INFO: -vvv, DEBUG: -vvvvv, for inventory, any more than 3 makes little difference bargs.append('-{}'.format('v' * min(5, self.verbosity * 2 + 1))) logger.debug('Using base command: {}'.format(' '.join(bargs))) return bargs - def get_proot_args(self, cmd, env): - cwd = os.getcwd() - if not check_proot_installed(): - raise RuntimeError("proot is not installed but is configured for use") - - kwargs = {} - if self.is_custom: - # use source's tmp dir for proot, task manager will delete folder - logger.debug("Using provided directory '{}' for isolation.".format(self.source_dir)) - kwargs['proot_temp_dir'] = self.source_dir - cwd = self.source_dir - else: - # we cannot safely store tmp data in source dir or trust script contents - if env['AWX_PRIVATE_DATA_DIR']: - # If this is non-blank, file credentials are being used and we need access - private_data_dir = functioning_dir(env['AWX_PRIVATE_DATA_DIR']) - logger.debug("Using private credential data in '{}'.".format(private_data_dir)) - kwargs['private_data_dir'] = private_data_dir - self.tmp_private_dir = build_proot_temp_dir() - logger.debug("Using fresh temporary directory '{}' for isolation.".format(self.tmp_private_dir)) - kwargs['proot_temp_dir'] = self.tmp_private_dir - kwargs['proot_show_paths'] = [functioning_dir(self.source), settings.AWX_ANSIBLE_COLLECTIONS_PATHS] - logger.debug("Running from `{}` working directory.".format(cwd)) - - if self.venv_path != settings.ANSIBLE_VENV_PATH: - kwargs['proot_custom_virtualenv'] = self.venv_path - - return wrap_args_with_proot(cmd, cwd, **kwargs) - def command_to_json(self, cmd): data = {} stdout, stderr = '', '' env = self.build_env() - if ((self.is_custom or 'AWX_PRIVATE_DATA_DIR' in env) and - getattr(settings, 'AWX_PROOT_ENABLED', False)): - cmd = self.get_proot_args(cmd, env) - proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env) stdout, stderr = proc.communicate() stdout = smart_text(stdout) stderr = smart_text(stderr) - if self.tmp_private_dir: - shutil.rmtree(self.tmp_private_dir, True) if proc.returncode != 0: raise RuntimeError('%s failed (rc=%d) with stdout:\n%s\nstderr:\n%s' % ( - self.method, proc.returncode, stdout, stderr)) + 'ansible-inventory', proc.returncode, stdout, stderr)) for line in stderr.splitlines(): logger.error(line) @@ -231,9 +189,9 @@ class Command(BaseCommand): action='store_true', default=False, help='overwrite (rather than merge) variables') parser.add_argument('--keep-vars', dest='keep_vars', action='store_true', default=False, - help='use database variables if set') + help='DEPRECATED legacy option, has no effect') parser.add_argument('--custom', dest='custom', action='store_true', default=False, - help='this is a custom inventory script') + help='DEPRECATED indicates a custom inventory script, no longer used') parser.add_argument('--source', dest='source', type=str, default=None, metavar='s', help='inventory directory, file, or script to load') parser.add_argument('--enabled-var', dest='enabled_var', type=str, @@ -322,7 +280,8 @@ class Command(BaseCommand): else: raise NotImplementedError('Value of enabled {} not understood.'.format(enabled)) - def get_source_absolute_path(self, source): + @staticmethod + def get_source_absolute_path(source): if not os.path.exists(source): raise IOError('Source does not exist: %s' % source) source = os.path.join(os.getcwd(), os.path.dirname(source), @@ -330,61 +289,6 @@ class Command(BaseCommand): source = os.path.normpath(os.path.abspath(source)) return source - def load_inventory_from_database(self): - ''' - Load inventory and related objects from the database. - ''' - # Load inventory object based on name or ID. - if self.inventory_id: - q = dict(id=self.inventory_id) - else: - q = dict(name=self.inventory_name) - try: - self.inventory = Inventory.objects.get(**q) - except Inventory.DoesNotExist: - raise CommandError('Inventory with %s = %s cannot be found' % list(q.items())[0]) - except Inventory.MultipleObjectsReturned: - raise CommandError('Inventory with %s = %s returned multiple results' % list(q.items())[0]) - logger.info('Updating inventory %d: %s' % (self.inventory.pk, - self.inventory.name)) - - # Load inventory source if specified via environment variable (when - # inventory_import is called from an InventoryUpdate task). - inventory_source_id = os.getenv('INVENTORY_SOURCE_ID', None) - inventory_update_id = os.getenv('INVENTORY_UPDATE_ID', None) - if inventory_source_id: - try: - self.inventory_source = InventorySource.objects.get(pk=inventory_source_id, - inventory=self.inventory) - except InventorySource.DoesNotExist: - raise CommandError('Inventory source with id=%s not found' % - inventory_source_id) - try: - self.inventory_update = InventoryUpdate.objects.get(pk=inventory_update_id) - except InventoryUpdate.DoesNotExist: - raise CommandError('Inventory update with id=%s not found' % - inventory_update_id) - # Otherwise, create a new inventory source to capture this invocation - # via command line. - else: - with ignore_inventory_computed_fields(): - self.inventory_source, created = InventorySource.objects.get_or_create( - inventory=self.inventory, - source='file', - source_path=os.path.abspath(self.source), - overwrite=self.overwrite, - overwrite_vars=self.overwrite_vars, - ) - self.inventory_update = self.inventory_source.create_inventory_update( - _eager_fields=dict( - job_args=json.dumps(sys.argv), - job_env=dict(os.environ.items()), - job_cwd=os.getcwd()) - ) - - # FIXME: Wait or raise error if inventory is being updated by another - # source. - def _batch_add_m2m(self, related_manager, *objs, **kwargs): key = (related_manager.instance.pk, related_manager.through._meta.db_table) flush = bool(kwargs.get('flush', False)) @@ -959,16 +863,91 @@ class Command(BaseCommand): self.inventory_update.save(update_fields=['org_host_limit_error']) def handle(self, *args, **options): + # Load inventory and related objects from database. + inventory_name = options.get('inventory_name', None) + inventory_id = options.get('inventory_id', None) + if inventory_name and inventory_id: + raise CommandError('--inventory-name and --inventory-id are mutually exclusive') + elif not inventory_name and not inventory_id: + raise CommandError('--inventory-name or --inventory-id is required') + + # Load inventory object based on name or ID. + if inventory_id: + q = dict(id=inventory_id) + else: + q = dict(name=inventory_name) + try: + inventory = Inventory.objects.get(**q) + except Inventory.DoesNotExist: + raise CommandError('Inventory with %s = %s cannot be found' % list(q.items())[0]) + except Inventory.MultipleObjectsReturned: + raise CommandError('Inventory with %s = %s returned multiple results' % list(q.items())[0]) + logger.info('Updating inventory %d: %s' % (inventory.pk, inventory.name)) + + # Obtain rest of the options needed to run update + raw_source = options.get('source', None) + if not raw_source: + raise CommandError('--source is required') + source = Command.get_source_absolute_path(raw_source) + verbosity = int(options.get('verbosity', 1)) + venv_path = options.get('venv', None) + + # Create ad-hoc inventory source and inventory update objects + with ignore_inventory_computed_fields(): + inventory_source, created = InventorySource.objects.get_or_create( + inventory=inventory, + source='file', + source_path=os.path.abspath(source), + overwrite=bool(options.get('overwrite', False)), + overwrite_vars=bool(options.get('overwrite_vars', False)), + ) + inventory_update = inventory_source.create_inventory_update( + _eager_fields=dict( + job_args=json.dumps(sys.argv), + job_env=dict(os.environ.items()), + job_cwd=os.getcwd()) + ) + + data = AnsibleInventoryLoader( + source=source, venv_path=venv_path, verbosity=verbosity + ).load() + + logger.debug('Finished loading from source: %s', source) + status, tb, exc = self.perform_update(options, data, inventory_update) + + with ignore_inventory_computed_fields(): + inventory_update = InventoryUpdate.objects.get(pk=inventory_update.pk) + inventory_update.result_traceback = tb + inventory_update.status = status + inventory_update.save(update_fields=['status', 'result_traceback']) + inventory_source.status = status + inventory_source.save(update_fields=['status']) + + if exc: + logger.error(str(exc)) + + if exc: + if isinstance(exc, CommandError): + sys.exit(1) + raise exc + + def perform_update(self, options, data, inventory_update): + """Shared method for both awx-manage CLI updates and inventory updates + from the tasks system. + + This saves the inventory data to the database, calling load_into_database + but also wraps that method in a host of options processing + """ + # outside of normal options, these are needed as part of programatic interface + self.inventory = inventory_update.inventory + self.inventory_source = inventory_update.inventory_source + self.inventory_update = inventory_update + + # the update options, could be parser object or dict self.verbosity = int(options.get('verbosity', 1)) self.set_logging_level() - self.inventory_name = options.get('inventory_name', None) - self.inventory_id = options.get('inventory_id', None) - venv_path = options.get('venv', None) self.overwrite = bool(options.get('overwrite', False)) self.overwrite_vars = bool(options.get('overwrite_vars', False)) - self.keep_vars = bool(options.get('keep_vars', False)) - self.is_custom = bool(options.get('custom', False)) - self.source = options.get('source', None) self.enabled_var = options.get('enabled_var', None) self.enabled_value = options.get('enabled_value', None) self.group_filter = options.get('group_filter', None) or r'^.+$' @@ -976,17 +955,6 @@ class Command(BaseCommand): self.exclude_empty_groups = bool(options.get('exclude_empty_groups', False)) self.instance_id_var = options.get('instance_id_var', None) - self.invoked_from_dispatcher = False if os.getenv('INVENTORY_SOURCE_ID', None) is None else True - - # Load inventory and related objects from database. - if self.inventory_name and self.inventory_id: - raise CommandError('--inventory-name and --inventory-id are mutually exclusive') - elif not self.inventory_name and not self.inventory_id: - raise CommandError('--inventory-name or --inventory-id is required') - if (self.overwrite or self.overwrite_vars) and self.keep_vars: - raise CommandError('--overwrite/--overwrite-vars and --keep-vars are mutually exclusive') - if not self.source: - raise CommandError('--source is required') try: self.group_filter_re = re.compile(self.group_filter) except re.error: @@ -997,7 +965,7 @@ class Command(BaseCommand): raise CommandError('invalid regular expression for --host-filter') begin = time.time() - with advisory_lock('inventory_{}_update'.format(self.inventory_id)): + with advisory_lock('inventory_{}_update'.format(self.inventory.id)): self.load_inventory_from_database() try: @@ -1026,19 +994,11 @@ class Command(BaseCommand): self.inventory_update.status = 'running' self.inventory_update.save() - source = self.get_source_absolute_path(self.source) - - data = AnsibleInventoryLoader(source=source, is_custom=self.is_custom, - venv_path=venv_path, verbosity=self.verbosity).load() - - logger.debug('Finished loading from source: %s', source) logger.info('Processing JSON output...') inventory = MemInventory( group_filter_re=self.group_filter_re, host_filter_re=self.host_filter_re) inventory = dict_to_mem_data(data, inventory=inventory) - del data # forget dict from import, could be large - logger.info('Loaded %d groups, %d hosts', len(inventory.all_group.all_groups), len(inventory.all_group.all_hosts)) @@ -1067,6 +1027,17 @@ class Command(BaseCommand): # and thus properly rolled back if there is an issue. with transaction.atomic(): # Merge/overwrite inventory into database. + if settings.SQL_DEBUG: + logger.warning('loading into database...') + with ignore_inventory_computed_fields(): + if getattr(settings, 'ACTIVITY_STREAM_ENABLED_FOR_INVENTORY_SYNC', True): + self.load_into_database() + else: + with disable_activity_stream(): + self.load_into_database() + if settings.SQL_DEBUG: + queries_before2 = len(connection.queries) + self.inventory.update_computed_fields() if settings.SQL_DEBUG: logger.warning('loading into database...') with ignore_inventory_computed_fields(): @@ -1124,19 +1095,4 @@ class Command(BaseCommand): tb = traceback.format_exc() exc = e - if not self.invoked_from_dispatcher: - with ignore_inventory_computed_fields(): - self.inventory_update = InventoryUpdate.objects.get(pk=self.inventory_update.pk) - self.inventory_update.result_traceback = tb - self.inventory_update.status = status - self.inventory_update.save(update_fields=['status', 'result_traceback']) - self.inventory_source.status = status - self.inventory_source.save(update_fields=['status']) - - if exc: - logger.error(str(exc)) - - if exc: - if isinstance(exc, CommandError): - sys.exit(1) - raise exc + return status, tb, exc From ae9ae14e5ac38b7484f10057077d8f60ccc5c0fb Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Mon, 5 Oct 2020 14:33:06 -0400 Subject: [PATCH 02/20] Migrate inventory CLI options to programatic interface POC, successfully importing with this commit Attempt to surface saving related errors as a part of that --- awx/main/tasks.py | 106 +++++++++++++++++++++++++++------------------- 1 file changed, 63 insertions(+), 43 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 82aa6a6de9..5e51bac386 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -23,7 +23,6 @@ import fcntl from pathlib import Path from uuid import uuid4 import urllib.parse as urlparse -import shlex # Django from django.conf import settings @@ -2566,47 +2565,18 @@ class RunInventoryUpdate(BaseTask): if inventory is None: raise RuntimeError('Inventory Source is not associated with an Inventory.') - # Piece together the initial command to run via. the shell. - args = ['awx-manage', 'inventory_import'] - args.extend(['--inventory-id', str(inventory.pk)]) + args = ['ansible-inventory', '--list', '--export'] - # Add appropriate arguments for overwrite if the inventory_update - # object calls for it. - if inventory_update.overwrite: - args.append('--overwrite') - if inventory_update.overwrite_vars: - args.append('--overwrite-vars') - - # Declare the virtualenv the management command should activate - # as it calls ansible-inventory - args.extend(['--venv', inventory_update.ansible_virtualenv_path]) - - src = inventory_update.source - if inventory_update.enabled_var: - args.extend(['--enabled-var', shlex.quote(inventory_update.enabled_var)]) - args.extend(['--enabled-value', shlex.quote(inventory_update.enabled_value)]) - else: - if getattr(settings, '%s_ENABLED_VAR' % src.upper(), False): - args.extend(['--enabled-var', - getattr(settings, '%s_ENABLED_VAR' % src.upper())]) - if getattr(settings, '%s_ENABLED_VALUE' % src.upper(), False): - args.extend(['--enabled-value', - getattr(settings, '%s_ENABLED_VALUE' % src.upper())]) - if inventory_update.host_filter: - args.extend(['--host-filter', shlex.quote(inventory_update.host_filter)]) - if getattr(settings, '%s_EXCLUDE_EMPTY_GROUPS' % src.upper()): - args.append('--exclude-empty-groups') - if getattr(settings, '%s_INSTANCE_ID_VAR' % src.upper(), False): - args.extend(['--instance-id-var', - "'{}'".format(getattr(settings, '%s_INSTANCE_ID_VAR' % src.upper())),]) - # Add arguments for the source inventory script - args.append('--source') + # Add arguments for the source inventory file/script/thing + args.append('-i') args.append(self.pseudo_build_inventory(inventory_update, private_data_dir)) - if src == 'custom': - args.append("--custom") - args.append('-v%d' % inventory_update.verbosity) - if settings.DEBUG: - args.append('--traceback') + + args.append('--output') + args.append(os.path.join(private_data_dir, 'artifacts', 'output.json')) + + if inventory_update.verbosity: + args.append('-' + 'v' * inventory_update.verbosity) + return args def build_inventory(self, inventory_update, private_data_dir): @@ -2646,11 +2616,9 @@ class RunInventoryUpdate(BaseTask): def build_cwd(self, inventory_update, private_data_dir): ''' - There are two cases where the inventory "source" is in a different + There is one case where the inventory "source" is in a different location from the private data: - - deprecated vendored inventory scripts in awx/plugins/inventory - SCM, where source needs to live in the project folder - in these cases, the inventory does not exist in the standard tempdir ''' src = inventory_update.source if src == 'scm' and inventory_update.source_project_update: @@ -2708,6 +2676,58 @@ class RunInventoryUpdate(BaseTask): # This follows update, not sync, so make copy here RunProjectUpdate.make_local_copy(source_project, private_data_dir) + def post_run_hook(self, inventory_update, status): + if status != 'successful': + return # nothing to save, step out of the way to allow error reporting + + private_data_dir = inventory_update.job_env['AWX_PRIVATE_DATA_DIR'] + expected_output = os.path.join(private_data_dir, 'artifacts', 'output.json') + with open(expected_output) as f: + data = json.load(f) + + # build inventory save options + options = dict( + overwrite=inventory_update.overwrite, + overwrite_vars=inventory_update.overwrite_vars, + ) + src = inventory_update.source + + if inventory_update.enabled_var: + options['enabled_var'] = inventory_update.enabled_var + options['enabled_value'] = inventory_update.enabled_value + else: + if getattr(settings, '%s_ENABLED_VAR' % src.upper(), False): + options['enabled_var'] = getattr(settings, '%s_ENABLED_VAR' % src.upper()) + if getattr(settings, '%s_ENABLED_VALUE' % src.upper(), False): + options['enabled_value'] = getattr(settings, '%s_ENABLED_VALUE' % src.upper()) + + if inventory_update.host_filter: + options['host_filter'] = inventory_update.host_filter + + if getattr(settings, '%s_EXCLUDE_EMPTY_GROUPS' % src.upper()): + options['exclude_empty_groups'] = True + if getattr(settings, '%s_INSTANCE_ID_VAR' % src.upper(), False): + options['instance_id_var'] = getattr(settings, '%s_INSTANCE_ID_VAR' % src.upper()) + + # Verbosity is applied to saving process, as well as ansible-inventory CLI option + if inventory_update.verbosity: + options['verbosity'] = inventory_update.verbosity + + from awx.main.management.commands.inventory_import import Command as InventoryImportCommand + cmd = InventoryImportCommand() + save_status, tb, exc = cmd.perform_update(options, data, inventory_update) + + model_updates = {} + if save_status != inventory_update.status: + model_updates['status'] = save_status + if tb: + model_updates['result_traceback'] = tb + + if model_updates: + logger.debug('{} saw problems saving to database.'.format(inventory_update.log_format)) + model_updates['job_explanation'] = 'Update failed to save all changes to database properly' + self.update_model(inventory_update.pk, **model_updates) + @task(queue=get_local_queuename) class RunAdHocCommand(BaseTask): From 96fc38d182df9187588931b729c78d7733067c06 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Mon, 5 Oct 2020 23:52:10 -0400 Subject: [PATCH 03/20] Swap in-memory logger to write stdout in post_save_hook This commit makes the needed changes to inventory update post_save_hook logic so that the historic log lines that inventory updates write will be written to stdout, but this hack bypasses the ansible-runner verbose event logic and dispatches verbose events directly. Fix the venv application with the ansible-inventory system (note: much of this is undone in a later commit) Deal with some minor test updates for the ansible-inventory interface changes --- .../management/commands/inventory_import.py | 45 +++----- awx/main/tasks.py | 100 +++++++++++++++--- .../commands/test_inventory_import.py | 2 +- .../test_inventory_source_injectors.py | 2 + .../unit/commands/test_inventory_import.py | 26 ----- awx/main/tests/unit/test_tasks.py | 4 +- 6 files changed, 106 insertions(+), 73 deletions(-) diff --git a/awx/main/management/commands/inventory_import.py b/awx/main/management/commands/inventory_import.py index 3434869c42..d987d3739a 100644 --- a/awx/main/management/commands/inventory_import.py +++ b/awx/main/management/commands/inventory_import.py @@ -217,10 +217,10 @@ class Command(BaseCommand): 'specifies the unique, immutable instance ID, may be ' 'specified as "foo.bar" to traverse nested dicts.') - def set_logging_level(self): + def set_logging_level(self, verbosity): log_levels = dict(enumerate([logging.WARNING, logging.INFO, logging.DEBUG, 0])) - logger.setLevel(log_levels.get(self.verbosity, 0)) + logger.setLevel(log_levels.get(verbosity, 0)) def _get_instance_id(self, variables, default=''): ''' @@ -826,7 +826,7 @@ class Command(BaseCommand): # special check for tower-type inventory sources # but only if running the plugin TOWER_SOURCE_FILES = ['tower.yml', 'tower.yaml'] - if self.inventory_source.source == 'tower' and any(f in self.source for f in TOWER_SOURCE_FILES): + if self.inventory_source.source == 'tower' and any(f in self.inventory_source.source_path for f in TOWER_SOURCE_FILES): # only if this is the 2nd call to license check, we cannot compare before running plugin if hasattr(self, 'all_group'): self.remote_tower_license_compare(local_license_type) @@ -871,6 +871,15 @@ class Command(BaseCommand): elif not inventory_name and not inventory_id: raise CommandError('--inventory-name or --inventory-id is required') + # Obtain rest of the options needed to run update + raw_source = options.get('source', None) + if not raw_source: + raise CommandError('--source is required') + source = Command.get_source_absolute_path(raw_source) + verbosity = int(options.get('verbosity', 1)) + self.set_logging_level(verbosity) + venv_path = options.get('venv', None) + # Load inventory object based on name or ID. if inventory_id: q = dict(id=inventory_id) @@ -884,14 +893,6 @@ class Command(BaseCommand): raise CommandError('Inventory with %s = %s returned multiple results' % list(q.items())[0]) logger.info('Updating inventory %d: %s' % (inventory.pk, inventory.name)) - # Obtain rest of the options needed to run update - raw_source = options.get('source', None) - if not raw_source: - raise CommandError('--source is required') - source = Command.get_source_absolute_path(raw_source) - verbosity = int(options.get('verbosity', 1)) - venv_path = options.get('venv', None) - # Create ad-hoc inventory source and inventory update objects with ignore_inventory_computed_fields(): inventory_source, created = InventorySource.objects.get_or_create( @@ -944,8 +945,6 @@ class Command(BaseCommand): self.inventory_update = inventory_update # the update options, could be parser object or dict - self.verbosity = int(options.get('verbosity', 1)) - self.set_logging_level() self.overwrite = bool(options.get('overwrite', False)) self.overwrite_vars = bool(options.get('overwrite_vars', False)) self.enabled_var = options.get('enabled_var', None) @@ -1039,19 +1038,9 @@ class Command(BaseCommand): queries_before2 = len(connection.queries) self.inventory.update_computed_fields() if settings.SQL_DEBUG: - logger.warning('loading into database...') - with ignore_inventory_computed_fields(): - if getattr(settings, 'ACTIVITY_STREAM_ENABLED_FOR_INVENTORY_SYNC', True): - self.load_into_database() - else: - with disable_activity_stream(): - self.load_into_database() - if settings.SQL_DEBUG: - queries_before2 = len(connection.queries) - self.inventory.update_computed_fields() - if settings.SQL_DEBUG: - logger.warning('update computed fields took %d queries', - len(connection.queries) - queries_before2) + logger.warning('update computed fields took %d queries', + len(connection.queries) - queries_before2) + # Check if the license is valid. # If the license is not valid, a CommandError will be thrown, # and inventory update will be marked as invalid. @@ -1064,9 +1053,9 @@ class Command(BaseCommand): self.check_org_host_limit() except CommandError as e: if license_fail: - self.mark_license_failure() + self.mark_license_failure(save=True) else: - self.mark_org_limits_failure() + self.mark_org_limits_failure(save=True) raise e if settings.SQL_DEBUG: diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 5e51bac386..04ac11b219 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -2485,15 +2485,11 @@ class RunInventoryUpdate(BaseTask): return injector.build_private_data(inventory_update, private_data_dir) def build_env(self, inventory_update, private_data_dir, isolated, private_data_files=None): - """Build environment dictionary for inventory import. + """Build environment dictionary for ansible-inventory. - This used to be the mechanism by which any data that needs to be passed - to the inventory update script is set up. In particular, this is how - inventory update is aware of its proper credentials. - - Most environment injection is now accomplished by the credential - injectors. The primary purpose this still serves is to - still point to the inventory update INI or config file. + Most environment variables related to credentials or configuration + are accomplished by the inventory source injectors (in this method) + or custom credential type injectors (in main run method). """ env = super(RunInventoryUpdate, self).build_env(inventory_update, private_data_dir, @@ -2501,8 +2497,10 @@ class RunInventoryUpdate(BaseTask): private_data_files=private_data_files) if private_data_files is None: private_data_files = {} - self.add_awx_venv(env) - # Pass inventory source ID to inventory script. + self.add_ansible_venv(settings.ANSIBLE_VENV_PATH, env) + + # Legacy environment variables, were used as signal to awx-manage command + # now they are provided in case some scripts may be relying on them env['INVENTORY_SOURCE_ID'] = str(inventory_update.inventory_source_id) env['INVENTORY_UPDATE_ID'] = str(inventory_update.pk) env.update(STANDARD_INVENTORY_UPDATE_ENV) @@ -2568,14 +2566,21 @@ class RunInventoryUpdate(BaseTask): args = ['ansible-inventory', '--list', '--export'] # Add arguments for the source inventory file/script/thing + source_location = self.pseudo_build_inventory(inventory_update, private_data_dir) args.append('-i') - args.append(self.pseudo_build_inventory(inventory_update, private_data_dir)) + args.append(source_location) args.append('--output') args.append(os.path.join(private_data_dir, 'artifacts', 'output.json')) + if os.path.isdir(source_location): + playbook_dir = source_location + else: + playbook_dir = os.path.dirname(source_location) + args.extend(['--playbook-dir', playbook_dir]) + if inventory_update.verbosity: - args.append('-' + 'v' * inventory_update.verbosity) + args.append('-' + 'v' * min(5, inventory_update.verbosity * 2 + 1)) return args @@ -2713,19 +2718,82 @@ class RunInventoryUpdate(BaseTask): if inventory_update.verbosity: options['verbosity'] = inventory_update.verbosity + # Mock ansible-runner events + class CallbackHandler(logging.Handler): + def __init__(self, event_handler, cancel_callback, job_timeout, verbosity, + counter=0, **kwargs): + self.event_handler = event_handler + self.cancel_callback = cancel_callback + self.job_timeout = job_timeout + self.job_start = time.time() + self.last_check = self.job_start + # TODO: we do not have events from the ansible-inventory process + # so there is no way to know initial counter of start line + self.counter = counter + self.skip_level = [logging.WARNING, logging.INFO, logging.DEBUG, 0][verbosity] + self._start_line = 0 + super(CallbackHandler, self).__init__(**kwargs) + + def emit(self, record): + this_time = time.time() + if this_time - self.last_check > 0.5: + self.last_check = this_time + if self.cancel_callback(): + raise RuntimeError('Inventory update has been canceled') + if self.job_timeout and ((this_time - self.job_start) > self.job_timeout): + raise RuntimeError('Inventory update has timed out') + + # skip logging for low severity logs + if record.levelno < self.skip_level: + return + + self.counter += 1 + msg = self.format(record) + n_lines = msg.count('\n') + dispatch_data = dict( + created=now().isoformat(), + event='verbose', + counter=self.counter, + stdout=msg + '\n', + start_line=self._start_line, + end_line=self._start_line + n_lines + ) + self._start_line += n_lines + 1 + + self.event_handler(dispatch_data) + + handler = CallbackHandler( + self.event_handler, self.cancel_callback, + verbosity=inventory_update.verbosity, + job_timeout=self.get_instance_timeout(self.instance), + counter=self.event_ct + ) + inv_logger = logging.getLogger('awx.main.commands.inventory_import') + handler.formatter = inv_logger.handlers[0].formatter + inv_logger.handlers[0] = handler + from awx.main.management.commands.inventory_import import Command as InventoryImportCommand cmd = InventoryImportCommand() - save_status, tb, exc = cmd.perform_update(options, data, inventory_update) + try: + save_status, tb, exc = cmd.perform_update(options, data, inventory_update) + except Exception as raw_exc: + # Ignore license errors specifically + if 'Host limit for organization' not in str(exc) and 'License' not in str(exc): + raise raw_exc model_updates = {} - if save_status != inventory_update.status: + if save_status != status: model_updates['status'] = save_status if tb: model_updates['result_traceback'] = tb if model_updates: - logger.debug('{} saw problems saving to database.'.format(inventory_update.log_format)) - model_updates['job_explanation'] = 'Update failed to save all changes to database properly' + logger.info('{} had problems saving to database with {}'.format( + inventory_update.log_format, ', '.join(list(model_updates.keys())) + )) + model_updates['job_explanation'] = 'Update failed to save all changes to database properly.' + if exc: + model_updates['job_explanation'] += ' {}'.format(exc) self.update_model(inventory_update.pk, **model_updates) diff --git a/awx/main/tests/functional/commands/test_inventory_import.py b/awx/main/tests/functional/commands/test_inventory_import.py index a0b1095c98..3fe4b92b5a 100644 --- a/awx/main/tests/functional/commands/test_inventory_import.py +++ b/awx/main/tests/functional/commands/test_inventory_import.py @@ -83,7 +83,7 @@ class MockLoader: return self._data -def mock_logging(self): +def mock_logging(self, level): pass diff --git a/awx/main/tests/functional/test_inventory_source_injectors.py b/awx/main/tests/functional/test_inventory_source_injectors.py index 4601668d25..dc54301d01 100644 --- a/awx/main/tests/functional/test_inventory_source_injectors.py +++ b/awx/main/tests/functional/test_inventory_source_injectors.py @@ -214,6 +214,8 @@ def test_inventory_update_injected_content(this_kind, inventory, fake_credential f"'{inventory_filename}' file not found in inventory update runtime files {content.keys()}" env.pop('ANSIBLE_COLLECTIONS_PATHS', None) # collection paths not relevant to this test + env.pop('PYTHONPATH') + env.pop('VIRTUAL_ENV') base_dir = os.path.join(DATA, 'plugins') if not os.path.exists(base_dir): os.mkdir(base_dir) diff --git a/awx/main/tests/unit/commands/test_inventory_import.py b/awx/main/tests/unit/commands/test_inventory_import.py index 105086bcb8..db3e01408b 100644 --- a/awx/main/tests/unit/commands/test_inventory_import.py +++ b/awx/main/tests/unit/commands/test_inventory_import.py @@ -33,32 +33,6 @@ class TestInvalidOptions: assert 'inventory-id' in str(err.value) assert 'exclusive' in str(err.value) - def test_invalid_options_id_and_keep_vars(self): - # You can't overwrite and keep_vars at the same time, that wouldn't make sense - cmd = Command() - with pytest.raises(CommandError) as err: - cmd.handle( - inventory_id=42, overwrite=True, keep_vars=True - ) - assert 'overwrite-vars' in str(err.value) - assert 'exclusive' in str(err.value) - - def test_invalid_options_id_but_no_source(self): - # Need a source to import - cmd = Command() - with pytest.raises(CommandError) as err: - cmd.handle( - inventory_id=42, overwrite=True, keep_vars=True - ) - assert 'overwrite-vars' in str(err.value) - assert 'exclusive' in str(err.value) - with pytest.raises(CommandError) as err: - cmd.handle( - inventory_id=42, overwrite_vars=True, keep_vars=True - ) - assert 'overwrite-vars' in str(err.value) - assert 'exclusive' in str(err.value) - def test_invalid_options_missing_source(self): cmd = Command() with pytest.raises(CommandError) as err: diff --git a/awx/main/tests/unit/test_tasks.py b/awx/main/tests/unit/test_tasks.py index 0de5dce996..8dd9225f00 100644 --- a/awx/main/tests/unit/test_tasks.py +++ b/awx/main/tests/unit/test_tasks.py @@ -2061,8 +2061,8 @@ class TestInventoryUpdateCredentials(TestJobExecution): credential, env, {}, [], private_data_dir ) - assert '--custom' in ' '.join(args) - script = args[args.index('--source') + 1] + assert '-i' in ' '.join(args) + script = args[args.index('-i') + 1] with open(script, 'r') as f: assert f.read() == inventory_update.source_script.script assert env['FOO'] == 'BAR' From e61f79c8c3067b598c84cbc09188df7e447a71d5 Mon Sep 17 00:00:00 2001 From: Shane McDonald Date: Fri, 9 Oct 2020 15:33:13 -0400 Subject: [PATCH 04/20] Fix tests --- awx/main/management/commands/inventory_import.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/awx/main/management/commands/inventory_import.py b/awx/main/management/commands/inventory_import.py index d987d3739a..8a56377a45 100644 --- a/awx/main/management/commands/inventory_import.py +++ b/awx/main/management/commands/inventory_import.py @@ -875,7 +875,6 @@ class Command(BaseCommand): raw_source = options.get('source', None) if not raw_source: raise CommandError('--source is required') - source = Command.get_source_absolute_path(raw_source) verbosity = int(options.get('verbosity', 1)) self.set_logging_level(verbosity) venv_path = options.get('venv', None) @@ -893,8 +892,11 @@ class Command(BaseCommand): raise CommandError('Inventory with %s = %s returned multiple results' % list(q.items())[0]) logger.info('Updating inventory %d: %s' % (inventory.pk, inventory.name)) + # Create ad-hoc inventory source and inventory update objects with ignore_inventory_computed_fields(): + source = Command.get_source_absolute_path(raw_source) + inventory_source, created = InventorySource.objects.get_or_create( inventory=inventory, source='file', From 66bdcee854ea1d69f468b76ade1b1661e28bc3d4 Mon Sep 17 00:00:00 2001 From: Shane McDonald Date: Mon, 26 Oct 2020 15:18:08 -0400 Subject: [PATCH 05/20] Address rebase fallout --- awx/main/management/commands/inventory_import.py | 1 - 1 file changed, 1 deletion(-) diff --git a/awx/main/management/commands/inventory_import.py b/awx/main/management/commands/inventory_import.py index 8a56377a45..944acde94b 100644 --- a/awx/main/management/commands/inventory_import.py +++ b/awx/main/management/commands/inventory_import.py @@ -967,7 +967,6 @@ class Command(BaseCommand): begin = time.time() with advisory_lock('inventory_{}_update'.format(self.inventory.id)): - self.load_inventory_from_database() try: self.check_license() From 9633714c492c76b6018756bdae2fb5aa531dca30 Mon Sep 17 00:00:00 2001 From: Jim Ladd Date: Thu, 29 Oct 2020 12:27:42 -0700 Subject: [PATCH 06/20] create lock for perform_update * perform_update can be called from either awx-manage or the RunInventoryUpdate task * need to make sure that the inventory updates that happen with perform_update are atomic --- .../management/commands/inventory_import.py | 116 +++++++++--------- 1 file changed, 61 insertions(+), 55 deletions(-) diff --git a/awx/main/management/commands/inventory_import.py b/awx/main/management/commands/inventory_import.py index 944acde94b..f73f35b5de 100644 --- a/awx/main/management/commands/inventory_import.py +++ b/awx/main/management/commands/inventory_import.py @@ -863,68 +863,69 @@ class Command(BaseCommand): self.inventory_update.save(update_fields=['org_host_limit_error']) def handle(self, *args, **options): - # Load inventory and related objects from database. - inventory_name = options.get('inventory_name', None) - inventory_id = options.get('inventory_id', None) - if inventory_name and inventory_id: - raise CommandError('--inventory-name and --inventory-id are mutually exclusive') - elif not inventory_name and not inventory_id: - raise CommandError('--inventory-name or --inventory-id is required') + with advisory_lock('inventory_{}_import'.format(self.inventory.id)): + # Load inventory and related objects from database. + inventory_name = options.get('inventory_name', None) + inventory_id = options.get('inventory_id', None) + if inventory_name and inventory_id: + raise CommandError('--inventory-name and --inventory-id are mutually exclusive') + elif not inventory_name and not inventory_id: + raise CommandError('--inventory-name or --inventory-id is required') - # Obtain rest of the options needed to run update - raw_source = options.get('source', None) - if not raw_source: - raise CommandError('--source is required') - verbosity = int(options.get('verbosity', 1)) - self.set_logging_level(verbosity) - venv_path = options.get('venv', None) + # Obtain rest of the options needed to run update + raw_source = options.get('source', None) + if not raw_source: + raise CommandError('--source is required') + verbosity = int(options.get('verbosity', 1)) + self.set_logging_level(verbosity) + venv_path = options.get('venv', None) - # Load inventory object based on name or ID. - if inventory_id: - q = dict(id=inventory_id) - else: - q = dict(name=inventory_name) - try: - inventory = Inventory.objects.get(**q) - except Inventory.DoesNotExist: - raise CommandError('Inventory with %s = %s cannot be found' % list(q.items())[0]) - except Inventory.MultipleObjectsReturned: - raise CommandError('Inventory with %s = %s returned multiple results' % list(q.items())[0]) - logger.info('Updating inventory %d: %s' % (inventory.pk, inventory.name)) + # Load inventory object based on name or ID. + if inventory_id: + q = dict(id=inventory_id) + else: + q = dict(name=inventory_name) + try: + inventory = Inventory.objects.get(**q) + except Inventory.DoesNotExist: + raise CommandError('Inventory with %s = %s cannot be found' % list(q.items())[0]) + except Inventory.MultipleObjectsReturned: + raise CommandError('Inventory with %s = %s returned multiple results' % list(q.items())[0]) + logger.info('Updating inventory %d: %s' % (inventory.pk, inventory.name)) - # Create ad-hoc inventory source and inventory update objects - with ignore_inventory_computed_fields(): - source = Command.get_source_absolute_path(raw_source) + # Create ad-hoc inventory source and inventory update objects + with ignore_inventory_computed_fields(): + source = Command.get_source_absolute_path(raw_source) - inventory_source, created = InventorySource.objects.get_or_create( - inventory=inventory, - source='file', - source_path=os.path.abspath(source), - overwrite=bool(options.get('overwrite', False)), - overwrite_vars=bool(options.get('overwrite_vars', False)), - ) - inventory_update = inventory_source.create_inventory_update( - _eager_fields=dict( - job_args=json.dumps(sys.argv), - job_env=dict(os.environ.items()), - job_cwd=os.getcwd()) - ) + inventory_source, created = InventorySource.objects.get_or_create( + inventory=inventory, + source='file', + source_path=os.path.abspath(source), + overwrite=bool(options.get('overwrite', False)), + overwrite_vars=bool(options.get('overwrite_vars', False)), + ) + inventory_update = inventory_source.create_inventory_update( + _eager_fields=dict( + job_args=json.dumps(sys.argv), + job_env=dict(os.environ.items()), + job_cwd=os.getcwd()) + ) - data = AnsibleInventoryLoader( - source=source, venv_path=venv_path, verbosity=verbosity - ).load() + data = AnsibleInventoryLoader( + source=source, venv_path=venv_path, verbosity=verbosity + ).load() - logger.debug('Finished loading from source: %s', source) - status, tb, exc = self.perform_update(options, data, inventory_update) + logger.debug('Finished loading from source: %s', source) + status, tb, exc = self.perform_update(options, data, inventory_update) - with ignore_inventory_computed_fields(): - inventory_update = InventoryUpdate.objects.get(pk=inventory_update.pk) - inventory_update.result_traceback = tb - inventory_update.status = status - inventory_update.save(update_fields=['status', 'result_traceback']) - inventory_source.status = status - inventory_source.save(update_fields=['status']) + with ignore_inventory_computed_fields(): + inventory_update = InventoryUpdate.objects.get(pk=inventory_update.pk) + inventory_update.result_traceback = tb + inventory_update.status = status + inventory_update.save(update_fields=['status', 'result_traceback']) + inventory_source.status = status + inventory_source.save(update_fields=['status']) if exc: logger.error(str(exc)) @@ -966,7 +967,12 @@ class Command(BaseCommand): raise CommandError('invalid regular expression for --host-filter') begin = time.time() - with advisory_lock('inventory_{}_update'.format(self.inventory.id)): + + # Since perform_update can be invoked either through the awx-manage CLI + # or from the task system, we need to create a new lock at this level + # (even though inventory_import.Command.handle -- which calls + # perform_update -- has its own lock, inventory_ID_import) + with advisory_lock('inventory_{}_perform_update'.format(self.inventory.id)): try: self.check_license() From 4bdc488fe708bd20299fdc522e9cf302622870ea Mon Sep 17 00:00:00 2001 From: Jim Ladd Date: Fri, 30 Oct 2020 14:03:16 -0700 Subject: [PATCH 07/20] restore proot code * add TODOs to note where proot-related code can be removed in the future (after moving to container-based execution) --- .../management/commands/inventory_import.py | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/awx/main/management/commands/inventory_import.py b/awx/main/management/commands/inventory_import.py index f73f35b5de..2851004b91 100644 --- a/awx/main/management/commands/inventory_import.py +++ b/awx/main/management/commands/inventory_import.py @@ -31,7 +31,11 @@ from awx.main.utils.safe_yaml import sanitize_jinja # other AWX imports from awx.main.models.rbac import batch_role_ancestor_rebuilding +# TODO: remove proot utils once we move to running inv. updates in containers from awx.main.utils import ( + check_proot_installed, + wrap_args_with_proot, + build_proot_temp_dir, ignore_inventory_computed_fields, get_licenser ) @@ -134,16 +138,51 @@ class AnsibleInventoryLoader(object): logger.debug('Using base command: {}'.format(' '.join(bargs))) return bargs + # TODO: Remove this once we move to running ansible-inventory in containers + # and don't need proot for process isolation anymore + def get_proot_args(self, cmd, env): + cwd = os.getcwd() + if not check_proot_installed(): + raise RuntimeError("proot is not installed but is configured for use") + + kwargs = {} + # we cannot safely store tmp data in source dir or trust script contents + if env['AWX_PRIVATE_DATA_DIR']: + # If this is non-blank, file credentials are being used and we need access + private_data_dir = functioning_dir(env['AWX_PRIVATE_DATA_DIR']) + logger.debug("Using private credential data in '{}'.".format(private_data_dir)) + kwargs['private_data_dir'] = private_data_dir + self.tmp_private_dir = build_proot_temp_dir() + logger.debug("Using fresh temporary directory '{}' for isolation.".format(self.tmp_private_dir)) + kwargs['proot_temp_dir'] = self.tmp_private_dir + kwargs['proot_show_paths'] = [functioning_dir(self.source), settings.AWX_ANSIBLE_COLLECTIONS_PATHS] + logger.debug("Running from `{}` working directory.".format(cwd)) + + if self.venv_path != settings.ANSIBLE_VENV_PATH: + kwargs['proot_custom_virtualenv'] = self.venv_path + + return wrap_args_with_proot(cmd, cwd, **kwargs) + + def command_to_json(self, cmd): data = {} stdout, stderr = '', '' env = self.build_env() + # TODO: remove proot args once inv. updates run in containers + if (('AWX_PRIVATE_DATA_DIR' in env) and + getattr(settings, 'AWX_PROOT_ENABLED', False)): + cmd = self.get_proot_args(cmd, env) + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env) stdout, stderr = proc.communicate() stdout = smart_text(stdout) stderr = smart_text(stderr) + # TODO: can be removed when proot is removed + if self.tmp_private_dir: + shutil.rmtree(self.tmp_private_dir, True) + if proc.returncode != 0: raise RuntimeError('%s failed (rc=%d) with stdout:\n%s\nstderr:\n%s' % ( 'ansible-inventory', proc.returncode, stdout, stderr)) From a8710bf2f10e020594ac91fb4e8de66086bdda94 Mon Sep 17 00:00:00 2001 From: Jim Ladd Date: Mon, 2 Nov 2020 13:16:52 -0800 Subject: [PATCH 08/20] restore proot for inventory updates - in the past, inv. update jobs called `awx-manage inventory_update` which took care of setting up process isolation - at this point, though, inv. update jobs call runner / ansible-inventory directly, so we need another way to put process isolation in place - thankfully, there was already support for providing process isolation for other types of jobs (namely JT Jobs, Project Updates and Ad Hoc commands) - so, we do what those other jobs do and override the stub for should_use_proot (which by default returns false) so that it keys off of the `AWX_PROOT_ENABLED` setting --- awx/main/tasks.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 04ac11b219..a810ad7d00 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -2461,6 +2461,12 @@ class RunInventoryUpdate(BaseTask): event_model = InventoryUpdateEvent event_data_key = 'inventory_update_id' + def should_use_proot(self, inventory_update): + ''' + Return whether this task should use proot. + ''' + return getattr(settings, 'AWX_PROOT_ENABLED', False) + @property def proot_show_paths(self): return [settings.AWX_ANSIBLE_COLLECTIONS_PATHS] From 72df8723f6b6f5b289a0838f02c1fc0e7a66d517 Mon Sep 17 00:00:00 2001 From: Jim Ladd Date: Mon, 2 Nov 2020 17:36:55 -0800 Subject: [PATCH 09/20] lint --- awx/main/management/commands/inventory_import.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awx/main/management/commands/inventory_import.py b/awx/main/management/commands/inventory_import.py index 2851004b91..dbe156a5ec 100644 --- a/awx/main/management/commands/inventory_import.py +++ b/awx/main/management/commands/inventory_import.py @@ -171,7 +171,7 @@ class AnsibleInventoryLoader(object): # TODO: remove proot args once inv. updates run in containers if (('AWX_PRIVATE_DATA_DIR' in env) and - getattr(settings, 'AWX_PROOT_ENABLED', False)): + getattr(settings, 'AWX_PROOT_ENABLED', False)): cmd = self.get_proot_args(cmd, env) proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env) From 12cbc9756ba902dfb28c87a24e76ff17c54017c8 Mon Sep 17 00:00:00 2001 From: Jim Ladd Date: Mon, 2 Nov 2020 17:37:18 -0800 Subject: [PATCH 10/20] inventory updates should use custom venv --- awx/main/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index a810ad7d00..f0db77a2e6 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -2503,7 +2503,7 @@ class RunInventoryUpdate(BaseTask): private_data_files=private_data_files) if private_data_files is None: private_data_files = {} - self.add_ansible_venv(settings.ANSIBLE_VENV_PATH, env) + self.add_ansible_venv(inventory_update.ansible_virtualenv_path, env, isolated=isolated) # Legacy environment variables, were used as signal to awx-manage command # now they are provided in case some scripts may be relying on them From 277c47ba4e6d5258b27c1f29769d9f6b93974e18 Mon Sep 17 00:00:00 2001 From: Jim Ladd Date: Tue, 3 Nov 2020 13:00:39 -0800 Subject: [PATCH 11/20] add TODO reminders to remove proot / venv code * won't be needed once we move to container-based execution --- awx/main/tasks.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index f0db77a2e6..a0dc41f756 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -2461,12 +2461,14 @@ class RunInventoryUpdate(BaseTask): event_model = InventoryUpdateEvent event_data_key = 'inventory_update_id' + # TODO: remove once inv updates run in containers def should_use_proot(self, inventory_update): ''' Return whether this task should use proot. ''' return getattr(settings, 'AWX_PROOT_ENABLED', False) + # TODO: remove once inv updates run in containers @property def proot_show_paths(self): return [settings.AWX_ANSIBLE_COLLECTIONS_PATHS] @@ -2503,6 +2505,7 @@ class RunInventoryUpdate(BaseTask): private_data_files=private_data_files) if private_data_files is None: private_data_files = {} + # TODO: remove once containers replace custom venvs self.add_ansible_venv(inventory_update.ansible_virtualenv_path, env, isolated=isolated) # Legacy environment variables, were used as signal to awx-manage command From d55f36eb9001a8983e32ddb28d97a0f854b3d374 Mon Sep 17 00:00:00 2001 From: Jim Ladd Date: Tue, 3 Nov 2020 13:24:44 -0800 Subject: [PATCH 12/20] add clarifying comment * noting that the inv update task only uses the inventory update management command to save the inv to the database (it doesn't do the work of fetching hosts / groups) --- awx/main/tasks.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index a0dc41f756..7e013bd198 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -2784,6 +2784,12 @@ class RunInventoryUpdate(BaseTask): from awx.main.management.commands.inventory_import import Command as InventoryImportCommand cmd = InventoryImportCommand() try: + # note that we are only using the management command to + # save the inventory data to the database. + # we are not asking it to actually fetch hosts / groups. + # that work was taken care of earlier, when + # BaseTask.run called ansible-inventory (by way of ansible-runner) + # for us. save_status, tb, exc = cmd.perform_update(options, data, inventory_update) except Exception as raw_exc: # Ignore license errors specifically From 232801e0ba3e0514ce0dbfd0a9dd5979a4b32e54 Mon Sep 17 00:00:00 2001 From: Jim Ladd Date: Tue, 3 Nov 2020 17:39:12 -0800 Subject: [PATCH 13/20] cache end_line for RunInventoryUpdate jobs --- awx/main/tasks.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 7e013bd198..92930e2c04 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -1224,6 +1224,20 @@ class BaseTask(object): Ansible runner puts a parent_uuid on each event, no matter what the type. AWX only saves the parent_uuid if the event is for a Job. ''' + # cache end_line locally for RunInventoryUpdate tasks + # which generate job events from two 'streams': + # ansible-inventory and the awx.main.commands.inventory_import + # logger + if isinstance(self, RunInventoryUpdate): + if not getattr(self, 'end_line', None): + # this is the very first event + # note the end_line + self.end_line = event_data['end_line'] + else: + num_lines = event_data['end_line'] - event_data['start_line'] + event_data['start_line'] = self.end_line + 1 + self.end_line = event_data['end_line'] = event_data['start_line'] + num_lines + if event_data.get(self.event_data_key, None): if self.event_data_key != 'job_id': event_data.pop('parent_uuid', None) @@ -2758,7 +2772,7 @@ class RunInventoryUpdate(BaseTask): self.counter += 1 msg = self.format(record) - n_lines = msg.count('\n') + n_lines = msg.strip().count('\n') # don't count new-lines at boundry of text dispatch_data = dict( created=now().isoformat(), event='verbose', From 38638b4a6b5cea48a36bf5e73bd202fe80b179ac Mon Sep 17 00:00:00 2001 From: Jim Ladd Date: Mon, 9 Nov 2020 15:08:12 -0800 Subject: [PATCH 14/20] add note to remove private_dir when proot removed --- awx/main/management/commands/inventory_import.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/awx/main/management/commands/inventory_import.py b/awx/main/management/commands/inventory_import.py index dbe156a5ec..ba5d618791 100644 --- a/awx/main/management/commands/inventory_import.py +++ b/awx/main/management/commands/inventory_import.py @@ -79,6 +79,8 @@ class AnsibleInventoryLoader(object): def __init__(self, source, venv_path=None, verbosity=0): self.source = source self.verbosity = verbosity + # TODO: remove once proot has been removed + self.tmp_private_dir = None if venv_path: self.venv_path = venv_path else: From 5ad60a3ed427ab099253884d193affe81711bd01 Mon Sep 17 00:00:00 2001 From: Jim Ladd Date: Thu, 12 Nov 2020 16:07:01 -0800 Subject: [PATCH 15/20] use inventory_id to get advisory_lock --- awx/main/management/commands/inventory_import.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/awx/main/management/commands/inventory_import.py b/awx/main/management/commands/inventory_import.py index ba5d618791..2636cf8d10 100644 --- a/awx/main/management/commands/inventory_import.py +++ b/awx/main/management/commands/inventory_import.py @@ -904,15 +904,15 @@ class Command(BaseCommand): self.inventory_update.save(update_fields=['org_host_limit_error']) def handle(self, *args, **options): - with advisory_lock('inventory_{}_import'.format(self.inventory.id)): - # Load inventory and related objects from database. - inventory_name = options.get('inventory_name', None) - inventory_id = options.get('inventory_id', None) - if inventory_name and inventory_id: - raise CommandError('--inventory-name and --inventory-id are mutually exclusive') - elif not inventory_name and not inventory_id: - raise CommandError('--inventory-name or --inventory-id is required') + # Load inventory and related objects from database. + inventory_name = options.get('inventory_name', None) + inventory_id = options.get('inventory_id', None) + if inventory_name and inventory_id: + raise CommandError('--inventory-name and --inventory-id are mutually exclusive') + elif not inventory_name and not inventory_id: + raise CommandError('--inventory-name or --inventory-id is required') + with advisory_lock('inventory_{}_import'.format(inventory_id)): # Obtain rest of the options needed to run update raw_source = options.get('source', None) if not raw_source: From 197d50bc44cf1d45c6ba7d2eb4f9b9a26a707485 Mon Sep 17 00:00:00 2001 From: Jim Ladd Date: Fri, 13 Nov 2020 11:23:47 -0800 Subject: [PATCH 16/20] patch test_inventory_update_injected_content * proot now enabled at task-level since tasks are no longer calling awx-manage (which would set up its own proot) * dropping proot env var since it's not relevant to the test --- awx/main/tests/functional/test_inventory_source_injectors.py | 1 + 1 file changed, 1 insertion(+) diff --git a/awx/main/tests/functional/test_inventory_source_injectors.py b/awx/main/tests/functional/test_inventory_source_injectors.py index dc54301d01..fc28c92294 100644 --- a/awx/main/tests/functional/test_inventory_source_injectors.py +++ b/awx/main/tests/functional/test_inventory_source_injectors.py @@ -216,6 +216,7 @@ def test_inventory_update_injected_content(this_kind, inventory, fake_credential env.pop('ANSIBLE_COLLECTIONS_PATHS', None) # collection paths not relevant to this test env.pop('PYTHONPATH') env.pop('VIRTUAL_ENV') + env.pop('PROOT_TMP_DIR') base_dir = os.path.join(DATA, 'plugins') if not os.path.exists(base_dir): os.mkdir(base_dir) From ec93af4ba839ae2944460b4aa888aea559f2363c Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Sat, 14 Nov 2020 22:23:42 -0500 Subject: [PATCH 17/20] Not all license errors are caught, do not assume they are --- awx/main/tasks.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 92930e2c04..d8a22440f8 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -2797,6 +2797,7 @@ class RunInventoryUpdate(BaseTask): from awx.main.management.commands.inventory_import import Command as InventoryImportCommand cmd = InventoryImportCommand() + exc = None try: # note that we are only using the management command to # save the inventory data to the database. @@ -2806,6 +2807,8 @@ class RunInventoryUpdate(BaseTask): # for us. save_status, tb, exc = cmd.perform_update(options, data, inventory_update) except Exception as raw_exc: + if exc is None: + exc = raw_exc # Ignore license errors specifically if 'Host limit for organization' not in str(exc) and 'License' not in str(exc): raise raw_exc From d6e84b54c9e2c76c0e57e4cedb6c8873d580d8f0 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Wed, 18 Nov 2020 20:55:26 -0500 Subject: [PATCH 18/20] Some fixes for line numbering, and fixes for license error handling (#8) * Change handling of error cases to global post_run_hook * handle license errors correctly again * Fix some issues with line ordering from the custom logger thing * Remove debug log statement * Use PermissionDenied for license errors * More elegant handling of line initialization Update tests to new exception type Catch all save errors, fix timing offset bug Fix license error handling inside import command --- awx/main/exceptions.py | 7 + .../management/commands/inventory_import.py | 196 +++++++++--------- awx/main/tasks.py | 88 ++++---- .../commands/test_inventory_import.py | 5 +- 4 files changed, 145 insertions(+), 151 deletions(-) diff --git a/awx/main/exceptions.py b/awx/main/exceptions.py index 8aadfd80b0..64cbc94783 100644 --- a/awx/main/exceptions.py +++ b/awx/main/exceptions.py @@ -30,3 +30,10 @@ class _AwxTaskError(): AwxTaskError = _AwxTaskError() + + +class PostRunError(Exception): + def __init__(self, msg, status='failed', tb=''): + self.status = status + self.tb = tb + super(PostRunError, self).__init__(msg) diff --git a/awx/main/management/commands/inventory_import.py b/awx/main/management/commands/inventory_import.py index 2636cf8d10..2179faad6b 100644 --- a/awx/main/management/commands/inventory_import.py +++ b/awx/main/management/commands/inventory_import.py @@ -19,6 +19,9 @@ from django.core.management.base import BaseCommand, CommandError from django.db import connection, transaction from django.utils.encoding import smart_text +# DRF error class to distinguish license exceptions +from rest_framework.exceptions import PermissionDenied + # AWX inventory imports from awx.main.models.inventory import ( Inventory, @@ -839,9 +842,9 @@ class Command(BaseCommand): source_vars = self.all_group.variables remote_license_type = source_vars.get('tower_metadata', {}).get('license_type', None) if remote_license_type is None: - raise CommandError('Unexpected Error: Tower inventory plugin missing needed metadata!') + raise PermissionDenied('Unexpected Error: Tower inventory plugin missing needed metadata!') if local_license_type != remote_license_type: - raise CommandError('Tower server licenses must match: source: {} local: {}'.format( + raise PermissionDenied('Tower server licenses must match: source: {} local: {}'.format( remote_license_type, local_license_type )) @@ -850,7 +853,7 @@ class Command(BaseCommand): local_license_type = license_info.get('license_type', 'UNLICENSED') if local_license_type == 'UNLICENSED': logger.error(LICENSE_NON_EXISTANT_MESSAGE) - raise CommandError('No license found!') + raise PermissionDenied('No license found!') elif local_license_type == 'open': return available_instances = license_info.get('available_instances', 0) @@ -861,7 +864,7 @@ class Command(BaseCommand): if time_remaining <= 0: if hard_error: logger.error(LICENSE_EXPIRED_MESSAGE) - raise CommandError("License has expired!") + raise PermissionDenied("License has expired!") else: logger.warning(LICENSE_EXPIRED_MESSAGE) # special check for tower-type inventory sources @@ -878,7 +881,7 @@ class Command(BaseCommand): } if hard_error: logger.error(LICENSE_MESSAGE % d) - raise CommandError('License count exceeded!') + raise PermissionDenied('License count exceeded!') else: logger.warning(LICENSE_MESSAGE % d) @@ -893,7 +896,7 @@ class Command(BaseCommand): active_count = Host.objects.org_active_count(org.id) if active_count > org.max_hosts: - raise CommandError('Host limit for organization exceeded!') + raise PermissionDenied('Host limit for organization exceeded!') def mark_license_failure(self, save=True): self.inventory_update.license_error = True @@ -958,7 +961,17 @@ class Command(BaseCommand): ).load() logger.debug('Finished loading from source: %s', source) - status, tb, exc = self.perform_update(options, data, inventory_update) + + status, tb, exc = 'error', '', None + try: + self.perform_update(options, data, inventory_update) + status = 'successful' + except Exception as e: + exc = e + if isinstance(e, KeyboardInterrupt): + status = 'canceled' + else: + tb = traceback.format_exc() with ignore_inventory_computed_fields(): inventory_update = InventoryUpdate.objects.get(pk=inventory_update.pk) @@ -1017,119 +1030,106 @@ class Command(BaseCommand): try: self.check_license() - except CommandError as e: + except PermissionDenied as e: self.mark_license_failure(save=True) raise e try: # Check the per-org host limits self.check_org_host_limit() - except CommandError as e: + except PermissionDenied as e: self.mark_org_limits_failure(save=True) raise e - status, tb, exc = 'error', '', None - try: - if settings.SQL_DEBUG: - queries_before = len(connection.queries) + if settings.SQL_DEBUG: + queries_before = len(connection.queries) - # Update inventory update for this command line invocation. - with ignore_inventory_computed_fields(): - iu = self.inventory_update - if iu.status != 'running': - with transaction.atomic(): - self.inventory_update.status = 'running' - self.inventory_update.save() + # Update inventory update for this command line invocation. + with ignore_inventory_computed_fields(): + # TODO: move this to before perform_update + iu = self.inventory_update + if iu.status != 'running': + with transaction.atomic(): + self.inventory_update.status = 'running' + self.inventory_update.save() - logger.info('Processing JSON output...') - inventory = MemInventory( - group_filter_re=self.group_filter_re, host_filter_re=self.host_filter_re) - inventory = dict_to_mem_data(data, inventory=inventory) + logger.info('Processing JSON output...') + inventory = MemInventory( + group_filter_re=self.group_filter_re, host_filter_re=self.host_filter_re) + inventory = dict_to_mem_data(data, inventory=inventory) - logger.info('Loaded %d groups, %d hosts', len(inventory.all_group.all_groups), - len(inventory.all_group.all_hosts)) + logger.info('Loaded %d groups, %d hosts', len(inventory.all_group.all_groups), + len(inventory.all_group.all_hosts)) - if self.exclude_empty_groups: - inventory.delete_empty_groups() + if self.exclude_empty_groups: + inventory.delete_empty_groups() - self.all_group = inventory.all_group + self.all_group = inventory.all_group - if settings.DEBUG: - # depending on inventory source, this output can be - # *exceedingly* verbose - crawling a deeply nested - # inventory/group data structure and printing metadata about - # each host and its memberships - # - # it's easy for this scale of data to overwhelm pexpect, - # (and it's likely only useful for purposes of debugging the - # actual inventory import code), so only print it if we have to: - # https://github.com/ansible/ansible-tower/issues/7414#issuecomment-321615104 - self.all_group.debug_tree() + if settings.DEBUG: + # depending on inventory source, this output can be + # *exceedingly* verbose - crawling a deeply nested + # inventory/group data structure and printing metadata about + # each host and its memberships + # + # it's easy for this scale of data to overwhelm pexpect, + # (and it's likely only useful for purposes of debugging the + # actual inventory import code), so only print it if we have to: + # https://github.com/ansible/ansible-tower/issues/7414#issuecomment-321615104 + self.all_group.debug_tree() - with batch_role_ancestor_rebuilding(): - # If using with transaction.atomic() with try ... catch, - # with transaction.atomic() must be inside the try section of the code as per Django docs - try: - # Ensure that this is managed as an atomic SQL transaction, - # and thus properly rolled back if there is an issue. - with transaction.atomic(): - # Merge/overwrite inventory into database. - if settings.SQL_DEBUG: - logger.warning('loading into database...') - with ignore_inventory_computed_fields(): - if getattr(settings, 'ACTIVITY_STREAM_ENABLED_FOR_INVENTORY_SYNC', True): + with batch_role_ancestor_rebuilding(): + # If using with transaction.atomic() with try ... catch, + # with transaction.atomic() must be inside the try section of the code as per Django docs + try: + # Ensure that this is managed as an atomic SQL transaction, + # and thus properly rolled back if there is an issue. + with transaction.atomic(): + # Merge/overwrite inventory into database. + if settings.SQL_DEBUG: + logger.warning('loading into database...') + with ignore_inventory_computed_fields(): + if getattr(settings, 'ACTIVITY_STREAM_ENABLED_FOR_INVENTORY_SYNC', True): + self.load_into_database() + else: + with disable_activity_stream(): self.load_into_database() - else: - with disable_activity_stream(): - self.load_into_database() - if settings.SQL_DEBUG: - queries_before2 = len(connection.queries) - self.inventory.update_computed_fields() if settings.SQL_DEBUG: - logger.warning('update computed fields took %d queries', - len(connection.queries) - queries_before2) + queries_before2 = len(connection.queries) + self.inventory.update_computed_fields() + if settings.SQL_DEBUG: + logger.warning('update computed fields took %d queries', + len(connection.queries) - queries_before2) - # Check if the license is valid. - # If the license is not valid, a CommandError will be thrown, - # and inventory update will be marked as invalid. - # with transaction.atomic() will roll back the changes. - license_fail = True - self.check_license() + # Check if the license is valid. + # If the license is not valid, a CommandError will be thrown, + # and inventory update will be marked as invalid. + # with transaction.atomic() will roll back the changes. + license_fail = True + self.check_license() - # Check the per-org host limits - license_fail = False - self.check_org_host_limit() - except CommandError as e: - if license_fail: - self.mark_license_failure(save=True) - else: - self.mark_org_limits_failure(save=True) - raise e - - if settings.SQL_DEBUG: - logger.warning('Inventory import completed for %s in %0.1fs', - self.inventory_source.name, time.time() - begin) + # Check the per-org host limits + license_fail = False + self.check_org_host_limit() + except PermissionDenied as e: + if license_fail: + self.mark_license_failure(save=True) else: - logger.info('Inventory import completed for %s in %0.1fs', - self.inventory_source.name, time.time() - begin) - status = 'successful' + self.mark_org_limits_failure(save=True) + raise e - # If we're in debug mode, then log the queries and time - # used to do the operation. if settings.SQL_DEBUG: - queries_this_import = connection.queries[queries_before:] - sqltime = sum(float(x['time']) for x in queries_this_import) - logger.warning('Inventory import required %d queries ' - 'taking %0.3fs', len(queries_this_import), - sqltime) - except Exception as e: - if isinstance(e, KeyboardInterrupt): - status = 'canceled' - exc = e - elif isinstance(e, CommandError): - exc = e + logger.warning('Inventory import completed for %s in %0.1fs', + self.inventory_source.name, time.time() - begin) else: - tb = traceback.format_exc() - exc = e + logger.info('Inventory import completed for %s in %0.1fs', + self.inventory_source.name, time.time() - begin) - return status, tb, exc + # If we're in debug mode, then log the queries and time + # used to do the operation. + if settings.SQL_DEBUG: + queries_this_import = connection.queries[queries_before:] + sqltime = sum(float(x['time']) for x in queries_this_import) + logger.warning('Inventory import required %d queries ' + 'taking %0.3fs', len(queries_this_import), + sqltime) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index d8a22440f8..238f002562 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -63,7 +63,7 @@ from awx.main.models import ( build_safe_env, enforce_bigint_pk_migration ) from awx.main.constants import ACTIVE_STATES -from awx.main.exceptions import AwxTaskError +from awx.main.exceptions import AwxTaskError, PostRunError from awx.main.queue import CallbackQueueDispatcher from awx.main.isolated import manager as isolated_manager from awx.main.dispatch.publish import task @@ -1229,14 +1229,7 @@ class BaseTask(object): # ansible-inventory and the awx.main.commands.inventory_import # logger if isinstance(self, RunInventoryUpdate): - if not getattr(self, 'end_line', None): - # this is the very first event - # note the end_line - self.end_line = event_data['end_line'] - else: - num_lines = event_data['end_line'] - event_data['start_line'] - event_data['start_line'] = self.end_line + 1 - self.end_line = event_data['end_line'] = event_data['start_line'] + num_lines + self.end_line = event_data['end_line'] if event_data.get(self.event_data_key, None): if self.event_data_key != 'job_id': @@ -1534,6 +1527,12 @@ class BaseTask(object): try: self.post_run_hook(self.instance, status) + except PostRunError as exc: + if status == 'successful': + status = exc.status + extra_update_fields['job_explanation'] = exc.args[0] + if exc.tb: + extra_update_fields['result_traceback'] = exc.tb except Exception: logger.exception('{} Post run hook errored.'.format(self.instance.log_format)) @@ -2744,27 +2743,28 @@ class RunInventoryUpdate(BaseTask): # Mock ansible-runner events class CallbackHandler(logging.Handler): def __init__(self, event_handler, cancel_callback, job_timeout, verbosity, - counter=0, **kwargs): + start_time=None, counter=0, initial_line=0, **kwargs): self.event_handler = event_handler self.cancel_callback = cancel_callback self.job_timeout = job_timeout - self.job_start = time.time() + if start_time is None: + self.job_start = now() + else: + self.job_start = start_time self.last_check = self.job_start - # TODO: we do not have events from the ansible-inventory process - # so there is no way to know initial counter of start line self.counter = counter self.skip_level = [logging.WARNING, logging.INFO, logging.DEBUG, 0][verbosity] - self._start_line = 0 + self._start_line = initial_line super(CallbackHandler, self).__init__(**kwargs) def emit(self, record): - this_time = time.time() - if this_time - self.last_check > 0.5: + this_time = now() + if (this_time - self.last_check).total_seconds() > 0.5: self.last_check = this_time if self.cancel_callback(): - raise RuntimeError('Inventory update has been canceled') - if self.job_timeout and ((this_time - self.job_start) > self.job_timeout): - raise RuntimeError('Inventory update has timed out') + raise PostRunError('Inventory update has been canceled', status='canceled') + if self.job_timeout and ((this_time - self.job_start).total_seconds() > self.job_timeout): + raise PostRunError('Inventory update has timed out', status='canceled') # skip logging for low severity logs if record.levelno < self.skip_level: @@ -2772,16 +2772,16 @@ class RunInventoryUpdate(BaseTask): self.counter += 1 msg = self.format(record) - n_lines = msg.strip().count('\n') # don't count new-lines at boundry of text + n_lines = len(msg.strip().split('\n')) # don't count new-lines at boundry of text dispatch_data = dict( created=now().isoformat(), event='verbose', counter=self.counter, - stdout=msg + '\n', + stdout=msg, start_line=self._start_line, end_line=self._start_line + n_lines ) - self._start_line += n_lines + 1 + self._start_line += n_lines self.event_handler(dispatch_data) @@ -2789,7 +2789,8 @@ class RunInventoryUpdate(BaseTask): self.event_handler, self.cancel_callback, verbosity=inventory_update.verbosity, job_timeout=self.get_instance_timeout(self.instance), - counter=self.event_ct + start_time=inventory_update.started, + counter=self.event_ct, initial_line=self.end_line ) inv_logger = logging.getLogger('awx.main.commands.inventory_import') handler.formatter = inv_logger.handlers[0].formatter @@ -2797,36 +2798,19 @@ class RunInventoryUpdate(BaseTask): from awx.main.management.commands.inventory_import import Command as InventoryImportCommand cmd = InventoryImportCommand() - exc = None try: - # note that we are only using the management command to - # save the inventory data to the database. - # we are not asking it to actually fetch hosts / groups. - # that work was taken care of earlier, when - # BaseTask.run called ansible-inventory (by way of ansible-runner) - # for us. - save_status, tb, exc = cmd.perform_update(options, data, inventory_update) - except Exception as raw_exc: - if exc is None: - exc = raw_exc - # Ignore license errors specifically - if 'Host limit for organization' not in str(exc) and 'License' not in str(exc): - raise raw_exc - - model_updates = {} - if save_status != status: - model_updates['status'] = save_status - if tb: - model_updates['result_traceback'] = tb - - if model_updates: - logger.info('{} had problems saving to database with {}'.format( - inventory_update.log_format, ', '.join(list(model_updates.keys())) - )) - model_updates['job_explanation'] = 'Update failed to save all changes to database properly.' - if exc: - model_updates['job_explanation'] += ' {}'.format(exc) - self.update_model(inventory_update.pk, **model_updates) + # save the inventory data to database. + # canceling exceptions will be handled in the global post_run_hook + cmd.perform_update(options, data, inventory_update) + except PermissionDenied as exc: + logger.exception('License error saving {} content'.format(inventory_update.log_format)) + raise PostRunError(str(exc), status='error') + except Exception: + logger.exception('Exception saving {} content, rolling back changes.'.format( + inventory_update.log_format)) + raise PostRunError( + 'Error occured while saving inventory data, see traceback or server logs', + status='error', tb=traceback.format_exc()) @task(queue=get_local_queuename) diff --git a/awx/main/tests/functional/commands/test_inventory_import.py b/awx/main/tests/functional/commands/test_inventory_import.py index 3fe4b92b5a..0500ef197c 100644 --- a/awx/main/tests/functional/commands/test_inventory_import.py +++ b/awx/main/tests/functional/commands/test_inventory_import.py @@ -9,6 +9,9 @@ import os # Django from django.core.management.base import CommandError +# for license errors +from rest_framework.exceptions import PermissionDenied + # AWX from awx.main.management.commands import inventory_import from awx.main.models import Inventory, Host, Group, InventorySource @@ -322,6 +325,6 @@ def test_tower_version_compare(): "version": "2.0.1-1068-g09684e2c41" } } - with pytest.raises(CommandError): + with pytest.raises(PermissionDenied): cmd.remote_tower_license_compare('very_supported') cmd.remote_tower_license_compare('open') From 7734def856bc28f12bb5c87ca304cc035cb925fe Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Mon, 30 Nov 2020 11:22:06 -0500 Subject: [PATCH 19/20] Fix inventory log timestamp, organize logging code The fixes and issue where the timestaps in the stdout for inventory updates gave the time since the start of the dispatcher instead of the time since the start of the update. This commit also moves the handler into the utils module where other custom AWX handlers live, instead of tasks.py this is to keep tasks.py relatively clean, as best as possible --- awx/main/tasks.py | 52 ++++----------------------------- awx/main/utils/formatters.py | 10 ++++++- awx/main/utils/handlers.py | 56 ++++++++++++++++++++++++++++++++++++ 3 files changed, 70 insertions(+), 48 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 238f002562..93195edb12 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -78,6 +78,7 @@ from awx.main.utils.external_logging import reconfigure_rsyslog from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja from awx.main.utils.reload import stop_local_services from awx.main.utils.pglock import advisory_lock +from awx.main.utils.handlers import SpecialInventoryHandler from awx.main.consumers import emit_channel_notification from awx.main import analytics from awx.conf import settings_registry @@ -2740,52 +2741,7 @@ class RunInventoryUpdate(BaseTask): if inventory_update.verbosity: options['verbosity'] = inventory_update.verbosity - # Mock ansible-runner events - class CallbackHandler(logging.Handler): - def __init__(self, event_handler, cancel_callback, job_timeout, verbosity, - start_time=None, counter=0, initial_line=0, **kwargs): - self.event_handler = event_handler - self.cancel_callback = cancel_callback - self.job_timeout = job_timeout - if start_time is None: - self.job_start = now() - else: - self.job_start = start_time - self.last_check = self.job_start - self.counter = counter - self.skip_level = [logging.WARNING, logging.INFO, logging.DEBUG, 0][verbosity] - self._start_line = initial_line - super(CallbackHandler, self).__init__(**kwargs) - - def emit(self, record): - this_time = now() - if (this_time - self.last_check).total_seconds() > 0.5: - self.last_check = this_time - if self.cancel_callback(): - raise PostRunError('Inventory update has been canceled', status='canceled') - if self.job_timeout and ((this_time - self.job_start).total_seconds() > self.job_timeout): - raise PostRunError('Inventory update has timed out', status='canceled') - - # skip logging for low severity logs - if record.levelno < self.skip_level: - return - - self.counter += 1 - msg = self.format(record) - n_lines = len(msg.strip().split('\n')) # don't count new-lines at boundry of text - dispatch_data = dict( - created=now().isoformat(), - event='verbose', - counter=self.counter, - stdout=msg, - start_line=self._start_line, - end_line=self._start_line + n_lines - ) - self._start_line += n_lines - - self.event_handler(dispatch_data) - - handler = CallbackHandler( + handler = SpecialInventoryHandler( self.event_handler, self.cancel_callback, verbosity=inventory_update.verbosity, job_timeout=self.get_instance_timeout(self.instance), @@ -2793,7 +2749,9 @@ class RunInventoryUpdate(BaseTask): counter=self.event_ct, initial_line=self.end_line ) inv_logger = logging.getLogger('awx.main.commands.inventory_import') - handler.formatter = inv_logger.handlers[0].formatter + formatter = inv_logger.handlers[0].formatter + formatter.job_start = inventory_update.started + handler.formatter = formatter inv_logger.handlers[0] = handler from awx.main.management.commands.inventory_import import Command as InventoryImportCommand diff --git a/awx/main/utils/formatters.py b/awx/main/utils/formatters.py index 171a994435..8afd121d5c 100644 --- a/awx/main/utils/formatters.py +++ b/awx/main/utils/formatters.py @@ -9,6 +9,7 @@ import socket from datetime import datetime from dateutil.tz import tzutc +from django.utils.timezone import now from django.core.serializers.json import DjangoJSONEncoder from django.conf import settings @@ -17,8 +18,15 @@ class TimeFormatter(logging.Formatter): ''' Custom log formatter used for inventory imports ''' + def __init__(self, start_time=None, **kwargs): + if start_time is None: + self.job_start = now() + else: + self.job_start = start_time + super(TimeFormatter, self).__init__(**kwargs) + def format(self, record): - record.relativeSeconds = record.relativeCreated / 1000.0 + record.relativeSeconds = (now() - self.job_start).total_seconds() return logging.Formatter.format(self, record) diff --git a/awx/main/utils/handlers.py b/awx/main/utils/handlers.py index c5e0014f8e..b6eefd9c59 100644 --- a/awx/main/utils/handlers.py +++ b/awx/main/utils/handlers.py @@ -7,6 +7,10 @@ import os.path # Django from django.conf import settings +from django.utils.timezone import now + +# AWX +from awx.main.exceptions import PostRunError class RSysLogHandler(logging.handlers.SysLogHandler): @@ -40,6 +44,58 @@ class RSysLogHandler(logging.handlers.SysLogHandler): pass +class SpecialInventoryHandler(logging.Handler): + """Logging handler used for the saving-to-database part of inventory updates + ran by the task system + this dispatches events directly to be processed by the callback receiver, + as opposed to ansible-runner + """ + + def __init__(self, event_handler, cancel_callback, job_timeout, verbosity, + start_time=None, counter=0, initial_line=0, **kwargs): + self.event_handler = event_handler + self.cancel_callback = cancel_callback + self.job_timeout = job_timeout + if start_time is None: + self.job_start = now() + else: + self.job_start = start_time + self.last_check = self.job_start + self.counter = counter + self.skip_level = [logging.WARNING, logging.INFO, logging.DEBUG, 0][verbosity] + self._current_line = initial_line + super(SpecialInventoryHandler, self).__init__(**kwargs) + + def emit(self, record): + # check cancel and timeout status regardless of log level + this_time = now() + if (this_time - self.last_check).total_seconds() > 0.5: # cancel callback is expensive + self.last_check = this_time + if self.cancel_callback(): + raise PostRunError('Inventory update has been canceled', status='canceled') + if self.job_timeout and ((this_time - self.job_start).total_seconds() > self.job_timeout): + raise PostRunError('Inventory update has timed out', status='canceled') + + # skip logging for low severity logs + if record.levelno < self.skip_level: + return + + self.counter += 1 + msg = self.format(record) + n_lines = len(msg.strip().split('\n')) # don't count line breaks at boundry of text + dispatch_data = dict( + created=now().isoformat(), + event='verbose', + counter=self.counter, + stdout=msg, + start_line=self._current_line, + end_line=self._current_line + n_lines + ) + self._current_line += n_lines + + self.event_handler(dispatch_data) + + ColorHandler = logging.StreamHandler if settings.COLOR_LOGS is True: From 900127fde7cfcdcb9e0d2e071e3be346c817dfb1 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Mon, 30 Nov 2020 14:39:02 -0500 Subject: [PATCH 20/20] Fix bug in inventory update canceling --- awx/main/tasks.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 93195edb12..afa7d8e714 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -2763,6 +2763,9 @@ class RunInventoryUpdate(BaseTask): except PermissionDenied as exc: logger.exception('License error saving {} content'.format(inventory_update.log_format)) raise PostRunError(str(exc), status='error') + except PostRunError: + logger.exception('Error saving {} content, rolling back changes'.format(inventory_update.log_format)) + raise except Exception: logger.exception('Exception saving {} content, rolling back changes.'.format( inventory_update.log_format))