diff --git a/awx/main/tasks/receptor.py b/awx/main/tasks/receptor.py index 4cb0a543a2..0321a50c80 100644 --- a/awx/main/tasks/receptor.py +++ b/awx/main/tasks/receptor.py @@ -7,8 +7,6 @@ import logging import os import shutil import socket -import sys -import threading import time import yaml @@ -247,16 +245,6 @@ def worker_cleanup(node_name, vargs, timeout=300.0): return stdout -class TransmitterThread(threading.Thread): - def run(self): - self.exc = None - - try: - super().run() - except Exception: - self.exc = sys.exc_info() - - class AWXReceptorJob: def __init__(self, task, runner_params=None): self.task = task @@ -296,41 +284,43 @@ class AWXReceptorJob: # reading. sockin, sockout = socket.socketpair() - transmitter_thread = TransmitterThread(target=self.transmit, args=[sockin]) - transmitter_thread.start() + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: + transmitter_future = executor.submit(self.transmit, sockin) - # submit our work, passing - # in the right side of our socketpair for reading. - _kw = {} - if self.work_type == 'ansible-runner': - _kw['node'] = self.task.instance.execution_node - use_stream_tls = get_conn_type(_kw['node'], receptor_ctl).name == "STREAMTLS" - _kw['tlsclient'] = get_tls_client(use_stream_tls) - result = receptor_ctl.submit_work(worktype=self.work_type, payload=sockout.makefile('rb'), params=self.receptor_params, signwork=self.sign_work, **_kw) - self.unit_id = result['unitid'] - # Update the job with the work unit in-memory so that the log_lifecycle - # will print out the work unit that is to be associated with the job in the database - # via the update_model() call. - # We want to log the work_unit_id as early as possible. A failure can happen in between - # when we start the job in receptor and when we associate the job <-> work_unit_id. - # In that case, there will be work running in receptor and Controller will not know - # which Job it is associated with. - # We do not programatically handle this case. Ideally, we would handle this with a reaper case. - # The two distinct job lifecycle log events below allow for us to at least detect when this - # edge case occurs. If the lifecycle event work_unit_id_received occurs without the - # work_unit_id_assigned event then this case may have occured. - self.task.instance.work_unit_id = result['unitid'] # Set work_unit_id in-memory only - self.task.instance.log_lifecycle("work_unit_id_received") - self.task.update_model(self.task.instance.pk, work_unit_id=result['unitid']) - self.task.instance.log_lifecycle("work_unit_id_assigned") + _kw = {} + if self.work_type == 'ansible-runner': + _kw['node'] = self.task.instance.execution_node + use_stream_tls = get_conn_type(_kw['node'], receptor_ctl).name == "STREAMTLS" + _kw['tlsclient'] = get_tls_client(use_stream_tls) - sockin.close() - sockout.close() + # submit our work, passing in the right side of our socketpair for reading. + result = receptor_ctl.submit_work( + worktype=self.work_type, payload=sockout.makefile('rb'), params=self.receptor_params, signwork=self.sign_work, **_kw + ) - if transmitter_thread.exc: - raise transmitter_thread.exc[1].with_traceback(transmitter_thread.exc[2]) + sockin.close() + sockout.close() - transmitter_thread.join() + self.unit_id = result['unitid'] + # Update the job with the work unit in-memory so that the log_lifecycle + # will print out the work unit that is to be associated with the job in the database + # via the update_model() call. + # We want to log the work_unit_id as early as possible. A failure can happen in between + # when we start the job in receptor and when we associate the job <-> work_unit_id. + # In that case, there will be work running in receptor and Controller will not know + # which Job it is associated with. + # We do not programatically handle this case. Ideally, we would handle this with a reaper case. + # The two distinct job lifecycle log events below allow for us to at least detect when this + # edge case occurs. If the lifecycle event work_unit_id_received occurs without the + # work_unit_id_assigned event then this case may have occured. + self.task.instance.work_unit_id = result['unitid'] # Set work_unit_id in-memory only + self.task.instance.log_lifecycle("work_unit_id_received") + self.task.update_model(self.task.instance.pk, work_unit_id=result['unitid']) + self.task.instance.log_lifecycle("work_unit_id_assigned") + + # Throws an exception if the transmit failed. + # Will be caught by the try/except in BaseTask#run. + transmitter_future.result() # Artifacts are an output, but sometimes they are an input as well # this is the case with fact cache, where clearing facts deletes a file, and this must be captured