diff --git a/awx/main/tasks.py b/awx/main/tasks.py index ee1f67caba..35f4e8df38 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -96,6 +96,7 @@ from awx.main.utils import ( get_awx_version, deepmerge, parse_yaml_or_json, + cleanup_new_process, ) from awx.main.utils.execution_environments import get_default_execution_environment, get_default_pod_spec from awx.main.utils.ansible import read_ansible_config @@ -2989,6 +2990,7 @@ class AWXReceptorJob: # Spawned in a thread so Receptor can start reading before we finish writing, we # write our payload to the left side of our socketpair. + @cleanup_new_process def transmit(self, _socket): if not settings.IS_K8S and self.work_type == 'local': self.runner_params['only_transmit_kwargs'] = True @@ -2999,6 +3001,7 @@ class AWXReceptorJob: # Socket must be shutdown here, or the reader will hang forever. _socket.shutdown(socket.SHUT_WR) + @cleanup_new_process def processor(self, resultfile): return ansible_runner.interface.run( streamer='process', @@ -3040,6 +3043,7 @@ class AWXReceptorJob: return work_type + @cleanup_new_process def cancel_watcher(self, processor_future): while True: if processor_future.done(): diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py index b9690fd641..543f351d4e 100644 --- a/awx/main/utils/common.py +++ b/awx/main/utils/common.py @@ -26,6 +26,8 @@ from django.db.models.fields.related import ForeignObjectRel, ManyToManyField from django.db.models.fields.related_descriptors import ForwardManyToOneDescriptor, ManyToManyDescriptor from django.db.models.query import QuerySet from django.db.models import Q +from django.db import connection as django_connection +from django.core.cache import cache as django_cache # Django REST Framework from rest_framework.exceptions import ParseError @@ -85,6 +87,7 @@ __all__ = [ 'create_temporary_fifo', 'truncate_stdout', 'deepmerge', + 'cleanup_new_process', ] @@ -1019,3 +1022,17 @@ def deepmerge(a, b): return a else: return b + + +def cleanup_new_process(func): + """ + Cleanup django connection, cache connection, before executing new thread or processes entry point, func. + """ + + @wraps(func) + def wrapper_cleanup_new_process(*args, **kwargs): + django_connection.close() + django_cache.close() + return func(*args, **kwargs) + + return wrapper_cleanup_new_process