diff --git a/awx/api/metadata.py b/awx/api/metadata.py
index dda07502a6..09255d8d70 100644
--- a/awx/api/metadata.py
+++ b/awx/api/metadata.py
@@ -2,6 +2,7 @@
 # All Rights Reserved.
 
 from collections import OrderedDict
+import yaml
 
 # Django
 from django.core.exceptions import PermissionDenied
@@ -22,6 +23,7 @@ from rest_framework.request import clone_request
 # AWX
 from awx.main.fields import JSONField, ImplicitRoleField
 from awx.main.models import InventorySource, NotificationTemplate
+from awx.main.scheduler.kubernetes import PodManager
 
 
 class Metadata(metadata.SimpleMetadata):
@@ -200,6 +202,9 @@ class Metadata(metadata.SimpleMetadata):
             if not isinstance(meta, dict):
                 continue
 
+            if field == "pod_spec_override":
+                meta['default'] = yaml.dump(PodManager().pod_definition)
+
             # Add type choices if available from the serializer.
             if field == 'type' and hasattr(serializer, 'get_type_choices'):
                 meta['choices'] = serializer.get_type_choices()
diff --git a/awx/api/serializers.py b/awx/api/serializers.py
index 86dda8bb7e..f59317faa4 100644
--- a/awx/api/serializers.py
+++ b/awx/api/serializers.py
@@ -116,7 +116,7 @@ SUMMARIZABLE_FK_FIELDS = {
     'project': DEFAULT_SUMMARY_FIELDS + ('status', 'scm_type'),
     'source_project': DEFAULT_SUMMARY_FIELDS + ('status', 'scm_type'),
     'project_update': DEFAULT_SUMMARY_FIELDS + ('status', 'failed',),
-    'credential': DEFAULT_SUMMARY_FIELDS + ('kind', 'cloud', 'credential_type_id'),
+    'credential': DEFAULT_SUMMARY_FIELDS + ('kind', 'cloud', 'kubernetes', 'credential_type_id'),
     'job': DEFAULT_SUMMARY_FIELDS + ('status', 'failed', 'elapsed', 'type'),
     'job_template': DEFAULT_SUMMARY_FIELDS,
     'workflow_job_template': DEFAULT_SUMMARY_FIELDS,
@@ -2515,7 +2515,7 @@ class CredentialSerializer(BaseSerializer):
 
     class Meta:
         model = Credential
-        fields = ('*', 'organization', 'credential_type', 'inputs', 'kind', 'cloud')
+        fields = ('*', 'organization', 'credential_type', 'inputs', 'kind', 'cloud', 'kubernetes')
         extra_kwargs = {
             'credential_type': {
                 'label': _('Credential Type'),
@@ -4755,8 +4755,9 @@
         fields = ("id", "type", "url", "related", "name", "created", "modified",
                   "capacity", "committed_capacity", "consumed_capacity",
                   "percent_capacity_remaining", "jobs_running", "jobs_total",
-                  "instances", "controller", "is_controller", "is_isolated",
-                  "policy_instance_percentage", "policy_instance_minimum", "policy_instance_list")
+                  "instances", "controller", "is_controller", "is_isolated", "credential",
+                  "policy_instance_percentage", "policy_instance_minimum", "policy_instance_list",
+                  "pod_spec_override")
 
     def get_related(self, obj):
         res = super(InstanceGroupSerializer, self).get_related(obj)
@@ -4764,6 +4765,9 @@
         res['instances'] = self.reverse('api:instance_group_instance_list', kwargs={'pk': obj.pk})
         if obj.controller_id:
             res['controller'] = self.reverse('api:instance_group_detail', kwargs={'pk': obj.controller_id})
+        if obj.credential:
+            res['credential'] = self.reverse('api:credential_detail', kwargs={'pk': obj.credential_id})
+
         return res
 
     def validate_policy_instance_list(self, value):
@@ -4783,6 +4787,11 @@
                 raise serializers.ValidationError(_('tower instance group name may not be changed.'))
         return value
 
+    def validate_credential(self, value):
+        if value and not value.kubernetes:
+            raise serializers.ValidationError(_('Only Kubernetes credentials can be associated with an Instance Group'))
+        return value
+
     def get_capacity_dict(self):
         # Store capacity values (globally computed) in the context
         if 'capacity_map' not in self.context:
diff --git a/awx/main/isolated/manager.py b/awx/main/isolated/manager.py
index 322cf2a95f..d78587cf7b 100644
--- a/awx/main/isolated/manager.py
+++ b/awx/main/isolated/manager.py
@@ -11,7 +11,9 @@ from django.conf import settings
 import ansible_runner
 
 import awx
-from awx.main.utils import get_system_task_capacity
+from awx.main.utils import (
+    get_system_task_capacity
+)
 from awx.main.queue import CallbackQueueDispatcher
 
 logger = logging.getLogger('awx.isolated.manager')
@@ -29,7 +31,7 @@ def set_pythonpath(venv_libdir, env):
 
 class IsolatedManager(object):
 
-    def __init__(self, cancelled_callback=None, check_callback=None):
+    def __init__(self, cancelled_callback=None, check_callback=None, pod_manager=None):
         """
         :param cancelled_callback:  a callable - which returns `True` or `False`
                                     - signifying if the job has been prematurely
@@ -40,6 +42,24 @@
         self.idle_timeout = max(60, 2 * settings.AWX_ISOLATED_CONNECTION_TIMEOUT)
         self.started_at = None
         self.captured_command_artifact = False
+        self.instance = None
+        self.pod_manager = pod_manager
+
+    def build_inventory(self, hosts):
+        if self.instance and self.instance.is_containerized:
+            inventory = {'all': {'hosts': {}}}
+            for host in hosts:
+                inventory['all']['hosts'][host] = {
+                    "ansible_connection": "kubectl",
+                    "ansible_kubectl_config": self.pod_manager.kube_config
+                }
+        else:
+            inventory = '\n'.join([
+                '{} ansible_ssh_user={}'.format(host, settings.AWX_ISOLATED_USERNAME)
+                for host in hosts
+            ])
+
+        return inventory
 
     def build_runner_params(self, hosts, verbosity=1):
         env = dict(os.environ.items())
@@ -69,17 +89,12 @@ class IsolatedManager(object):
             else:
                 playbook_logger.info(runner_obj.stdout.read())
 
-        inventory = '\n'.join([
-            '{} ansible_ssh_user={}'.format(host, settings.AWX_ISOLATED_USERNAME)
-            for host in hosts
-        ])
-
         return {
             'project_dir': os.path.abspath(os.path.join(
                 os.path.dirname(awx.__file__),
                 'playbooks'
             )),
-            'inventory': inventory,
+            'inventory': self.build_inventory(hosts),
             'envvars': env,
             'finished_callback': finished_callback,
             'verbosity': verbosity,
@@ -153,6 +168,11 @@
         runner_obj = self.run_management_playbook('run_isolated.yml',
                                                   self.private_data_dir,
                                                   extravars=extravars)
+
+        if runner_obj.status == 'failed':
+            self.instance.result_traceback = runner_obj.stdout.read()
+            self.instance.save(update_fields=['result_traceback'])
+
         return runner_obj.status, runner_obj.rc
 
     def check(self, interval=None):
@@ -175,6 +195,7 @@
         rc = None
         last_check = time.time()
         dispatcher = CallbackQueueDispatcher()
+
         while status == 'failed':
             canceled = self.cancelled_callback() if self.cancelled_callback else False
             if not canceled and time.time() - last_check < interval:
@@ -279,7 +300,6 @@ class IsolatedManager(object):
 
     def cleanup(self):
-        # If the job failed for any reason, make a last-ditch effort at cleanup
         extravars = {
             'private_data_dir': self.private_data_dir,
             'cleanup_dirs': [
@@ -393,6 +413,7 @@ class IsolatedManager(object):
             [instance.execution_node],
             verbosity=min(5, self.instance.verbosity)
         )
+
         status, rc = self.dispatch(playbook, module, module_args)
         if status == 'successful':
             status, rc = self.check()
diff --git a/awx/main/managers.py b/awx/main/managers.py
index d554792699..a2af79af8c 100644
--- a/awx/main/managers.py
+++ b/awx/main/managers.py
@@ -221,8 +221,9 @@ class InstanceGroupManager(models.Manager):
             elif t.status == 'running':
                 # Subtract capacity from all groups that contain the instance
                 if t.execution_node not in instance_ig_mapping:
-                    logger.warning('Detected %s running inside lost instance, '
-                                   'may still be waiting for reaper.', t.log_format)
+                    if not t.is_containerized:
+                        logger.warning('Detected %s running inside lost instance, '
+                                       'may still be waiting for reaper.', t.log_format)
                     if t.instance_group:
                         impacted_groups = [t.instance_group.name]
                     else:
diff --git a/awx/main/migrations/0085_v360_container_groups.py b/awx/main/migrations/0085_v360_container_groups.py
deleted file mode 100644
index 67c55d9eea..0000000000
--- a/awx/main/migrations/0085_v360_container_groups.py
+++ /dev/null
@@ -1,24 +0,0 @@
-# Generated by Django 2.2.4 on 2019-08-19 15:47
-
-from django.db import migrations, models
-import django.db.models.deletion
-
-
-class Migration(migrations.Migration):
-
-    dependencies = [
-        ('main', '0084_v360_token_description'),
-    ]
-
-    operations = [
-        migrations.AddField(
-            model_name='instancegroup',
-            name='credential',
-            field=models.ForeignKey(blank=True, default=None, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='instancegroups', to='main.Credential'),
-        ),
-        migrations.AddField(
-            model_name='instancegroup',
-            name='pod_spec_override',
-            field=models.TextField(blank=True, default=''),
-        ),
-    ]
diff --git a/awx/main/migrations/0096_v360_container_groups.py b/awx/main/migrations/0096_v360_container_groups.py
new file mode 100644
index 0000000000..d5b5007cb1
--- /dev/null
+++ b/awx/main/migrations/0096_v360_container_groups.py
@@ -0,0 +1,38 @@
+# Generated by Django 2.2.4 on 2019-09-16 23:50
+
+from django.db import migrations, models
+import django.db.models.deletion
+
+from awx.main.models import CredentialType
+from awx.main.utils.common import set_current_apps
+
+
+def create_new_credential_types(apps, schema_editor):
+    set_current_apps(apps)
+    CredentialType.setup_tower_managed_defaults()
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('main', '0095_v360_increase_instance_version_length'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='instancegroup',
+            name='credential',
+            field=models.ForeignKey(blank=True, default=None, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='instancegroups', to='main.Credential'),
+        ),
+        migrations.AddField(
+            model_name='instancegroup',
+            name='pod_spec_override',
+            field=models.TextField(blank=True, default=''),
+        ),
+        migrations.AlterField(
+            model_name='credentialtype',
+            name='kind',
+            field=models.CharField(choices=[('ssh', 'Machine'), ('vault', 'Vault'), ('net', 'Network'), ('scm', 'Source Control'), ('cloud', 'Cloud'), ('token', 'Personal Access Token'), ('insights', 'Insights'), ('external', 'External'), ('kubernetes', 'Kubernetes')], max_length=32),
+        ),
+        migrations.RunPython(create_new_credential_types)
+    ]
diff --git a/awx/main/models/ad_hoc_commands.py b/awx/main/models/ad_hoc_commands.py
index 287955f42e..9787f01423 100644
--- a/awx/main/models/ad_hoc_commands.py
+++ b/awx/main/models/ad_hoc_commands.py
@@ -150,6 +150,14 @@ class AdHocCommand(UnifiedJob, JobNotificationMixin):
     def supports_isolation(cls):
         return True
 
+    @property
+    def is_containerized(self):
+        return bool(self.instance_group and self.instance_group.is_containerized)
+
+    @property
+    def can_run_containerized(self):
+        return True
+
     def get_absolute_url(self, request=None):
         return reverse('api:ad_hoc_command_detail', kwargs={'pk': self.pk}, request=request)
diff --git a/awx/main/models/credential/__init__.py b/awx/main/models/credential/__init__.py
index a839b87efa..7c84226d7d 100644
--- a/awx/main/models/credential/__init__.py
+++ b/awx/main/models/credential/__init__.py
@@ -329,6 +329,7 @@ class CredentialType(CommonModelNameNotUnique):
         ('token', _('Personal Access Token')),
         ('insights', _('Insights')),
         ('external', _('External')),
+        ('kubernetes', _('Kubernetes')),
     )
 
     kind = models.CharField(
@@ -1121,35 +1122,6 @@ ManagedCredentialType(
     },
 )
 
-ManagedCredentialType(
-    namespace='openshift_username_password',
-    kind='kubernetes',
-    name=ugettext_noop('OpenShift Username / Password'),
-    managed_by_tower=True,
-    inputs={
-        'fields': [{
-            'id': 'host',
-            'label': ugettext_noop('OpenShift API URL'),
-            'type': 'string',
-            'help_text': ugettext_noop('The OpenShift API URL to authenticate with.')
-        }, {
-            'id': 'username',
-            'label': ugettext_noop('Username'),
-            'type': 'string'
-        }, {
-            'id': 'password',
-            'label': ugettext_noop('Password'),
-            'type': 'string',
-            'secret': True,
-        }, {
-            'id': 'verify_ssl',
-            'label': ugettext_noop('Verify SSL'),
-            'type': 'boolean',
-            'secret': False
-        }],
-        'required': ['host', 'username', 'password'],
-    }
-)
 
 ManagedCredentialType(
     namespace='kubernetes_bearer_token',
@@ -1158,19 +1130,25 @@
     inputs={
         'fields': [{
             'id': 'host',
-            'label': ugettext_noop('Kubernetes API Endpoint'),
+            'label': ugettext_noop('OpenShift or Kubernetes API Endpoint'),
             'type': 'string',
-            'help_text': ugettext_noop('The Kubernetes API Endpoint to authenticate with.')
+            'help_text': ugettext_noop('The OpenShift or Kubernetes API Endpoint to authenticate with.')
         },{
             'id': 'bearer_token',
-            'label': ugettext_noop('Bearer token for service account'),
+            'label': ugettext_noop('API authentication bearer token.'),
            'type': 'string',
            'secret': True,
         },{
             'id': 'verify_ssl',
             'label': ugettext_noop('Verify SSL'),
             'type': 'boolean',
-            'secret': False
+            'default': True,
+        },{
+            'id': 'ssl_ca_cert',
+            'label': ugettext_noop('Certificate Authority data'),
+            'type': 'string',
+            'secret': True,
+            'multiline': True,
         }],
         'required': ['host', 'bearer_token'],
     }
diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py
index b6cc2f816f..26b7124bf1 100644
--- a/awx/main/models/ha.py
+++ b/awx/main/models/ha.py
@@ -18,7 +18,7 @@ from awx import __version__ as awx_application_version
 from awx.api.versioning import reverse
 from awx.main.managers import InstanceManager, InstanceGroupManager
 from awx.main.fields import JSONField
-from awx.main.models.base import BaseModel, HasEditsMixin
+from awx.main.models.base import BaseModel, HasEditsMixin, prevent_search
 from awx.main.models.unified_jobs import UnifiedJob
 from awx.main.utils import get_cpu_capacity, get_mem_capacity, get_system_task_capacity
 from awx.main.models.mixins import RelatedJobsMixin
@@ -184,6 +184,10 @@ class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin):
         default=None,
         on_delete=models.SET_NULL,
     )
+    pod_spec_override = prevent_search(models.TextField(
+        blank=True,
+        default='',
+    ))
     policy_instance_percentage = models.IntegerField(
         default=0,
         help_text=_("Percentage of Instances to automatically assign to this group")
@@ -226,6 +230,10 @@ class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin):
     def is_isolated(self):
         return bool(self.controller)
 
+    @property
+    def is_containerized(self):
+        return bool(self.credential and self.credential.kubernetes)
+
     '''
     RelatedJobsMixin
     '''
@@ -279,7 +287,8 @@ def schedule_policy_task():
 @receiver(post_save, sender=InstanceGroup)
 def on_instance_group_saved(sender, instance, created=False, raw=False, **kwargs):
     if created or instance.has_policy_changes():
-        schedule_policy_task()
+        if not instance.is_containerized:
+            schedule_policy_task()
 
 
 @receiver(post_save, sender=Instance)
@@ -290,7 +299,8 @@ def on_instance_saved(sender, instance, created=False, raw=False, **kwargs):
 
 @receiver(post_delete, sender=InstanceGroup)
 def on_instance_group_deleted(sender, instance, using, **kwargs):
-    schedule_policy_task()
+    if not instance.is_containerized:
+        schedule_policy_task()
 
 
 @receiver(post_delete, sender=Instance)
diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py
index 40f60c7705..65be4db925 100644
--- a/awx/main/models/jobs.py
+++ b/awx/main/models/jobs.py
@@ -718,6 +718,14 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskMana
                 return "$hidden due to Ansible no_log flag$"
         return artifacts
 
+    @property
+    def can_run_containerized(self):
+        return any([ig for ig in self.preferred_instance_groups if ig.is_containerized])
+
+    @property
+    def is_containerized(self):
+        return bool(self.instance_group and self.instance_group.is_containerized)
+
     @property
     def preferred_instance_groups(self):
         if self.project is not None and self.project.organization is not None:
diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py
index 36ec2a9989..3613ac4d34 100644
--- a/awx/main/models/unified_jobs.py
+++ b/awx/main/models/unified_jobs.py
@@ -714,6 +714,10 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
     def supports_isolation(cls):
         return False
 
+    @property
+    def can_run_containerized(self):
+        return False
+
     def _get_parent_field_name(self):
         return 'unified_job_template'  # Override in subclasses.
 
@@ -1425,3 +1429,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
 
     def is_isolated(self):
         return bool(self.controller_node)
+
+    @property
+    def is_containerized(self):
+        return False
diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py
index 87e635fc01..02b967368d 100644
--- a/awx/main/scheduler/__init__.py
+++ b/awx/main/scheduler/__init__.py
@@ -1,4 +1,6 @@
 # Copyright (c) 2017 Ansible, Inc.
 #
 
-from awx.main.scheduler.task_manager import TaskManager  # noqa
+from .task_manager import TaskManager
+
+__all__ = ['TaskManager']
diff --git a/awx/main/scheduler/kubernetes.py b/awx/main/scheduler/kubernetes.py
new file mode 100644
index 0000000000..1aa1978276
--- /dev/null
+++ b/awx/main/scheduler/kubernetes.py
@@ -0,0 +1,153 @@
+import os
+import stat
+import time
+import yaml
+import tempfile
+from base64 import b64encode
+
+from django.conf import settings
+from kubernetes import client, config
+from django.utils.functional import cached_property
+
+from awx.main.utils.common import parse_yaml_or_json
+
+
+class PodManager(object):
+
+    def __init__(self, task=None):
+        self.task = task
+
+    def deploy(self):
+        if not self.credential.kubernetes:
+            raise RuntimeError('Pod deployment cannot occur without a Kubernetes credential')
+
+
+        self.kube_api.create_namespaced_pod(body=self.pod_definition,
+                                            namespace=self.namespace,
+                                            _request_timeout=settings.AWX_CONTAINER_GROUP_DEFAULT_LAUNCH_TIMEOUT)
+
+        # We don't do any fancy timeout logic here because it is handled
+        # at a higher level in the job spawning process. See
+        # settings.AWX_ISOLATED_LAUNCH_TIMEOUT and settings.AWX_ISOLATED_CONNECTION_TIMEOUT
+        while True:
+            pod = self.kube_api.read_namespaced_pod(name=self.pod_name,
+                                                    namespace=self.namespace,
+                                                    _request_timeout=settings.AWX_CONTAINER_GROUP_DEFAULT_LAUNCH_TIMEOUT)
+            if pod.status.phase != 'Pending':
+                break
+            time.sleep(1)
+
+        if pod.status.phase == 'Running':
+            return pod
+        else:
+            raise RuntimeError(f"Unhandled Pod phase: {pod.status.phase}")
+
+    def delete(self):
+        return self.kube_api.delete_namespaced_pod(name=self.pod_name,
+                                                   namespace=self.namespace,
+                                                   _request_timeout=settings.AWX_CONTAINER_GROUP_DEFAULT_LAUNCH_TIMEOUT)
+
+    @property
+    def namespace(self):
+        return self.pod_definition['metadata']['namespace']
+
+    @property
+    def credential(self):
+        return self.task.instance_group.credential
+
+    @cached_property
+    def kube_config(self):
+        return generate_tmp_kube_config(self.credential, self.namespace)
+
+    @cached_property
+    def kube_api(self):
+        my_client = config.new_client_from_config(config_file=self.kube_config)
+        return client.CoreV1Api(api_client=my_client)
+
+    @property
+    def pod_name(self):
+        return f"job-{self.task.id}"
+
+    @property
+    def pod_definition(self):
+        default_pod_spec = {
+            "apiVersion": "v1",
+            "kind": "Pod",
+            "metadata": {
+                "namespace": settings.AWX_CONTAINER_GROUP_DEFAULT_NAMESPACE
+            },
+            "spec": {
+                "containers": [{
+                    "image": settings.AWX_CONTAINER_GROUP_DEFAULT_IMAGE,
+                    "tty": True,
+                    "stdin": True,
+                    "imagePullPolicy": "Always",
+                    "args": [
+                        'sleep', 'infinity'
+                    ]
+                }]
+            }
+        }
+
+        pod_spec_override = {}
+        if self.task and self.task.instance_group.pod_spec_override:
+            pod_spec_override = parse_yaml_or_json(
+                self.task.instance_group.pod_spec_override)
+        pod_spec = {**default_pod_spec, **pod_spec_override}
+
+        if self.task:
+            pod_spec['metadata']['name'] = self.pod_name
+            pod_spec['spec']['containers'][0]['name'] = self.pod_name
+
+        return pod_spec
+
+
+def generate_tmp_kube_config(credential, namespace):
+    host_input = credential.get_input('host')
+    config = {
+        "apiVersion": "v1",
+        "kind": "Config",
+        "preferences": {},
+        "clusters": [
+            {
+                "name": host_input,
+                "cluster": {
+                    "server": host_input
+                }
+            }
+        ],
+        "users": [
+            {
+                "name": host_input,
+                "user": {
+                    "token": credential.get_input('bearer_token')
+                }
+            }
+        ],
+        "contexts": [
+            {
+                "name": host_input,
+                "context": {
+                    "cluster": host_input,
+                    "user": host_input,
+                    "namespace": namespace
+                }
+            }
+        ],
+        "current-context": host_input
+    }
+
+    if credential.get_input('verify_ssl'):
+        config["clusters"][0]["cluster"]["certificate-authority-data"] = b64encode(
+            credential.get_input('ssl_ca_cert').encode()  # encode to bytes
+        ).decode()  # decode the base64 data into a str
+    else:
+        config["clusters"][0]["cluster"]["insecure-skip-tls-verify"] = True
+
+    fd, path = tempfile.mkstemp(prefix='kubeconfig')
+    with open(path, 'wb') as temp:
+        temp.write(yaml.dump(config).encode())
+        temp.flush()
+        os.chmod(temp.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
+    return path
diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py
index 73218b6f81..df23b7136e 100644
--- a/awx/main/scheduler/task_manager.py
+++ b/awx/main/scheduler/task_manager.py
@@ -251,6 +251,8 @@ class TaskManager():
             task.controller_node = controller_node
             logger.debug('Submitting isolated {} to queue {} controlled by {}.'.format(
                          task.log_format, task.execution_node, controller_node))
+        elif rampart_group.is_containerized:
+            task.instance_group = rampart_group
         else:
             task.instance_group = rampart_group
             if instance is not None:
@@ -447,7 +449,7 @@ class TaskManager():
         for rampart_group in preferred_instance_groups:
             if idle_instance_that_fits is None:
                 idle_instance_that_fits = rampart_group.find_largest_idle_instance()
-            if self.get_remaining_capacity(rampart_group.name) <= 0:
+            if not rampart_group.is_containerized and self.get_remaining_capacity(rampart_group.name) <= 0:
                 logger.debug("Skipping group {} capacity <= 0".format(rampart_group.name))
                 continue
 
@@ -456,10 +458,11 @@ class TaskManager():
                 logger.debug("Starting dependent {} in group {} instance {}".format(
                              task.log_format, rampart_group.name, execution_instance.hostname))
             elif not execution_instance and idle_instance_that_fits:
-                execution_instance = idle_instance_that_fits
-                logger.debug("Starting dependent {} in group {} on idle instance {}".format(
-                             task.log_format, rampart_group.name, execution_instance.hostname))
-            if execution_instance:
+                if not rampart_group.is_containerized:
+                    execution_instance = idle_instance_that_fits
+                    logger.debug("Starting dependent {} in group {} on idle instance {}".format(
+                                 task.log_format, rampart_group.name, execution_instance.hostname))
+            if execution_instance or rampart_group.is_containerized:
                 self.graph[rampart_group.name]['graph'].add_job(task)
                 tasks_to_fail = [t for t in dependency_tasks if t != task]
                 tasks_to_fail += [dependent_task]
@@ -492,10 +495,16 @@ class TaskManager():
                 self.start_task(task, None, task.get_jobs_fail_chain(), None)
                 continue
             for rampart_group in preferred_instance_groups:
+                if task.can_run_containerized and rampart_group.is_containerized:
+                    self.graph[rampart_group.name]['graph'].add_job(task)
+                    self.start_task(task, rampart_group, task.get_jobs_fail_chain(), None)
+                    found_acceptable_queue = True
+                    break
+
                 if idle_instance_that_fits is None:
                     idle_instance_that_fits = rampart_group.find_largest_idle_instance()
                 remaining_capacity = self.get_remaining_capacity(rampart_group.name)
-                if remaining_capacity <= 0:
+                if not rampart_group.is_containerized and self.get_remaining_capacity(rampart_group.name) <= 0:
                     logger.debug("Skipping group {}, remaining_capacity {} <= 0".format(
                                  rampart_group.name, remaining_capacity))
                     continue
@@ -505,10 +514,11 @@ class TaskManager():
                     logger.debug("Starting {} in group {} instance {} (remaining_capacity={})".format(
                                  task.log_format, rampart_group.name, execution_instance.hostname, remaining_capacity))
                 elif not execution_instance and idle_instance_that_fits:
-                    execution_instance = idle_instance_that_fits
-                    logger.debug("Starting {} in group {} instance {} (remaining_capacity={})".format(
-                                 task.log_format, rampart_group.name, execution_instance.hostname, remaining_capacity))
-                if execution_instance:
+                    if not rampart_group.is_containerized:
+                        execution_instance = idle_instance_that_fits
+                        logger.debug("Starting {} in group {} instance {} (remaining_capacity={})".format(
+                                     task.log_format, rampart_group.name, execution_instance.hostname, remaining_capacity))
+                if execution_instance or rampart_group.is_containerized:
                     self.graph[rampart_group.name]['graph'].add_job(task)
                     self.start_task(task, rampart_group, task.get_jobs_fail_chain(), execution_instance)
                     found_acceptable_queue = True
diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index 03bdd924d6..03f0c8afe7 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -251,6 +251,9 @@ def apply_cluster_membership_policies():
         # On a differential basis, apply instances to non-isolated groups
         with transaction.atomic():
             for g in actual_groups:
+                if g.obj.is_containerized:
+                    logger.debug('Skipping containerized group {} for policy calculation'.format(g.obj.name))
+                    continue
                 instances_to_add = set(g.instances) - set(g.prior_instances)
                 instances_to_remove = set(g.prior_instances) - set(g.instances)
                 if instances_to_add:
@@ -878,7 +881,7 @@ class BaseTask(object):
                 settings.AWX_PROOT_SHOW_PATHS
 
             pi_path = settings.AWX_PROOT_BASE_PATH
-            if not self.instance.is_isolated():
+            if not self.instance.is_isolated() and not self.instance.is_containerized:
                 pi_path = tempfile.mkdtemp(
                     prefix='ansible_runner_pi_',
                     dir=settings.AWX_PROOT_BASE_PATH
@@ -1168,6 +1171,7 @@ class BaseTask(object):
 
         try:
             isolated = self.instance.is_isolated()
+            containerized = self.instance.is_containerized
             self.instance.send_notification_templates("running")
             private_data_dir = self.build_private_data_dir(self.instance)
             self.pre_run_hook(self.instance, private_data_dir)
@@ -1261,7 +1265,7 @@ class BaseTask(object):
                 if not params[v]:
                     del params[v]
 
-            if self.instance.is_isolated() is True:
+            if self.instance.is_isolated() or containerized:
                 module_args = None
                 if 'module_args' in params:
                     # if it's adhoc, copy the module args
@@ -1272,10 +1276,22 @@ class BaseTask(object):
                     params.pop('inventory'),
                     os.path.join(private_data_dir, 'inventory')
                 )
+                pod_manager = None
+                if containerized:
+                    from awx.main.scheduler.kubernetes import PodManager  # Avoid circular import
+                    params['envvars'].pop('HOME')
+                    pod_manager = PodManager(self.instance)
+                    self.cleanup_paths.append(pod_manager.kube_config)
+                    pod_manager.deploy()
+                    self.instance.execution_node = pod_manager.pod_name
+                    self.instance.save(update_fields=['execution_node'])
+
+
                 ansible_runner.utils.dump_artifacts(params)
                 isolated_manager_instance = isolated_manager.IsolatedManager(
                     cancelled_callback=lambda: self.update_model(self.instance.pk).cancel_flag,
                     check_callback=self.check_handler,
+                    pod_manager=pod_manager
                 )
                 status, rc = isolated_manager_instance.run(self.instance,
                                                            private_data_dir,
@@ -1600,6 +1616,8 @@ class RunJob(BaseTask):
         '''
         Return whether this task should use proot.
         '''
+        if job.is_containerized:
+            return False
         return getattr(settings, 'AWX_PROOT_ENABLED', False)
 
     def pre_run_hook(self, job, private_data_dir):
@@ -1660,6 +1678,7 @@ class RunJob(BaseTask):
         if job.is_isolated() is True:
             pu_ig = pu_ig.controller
             pu_en = settings.CLUSTER_HOST_ID
+
         sync_metafields = dict(
             launch_type="sync",
             job_type='run',
@@ -1720,8 +1739,13 @@ class RunJob(BaseTask):
                 os.path.join(private_data_dir, 'artifacts', str(job.id), 'fact_cache'),
                 fact_modification_times,
             )
-        if isolated_manager_instance:
+        if isolated_manager_instance and not job.is_containerized:
             isolated_manager_instance.cleanup()
+
+        if job.is_containerized:
+            from awx.main.scheduler.kubernetes import PodManager  # prevent circular import
+            PodManager(job).delete()
+
         try:
             inventory = job.inventory
         except Inventory.DoesNotExist:
@@ -2537,6 +2561,8 @@ class RunAdHocCommand(BaseTask):
         '''
         Return whether this task should use proot.
         '''
+        if ad_hoc_command.is_containerized:
+            return False
         return getattr(settings, 'AWX_PROOT_ENABLED', False)
 
     def final_run_hook(self, adhoc_job, status, private_data_dir, fact_modification_times, isolated_manager_instance=None):
diff --git a/awx/main/tests/functional/conftest.py b/awx/main/tests/functional/conftest.py
index 314ef94713..d8ae10f902 100644
--- a/awx/main/tests/functional/conftest.py
+++ b/awx/main/tests/functional/conftest.py
@@ -203,6 +203,13 @@ def organization(instance):
     return Organization.objects.create(name="test-org", description="test-org-desc")
 
 
+@pytest.fixture
+def credentialtype_kube():
+    kube = CredentialType.defaults['kubernetes_bearer_token']()
+    kube.save()
+    return kube
+
+
 @pytest.fixture
 def credentialtype_ssh():
     ssh = CredentialType.defaults['ssh']()
@@ -336,6 +343,12 @@ def other_external_credential(credentialtype_external):
                                      inputs={'url': 'http://testhost.com', 'token': 'secret2'})
 
 
+@pytest.fixture
+def kube_credential(credentialtype_kube):
+    return Credential.objects.create(credential_type=credentialtype_kube, name='kube-cred',
+                                     inputs={'host': 'my.cluster', 'bearer_token': 'my-token', 'verify_ssl': False})
+
+
 @pytest.fixture
 def inventory(organization):
     return organization.inventories.create(name="test-inv")
diff --git a/awx/main/tests/functional/task_management/test_container_groups.py b/awx/main/tests/functional/task_management/test_container_groups.py
new file mode 100644
index 0000000000..c1a1695bc9
--- /dev/null
+++ b/awx/main/tests/functional/task_management/test_container_groups.py
@@ -0,0 +1,56 @@
+import subprocess
+import yaml
+import base64
+
+from unittest import mock  # noqa
+import pytest
+
+from awx.main.scheduler.kubernetes import PodManager
+from awx.main.utils import (
+    create_temporary_fifo,
+)
+
+
+@pytest.fixture
+def containerized_job(default_instance_group, kube_credential, job_template_factory):
+    default_instance_group.credential = kube_credential
+    default_instance_group.save()
+    objects = job_template_factory('jt', organization='org1', project='proj',
+                                   inventory='inv', credential='cred',
+                                   jobs=['my_job'])
+    jt = objects.job_template
+    jt.instance_groups.add(default_instance_group)
+
+    j1 = objects.jobs['my_job']
+    j1.instance_group = default_instance_group
+    j1.status = 'pending'
+    j1.save()
+    return j1
+
+
+@pytest.mark.django_db
+def test_containerized_job(containerized_job):
+    assert containerized_job.is_containerized
+    assert containerized_job.instance_group.is_containerized
+    assert containerized_job.instance_group.credential.kubernetes
+
+
+@pytest.mark.django_db
+def test_kubectl_ssl_verification(containerized_job):
+    cred = containerized_job.instance_group.credential
+    cred.inputs['verify_ssl'] = True
+    key_material = subprocess.run('openssl genrsa 2> /dev/null',
+                                  shell=True, check=True,
+                                  stdout=subprocess.PIPE)
+    key = create_temporary_fifo(key_material.stdout)
+    cmd = f"""
+    openssl req -x509 -sha256 -new -nodes \
+        -key {key} -subj '/C=US/ST=North Carolina/L=Durham/O=Ansible/OU=AWX Development/CN=awx.localhost'
+    """
+    cert = subprocess.run(cmd.strip(), shell=True, check=True, stdout=subprocess.PIPE)
+    cred.inputs['ssl_ca_cert'] = cert.stdout
+    cred.save()
+    pm = PodManager(containerized_job)
+    config = yaml.load(open(pm.kube_config), Loader=yaml.FullLoader)
+    ca_data = config['clusters'][0]['cluster']['certificate-authority-data']
+    assert cert.stdout == base64.b64decode(ca_data.encode())
diff --git a/awx/main/tests/functional/test_credential.py b/awx/main/tests/functional/test_credential.py
index c55d97d44e..721bf5c043 100644
--- a/awx/main/tests/functional/test_credential.py
+++ b/awx/main/tests/functional/test_credential.py
@@ -87,6 +87,7 @@ def test_default_cred_types():
         'hashivault_kv',
         'hashivault_ssh',
         'insights',
+        'kubernetes_bearer_token',
         'net',
         'openstack',
         'rhv',
diff --git a/awx/main/tests/unit/scheduler/test_kubernetes.py b/awx/main/tests/unit/scheduler/test_kubernetes.py
new file mode 100644
index 0000000000..4121d0133b
--- /dev/null
+++ b/awx/main/tests/unit/scheduler/test_kubernetes.py
@@ -0,0 +1,55 @@
+import pytest
+from unittest import mock
+from django.conf import settings
+
+from awx.main.models import (
+    InstanceGroup,
+    Job,
+    JobTemplate,
+    Project,
+    Inventory,
+)
+from awx.main.scheduler.kubernetes import PodManager
+
+
+@pytest.fixture
+def container_group():
+    instance_group = mock.Mock(InstanceGroup(name='container-group'))
+
+    return instance_group
+
+
+@pytest.fixture
+def job(container_group):
+    return Job(pk=1,
+               id=1,
+               project=Project(),
+               instance_group=container_group,
+               inventory=Inventory(),
+               job_template=JobTemplate(id=1, name='foo'))
+
+
+def test_default_pod_spec(job):
+    default_image = PodManager(job).pod_definition['spec']['containers'][0]['image']
+    assert default_image == settings.AWX_CONTAINER_GROUP_DEFAULT_IMAGE
+
+
+def test_custom_pod_spec(job):
+    job.instance_group.pod_spec_override = """
+    spec:
+      containers:
+        - image: my-custom-image
+    """
+    custom_image = PodManager(job).pod_definition['spec']['containers'][0]['image']
+    assert custom_image == 'my-custom-image'
+
+
+def test_pod_manager_namespace_property(job):
+    pm = PodManager(job)
+    assert pm.namespace == settings.AWX_CONTAINER_GROUP_DEFAULT_NAMESPACE
+
+    job.instance_group.pod_spec_override = """
+    metadata:
+      namespace: my-namespace
+    """
+    assert PodManager(job).namespace == 'my-namespace'
diff --git a/awx/main/tests/unit/test_capacity.py b/awx/main/tests/unit/test_capacity.py
index 7817521f2e..16fe81053c 100644
--- a/awx/main/tests/unit/test_capacity.py
+++ b/awx/main/tests/unit/test_capacity.py
@@ -11,6 +11,7 @@ class FakeObject(object):
 
 class Job(FakeObject):
     task_impact = 43
+    is_containerized = False
 
     def log_format(self):
         return 'job 382 (fake)'
diff --git a/awx/playbooks/check_isolated.yml b/awx/playbooks/check_isolated.yml
index e3bc9e2115..1fae25703e 100644
--- a/awx/playbooks/check_isolated.yml
+++ b/awx/playbooks/check_isolated.yml
@@ -20,12 +20,42 @@
         mode: pull
         delete: yes
         recursive: yes
+      when: ansible_kubectl_config is not defined
 
     - name: Copy daemon log from the isolated host
       synchronize:
         src: "{{src}}/daemon.log"
         dest: "{{src}}/daemon.log"
         mode: pull
+      when: ansible_kubectl_config is not defined
+
+    - name: Copy artifacts from pod
+      synchronize:
+        src: "{{src}}/artifacts/"
+        dest: "{{src}}/artifacts/"
+        mode: pull
+        delete: yes
+        recursive: yes
+        set_remote_user: no
+        rsync_opts:
+          - "--rsh=$RSH"
+      environment:
+        RSH: "oc rsh --config={{ ansible_kubectl_config }}"
+      delegate_to: localhost
+      when: ansible_kubectl_config is defined
+
+    - name: Copy daemon log from pod
+      synchronize:
+        src: "{{src}}/daemon.log"
+        dest: "{{src}}/daemon.log"
+        mode: pull
+        set_remote_user: no
+        rsync_opts:
+          - "--rsh=$RSH"
+      environment:
+        RSH: "oc rsh --config={{ ansible_kubectl_config }}"
+      delegate_to: localhost
+      when: ansible_kubectl_config is defined
 
     - name: Fail if previous check determined that process is not alive.
      fail:
diff --git a/awx/playbooks/run_isolated.yml b/awx/playbooks/run_isolated.yml
index ae3c4b8875..467c183fab 100644
--- a/awx/playbooks/run_isolated.yml
+++ b/awx/playbooks/run_isolated.yml
@@ -11,12 +11,25 @@
       secret: "{{ lookup('pipe', 'cat ' + src + '/env/ssh_key') }}"
 
   tasks:
-
     - name: synchronize job environment with isolated host
       synchronize:
-        copy_links: true
-        src: "{{src}}"
-        dest: "{{dest}}"
+        copy_links: yes
+        src: "{{ src }}"
+        dest: "{{ dest }}"
+      when: ansible_kubectl_config is not defined
+
+    - name: synchronize job environment with remote job container
+      synchronize:
+        copy_links: yes
+        src: "{{ src }}"
+        dest: "{{ dest }}"
+        set_remote_user: no
+        rsync_opts:
+          - "--rsh=$RSH"
+      environment:
+        RSH: "oc rsh --config={{ ansible_kubectl_config }}"
+      delegate_to: localhost
+      when: ansible_kubectl_config is defined
 
     - local_action: stat path="{{src}}/env/ssh_key"
       register: key
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index a2b1e9926b..ceb482a314 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -67,6 +67,10 @@ DATABASES = {
     }
 }
 
+AWX_CONTAINER_GROUP_DEFAULT_LAUNCH_TIMEOUT = 10
+AWX_CONTAINER_GROUP_DEFAULT_NAMESPACE = 'default'
+AWX_CONTAINER_GROUP_DEFAULT_IMAGE = 'ansible/ansible-runner'
+
 # Internationalization
 # https://docs.djangoproject.com/en/dev/topics/i18n/
 #
diff --git a/awx/settings/development.py b/awx/settings/development.py
index 7219113110..86b13eb0cb 100644
--- a/awx/settings/development.py
+++ b/awx/settings/development.py
@@ -83,8 +83,7 @@ AWX_PROOT_ENABLED = True
 
 AWX_ISOLATED_USERNAME = 'root'
 AWX_ISOLATED_CHECK_INTERVAL = 1
-AWX_ISOLATED_PERIODIC_CHECK = 5
-AWX_ISOLATED_LAUNCH_TIMEOUT = 30
+AWX_ISOLATED_PERIODIC_CHECK = 30
 
 # Disable Pendo on the UI for development/test.
 # Note: This setting may be overridden by database settings.
diff --git a/docs/container_groups/README.md b/docs/container_groups/README.md
new file mode 100644
index 0000000000..a13644abb2
--- /dev/null
+++ b/docs/container_groups/README.md
@@ -0,0 +1,72 @@
+# Container Groups
+
+In a traditional AWX installation, jobs (ansible-playbook runs) are executed
+either directly on a member of the cluster or on a pre-provisioned "isolated"
+node.
+
+The concept of a Container Group (working name) allows for job environments to
+be provisioned on-demand as a Pod that exists only for the duration of the
+playbook run. This is known as the ephemeral execution model and ensures a clean
+environment for every job run.
+
+## Configuration
+
+A `ContainerGroup` is simply an `InstanceGroup` that has an associated Credential
+that allows for connecting to an OpenShift or Kubernetes cluster.
+
+### Create Credential
+
+A `Credential` must be created where the associated `CredentialType` is one of:
+
+- `kubernetes_bearer_token`
+
+Other credential types (such as username/password) may be added in the future.
+To create a new type, add a new `ManagedCredentialType` to
+`awx/main/models/credential/__init__.py` where `kind='kubernetes'`.
+
+### Create a Container Group
+
+Once this `Credential` has been associated with an `InstanceGroup`, the
+`InstanceGroup.is_containerized` property will return `True`.
+
+#### Pod Customization
+
+There will be a very simple default pod spec that lives in code.
+
+A custom YAML document may be provided. This will allow the UI to implement
+whatever fields necessary, because any custom fields (think 'image' or
+'namespace') can be "serialized" as valid `Pod` JSON or YAML. A full list of
+options can be found in the Kubernetes documentation
+[here](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.15/#pod-v1-core).
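To make the override format concrete, here is a sketch of a `pod_spec_override` in the style the unit tests above exercise; the image and namespace values are invented for illustration. Note that `PodManager.pod_definition` merges the override over the in-code default with a shallow `{**default, **override}`, so a custom top-level block such as `spec` or `metadata` replaces the corresponding default block wholesale:

```yaml
# Hypothetical pod_spec_override for a containerized InstanceGroup.
# Top-level keys replace the defaults entirely, so a custom `spec`
# must carry every container field it still needs.
metadata:
  namespace: awx-jobs                                   # example; shipped default is 'default'
spec:
  containers:
    - image: registry.example.com/custom-runner:latest  # example image
      tty: true
      stdin: true
      imagePullPolicy: Always
      args: ['sleep', 'infinity']                       # keep the pod alive for the run
```

Fields left out of the override (here `apiVersion` and `kind`) are taken from the default spec, and `PodManager` always stamps the pod and container name with `job-<task id>` afterwards.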
+```bash
+cat > api_request.json <=1.8
-setuptools_scm>=1.15.0,<3.0
+setuptools_scm>=3.1.0
 vcversioner>=2.16.0.0
 pytest-runner
 isort
diff --git a/tools/docker-compose/Dockerfile b/tools/docker-compose/Dockerfile
index af59fa4e45..c5c017fbe7 100644
--- a/tools/docker-compose/Dockerfile
+++ b/tools/docker-compose/Dockerfile
@@ -124,6 +124,14 @@ RUN cd sqlite-autoconf-3290000 && ./configure && make && make install
 RUN mv -f /usr/local/lib/libsqlite3.so.0 /lib64/
 RUN mv -f /usr/local/lib/libsqlite3.so.0.8.6 /lib64/
 
+# Install OpenShift CLI
+RUN cd /usr/local/bin && \
+    curl -L https://github.com/openshift/origin/releases/download/v3.9.0/openshift-origin-client-tools-v3.9.0-191fece-linux-64bit.tar.gz | \
+    tar -xz --strip-components=1 --wildcards --no-anchored 'oc'
+
+ADD tools/docker-compose/google-cloud-sdk.repo /etc/yum.repos.d/
+RUN yum install -y kubectl
+
 RUN yum -y remove cyrus-sasl-devel \
     gcc \
     gcc-c++ \
diff --git a/tools/docker-compose/google-cloud-sdk.repo b/tools/docker-compose/google-cloud-sdk.repo
new file mode 100644
index 0000000000..098f56bc25
--- /dev/null
+++ b/tools/docker-compose/google-cloud-sdk.repo
@@ -0,0 +1,8 @@
+[google-cloud-sdk]
+name=Google Cloud SDK
+baseurl=https://packages.cloud.google.com/yum/repos/cloud-sdk-el7-x86_64
+enabled=1
+gpgcheck=1
+repo_gpgcheck=1
+gpgkey=https://packages.cloud.google.com/yum/doc/yum-key.gpg
+       https://packages.cloud.google.com/yum/doc/rpm-package-key.gpg
\ No newline at end of file
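For reference, the throwaway kubeconfig that `generate_tmp_kube_config` (in `awx/main/scheduler/kubernetes.py` above) writes for a `kubernetes_bearer_token` credential has roughly the shape sketched below. The endpoint and token are placeholder values; with `verify_ssl` enabled, the `insecure-skip-tls-verify` entry is replaced by `certificate-authority-data` carrying the base64-encoded `ssl_ca_cert` input:

```yaml
# Sketch of the generated kubeconfig (placeholder host/token values).
apiVersion: v1
kind: Config
preferences: {}
clusters:
  - name: https://my.cluster.example.com      # keyed by the credential's 'host' input
    cluster:
      server: https://my.cluster.example.com
      insecure-skip-tls-verify: true          # emitted when verify_ssl is false
users:
  - name: https://my.cluster.example.com
    user:
      token: example-bearer-token             # the credential's 'bearer_token' input
contexts:
  - name: https://my.cluster.example.com
    context:
      cluster: https://my.cluster.example.com
      user: https://my.cluster.example.com
      namespace: default                      # PodManager.namespace, from the pod spec
current-context: https://my.cluster.example.com
```

This file is written with owner-only permissions to a `tempfile.mkstemp` path, handed to `kubectl`/`oc rsh` via `ansible_kubectl_config`, and registered in `cleanup_paths` so it is removed when the job finishes.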