diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 994209aaf2..c3c9983656 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -28,6 +28,7 @@ import threading import concurrent.futures from base64 import b64encode import subprocess +import sys # Django from django.conf import settings @@ -2893,6 +2894,16 @@ def deep_copy_model_obj(model_module, model_name, obj_pk, new_obj_pk, user_pk, u update_inventory_computed_fields.delay(new_obj.id) +class TransmitterThread(threading.Thread): + def run(self): + self.exc = None + + try: + super().run() + except Exception: + self.exc = sys.exc_info() + + class AWXReceptorJob: def __init__(self, task=None, runner_params=None): self.task = task @@ -2920,7 +2931,8 @@ class AWXReceptorJob: # reading. sockin, sockout = socket.socketpair() - threading.Thread(target=self.transmit, args=[sockin]).start() + transmitter_thread = TransmitterThread(target=self.transmit, args=[sockin]) + transmitter_thread.start() # submit our work, passing # in the right side of our socketpair for reading. @@ -2930,6 +2942,11 @@ class AWXReceptorJob: sockin.close() sockout.close() + if transmitter_thread.exc: + raise transmitter_thread.exc[1].with_traceback(transmitter_thread.exc[2]) + + transmitter_thread.join() + resultsock, resultfile = receptor_ctl.get_work_results(self.unit_id, return_socket=True, return_sockfile=True) # Both "processor" and "cancel_watcher" are spawned in separate threads. # We wait for the first one to return. If cancel_watcher returns first, @@ -2960,7 +2977,6 @@ class AWXReceptorJob: logger.warn(f"Could not launch pod for {log_name}. Exceeded quota.") self.task.update_model(self.task.instance.pk, status='pending') return - # If ansible-runner ran, but an error occured at runtime, the traceback information # is saved via the status_handler passed in to the processor. if state_name == 'Succeeded': @@ -2976,10 +2992,11 @@ class AWXReceptorJob: if not settings.IS_K8S and self.work_type == 'local': self.runner_params['only_transmit_kwargs'] = True - ansible_runner.interface.run(streamer='transmit', _output=_socket.makefile('wb'), **self.runner_params) - - # Socket must be shutdown here, or the reader will hang forever. - _socket.shutdown(socket.SHUT_WR) + try: + ansible_runner.interface.run(streamer='transmit', _output=_socket.makefile('wb'), **self.runner_params) + finally: + # Socket must be shutdown here, or the reader will hang forever. + _socket.shutdown(socket.SHUT_WR) def processor(self, resultfile): return ansible_runner.interface.run(