Begin integrating receptor

This commit is contained in:
Shane McDonald
2020-11-12 16:34:18 -05:00
committed by Shane McDonald
parent 521d3d5edb
commit f1df4c54f8
7 changed files with 99 additions and 58 deletions

View File

@@ -23,6 +23,9 @@ import fcntl
from pathlib import Path
from uuid import uuid4
import urllib.parse as urlparse
import socket
import threading
import concurrent.futures
# Django
from django.conf import settings
@@ -49,6 +52,9 @@ from gitdb.exc import BadName as BadGitName
# Runner
import ansible_runner
# Receptor
from receptorctl.socket_interface import ReceptorControl
# AWX
from awx import __version__ as awx_application_version
from awx.main.constants import PRIVILEGE_ESCALATION_METHODS, STANDARD_INVENTORY_UPDATE_ENV
@@ -1453,15 +1459,10 @@ class BaseTask(object):
params = {
'ident': self.instance.id,
'private_data_dir': private_data_dir,
'project_dir': cwd,
'playbook': self.build_playbook_path_relative_to_cwd(self.instance, private_data_dir),
'inventory': self.build_inventory(self.instance, private_data_dir),
'passwords': expect_passwords,
'envvars': env,
'event_handler': self.event_handler,
'cancel_callback': self.cancel_callback,
'finished_callback': self.finished_callback,
'status_handler': self.status_handler,
'settings': {
'job_timeout': self.get_instance_timeout(self.instance),
'suppress_ansible_output': True,
@@ -1473,10 +1474,7 @@ class BaseTask(object):
# We don't want HOME passed through to container groups.
# TODO: remove this conditional after everything is containerized
params['envvars'].pop('HOME', None)
else:
# TODO: container group jobs will not work with container isolation settings
# but both will run with same settings when worker_in and worker_out are added
params['settings'].update(execution_environment_params)
if isinstance(self.instance, AdHocCommand):
params['module'] = self.build_module_name(self.instance)
@@ -1497,39 +1495,85 @@ class BaseTask(object):
del params[v]
self.dispatcher = CallbackQueueDispatcher()
if self.instance.is_isolated() or containerized:
module_args = None
if 'module_args' in params:
# if it's adhoc, copy the module args
module_args = ansible_runner.utils.args2cmdline(
params.get('module_args'),
)
# TODO on merge: delete if https://github.com/ansible/awx/pull/8185 is merged
if not os.path.exists(os.path.join(private_data_dir, 'inventory')):
shutil.move(
params.pop('inventory'),
os.path.join(private_data_dir, 'inventory')
)
ansible_runner.utils.dump_artifacts(params)
isolated_manager_instance = isolated_manager.IsolatedManager(
self.event_handler,
canceled_callback=lambda: self.update_model(self.instance.pk).cancel_flag,
check_callback=self.check_handler,
pod_manager=pod_manager
)
status, rc = isolated_manager_instance.run(self.instance,
private_data_dir,
params.get('playbook'),
params.get('module'),
module_args,
ident=str(self.instance.pk))
self.finished_callback(None)
if not isinstance(self.instance, ProjectUpdate):
worktype='worker'
# TODO: container group jobs will not work with container isolation settings
# but both will run with same settings when worker_in and worker_out are added
params['settings'].update(execution_environment_params)
else:
res = ansible_runner.interface.run(**params)
status = res.status
rc = res.rc
worktype='worker'
params['settings'].update(execution_environment_params)
# Create a socketpair. Where the left side will be used for writing our payload
# (private data dir, kwargs). The right side will be passed to Receptor for
# reading.
sockin, sockout = socket.socketpair()
# Spawned in a thread so Receptor can start reading before we finish writing, we
# write our payload to the left side of our socketpair.
def transmit(_socket):
ansible_runner.interface.run(streamer='transmit',
_output=_socket.makefile('wb'),
**params)
# Socket must be shutdown here, or the reader will hang forever.
_socket.shutdown(socket.SHUT_WR)
threading.Thread(target=transmit, args=[sockin]).start()
self.instance.log_lifecycle("running_playbook")
# We establish a connection to the Receptor socket and submit our work, passing
# in the right side of our socketpair for reading.
receptor_ctl = ReceptorControl('/var/run/receptor/receptor.sock')
result = receptor_ctl.submit_work(worktype=worktype,
payload=sockout.makefile('rb'))
sockin.close()
sockout.close()
resultsock, resultfile = receptor_ctl.get_work_results(result['unitid'],
return_socket=True,
return_sockfile=True)
def processor():
return ansible_runner.interface.run(streamer='process',
quiet=True,
_input=resultfile,
event_handler=self.event_handler,
finished_callback=self.finished_callback,
status_handler=self.status_handler)
def cancel_watcher(processor_future):
while True:
if processor_future.done():
return
if self.cancel_callback():
result = namedtuple('result', ['status', 'rc'])
return result('canceled', 1)
time.sleep(1)
# Both "processor" and "cancel_watcher" are spawned in separate threads.
# We wait for the first one to return. If cancel_watcher returns first,
# we yank the socket out from underneath the processor, which will cause it
# to exit. A reference to the processor_future is passed into the cancel_watcher_future,
# Which exits if the job has finished normally. The context manager ensures we do not
# leave any threads laying around.
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
processor_future = executor.submit(processor)
cancel_watcher_future = executor.submit(cancel_watcher, processor_future)
futures = [processor_future, cancel_watcher_future]
first_future = concurrent.futures.wait(futures,
return_when=concurrent.futures.FIRST_COMPLETED)
res = list(first_future.done)[0].result()
if res.status == 'canceled':
resultsock.shutdown(socket.SHUT_RDWR)
resultfile.close()
status = res.status
rc = res.rc
if status == 'timeout':
self.instance.job_explanation = "Job terminated due to timeout"

View File

@@ -206,6 +206,10 @@ def test_inventory_update_injected_content(this_kind, inventory, fake_credential
It will make assertions that the contents are correct
If MAKE_INVENTORY_REFERENCE_FILES is set, it will produce reference files
"""
if _kw.get('streamer') != 'transmit':
Res = namedtuple('Result', ['status', 'rc'])
return Res('successful', 0)
private_data_dir = envvars.pop('AWX_PRIVATE_DATA_DIR')
assert envvars.pop('ANSIBLE_INVENTORY_ENABLED') == 'auto'
set_files = bool(os.getenv("MAKE_INVENTORY_REFERENCE_FILES", 'false').lower()[0] not in ['f', '0'])