diff --git a/awx/main/scheduler/kubernetes.py b/awx/main/scheduler/kubernetes.py index 1aa1978276..90f2849c3d 100644 --- a/awx/main/scheduler/kubernetes.py +++ b/awx/main/scheduler/kubernetes.py @@ -3,6 +3,7 @@ import stat import time import yaml import tempfile +import logging from base64 import b64encode from django.conf import settings @@ -11,6 +12,8 @@ from django.utils.functional import cached_property from awx.main.utils.common import parse_yaml_or_json +logger = logging.getLogger('awx.main.scheduler') + class PodManager(object): @@ -21,32 +24,33 @@ class PodManager(object): if not self.credential.kubernetes: raise RuntimeError('Pod deployment cannot occur without a Kubernetes credential') - self.kube_api.create_namespaced_pod(body=self.pod_definition, namespace=self.namespace, - _request_timeout=settings.AWX_CONTAINER_GROUP_DEFAULT_LAUNCH_TIMEOUT) + _request_timeout=settings.AWX_CONTAINER_GROUP_K8S_API_TIMEOUT) - # We don't do any fancy timeout logic here because it is handled - # at a higher level in the job spawning process. See - # settings.AWX_ISOLATED_LAUNCH_TIMEOUT and settings.AWX_ISOLATED_CONNECTION_TIMEOUT - while True: + num_retries = settings.AWX_CONTAINER_GROUP_POD_LAUNCH_RETRIES + for retry_attempt in range(num_retries - 1): + logger.debug(f"Checking for pod {self.pod_name}. Attempt {retry_attempt + 1} of {num_retries}") pod = self.kube_api.read_namespaced_pod(name=self.pod_name, namespace=self.namespace, - _request_timeout=settings.AWX_CONTAINER_GROUP_DEFAULT_LAUNCH_TIMEOUT) + _request_timeout=settings.AWX_CONTAINER_GROUP_K8S_API_TIMEOUT) if pod.status.phase != 'Pending': break - time.sleep(1) + else: + logger.debug(f"Pod {self.pod_name} is Pending.") + time.sleep(settings.AWX_CONTAINER_GROUP_POD_LAUNCH_RETRY_DELAY) + continue if pod.status.phase == 'Running': + logger.debug(f"Pod {self.pod_name} is online.") return pod else: - raise RuntimeError(f"Unhandled Pod phase: {pod.status.phase}") - + logger.warn(f"Pod {self.pod_name} did not start. 
Status is {pod.status.phase}.") def delete(self): return self.kube_api.delete_namespaced_pod(name=self.pod_name, namespace=self.namespace, - _request_timeout=settings.AWX_CONTAINER_GROUP_DEFAULT_LAUNCH_TIMEOUT) + _request_timeout=settings.AWX_CONTAINER_GROUP_K8S_API_TIMEOUT) @property def namespace(self): diff --git a/awx/main/tasks.py b/awx/main/tasks.py index eb2d48546d..66394a72d5 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -40,6 +40,9 @@ from django.utils.translation import ugettext_lazy as _ from django.core.cache import cache from django.core.exceptions import ObjectDoesNotExist +# Kubernetes +from kubernetes.client.rest import ApiException + # Django-CRUM from crum import impersonate @@ -1183,6 +1186,18 @@ class BaseTask(object): ''' Run the job/task and capture its output. ''' + self.instance = self.model.objects.get(pk=pk) + containerized = self.instance.is_containerized + pod_manager = None + if containerized: + # Here we are trying to launch a pod before transitioning the job into a running + # state. For some scenarios (like waiting for resources to become available) we do this + # rather than marking the job as error or failed. This is not always desirable. Cases + # such as invalid authentication should surface as an error. 
+ pod_manager = self.deploy_container_group_pod(self.instance) + if not pod_manager: + return + # self.instance because of the update_model pattern and when it's used in callback handlers self.instance = self.update_model(pk, status='running', start_args='') # blank field to remove encrypted passwords @@ -1208,7 +1223,6 @@ class BaseTask(object): try: isolated = self.instance.is_isolated() - containerized = self.instance.is_containerized self.instance.send_notification_templates("running") private_data_dir = self.build_private_data_dir(self.instance) self.pre_run_hook(self.instance, private_data_dir) @@ -1287,6 +1301,10 @@ class BaseTask(object): }, } + if containerized: + # We don't want HOME passed through to container groups. + params['envvars'].pop('HOME') + if isinstance(self.instance, AdHocCommand): params['module'] = self.build_module_name(self.instance) params['module_args'] = self.build_module_args(self.instance) @@ -1316,16 +1334,6 @@ class BaseTask(object): params.pop('inventory'), os.path.join(private_data_dir, 'inventory') ) - pod_manager = None - if containerized: - from awx.main.scheduler.kubernetes import PodManager # Avoid circular import - params['envvars'].pop('HOME') - pod_manager = PodManager(self.instance) - self.cleanup_paths.append(pod_manager.kube_config) - pod_manager.deploy() - self.instance.execution_node = pod_manager.pod_name - self.instance.save(update_fields=['execution_node']) - ansible_runner.utils.dump_artifacts(params) isolated_manager_instance = isolated_manager.IsolatedManager( @@ -1385,6 +1393,42 @@ class BaseTask(object): raise AwxTaskError.TaskError(self.instance, rc) + def deploy_container_group_pod(self, task): + from awx.main.scheduler.kubernetes import PodManager # Avoid circular import + pod_manager = PodManager(self.instance) + self.cleanup_paths.append(pod_manager.kube_config) + try: + log_name = task.log_format + logger.debug(f"Launching pod for {log_name}.") + pod_manager.deploy() + except (ApiException, Exception) 
as exc: + if isinstance(exc, ApiException) and exc.status == 403: + try: + if 'exceeded quota' in json.loads(exc.body)['message']: + # If the k8s cluster does not have capacity, we move the + # job back into pending and wait until the next run of + # the task manager. This does not exactly play well with + # our current instance group precedence logic, since it + # will just sit here forever if kubernetes returns this + # error. + logger.warn(exc.body) + logger.warn(f"Could not launch pod for {log_name}. Exceeded quota.") + self.update_model(task.pk, status='pending') + return + except Exception: + logger.exception(f"Unable to handle response from Kubernetes API for {log_name}.") + + logger.exception(f"Error when launching pod for {log_name}") + self.update_model(task.pk, status='error', result_traceback=traceback.format_exc()) + return + + self.update_model(task.pk, execution_node=pod_manager.pod_name) + return pod_manager + + + + + @task() class RunJob(BaseTask): ''' @@ -1790,7 +1834,10 @@ class RunJob(BaseTask): if job.is_containerized: from awx.main.scheduler.kubernetes import PodManager # prevent circular import - PodManager(job).delete() + pm = PodManager(job) + logger.debug(f"Deleting pod {pm.pod_name}") + pm.delete() + try: inventory = job.inventory diff --git a/awx/main/tests/unit/test_tasks.py b/awx/main/tests/unit/test_tasks.py index 77c13fffd2..2bae5aef9a 100644 --- a/awx/main/tests/unit/test_tasks.py +++ b/awx/main/tests/unit/test_tasks.py @@ -368,6 +368,7 @@ class TestGenericRun(): task = tasks.RunJob() task.update_model = mock.Mock(return_value=job) + task.model.objects.get = mock.Mock(return_value=job) task.build_private_data_files = mock.Mock(side_effect=OSError()) with mock.patch('awx.main.tasks.copy_tree'): @@ -387,6 +388,7 @@ class TestGenericRun(): task = tasks.RunJob() task.update_model = mock.Mock(wraps=update_model_wrapper) + task.model.objects.get = mock.Mock(return_value=job) task.build_private_data_files = mock.Mock() with
mock.patch('awx.main.tasks.copy_tree'): @@ -578,6 +580,7 @@ class TestAdhocRun(TestJobExecution): task = tasks.RunAdHocCommand() task.update_model = mock.Mock(wraps=adhoc_update_model_wrapper) + task.model.objects.get = mock.Mock(return_value=adhoc_job) task.build_inventory = mock.Mock() with pytest.raises(Exception): diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 639d19f9e8..07c76f8b01 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -67,7 +67,9 @@ DATABASES = { } } -AWX_CONTAINER_GROUP_DEFAULT_LAUNCH_TIMEOUT = 10 +AWX_CONTAINER_GROUP_K8S_API_TIMEOUT = 10 +AWX_CONTAINER_GROUP_POD_LAUNCH_RETRIES = 100 +AWX_CONTAINER_GROUP_POD_LAUNCH_RETRY_DELAY = 5 AWX_CONTAINER_GROUP_DEFAULT_NAMESPACE = 'default' AWX_CONTAINER_GROUP_DEFAULT_IMAGE = 'ansible/ansible-runner'