From b70793db5c4d34ff4a2ec4ff8211d1ad0f4a6ac6 Mon Sep 17 00:00:00 2001
From: Alan Rominger
Date: Tue, 5 Oct 2021 16:32:03 -0400
Subject: [PATCH] Consolidate cleanup actions under new `ansible-runner worker cleanup` command (#11160)

* Primary development of integrating runner cleanup command

* Fixup image cleanup signals and their tests

* Use alphabetical sort to solve the cluster coordination problem

* Update test to new pattern

* Clarity edits to interface with ansible-runner cleanup method

* Another change corresponding to ansible-runner CLI updates

* Fix incomplete implementation of receptor remote cleanup

* Share receptor utils code between worker_info and cleanup

* Complete task logging from calling runner cleanup command

* Wrap up unit tests and some contract changes that fall out of those

* Fix bug in CLI construction

* Fix queryset filter bug
---
 awx/main/constants.py                      |   3 +
 awx/main/models/ha.py                      |  17 +++
 awx/main/signals.py                        |  18 ++-
 awx/main/tasks.py                          |  60 ++++++----
 .../models/test_execution_environment.py   |  46 ++++++++
 awx/main/tests/unit/models/test_ha.py      |  16 +++
 awx/main/tests/unit/utils/test_receptor.py |  21 ++++
 awx/main/utils/receptor.py                 | 104 +++++++++++++-----
 awx/settings/defaults.py                   |   2 +-
 9 files changed, 240 insertions(+), 47 deletions(-)
 create mode 100644 awx/main/tests/functional/models/test_execution_environment.py
 create mode 100644 awx/main/tests/unit/utils/test_receptor.py

diff --git a/awx/main/constants.py b/awx/main/constants.py
index b7a5813f7f..0a0491de0e 100644
--- a/awx/main/constants.py
+++ b/awx/main/constants.py
@@ -81,3 +81,6 @@ LOGGER_BLOCKLIST = (
 # Reported version for node seen in receptor mesh but for which capacity check
 # failed or is in progress
 RECEPTOR_PENDING = 'ansible-runner-???'
+
+# Naming pattern for AWX jobs in /tmp folder, like /tmp/awx_42_xiwm
+JOB_FOLDER_PREFIX = 'awx_%s_'
diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py
index 9b49f77e40..1e9c6d983a 100644
--- a/awx/main/models/ha.py
+++ b/awx/main/models/ha.py
@@ -20,6 +20,7 @@ from awx import __version__ as awx_application_version
 from awx.api.versioning import reverse
 from awx.main.managers import InstanceManager, InstanceGroupManager, UUID_DEFAULT
 from awx.main.fields import JSONField
+from awx.main.constants import JOB_FOLDER_PREFIX
 from awx.main.models.base import BaseModel, HasEditsMixin, prevent_search
 from awx.main.models.unified_jobs import UnifiedJob
 from awx.main.utils.common import get_corrected_cpu, get_cpu_effective_capacity, get_corrected_memory, get_mem_effective_capacity
@@ -155,6 +156,22 @@ class Instance(HasPolicyEditsMixin, BaseModel):
             Instance.objects.filter(enabled=True, capacity__gt=0).filter(node_type__in=['control', 'hybrid']).values_list('hostname', flat=True)
         )
 
+    def get_cleanup_task_kwargs(self, **kwargs):
+        """
+        Produce options to use for the command: ansible-runner worker cleanup
+        returns a dict that is passed to the python interface for the runner method corresponding to that command
+        any kwargs will override that key=value combination in the returned dict
+        """
+        vargs = dict(file_pattern='/tmp/{}*'.format(JOB_FOLDER_PREFIX % '*'))
+        vargs.update(kwargs)
+        if 'exclude_strings' not in vargs and vargs.get('file_pattern'):
+            active_pks = list(UnifiedJob.objects.filter(execution_node=self.hostname, status__in=('running', 'waiting')).values_list('pk', flat=True))
+            if active_pks:
+                vargs['exclude_strings'] = [JOB_FOLDER_PREFIX % job_id for job_id in active_pks]
+        if 'remove_images' in vargs or 'image_prune' in vargs:
+            vargs.setdefault('process_isolation_executable', 'podman')
+        return vargs
+
     def is_lost(self, ref_time=None):
         if self.last_seen is None:
             return True
diff --git a/awx/main/signals.py b/awx/main/signals.py
index fe3fdbc756..5caf7b45a8 100644
--- a/awx/main/signals.py
+++ b/awx/main/signals.py
@@ -58,7 +58,7 @@ from awx.main.models import (
 from awx.main.constants import CENSOR_VALUE
 from awx.main.utils import model_instance_diff, model_to_dict, camelcase_to_underscore, get_current_apps
 from awx.main.utils import ignore_inventory_computed_fields, ignore_inventory_group_removal, _inventory_updates
-from awx.main.tasks import update_inventory_computed_fields
+from awx.main.tasks import update_inventory_computed_fields, handle_removed_image
 from awx.main.fields import (
     is_implicit_parent,
     update_role_parentage_for_instance,
@@ -624,10 +624,26 @@ def deny_orphaned_approvals(sender, instance, **kwargs):
             approval.deny()
 
 
+def _handle_image_cleanup(removed_image, pk):
+    if (not removed_image) or ExecutionEnvironment.objects.filter(image=removed_image).exclude(pk=pk).exists():
+        return  # if other EE objects reference the tag, then do not purge it
+    handle_removed_image.delay(remove_images=[removed_image])
+
+
 @receiver(pre_delete, sender=ExecutionEnvironment)
 def remove_default_ee(sender, instance, **kwargs):
     if instance.id == getattr(settings.DEFAULT_EXECUTION_ENVIRONMENT, 'id', None):
         settings.DEFAULT_EXECUTION_ENVIRONMENT = None
+    _handle_image_cleanup(instance.image, instance.pk)
+
+
+@receiver(post_save, sender=ExecutionEnvironment)
+def remove_stale_image(sender, instance, created, **kwargs):
+    if created:
+        return
+    removed_image = instance._prior_values_store.get('image')
+    if removed_image and removed_image != instance.image:
+        _handle_image_cleanup(removed_image, instance.pk)
 
 
 @receiver(post_save, sender=Session)
diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index 0deb9357aa..1bde975373 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -11,6 +11,8 @@ import importlib
 import json
 import logging
 import os
+from io import StringIO
+from contextlib import redirect_stdout
 import shutil
 import stat
 import tempfile
@@ -27,7 +29,6 @@ import socket
 import threading
 import concurrent.futures
 from base64 import b64encode
-import subprocess
 import sys
 
 # Django
@@ -51,13 +52,14 @@ from gitdb.exc import BadName as BadGitName
 
 # Runner
 import ansible_runner
+import ansible_runner.cleanup
 
 # dateutil
 from dateutil.parser import parse as parse_date
 
 # AWX
 from awx import __version__ as awx_application_version
-from awx.main.constants import PRIVILEGE_ESCALATION_METHODS, STANDARD_INVENTORY_UPDATE_ENV, MINIMAL_EVENTS
+from awx.main.constants import PRIVILEGE_ESCALATION_METHODS, STANDARD_INVENTORY_UPDATE_ENV, MINIMAL_EVENTS, JOB_FOLDER_PREFIX
 from awx.main.access import access_registry
 from awx.main.redact import UriCleaner
 from awx.main.models import (
@@ -106,7 +108,7 @@ from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja
 from awx.main.utils.reload import stop_local_services
 from awx.main.utils.pglock import advisory_lock
 from awx.main.utils.handlers import SpecialInventoryHandler
-from awx.main.utils.receptor import get_receptor_ctl, worker_info, get_conn_type, get_tls_client
+from awx.main.utils.receptor import get_receptor_ctl, worker_info, get_conn_type, get_tls_client, worker_cleanup
 from awx.main.consumers import emit_channel_notification
 from awx.main import analytics
 from awx.conf import settings_registry
@@ -390,22 +392,42 @@ def purge_old_stdout_files():
             logger.debug("Removing {}".format(os.path.join(settings.JOBOUTPUT_ROOT, f)))
 
 
-@task(queue=get_local_queuename)
-def cleanup_execution_environment_images():
+def _cleanup_images_and_files(**kwargs):
     if settings.IS_K8S:
         return
-    process = subprocess.run('podman images --filter="dangling=true" --format json'.split(" "), capture_output=True)
-    if process.returncode != 0:
-        logger.debug("Cleanup execution environment images: could not get list of images")
-        return
-    if len(process.stdout) > 0:
-        images_system = json.loads(process.stdout)
-        for e in images_system:
-            image_name = e["Id"]
-            logger.debug(f"Cleanup execution environment images: deleting {image_name}")
-            process = subprocess.run(['podman', 'rmi', image_name, '-f'], stdout=subprocess.DEVNULL)
-            if process.returncode != 0:
-                logger.debug(f"Failed to delete image {image_name}")
+    this_inst = Instance.objects.me()
+    runner_cleanup_kwargs = this_inst.get_cleanup_task_kwargs(**kwargs)
+    stdout = ''
+    with StringIO() as buffer:
+        with redirect_stdout(buffer):
+            ansible_runner.cleanup.run_cleanup(runner_cleanup_kwargs)
+            stdout = buffer.getvalue()
+    if '(changed: True)' in stdout:
+        logger.info(f'Performed local cleanup with kwargs {kwargs}, output:\n{stdout}')
+
+    # if we are the first instance alphabetically, then run cleanup on execution nodes
+    checker_instance = Instance.objects.filter(node_type__in=['hybrid', 'control'], enabled=True, capacity__gt=0).order_by('-hostname').first()
+    if checker_instance and this_inst.hostname == checker_instance.hostname:
+        logger.info(f'Running execution node cleanup with kwargs {kwargs}')
+        for inst in Instance.objects.filter(node_type='execution', enabled=True, capacity__gt=0):
+            runner_cleanup_kwargs = inst.get_cleanup_task_kwargs(**kwargs)
+            try:
+                stdout = worker_cleanup(inst.hostname, runner_cleanup_kwargs)
+                if '(changed: True)' in stdout:
+                    logger.info(f'Performed cleanup on execution node {inst.hostname} with output:\n{stdout}')
+            except RuntimeError:
+                logger.exception(f'Error running cleanup on execution node {inst.hostname}')
+
+
+@task(queue='tower_broadcast_all')
+def handle_removed_image(remove_images=None):
+    """Special broadcast invocation of this method to handle case of deleted EE"""
+    _cleanup_images_and_files(remove_images=remove_images, file_pattern='')
+
+
+@task(queue=get_local_queuename)
+def cleanup_images_and_files():
+    _cleanup_images_and_files()
 
 
 @task(queue=get_local_queuename)
@@ -441,7 +463,7 @@ def execution_node_health_check(node):
     if instance.node_type != 'execution':
         raise RuntimeError(f'Execution node health check ran against {instance.node_type} node {instance.hostname}')
 
-    data = worker_info(node, work_type='ansible-runner' if instance.node_type == 'execution' else 'local')
+    data = worker_info(node)
 
     prior_capacity = instance.capacity
 
@@ -980,7 +1002,7 @@ class BaseTask(object):
         """
         Create a temporary directory for job-related files.
""" - path = tempfile.mkdtemp(prefix='awx_%s_' % instance.pk, dir=settings.AWX_ISOLATION_BASE_PATH) + path = tempfile.mkdtemp(prefix=JOB_FOLDER_PREFIX % instance.pk, dir=settings.AWX_ISOLATION_BASE_PATH) os.chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR) if settings.AWX_CLEANUP_PATHS: self.cleanup_paths.append(path) diff --git a/awx/main/tests/functional/models/test_execution_environment.py b/awx/main/tests/functional/models/test_execution_environment.py new file mode 100644 index 0000000000..489c163f0c --- /dev/null +++ b/awx/main/tests/functional/models/test_execution_environment.py @@ -0,0 +1,46 @@ +import pytest + +from awx.main.models.execution_environments import ExecutionEnvironment + + +@pytest.fixture +def cleanup_patch(mocker): + return mocker.patch('awx.main.signals.handle_removed_image') + + +@pytest.mark.django_db +def test_image_unchanged_no_delete_task(cleanup_patch): + """When an irrelevant EE field is changed, we do not run the image cleanup task""" + execution_environment = ExecutionEnvironment.objects.create(name='test-ee', image='quay.io/foo/bar') + execution_environment.description = 'foobar' + execution_environment.save() + + cleanup_patch.delay.assert_not_called() + + +@pytest.mark.django_db +def test_image_changed_creates_delete_task(cleanup_patch): + execution_environment = ExecutionEnvironment.objects.create(name='test-ee', image='quay.io/foo/bar') + execution_environment.image = 'quay.io/new/image' + execution_environment.save() + + cleanup_patch.delay.assert_called_once_with(remove_images=['quay.io/foo/bar']) + + +@pytest.mark.django_db +def test_image_still_in_use(cleanup_patch): + """When an image is still in use by another EE, we do not clean it up""" + ExecutionEnvironment.objects.create(name='unrelated-ee', image='quay.io/foo/bar') + execution_environment = ExecutionEnvironment.objects.create(name='test-ee', image='quay.io/foo/bar') + execution_environment.image = 'quay.io/new/image' + execution_environment.save() + + cleanup_patch.delay.assert_not_called() + + +@pytest.mark.django_db +def test_image_deletion_creates_delete_task(cleanup_patch): + execution_environment = ExecutionEnvironment.objects.create(name='test-ee', image='quay.io/foo/bar') + execution_environment.delete() + + cleanup_patch.delay.assert_called_once_with(remove_images=['quay.io/foo/bar']) diff --git a/awx/main/tests/unit/models/test_ha.py b/awx/main/tests/unit/models/test_ha.py index 4a07cafd2d..a0ce3476e2 100644 --- a/awx/main/tests/unit/models/test_ha.py +++ b/awx/main/tests/unit/models/test_ha.py @@ -89,3 +89,19 @@ class TestInstanceGroup(object): assert ig.find_largest_idle_instance(instances_online_only) is None, reason else: assert ig.find_largest_idle_instance(instances_online_only) == instances[instance_fit_index], reason + + +def test_cleanup_params_defaults(): + inst = Instance(hostname='foobar') + assert inst.get_cleanup_task_kwargs(exclude_strings=['awx_423_']) == {'exclude_strings': ['awx_423_'], 'file_pattern': '/tmp/awx_*_*'} + + +def test_cleanup_params_for_image_cleanup(): + inst = Instance(hostname='foobar') + # see CLI conversion in awx.main.tests.unit.utils.test_receptor + assert inst.get_cleanup_task_kwargs(file_pattern='', remove_images=['quay.invalid/foo/bar'], image_prune=True) == { + 'file_pattern': '', + 'process_isolation_executable': 'podman', + 'remove_images': ['quay.invalid/foo/bar'], + 'image_prune': True, + } diff --git a/awx/main/tests/unit/utils/test_receptor.py b/awx/main/tests/unit/utils/test_receptor.py new file mode 100644 index 
0000000000..944494ebdc --- /dev/null +++ b/awx/main/tests/unit/utils/test_receptor.py @@ -0,0 +1,21 @@ +from awx.main.utils.receptor import _convert_args_to_cli + + +def test_file_cleanup_scenario(): + args = _convert_args_to_cli({'exclude_strings': ['awx_423_', 'awx_582_'], 'file_pattern': '/tmp/awx_*_*'}) + assert ' '.join(args) == 'cleanup --exclude-strings=awx_423_ awx_582_ --file-pattern=/tmp/awx_*_*' + + +def test_image_cleanup_scenario(): + # See input dict in awx.main.tests.unit.models.test_ha + args = _convert_args_to_cli( + { + 'file_pattern': '', + 'process_isolation_executable': 'podman', + 'remove_images': ['quay.invalid/foo/bar:latest', 'quay.invalid/foo/bar:devel'], + 'image_prune': True, + } + ) + assert ( + ' '.join(args) == 'cleanup --remove-images=quay.invalid/foo/bar:latest quay.invalid/foo/bar:devel --image-prune --process-isolation-executable=podman' + ) diff --git a/awx/main/utils/receptor.py b/awx/main/utils/receptor.py index 0aa87e843c..ece0f5fcec 100644 --- a/awx/main/utils/receptor.py +++ b/awx/main/utils/receptor.py @@ -61,40 +61,48 @@ def get_conn_type(node_name, receptor_ctl): return ReceptorConnectionType(node.get('ConnType')) -def worker_info(node_name, work_type='ansible-runner'): - receptor_ctl = get_receptor_ctl() - use_stream_tls = getattr(get_conn_type(node_name, receptor_ctl), 'name', None) == "STREAMTLS" - transmit_start = time.time() - error_list = [] - data = {'errors': error_list, 'transmit_timing': 0.0} +class RemoteJobError(RuntimeError): + pass - kwargs = {} - kwargs['tlsclient'] = get_tls_client(use_stream_tls) - kwargs['signwork'] = True - if work_type != 'local': - kwargs['ttl'] = '20s' - result = receptor_ctl.submit_work(worktype=work_type, payload='', params={"params": f"--worker-info"}, node=node_name, **kwargs) + +def run_until_complete(node, timing_data=None, **kwargs): + """ + Runs an ansible-runner work_type on remote node, waits until it completes, then returns stdout. 
+ """ + receptor_ctl = get_receptor_ctl() + + use_stream_tls = getattr(get_conn_type(node, receptor_ctl), 'name', None) == "STREAMTLS" + kwargs.setdefault('tlsclient', get_tls_client(use_stream_tls)) + kwargs.setdefault('signwork', True) + kwargs.setdefault('ttl', '20s') + kwargs.setdefault('payload', '') + + transmit_start = time.time() + result = receptor_ctl.submit_work(worktype='ansible-runner', node=node, **kwargs) unit_id = result['unitid'] run_start = time.time() - data['transmit_timing'] = run_start - transmit_start - data['run_timing'] = 0.0 + if timing_data: + timing_data['transmit_timing'] = run_start - transmit_start + run_timing = 0.0 + stdout = '' try: resultfile = receptor_ctl.get_work_results(unit_id) - stdout = '' - - while data['run_timing'] < 20.0: + while run_timing < 20.0: status = receptor_ctl.simple_command(f'work status {unit_id}') state_name = status.get('StateName') if state_name not in ('Pending', 'Running'): break - data['run_timing'] = time.time() - run_start + run_timing = time.time() - run_start time.sleep(0.5) else: - error_list.append(f'Timeout getting worker info on {node_name}, state remains in {state_name}') + raise RemoteJobError(f'Receptor job timeout on {node} after {run_timing} seconds, state remains in {state_name}') + + if timing_data: + timing_data['run_timing'] = run_timing stdout = resultfile.read() stdout = str(stdout, encoding='utf-8') @@ -103,19 +111,27 @@ def worker_info(node_name, work_type='ansible-runner'): res = receptor_ctl.simple_command(f"work release {unit_id}") if res != {'released': unit_id}: - logger.warn(f'Could not confirm release of receptor work unit id {unit_id} from {node_name}, data: {res}') + logger.warn(f'Could not confirm release of receptor work unit id {unit_id} from {node}, data: {res}') receptor_ctl.close() if state_name.lower() == 'failed': work_detail = status.get('Detail', '') - if not work_detail.startswith('exit status'): - error_list.append(f'Receptor error getting worker info from {node_name}, detail:\n{work_detail}') - elif 'unrecognized arguments: --worker-info' in stdout: - error_list.append(f'Old version (2.0.1 or earlier) of ansible-runner on node {node_name} without --worker-info') + if work_detail: + raise RemoteJobError(f'Receptor error from {node}, detail:\n{work_detail}') else: - error_list.append(f'Unknown ansible-runner error on node {node_name}, stdout:\n{stdout}') - else: + raise RemoteJobError(f'Unknown ansible-runner error on node {node}, stdout:\n{stdout}') + + return stdout + + +def worker_info(node_name, work_type='ansible-runner'): + error_list = [] + data = {'errors': error_list, 'transmit_timing': 0.0} + + try: + stdout = run_until_complete(node=node_name, timing_data=data, params={"params": "--worker-info"}) + yaml_stdout = stdout.strip() remote_data = {} try: @@ -129,6 +145,13 @@ def worker_info(node_name, work_type='ansible-runner'): error_list.extend(remote_data.pop('errors', [])) # merge both error lists data.update(remote_data) + except RemoteJobError as exc: + details = exc.args[0] + if 'unrecognized arguments: --worker-info' in details: + error_list.append(f'Old version (2.0.1 or earlier) of ansible-runner on node {node_name} without --worker-info') + else: + error_list.append(details) + # If we have a connection error, missing keys would be trivial consequence of that if not data['errors']: # see tasks.py usage of keys @@ -137,3 +160,32 @@ def worker_info(node_name, work_type='ansible-runner'): data['errors'].append('Worker failed to return keys {}'.format(' 
'.join(missing_keys))) return data + + +def _convert_args_to_cli(vargs): + """ + For the ansible-runner worker cleanup command + converts the dictionary (parsed argparse variables) used for python interface + into a string of CLI options, which has to be used on execution nodes. + """ + args = ['cleanup'] + for option in ('exclude_strings', 'remove_images'): + if vargs.get(option): + args.append('--{}={}'.format(option.replace('_', '-'), ' '.join(vargs.get(option)))) + for option in ('file_pattern', 'image_prune', 'process_isolation_executable', 'grace_period'): + if vargs.get(option) is True: + args.append('--{}'.format(option.replace('_', '-'))) + elif vargs.get(option) not in (None, ''): + args.append('--{}={}'.format(option.replace('_', '-'), vargs.get(option))) + return args + + +def worker_cleanup(node_name, vargs, timeout=300.0): + args = _convert_args_to_cli(vargs) + + remote_command = ' '.join(args) + logger.debug(f'Running command over receptor mesh on {node_name}: ansible-runner worker {remote_command}') + + stdout = run_until_complete(node=node_name, params={"params": remote_command}) + + return stdout diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index e3c2787c57..3f949abeca 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -441,7 +441,7 @@ CELERYBEAT_SCHEDULE = { 'k8s_reaper': {'task': 'awx.main.tasks.awx_k8s_reaper', 'schedule': timedelta(seconds=60), 'options': {'expires': 50}}, 'receptor_reaper': {'task': 'awx.main.tasks.awx_receptor_workunit_reaper', 'schedule': timedelta(seconds=60)}, 'send_subsystem_metrics': {'task': 'awx.main.analytics.analytics_tasks.send_subsystem_metrics', 'schedule': timedelta(seconds=20)}, - 'cleanup_images': {'task': 'awx.main.tasks.cleanup_execution_environment_images', 'schedule': timedelta(hours=3)}, + 'cleanup_images': {'task': 'awx.main.tasks.cleanup_images_and_files', 'schedule': timedelta(hours=3)}, } # Django Caching Configuration
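
For reference, a minimal sketch (not part of the commit above) of how the new helpers compose. It assumes an AWX control-plane Python environment where these modules import; the hostname is made up, and the expected values mirror the unit tests added in this patch.

    # Sketch only -- follows awx/main/tests/unit/models/test_ha.py and
    # awx/main/tests/unit/utils/test_receptor.py from this patch.
    from awx.main.models import Instance
    from awx.main.utils.receptor import _convert_args_to_cli

    inst = Instance(hostname='node1.example.invalid')  # hypothetical, unsaved instance

    # Passing exclude_strings explicitly skips the UnifiedJob lookup for active jobs.
    kwargs = inst.get_cleanup_task_kwargs(exclude_strings=['awx_423_'])
    # kwargs == {'file_pattern': '/tmp/awx_*_*', 'exclude_strings': ['awx_423_']}

    # For remote execution nodes the same options are rendered as CLI flags
    # for `ansible-runner worker cleanup`:
    print(' '.join(_convert_args_to_cli(kwargs)))
    # -> cleanup --exclude-strings=awx_423_ --file-pattern=/tmp/awx_*_*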