From cf96275f1b52f089892a1d648212d500841cca90 Mon Sep 17 00:00:00 2001 From: Shane McDonald Date: Mon, 23 Nov 2020 14:56:50 -0500 Subject: [PATCH] Pull awx -> receptor job code into its own class --- awx/main/tasks.py | 163 ++++++++++-------- .../test_inventory_source_injectors.py | 8 +- 2 files changed, 90 insertions(+), 81 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index afa6982ada..74a414026c 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -1433,7 +1433,6 @@ class BaseTask(object): passwords = self.build_passwords(self.instance, kwargs) self.build_extra_vars_file(self.instance, private_data_dir) args = self.build_args(self.instance, private_data_dir, passwords) - cwd = self.build_cwd(self.instance, private_data_dir) resource_profiling_params = self.build_params_resource_profiling(self.instance, private_data_dir) execution_environment_params = self.build_execution_environment_params(self.instance) @@ -1497,89 +1496,18 @@ class BaseTask(object): self.dispatcher = CallbackQueueDispatcher() if not isinstance(self.instance, ProjectUpdate): - worktype='worker' + work_type='worker' # TODO: container group jobs will not work with container isolation settings # but both will run with same settings when worker_in and worker_out are added params['settings'].update(execution_environment_params) else: - worktype='worker' + work_type='worker' params['settings'].update(execution_environment_params) - # Create a socketpair. Where the left side will be used for writing our payload - # (private data dir, kwargs). The right side will be passed to Receptor for - # reading. - sockin, sockout = socket.socketpair() - - # Spawned in a thread so Receptor can start reading before we finish writing, we - # write our payload to the left side of our socketpair. - def transmit(_socket): - ansible_runner.interface.run(streamer='transmit', - _output=_socket.makefile('wb'), - **params) - - # Socket must be shutdown here, or the reader will hang forever. - _socket.shutdown(socket.SHUT_WR) - - threading.Thread(target=transmit, args=[sockin]).start() - - self.instance.log_lifecycle("running_playbook") - # We establish a connection to the Receptor socket and submit our work, passing - # in the right side of our socketpair for reading. - receptor_ctl = ReceptorControl('/var/run/receptor/receptor.sock') - result = receptor_ctl.submit_work(worktype=worktype, - payload=sockout.makefile('rb')) - unit_id = result['unitid'] + receptor_job = AWXReceptorJob(self, work_type, params) + res = receptor_job.run() - sockin.close() - sockout.close() - - resultsock, resultfile = receptor_ctl.get_work_results(unit_id, - return_socket=True, - return_sockfile=True) - - def processor(): - return ansible_runner.interface.run(streamer='process', - quiet=True, - _input=resultfile, - event_handler=self.event_handler, - finished_callback=self.finished_callback, - status_handler=self.status_handler) - - def cancel_watcher(processor_future): - while True: - if processor_future.done(): - return processor_future.result() - - if self.cancel_callback(): - result = namedtuple('result', ['status', 'rc']) - return result('canceled', 1) - time.sleep(1) - - # Both "processor" and "cancel_watcher" are spawned in separate threads. - # We wait for the first one to return. If cancel_watcher returns first, - # we yank the socket out from underneath the processor, which will cause it - # to exit. A reference to the processor_future is passed into the cancel_watcher_future, - # Which exits if the job has finished normally. The context manager ensures we do not - # leave any threads laying around. - with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: - processor_future = executor.submit(processor) - cancel_watcher_future = executor.submit(cancel_watcher, processor_future) - futures = [processor_future, cancel_watcher_future] - first_future = concurrent.futures.wait(futures, - return_when=concurrent.futures.FIRST_COMPLETED) - - res = list(first_future.done)[0].result() - if res.status == 'canceled': - receptor_ctl.simple_command(f"work cancel {unit_id}") - resultsock.shutdown(socket.SHUT_RDWR) - resultfile.close() - elif res.status == 'error': - # TODO: There should be a more efficient way of getting this information - receptor_work_list = receptor_ctl.simple_command("work list") - raise RuntimeError(receptor_work_list[unit_id]['Detail']) - - receptor_ctl.simple_command(f"work release {unit_id}") status = res.status rc = res.rc @@ -3201,3 +3129,86 @@ def deep_copy_model_obj( permission_check_func(creater, copy_mapping.values()) if isinstance(new_obj, Inventory): update_inventory_computed_fields.delay(new_obj.id) + + +class AWXReceptorJob: + def __init__(self, task, work_type, runner_params): + self.task = task + self.work_type = work_type + self.runner_params = runner_params + + def run(self): + # Create a socketpair. Where the left side will be used for writing our payload + # (private data dir, kwargs). The right side will be passed to Receptor for + # reading. + sockin, sockout = socket.socketpair() + + threading.Thread(target=self.transmit, args=[sockin]).start() + + # We establish a connection to the Receptor socket and submit our work, passing + # in the right side of our socketpair for reading. + receptor_ctl = ReceptorControl('/var/run/receptor/receptor.sock') + result = receptor_ctl.submit_work(worktype=self.work_type, + payload=sockout.makefile('rb')) + unit_id = result['unitid'] + + sockin.close() + sockout.close() + + resultsock, resultfile = receptor_ctl.get_work_results(unit_id, + return_socket=True, + return_sockfile=True) + # Both "processor" and "cancel_watcher" are spawned in separate threads. + # We wait for the first one to return. If cancel_watcher returns first, + # we yank the socket out from underneath the processor, which will cause it + # to exit. A reference to the processor_future is passed into the cancel_watcher_future, + # Which exits if the job has finished normally. The context manager ensures we do not + # leave any threads laying around. + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + processor_future = executor.submit(self.processor, resultfile) + cancel_watcher_future = executor.submit(self.cancel_watcher, processor_future) + futures = [processor_future, cancel_watcher_future] + first_future = concurrent.futures.wait(futures, + return_when=concurrent.futures.FIRST_COMPLETED) + + res = list(first_future.done)[0].result() + if res.status == 'canceled': + receptor_ctl.simple_command(f"work cancel {unit_id}") + resultsock.shutdown(socket.SHUT_RDWR) + resultfile.close() + elif res.status == 'error': + # TODO: There should be a more efficient way of getting this information + receptor_work_list = receptor_ctl.simple_command("work list") + raise RuntimeError(receptor_work_list[unit_id]['Detail']) + + + receptor_ctl.simple_command(f"work release {unit_id}") + return res + + # Spawned in a thread so Receptor can start reading before we finish writing, we + # write our payload to the left side of our socketpair. + def transmit(self, _socket): + ansible_runner.interface.run(streamer='transmit', + _output=_socket.makefile('wb'), + **self.runner_params) + + # Socket must be shutdown here, or the reader will hang forever. + _socket.shutdown(socket.SHUT_WR) + + def processor(self, resultfile): + return ansible_runner.interface.run(streamer='process', + quiet=True, + _input=resultfile, + event_handler=self.task.event_handler, + finished_callback=self.task.finished_callback, + status_handler=self.task.status_handler) + + def cancel_watcher(self, processor_future): + while True: + if processor_future.done(): + return processor_future.result() + + if self.task.cancel_callback(): + result = namedtuple('result', ['status', 'rc']) + return result('canceled', 1) + time.sleep(1) diff --git a/awx/main/tests/functional/test_inventory_source_injectors.py b/awx/main/tests/functional/test_inventory_source_injectors.py index c4f7e6a17d..f9edfdcd22 100644 --- a/awx/main/tests/functional/test_inventory_source_injectors.py +++ b/awx/main/tests/functional/test_inventory_source_injectors.py @@ -200,15 +200,13 @@ def test_inventory_update_injected_content(this_kind, inventory, fake_credential inventory_update = inventory_source.create_unified_job() task = RunInventoryUpdate() - def substitute_run(envvars=None, **_kw): + def substitute_run(awx_receptor_job): """This method will replace run_pexpect instead of running, it will read the private data directory contents It will make assertions that the contents are correct If MAKE_INVENTORY_REFERENCE_FILES is set, it will produce reference files """ - if _kw.get('streamer') != 'transmit': - Res = namedtuple('Result', ['status', 'rc']) - return Res('successful', 0) + envvars = awx_receptor_job.runner_params['envvars'] private_data_dir = envvars.pop('AWX_PRIVATE_DATA_DIR') assert envvars.pop('ANSIBLE_INVENTORY_ENABLED') == 'auto' @@ -260,6 +258,6 @@ def test_inventory_update_injected_content(this_kind, inventory, fake_credential # Also do not send websocket status updates with mock.patch.object(UnifiedJob, 'websocket_emit_status', mock.Mock()): # The point of this test is that we replace run with assertions - with mock.patch('awx.main.tasks.ansible_runner.interface.run', substitute_run): + with mock.patch('awx.main.tasks.AWXReceptorJob.run', substitute_run): # so this sets up everything for a run and then yields control over to substitute_run task.run(inventory_update.pk)