mirror of
https://github.com/ansible/awx.git
synced 2026-05-24 00:57:48 -02:30
Merge pull request #3803 from ansible/back-to-pending
Prevent pods from failing if the reason is because of a resource quota
This commit is contained in:
@@ -3,6 +3,7 @@ import stat
|
|||||||
import time
|
import time
|
||||||
import yaml
|
import yaml
|
||||||
import tempfile
|
import tempfile
|
||||||
|
import logging
|
||||||
from base64 import b64encode
|
from base64 import b64encode
|
||||||
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
@@ -11,6 +12,8 @@ from django.utils.functional import cached_property
|
|||||||
|
|
||||||
from awx.main.utils.common import parse_yaml_or_json
|
from awx.main.utils.common import parse_yaml_or_json
|
||||||
|
|
||||||
|
logger = logging.getLogger('awx.main.scheduler')
|
||||||
|
|
||||||
|
|
||||||
class PodManager(object):
|
class PodManager(object):
|
||||||
|
|
||||||
@@ -21,32 +24,33 @@ class PodManager(object):
|
|||||||
if not self.credential.kubernetes:
|
if not self.credential.kubernetes:
|
||||||
raise RuntimeError('Pod deployment cannot occur without a Kubernetes credential')
|
raise RuntimeError('Pod deployment cannot occur without a Kubernetes credential')
|
||||||
|
|
||||||
|
|
||||||
self.kube_api.create_namespaced_pod(body=self.pod_definition,
|
self.kube_api.create_namespaced_pod(body=self.pod_definition,
|
||||||
namespace=self.namespace,
|
namespace=self.namespace,
|
||||||
_request_timeout=settings.AWX_CONTAINER_GROUP_DEFAULT_LAUNCH_TIMEOUT)
|
_request_timeout=settings.AWX_CONTAINER_GROUP_K8S_API_TIMEOUT)
|
||||||
|
|
||||||
# We don't do any fancy timeout logic here because it is handled
|
num_retries = settings.AWX_CONTAINER_GROUP_POD_LAUNCH_RETRIES
|
||||||
# at a higher level in the job spawning process. See
|
for retry_attempt in range(num_retries - 1):
|
||||||
# settings.AWX_ISOLATED_LAUNCH_TIMEOUT and settings.AWX_ISOLATED_CONNECTION_TIMEOUT
|
logger.debug(f"Checking for pod {self.pod_name}. Attempt {retry_attempt + 1} of {num_retries}")
|
||||||
while True:
|
|
||||||
pod = self.kube_api.read_namespaced_pod(name=self.pod_name,
|
pod = self.kube_api.read_namespaced_pod(name=self.pod_name,
|
||||||
namespace=self.namespace,
|
namespace=self.namespace,
|
||||||
_request_timeout=settings.AWX_CONTAINER_GROUP_DEFAULT_LAUNCH_TIMEOUT)
|
_request_timeout=settings.AWX_CONTAINER_GROUP_K8S_API_TIMEOUT)
|
||||||
if pod.status.phase != 'Pending':
|
if pod.status.phase != 'Pending':
|
||||||
break
|
break
|
||||||
time.sleep(1)
|
else:
|
||||||
|
logger.debug(f"Pod {self.pod_name} is Pending.")
|
||||||
|
time.sleep(settings.AWX_CONTAINER_GROUP_POD_LAUNCH_RETRY_DELAY)
|
||||||
|
continue
|
||||||
|
|
||||||
if pod.status.phase == 'Running':
|
if pod.status.phase == 'Running':
|
||||||
|
logger.debug(f"Pod {self.pod_name} is online.")
|
||||||
return pod
|
return pod
|
||||||
else:
|
else:
|
||||||
raise RuntimeError(f"Unhandled Pod phase: {pod.status.phase}")
|
logger.warn(f"Pod {self.pod_name} did not start. Status is {pod.status.phase}.")
|
||||||
|
|
||||||
|
|
||||||
def delete(self):
|
def delete(self):
|
||||||
return self.kube_api.delete_namespaced_pod(name=self.pod_name,
|
return self.kube_api.delete_namespaced_pod(name=self.pod_name,
|
||||||
namespace=self.namespace,
|
namespace=self.namespace,
|
||||||
_request_timeout=settings.AWX_CONTAINER_GROUP_DEFAULT_LAUNCH_TIMEOUT)
|
_request_timeout=settings.AWX_CONTAINER_GROUP_K8S_API_TIMEOUT)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def namespace(self):
|
def namespace(self):
|
||||||
|
|||||||
@@ -40,6 +40,9 @@ from django.utils.translation import ugettext_lazy as _
|
|||||||
from django.core.cache import cache
|
from django.core.cache import cache
|
||||||
from django.core.exceptions import ObjectDoesNotExist
|
from django.core.exceptions import ObjectDoesNotExist
|
||||||
|
|
||||||
|
# Kubernetes
|
||||||
|
from kubernetes.client.rest import ApiException
|
||||||
|
|
||||||
# Django-CRUM
|
# Django-CRUM
|
||||||
from crum import impersonate
|
from crum import impersonate
|
||||||
|
|
||||||
@@ -1183,6 +1186,18 @@ class BaseTask(object):
|
|||||||
'''
|
'''
|
||||||
Run the job/task and capture its output.
|
Run the job/task and capture its output.
|
||||||
'''
|
'''
|
||||||
|
self.instance = self.model.objects.get(pk=pk)
|
||||||
|
containerized = self.instance.is_containerized
|
||||||
|
pod_manager = None
|
||||||
|
if containerized:
|
||||||
|
# Here we are trying to launch a pod before transitioning the job into a running
|
||||||
|
# state. For some scenarios (like waiting for resources to become available) we do this
|
||||||
|
# rather than marking the job as error or failed. This is not always desirable. Cases
|
||||||
|
# such as invalid authentication should surface as an error.
|
||||||
|
pod_manager = self.deploy_container_group_pod(self.instance)
|
||||||
|
if not pod_manager:
|
||||||
|
return
|
||||||
|
|
||||||
# self.instance because of the update_model pattern and when it's used in callback handlers
|
# self.instance because of the update_model pattern and when it's used in callback handlers
|
||||||
self.instance = self.update_model(pk, status='running',
|
self.instance = self.update_model(pk, status='running',
|
||||||
start_args='') # blank field to remove encrypted passwords
|
start_args='') # blank field to remove encrypted passwords
|
||||||
@@ -1208,7 +1223,6 @@ class BaseTask(object):
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
isolated = self.instance.is_isolated()
|
isolated = self.instance.is_isolated()
|
||||||
containerized = self.instance.is_containerized
|
|
||||||
self.instance.send_notification_templates("running")
|
self.instance.send_notification_templates("running")
|
||||||
private_data_dir = self.build_private_data_dir(self.instance)
|
private_data_dir = self.build_private_data_dir(self.instance)
|
||||||
self.pre_run_hook(self.instance, private_data_dir)
|
self.pre_run_hook(self.instance, private_data_dir)
|
||||||
@@ -1287,6 +1301,10 @@ class BaseTask(object):
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if containerized:
|
||||||
|
# We don't want HOME passed through to container groups.
|
||||||
|
params['envvars'].pop('HOME')
|
||||||
|
|
||||||
if isinstance(self.instance, AdHocCommand):
|
if isinstance(self.instance, AdHocCommand):
|
||||||
params['module'] = self.build_module_name(self.instance)
|
params['module'] = self.build_module_name(self.instance)
|
||||||
params['module_args'] = self.build_module_args(self.instance)
|
params['module_args'] = self.build_module_args(self.instance)
|
||||||
@@ -1316,16 +1334,6 @@ class BaseTask(object):
|
|||||||
params.pop('inventory'),
|
params.pop('inventory'),
|
||||||
os.path.join(private_data_dir, 'inventory')
|
os.path.join(private_data_dir, 'inventory')
|
||||||
)
|
)
|
||||||
pod_manager = None
|
|
||||||
if containerized:
|
|
||||||
from awx.main.scheduler.kubernetes import PodManager # Avoid circular import
|
|
||||||
params['envvars'].pop('HOME')
|
|
||||||
pod_manager = PodManager(self.instance)
|
|
||||||
self.cleanup_paths.append(pod_manager.kube_config)
|
|
||||||
pod_manager.deploy()
|
|
||||||
self.instance.execution_node = pod_manager.pod_name
|
|
||||||
self.instance.save(update_fields=['execution_node'])
|
|
||||||
|
|
||||||
|
|
||||||
ansible_runner.utils.dump_artifacts(params)
|
ansible_runner.utils.dump_artifacts(params)
|
||||||
isolated_manager_instance = isolated_manager.IsolatedManager(
|
isolated_manager_instance = isolated_manager.IsolatedManager(
|
||||||
@@ -1385,6 +1393,42 @@ class BaseTask(object):
|
|||||||
raise AwxTaskError.TaskError(self.instance, rc)
|
raise AwxTaskError.TaskError(self.instance, rc)
|
||||||
|
|
||||||
|
|
||||||
|
def deploy_container_group_pod(self, task):
|
||||||
|
from awx.main.scheduler.kubernetes import PodManager # Avoid circular import
|
||||||
|
pod_manager = PodManager(self.instance)
|
||||||
|
self.cleanup_paths.append(pod_manager.kube_config)
|
||||||
|
try:
|
||||||
|
log_name = task.log_format
|
||||||
|
logger.debug(f"Launching pod for {log_name}.")
|
||||||
|
pod_manager.deploy()
|
||||||
|
except (ApiException, Exception) as exc:
|
||||||
|
if isinstance(exc, ApiException) and exc.status == 403:
|
||||||
|
try:
|
||||||
|
if 'exceeded quota' in json.loads(exc.body)['message']:
|
||||||
|
# If the k8s cluster does not have capacity, we move the
|
||||||
|
# job back into pending and wait until the next run of
|
||||||
|
# the task manager. This does not exactly play well with
|
||||||
|
# our current instance group precendence logic, since it
|
||||||
|
# will just sit here forever if kubernetes returns this
|
||||||
|
# error.
|
||||||
|
logger.warn(exc.body)
|
||||||
|
logger.warn(f"Could not launch pod for {log_name}. Exceeded quota.")
|
||||||
|
self.update_model(task.pk, status='pending')
|
||||||
|
return
|
||||||
|
except Exception:
|
||||||
|
logger.exception(f"Unable to handle response from Kubernetes API for {log_name}.")
|
||||||
|
|
||||||
|
logger.exception(f"Error when launching pod for {log_name}")
|
||||||
|
self.update_model(task.pk, status='error', result_traceback=traceback.format_exc())
|
||||||
|
return
|
||||||
|
|
||||||
|
self.update_model(task.pk, execution_node=pod_manager.pod_name)
|
||||||
|
return pod_manager
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@task()
|
@task()
|
||||||
class RunJob(BaseTask):
|
class RunJob(BaseTask):
|
||||||
'''
|
'''
|
||||||
@@ -1790,7 +1834,10 @@ class RunJob(BaseTask):
|
|||||||
|
|
||||||
if job.is_containerized:
|
if job.is_containerized:
|
||||||
from awx.main.scheduler.kubernetes import PodManager # prevent circular import
|
from awx.main.scheduler.kubernetes import PodManager # prevent circular import
|
||||||
PodManager(job).delete()
|
pm = PodManager(job)
|
||||||
|
logger.debug(f"Deleting pod {pm.pod_name}")
|
||||||
|
pm.delete()
|
||||||
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
inventory = job.inventory
|
inventory = job.inventory
|
||||||
|
|||||||
@@ -368,6 +368,7 @@ class TestGenericRun():
|
|||||||
|
|
||||||
task = tasks.RunJob()
|
task = tasks.RunJob()
|
||||||
task.update_model = mock.Mock(return_value=job)
|
task.update_model = mock.Mock(return_value=job)
|
||||||
|
task.model.objects.get = mock.Mock(return_value=job)
|
||||||
task.build_private_data_files = mock.Mock(side_effect=OSError())
|
task.build_private_data_files = mock.Mock(side_effect=OSError())
|
||||||
|
|
||||||
with mock.patch('awx.main.tasks.copy_tree'):
|
with mock.patch('awx.main.tasks.copy_tree'):
|
||||||
@@ -387,6 +388,7 @@ class TestGenericRun():
|
|||||||
|
|
||||||
task = tasks.RunJob()
|
task = tasks.RunJob()
|
||||||
task.update_model = mock.Mock(wraps=update_model_wrapper)
|
task.update_model = mock.Mock(wraps=update_model_wrapper)
|
||||||
|
task.model.objects.get = mock.Mock(return_value=job)
|
||||||
task.build_private_data_files = mock.Mock()
|
task.build_private_data_files = mock.Mock()
|
||||||
|
|
||||||
with mock.patch('awx.main.tasks.copy_tree'):
|
with mock.patch('awx.main.tasks.copy_tree'):
|
||||||
@@ -578,6 +580,7 @@ class TestAdhocRun(TestJobExecution):
|
|||||||
|
|
||||||
task = tasks.RunAdHocCommand()
|
task = tasks.RunAdHocCommand()
|
||||||
task.update_model = mock.Mock(wraps=adhoc_update_model_wrapper)
|
task.update_model = mock.Mock(wraps=adhoc_update_model_wrapper)
|
||||||
|
task.model.objects.get = mock.Mock(return_value=adhoc_job)
|
||||||
task.build_inventory = mock.Mock()
|
task.build_inventory = mock.Mock()
|
||||||
|
|
||||||
with pytest.raises(Exception):
|
with pytest.raises(Exception):
|
||||||
|
|||||||
@@ -67,7 +67,9 @@ DATABASES = {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
AWX_CONTAINER_GROUP_DEFAULT_LAUNCH_TIMEOUT = 10
|
AWX_CONTAINER_GROUP_K8S_API_TIMEOUT = 10
|
||||||
|
AWX_CONTAINER_GROUP_POD_LAUNCH_RETRIES = 100
|
||||||
|
AWX_CONTAINER_GROUP_POD_LAUNCH_RETRY_DELAY = 5
|
||||||
AWX_CONTAINER_GROUP_DEFAULT_NAMESPACE = 'default'
|
AWX_CONTAINER_GROUP_DEFAULT_NAMESPACE = 'default'
|
||||||
AWX_CONTAINER_GROUP_DEFAULT_IMAGE = 'ansible/ansible-runner'
|
AWX_CONTAINER_GROUP_DEFAULT_IMAGE = 'ansible/ansible-runner'
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user