Licensed under the Apache License, Version 2.0 (the “License”);
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an “AS IS” BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
import datetime
import os
from kubernetes.client import Configuration
from .config_exception import ConfigException
SERVICE_HOST_ENV_NAME = “KUBERNETES_SERVICE_HOST”
SERVICE_PORT_ENV_NAME = “KUBERNETES_SERVICE_PORT”
SERVICE_TOKEN_FILENAME = “/var/run/secrets/kubernetes.io/serviceaccount/token”
SERVICE_CERT_FILENAME = “/var/run/secrets/kubernetes.io/serviceaccount/ca.crt”
def _join_host_port(host, port):
“”“Adapted golang’s net.JoinHostPort”“”
template = “%s:%s”
host_requires_bracketing = ‘:’ in host or ‘%’ in host
if host_requires_bracketing:
template = “[%s]:%s”
return template % (host, port)
class InClusterConfigLoader(object):
def init(self,
token_filename,
cert_filename,
try_refresh_token=True,
environ=os.environ):
self._token_filename = token_filename
self._cert_filename = cert_filename
self._environ = environ
self._try_refresh_token = try_refresh_token
self._token_refresh_period = datetime.timedelta(minutes=1)
def load_and_set(self, client_configuration=None):
try_set_default = False
if client_configuration is None:
client_configuration = type.__call__(Configuration)
try_set_default = True
self._load_config()
self._set_config(client_configuration)
if try_set_default:
Configuration.set_default(client_configuration)
def _load_config(self):
if (SERVICE_HOST_ENV_NAME not in self._environ
or SERVICE_PORT_ENV_NAME not in self._environ):
raise ConfigException("Service host/port is not set.")
if (not self._environ[SERVICE_HOST_ENV_NAME]
or not self._environ[SERVICE_PORT_ENV_NAME]):
raise ConfigException("Service host/port is set but empty.")
self.host = ("https://" +
_join_host_port(self._environ[SERVICE_HOST_ENV_NAME],
self._environ[SERVICE_PORT_ENV_NAME]))
if not os.path.isfile(self._token_filename):
raise ConfigException("Service token file does not exist.")
self._read_token_file()
if not os.path.isfile(self._cert_filename):
raise ConfigException(
"Service certification file does not exist.")
with open(self._cert_filename) as f:
if not f.read():
raise ConfigException("Cert file exists but empty.")
self.ssl_ca_cert = self._cert_filename
def _set_config(self, client_configuration):
client_configuration.host = self.host
client_configuration.ssl_ca_cert = self.ssl_ca_cert
if self.token is not None:
client_configuration.api_key['authorization'] = self.token
if not self._try_refresh_token:
return
def _refresh_api_key(client_configuration):
if self.token_expires_at <= datetime.datetime.now():
self._read_token_file()
self._set_config(client_configuration)
client_configuration.refresh_api_key_hook = _refresh_api_key
def _read_token_file(self):
with open(self._token_filename) as f:
content = f.read()
if not content:
raise ConfigException("Token file exists but empty.")
self.token = "bearer " + content
self.token_expires_at = datetime.datetime.now(
) + self._token_refresh_period
def load_incluster_config(client_configuration=None, try_refresh_token=True):
“””
Use the service account kubernetes gives to pods to connect to kubernetes
cluster. It’s intended for clients that expect to be running inside a pod
running on kubernetes. It will raise an exception if called from a process
not running in a kubernetes environment.”“”
InClusterConfigLoader(
token_filename=SERVICE_TOKEN_FILENAME,
cert_filename=SERVICE_CERT_FILENAME,
try_refresh_token=try_refresh_token).load_and_set(client_configuration)