Consolidate cleanup actions under new ansible-runner worker cleanup command (#11160)

* Primary development integrating the runner cleanup command

* Fixup image cleanup signals and their tests

* Use alphabetical sort to solve the cluster coordination problem

* Update test to new pattern

* Clarity edits to interface with ansible-runner cleanup method

* Another change corresponding to ansible-runner CLI updates

* Fix incomplete implementation of receptor remote cleanup

* Share receptor utils code between worker_info and cleanup

* Complete task logging from calling runner cleanup command

* Wrap up unit tests and some contract changes that fall out of those

* Fix bug in CLI construction

* Fix queryset filter bug
Alan Rominger 2021-10-05 16:32:03 -04:00 committed by GitHub
parent 4c205dfde9
commit b70793db5c
9 changed files with 240 additions and 47 deletions

awx/main/constants.py

@@ -81,3 +81,6 @@ LOGGER_BLOCKLIST = (
# Reported version for node seen in receptor mesh but for which capacity check
# failed or is in progress
RECEPTOR_PENDING = 'ansible-runner-???'

# Naming pattern for AWX jobs in /tmp folder, like /tmp/awx_42_xiwm
JOB_FOLDER_PREFIX = 'awx_%s_'

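For illustration (not part of this diff), a quick sketch of how the new constant expands into job folder names and into the glob pattern the cleanup code derives from it; the trailing random suffix comes from tempfile.mkdtemp:

import tempfile

JOB_FOLDER_PREFIX = 'awx_%s_'

prefix = JOB_FOLDER_PREFIX % 42  # 'awx_42_'
path = tempfile.mkdtemp(prefix=prefix, dir='/tmp')  # e.g. '/tmp/awx_42_xiwm'
pattern = '/tmp/{}*'.format(JOB_FOLDER_PREFIX % '*')  # '/tmp/awx_*_*', matches every job folder
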
awx/main/models/ha.py

@@ -20,6 +20,7 @@ from awx import __version__ as awx_application_version
from awx.api.versioning import reverse
from awx.main.managers import InstanceManager, InstanceGroupManager, UUID_DEFAULT
from awx.main.fields import JSONField
from awx.main.constants import JOB_FOLDER_PREFIX
from awx.main.models.base import BaseModel, HasEditsMixin, prevent_search
from awx.main.models.unified_jobs import UnifiedJob
from awx.main.utils.common import get_corrected_cpu, get_cpu_effective_capacity, get_corrected_memory, get_mem_effective_capacity
@@ -155,6 +156,22 @@ class Instance(HasPolicyEditsMixin, BaseModel):
            Instance.objects.filter(enabled=True, capacity__gt=0).filter(node_type__in=['control', 'hybrid']).values_list('hostname', flat=True)
        )

    def get_cleanup_task_kwargs(self, **kwargs):
        """
        Produce options to use for the command: ansible-runner worker cleanup
        Returns a dict that is passed to the Python interface for the runner method corresponding to that command;
        any given kwargs override the matching key=value entries in the returned dict.
        """
        vargs = dict(file_pattern='/tmp/{}*'.format(JOB_FOLDER_PREFIX % '*'))
        vargs.update(kwargs)
        if 'exclude_strings' not in vargs and vargs.get('file_pattern'):
            active_pks = list(UnifiedJob.objects.filter(execution_node=self.hostname, status__in=('running', 'waiting')).values_list('pk', flat=True))
            if active_pks:
                vargs['exclude_strings'] = [JOB_FOLDER_PREFIX % job_id for job_id in active_pks]
        if 'remove_images' in vargs or 'image_prune' in vargs:
            vargs.setdefault('process_isolation_executable', 'podman')
        return vargs

    def is_lost(self, ref_time=None):
        if self.last_seen is None:
            return True

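A minimal sketch of what get_cleanup_task_kwargs() returns, mirroring the unit tests added later in this commit; the hostname is hypothetical, and passing exclude_strings explicitly skips the UnifiedJob query:

from awx.main.models import Instance

inst = Instance(hostname='node1.example.invalid')  # hypothetical, never saved to the DB

# file cleanup: folders of jobs still running on this node get excluded
inst.get_cleanup_task_kwargs(exclude_strings=['awx_423_'])
# -> {'file_pattern': '/tmp/awx_*_*', 'exclude_strings': ['awx_423_']}

# image cleanup: podman is filled in as the process isolation executable
inst.get_cleanup_task_kwargs(file_pattern='', remove_images=['quay.invalid/foo/bar'], image_prune=True)
# -> {'file_pattern': '', 'remove_images': ['quay.invalid/foo/bar'], 'image_prune': True, 'process_isolation_executable': 'podman'}
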
awx/main/signals.py

@@ -58,7 +58,7 @@ from awx.main.models import (
from awx.main.constants import CENSOR_VALUE
from awx.main.utils import model_instance_diff, model_to_dict, camelcase_to_underscore, get_current_apps
from awx.main.utils import ignore_inventory_computed_fields, ignore_inventory_group_removal, _inventory_updates
from awx.main.tasks import update_inventory_computed_fields
from awx.main.tasks import update_inventory_computed_fields, handle_removed_image
from awx.main.fields import (
is_implicit_parent,
update_role_parentage_for_instance,
@@ -624,10 +624,26 @@ def deny_orphaned_approvals(sender, instance, **kwargs):
        approval.deny()


def _handle_image_cleanup(removed_image, pk):
    if (not removed_image) or ExecutionEnvironment.objects.filter(image=removed_image).exclude(pk=pk).exists():
        return  # if other EE objects reference the tag, then do not purge it
    handle_removed_image.delay(remove_images=[removed_image])


@receiver(pre_delete, sender=ExecutionEnvironment)
def remove_default_ee(sender, instance, **kwargs):
    if instance.id == getattr(settings.DEFAULT_EXECUTION_ENVIRONMENT, 'id', None):
        settings.DEFAULT_EXECUTION_ENVIRONMENT = None
    _handle_image_cleanup(instance.image, instance.pk)


@receiver(post_save, sender=ExecutionEnvironment)
def remove_stale_image(sender, instance, created, **kwargs):
    if created:
        return
    removed_image = instance._prior_values_store.get('image')
    if removed_image and removed_image != instance.image:
        _handle_image_cleanup(removed_image, instance.pk)


@receiver(post_save, sender=Session)

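The net effect: only when no other ExecutionEnvironment still references the old tag does the signal fan cleanup out to the whole cluster. A sketch of that dispatch (the image tag is a hypothetical example); because the task is bound to the broadcast queue, every node removes its local copy:

from awx.main.tasks import handle_removed_image

# queued on 'tower_broadcast_all', so this runs on every node in the cluster
handle_removed_image.delay(remove_images=['quay.invalid/old/image'])
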
awx/main/tasks.py

@@ -11,6 +11,8 @@ import importlib
import json
import logging
import os
from io import StringIO
from contextlib import redirect_stdout
import shutil
import stat
import tempfile
@@ -27,7 +29,6 @@ import socket
import threading
import concurrent.futures
from base64 import b64encode
import subprocess
import sys
# Django
@@ -51,13 +52,14 @@ from gitdb.exc import BadName as BadGitName
# Runner
import ansible_runner
import ansible_runner.cleanup
# dateutil
from dateutil.parser import parse as parse_date
# AWX
from awx import __version__ as awx_application_version
from awx.main.constants import PRIVILEGE_ESCALATION_METHODS, STANDARD_INVENTORY_UPDATE_ENV, MINIMAL_EVENTS
from awx.main.constants import PRIVILEGE_ESCALATION_METHODS, STANDARD_INVENTORY_UPDATE_ENV, MINIMAL_EVENTS, JOB_FOLDER_PREFIX
from awx.main.access import access_registry
from awx.main.redact import UriCleaner
from awx.main.models import (
@@ -106,7 +108,7 @@ from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja
from awx.main.utils.reload import stop_local_services
from awx.main.utils.pglock import advisory_lock
from awx.main.utils.handlers import SpecialInventoryHandler
from awx.main.utils.receptor import get_receptor_ctl, worker_info, get_conn_type, get_tls_client
from awx.main.utils.receptor import get_receptor_ctl, worker_info, get_conn_type, get_tls_client, worker_cleanup
from awx.main.consumers import emit_channel_notification
from awx.main import analytics
from awx.conf import settings_registry
@@ -390,22 +392,42 @@ def purge_old_stdout_files():
            logger.debug("Removing {}".format(os.path.join(settings.JOBOUTPUT_ROOT, f)))


@task(queue=get_local_queuename)
def cleanup_execution_environment_images():
def _cleanup_images_and_files(**kwargs):
    if settings.IS_K8S:
        return
    process = subprocess.run('podman images --filter="dangling=true" --format json'.split(" "), capture_output=True)
    if process.returncode != 0:
        logger.debug("Cleanup execution environment images: could not get list of images")
        return
    if len(process.stdout) > 0:
        images_system = json.loads(process.stdout)
        for e in images_system:
            image_name = e["Id"]
            logger.debug(f"Cleanup execution environment images: deleting {image_name}")
            process = subprocess.run(['podman', 'rmi', image_name, '-f'], stdout=subprocess.DEVNULL)
            if process.returncode != 0:
                logger.debug(f"Failed to delete image {image_name}")
    this_inst = Instance.objects.me()
    runner_cleanup_kwargs = this_inst.get_cleanup_task_kwargs(**kwargs)
    stdout = ''
    with StringIO() as buffer:
        with redirect_stdout(buffer):
            ansible_runner.cleanup.run_cleanup(runner_cleanup_kwargs)
        stdout = buffer.getvalue()
    if '(changed: True)' in stdout:
        logger.info(f'Performed local cleanup with kwargs {kwargs}, output:\n{stdout}')

    # exactly one control-plane node (the highest hostname in sort order) runs cleanup on execution nodes
    checker_instance = Instance.objects.filter(node_type__in=['hybrid', 'control'], enabled=True, capacity__gt=0).order_by('-hostname').first()
    if checker_instance and this_inst.hostname == checker_instance.hostname:
        logger.info(f'Running execution node cleanup with kwargs {kwargs}')
        for inst in Instance.objects.filter(node_type='execution', enabled=True, capacity__gt=0):
            runner_cleanup_kwargs = inst.get_cleanup_task_kwargs(**kwargs)
            try:
                stdout = worker_cleanup(inst.hostname, runner_cleanup_kwargs)
                if '(changed: True)' in stdout:
                    logger.info(f'Performed cleanup on execution node {inst.hostname} with output:\n{stdout}')
            except RuntimeError:
                logger.exception(f'Error running cleanup on execution node {inst.hostname}')


@task(queue='tower_broadcast_all')
def handle_removed_image(remove_images=None):
    """Special broadcast invocation of this method to handle the case of a deleted EE"""
    _cleanup_images_and_files(remove_images=remove_images, file_pattern='')


@task(queue=get_local_queuename)
def cleanup_images_and_files():
    _cleanup_images_and_files()


@task(queue=get_local_queuename)
@@ -441,7 +463,7 @@ def execution_node_health_check(node):
    if instance.node_type != 'execution':
        raise RuntimeError(f'Execution node health check ran against {instance.node_type} node {instance.hostname}')

    data = worker_info(node, work_type='ansible-runner' if instance.node_type == 'execution' else 'local')
    data = worker_info(node)

    prior_capacity = instance.capacity
@@ -980,7 +1002,7 @@ class BaseTask(object):
        """
        Create a temporary directory for job-related files.
        """
        path = tempfile.mkdtemp(prefix='awx_%s_' % instance.pk, dir=settings.AWX_ISOLATION_BASE_PATH)
        path = tempfile.mkdtemp(prefix=JOB_FOLDER_PREFIX % instance.pk, dir=settings.AWX_ISOLATION_BASE_PATH)
        os.chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
        if settings.AWX_CLEANUP_PATHS:
            self.cleanup_paths.append(path)

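A sketch of the local invocation path the new helper uses: run_cleanup prints its report to stdout, which is why the task captures it with redirect_stdout and logs only when the report contains the '(changed: True)' marker. The kwargs shown are an assumed example of what get_cleanup_task_kwargs produces:

from io import StringIO
from contextlib import redirect_stdout

import ansible_runner.cleanup

cleanup_kwargs = {'file_pattern': '/tmp/awx_*_*', 'exclude_strings': ['awx_42_']}  # hypothetical
with StringIO() as buffer:
    with redirect_stdout(buffer):
        ansible_runner.cleanup.run_cleanup(cleanup_kwargs)
    report = buffer.getvalue()

if '(changed: True)' in report:  # marker the task greps for before logging
    print(report)
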
View File

@@ -0,0 +1,46 @@
import pytest

from awx.main.models.execution_environments import ExecutionEnvironment


@pytest.fixture
def cleanup_patch(mocker):
    return mocker.patch('awx.main.signals.handle_removed_image')


@pytest.mark.django_db
def test_image_unchanged_no_delete_task(cleanup_patch):
    """When an irrelevant EE field is changed, we do not run the image cleanup task"""
    execution_environment = ExecutionEnvironment.objects.create(name='test-ee', image='quay.io/foo/bar')
    execution_environment.description = 'foobar'
    execution_environment.save()
    cleanup_patch.delay.assert_not_called()


@pytest.mark.django_db
def test_image_changed_creates_delete_task(cleanup_patch):
    execution_environment = ExecutionEnvironment.objects.create(name='test-ee', image='quay.io/foo/bar')
    execution_environment.image = 'quay.io/new/image'
    execution_environment.save()
    cleanup_patch.delay.assert_called_once_with(remove_images=['quay.io/foo/bar'])


@pytest.mark.django_db
def test_image_still_in_use(cleanup_patch):
    """When an image is still in use by another EE, we do not clean it up"""
    ExecutionEnvironment.objects.create(name='unrelated-ee', image='quay.io/foo/bar')
    execution_environment = ExecutionEnvironment.objects.create(name='test-ee', image='quay.io/foo/bar')
    execution_environment.image = 'quay.io/new/image'
    execution_environment.save()
    cleanup_patch.delay.assert_not_called()


@pytest.mark.django_db
def test_image_deletion_creates_delete_task(cleanup_patch):
    execution_environment = ExecutionEnvironment.objects.create(name='test-ee', image='quay.io/foo/bar')
    execution_environment.delete()
    cleanup_patch.delay.assert_called_once_with(remove_images=['quay.io/foo/bar'])

awx/main/tests/unit/models/test_ha.py

@@ -89,3 +89,19 @@ class TestInstanceGroup(object):
            assert ig.find_largest_idle_instance(instances_online_only) is None, reason
        else:
            assert ig.find_largest_idle_instance(instances_online_only) == instances[instance_fit_index], reason


def test_cleanup_params_defaults():
    inst = Instance(hostname='foobar')
    assert inst.get_cleanup_task_kwargs(exclude_strings=['awx_423_']) == {'exclude_strings': ['awx_423_'], 'file_pattern': '/tmp/awx_*_*'}


def test_cleanup_params_for_image_cleanup():
    inst = Instance(hostname='foobar')
    # see CLI conversion in awx.main.tests.unit.utils.test_receptor
    assert inst.get_cleanup_task_kwargs(file_pattern='', remove_images=['quay.invalid/foo/bar'], image_prune=True) == {
        'file_pattern': '',
        'process_isolation_executable': 'podman',
        'remove_images': ['quay.invalid/foo/bar'],
        'image_prune': True,
    }

awx/main/tests/unit/utils/test_receptor.py

@@ -0,0 +1,21 @@
from awx.main.utils.receptor import _convert_args_to_cli


def test_file_cleanup_scenario():
    args = _convert_args_to_cli({'exclude_strings': ['awx_423_', 'awx_582_'], 'file_pattern': '/tmp/awx_*_*'})
    assert ' '.join(args) == 'cleanup --exclude-strings=awx_423_ awx_582_ --file-pattern=/tmp/awx_*_*'


def test_image_cleanup_scenario():
    # See input dict in awx.main.tests.unit.models.test_ha
    args = _convert_args_to_cli(
        {
            'file_pattern': '',
            'process_isolation_executable': 'podman',
            'remove_images': ['quay.invalid/foo/bar:latest', 'quay.invalid/foo/bar:devel'],
            'image_prune': True,
        }
    )
    assert (
        ' '.join(args) == 'cleanup --remove-images=quay.invalid/foo/bar:latest quay.invalid/foo/bar:devel --image-prune --process-isolation-executable=podman'
    )

awx/main/utils/receptor.py

@@ -61,40 +61,48 @@ def get_conn_type(node_name, receptor_ctl):
    return ReceptorConnectionType(node.get('ConnType'))


def worker_info(node_name, work_type='ansible-runner'):
    receptor_ctl = get_receptor_ctl()
    use_stream_tls = getattr(get_conn_type(node_name, receptor_ctl), 'name', None) == "STREAMTLS"
    transmit_start = time.time()
    error_list = []
    data = {'errors': error_list, 'transmit_timing': 0.0}
class RemoteJobError(RuntimeError):
    pass
    kwargs = {}
    kwargs['tlsclient'] = get_tls_client(use_stream_tls)
    kwargs['signwork'] = True
    if work_type != 'local':
        kwargs['ttl'] = '20s'
    result = receptor_ctl.submit_work(worktype=work_type, payload='', params={"params": f"--worker-info"}, node=node_name, **kwargs)


def run_until_complete(node, timing_data=None, **kwargs):
    """
    Runs an ansible-runner work unit on a remote node, waits until it completes, then returns its stdout.
    """
    receptor_ctl = get_receptor_ctl()
    use_stream_tls = getattr(get_conn_type(node, receptor_ctl), 'name', None) == "STREAMTLS"
    kwargs.setdefault('tlsclient', get_tls_client(use_stream_tls))
    kwargs.setdefault('signwork', True)
    kwargs.setdefault('ttl', '20s')
    kwargs.setdefault('payload', '')

    transmit_start = time.time()
    result = receptor_ctl.submit_work(worktype='ansible-runner', node=node, **kwargs)

    unit_id = result['unitid']
    run_start = time.time()
    data['transmit_timing'] = run_start - transmit_start
    data['run_timing'] = 0.0
    if timing_data:
        timing_data['transmit_timing'] = run_start - transmit_start
    run_timing = 0.0
    stdout = ''

    try:
        resultfile = receptor_ctl.get_work_results(unit_id)
        stdout = ''
        while data['run_timing'] < 20.0:
        while run_timing < 20.0:
            status = receptor_ctl.simple_command(f'work status {unit_id}')
            state_name = status.get('StateName')
            if state_name not in ('Pending', 'Running'):
                break
            data['run_timing'] = time.time() - run_start
            run_timing = time.time() - run_start
            time.sleep(0.5)
        else:
            error_list.append(f'Timeout getting worker info on {node_name}, state remains in {state_name}')
            raise RemoteJobError(f'Receptor job timeout on {node} after {run_timing} seconds, state remains in {state_name}')
        if timing_data:
            timing_data['run_timing'] = run_timing

        stdout = resultfile.read()
        stdout = str(stdout, encoding='utf-8')
@@ -103,19 +111,27 @@ def worker_info(node_name, work_type='ansible-runner'):
        res = receptor_ctl.simple_command(f"work release {unit_id}")
        if res != {'released': unit_id}:
            logger.warn(f'Could not confirm release of receptor work unit id {unit_id} from {node_name}, data: {res}')
            logger.warn(f'Could not confirm release of receptor work unit id {unit_id} from {node}, data: {res}')

    receptor_ctl.close()

    if state_name.lower() == 'failed':
        work_detail = status.get('Detail', '')
        if not work_detail.startswith('exit status'):
            error_list.append(f'Receptor error getting worker info from {node_name}, detail:\n{work_detail}')
        elif 'unrecognized arguments: --worker-info' in stdout:
            error_list.append(f'Old version (2.0.1 or earlier) of ansible-runner on node {node_name} without --worker-info')
        if work_detail:
            raise RemoteJobError(f'Receptor error from {node}, detail:\n{work_detail}')
        else:
            error_list.append(f'Unknown ansible-runner error on node {node_name}, stdout:\n{stdout}')
        else:
            raise RemoteJobError(f'Unknown ansible-runner error on node {node}, stdout:\n{stdout}')

    return stdout


def worker_info(node_name, work_type='ansible-runner'):
    error_list = []
    data = {'errors': error_list, 'transmit_timing': 0.0}

    try:
        stdout = run_until_complete(node=node_name, timing_data=data, params={"params": "--worker-info"})

        yaml_stdout = stdout.strip()
        remote_data = {}
        try:
@@ -129,6 +145,13 @@ def worker_info(node_name, work_type='ansible-runner'):
        error_list.extend(remote_data.pop('errors', []))  # merge both error lists
        data.update(remote_data)
    except RemoteJobError as exc:
        details = exc.args[0]
        if 'unrecognized arguments: --worker-info' in details:
            error_list.append(f'Old version (2.0.1 or earlier) of ansible-runner on node {node_name} without --worker-info')
        else:
            error_list.append(details)

    # If we have a connection error, missing keys would be a trivial consequence of that
    if not data['errors']:
        # see tasks.py usage of keys
@@ -137,3 +160,32 @@ def worker_info(node_name, work_type='ansible-runner'):
            data['errors'].append('Worker failed to return keys {}'.format(' '.join(missing_keys)))

    return data


def _convert_args_to_cli(vargs):
    """
    For the ansible-runner worker cleanup command,
    convert the dictionary (parsed argparse variables) used by the Python interface
    into a list of CLI options, which is what must be used on execution nodes.
    """
    args = ['cleanup']
    for option in ('exclude_strings', 'remove_images'):
        if vargs.get(option):
            args.append('--{}={}'.format(option.replace('_', '-'), ' '.join(vargs.get(option))))
    for option in ('file_pattern', 'image_prune', 'process_isolation_executable', 'grace_period'):
        if vargs.get(option) is True:
            args.append('--{}'.format(option.replace('_', '-')))
        elif vargs.get(option) not in (None, ''):
            args.append('--{}={}'.format(option.replace('_', '-'), vargs.get(option)))
    return args


def worker_cleanup(node_name, vargs, timeout=300.0):
    args = _convert_args_to_cli(vargs)

    remote_command = ' '.join(args)
    logger.debug(f'Running command over receptor mesh on {node_name}: ansible-runner worker {remote_command}')

    stdout = run_until_complete(node=node_name, params={"params": remote_command})

    return stdout

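A usage sketch tying the pieces together (the hostname and the grace_period value are hypothetical examples): the kwargs dict built by get_cleanup_task_kwargs round-trips through _convert_args_to_cli and is submitted to the remote node as an ansible-runner worker cleanup work unit:

from awx.main.utils.receptor import _convert_args_to_cli, worker_cleanup

vargs = {'file_pattern': '/tmp/awx_*_*', 'exclude_strings': ['awx_423_'], 'grace_period': 72}
print(' '.join(_convert_args_to_cli(vargs)))
# cleanup --exclude-strings=awx_423_ --file-pattern=/tmp/awx_*_* --grace-period=72

stdout = worker_cleanup('execnode1.example.invalid', vargs)  # blocks until the remote work unit finishes
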
awx/settings/defaults.py

@@ -441,7 +441,7 @@ CELERYBEAT_SCHEDULE = {
    'k8s_reaper': {'task': 'awx.main.tasks.awx_k8s_reaper', 'schedule': timedelta(seconds=60), 'options': {'expires': 50}},
    'receptor_reaper': {'task': 'awx.main.tasks.awx_receptor_workunit_reaper', 'schedule': timedelta(seconds=60)},
    'send_subsystem_metrics': {'task': 'awx.main.analytics.analytics_tasks.send_subsystem_metrics', 'schedule': timedelta(seconds=20)},
    'cleanup_images': {'task': 'awx.main.tasks.cleanup_execution_environment_images', 'schedule': timedelta(hours=3)},
    'cleanup_images': {'task': 'awx.main.tasks.cleanup_images_and_files', 'schedule': timedelta(hours=3)},
}
# Django Caching Configuration