- .github
- devel
- doc
-
examples
-
dynamic-client
-
notebooks
-
watch
-
yaml_dir
- README.md
- __init__.py
- annotate_deployment.py
- api_discovery.py
- apply_from_dict.py
- apply_from_directory.py
- apply_from_single_file.py
- cluster_scoped_custom_object.py
- cronjob_crud.py
- deployment_create.py
- deployment_crud.py
- duration-gep2257.py
- in_cluster_config.py
- ingress_create.py
- job_crud.py
- multiple_clusters.py
- namespaced_custom_object.py
- node_labels.py
- out_of_cluster_config.py
- pick_kube_config_context.py
- pod_config_list.py
- pod_exec.py
- pod_portforward.py
- remote_cluster.py
- rollout-daemonset.py
- rollout-statefulset.py
-
dynamic-client
- kubernetes
- scripts
- .gitignore
- CHANGELOG.md
- CONTRIBUTING.md
- LICENSE
- MANIFEST.in
- OWNERS
- README.md
- SECURITY_CONTACTS
- code-of-conduct.md
- codecov.yml
- requirements.txt
- setup.cfg
- setup.py
- test-requirements.txt
- tox.ini
Explanation
This code defines a Watch
class for streaming Kubernetes API events in Python. It provides a mechanism to observe changes to resources like pods, deployments, and namespaces in real-time. Here’s a breakdown of the code:
Core Components
Watch
class: The main class responsible for managing watch operations.__init__(self, return_type=None)
: Initializes the watch object.return_type
: Optional type of the object returned by the watched API function. If not provided, it’s inferred from the function’s docstring.stop(self)
: Sets a flag to stop the watch loop.get_return_type(self, func)
: Determines the return type of the watched API function, either from thereturn_type
argument or by parsing the function’s docstring.get_watch_argument_name(self, func)
: Identifies the name of the watch argument in the API function (e.g., ‘follow’ or ‘watch’).unmarshal_event(self, data, return_type)
: Parses the received event data from the stream, deserializes the object, and updates the resource version.stream(self, func, *args, **kwargs)
: The core watch method that:
- Sets up the watch arguments.
- Enters a loop that retrieves data from the API using the provided function.
- Iterates over the stream of events received from the API.
- Unmarshals events into
'type'
,'raw_object'
, and'object'
dictionary keys. - Handles ‘ERROR’ events, retrying if the API call expired.
- Yields the unmarshalled event data.
- Handles the
stop()
flag to terminate the watch.
Key Concepts
- Kubernetes API Watch: The Kubernetes API allows you to watch resources for changes. It provides a stream of events (ADD, DELETE, UPDATE) representing modifications.
- Resource Version: A unique version identifier used to ensure consistency when working with the Kubernetes API. When watching resources, a
resource_version
is used to determine the starting point for the watch and ensure you don’t miss any events. - Event Handling: The code handles various event types (ADD, DELETE, UPDATE, ERROR), and provides a mechanism to retry expired watches.
Example Usage
from kubernetes import client
from kubernetes.watch import Watch
v1 = client.CoreV1Api()
watch = Watch()
for event in watch.stream(v1.list_namespace, resource_version="1127"):
type = event['type']
object = event['object'] # object is one of type return_type
raw_object = event['raw_object'] # raw_object is a dict
if should_stop:
watch.stop()
Additional Notes
- The code assumes that the API functions have docstrings specifying the return type and watch argument name.
- This code simplifies the watch process by automatically inferring the object type from the API function’s docstring, but users can also explicitly specify the
return_type
if needed. - The code provides retry logic in case the watch expires, ensuring a certain level of robustness in handling transient errors.
Overall
This Python code provides a robust and convenient mechanism to implement watch operations for Kubernetes resources, enabling real-time monitoring and event-driven application development.
Graph
The graph shows the usage of functions within the codebase.
Select a code symbol to view it's graph