mirror of
https://github.com/ansible/awx.git
synced 2026-03-18 01:17:35 -02:30
Merge pull request #10212 from sarabrajsingh/bugfix/create-seperator-worker-thread-for-transmit-jobs-9952
created a TransmitterThread class to start transmit jobs within. Surr… SUMMARY Surfaced this bug while fixing issue #9952. Receptor-ctl would hang trying to read from a socket pair that is never closed, when the transmit job was executed yet threw an exception. This anomaly occurred when using a k8s cluster (Openshift in our case) as the execution environment. FYI - for now, this only applies to container groups. We moved the transmit function to execute in its own thread, and rejoin the calling function (_run_internal) when it finishes transmitting, and bubbling up exceptions if any occurred in the thread. ISSUE TYPE Bugfix Pull Request COMPONENT NAME API AWX VERSION awx: 19.1.0 ADDITIONAL INFORMATION Steps to reproduce bug: Create an ansible project with some files (or roles) and commit to a git repository Purposely commit a broken symlink on any file in the project files to git Create a credential set that authenticates to your k8s cluster in AWX Create a container instance group that leverages the credentials from Step 3 in AWX Create a project in AWX that leverages the ansible project/git repo from Step 1 Create a job template that leverages the AWX project from Step 4 and set your EE to the container instance group from Step 3 Execute the job from Step 5 Reviewed-by: Shane McDonald <me@shanemcd.com>
This commit is contained in:
@@ -28,6 +28,7 @@ import threading
|
|||||||
import concurrent.futures
|
import concurrent.futures
|
||||||
from base64 import b64encode
|
from base64 import b64encode
|
||||||
import subprocess
|
import subprocess
|
||||||
|
import sys
|
||||||
|
|
||||||
# Django
|
# Django
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
@@ -2893,6 +2894,16 @@ def deep_copy_model_obj(model_module, model_name, obj_pk, new_obj_pk, user_pk, u
|
|||||||
update_inventory_computed_fields.delay(new_obj.id)
|
update_inventory_computed_fields.delay(new_obj.id)
|
||||||
|
|
||||||
|
|
||||||
|
class TransmitterThread(threading.Thread):
|
||||||
|
def run(self):
|
||||||
|
self.exc = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
super().run()
|
||||||
|
except Exception:
|
||||||
|
self.exc = sys.exc_info()
|
||||||
|
|
||||||
|
|
||||||
class AWXReceptorJob:
|
class AWXReceptorJob:
|
||||||
def __init__(self, task=None, runner_params=None):
|
def __init__(self, task=None, runner_params=None):
|
||||||
self.task = task
|
self.task = task
|
||||||
@@ -2920,7 +2931,8 @@ class AWXReceptorJob:
|
|||||||
# reading.
|
# reading.
|
||||||
sockin, sockout = socket.socketpair()
|
sockin, sockout = socket.socketpair()
|
||||||
|
|
||||||
threading.Thread(target=self.transmit, args=[sockin]).start()
|
transmitter_thread = TransmitterThread(target=self.transmit, args=[sockin])
|
||||||
|
transmitter_thread.start()
|
||||||
|
|
||||||
# submit our work, passing
|
# submit our work, passing
|
||||||
# in the right side of our socketpair for reading.
|
# in the right side of our socketpair for reading.
|
||||||
@@ -2930,6 +2942,11 @@ class AWXReceptorJob:
|
|||||||
sockin.close()
|
sockin.close()
|
||||||
sockout.close()
|
sockout.close()
|
||||||
|
|
||||||
|
if transmitter_thread.exc:
|
||||||
|
raise transmitter_thread.exc[1].with_traceback(transmitter_thread.exc[2])
|
||||||
|
|
||||||
|
transmitter_thread.join()
|
||||||
|
|
||||||
resultsock, resultfile = receptor_ctl.get_work_results(self.unit_id, return_socket=True, return_sockfile=True)
|
resultsock, resultfile = receptor_ctl.get_work_results(self.unit_id, return_socket=True, return_sockfile=True)
|
||||||
# Both "processor" and "cancel_watcher" are spawned in separate threads.
|
# Both "processor" and "cancel_watcher" are spawned in separate threads.
|
||||||
# We wait for the first one to return. If cancel_watcher returns first,
|
# We wait for the first one to return. If cancel_watcher returns first,
|
||||||
@@ -2960,7 +2977,6 @@ class AWXReceptorJob:
|
|||||||
logger.warn(f"Could not launch pod for {log_name}. Exceeded quota.")
|
logger.warn(f"Could not launch pod for {log_name}. Exceeded quota.")
|
||||||
self.task.update_model(self.task.instance.pk, status='pending')
|
self.task.update_model(self.task.instance.pk, status='pending')
|
||||||
return
|
return
|
||||||
|
|
||||||
# If ansible-runner ran, but an error occured at runtime, the traceback information
|
# If ansible-runner ran, but an error occured at runtime, the traceback information
|
||||||
# is saved via the status_handler passed in to the processor.
|
# is saved via the status_handler passed in to the processor.
|
||||||
if state_name == 'Succeeded':
|
if state_name == 'Succeeded':
|
||||||
@@ -2976,10 +2992,11 @@ class AWXReceptorJob:
|
|||||||
if not settings.IS_K8S and self.work_type == 'local':
|
if not settings.IS_K8S and self.work_type == 'local':
|
||||||
self.runner_params['only_transmit_kwargs'] = True
|
self.runner_params['only_transmit_kwargs'] = True
|
||||||
|
|
||||||
ansible_runner.interface.run(streamer='transmit', _output=_socket.makefile('wb'), **self.runner_params)
|
try:
|
||||||
|
ansible_runner.interface.run(streamer='transmit', _output=_socket.makefile('wb'), **self.runner_params)
|
||||||
# Socket must be shutdown here, or the reader will hang forever.
|
finally:
|
||||||
_socket.shutdown(socket.SHUT_WR)
|
# Socket must be shutdown here, or the reader will hang forever.
|
||||||
|
_socket.shutdown(socket.SHUT_WR)
|
||||||
|
|
||||||
def processor(self, resultfile):
|
def processor(self, resultfile):
|
||||||
return ansible_runner.interface.run(
|
return ansible_runner.interface.run(
|
||||||
|
|||||||
Reference in New Issue
Block a user