Respect settings to keep files and work units

Add new logic to clean up orphaned work units
  left behind by administrative tasks

Remove the noisy, often irrelevant log message
  about running cleanup on execution nodes;
  we already have other logs for this
This commit is contained in:
Alan Rominger 2021-10-12 13:42:21 -04:00 committed by Shane McDonald
parent 1660900914
commit 7b35902d33
No known key found for this signature in database
GPG Key ID: 6F374AF6E9EB9374
4 changed files with 54 additions and 16 deletions

View File

@ -162,7 +162,9 @@ class Instance(HasPolicyEditsMixin, BaseModel):
returns a dict that is passed to the python interface for the runner method corresponding to that command
any kwargs will override that key=value combination in the returned dict
"""
vargs = dict(file_pattern='/tmp/{}*'.format(JOB_FOLDER_PREFIX % '*'))
vargs = dict()
if settings.AWX_CLEANUP_PATHS:
vargs['file_pattern'] = '/tmp/{}*'.format(JOB_FOLDER_PREFIX % '*')
vargs.update(kwargs)
if 'exclude_strings' not in vargs and vargs.get('file_pattern'):
active_pks = list(UnifiedJob.objects.filter(execution_node=self.hostname, status__in=('running', 'waiting')).values_list('pk', flat=True))

View File

@ -108,7 +108,7 @@ from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja
from awx.main.utils.reload import stop_local_services
from awx.main.utils.pglock import advisory_lock
from awx.main.utils.handlers import SpecialInventoryHandler
from awx.main.utils.receptor import get_receptor_ctl, worker_info, get_conn_type, get_tls_client, worker_cleanup
from awx.main.utils.receptor import get_receptor_ctl, worker_info, get_conn_type, get_tls_client, worker_cleanup, administrative_workunit_reaper
from awx.main.consumers import emit_channel_notification
from awx.main import analytics
from awx.conf import settings_registry
@ -397,20 +397,22 @@ def _cleanup_images_and_files(**kwargs):
return
this_inst = Instance.objects.me()
runner_cleanup_kwargs = this_inst.get_cleanup_task_kwargs(**kwargs)
stdout = ''
with StringIO() as buffer:
with redirect_stdout(buffer):
ansible_runner.cleanup.run_cleanup(runner_cleanup_kwargs)
stdout = buffer.getvalue()
if '(changed: True)' in stdout:
logger.info(f'Performed local cleanup with kwargs {kwargs}, output:\n{stdout}')
if runner_cleanup_kwargs:
stdout = ''
with StringIO() as buffer:
with redirect_stdout(buffer):
ansible_runner.cleanup.run_cleanup(runner_cleanup_kwargs)
stdout = buffer.getvalue()
if '(changed: True)' in stdout:
logger.info(f'Performed local cleanup with kwargs {kwargs}, output:\n{stdout}')
# if we are the first instance alphabetically, then run cleanup on execution nodes
checker_instance = Instance.objects.filter(node_type__in=['hybrid', 'control'], enabled=True, capacity__gt=0).order_by('-hostname').first()
if checker_instance and this_inst.hostname == checker_instance.hostname:
logger.info(f'Running execution node cleanup with kwargs {kwargs}')
for inst in Instance.objects.filter(node_type='execution', enabled=True, capacity__gt=0):
runner_cleanup_kwargs = inst.get_cleanup_task_kwargs(**kwargs)
if not runner_cleanup_kwargs:
continue
try:
stdout = worker_cleanup(inst.hostname, runner_cleanup_kwargs)
if '(changed: True)' in stdout:
@ -649,6 +651,8 @@ def awx_receptor_workunit_reaper():
receptor_ctl.simple_command(f"work cancel {job.work_unit_id}")
receptor_ctl.simple_command(f"work release {job.work_unit_id}")
administrative_workunit_reaper(receptor_work_list)
@task(queue=get_local_queuename)
def awx_k8s_reaper():
@ -3184,7 +3188,7 @@ class AWXReceptorJob:
receptor_params["secret_kube_config"] = kubeconfig_yaml
else:
private_data_dir = self.runner_params['private_data_dir']
if self.work_type == 'ansible-runner':
if self.work_type == 'ansible-runner' and settings.AWX_CLEANUP_PATHS:
# on execution nodes, we rely on the private data dir being deleted
cli_params = f"--private-data-dir={private_data_dir} --delete"
else:

View File

@ -3,6 +3,7 @@ import yaml
import time
from receptorctl.socket_interface import ReceptorControl
from django.conf import settings
from enum import Enum, unique
@ -11,6 +12,8 @@ logger = logging.getLogger('awx.main.utils.receptor')
__RECEPTOR_CONF = '/etc/receptor/receptor.conf'
RECEPTOR_ACTIVE_STATES = ('Pending', 'Running')
@unique
class ReceptorConnectionType(Enum):
@ -62,6 +65,32 @@ def get_conn_type(node_name, receptor_ctl):
return ReceptorConnectionType(node.get('ConnType'))
def administrative_workunit_reaper(work_list=None):
    """
    This releases completed work units that were spawned by actions inside of this module
    specifically, this should catch any completed work unit left by
     - worker_info
     - worker_cleanup
    These should ordinarily be released when the method finishes, but this is a
    cleanup of last-resort, in case something went awry

    :param work_list: mapping of work unit id -> work unit data, in the same shape
        as the result of the receptor "work list" command. When None, the list is
        fetched from receptor here.
    """
    receptor_ctl = get_receptor_ctl()
    if work_list is None:
        work_list = receptor_ctl.simple_command("work list")

    for unit_id, work_data in work_list.items():
        extra_data = work_data.get('ExtraData')
        # ExtraData can be missing or not a mapping; either way we cannot tell
        # that this is ansible-runner work, so we do not want to touch it
        if not isinstance(extra_data, dict) or extra_data.get('RemoteWorkType') != 'ansible-runner':
            continue

        remote_params = extra_data.get('RemoteParams')
        params = remote_params.get('params') if isinstance(remote_params, dict) else None
        # A unit without string params cannot be identified as a health check or
        # cleanup, skip it rather than crash on params.startswith and abort the
        # whole reaping pass
        if not isinstance(params, str):
            continue
        if not (params == '--worker-info' or params.startswith('cleanup')):
            continue  # if this is not a cleanup or health check, we do not want to touch it

        if work_data.get('StateName') in RECEPTOR_ACTIVE_STATES:
            continue  # do not want to touch active work units

        logger.info(f'Reaping orphaned work unit {unit_id} with params {params}')
        receptor_ctl.simple_command(f"work release {unit_id}")
class RemoteJobError(RuntimeError):
    """RuntimeError subclass; judging by its name, meant for failures of remotely-run work (confirm against the raise sites)."""
@ -95,7 +124,7 @@ def run_until_complete(node, timing_data=None, **kwargs):
while run_timing < 20.0:
status = receptor_ctl.simple_command(f'work status {unit_id}')
state_name = status.get('StateName')
if state_name not in ('Pending', 'Running'):
if state_name not in RECEPTOR_ACTIVE_STATES:
break
run_timing = time.time() - run_start
time.sleep(0.5)
@ -110,9 +139,10 @@ def run_until_complete(node, timing_data=None, **kwargs):
finally:
res = receptor_ctl.simple_command(f"work release {unit_id}")
if res != {'released': unit_id}:
logger.warn(f'Could not confirm release of receptor work unit id {unit_id} from {node}, data: {res}')
if settings.RECEPTOR_RELEASE_WORK:
res = receptor_ctl.simple_command(f"work release {unit_id}")
if res != {'released': unit_id}:
logger.warn(f'Could not confirm release of receptor work unit id {unit_id} from {node}, data: {res}')
receptor_ctl.close()

View File

@ -68,7 +68,6 @@ DATABASES = {
# the K8S cluster where awx itself is running)
IS_K8S = False
RECEPTOR_RELEASE_WORK = True
AWX_CONTAINER_GROUP_K8S_API_TIMEOUT = 10
AWX_CONTAINER_GROUP_DEFAULT_NAMESPACE = os.getenv('MY_POD_NAMESPACE', 'default')
# Timeout when waiting for pod to enter running state. If the pod is still in pending state, it will be terminated. Valid time units are "s", "m", "h". Example: "5m", "10s".
@ -931,6 +930,9 @@ AWX_CALLBACK_PROFILE = False
# Delete temporary directories created to store playbook run-time
AWX_CLEANUP_PATHS = True
# Delete completed work units in receptor
RECEPTOR_RELEASE_WORK = True
MIDDLEWARE = [
'django_guid.middleware.GuidMiddleware',
'awx.main.middleware.TimingMiddleware',