diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index 1e9c6d983a..ea9f7d8d0e 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -162,7 +162,9 @@ class Instance(HasPolicyEditsMixin, BaseModel): returns a dict that is passed to the python interface for the runner method corresponding to that command any kwargs will override that key=value combination in the returned dict """ - vargs = dict(file_pattern='/tmp/{}*'.format(JOB_FOLDER_PREFIX % '*')) + vargs = dict() + if settings.AWX_CLEANUP_PATHS: + vargs['file_pattern'] = '/tmp/{}*'.format(JOB_FOLDER_PREFIX % '*') vargs.update(kwargs) if 'exclude_strings' not in vargs and vargs.get('file_pattern'): active_pks = list(UnifiedJob.objects.filter(execution_node=self.hostname, status__in=('running', 'waiting')).values_list('pk', flat=True)) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 42fb01d253..8cf1e39969 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -108,7 +108,7 @@ from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja from awx.main.utils.reload import stop_local_services from awx.main.utils.pglock import advisory_lock from awx.main.utils.handlers import SpecialInventoryHandler -from awx.main.utils.receptor import get_receptor_ctl, worker_info, get_conn_type, get_tls_client, worker_cleanup +from awx.main.utils.receptor import get_receptor_ctl, worker_info, get_conn_type, get_tls_client, worker_cleanup, administrative_workunit_reaper from awx.main.consumers import emit_channel_notification from awx.main import analytics from awx.conf import settings_registry @@ -397,20 +397,22 @@ def _cleanup_images_and_files(**kwargs): return this_inst = Instance.objects.me() runner_cleanup_kwargs = this_inst.get_cleanup_task_kwargs(**kwargs) - stdout = '' - with StringIO() as buffer: - with redirect_stdout(buffer): - ansible_runner.cleanup.run_cleanup(runner_cleanup_kwargs) - stdout = buffer.getvalue() - if '(changed: True)' in stdout: - logger.info(f'Performed local cleanup with 
kwargs {kwargs}, output:\n{stdout}') + if runner_cleanup_kwargs: + stdout = '' + with StringIO() as buffer: + with redirect_stdout(buffer): + ansible_runner.cleanup.run_cleanup(runner_cleanup_kwargs) + stdout = buffer.getvalue() + if '(changed: True)' in stdout: + logger.info(f'Performed local cleanup with kwargs {kwargs}, output:\n{stdout}') # if we are the first instance alphabetically, then run cleanup on execution nodes checker_instance = Instance.objects.filter(node_type__in=['hybrid', 'control'], enabled=True, capacity__gt=0).order_by('-hostname').first() if checker_instance and this_inst.hostname == checker_instance.hostname: - logger.info(f'Running execution node cleanup with kwargs {kwargs}') for inst in Instance.objects.filter(node_type='execution', enabled=True, capacity__gt=0): runner_cleanup_kwargs = inst.get_cleanup_task_kwargs(**kwargs) + if not runner_cleanup_kwargs: + continue try: stdout = worker_cleanup(inst.hostname, runner_cleanup_kwargs) if '(changed: True)' in stdout: @@ -649,6 +651,8 @@ def awx_receptor_workunit_reaper(): receptor_ctl.simple_command(f"work cancel {job.work_unit_id}") receptor_ctl.simple_command(f"work release {job.work_unit_id}") + administrative_workunit_reaper(receptor_work_list) + @task(queue=get_local_queuename) def awx_k8s_reaper(): @@ -3184,7 +3188,7 @@ class AWXReceptorJob: receptor_params["secret_kube_config"] = kubeconfig_yaml else: private_data_dir = self.runner_params['private_data_dir'] - if self.work_type == 'ansible-runner': + if self.work_type == 'ansible-runner' and settings.AWX_CLEANUP_PATHS: # on execution nodes, we rely on the private data dir being deleted cli_params = f"--private-data-dir={private_data_dir} --delete" else: diff --git a/awx/main/utils/receptor.py b/awx/main/utils/receptor.py index b92b57c46a..cadf51a1b1 100644 --- a/awx/main/utils/receptor.py +++ b/awx/main/utils/receptor.py @@ -3,6 +3,7 @@ import yaml import time from receptorctl.socket_interface import ReceptorControl + from django.conf 
RECEPTOR_ACTIVE_STATES = ('Pending', 'Running')


def administrative_workunit_reaper(work_list=None):
    """
    Release completed work units that were spawned by actions inside of this module.

    Specifically, this should catch any completed work unit left by
     - worker_info
     - worker_cleanup
    These should ordinarily be released when the method finishes, but this is a
    cleanup of last-resort, in case something went awry.

    :param work_list: mapping of work unit id -> work unit data (a dict that may
        carry 'ExtraData' and 'StateName' keys). When None, the mapping is
        obtained here by issuing "work list" through the receptor control socket.
    :return: None. Matching units are released via "work release <unit_id>".

    Units whose metadata is missing or malformed are skipped rather than
    allowed to raise, so one odd entry cannot abort the whole pass.
    """
    receptor_ctl = get_receptor_ctl()
    if work_list is None:
        work_list = receptor_ctl.simple_command("work list")

    for unit_id, work_data in work_list.items():
        extra_data = work_data.get('ExtraData')
        # ExtraData may be absent or not a mapping at all; only dict-shaped
        # metadata can identify ansible-runner work.
        if not isinstance(extra_data, dict) or extra_data.get('RemoteWorkType') != 'ansible-runner':
            continue  # if this is not ansible-runner work, we do not want to touch it

        # 'RemoteParams' can be missing or explicitly None, and 'params' can be
        # missing as well; without a string we cannot tell what the unit ran.
        remote_params = extra_data.get('RemoteParams') or {}
        params = remote_params.get('params') if isinstance(remote_params, dict) else None
        if not isinstance(params, str):
            continue  # cannot classify the unit, so leave it alone

        if not (params == '--worker-info' or params.startswith('cleanup')):
            continue  # if this is not a cleanup or health check, we do not want to touch it
        if work_data.get('StateName') in RECEPTOR_ACTIVE_STATES:
            continue  # do not want to touch active work units

        logger.info(f'Reaping orphaned work unit {unit_id} with params {params}')
        receptor_ctl.simple_command(f"work release {unit_id}")
+139,10 @@ def run_until_complete(node, timing_data=None, **kwargs): finally: - res = receptor_ctl.simple_command(f"work release {unit_id}") - if res != {'released': unit_id}: - logger.warn(f'Could not confirm release of receptor work unit id {unit_id} from {node}, data: {res}') + if settings.RECEPTOR_RELEASE_WORK: + res = receptor_ctl.simple_command(f"work release {unit_id}") + if res != {'released': unit_id}: + logger.warn(f'Could not confirm release of receptor work unit id {unit_id} from {node}, data: {res}') receptor_ctl.close() diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 3f949abeca..3fda6efff9 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -68,7 +68,6 @@ DATABASES = { # the K8S cluster where awx itself is running) IS_K8S = False -RECEPTOR_RELEASE_WORK = True AWX_CONTAINER_GROUP_K8S_API_TIMEOUT = 10 AWX_CONTAINER_GROUP_DEFAULT_NAMESPACE = os.getenv('MY_POD_NAMESPACE', 'default') # Timeout when waiting for pod to enter running state. If the pod is still in pending state , it will be terminated. Valid time units are "s", "m", "h". Example : "5m" , "10s". @@ -931,6 +930,9 @@ AWX_CALLBACK_PROFILE = False # Delete temporary directories created to store playbook run-time AWX_CLEANUP_PATHS = True +# Delete completed work units in receptor +RECEPTOR_RELEASE_WORK = True + MIDDLEWARE = [ 'django_guid.middleware.GuidMiddleware', 'awx.main.middleware.TimingMiddleware',