def __init__(self, client, cache_file=None, discoverer=None):
# Setting default here to delay evaluation of LazyDiscoverer class
# until constructor is called
discoverer = discoverer or LazyDiscoverer
self.client = client
self.configuration = client.configuration
self.__discoverer = discoverer(self, cache_file)
@property
def resources(self):
return self.__discoverer
@property
def version(self):
return self.__discoverer.version
def ensure_namespace(self, resource, namespace, body):
namespace = namespace or body.get('metadata', {}).get('namespace')
if not namespace:
raise ValueError("Namespace is required for {}.{}".format(resource.group_version, resource.kind))
return namespace
def serialize_body(self, body):
"""Serialize body to raw dict so apiserver can handle it
:param body: kubernetes resource body, current support: Union[Dict, ResourceInstance]
"""
This should match any ResourceInstance
instances
if callable(getattr(body, ‘to_dict’, None)):
return body.to_dict()
return body or {}
def get(self, resource, name=None, namespace=None, **kwargs):
path = resource.path(name=name, namespace=namespace)
return self.request(‘get’, path, **kwargs)
def create(self, resource, body=None, namespace=None, **kwargs):
body = self.serialize_body(body)
if resource.namespaced:
namespace = self.ensure_namespace(resource, namespace, body)
path = resource.path(namespace=namespace)
return self.request(‘post’, path, body=body, **kwargs)
def delete(self, resource, name=None, namespace=None, body=None, label_selector=None, field_selector=None, **kwargs):
if not (name or label_selector or field_selector):
raise ValueError(“At least one of name|label_selector|field_selector is required”)
if resource.namespaced and not (label_selector or field_selector or namespace):
raise ValueError(“At least one of namespace|label_selector|field_selector is required”)
path = resource.path(name=name, namespace=namespace)
return self.request(‘delete’, path, body=body, label_selector=label_selector, field_selector=field_selector, **kwargs)
def replace(self, resource, body=None, name=None, namespace=None, **kwargs):
body = self.serialize_body(body)
name = name or body.get(‘metadata’, {}).get(‘name’)
if not name:
raise ValueError(“name is required to replace {}.{}”.format(resource.group_version, resource.kind))
if resource.namespaced:
namespace = self.ensure_namespace(resource, namespace, body)
path = resource.path(name=name, namespace=namespace)
return self.request(‘put’, path, body=body, **kwargs)
def patch(self, resource, body=None, name=None, namespace=None, **kwargs):
body = self.serialize_body(body)
name = name or body.get(‘metadata’, {}).get(‘name’)
if not name:
raise ValueError(“name is required to patch {}.{}”.format(resource.group_version, resource.kind))
if resource.namespaced:
namespace = self.ensure_namespace(resource, namespace, body)
content_type = kwargs.pop(‘content_type’, ‘application/strategic-merge-patch+json’)
path = resource.path(name=name, namespace=namespace)
return self.request(‘patch’, path, body=body, content_type=content_type, **kwargs)
def server_side_apply(self, resource, body=None, name=None, namespace=None, force_conflicts=None, **kwargs):
body = self.serialize_body(body)
name = name or body.get(‘metadata’, {}).get(‘name’)
if not name:
raise ValueError(“name is required to patch {}.{}”.format(resource.group_version, resource.kind))
if resource.namespaced:
namespace = self.ensure_namespace(resource, namespace, body)
force content type to ‘application/apply-patch+yaml’
kwargs.update({‘content_type’: ‘application/apply-patch+yaml’})
path = resource.path(name=name, namespace=namespace)
return self.request(‘patch’, path, body=body, force_conflicts=force_conflicts, **kwargs)
def watch(self, resource, namespace=None, name=None, label_selector=None, field_selector=None, resource_version=None, timeout=None, watcher=None):
“””
Stream events for a resource from the Kubernetes API
:param resource: The API resource object that will be used to query the API
:param namespace: The namespace to query
:param name: The name of the resource instance to query
:param label_selector: The label selector with which to filter results
:param field_selector: The field selector with which to filter results
:param resource_version: The version with which to filter results. Only events with
a resource_version greater than this value will be returned
:param timeout: The amount of time in seconds to wait before terminating the stream
:param watcher: The Watcher object that will be used to stream the resource
:return: Event object with these keys:
‘type’: The type of event such as “ADDED”, “DELETED”, etc.
‘raw_object’: a dict representing the watched object.
‘object’: A ResourceInstance wrapping raw_object.
Example:
client = DynamicClient(k8s_client)
watcher = watch.Watch()
v1_pods = client.resources.get(api_version=’v1’, kind=’Pod’)
for e in v1_pods.watch(resource_version=0, namespace=default, timeout=5, watcher=watcher):
print(e[‘type’])
print(e[‘object’].metadata)
If you want to gracefully stop the stream watcher
watcher.stop()
“””
if not watcher: watcher = watch.Watch()
Use field selector to query for named instance so the watch parameter is handled properly.
if name:
field_selector = f”metadata.name={name}”
for event in watcher.stream(
resource.get,
namespace=namespace,
field_selector=field_selector,
label_selector=label_selector,
resource_version=resource_version,
serialize=False,
timeout_seconds=timeout
):
event[‘object’] = ResourceInstance(resource, event[‘object’])
yield event
@meta_request
def request(self, method, path, body=None, **params):
if not path.startswith(‘/’):
path = ‘/’ + path
path_params = params.get(‘path_params’, {})
query_params = params.get(‘query_params’, [])
if params.get(‘pretty’) is not None:
query_params.append((‘pretty’, params[‘pretty’]))
if params.get(‘_continue’) is not None:
query_params.append((‘continue’, params[‘_continue’]))
if params.get(‘include_uninitialized’) is not None:
query_params.append((‘includeUninitialized’, params[‘include_uninitialized’]))
if params.get(‘field_selector’) is not None:
query_params.append((‘fieldSelector’, params[‘field_selector’]))
if params.get(‘label_selector’) is not None:
query_params.append((‘labelSelector’, params[‘label_selector’]))
if params.get(‘limit’) is not None:
query_params.append((‘limit’, params[‘limit’]))
if params.get(‘resource_version’) is not None:
query_params.append((‘resourceVersion’, params[‘resource_version’]))
if params.get(‘timeout_seconds’) is not None:
query_params.append((‘timeoutSeconds’, params[‘timeout_seconds’]))
if params.get(‘watch’) is not None:
query_params.append((‘watch’, params[‘watch’]))
if params.get(‘grace_period_seconds’) is not None:
query_params.append((‘gracePeriodSeconds’, params[‘grace_period_seconds’]))
if params.get(‘propagation_policy’) is not None:
query_params.append((‘propagationPolicy’, params[‘propagation_policy’]))
if params.get(‘orphan_dependents’) is not None:
query_params.append((‘orphanDependents’, params[‘orphan_dependents’]))
if params.get(‘dry_run’) is not None:
query_params.append((‘dryRun’, params[‘dry_run’]))
if params.get(‘field_manager’) is not None:
query_params.append((‘fieldManager’, params[‘field_manager’]))
if params.get(‘force_conflicts’) is not None:
query_params.append((‘force’, params[‘force_conflicts’]))
header_params = params.get(‘header_params’, {})
form_params = []
local_var_files = {}
Checking Accept header.
new_header_params = dict((key.lower(), value) for key, value in header_params.items())
if not ‘accept’ in new_header_params:
header_params[‘Accept’] = self.client.select_header_accept([
‘application/json’,
‘application/yaml’,
])
HTTP header Content-Type
if params.get(‘content_type’):
header_params[‘Content-Type’] = params[‘content_type’]
else:
header_params[‘Content-Type’] = self.client.select_header_content_type([‘/’])
Authentication setting
auth_settings = [‘BearerToken’]
api_response = self.client.call_api(
path,
method.upper(),
path_params,
query_params,
header_params,
body=body,
post_params=form_params,
async_req=params.get(‘async_req’),
files=local_var_files,
auth_settings=auth_settings,
_preload_content=False,
_return_http_data_only=params.get(‘_return_http_data_only’, True),
_request_timeout=params.get(‘_request_timeout’)
)
if params.get(‘async_req’):
return api_response.get()
else:
return api_response
def validate(self, definition, version=None, strict=False):
“”“validate checks a kubernetes resource definition
Args:
definition (dict): resource definition
version (str): version of kubernetes to validate against
strict (bool): whether unexpected additional properties should be considered errors
Returns:
warnings (list), errors (list): warnings are missing validations, errors are validation failures
“””
if not HAS_KUBERNETES_VALIDATE:
raise KubernetesValidateMissing()