Exception handling to always release work units

This commit is contained in:
Alan Rominger 2021-01-25 15:15:14 -05:00 committed by Shane McDonald
parent f850f8d3e0
commit a435843f23

View File

@ -3038,6 +3038,17 @@ class AWXReceptorJob:
self.runner_params['settings'].update(execution_environment_params)
def run(self):
# We establish a connection to the Receptor socket
receptor_ctl = ReceptorControl('/var/run/receptor/receptor.sock')
try:
return self._run_internal(receptor_ctl)
finally:
# Make sure to always release the work unit if we established it
if self.unit_id is not None:
receptor_ctl.simple_command(f"work release {self.unit_id}")
def _run_internal(self, receptor_ctl):
# Create a socketpair. Where the left side will be used for writing our payload
# (private data dir, kwargs). The right side will be passed to Receptor for
# reading.
@ -3045,18 +3056,17 @@ class AWXReceptorJob:
threading.Thread(target=self.transmit, args=[sockin]).start()
# We establish a connection to the Receptor socket and submit our work, passing
# submit our work, passing
# in the right side of our socketpair for reading.
receptor_ctl = ReceptorControl('/var/run/receptor/receptor.sock')
result = receptor_ctl.submit_work(worktype=self.work_type,
payload=sockout.makefile('rb'),
params=self.receptor_params)
unit_id = result['unitid']
self.unit_id = result['unitid']
sockin.close()
sockout.close()
resultsock, resultfile = receptor_ctl.get_work_results(unit_id,
resultsock, resultfile = receptor_ctl.get_work_results(self.unit_id,
return_socket=True,
return_sockfile=True)
# Both "processor" and "cancel_watcher" are spawned in separate threads.
@ -3074,15 +3084,14 @@ class AWXReceptorJob:
res = list(first_future.done)[0].result()
if res.status == 'canceled':
receptor_ctl.simple_command(f"work cancel {unit_id}")
receptor_ctl.simple_command(f"work cancel {self.unit_id}")
resultsock.shutdown(socket.SHUT_RDWR)
resultfile.close()
elif res.status == 'error':
# TODO: There should be a more efficient way of getting this information
receptor_work_list = receptor_ctl.simple_command("work list")
raise RuntimeError(receptor_work_list[unit_id]['Detail'])
raise RuntimeError(receptor_work_list[self.unit_id]['Detail'])
receptor_ctl.simple_command(f"work release {unit_id}")
return res
# Spawned in a thread so Receptor can start reading before we finish writing, we