mirror of
https://github.com/ansible/awx.git
synced 2026-02-23 22:16:00 -03:30
Merge pull request #11819 from shanemcd/transmitter-future
Reimplement transmitter thread as future
This commit is contained in:
@@ -7,8 +7,6 @@ import logging
|
|||||||
import os
|
import os
|
||||||
import shutil
|
import shutil
|
||||||
import socket
|
import socket
|
||||||
import sys
|
|
||||||
import threading
|
|
||||||
import time
|
import time
|
||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
@@ -247,16 +245,6 @@ def worker_cleanup(node_name, vargs, timeout=300.0):
|
|||||||
return stdout
|
return stdout
|
||||||
|
|
||||||
|
|
||||||
class TransmitterThread(threading.Thread):
|
|
||||||
def run(self):
|
|
||||||
self.exc = None
|
|
||||||
|
|
||||||
try:
|
|
||||||
super().run()
|
|
||||||
except Exception:
|
|
||||||
self.exc = sys.exc_info()
|
|
||||||
|
|
||||||
|
|
||||||
class AWXReceptorJob:
|
class AWXReceptorJob:
|
||||||
def __init__(self, task, runner_params=None):
|
def __init__(self, task, runner_params=None):
|
||||||
self.task = task
|
self.task = task
|
||||||
@@ -296,41 +284,43 @@ class AWXReceptorJob:
|
|||||||
# reading.
|
# reading.
|
||||||
sockin, sockout = socket.socketpair()
|
sockin, sockout = socket.socketpair()
|
||||||
|
|
||||||
transmitter_thread = TransmitterThread(target=self.transmit, args=[sockin])
|
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
|
||||||
transmitter_thread.start()
|
transmitter_future = executor.submit(self.transmit, sockin)
|
||||||
|
|
||||||
# submit our work, passing
|
_kw = {}
|
||||||
# in the right side of our socketpair for reading.
|
if self.work_type == 'ansible-runner':
|
||||||
_kw = {}
|
_kw['node'] = self.task.instance.execution_node
|
||||||
if self.work_type == 'ansible-runner':
|
use_stream_tls = get_conn_type(_kw['node'], receptor_ctl).name == "STREAMTLS"
|
||||||
_kw['node'] = self.task.instance.execution_node
|
_kw['tlsclient'] = get_tls_client(use_stream_tls)
|
||||||
use_stream_tls = get_conn_type(_kw['node'], receptor_ctl).name == "STREAMTLS"
|
|
||||||
_kw['tlsclient'] = get_tls_client(use_stream_tls)
|
|
||||||
result = receptor_ctl.submit_work(worktype=self.work_type, payload=sockout.makefile('rb'), params=self.receptor_params, signwork=self.sign_work, **_kw)
|
|
||||||
self.unit_id = result['unitid']
|
|
||||||
# Update the job with the work unit in-memory so that the log_lifecycle
|
|
||||||
# will print out the work unit that is to be associated with the job in the database
|
|
||||||
# via the update_model() call.
|
|
||||||
# We want to log the work_unit_id as early as possible. A failure can happen in between
|
|
||||||
# when we start the job in receptor and when we associate the job <-> work_unit_id.
|
|
||||||
# In that case, there will be work running in receptor and Controller will not know
|
|
||||||
# which Job it is associated with.
|
|
||||||
# We do not programatically handle this case. Ideally, we would handle this with a reaper case.
|
|
||||||
# The two distinct job lifecycle log events below allow for us to at least detect when this
|
|
||||||
# edge case occurs. If the lifecycle event work_unit_id_received occurs without the
|
|
||||||
# work_unit_id_assigned event then this case may have occured.
|
|
||||||
self.task.instance.work_unit_id = result['unitid'] # Set work_unit_id in-memory only
|
|
||||||
self.task.instance.log_lifecycle("work_unit_id_received")
|
|
||||||
self.task.update_model(self.task.instance.pk, work_unit_id=result['unitid'])
|
|
||||||
self.task.instance.log_lifecycle("work_unit_id_assigned")
|
|
||||||
|
|
||||||
sockin.close()
|
# submit our work, passing in the right side of our socketpair for reading.
|
||||||
sockout.close()
|
result = receptor_ctl.submit_work(
|
||||||
|
worktype=self.work_type, payload=sockout.makefile('rb'), params=self.receptor_params, signwork=self.sign_work, **_kw
|
||||||
|
)
|
||||||
|
|
||||||
if transmitter_thread.exc:
|
sockin.close()
|
||||||
raise transmitter_thread.exc[1].with_traceback(transmitter_thread.exc[2])
|
sockout.close()
|
||||||
|
|
||||||
transmitter_thread.join()
|
self.unit_id = result['unitid']
|
||||||
|
# Update the job with the work unit in-memory so that the log_lifecycle
|
||||||
|
# will print out the work unit that is to be associated with the job in the database
|
||||||
|
# via the update_model() call.
|
||||||
|
# We want to log the work_unit_id as early as possible. A failure can happen in between
|
||||||
|
# when we start the job in receptor and when we associate the job <-> work_unit_id.
|
||||||
|
# In that case, there will be work running in receptor and Controller will not know
|
||||||
|
# which Job it is associated with.
|
||||||
|
# We do not programatically handle this case. Ideally, we would handle this with a reaper case.
|
||||||
|
# The two distinct job lifecycle log events below allow for us to at least detect when this
|
||||||
|
# edge case occurs. If the lifecycle event work_unit_id_received occurs without the
|
||||||
|
# work_unit_id_assigned event then this case may have occured.
|
||||||
|
self.task.instance.work_unit_id = result['unitid'] # Set work_unit_id in-memory only
|
||||||
|
self.task.instance.log_lifecycle("work_unit_id_received")
|
||||||
|
self.task.update_model(self.task.instance.pk, work_unit_id=result['unitid'])
|
||||||
|
self.task.instance.log_lifecycle("work_unit_id_assigned")
|
||||||
|
|
||||||
|
# Throws an exception if the transmit failed.
|
||||||
|
# Will be caught by the try/except in BaseTask#run.
|
||||||
|
transmitter_future.result()
|
||||||
|
|
||||||
# Artifacts are an output, but sometimes they are an input as well
|
# Artifacts are an output, but sometimes they are an input as well
|
||||||
# this is the case with fact cache, where clearing facts deletes a file, and this must be captured
|
# this is the case with fact cache, where clearing facts deletes a file, and this must be captured
|
||||||
|
|||||||
Reference in New Issue
Block a user