Update pod reaper to work with receptor-launched pods

This commit is contained in:
Shane McDonald 2021-04-05 17:45:15 -04:00
parent 6294ddfded
commit 2d48b24ef2
No known key found for this signature in database
GPG Key ID: 6F374AF6E9EB9374
6 changed files with 52 additions and 76 deletions

View File

@ -29,7 +29,7 @@ from awx.main.utils.safe_yaml import sanitize_jinja
# other AWX imports
from awx.main.models.rbac import batch_role_ancestor_rebuilding
from awx.main.utils import ignore_inventory_computed_fields, get_licenser
from awx.main.utils.execution_environments import get_execution_environment_default
from awx.main.utils.execution_environments import get_default_execution_environment
from awx.main.signals import disable_activity_stream
from awx.main.constants import STANDARD_INVENTORY_UPDATE_ENV
from awx.main.utils.pglock import advisory_lock
@ -91,7 +91,7 @@ class AnsibleInventoryLoader(object):
bargs.extend(['-v', '{0}:{0}:Z'.format(self.source)])
for key, value in STANDARD_INVENTORY_UPDATE_ENV.items():
bargs.extend(['-e', '{0}={1}'.format(key, value)])
bargs.extend([get_execution_environment_default().image])
bargs.extend([get_default_execution_environment().image])
bargs.extend(['ansible-inventory', '-i', self.source])
bargs.extend(['--playbook-dir', functioning_dir(self.source)])
if self.verbosity:

View File

@ -21,7 +21,7 @@ from django.utils.translation import ugettext_lazy as _
from awx.main.models.base import prevent_search
from awx.main.models.rbac import Role, RoleAncestorEntry, get_roles_on_resource
from awx.main.utils import parse_yaml_or_json, get_custom_venv_choices, get_licenser, polymorphic
from awx.main.utils.execution_environments import get_execution_environment_default
from awx.main.utils.execution_environments import get_default_execution_environment
from awx.main.utils.encryption import decrypt_value, get_encryption_key, is_encrypted
from awx.main.utils.polymorphic import build_polymorphic_ctypes_map
from awx.main.fields import JSONField, AskForField
@ -476,7 +476,7 @@ class ExecutionEnvironmentMixin(models.Model):
if self.inventory.organization.default_environment is not None:
return self.inventory.organization.default_environment
return get_execution_environment_default()
return get_default_execution_environment()
class CustomVirtualEnvMixin(models.Model):

View File

@ -184,7 +184,7 @@ class ProjectOptions(models.Model):
Jobs using the project can use the default_environment, but the project updates
are not flexible enough to allow customizing the image they use.
"""
return self.get_execution_environment_default()
return self.get_default_execution_environment()
def get_project_path(self, check_if_exists=True):
local_path = os.path.basename(self.local_path)

View File

@ -8,6 +8,7 @@ from kubernetes import client, config
from django.utils.functional import cached_property
from awx.main.utils.common import parse_yaml_or_json
from awx.main.utils.execution_environments import get_default_pod_spec
logger = logging.getLogger('awx.main.scheduler')
@ -33,46 +34,23 @@ class PodManager(object):
def __init__(self, task=None):
self.task = task
def deploy(self):
if not self.credential.kubernetes:
raise RuntimeError('Pod deployment cannot occur without a Kubernetes credential')
self.kube_api.create_namespaced_pod(body=self.pod_definition, namespace=self.namespace, _request_timeout=settings.AWX_CONTAINER_GROUP_K8S_API_TIMEOUT)
num_retries = settings.AWX_CONTAINER_GROUP_POD_LAUNCH_RETRIES
for retry_attempt in range(num_retries - 1):
logger.debug(f"Checking for pod {self.pod_name}. Attempt {retry_attempt + 1} of {num_retries}")
pod = self.kube_api.read_namespaced_pod(name=self.pod_name, namespace=self.namespace, _request_timeout=settings.AWX_CONTAINER_GROUP_K8S_API_TIMEOUT)
if pod.status.phase != 'Pending':
break
else:
logger.debug(f"Pod {self.pod_name} is Pending.")
time.sleep(settings.AWX_CONTAINER_GROUP_POD_LAUNCH_RETRY_DELAY)
continue
if pod.status.phase == 'Running':
logger.debug(f"Pod {self.pod_name} is online.")
return pod
else:
logger.warn(f"Pod {self.pod_name} did not start. Status is {pod.status.phase}.")
@classmethod
def list_active_jobs(self, instance_group):
task = collections.namedtuple('Task', 'id instance_group')(id='', instance_group=instance_group)
pm = PodManager(task)
pods = {}
try:
for pod in pm.kube_api.list_namespaced_pod(pm.namespace, label_selector='ansible-awx={}'.format(settings.INSTALL_UUID)).to_dict().get('items', []):
job = pod['metadata'].get('labels', {}).get('ansible-awx-job-id')
if job:
try:
yield int(job)
pods[int(job)] = pod['metadata']['name']
except ValueError:
pass
except Exception:
logger.exception('Failed to list pods for container group {}'.format(instance_group))
def delete(self):
return self.kube_api.delete_namespaced_pod(name=self.pod_name, namespace=self.namespace, _request_timeout=settings.AWX_CONTAINER_GROUP_K8S_API_TIMEOUT)
return pods
@property
def namespace(self):
@ -91,10 +69,16 @@ class PodManager(object):
# this feels a little janky, but it's what k8s' own code does
# internally when it reads kube config files from disk:
# https://github.com/kubernetes-client/python-base/blob/0b208334ef0247aad9afcaae8003954423b61a0d/config/kube_config.py#L643
loader = config.kube_config.KubeConfigLoader(config_dict=self.kube_config)
cfg = type.__call__(client.Configuration)
loader.load_and_set(cfg)
return client.CoreV1Api(api_client=client.ApiClient(configuration=cfg))
if self.credential:
loader = config.kube_config.KubeConfigLoader(config_dict=self.kube_config)
cfg = type.__call__(client.Configuration)
loader.load_and_set(cfg)
api = client.CoreV1Api(api_client=client.ApiClient(configuration=cfg))
else:
config.load_incluster_config()
api = client.CoreV1Api()
return api
@property
def pod_name(self):
@ -102,22 +86,7 @@ class PodManager(object):
@property
def pod_definition(self):
default_pod_spec = {
"apiVersion": "v1",
"kind": "Pod",
"metadata": {"namespace": settings.AWX_CONTAINER_GROUP_DEFAULT_NAMESPACE},
"spec": {
"containers": [
{
"image": settings.AWX_CONTAINER_GROUP_DEFAULT_IMAGE,
"tty": True,
"stdin": True,
"imagePullPolicy": "Always",
"args": ['sleep', 'infinity'],
}
]
},
}
default_pod_spec = get_default_pod_spec()
pod_spec_override = {}
if self.task and self.task.instance_group.pod_spec_override:

View File

@ -97,7 +97,7 @@ from awx.main.utils import (
deepmerge,
parse_yaml_or_json,
)
from awx.main.utils.execution_environments import get_execution_environment_default
from awx.main.utils.execution_environments import get_default_execution_environment, get_default_pod_spec
from awx.main.utils.ansible import read_ansible_config
from awx.main.utils.external_logging import reconfigure_rsyslog
from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja
@ -525,15 +525,16 @@ def cluster_node_heartbeat():
def awx_k8s_reaper():
from awx.main.scheduler.kubernetes import PodManager # prevent circular import
for group in InstanceGroup.objects.filter(credential__isnull=False).iterator():
if group.is_container_group:
logger.debug("Checking for orphaned k8s pods for {}.".format(group))
for job in UnifiedJob.objects.filter(pk__in=list(PodManager.list_active_jobs(group))).exclude(status__in=ACTIVE_STATES):
logger.debug('{} is no longer active, reaping orphaned k8s pod'.format(job.log_format))
try:
PodManager(job).delete()
except Exception:
logger.exception("Failed to delete orphaned pod {} from {}".format(job.log_format, group))
for group in InstanceGroup.objects.filter(is_container_group=True).iterator():
logger.debug("Checking for orphaned k8s pods for {}.".format(group))
pods = PodManager.list_active_jobs(group)
for job in UnifiedJob.objects.filter(pk__in=pods.keys()).exclude(status__in=ACTIVE_STATES):
logger.debug('{} is no longer active, reaping orphaned k8s pod'.format(job.log_format))
try:
pm = PodManager(job)
pm.kube_api.delete_namespaced_pod(name=pods[job.id], namespace=pm.namespace, _request_timeout=settings.AWX_CONTAINER_GROUP_K8S_API_TIMEOUT)
except Exception:
logger.exception("Failed to delete orphaned pod {} from {}".format(job.log_format, group))
@task(queue=get_local_queuename)
@ -3131,21 +3132,10 @@ class AWXReceptorJob:
if self.task:
ee = self.task.instance.resolve_execution_environment()
else:
ee = get_execution_environment_default()
ee = get_default_execution_environment()
default_pod_spec = {
"apiVersion": "v1",
"kind": "Pod",
"metadata": {"namespace": settings.AWX_CONTAINER_GROUP_DEFAULT_NAMESPACE},
"spec": {
"containers": [
{
"image": ee.image,
"name": 'worker',
}
],
},
}
default_pod_spec = get_default_pod_spec()
default_pod_spec['spec']['containers'][0]['image'] = ee.image
pod_spec_override = {}
if self.task and self.task.instance.instance_group.pod_spec_override:

View File

@ -3,7 +3,24 @@ from django.conf import settings
from awx.main.models.execution_environments import ExecutionEnvironment
def get_execution_environment_default():
def get_default_execution_environment():
    """Return the execution environment to use when nothing more specific applies.

    The ``DEFAULT_EXECUTION_ENVIRONMENT`` setting wins whenever it is not
    ``None``. Otherwise the first ``ExecutionEnvironment`` row that has no
    organization and is flagged ``managed_by_tower`` is returned; that lookup
    uses ``.first()``, so the result may be ``None`` when no such row exists.
    """
    configured = settings.DEFAULT_EXECUTION_ENVIRONMENT
    if configured is None:
        # No explicit override configured: fall back to a global, managed EE.
        managed_global = ExecutionEnvironment.objects.filter(organization=None, managed_by_tower=True)
        return managed_global.first()
    return configured
def get_default_pod_spec():
    """Build the baseline Kubernetes ``Pod`` manifest as a plain dict.

    The manifest targets the namespace named by
    ``settings.AWX_CONTAINER_GROUP_DEFAULT_NAMESPACE`` and declares a single
    container called ``worker`` whose image is taken from
    ``get_default_execution_environment()``. A new dict is constructed on
    every call, so callers may mutate the result freely.
    """
    # Read the namespace first, then the image, matching the evaluation order
    # of the equivalent single dict literal.
    pod_metadata = {"namespace": settings.AWX_CONTAINER_GROUP_DEFAULT_NAMESPACE}
    worker_container = {
        "image": get_default_execution_environment().image,
        "name": 'worker',
    }
    return {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": pod_metadata,
        "spec": {
            "containers": [worker_container],
        },
    }