mirror of
https://github.com/ansible/awx.git
synced 2026-02-23 14:05:59 -03:30
close db and cache connection in new threads
This commit is contained in:
@@ -95,6 +95,7 @@ from awx.main.utils import (
|
|||||||
get_awx_version,
|
get_awx_version,
|
||||||
deepmerge,
|
deepmerge,
|
||||||
parse_yaml_or_json,
|
parse_yaml_or_json,
|
||||||
|
cleanup_new_process,
|
||||||
)
|
)
|
||||||
from awx.main.utils.execution_environments import get_default_execution_environment, get_default_pod_spec
|
from awx.main.utils.execution_environments import get_default_execution_environment, get_default_pod_spec
|
||||||
from awx.main.utils.ansible import read_ansible_config
|
from awx.main.utils.ansible import read_ansible_config
|
||||||
@@ -2972,6 +2973,7 @@ class AWXReceptorJob:
|
|||||||
|
|
||||||
# Spawned in a thread so Receptor can start reading before we finish writing, we
|
# Spawned in a thread so Receptor can start reading before we finish writing, we
|
||||||
# write our payload to the left side of our socketpair.
|
# write our payload to the left side of our socketpair.
|
||||||
|
@cleanup_new_process
|
||||||
def transmit(self, _socket):
|
def transmit(self, _socket):
|
||||||
if not settings.IS_K8S and self.work_type == 'local':
|
if not settings.IS_K8S and self.work_type == 'local':
|
||||||
self.runner_params['only_transmit_kwargs'] = True
|
self.runner_params['only_transmit_kwargs'] = True
|
||||||
@@ -2981,6 +2983,7 @@ class AWXReceptorJob:
|
|||||||
# Socket must be shutdown here, or the reader will hang forever.
|
# Socket must be shutdown here, or the reader will hang forever.
|
||||||
_socket.shutdown(socket.SHUT_WR)
|
_socket.shutdown(socket.SHUT_WR)
|
||||||
|
|
||||||
|
@cleanup_new_process
|
||||||
def processor(self, resultfile):
|
def processor(self, resultfile):
|
||||||
return ansible_runner.interface.run(
|
return ansible_runner.interface.run(
|
||||||
streamer='process',
|
streamer='process',
|
||||||
@@ -3022,6 +3025,7 @@ class AWXReceptorJob:
|
|||||||
|
|
||||||
return work_type
|
return work_type
|
||||||
|
|
||||||
|
@cleanup_new_process
|
||||||
def cancel_watcher(self, processor_future):
|
def cancel_watcher(self, processor_future):
|
||||||
while True:
|
while True:
|
||||||
if processor_future.done():
|
if processor_future.done():
|
||||||
|
|||||||
@@ -26,6 +26,8 @@ from django.db.models.fields.related import ForeignObjectRel, ManyToManyField
|
|||||||
from django.db.models.fields.related_descriptors import ForwardManyToOneDescriptor, ManyToManyDescriptor
|
from django.db.models.fields.related_descriptors import ForwardManyToOneDescriptor, ManyToManyDescriptor
|
||||||
from django.db.models.query import QuerySet
|
from django.db.models.query import QuerySet
|
||||||
from django.db.models import Q
|
from django.db.models import Q
|
||||||
|
from django.db import connection as django_connection
|
||||||
|
from django.core.cache import cache as django_cache
|
||||||
|
|
||||||
# Django REST Framework
|
# Django REST Framework
|
||||||
from rest_framework.exceptions import ParseError
|
from rest_framework.exceptions import ParseError
|
||||||
@@ -85,6 +87,7 @@ __all__ = [
|
|||||||
'create_temporary_fifo',
|
'create_temporary_fifo',
|
||||||
'truncate_stdout',
|
'truncate_stdout',
|
||||||
'deepmerge',
|
'deepmerge',
|
||||||
|
'cleanup_new_process',
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
@@ -1019,3 +1022,17 @@ def deepmerge(a, b):
|
|||||||
return a
|
return a
|
||||||
else:
|
else:
|
||||||
return b
|
return b
|
||||||
|
|
||||||
|
|
||||||
|
def cleanup_new_process(func):
|
||||||
|
"""
|
||||||
|
Cleanup django connection, cache connection, before executing new thread or processes entry point, func.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@wraps(func)
|
||||||
|
def wrapper_cleanup_new_process(*args, **kwargs):
|
||||||
|
django_connection.close()
|
||||||
|
django_cache.close()
|
||||||
|
return func(*args, **kwargs)
|
||||||
|
|
||||||
|
return wrapper_cleanup_new_process
|
||||||
|
|||||||
Reference in New Issue
Block a user