created a TransmitterThread class to start transmit jobs within. Surrounded ansible_runner.interface.run() in a try/except block to prevent the call from hanging on a socket pair that doesn't close

This commit is contained in:
Sarabraj Singh 2021-05-17 14:35:13 -04:00
parent 2c2aaa7fea
commit 12cea1191e

View File

@ -28,6 +28,7 @@ import threading
import concurrent.futures
from base64 import b64encode
import subprocess
import sys
# Django
from django.conf import settings
@ -2893,6 +2894,16 @@ def deep_copy_model_obj(model_module, model_name, obj_pk, new_obj_pk, user_pk, u
update_inventory_computed_fields.delay(new_obj.id)
class TransmitterThread(threading.Thread):
def run(self):
self.exc = None
try:
super().run()
except Exception:
self.exc = sys.exc_info()
class AWXReceptorJob:
def __init__(self, task=None, runner_params=None):
self.task = task
@ -2920,7 +2931,8 @@ class AWXReceptorJob:
# reading.
sockin, sockout = socket.socketpair()
threading.Thread(target=self.transmit, args=[sockin]).start()
transmitter_thread = TransmitterThread(target=self.transmit, args=[sockin])
transmitter_thread.start()
# submit our work, passing
# in the right side of our socketpair for reading.
@ -2930,6 +2942,11 @@ class AWXReceptorJob:
sockin.close()
sockout.close()
if transmitter_thread.exc:
raise transmitter_thread.exc[1].with_traceback(transmitter_thread.exc[2])
transmitter_thread.join()
resultsock, resultfile = receptor_ctl.get_work_results(self.unit_id, return_socket=True, return_sockfile=True)
# Both "processor" and "cancel_watcher" are spawned in separate threads.
# We wait for the first one to return. If cancel_watcher returns first,
@ -2960,7 +2977,6 @@ class AWXReceptorJob:
logger.warn(f"Could not launch pod for {log_name}. Exceeded quota.")
self.task.update_model(self.task.instance.pk, status='pending')
return
# If ansible-runner ran, but an error occured at runtime, the traceback information
# is saved via the status_handler passed in to the processor.
if state_name == 'Succeeded':
@ -2976,10 +2992,11 @@ class AWXReceptorJob:
if not settings.IS_K8S and self.work_type == 'local':
self.runner_params['only_transmit_kwargs'] = True
ansible_runner.interface.run(streamer='transmit', _output=_socket.makefile('wb'), **self.runner_params)
# Socket must be shutdown here, or the reader will hang forever.
_socket.shutdown(socket.SHUT_WR)
try:
ansible_runner.interface.run(streamer='transmit', _output=_socket.makefile('wb'), **self.runner_params)
finally:
# Socket must be shutdown here, or the reader will hang forever.
_socket.shutdown(socket.SHUT_WR)
def processor(self, resultfile):
return ansible_runner.interface.run(