From 70f7a082bba06ab0d2229eeb04741284264e66a5 Mon Sep 17 00:00:00 2001
From: Shane McDonald
Date: Wed, 13 Jan 2021 20:02:45 -0500
Subject: [PATCH] Minimally functional container group v2 w/ receptor

---
 awx/main/tasks.py                  | 147 ++++++++++++++++++++++++-----
 awx/main/utils/common.py           |  21 ++++-
 tools/docker-compose/receptor.conf |   2 +-
 3 files changed, 146 insertions(+), 24 deletions(-)

diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index 2da516aacd..ee630dcf2e 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -78,7 +78,8 @@ from awx.main.dispatch import get_local_queuename, reaper
 from awx.main.utils import (update_scm_url, ignore_inventory_computed_fields,
                             ignore_inventory_group_removal, extract_ansible_vars, schedule_task_manager,
-                            get_awx_version)
+                            get_awx_version,
+                            deepmerge)
 from awx.main.utils.ansible import read_ansible_config
 from awx.main.utils.external_logging import reconfigure_rsyslog
 from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja
@@ -1379,7 +1380,6 @@ class BaseTask(object):
             args = self.build_args(self.instance, private_data_dir, passwords)
             resource_profiling_params = self.build_params_resource_profiling(self.instance, private_data_dir)
-            execution_environment_params = self.build_execution_environment_params(self.instance)
             env = self.build_env(self.instance, private_data_dir, isolated, private_data_files=private_data_files)

             self.safe_env = build_safe_env(env)
@@ -1413,12 +1413,6 @@ class BaseTask(object):
                 },
             }

-            if containerized:
-                # We don't want HOME passed through to container groups.
-                # TODO: remove this conditional after everything is containerized
-                params['envvars'].pop('HOME', None)
-
-
             if isinstance(self.instance, AdHocCommand):
                 params['module'] = self.build_module_name(self.instance)
                 params['module_args'] = self.build_module_args(self.instance)
@@ -1439,17 +1433,8 @@ class BaseTask(object):

             self.dispatcher = CallbackQueueDispatcher()

-            if not isinstance(self.instance, ProjectUpdate):
-                work_type='worker'
-                # TODO: container group jobs will not work with container isolation settings
-                # but both will run with same settings when worker_in and worker_out are added
-                params['settings'].update(execution_environment_params)
-            else:
-                work_type='worker'
-                params['settings'].update(execution_environment_params)
-
             self.instance.log_lifecycle("running_playbook")
-            receptor_job = AWXReceptorJob(self, work_type, params)
+            receptor_job = AWXReceptorJob(self, params)
             res = receptor_job.run()

             status = res.status
@@ -3045,11 +3030,14 @@ def deep_copy_model_obj(


 class AWXReceptorJob:
-    def __init__(self, task, work_type, runner_params):
+    def __init__(self, task, runner_params):
         self.task = task
-        self.work_type = work_type
         self.runner_params = runner_params

+        if not self.task.instance.is_container_group_task:
+            execution_environment_params = self.task.build_execution_environment_params(self.task.instance)
+            self.runner_params['settings'].update(execution_environment_params)
+
     def run(self):
         # Create a socketpair. Where the left side will be used for writing our payload
         # (private data dir, kwargs). The right side will be passed to Receptor for
         # in the right side of our socketpair for reading.
         receptor_ctl = ReceptorControl('/var/run/receptor/receptor.sock')
         result = receptor_ctl.submit_work(worktype=self.work_type,
-                                          payload=sockout.makefile('rb'))
+                                          payload=sockout.makefile('rb'),
+                                          params=self.receptor_params)
         unit_id = result['unitid']
         sockin.close()
@@ -3094,7 +3083,6 @@ class AWXReceptorJob:
             receptor_work_list = receptor_ctl.simple_command("work list")
             raise RuntimeError(receptor_work_list[unit_id]['Detail'])

-        receptor_ctl.simple_command(f"work release {unit_id}")
         return res
@@ -3117,6 +3105,31 @@ class AWXReceptorJob:
                                    status_handler=self.task.status_handler,
                                    **self.runner_params)

+    @property
+    def receptor_params(self):
+        receptor_params = {}
+        if self.task.instance.is_container_group_task:
+            spec_yaml = yaml.dump(self.pod_definition, explicit_start=True)
+            kubeconfig_yaml = yaml.dump(self.kube_config, explicit_start=True)
+
+            receptor_params = {
+                "secret_kube_pod": spec_yaml,
+                "secret_kube_config": kubeconfig_yaml
+            }
+
+        return receptor_params
+
+    @property
+    def work_type(self):
+        if self.task.instance.is_container_group_task:
+            work_type = 'ocp'
+        else:
+            work_type = 'local'
+
+        return work_type
+
     def cancel_watcher(self, processor_future):
         while True:
             if processor_future.done():
@@ -3126,3 +3139,93 @@ class AWXReceptorJob:
                 return processor_future.result()
             if self.task.cancel_callback():
                 result = namedtuple('result', ['status', 'rc'])
                 return result('canceled', 1)
             time.sleep(1)
+
+    @property
+    def pod_definition(self):
+        default_pod_spec = {
+            "apiVersion": "v1",
+            "kind": "Pod",
+            "metadata": {
+                "namespace": settings.AWX_CONTAINER_GROUP_DEFAULT_NAMESPACE
+            },
+            "spec": {
+                "containers": [{
+                    "image": settings.AWX_CONTAINER_GROUP_DEFAULT_IMAGE,
+                    "name": 'worker',
+                    "args": ['ansible-runner', 'worker']
+                }]
+            }
+        }
+
+        pod_spec_override = {}
+        if self.task and self.task.instance.instance_group.pod_spec_override:
+            pod_spec_override = parse_yaml_or_json(
+                self.task.instance.instance_group.pod_spec_override)
+        pod_spec = {**default_pod_spec, **pod_spec_override}
+
+        if self.task:
+            pod_spec['metadata'] = deepmerge(
+                pod_spec.get('metadata', {}),
+                dict(name=self.pod_name,
+                     labels={
+                         'ansible-awx': settings.INSTALL_UUID,
+                         'ansible-awx-job-id': str(self.task.instance.id)
+                     }))
+
+        return pod_spec
+
+    @property
+    def pod_name(self):
+        return f"awx-job-{self.task.instance.id}"
+
+    @property
+    def credential(self):
+        return self.task.instance.instance_group.credential
+
+    @property
+    def namespace(self):
+        return self.pod_definition['metadata']['namespace']
+
+    @property
+    def kube_config(self):
+        host_input = self.credential.get_input('host')
+        config = {
+            "apiVersion": "v1",
+            "kind": "Config",
+            "preferences": {},
+            "clusters": [
+                {
+                    "name": host_input,
+                    "cluster": {
+                        "server": host_input
+                    }
+                }
+            ],
+            "users": [
+                {
+                    "name": host_input,
+                    "user": {
+                        "token": self.credential.get_input('bearer_token')
+                    }
+                }
+            ],
+            "contexts": [
+                {
+                    "name": host_input,
+                    "context": {
+                        "cluster": host_input,
+                        "user": host_input,
+                        "namespace": self.namespace
+                    }
+                }
+            ],
+            "current-context": host_input
+        }
+
+        if self.credential.get_input('verify_ssl') and 'ssl_ca_cert' in self.credential.inputs:
+            config["clusters"][0]["cluster"]["certificate-authority-data"] = b64encode(
+                self.credential.get_input('ssl_ca_cert').encode()  # encode to bytes
+            ).decode()  # decode the base64 data into a str
+        else:
+            config["clusters"][0]["cluster"]["insecure-skip-tls-verify"] = True
+        return config
diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py
index 283a028f3f..ad90d5e4ec 100644
--- a/awx/main/utils/common.py
+++ b/awx/main/utils/common.py
@@ -55,7 +55,8 @@ __all__ = [
     'model_instance_diff', 'parse_yaml_or_json', 'RequireDebugTrueOrTest',
     'has_model_field_prefetched', 'set_environ', 'IllegalArgumentError',
     'get_custom_venv_choices', 'get_external_account', 'task_manager_bulk_reschedule',
-    'schedule_task_manager', 'classproperty', 'create_temporary_fifo', 'truncate_stdout'
+    'schedule_task_manager', 'classproperty', 'create_temporary_fifo', 'truncate_stdout',
+    'deepmerge'
 ]


@@ -1079,3 +1080,21 @@ def truncate_stdout(stdout, size):
         set_count += 1

     return stdout + u'\u001b[0m' * (set_count - reset_count)
+
+
+def deepmerge(a, b):
+    """
+    Merge dict structures and return the result.
+
+    >>> a = {'first': {'all_rows': {'pass': 'dog', 'number': '1'}}}
+    >>> b = {'first': {'all_rows': {'fail': 'cat', 'number': '5'}}}
+    >>> import pprint; pprint.pprint(deepmerge(a, b))
+    {'first': {'all_rows': {'fail': 'cat', 'number': '5', 'pass': 'dog'}}}
+    """
+    if isinstance(a, dict) and isinstance(b, dict):
+        return dict([(k, deepmerge(a.get(k), b.get(k)))
+                     for k in set(a.keys()).union(b.keys())])
+    elif b is None:
+        return a
+    else:
+        return b
diff --git a/tools/docker-compose/receptor.conf b/tools/docker-compose/receptor.conf
index ba975b8820..4c498243e2 100644
--- a/tools/docker-compose/receptor.conf
+++ b/tools/docker-compose/receptor.conf
@@ -8,7 +8,7 @@
 - local-only:

 - work-command:
-    worktype: worker
+    worktype: local
     command: ansible-runner
     params: worker
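
For reference, a rough sketch of how pod_definition layers a container group's pod_spec_override onto the defaults: top-level keys from the override replace the defaults wholesale via dict unpacking, while metadata is passed through the deepmerge helper added above, so the generated per-job name and labels land next to the namespace instead of clobbering it. The image names, job id, and install UUID below are illustrative placeholders, not values taken from this patch.

    # Mirrors the deepmerge helper added to awx/main/utils/common.py above.
    def deepmerge(a, b):
        if isinstance(a, dict) and isinstance(b, dict):
            return {k: deepmerge(a.get(k), b.get(k)) for k in set(a) | set(b)}
        return a if b is None else b

    default_pod_spec = {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {"namespace": "default"},
        "spec": {"containers": [{"image": "example.com/awx-ee:latest",
                                 "name": "worker",
                                 "args": ["ansible-runner", "worker"]}]},
    }

    # A container group's pod_spec_override replaces top-level keys outright...
    pod_spec_override = {"spec": {"containers": [{"image": "example.com/custom-ee:1.0",
                                                  "name": "worker",
                                                  "args": ["ansible-runner", "worker"]}]}}
    pod_spec = {**default_pod_spec, **pod_spec_override}

    # ...while metadata is deep-merged, so the per-job name and labels coexist with the namespace.
    pod_spec['metadata'] = deepmerge(
        pod_spec.get('metadata', {}),
        dict(name="awx-job-42", labels={"ansible-awx": "<install-uuid>", "ansible-awx-job-id": "42"}))

    assert pod_spec['metadata'] == {
        "namespace": "default",
        "name": "awx-job-42",
        "labels": {"ansible-awx": "<install-uuid>", "ansible-awx-job-id": "42"},
    }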