diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 53bf2fdbbe..0f65b877da 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -106,7 +106,7 @@ from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja from awx.main.utils.reload import stop_local_services from awx.main.utils.pglock import advisory_lock from awx.main.utils.handlers import SpecialInventoryHandler -from awx.main.utils.receptor import get_receptor_ctl, worker_info +from awx.main.utils.receptor import get_receptor_ctl, worker_info, get_conn_type, get_tls_client from awx.main.consumers import emit_channel_notification from awx.main import analytics from awx.conf import settings_registry @@ -3049,6 +3049,9 @@ class AWXReceptorJob: _kw = {} if self.work_type == 'ansible-runner': _kw['node'] = self.task.instance.execution_node + use_stream_tls = get_conn_type(_kw['node'], receptor_ctl).name == "STREAMTLS" + _kw['tlsclient'] = get_tls_client(use_stream_tls) + result = receptor_ctl.submit_work(worktype=self.work_type, payload=sockout.makefile('rb'), params=self.receptor_params, **_kw) self.unit_id = result['unitid'] self.task.update_model(self.task.instance.pk, work_unit_id=result['unitid']) diff --git a/awx/main/utils/receptor.py b/awx/main/utils/receptor.py index 9781d92720..fa1cb8f4cc 100644 --- a/awx/main/utils/receptor.py +++ b/awx/main/utils/receptor.py @@ -4,13 +4,22 @@ import time from receptorctl.socket_interface import ReceptorControl +from enum import Enum, unique logger = logging.getLogger('awx.main.utils.receptor') +__RECEPTOR_CONF = '/etc/receptor/receptor.conf' + + +@unique +class ReceptorConnectionType(Enum): + DATAGRAM = 0 + STREAM = 1 + STREAMTLS = 2 + def get_receptor_sockfile(): - receptor_conf = '/etc/receptor/receptor.conf' - with open(receptor_conf, 'r') as f: + with open(__RECEPTOR_CONF, 'r') as f: data = yaml.safe_load(f) for section in data: for entry_name, entry_data in section.items(): @@ -18,23 +27,49 @@ def get_receptor_sockfile(): if 'filename' in entry_data: return entry_data['filename'] else: - raise RuntimeError(f'Receptor conf {receptor_conf} control-service entry does not have a filename parameter') + raise RuntimeError(f'Receptor conf {__RECEPTOR_CONF} control-service entry does not have a filename parameter') else: - raise RuntimeError(f'Receptor conf {receptor_conf} does not have control-service entry needed to get sockfile') + raise RuntimeError(f'Receptor conf {__RECEPTOR_CONF} does not have control-service entry needed to get sockfile') + + +def get_tls_client(use_stream_tls=None): + if not use_stream_tls: + return None + + with open(__RECEPTOR_CONF, 'r') as f: + data = yaml.safe_load(f) + for section in data: + for entry_name, entry_data in section.items(): + if entry_name == 'tls-client': + if 'name' in entry_data: + return entry_data['name'] + return None def get_receptor_ctl(): receptor_sockfile = get_receptor_sockfile() - return ReceptorControl(receptor_sockfile) + try: + return ReceptorControl(receptor_sockfile, config=__RECEPTOR_CONF, tlsclient=get_tls_client(True)) + except RuntimeError: + return ReceptorControl(receptor_sockfile) + + +def get_conn_type(node_name, receptor_ctl): + all_nodes = receptor_ctl.simple_command("status").get('Advertisements', None) + for node in all_nodes: + if node.get('NodeID') == node_name: + return ReceptorConnectionType(node.get('ConnType')) def worker_info(node_name, work_type='ansible-runner'): receptor_ctl = get_receptor_ctl() + use_stream_tls = get_conn_type(node_name, receptor_ctl).name == "STREAMTLS" transmit_start = time.time() error_list = [] data = {'errors': error_list, 'transmit_timing': 0.0} kwargs = {} + kwargs['tlsclient'] = get_tls_client(use_stream_tls) if work_type != 'local': kwargs['ttl'] = '20s' result = receptor_ctl.submit_work(worktype=work_type, payload='', params={"params": f"--worker-info"}, node=node_name, **kwargs)