Reimplement transmitter thread as future

This avoids the need for an explicit `.join()`, and removes the need for the TransmitterThread wrapper class.
This commit is contained in:
Shane McDonald 2022-02-28 08:28:36 -05:00
parent cb57752903
commit 2df3ca547b

View File

@ -7,8 +7,6 @@ import logging
import os
import shutil
import socket
import sys
import threading
import time
import yaml
@ -247,16 +245,6 @@ def worker_cleanup(node_name, vargs, timeout=300.0):
return stdout
class TransmitterThread(threading.Thread):
def run(self):
self.exc = None
try:
super().run()
except Exception:
self.exc = sys.exc_info()
class AWXReceptorJob:
def __init__(self, task, runner_params=None):
self.task = task
@ -296,41 +284,43 @@ class AWXReceptorJob:
# reading.
sockin, sockout = socket.socketpair()
transmitter_thread = TransmitterThread(target=self.transmit, args=[sockin])
transmitter_thread.start()
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
transmitter_future = executor.submit(self.transmit, sockin)
# submit our work, passing
# in the right side of our socketpair for reading.
_kw = {}
if self.work_type == 'ansible-runner':
_kw['node'] = self.task.instance.execution_node
use_stream_tls = get_conn_type(_kw['node'], receptor_ctl).name == "STREAMTLS"
_kw['tlsclient'] = get_tls_client(use_stream_tls)
result = receptor_ctl.submit_work(worktype=self.work_type, payload=sockout.makefile('rb'), params=self.receptor_params, signwork=self.sign_work, **_kw)
self.unit_id = result['unitid']
# Update the job with the work unit in-memory so that the log_lifecycle
# will print out the work unit that is to be associated with the job in the database
# via the update_model() call.
# We want to log the work_unit_id as early as possible. A failure can happen in between
# when we start the job in receptor and when we associate the job <-> work_unit_id.
# In that case, there will be work running in receptor and Controller will not know
# which Job it is associated with.
# We do not programatically handle this case. Ideally, we would handle this with a reaper case.
# The two distinct job lifecycle log events below allow for us to at least detect when this
# edge case occurs. If the lifecycle event work_unit_id_received occurs without the
# work_unit_id_assigned event then this case may have occured.
self.task.instance.work_unit_id = result['unitid'] # Set work_unit_id in-memory only
self.task.instance.log_lifecycle("work_unit_id_received")
self.task.update_model(self.task.instance.pk, work_unit_id=result['unitid'])
self.task.instance.log_lifecycle("work_unit_id_assigned")
_kw = {}
if self.work_type == 'ansible-runner':
_kw['node'] = self.task.instance.execution_node
use_stream_tls = get_conn_type(_kw['node'], receptor_ctl).name == "STREAMTLS"
_kw['tlsclient'] = get_tls_client(use_stream_tls)
sockin.close()
sockout.close()
# submit our work, passing in the right side of our socketpair for reading.
result = receptor_ctl.submit_work(
worktype=self.work_type, payload=sockout.makefile('rb'), params=self.receptor_params, signwork=self.sign_work, **_kw
)
if transmitter_thread.exc:
raise transmitter_thread.exc[1].with_traceback(transmitter_thread.exc[2])
sockin.close()
sockout.close()
transmitter_thread.join()
self.unit_id = result['unitid']
# Update the job with the work unit in-memory so that the log_lifecycle
# will print out the work unit that is to be associated with the job in the database
# via the update_model() call.
# We want to log the work_unit_id as early as possible. A failure can happen in between
# when we start the job in receptor and when we associate the job <-> work_unit_id.
# In that case, there will be work running in receptor and Controller will not know
# which Job it is associated with.
# We do not programatically handle this case. Ideally, we would handle this with a reaper case.
# The two distinct job lifecycle log events below allow for us to at least detect when this
# edge case occurs. If the lifecycle event work_unit_id_received occurs without the
# work_unit_id_assigned event then this case may have occured.
self.task.instance.work_unit_id = result['unitid'] # Set work_unit_id in-memory only
self.task.instance.log_lifecycle("work_unit_id_received")
self.task.update_model(self.task.instance.pk, work_unit_id=result['unitid'])
self.task.instance.log_lifecycle("work_unit_id_assigned")
# Throws an exception if the transmit failed.
# Will be caught by the try/except in BaseTask#run.
transmitter_future.result()
# Artifacts are an output, but sometimes they are an input as well
# this is the case with fact cache, where clearing facts deletes a file, and this must be captured