From b93164e1ed3fe664941776147ff5166b53c23b7b Mon Sep 17 00:00:00 2001 From: Shane McDonald Date: Sat, 5 Oct 2019 14:07:33 -0400 Subject: [PATCH 1/3] Prevent pods from failing if the reason is because of a resource quota Signed-off-by: Shane McDonald --- awx/main/scheduler/kubernetes.py | 3 -- awx/main/tasks.py | 65 +++++++++++++++++++++++++------ awx/main/tests/unit/test_tasks.py | 3 ++ 3 files changed, 57 insertions(+), 14 deletions(-) diff --git a/awx/main/scheduler/kubernetes.py b/awx/main/scheduler/kubernetes.py index 1aa1978276..f4e35caec9 100644 --- a/awx/main/scheduler/kubernetes.py +++ b/awx/main/scheduler/kubernetes.py @@ -26,9 +26,6 @@ class PodManager(object): namespace=self.namespace, _request_timeout=settings.AWX_CONTAINER_GROUP_DEFAULT_LAUNCH_TIMEOUT) - # We don't do any fancy timeout logic here because it is handled - # at a higher level in the job spawning process. See - # settings.AWX_ISOLATED_LAUNCH_TIMEOUT and settings.AWX_ISOLATED_CONNECTION_TIMEOUT while True: pod = self.kube_api.read_namespaced_pod(name=self.pod_name, namespace=self.namespace, diff --git a/awx/main/tasks.py b/awx/main/tasks.py index eb2d48546d..6253446cd5 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -40,6 +40,9 @@ from django.utils.translation import ugettext_lazy as _ from django.core.cache import cache from django.core.exceptions import ObjectDoesNotExist +# Kubernetes +from kubernetes.client.rest import ApiException + # Django-CRUM from crum import impersonate @@ -1183,6 +1186,18 @@ class BaseTask(object): ''' Run the job/task and capture its output. ''' + self.instance = self.model.objects.get(pk=pk) + containerized = self.instance.is_containerized + pod_manager = None + if containerized: + # Here we are trying to launch a pod before transitioning the job into a running + # state. For some scenarios (like waiting for resources to become available) we do this + # rather than marking the job as error or failed. This is not always desirable. 
Cases + # such as invalid authentication should surface as an error. + pod_manager = self.deploy_container_group_pod(self.instance) + if not pod_manager: + return + # self.instance because of the update_model pattern and when it's used in callback handlers self.instance = self.update_model(pk, status='running', start_args='') # blank field to remove encrypted passwords @@ -1208,7 +1223,6 @@ class BaseTask(object): try: isolated = self.instance.is_isolated() - containerized = self.instance.is_containerized self.instance.send_notification_templates("running") private_data_dir = self.build_private_data_dir(self.instance) self.pre_run_hook(self.instance, private_data_dir) @@ -1287,6 +1301,10 @@ class BaseTask(object): }, } + if containerized: + # We don't want HOME passed through to container groups. + params['envvars'].pop('HOME') + if isinstance(self.instance, AdHocCommand): params['module'] = self.build_module_name(self.instance) params['module_args'] = self.build_module_args(self.instance) @@ -1316,16 +1334,6 @@ class BaseTask(object): params.pop('inventory'), os.path.join(private_data_dir, 'inventory') ) - pod_manager = None - if containerized: - from awx.main.scheduler.kubernetes import PodManager # Avoid circular import - params['envvars'].pop('HOME') - pod_manager = PodManager(self.instance) - self.cleanup_paths.append(pod_manager.kube_config) - pod_manager.deploy() - self.instance.execution_node = pod_manager.pod_name - self.instance.save(update_fields=['execution_node']) - ansible_runner.utils.dump_artifacts(params) isolated_manager_instance = isolated_manager.IsolatedManager( @@ -1385,6 +1393,41 @@ class BaseTask(object): raise AwxTaskError.TaskError(self.instance, rc) + def deploy_container_group_pod(self, task): + from awx.main.scheduler.kubernetes import PodManager # Avoid circular import + pod_manager = PodManager(self.instance) + self.cleanup_paths.append(pod_manager.kube_config) + try: + log_name = task.log_format + logger.debug(f"Launching pod for 
{log_name}.") + pod_manager.deploy() + except (ApiException, Exception) as exc: + if isinstance(exc, ApiException) and exc.status == 403: + try: + if 'exceeded quota' in json.loads(exc.body)['message']: + # If the k8s cluster does not have capacity, we move the job back into + # pending and immediately reschedule the task manager. + logger.warn(exc.body) + logger.warn(f"Could not launch pod for {log_name}. Exceeded quota.") + time.sleep(10) + self.update_model(task.pk, status='pending') + schedule_task_manager() + return + except Exception: + logger.exception(f"Unable to handle response from Kubernetes API for {log_name}.") + + logger.exception(f"Error when launching pod for {log_name}") + self.update_model(task.pk, status='error', result_traceback=exc.body) + return + + logger.debug(f"Pod online. Starting {log_name}.") + self.update_model(task.pk, execution_node=pod_manager.pod_name) + return pod_manager + + + + + @task() class RunJob(BaseTask): ''' diff --git a/awx/main/tests/unit/test_tasks.py b/awx/main/tests/unit/test_tasks.py index 77c13fffd2..2bae5aef9a 100644 --- a/awx/main/tests/unit/test_tasks.py +++ b/awx/main/tests/unit/test_tasks.py @@ -368,6 +368,7 @@ class TestGenericRun(): task = tasks.RunJob() task.update_model = mock.Mock(return_value=job) + task.model.objects.get = mock.Mock(return_value=job) task.build_private_data_files = mock.Mock(side_effect=OSError()) with mock.patch('awx.main.tasks.copy_tree'): @@ -387,6 +388,7 @@ class TestGenericRun(): task = tasks.RunJob() task.update_model = mock.Mock(wraps=update_model_wrapper) + task.model.objects.get = mock.Mock(return_value=job) task.build_private_data_files = mock.Mock() with mock.patch('awx.main.tasks.copy_tree'): @@ -578,6 +580,7 @@ class TestAdhocRun(TestJobExecution): task = tasks.RunAdHocCommand() task.update_model = mock.Mock(wraps=adhoc_update_model_wrapper) + task.model.objects.get = mock.Mock(return_value=adhoc_job) task.build_inventory = mock.Mock() with pytest.raises(Exception): From 
8f75382b81fa19bbdf966428eb4f80bdb22dfde2 Mon Sep 17 00:00:00 2001 From: Shane McDonald Date: Sat, 5 Oct 2019 23:09:53 -0400 Subject: [PATCH 2/3] Implement retry logic for container group pod launches --- awx/main/scheduler/kubernetes.py | 23 +++++++++++++++-------- awx/main/tasks.py | 16 ++++++++++------ awx/settings/defaults.py | 4 +++- 3 files changed, 28 insertions(+), 15 deletions(-) diff --git a/awx/main/scheduler/kubernetes.py b/awx/main/scheduler/kubernetes.py index f4e35caec9..90f2849c3d 100644 --- a/awx/main/scheduler/kubernetes.py +++ b/awx/main/scheduler/kubernetes.py @@ -3,6 +3,7 @@ import stat import time import yaml import tempfile +import logging from base64 import b64encode from django.conf import settings @@ -11,6 +12,8 @@ from django.utils.functional import cached_property from awx.main.utils.common import parse_yaml_or_json +logger = logging.getLogger('awx.main.scheduler') + class PodManager(object): @@ -21,29 +24,33 @@ class PodManager(object): if not self.credential.kubernetes: raise RuntimeError('Pod deployment cannot occur without a Kubernetes credential') - self.kube_api.create_namespaced_pod(body=self.pod_definition, namespace=self.namespace, - _request_timeout=settings.AWX_CONTAINER_GROUP_DEFAULT_LAUNCH_TIMEOUT) + _request_timeout=settings.AWX_CONTAINER_GROUP_K8S_API_TIMEOUT) - while True: + num_retries = settings.AWX_CONTAINER_GROUP_POD_LAUNCH_RETRIES + for retry_attempt in range(num_retries - 1): + logger.debug(f"Checking for pod {self.pod_name}. 
Attempt {retry_attempt + 1} of {num_retries}") pod = self.kube_api.read_namespaced_pod(name=self.pod_name, namespace=self.namespace, - _request_timeout=settings.AWX_CONTAINER_GROUP_DEFAULT_LAUNCH_TIMEOUT) + _request_timeout=settings.AWX_CONTAINER_GROUP_K8S_API_TIMEOUT) if pod.status.phase != 'Pending': break - time.sleep(1) + else: + logger.debug(f"Pod {self.pod_name} is Pending.") + time.sleep(settings.AWX_CONTAINER_GROUP_POD_LAUNCH_RETRY_DELAY) + continue if pod.status.phase == 'Running': + logger.debug(f"Pod {self.pod_name} is online.") return pod else: - raise RuntimeError(f"Unhandled Pod phase: {pod.status.phase}") - + logger.warn(f"Pod {self.pod_name} did not start. Status is {pod.status.phase}.") def delete(self): return self.kube_api.delete_namespaced_pod(name=self.pod_name, namespace=self.namespace, - _request_timeout=settings.AWX_CONTAINER_GROUP_DEFAULT_LAUNCH_TIMEOUT) + _request_timeout=settings.AWX_CONTAINER_GROUP_K8S_API_TIMEOUT) @property def namespace(self): diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 6253446cd5..d46965ff46 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -1405,13 +1405,15 @@ class BaseTask(object): if isinstance(exc, ApiException) and exc.status == 403: try: if 'exceeded quota' in json.loads(exc.body)['message']: - # If the k8s cluster does not have capacity, we move the job back into - # pending and immediately reschedule the task manager. + # If the k8s cluster does not have capacity, we move the + # job back into pending and wait until the next run of + # the task manager. This does not exactly play well with + # our current instance group precedence logic, since it + # will just sit here forever if kubernetes returns this + # error. logger.warn(exc.body) logger.warn(f"Could not launch pod for {log_name}. 
Exceeded quota.") - time.sleep(10) self.update_model(task.pk, status='pending') - schedule_task_manager() return except Exception: logger.exception(f"Unable to handle response from Kubernetes API for {log_name}.") @@ -1420,7 +1422,6 @@ class BaseTask(object): self.update_model(task.pk, status='error', result_traceback=exc.body) return - logger.debug(f"Pod online. Starting {log_name}.") self.update_model(task.pk, execution_node=pod_manager.pod_name) return pod_manager @@ -1833,7 +1834,10 @@ class RunJob(BaseTask): if job.is_containerized: from awx.main.scheduler.kubernetes import PodManager # prevent circular import - PodManager(job).delete() + pm = PodManager(job) + logger.debug(f"Deleting pod {pm.pod_name}") + pm.delete() + try: inventory = job.inventory diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 639d19f9e8..07c76f8b01 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -67,7 +67,9 @@ DATABASES = { } } -AWX_CONTAINER_GROUP_DEFAULT_LAUNCH_TIMEOUT = 10 +AWX_CONTAINER_GROUP_K8S_API_TIMEOUT = 10 +AWX_CONTAINER_GROUP_POD_LAUNCH_RETRIES = 100 +AWX_CONTAINER_GROUP_POD_LAUNCH_RETRY_DELAY = 5 AWX_CONTAINER_GROUP_DEFAULT_NAMESPACE = 'default' AWX_CONTAINER_GROUP_DEFAULT_IMAGE = 'ansible/ansible-runner' From d5bdf554f1e15a9c6ad370137cbb43e478268565 Mon Sep 17 00:00:00 2001 From: Ryan Petrello Date: Wed, 9 Oct 2019 11:18:58 -0400 Subject: [PATCH 3/3] fix a programming error when k8s pods fail to launch --- awx/main/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index d46965ff46..66394a72d5 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -1419,7 +1419,7 @@ class BaseTask(object): logger.exception(f"Unable to handle response from Kubernetes API for {log_name}.") logger.exception(f"Error when launching pod for {log_name}") - self.update_model(task.pk, status='error', result_traceback=exc.body) + self.update_model(task.pk, status='error', 
result_traceback=traceback.format_exc()) return self.update_model(task.pk, execution_node=pod_manager.pod_name)