mirror of
https://github.com/ansible/awx.git
synced 2026-02-16 10:40:01 -03:30
Initial StreamTLS support for receptor nodes
This commit is contained in:
@@ -106,7 +106,7 @@ from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja
|
|||||||
from awx.main.utils.reload import stop_local_services
|
from awx.main.utils.reload import stop_local_services
|
||||||
from awx.main.utils.pglock import advisory_lock
|
from awx.main.utils.pglock import advisory_lock
|
||||||
from awx.main.utils.handlers import SpecialInventoryHandler
|
from awx.main.utils.handlers import SpecialInventoryHandler
|
||||||
from awx.main.utils.receptor import get_receptor_ctl, worker_info
|
from awx.main.utils.receptor import get_receptor_ctl, worker_info, get_conn_type, get_tls_client
|
||||||
from awx.main.consumers import emit_channel_notification
|
from awx.main.consumers import emit_channel_notification
|
||||||
from awx.main import analytics
|
from awx.main import analytics
|
||||||
from awx.conf import settings_registry
|
from awx.conf import settings_registry
|
||||||
@@ -3049,6 +3049,9 @@ class AWXReceptorJob:
|
|||||||
_kw = {}
|
_kw = {}
|
||||||
if self.work_type == 'ansible-runner':
|
if self.work_type == 'ansible-runner':
|
||||||
_kw['node'] = self.task.instance.execution_node
|
_kw['node'] = self.task.instance.execution_node
|
||||||
|
use_stream_tls = True if get_conn_type(_kw['node'], receptor_ctl) == 2 else False
|
||||||
|
_kw['tlsclient'] = get_tls_client(use_stream_tls)
|
||||||
|
|
||||||
result = receptor_ctl.submit_work(worktype=self.work_type, payload=sockout.makefile('rb'), params=self.receptor_params, **_kw)
|
result = receptor_ctl.submit_work(worktype=self.work_type, payload=sockout.makefile('rb'), params=self.receptor_params, **_kw)
|
||||||
self.unit_id = result['unitid']
|
self.unit_id = result['unitid']
|
||||||
self.task.update_model(self.task.instance.pk, work_unit_id=result['unitid'])
|
self.task.update_model(self.task.instance.pk, work_unit_id=result['unitid'])
|
||||||
|
|||||||
@@ -7,10 +7,11 @@ from receptorctl.socket_interface import ReceptorControl
|
|||||||
|
|
||||||
logger = logging.getLogger('awx.main.utils.receptor')
|
logger = logging.getLogger('awx.main.utils.receptor')
|
||||||
|
|
||||||
|
__RECEPTOR_CONF = '/etc/receptor/receptor.conf'
|
||||||
|
|
||||||
|
|
||||||
def get_receptor_sockfile():
|
def get_receptor_sockfile():
|
||||||
receptor_conf = '/etc/receptor/receptor.conf'
|
with open(__RECEPTOR_CONF, 'r') as f:
|
||||||
with open(receptor_conf, 'r') as f:
|
|
||||||
data = yaml.safe_load(f)
|
data = yaml.safe_load(f)
|
||||||
for section in data:
|
for section in data:
|
||||||
for entry_name, entry_data in section.items():
|
for entry_name, entry_data in section.items():
|
||||||
@@ -18,23 +19,54 @@ def get_receptor_sockfile():
|
|||||||
if 'filename' in entry_data:
|
if 'filename' in entry_data:
|
||||||
return entry_data['filename']
|
return entry_data['filename']
|
||||||
else:
|
else:
|
||||||
raise RuntimeError(f'Receptor conf {receptor_conf} control-service entry does not have a filename parameter')
|
raise RuntimeError(f'Receptor conf {__RECEPTOR_CONF} control-service entry does not have a filename parameter')
|
||||||
else:
|
else:
|
||||||
raise RuntimeError(f'Receptor conf {receptor_conf} does not have control-service entry needed to get sockfile')
|
raise RuntimeError(f'Receptor conf {__RECEPTOR_CONF} does not have control-service entry needed to get sockfile')
|
||||||
|
|
||||||
|
|
||||||
|
def get_tls_client(use_stream_tls=None):
|
||||||
|
if not use_stream_tls:
|
||||||
|
return None
|
||||||
|
|
||||||
|
with open(__RECEPTOR_CONF, 'r') as f:
|
||||||
|
data = yaml.safe_load(f)
|
||||||
|
for section in data:
|
||||||
|
for entry_name, entry_data in section.items():
|
||||||
|
if entry_name == 'tls-client':
|
||||||
|
if 'name' in entry_data:
|
||||||
|
return entry_data['name']
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def get_receptor_ctl():
|
def get_receptor_ctl():
|
||||||
receptor_sockfile = get_receptor_sockfile()
|
receptor_sockfile = get_receptor_sockfile()
|
||||||
return ReceptorControl(receptor_sockfile)
|
try:
|
||||||
|
return ReceptorControl(receptor_sockfile, config=__RECEPTOR_CONF, tlsclient=get_tls_client(True))
|
||||||
|
except RuntimeError:
|
||||||
|
return ReceptorControl(receptor_sockfile)
|
||||||
|
|
||||||
|
|
||||||
|
def get_conn_type(node_name, receptor_ctl):
|
||||||
|
"""
|
||||||
|
ConnType 0: Datagram
|
||||||
|
ConnType 1: Stream
|
||||||
|
ConnType 2: StreamTLS
|
||||||
|
"""
|
||||||
|
all_nodes = receptor_ctl.simple_command("status").get('Advertisements', None)
|
||||||
|
for node in all_nodes:
|
||||||
|
if node.get('NodeID') == node_name:
|
||||||
|
return node.get('ConnType')
|
||||||
|
|
||||||
|
|
||||||
def worker_info(node_name, work_type='ansible-runner'):
|
def worker_info(node_name, work_type='ansible-runner'):
|
||||||
receptor_ctl = get_receptor_ctl()
|
receptor_ctl = get_receptor_ctl()
|
||||||
|
use_stream_tls = True if get_conn_type(node_name, receptor_ctl) == 2 else False
|
||||||
transmit_start = time.time()
|
transmit_start = time.time()
|
||||||
error_list = []
|
error_list = []
|
||||||
data = {'errors': error_list, 'transmit_timing': 0.0}
|
data = {'errors': error_list, 'transmit_timing': 0.0}
|
||||||
|
|
||||||
kwargs = {}
|
kwargs = {}
|
||||||
|
kwargs['tlsclient'] = get_tls_client(use_stream_tls)
|
||||||
if work_type != 'local':
|
if work_type != 'local':
|
||||||
kwargs['ttl'] = '20s'
|
kwargs['ttl'] = '20s'
|
||||||
result = receptor_ctl.submit_work(worktype=work_type, payload='', params={"params": f"--worker-info"}, node=node_name, **kwargs)
|
result = receptor_ctl.submit_work(worktype=work_type, payload='', params={"params": f"--worker-info"}, node=node_name, **kwargs)
|
||||||
|
|||||||
Reference in New Issue
Block a user