Prevent pods from failing when the cause is a resource quota

Signed-off-by: Shane McDonald <me@shanemcd.com>
Shane McDonald 2019-10-05 14:07:33 -04:00 committed by Ryan Petrello
parent 08df2cad68
commit b93164e1ed
3 changed files with 57 additions and 14 deletions
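
For context on the error handling added below: when a ResourceQuota blocks pod creation, the Kubernetes API responds with HTTP 403 and the Python client raises ApiException whose body is a JSON-serialized Status object. A minimal sketch of the detection this commit keys on — the exact message text is illustrative and varies by cluster, namespace, and quota name:

    # Sketch: detecting a quota-exceeded 403 from the Kubernetes API.
    # The message text below is illustrative, not from the commit.
    import json

    from kubernetes.client.rest import ApiException


    def is_exceeded_quota(exc):
        # Kubernetes serializes a Status object into the 403 response body.
        return (isinstance(exc, ApiException) and exc.status == 403 and
                'exceeded quota' in json.loads(exc.body)['message'])


    exc = ApiException(status=403, reason='Forbidden')
    exc.body = json.dumps({
        'kind': 'Status',
        'status': 'Failure',
        'reason': 'Forbidden',
        'code': 403,
        'message': 'pods "awx-job-123" is forbidden: exceeded quota: compute-resources',
    })
    assert is_exceeded_quota(exc)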

awx/main/scheduler/kubernetes.py

@@ -26,9 +26,6 @@ class PodManager(object):
                                            namespace=self.namespace,
                                            _request_timeout=settings.AWX_CONTAINER_GROUP_DEFAULT_LAUNCH_TIMEOUT)

        # We don't do any fancy timeout logic here because it is handled
        # at a higher level in the job spawning process. See
        # settings.AWX_ISOLATED_LAUNCH_TIMEOUT and settings.AWX_ISOLATED_CONNECTION_TIMEOUT
        while True:
            pod = self.kube_api.read_namespaced_pod(name=self.pod_name,
                                                    namespace=self.namespace,

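The hunk above sits inside PodManager's wait loop, which deliberately carries no per-iteration timeout. Here is a minimal sketch of what such a readiness loop looks like, assuming `core_v1` is a kubernetes.client.CoreV1Api; the helper name and poll interval are illustrative, not part of the commit:

    # A minimal sketch of the pod-readiness loop the hunk above is part of.
    import time

    from kubernetes import client


    def wait_for_pod_running(core_v1, pod_name, namespace, poll_interval=1):
        # The overall launch timeout is enforced by the caller at a higher
        # level of the job-spawning process, so this loop can stay simple.
        while True:
            pod = core_v1.read_namespaced_pod(name=pod_name, namespace=namespace)
            if pod.status.phase != 'Pending':
                return pod
            time.sleep(poll_interval)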
awx/main/tasks.py

@@ -40,6 +40,9 @@ from django.utils.translation import ugettext_lazy as _
from django.core.cache import cache
from django.core.exceptions import ObjectDoesNotExist
# Kubernetes
from kubernetes.client.rest import ApiException
# Django-CRUM
from crum import impersonate
@@ -1183,6 +1186,18 @@ class BaseTask(object):
        '''
        Run the job/task and capture its output.
        '''
        self.instance = self.model.objects.get(pk=pk)
        containerized = self.instance.is_containerized

        pod_manager = None
        if containerized:
            # Launch the pod before transitioning the job into a running state.
            # In some scenarios (like waiting for resources to become available)
            # we keep the job pending rather than marking it as errored or failed.
            # This is not always desirable: cases such as invalid authentication
            # should still surface as an error.
            pod_manager = self.deploy_container_group_pod(self.instance)
            if not pod_manager:
                return

        # Reassign self.instance: the update_model pattern returns the refreshed
        # object, and callback handlers rely on it being current.
        self.instance = self.update_model(pk, status='running',
                                          start_args='')  # blank field to remove encrypted passwords
@@ -1208,7 +1223,6 @@ class BaseTask(object):
        try:
            isolated = self.instance.is_isolated()
            containerized = self.instance.is_containerized
            self.instance.send_notification_templates("running")
            private_data_dir = self.build_private_data_dir(self.instance)
            self.pre_run_hook(self.instance, private_data_dir)
@@ -1287,6 +1301,10 @@ class BaseTask(object):
                },
            }

            if containerized:
                # We don't want HOME passed through to container groups.
                params['envvars'].pop('HOME')

            if isinstance(self.instance, AdHocCommand):
                params['module'] = self.build_module_name(self.instance)
                params['module_args'] = self.build_module_args(self.instance)
@@ -1316,16 +1334,6 @@ class BaseTask(object):
                    params.pop('inventory'),
                    os.path.join(private_data_dir, 'inventory')
                )

            pod_manager = None
            if containerized:
                from awx.main.scheduler.kubernetes import PodManager  # Avoid circular import
                params['envvars'].pop('HOME')
                pod_manager = PodManager(self.instance)
                self.cleanup_paths.append(pod_manager.kube_config)
                pod_manager.deploy()

                self.instance.execution_node = pod_manager.pod_name
                self.instance.save(update_fields=['execution_node'])

            ansible_runner.utils.dump_artifacts(params)
            isolated_manager_instance = isolated_manager.IsolatedManager(
@@ -1385,6 +1393,41 @@ class BaseTask(object):
raise AwxTaskError.TaskError(self.instance, rc)
    def deploy_container_group_pod(self, task):
        from awx.main.scheduler.kubernetes import PodManager  # Avoid circular import
        pod_manager = PodManager(self.instance)
        self.cleanup_paths.append(pod_manager.kube_config)
        try:
            log_name = task.log_format
            logger.debug(f"Launching pod for {log_name}.")
            pod_manager.deploy()
        except Exception as exc:
            if isinstance(exc, ApiException) and exc.status == 403:
                try:
                    if 'exceeded quota' in json.loads(exc.body)['message']:
                        # If the k8s cluster does not have capacity, we move the job back
                        # into pending and immediately reschedule the task manager.
                        logger.warning(exc.body)
                        logger.warning(f"Could not launch pod for {log_name}. Exceeded quota.")
                        time.sleep(10)
                        self.update_model(task.pk, status='pending')
                        schedule_task_manager()
                        return
                except Exception:
                    logger.exception(f"Unable to handle response from Kubernetes API for {log_name}.")
            logger.exception(f"Error when launching pod for {log_name}.")
            # exc.body only exists on ApiException; fall back to str(exc) otherwise.
            result_traceback = exc.body if isinstance(exc, ApiException) else str(exc)
            self.update_model(task.pk, status='error', result_traceback=result_traceback)
            return

        logger.debug(f"Pod online. Starting {log_name}.")
        self.update_model(task.pk, execution_node=pod_manager.pod_name)
        return pod_manager
@task()
class RunJob(BaseTask):
'''

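Taken together, the hunks above reorder the launch sequence: the pod is deployed before the job flips to running, and a falsy return from deploy_container_group_pod means the job was either requeued (quota) or marked as errored. A condensed sketch of that control flow, not the literal AWX implementation:

    # Condensed control flow of the new launch path; names from the diff,
    # everything else simplified for illustration.
    def run(self, pk):
        self.instance = self.model.objects.get(pk=pk)
        pod_manager = None
        if self.instance.is_containerized:
            pod_manager = self.deploy_container_group_pod(self.instance)
            if not pod_manager:
                return  # job is now 'pending' (requeued) or 'error'
        # Only a successfully launched (or non-containerized) job reaches 'running'.
        self.instance = self.update_model(pk, status='running', start_args='')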
awx/main/tests/unit/test_tasks.py

@@ -368,6 +368,7 @@ class TestGenericRun():
        task = tasks.RunJob()
        task.update_model = mock.Mock(return_value=job)
        task.model.objects.get = mock.Mock(return_value=job)
        task.build_private_data_files = mock.Mock(side_effect=OSError())

        with mock.patch('awx.main.tasks.copy_tree'):
@@ -387,6 +388,7 @@ class TestGenericRun():
        task = tasks.RunJob()
        task.update_model = mock.Mock(wraps=update_model_wrapper)
        task.model.objects.get = mock.Mock(return_value=job)
        task.build_private_data_files = mock.Mock()

        with mock.patch('awx.main.tasks.copy_tree'):
@@ -578,6 +580,7 @@ class TestAdhocRun(TestJobExecution):
        task = tasks.RunAdHocCommand()
        task.update_model = mock.Mock(wraps=adhoc_update_model_wrapper)
        task.model.objects.get = mock.Mock(return_value=adhoc_job)
        task.build_inventory = mock.Mock()

        with pytest.raises(Exception):
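
The commit adds no test for the quota branch itself. A hypothetical unit test in the style of the fixtures above — the `job` fixture, patch targets, and sleep patching are assumptions made for illustration, not part of this commit:

    # Hypothetical test: a 403 "exceeded quota" ApiException from deploy()
    # should requeue the job as pending and reschedule the task manager.
    import json
    from unittest import mock

    from kubernetes.client.rest import ApiException

    from awx.main import tasks


    def test_exceeded_quota_requeues_job(job):
        task = tasks.RunJob()
        task.instance = job
        task.cleanup_paths = []
        task.update_model = mock.Mock(return_value=job)

        exc = ApiException(status=403, reason='Forbidden')
        exc.body = json.dumps({'message': 'exceeded quota: compute-resources'})

        with mock.patch('awx.main.scheduler.kubernetes.PodManager') as pod_manager, \
                mock.patch('awx.main.tasks.schedule_task_manager') as task_manager, \
                mock.patch('time.sleep'):
            pod_manager.return_value.deploy.side_effect = exc
            assert task.deploy_container_group_pod(job) is None
            task.update_model.assert_called_with(job.pk, status='pending')
            task_manager.assert_called_once()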