Implement retry logic for container group pod launches

authored by Shane McDonald on 2019-10-05 23:09:53 -04:00
committed by Ryan Petrello
parent b93164e1ed
commit 8f75382b81
3 changed files with 28 additions and 15 deletions

View File

@@ -3,6 +3,7 @@ import stat
 import time
 import yaml
 import tempfile
+import logging
 from base64 import b64encode
 from django.conf import settings
@@ -11,6 +12,8 @@ from django.utils.functional import cached_property
 from awx.main.utils.common import parse_yaml_or_json
 
+logger = logging.getLogger('awx.main.scheduler')
+
 class PodManager(object):
@@ -21,29 +24,33 @@ class PodManager(object):
         if not self.credential.kubernetes:
             raise RuntimeError('Pod deployment cannot occur without a Kubernetes credential')

         self.kube_api.create_namespaced_pod(body=self.pod_definition,
                                             namespace=self.namespace,
-                                            _request_timeout=settings.AWX_CONTAINER_GROUP_DEFAULT_LAUNCH_TIMEOUT)
+                                            _request_timeout=settings.AWX_CONTAINER_GROUP_K8S_API_TIMEOUT)

-        while True:
+        num_retries = settings.AWX_CONTAINER_GROUP_POD_LAUNCH_RETRIES
+        for retry_attempt in range(num_retries - 1):
+            logger.debug(f"Checking for pod {self.pod_name}. Attempt {retry_attempt + 1} of {num_retries}")
             pod = self.kube_api.read_namespaced_pod(name=self.pod_name,
                                                     namespace=self.namespace,
-                                                    _request_timeout=settings.AWX_CONTAINER_GROUP_DEFAULT_LAUNCH_TIMEOUT)
+                                                    _request_timeout=settings.AWX_CONTAINER_GROUP_K8S_API_TIMEOUT)
             if pod.status.phase != 'Pending':
                 break
-            time.sleep(1)
+            else:
+                logger.debug(f"Pod {self.pod_name} is Pending.")
+                time.sleep(settings.AWX_CONTAINER_GROUP_POD_LAUNCH_RETRY_DELAY)
+                continue

         if pod.status.phase == 'Running':
+            logger.debug(f"Pod {self.pod_name} is online.")
             return pod
         else:
-            raise RuntimeError(f"Unhandled Pod phase: {pod.status.phase}")
+            logger.warn(f"Pod {self.pod_name} did not start. Status is {pod.status.phase}.")

     def delete(self):
         return self.kube_api.delete_namespaced_pod(name=self.pod_name,
                                                    namespace=self.namespace,
-                                                   _request_timeout=settings.AWX_CONTAINER_GROUP_DEFAULT_LAUNCH_TIMEOUT)
+                                                   _request_timeout=settings.AWX_CONTAINER_GROUP_K8S_API_TIMEOUT)

     @property
     def namespace(self):
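
For readers skimming the diff, here is a minimal standalone sketch of the polling pattern this hunk introduces. The Kubernetes client is stubbed out as a plain callable, and the retry count and delay are passed as arguments; apart from the log messages and the Pending/Running checks, the names below are illustrative and not taken from the commit.

    import time
    import logging

    logger = logging.getLogger('awx.main.scheduler')

    def wait_for_pod(read_pod, pod_name, retries=100, delay=5):
        # Poll until the pod leaves 'Pending' or the retry budget is spent.
        # read_pod stands in for kube_api.read_namespaced_pod and must return
        # an object exposing .status.phase.
        pod = read_pod()
        for attempt in range(1, retries):
            if pod.status.phase != 'Pending':
                break
            logger.debug(f"Pod {pod_name} is Pending.")
            time.sleep(delay)
            logger.debug(f"Checking for pod {pod_name}. Attempt {attempt + 1} of {retries}")
            pod = read_pod()
        if pod.status.phase == 'Running':
            logger.debug(f"Pod {pod_name} is online.")
            return pod
        logger.warning(f"Pod {pod_name} did not start. Status is {pod.status.phase}.")

In the commit itself the retry count and delay are not arguments but come from the new AWX_CONTAINER_GROUP_POD_LAUNCH_RETRIES and AWX_CONTAINER_GROUP_POD_LAUNCH_RETRY_DELAY settings shown in the third file below.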

View File

@@ -1405,13 +1405,15 @@ class BaseTask(object):
         if isinstance(exc, ApiException) and exc.status == 403:
             try:
                 if 'exceeded quota' in json.loads(exc.body)['message']:
-                    # If the k8s cluster does not have capacity, we move the job back into
-                    # pending and immediately reschedule the task manager.
+                    # If the k8s cluster does not have capacity, we move the
+                    # job back into pending and wait until the next run of
+                    # the task manager. This does not exactly play well with
+                    # our current instance group precedence logic, since it
+                    # will just sit here forever if kubernetes returns this
+                    # error.
                     logger.warn(exc.body)
                     logger.warn(f"Could not launch pod for {log_name}. Exceeded quota.")
-                    time.sleep(10)
                     self.update_model(task.pk, status='pending')
-                    schedule_task_manager()
                     return
             except Exception:
                 logger.exception(f"Unable to handle response from Kubernetes API for {log_name}.")
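
The quota branch above only fires for one specific flavor of 403. A simplified sketch of that check follows, using a stand-in exception class so it runs without the kubernetes client installed; the real code catches kubernetes.client.rest.ApiException, which exposes .status and .body in the same way.

    import json

    class ApiException(Exception):
        # Stand-in for kubernetes.client.rest.ApiException (.status / .body).
        def __init__(self, status, body):
            super().__init__(body)
            self.status = status
            self.body = body

    def is_quota_exceeded(exc):
        # True only for a 403 whose JSON body message mentions an exceeded quota;
        # anything else should fall through to normal error handling.
        if isinstance(exc, ApiException) and exc.status == 403:
            try:
                return 'exceeded quota' in json.loads(exc.body)['message']
            except Exception:
                return False
        return False

    quota_error = ApiException(403, json.dumps({'message': 'pods "job-42" is forbidden: exceeded quota: mem-cpu-quota'}))
    assert is_quota_exceeded(quota_error)
    assert not is_quota_exceeded(ApiException(403, json.dumps({'message': 'forbidden'})))

With this change a matching error simply leaves the task in 'pending' for the next task manager pass, instead of sleeping for ten seconds and rescheduling the task manager immediately as the removed lines did.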
@@ -1420,7 +1422,6 @@ class BaseTask(object):
             self.update_model(task.pk, status='error', result_traceback=exc.body)
             return

-        logger.debug(f"Pod online. Starting {log_name}.")
         self.update_model(task.pk, execution_node=pod_manager.pod_name)
         return pod_manager
@@ -1833,7 +1834,10 @@ class RunJob(BaseTask):
         if job.is_containerized:
             from awx.main.scheduler.kubernetes import PodManager  # prevent circular import
-            PodManager(job).delete()
+            pm = PodManager(job)
+            logger.debug(f"Deleting pod {pm.pod_name}")
+            pm.delete()

         try:
             inventory = job.inventory

View File

@@ -67,7 +67,9 @@ DATABASES = {
     }
 }

-AWX_CONTAINER_GROUP_DEFAULT_LAUNCH_TIMEOUT = 10
+AWX_CONTAINER_GROUP_K8S_API_TIMEOUT = 10
+AWX_CONTAINER_GROUP_POD_LAUNCH_RETRIES = 100
+AWX_CONTAINER_GROUP_POD_LAUNCH_RETRY_DELAY = 5
 AWX_CONTAINER_GROUP_DEFAULT_NAMESPACE = 'default'
 AWX_CONTAINER_GROUP_DEFAULT_IMAGE = 'ansible/ansible-runner'
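
With these defaults the launch loop sleeps AWX_CONTAINER_GROUP_POD_LAUNCH_RETRY_DELAY seconds between 'Pending' checks and gives up after AWX_CONTAINER_GROUP_POD_LAUNCH_RETRIES attempts, so a pod stuck in Pending is tolerated for roughly 100 * 5 = 500 seconds before the warning is logged, with each individual API call bounded by AWX_CONTAINER_GROUP_K8S_API_TIMEOUT. A deployment that wants a different window could override the values in its local settings; the numbers below are illustrative, not recommendations from the commit, and the override mechanism depends on how AWX is deployed.

    # Local settings override (illustrative values)
    AWX_CONTAINER_GROUP_K8S_API_TIMEOUT = 30        # per-request timeout for Kubernetes API calls, in seconds
    AWX_CONTAINER_GROUP_POD_LAUNCH_RETRIES = 60     # poll the pod's status at most this many times
    AWX_CONTAINER_GROUP_POD_LAUNCH_RETRY_DELAY = 2  # seconds to sleep between 'Pending' checks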