flux_local.kustomize
Library for generating kustomize commands to build local cluster resources.
Kustomize build can be used to apply overlays and output a set of resources or artifacts for the cluster that can be either be parsed directly or piped into additional commands for processing and filtering by kustomize grep.
This example returns the objects inside a Kustomization using kustomize build
:
from flux_local import kustomize
objects = await kustomize.build('/path/to/objects').objects()
for object in objects:
print(f"Found object {object['apiVersion']} {object['kind']}")
You can also filter documents to specific resource types or other fields:
from flux_local import kustomize
objects = await kustomize.build('/path/to/objects').grep('kind=ConfigMap').objects()
for object in objects:
print(f"Found ConfigMap: {object['metadata']['name']}")
It is also possible to find bare objects without a Kustomization:
from flux_local import kustomize
objects = await kustomize.grep('kind=ConfigMap', '/path/to/objects').objects()
for object in objects:
print(f"Found ConfigMap: {object['metadata']['name']}")
1"""Library for generating kustomize commands to build local cluster resources. 2 3Kustomize build can be used to apply overlays and output a set of resources or artifacts 4for the cluster that can be either be parsed directly or piped into additional 5commands for processing and filtering by kustomize grep. 6 7This example returns the objects inside a Kustomization using `kustomize build`: 8```python 9from flux_local import kustomize 10 11objects = await kustomize.build('/path/to/objects').objects() 12for object in objects: 13 print(f"Found object {object['apiVersion']} {object['kind']}") 14``` 15 16You can also filter documents to specific resource types or other fields: 17```python 18from flux_local import kustomize 19 20objects = await kustomize.build('/path/to/objects').grep('kind=ConfigMap').objects() 21for object in objects: 22 print(f"Found ConfigMap: {object['metadata']['name']}") 23``` 24 25It is also possible to find bare objects without a Kustomization: 26```python 27from flux_local import kustomize 28 29objects = await kustomize.grep('kind=ConfigMap', '/path/to/objects').objects() 30for object in objects: 31 print(f"Found ConfigMap: {object['metadata']['name']}") 32``` 33""" 34 35from aiofiles.ospath import isdir 36import asyncio 37from contextlib import asynccontextmanager 38from collections.abc import AsyncIterator 39import logging 40from pathlib import Path 41from typing import Any, AsyncGenerator 42 43import yaml 44 45from . import manifest 46from .command import Command, run_piped, Task, format_path 47from .exceptions import ( 48 InputException, 49 KustomizeException, 50 KustomizePathException, 51) 52from .manifest import Kustomization, HELM_RELEASE 53 54_LOGGER = logging.getLogger(__name__) 55 56__all__ = [ 57 "flux_build", 58 "grep", 59 "Kustomize", 60] 61 62KUSTOMIZE_BIN = "kustomize" 63FLUX_BIN = "flux" 64 65# Used to limit access to specific resources 66_LOCK_MAP: dict[str, asyncio.Lock] = {} 67 68 69class Kustomize: 70 """Library for issuing a kustomize command.""" 71 72 def __init__(self, cmds: list[Task]) -> None: 73 """Initialize Kustomize, used internally for copying object.""" 74 self._cmds = cmds 75 76 def grep(self, expr: str, invert: bool = False) -> "Kustomize": 77 """Filter resources based on an expression. 78 79 Example expressions: 80 `kind=HelmRelease` 81 `metadata.name=redis` 82 """ 83 out = [KUSTOMIZE_BIN, "cfg", "grep", expr] 84 if invert: 85 out.append("--invert-match") 86 return Kustomize(self._cmds + [Command(out, exc=KustomizeException)]) 87 88 def grep_helm_release( 89 self, helm_release: manifest.HelmRelease | None = None, invert: bool = False 90 ) -> "Kustomize": 91 """Filter the resources based on the specified HelmRelease.""" 92 if helm_release: 93 if invert: 94 raise InputException( 95 "Must specify either helm_release or invert but not both" 96 ) 97 return ( 98 self.grep(f"metadata.namespace=^{helm_release.namespace}$") 99 .grep(f"metadata.name=^{helm_release.name}$") 100 .grep(f"kind=^{HELM_RELEASE}$") 101 ) 102 if invert: 103 return self.grep(f"kind=^{HELM_RELEASE}$", invert=True) 104 raise InputException("Must specify either helm_release or invert") 105 106 async def run(self) -> str: 107 """Run the kustomize command and return the output as a string.""" 108 return await run_piped(self._cmds) 109 110 async def _docs( 111 self, target_namespace: str | None = None 112 ) -> AsyncGenerator[dict[str, Any], None]: 113 """Run the kustomize command and return the result documents.""" 114 out = await self.run() 115 for doc in yaml.safe_load_all(out): 116 if doc is None: 117 continue 118 if target_namespace is not None: 119 doc = update_namespace(doc, target_namespace) 120 yield doc 121 122 async def objects( 123 self, target_namespace: str | None = None 124 ) -> list[dict[str, Any]]: 125 """Run the kustomize command and return the result cluster objects as a list.""" 126 try: 127 return [doc async for doc in self._docs(target_namespace=target_namespace)] 128 except yaml.YAMLError as err: 129 raise KustomizeException( 130 f"Unable to parse command output: {self._cmds}: {err}" 131 ) from err 132 133 def skip_resources(self, kinds: list[str]) -> "Kustomize": 134 """Skip resources kinds of the specified types.""" 135 if not kinds: 136 return self 137 skip_re = "|".join(kinds) 138 return self.grep(f"kind=^({skip_re})$", invert=True) 139 140 def filter_resources(self, kinds: list[str]) -> "Kustomize": 141 """Skip resources kinds of the specified types.""" 142 if not kinds: 143 return self 144 skip_re = "|".join(kinds) 145 return self.grep(f"kind=^({skip_re})$", invert=False) 146 147 async def stash(self) -> "Kustomize": 148 """Memoize the contents built so far for efficient reuse. 149 150 This is useful to serialize a chain of commands but allow further 151 chaining with multiple branches. 152 """ 153 content = await self.run() 154 return Kustomize([Stash(content.encode("utf-8"))]) 155 156 157class Stash(Task): 158 """A task that memoizes output from a previous command.""" 159 160 def __init__(self, out: bytes) -> None: 161 """Initialize Stash.""" 162 self._out = out 163 164 async def run(self, stdin: bytes | None = None) -> bytes: 165 """Run the task.""" 166 return self._out 167 168 169@asynccontextmanager 170async def _resource_lock(key: str) -> AsyncIterator[None]: 171 """Run while holding a lock for the specified resource. 172 173 This is not threadsafe and expected to be run in the asyncio loop. 174 """ 175 if not (lock := _LOCK_MAP.get(key)): 176 lock = asyncio.Lock() 177 _LOCK_MAP[key] = lock 178 async with lock: 179 yield 180 181 182class FluxBuild(Task): 183 """A task that issues a flux build command.""" 184 185 def __init__(self, ks: Kustomization, path: Path) -> None: 186 """Initialize Build.""" 187 self._ks = ks 188 self._path = path 189 190 async def run(self, stdin: bytes | None = None) -> bytes: 191 """Run the task.""" 192 if stdin is not None: 193 raise InputException("Invalid stdin cannot be passed to build command") 194 if not await isdir(self._path): 195 raise KustomizePathException( 196 f"Kustomization '{self._ks.namespaced_name}' path field '{self._ks.path or ''}' is not a directory: {self._path}" 197 ) 198 199 args = [ 200 FLUX_BIN, 201 "build", 202 "ks", 203 self._ks.name, 204 "--dry-run", 205 "--kustomization-file", 206 "/dev/stdin", 207 "--path", 208 str(self._path), 209 ] 210 if self._ks.namespace: 211 args.extend( 212 [ 213 "--namespace", 214 self._ks.namespace, 215 ] 216 ) 217 kustomization_data = yaml.dump_all( 218 [self._ks.contents or {}], sort_keys=False, explicit_start=True 219 ) 220 input_ks = str(kustomization_data).encode("utf-8") 221 222 task = Command(args, cwd=None, exc=KustomizeException) 223 # `flux build` may mutate `kustomization.yaml` so we need to use the path as a resource key 224 resource_key = str(self._path.resolve()) 225 async with _resource_lock(resource_key): 226 return await task.run(stdin=input_ks) 227 228 def __str__(self) -> str: 229 """Render as a debug string.""" 230 return f"flux build {format_path(self._path)}" 231 232 233def flux_build(ks: Kustomization, path: Path) -> Kustomize: 234 """Build cluster artifacts from the specified path.""" 235 return Kustomize(cmds=[FluxBuild(ks, path)]) 236 237 238def grep(expr: str, path: Path, invert: bool = False) -> Kustomize: 239 """Filter resources in the specified path based on an expression.""" 240 args = [KUSTOMIZE_BIN, "cfg", "grep", expr] 241 if invert: 242 args.append("--invert-match") 243 cwd: Path | None = None 244 if path.is_absolute(): 245 args.append(".") 246 cwd = path 247 else: 248 args.append(str(path)) 249 return Kustomize([Command(args, cwd=cwd, exc=KustomizeException)]) 250 251 252def filter_resources(kinds: list[str], path: Path) -> Kustomize: 253 """Filter resources in the specified path based of a specific kind.""" 254 regexp = f"kind=^({'|'.join(kinds)})$" 255 return grep(regexp, path) 256 257 258def update_namespace(doc: dict[str, Any], namespace: str) -> dict[str, Any]: 259 """Update the namespace of the specified document. 260 261 Will only update the namespace if the doc appears to have a metadata/name. 262 """ 263 if (metadata := doc.get("metadata")) is not None and "name" in metadata: 264 doc["metadata"]["namespace"] = namespace 265 return doc
234def flux_build(ks: Kustomization, path: Path) -> Kustomize: 235 """Build cluster artifacts from the specified path.""" 236 return Kustomize(cmds=[FluxBuild(ks, path)])
Build cluster artifacts from the specified path.
239def grep(expr: str, path: Path, invert: bool = False) -> Kustomize: 240 """Filter resources in the specified path based on an expression.""" 241 args = [KUSTOMIZE_BIN, "cfg", "grep", expr] 242 if invert: 243 args.append("--invert-match") 244 cwd: Path | None = None 245 if path.is_absolute(): 246 args.append(".") 247 cwd = path 248 else: 249 args.append(str(path)) 250 return Kustomize([Command(args, cwd=cwd, exc=KustomizeException)])
Filter resources in the specified path based on an expression.
70class Kustomize: 71 """Library for issuing a kustomize command.""" 72 73 def __init__(self, cmds: list[Task]) -> None: 74 """Initialize Kustomize, used internally for copying object.""" 75 self._cmds = cmds 76 77 def grep(self, expr: str, invert: bool = False) -> "Kustomize": 78 """Filter resources based on an expression. 79 80 Example expressions: 81 `kind=HelmRelease` 82 `metadata.name=redis` 83 """ 84 out = [KUSTOMIZE_BIN, "cfg", "grep", expr] 85 if invert: 86 out.append("--invert-match") 87 return Kustomize(self._cmds + [Command(out, exc=KustomizeException)]) 88 89 def grep_helm_release( 90 self, helm_release: manifest.HelmRelease | None = None, invert: bool = False 91 ) -> "Kustomize": 92 """Filter the resources based on the specified HelmRelease.""" 93 if helm_release: 94 if invert: 95 raise InputException( 96 "Must specify either helm_release or invert but not both" 97 ) 98 return ( 99 self.grep(f"metadata.namespace=^{helm_release.namespace}$") 100 .grep(f"metadata.name=^{helm_release.name}$") 101 .grep(f"kind=^{HELM_RELEASE}$") 102 ) 103 if invert: 104 return self.grep(f"kind=^{HELM_RELEASE}$", invert=True) 105 raise InputException("Must specify either helm_release or invert") 106 107 async def run(self) -> str: 108 """Run the kustomize command and return the output as a string.""" 109 return await run_piped(self._cmds) 110 111 async def _docs( 112 self, target_namespace: str | None = None 113 ) -> AsyncGenerator[dict[str, Any], None]: 114 """Run the kustomize command and return the result documents.""" 115 out = await self.run() 116 for doc in yaml.safe_load_all(out): 117 if doc is None: 118 continue 119 if target_namespace is not None: 120 doc = update_namespace(doc, target_namespace) 121 yield doc 122 123 async def objects( 124 self, target_namespace: str | None = None 125 ) -> list[dict[str, Any]]: 126 """Run the kustomize command and return the result cluster objects as a list.""" 127 try: 128 return [doc async for doc in self._docs(target_namespace=target_namespace)] 129 except yaml.YAMLError as err: 130 raise KustomizeException( 131 f"Unable to parse command output: {self._cmds}: {err}" 132 ) from err 133 134 def skip_resources(self, kinds: list[str]) -> "Kustomize": 135 """Skip resources kinds of the specified types.""" 136 if not kinds: 137 return self 138 skip_re = "|".join(kinds) 139 return self.grep(f"kind=^({skip_re})$", invert=True) 140 141 def filter_resources(self, kinds: list[str]) -> "Kustomize": 142 """Skip resources kinds of the specified types.""" 143 if not kinds: 144 return self 145 skip_re = "|".join(kinds) 146 return self.grep(f"kind=^({skip_re})$", invert=False) 147 148 async def stash(self) -> "Kustomize": 149 """Memoize the contents built so far for efficient reuse. 150 151 This is useful to serialize a chain of commands but allow further 152 chaining with multiple branches. 153 """ 154 content = await self.run() 155 return Kustomize([Stash(content.encode("utf-8"))])
Library for issuing a kustomize command.
73 def __init__(self, cmds: list[Task]) -> None: 74 """Initialize Kustomize, used internally for copying object.""" 75 self._cmds = cmds
Initialize Kustomize, used internally for copying object.
77 def grep(self, expr: str, invert: bool = False) -> "Kustomize": 78 """Filter resources based on an expression. 79 80 Example expressions: 81 `kind=HelmRelease` 82 `metadata.name=redis` 83 """ 84 out = [KUSTOMIZE_BIN, "cfg", "grep", expr] 85 if invert: 86 out.append("--invert-match") 87 return Kustomize(self._cmds + [Command(out, exc=KustomizeException)])
Filter resources based on an expression.
Example expressions:
kind=HelmRelease
metadata.name=redis
89 def grep_helm_release( 90 self, helm_release: manifest.HelmRelease | None = None, invert: bool = False 91 ) -> "Kustomize": 92 """Filter the resources based on the specified HelmRelease.""" 93 if helm_release: 94 if invert: 95 raise InputException( 96 "Must specify either helm_release or invert but not both" 97 ) 98 return ( 99 self.grep(f"metadata.namespace=^{helm_release.namespace}$") 100 .grep(f"metadata.name=^{helm_release.name}$") 101 .grep(f"kind=^{HELM_RELEASE}$") 102 ) 103 if invert: 104 return self.grep(f"kind=^{HELM_RELEASE}$", invert=True) 105 raise InputException("Must specify either helm_release or invert")
Filter the resources based on the specified HelmRelease.
107 async def run(self) -> str: 108 """Run the kustomize command and return the output as a string.""" 109 return await run_piped(self._cmds)
Run the kustomize command and return the output as a string.
123 async def objects( 124 self, target_namespace: str | None = None 125 ) -> list[dict[str, Any]]: 126 """Run the kustomize command and return the result cluster objects as a list.""" 127 try: 128 return [doc async for doc in self._docs(target_namespace=target_namespace)] 129 except yaml.YAMLError as err: 130 raise KustomizeException( 131 f"Unable to parse command output: {self._cmds}: {err}" 132 ) from err
Run the kustomize command and return the result cluster objects as a list.
134 def skip_resources(self, kinds: list[str]) -> "Kustomize": 135 """Skip resources kinds of the specified types.""" 136 if not kinds: 137 return self 138 skip_re = "|".join(kinds) 139 return self.grep(f"kind=^({skip_re})$", invert=True)
Skip resources kinds of the specified types.
141 def filter_resources(self, kinds: list[str]) -> "Kustomize": 142 """Skip resources kinds of the specified types.""" 143 if not kinds: 144 return self 145 skip_re = "|".join(kinds) 146 return self.grep(f"kind=^({skip_re})$", invert=False)
Skip resources kinds of the specified types.
148 async def stash(self) -> "Kustomize": 149 """Memoize the contents built so far for efficient reuse. 150 151 This is useful to serialize a chain of commands but allow further 152 chaining with multiple branches. 153 """ 154 content = await self.run() 155 return Kustomize([Stash(content.encode("utf-8"))])
Memoize the contents built so far for efficient reuse.
This is useful to serialize a chain of commands but allow further chaining with multiple branches.