Pull awx -> receptor job code into its own class

This commit is contained in:
Shane McDonald
2020-11-23 14:56:50 -05:00
committed by Shane McDonald
parent be8168b555
commit cf96275f1b
2 changed files with 90 additions and 81 deletions

View File

@@ -1433,7 +1433,6 @@ class BaseTask(object):
passwords = self.build_passwords(self.instance, kwargs) passwords = self.build_passwords(self.instance, kwargs)
self.build_extra_vars_file(self.instance, private_data_dir) self.build_extra_vars_file(self.instance, private_data_dir)
args = self.build_args(self.instance, private_data_dir, passwords) args = self.build_args(self.instance, private_data_dir, passwords)
cwd = self.build_cwd(self.instance, private_data_dir)
resource_profiling_params = self.build_params_resource_profiling(self.instance, resource_profiling_params = self.build_params_resource_profiling(self.instance,
private_data_dir) private_data_dir)
execution_environment_params = self.build_execution_environment_params(self.instance) execution_environment_params = self.build_execution_environment_params(self.instance)
@@ -1497,89 +1496,18 @@ class BaseTask(object):
self.dispatcher = CallbackQueueDispatcher() self.dispatcher = CallbackQueueDispatcher()
if not isinstance(self.instance, ProjectUpdate): if not isinstance(self.instance, ProjectUpdate):
worktype='worker' work_type='worker'
# TODO: container group jobs will not work with container isolation settings # TODO: container group jobs will not work with container isolation settings
# but both will run with same settings when worker_in and worker_out are added # but both will run with same settings when worker_in and worker_out are added
params['settings'].update(execution_environment_params) params['settings'].update(execution_environment_params)
else: else:
worktype='worker' work_type='worker'
params['settings'].update(execution_environment_params) params['settings'].update(execution_environment_params)
# Create a socketpair. Where the left side will be used for writing our payload
# (private data dir, kwargs). The right side will be passed to Receptor for
# reading.
sockin, sockout = socket.socketpair()
# Spawned in a thread so Receptor can start reading before we finish writing, we
# write our payload to the left side of our socketpair.
def transmit(_socket):
ansible_runner.interface.run(streamer='transmit',
_output=_socket.makefile('wb'),
**params)
# Socket must be shutdown here, or the reader will hang forever.
_socket.shutdown(socket.SHUT_WR)
threading.Thread(target=transmit, args=[sockin]).start()
self.instance.log_lifecycle("running_playbook") self.instance.log_lifecycle("running_playbook")
# We establish a connection to the Receptor socket and submit our work, passing receptor_job = AWXReceptorJob(self, work_type, params)
# in the right side of our socketpair for reading. res = receptor_job.run()
receptor_ctl = ReceptorControl('/var/run/receptor/receptor.sock')
result = receptor_ctl.submit_work(worktype=worktype,
payload=sockout.makefile('rb'))
unit_id = result['unitid']
sockin.close()
sockout.close()
resultsock, resultfile = receptor_ctl.get_work_results(unit_id,
return_socket=True,
return_sockfile=True)
def processor():
return ansible_runner.interface.run(streamer='process',
quiet=True,
_input=resultfile,
event_handler=self.event_handler,
finished_callback=self.finished_callback,
status_handler=self.status_handler)
def cancel_watcher(processor_future):
while True:
if processor_future.done():
return processor_future.result()
if self.cancel_callback():
result = namedtuple('result', ['status', 'rc'])
return result('canceled', 1)
time.sleep(1)
# Both "processor" and "cancel_watcher" are spawned in separate threads.
# We wait for the first one to return. If cancel_watcher returns first,
# we yank the socket out from underneath the processor, which will cause it
# to exit. A reference to the processor_future is passed into the cancel_watcher_future,
# Which exits if the job has finished normally. The context manager ensures we do not
# leave any threads laying around.
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
processor_future = executor.submit(processor)
cancel_watcher_future = executor.submit(cancel_watcher, processor_future)
futures = [processor_future, cancel_watcher_future]
first_future = concurrent.futures.wait(futures,
return_when=concurrent.futures.FIRST_COMPLETED)
res = list(first_future.done)[0].result()
if res.status == 'canceled':
receptor_ctl.simple_command(f"work cancel {unit_id}")
resultsock.shutdown(socket.SHUT_RDWR)
resultfile.close()
elif res.status == 'error':
# TODO: There should be a more efficient way of getting this information
receptor_work_list = receptor_ctl.simple_command("work list")
raise RuntimeError(receptor_work_list[unit_id]['Detail'])
receptor_ctl.simple_command(f"work release {unit_id}")
status = res.status status = res.status
rc = res.rc rc = res.rc
@@ -3201,3 +3129,86 @@ def deep_copy_model_obj(
permission_check_func(creater, copy_mapping.values()) permission_check_func(creater, copy_mapping.values())
if isinstance(new_obj, Inventory): if isinstance(new_obj, Inventory):
update_inventory_computed_fields.delay(new_obj.id) update_inventory_computed_fields.delay(new_obj.id)
class AWXReceptorJob:
def __init__(self, task, work_type, runner_params):
self.task = task
self.work_type = work_type
self.runner_params = runner_params
def run(self):
# Create a socketpair. Where the left side will be used for writing our payload
# (private data dir, kwargs). The right side will be passed to Receptor for
# reading.
sockin, sockout = socket.socketpair()
threading.Thread(target=self.transmit, args=[sockin]).start()
# We establish a connection to the Receptor socket and submit our work, passing
# in the right side of our socketpair for reading.
receptor_ctl = ReceptorControl('/var/run/receptor/receptor.sock')
result = receptor_ctl.submit_work(worktype=self.work_type,
payload=sockout.makefile('rb'))
unit_id = result['unitid']
sockin.close()
sockout.close()
resultsock, resultfile = receptor_ctl.get_work_results(unit_id,
return_socket=True,
return_sockfile=True)
# Both "processor" and "cancel_watcher" are spawned in separate threads.
# We wait for the first one to return. If cancel_watcher returns first,
# we yank the socket out from underneath the processor, which will cause it
# to exit. A reference to the processor_future is passed into the cancel_watcher_future,
# Which exits if the job has finished normally. The context manager ensures we do not
# leave any threads laying around.
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
processor_future = executor.submit(self.processor, resultfile)
cancel_watcher_future = executor.submit(self.cancel_watcher, processor_future)
futures = [processor_future, cancel_watcher_future]
first_future = concurrent.futures.wait(futures,
return_when=concurrent.futures.FIRST_COMPLETED)
res = list(first_future.done)[0].result()
if res.status == 'canceled':
receptor_ctl.simple_command(f"work cancel {unit_id}")
resultsock.shutdown(socket.SHUT_RDWR)
resultfile.close()
elif res.status == 'error':
# TODO: There should be a more efficient way of getting this information
receptor_work_list = receptor_ctl.simple_command("work list")
raise RuntimeError(receptor_work_list[unit_id]['Detail'])
receptor_ctl.simple_command(f"work release {unit_id}")
return res
# Spawned in a thread so Receptor can start reading before we finish writing, we
# write our payload to the left side of our socketpair.
def transmit(self, _socket):
ansible_runner.interface.run(streamer='transmit',
_output=_socket.makefile('wb'),
**self.runner_params)
# Socket must be shutdown here, or the reader will hang forever.
_socket.shutdown(socket.SHUT_WR)
def processor(self, resultfile):
return ansible_runner.interface.run(streamer='process',
quiet=True,
_input=resultfile,
event_handler=self.task.event_handler,
finished_callback=self.task.finished_callback,
status_handler=self.task.status_handler)
def cancel_watcher(self, processor_future):
while True:
if processor_future.done():
return processor_future.result()
if self.task.cancel_callback():
result = namedtuple('result', ['status', 'rc'])
return result('canceled', 1)
time.sleep(1)

View File

@@ -200,15 +200,13 @@ def test_inventory_update_injected_content(this_kind, inventory, fake_credential
inventory_update = inventory_source.create_unified_job() inventory_update = inventory_source.create_unified_job()
task = RunInventoryUpdate() task = RunInventoryUpdate()
def substitute_run(envvars=None, **_kw): def substitute_run(awx_receptor_job):
"""This method will replace run_pexpect """This method will replace run_pexpect
instead of running, it will read the private data directory contents instead of running, it will read the private data directory contents
It will make assertions that the contents are correct It will make assertions that the contents are correct
If MAKE_INVENTORY_REFERENCE_FILES is set, it will produce reference files If MAKE_INVENTORY_REFERENCE_FILES is set, it will produce reference files
""" """
if _kw.get('streamer') != 'transmit': envvars = awx_receptor_job.runner_params['envvars']
Res = namedtuple('Result', ['status', 'rc'])
return Res('successful', 0)
private_data_dir = envvars.pop('AWX_PRIVATE_DATA_DIR') private_data_dir = envvars.pop('AWX_PRIVATE_DATA_DIR')
assert envvars.pop('ANSIBLE_INVENTORY_ENABLED') == 'auto' assert envvars.pop('ANSIBLE_INVENTORY_ENABLED') == 'auto'
@@ -260,6 +258,6 @@ def test_inventory_update_injected_content(this_kind, inventory, fake_credential
# Also do not send websocket status updates # Also do not send websocket status updates
with mock.patch.object(UnifiedJob, 'websocket_emit_status', mock.Mock()): with mock.patch.object(UnifiedJob, 'websocket_emit_status', mock.Mock()):
# The point of this test is that we replace run with assertions # The point of this test is that we replace run with assertions
with mock.patch('awx.main.tasks.ansible_runner.interface.run', substitute_run): with mock.patch('awx.main.tasks.AWXReceptorJob.run', substitute_run):
# so this sets up everything for a run and then yields control over to substitute_run # so this sets up everything for a run and then yields control over to substitute_run
task.run(inventory_update.pk) task.run(inventory_update.pk)