Minimally functional container group v2 w/ receptor

This commit is contained in:
Shane McDonald
2021-01-13 20:02:45 -05:00
parent 9df29e8fc4
commit 70f7a082bb
3 changed files with 146 additions and 24 deletions

View File

@@ -78,7 +78,8 @@ from awx.main.dispatch import get_local_queuename, reaper
from awx.main.utils import (update_scm_url,
ignore_inventory_computed_fields,
ignore_inventory_group_removal, extract_ansible_vars, schedule_task_manager,
get_awx_version)
get_awx_version,
deepmerge)
from awx.main.utils.ansible import read_ansible_config
from awx.main.utils.external_logging import reconfigure_rsyslog
from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja
@@ -1379,7 +1380,6 @@ class BaseTask(object):
args = self.build_args(self.instance, private_data_dir, passwords)
resource_profiling_params = self.build_params_resource_profiling(self.instance,
private_data_dir)
execution_environment_params = self.build_execution_environment_params(self.instance)
env = self.build_env(self.instance, private_data_dir, isolated,
private_data_files=private_data_files)
self.safe_env = build_safe_env(env)
@@ -1413,12 +1413,6 @@ class BaseTask(object):
},
}
if containerized:
# We don't want HOME passed through to container groups.
# TODO: remove this conditional after everything is containerized
params['envvars'].pop('HOME', None)
if isinstance(self.instance, AdHocCommand):
params['module'] = self.build_module_name(self.instance)
params['module_args'] = self.build_module_args(self.instance)
@@ -1439,17 +1433,8 @@ class BaseTask(object):
self.dispatcher = CallbackQueueDispatcher()
if not isinstance(self.instance, ProjectUpdate):
work_type='worker'
# TODO: container group jobs will not work with container isolation settings
# but both will run with same settings when worker_in and worker_out are added
params['settings'].update(execution_environment_params)
else:
work_type='worker'
params['settings'].update(execution_environment_params)
self.instance.log_lifecycle("running_playbook")
receptor_job = AWXReceptorJob(self, work_type, params)
receptor_job = AWXReceptorJob(self, params)
res = receptor_job.run()
status = res.status
@@ -3045,11 +3030,14 @@ def deep_copy_model_obj(
class AWXReceptorJob:
def __init__(self, task, work_type, runner_params):
def __init__(self, task, runner_params):
self.task = task
self.work_type = work_type
self.runner_params = runner_params
if not self.task.instance.is_container_group_task:
execution_environment_params = self.task.build_execution_environment_params(self.task.instance)
self.runner_params['settings'].update(execution_environment_params)
def run(self):
# Create a socketpair. Where the left side will be used for writing our payload
# (private data dir, kwargs). The right side will be passed to Receptor for
@@ -3062,7 +3050,8 @@ class AWXReceptorJob:
# in the right side of our socketpair for reading.
receptor_ctl = ReceptorControl('/var/run/receptor/receptor.sock')
result = receptor_ctl.submit_work(worktype=self.work_type,
payload=sockout.makefile('rb'))
payload=sockout.makefile('rb'),
params=self.receptor_params)
unit_id = result['unitid']
sockin.close()
@@ -3094,7 +3083,6 @@ class AWXReceptorJob:
receptor_work_list = receptor_ctl.simple_command("work list")
raise RuntimeError(receptor_work_list[unit_id]['Detail'])
receptor_ctl.simple_command(f"work release {unit_id}")
return res
@@ -3117,6 +3105,31 @@ class AWXReceptorJob:
status_handler=self.task.status_handler,
**self.runner_params)
@property
def receptor_params(self):
receptor_params = {}
if self.task.instance.is_container_group_task:
spec_yaml = yaml.dump(self.pod_definition, explicit_start=True)
kubeconfig_yaml = yaml.dump(self.kube_config, explicit_start=True)
receptor_params = {
"secret_kube_pod": spec_yaml,
"secret_kube_config": kubeconfig_yaml
}
return receptor_params
@property
def work_type(self):
if self.task.instance.is_container_group_task:
work_type = 'ocp'
else:
work_type = 'local'
return work_type
def cancel_watcher(self, processor_future):
while True:
if processor_future.done():
@@ -3126,3 +3139,93 @@ class AWXReceptorJob:
result = namedtuple('result', ['status', 'rc'])
return result('canceled', 1)
time.sleep(1)
@property
def pod_definition(self):
default_pod_spec = {
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"namespace": settings.AWX_CONTAINER_GROUP_DEFAULT_NAMESPACE
},
"spec": {
"containers": [{
"image": settings.AWX_CONTAINER_GROUP_DEFAULT_IMAGE,
"name": 'worker',
"args": ['ansible-runner', 'worker']
}]
}
}
pod_spec_override = {}
if self.task and self.task.instance.instance_group.pod_spec_override:
pod_spec_override = parse_yaml_or_json(
self.task.instance_group.pod_spec_override)
pod_spec = {**default_pod_spec, **pod_spec_override}
if self.task:
pod_spec['metadata'] = deepmerge(
pod_spec.get('metadata', {}),
dict(name=self.pod_name,
labels={
'ansible-awx': settings.INSTALL_UUID,
'ansible-awx-job-id': str(self.task.instance.id)
}))
return pod_spec
@property
def pod_name(self):
return f"awx-job-{self.task.instance.id}"
@property
def credential(self):
return self.task.instance.instance_group.credential
@property
def namespace(self):
return self.pod_definition['metadata']['namespace']
@property
def kube_config(self):
host_input = self.credential.get_input('host')
config = {
"apiVersion": "v1",
"kind": "Config",
"preferences": {},
"clusters": [
{
"name": host_input,
"cluster": {
"server": host_input
}
}
],
"users": [
{
"name": host_input,
"user": {
"token": self.credential.get_input('bearer_token')
}
}
],
"contexts": [
{
"name": host_input,
"context": {
"cluster": host_input,
"user": host_input,
"namespace": self.namespace
}
}
],
"current-context": host_input
}
if self.credential.get_input('verify_ssl') and 'ssl_ca_cert' in self.credential.inputs:
config["clusters"][0]["cluster"]["certificate-authority-data"] = b64encode(
self.credential.get_input('ssl_ca_cert').encode() # encode to bytes
).decode() # decode the base64 data into a str
else:
config["clusters"][0]["cluster"]["insecure-skip-tls-verify"] = True
return config

View File

@@ -55,7 +55,8 @@ __all__ = [
'model_instance_diff', 'parse_yaml_or_json', 'RequireDebugTrueOrTest',
'has_model_field_prefetched', 'set_environ', 'IllegalArgumentError',
'get_custom_venv_choices', 'get_external_account', 'task_manager_bulk_reschedule',
'schedule_task_manager', 'classproperty', 'create_temporary_fifo', 'truncate_stdout'
'schedule_task_manager', 'classproperty', 'create_temporary_fifo', 'truncate_stdout',
'deepmerge'
]
@@ -1079,3 +1080,21 @@ def truncate_stdout(stdout, size):
set_count += 1
return stdout + u'\u001b[0m' * (set_count - reset_count)
def deepmerge(a, b):
"""
Merge dict structures and return the result.
>>> a = {'first': {'all_rows': {'pass': 'dog', 'number': '1'}}}
>>> b = {'first': {'all_rows': {'fail': 'cat', 'number': '5'}}}
>>> import pprint; pprint.pprint(deepmerge(a, b))
{'first': {'all_rows': {'fail': 'cat', 'number': '5', 'pass': 'dog'}}}
"""
if isinstance(a, dict) and isinstance(b, dict):
return dict([(k, deepmerge(a.get(k), b.get(k)))
for k in set(a.keys()).union(b.keys())])
elif b is None:
return a
else:
return b

View File

@@ -8,7 +8,7 @@
- local-only:
- work-command:
worktype: worker
worktype: local
command: ansible-runner
params: worker