flux_local.kustomize

Library for generating kustomize commands to build local cluster resources.

Kustomize build can be used to apply overlays and output a set of resources or artifacts for the cluster that can be either be parsed directly or piped into additional commands for processing and filtering by kustomize grep.

This example returns the objects inside a Kustomization using kustomize build:

from flux_local import kustomize

objects = await kustomize.build('/path/to/objects').objects()
for object in objects:
    print(f"Found object {object['apiVersion']} {object['kind']}")

You can also filter documents to specific resource types or other fields:

from flux_local import kustomize

objects = await kustomize.build('/path/to/objects').grep('kind=ConfigMap').objects()
for object in objects:
    print(f"Found ConfigMap: {object['metadata']['name']}")

It is also possible to find bare objects without a Kustomization:

from flux_local import kustomize

objects = await kustomize.grep('kind=ConfigMap', '/path/to/objects').objects()
for object in objects:
    print(f"Found ConfigMap: {object['metadata']['name']}")
  1"""Library for generating kustomize commands to build local cluster resources.
  2
  3Kustomize build can be used to apply overlays and output a set of resources or artifacts
  4for the cluster that can be either be parsed directly or piped into additional
  5commands for processing and filtering by kustomize grep.
  6
  7This example returns the objects inside a Kustomization using `kustomize build`:
  8```python
  9from flux_local import kustomize
 10
 11objects = await kustomize.build('/path/to/objects').objects()
 12for object in objects:
 13    print(f"Found object {object['apiVersion']} {object['kind']}")
 14```
 15
 16You can also filter documents to specific resource types or other fields:
 17```python
 18from flux_local import kustomize
 19
 20objects = await kustomize.build('/path/to/objects').grep('kind=ConfigMap').objects()
 21for object in objects:
 22    print(f"Found ConfigMap: {object['metadata']['name']}")
 23```
 24
 25It is also possible to find bare objects without a Kustomization:
 26```python
 27from flux_local import kustomize
 28
 29objects = await kustomize.grep('kind=ConfigMap', '/path/to/objects').objects()
 30for object in objects:
 31    print(f"Found ConfigMap: {object['metadata']['name']}")
 32```
 33"""
 34
 35from aiofiles.ospath import isdir
 36import asyncio
 37from contextlib import asynccontextmanager
 38from collections.abc import AsyncIterator
 39import logging
 40from pathlib import Path
 41from typing import Any, AsyncGenerator
 42
 43import yaml
 44
 45from . import manifest
 46from .command import Command, run_piped, Task, format_path
 47from .exceptions import (
 48    InputException,
 49    KustomizeException,
 50    KustomizePathException,
 51)
 52from .manifest import Kustomization, HELM_RELEASE
 53
 54_LOGGER = logging.getLogger(__name__)
 55
 56__all__ = [
 57    "flux_build",
 58    "grep",
 59    "Kustomize",
 60]
 61
 62KUSTOMIZE_BIN = "kustomize"
 63FLUX_BIN = "flux"
 64
 65# Used to limit access to specific resources
 66_LOCK_MAP: dict[str, asyncio.Lock] = {}
 67
 68
 69class Kustomize:
 70    """Library for issuing a kustomize command."""
 71
 72    def __init__(self, cmds: list[Task]) -> None:
 73        """Initialize Kustomize, used internally for copying object."""
 74        self._cmds = cmds
 75
 76    def grep(self, expr: str, invert: bool = False) -> "Kustomize":
 77        """Filter resources based on an expression.
 78
 79        Example expressions:
 80          `kind=HelmRelease`
 81          `metadata.name=redis`
 82        """
 83        out = [KUSTOMIZE_BIN, "cfg", "grep", expr]
 84        if invert:
 85            out.append("--invert-match")
 86        return Kustomize(self._cmds + [Command(out, exc=KustomizeException)])
 87
 88    def grep_helm_release(
 89        self, helm_release: manifest.HelmRelease | None = None, invert: bool = False
 90    ) -> "Kustomize":
 91        """Filter the resources based on the specified HelmRelease."""
 92        if helm_release:
 93            if invert:
 94                raise InputException(
 95                    "Must specify either helm_release or invert but not both"
 96                )
 97            return (
 98                self.grep(f"metadata.namespace=^{helm_release.namespace}$")
 99                .grep(f"metadata.name=^{helm_release.name}$")
100                .grep(f"kind=^{HELM_RELEASE}$")
101            )
102        if invert:
103            return self.grep(f"kind=^{HELM_RELEASE}$", invert=True)
104        raise InputException("Must specify either helm_release or invert")
105
106    async def run(self) -> str:
107        """Run the kustomize command and return the output as a string."""
108        return await run_piped(self._cmds)
109
110    async def _docs(
111        self, target_namespace: str | None = None
112    ) -> AsyncGenerator[dict[str, Any], None]:
113        """Run the kustomize command and return the result documents."""
114        out = await self.run()
115        for doc in yaml.safe_load_all(out):
116            if doc is None:
117                continue
118            if target_namespace is not None:
119                doc = update_namespace(doc, target_namespace)
120            yield doc
121
122    async def objects(
123        self, target_namespace: str | None = None
124    ) -> list[dict[str, Any]]:
125        """Run the kustomize command and return the result cluster objects as a list."""
126        try:
127            return [doc async for doc in self._docs(target_namespace=target_namespace)]
128        except yaml.YAMLError as err:
129            raise KustomizeException(
130                f"Unable to parse command output: {self._cmds}: {err}"
131            ) from err
132
133    def skip_resources(self, kinds: list[str]) -> "Kustomize":
134        """Skip resources kinds of the specified types."""
135        if not kinds:
136            return self
137        skip_re = "|".join(kinds)
138        return self.grep(f"kind=^({skip_re})$", invert=True)
139
140    def filter_resources(self, kinds: list[str]) -> "Kustomize":
141        """Skip resources kinds of the specified types."""
142        if not kinds:
143            return self
144        skip_re = "|".join(kinds)
145        return self.grep(f"kind=^({skip_re})$", invert=False)
146
147    async def stash(self) -> "Kustomize":
148        """Memoize the contents built so far for efficient reuse.
149
150        This is useful to serialize a chain of commands but allow further
151        chaining with multiple branches.
152        """
153        content = await self.run()
154        return Kustomize([Stash(content.encode("utf-8"))])
155
156
157class Stash(Task):
158    """A task that memoizes output from a previous command."""
159
160    def __init__(self, out: bytes) -> None:
161        """Initialize Stash."""
162        self._out = out
163
164    async def run(self, stdin: bytes | None = None) -> bytes:
165        """Run the task."""
166        return self._out
167
168
169@asynccontextmanager
170async def _resource_lock(key: str) -> AsyncIterator[None]:
171    """Run while holding a lock for the specified resource.
172
173    This is not threadsafe and expected to be run in the asyncio loop.
174    """
175    if not (lock := _LOCK_MAP.get(key)):
176        lock = asyncio.Lock()
177        _LOCK_MAP[key] = lock
178    async with lock:
179        yield
180
181
182class FluxBuild(Task):
183    """A task that issues a flux build command."""
184
185    def __init__(self, ks: Kustomization, path: Path) -> None:
186        """Initialize Build."""
187        self._ks = ks
188        self._path = path
189
190    async def run(self, stdin: bytes | None = None) -> bytes:
191        """Run the task."""
192        if stdin is not None:
193            raise InputException("Invalid stdin cannot be passed to build command")
194        if not await isdir(self._path):
195            raise KustomizePathException(
196                f"Kustomization '{self._ks.namespaced_name}' path field '{self._ks.path or ''}' is not a directory: {self._path}"
197            )
198
199        args = [
200            FLUX_BIN,
201            "build",
202            "ks",
203            self._ks.name,
204            "--dry-run",
205            "--kustomization-file",
206            "/dev/stdin",
207            "--path",
208            str(self._path),
209        ]
210        if self._ks.namespace:
211            args.extend(
212                [
213                    "--namespace",
214                    self._ks.namespace,
215                ]
216            )
217        kustomization_data = yaml.dump_all(
218            [self._ks.contents or {}], sort_keys=False, explicit_start=True
219        )
220        input_ks = str(kustomization_data).encode("utf-8")
221
222        task = Command(args, cwd=None, exc=KustomizeException)
223        # `flux build` may mutate `kustomization.yaml` so we need to use the path as a resource key
224        resource_key = str(self._path.resolve())
225        async with _resource_lock(resource_key):
226            return await task.run(stdin=input_ks)
227
228    def __str__(self) -> str:
229        """Render as a debug string."""
230        return f"flux build {format_path(self._path)}"
231
232
233def flux_build(ks: Kustomization, path: Path) -> Kustomize:
234    """Build cluster artifacts from the specified path."""
235    return Kustomize(cmds=[FluxBuild(ks, path)])
236
237
238def grep(expr: str, path: Path, invert: bool = False) -> Kustomize:
239    """Filter resources in the specified path based on an expression."""
240    args = [KUSTOMIZE_BIN, "cfg", "grep", expr]
241    if invert:
242        args.append("--invert-match")
243    cwd: Path | None = None
244    if path.is_absolute():
245        args.append(".")
246        cwd = path
247    else:
248        args.append(str(path))
249    return Kustomize([Command(args, cwd=cwd, exc=KustomizeException)])
250
251
252def filter_resources(kinds: list[str], path: Path) -> Kustomize:
253    """Filter resources in the specified path based of a specific kind."""
254    regexp = f"kind=^({'|'.join(kinds)})$"
255    return grep(regexp, path)
256
257
258def update_namespace(doc: dict[str, Any], namespace: str) -> dict[str, Any]:
259    """Update the namespace of the specified document.
260
261    Will only update the namespace if the doc appears to have a metadata/name.
262    """
263    if (metadata := doc.get("metadata")) is not None and "name" in metadata:
264        doc["metadata"]["namespace"] = namespace
265    return doc
def flux_build( ks: flux_local.manifest.Kustomization, path: pathlib._local.Path) -> Kustomize:
234def flux_build(ks: Kustomization, path: Path) -> Kustomize:
235    """Build cluster artifacts from the specified path."""
236    return Kustomize(cmds=[FluxBuild(ks, path)])

Build cluster artifacts from the specified path.

def grep( expr: str, path: pathlib._local.Path, invert: bool = False) -> Kustomize:
239def grep(expr: str, path: Path, invert: bool = False) -> Kustomize:
240    """Filter resources in the specified path based on an expression."""
241    args = [KUSTOMIZE_BIN, "cfg", "grep", expr]
242    if invert:
243        args.append("--invert-match")
244    cwd: Path | None = None
245    if path.is_absolute():
246        args.append(".")
247        cwd = path
248    else:
249        args.append(str(path))
250    return Kustomize([Command(args, cwd=cwd, exc=KustomizeException)])

Filter resources in the specified path based on an expression.

class Kustomize:
 70class Kustomize:
 71    """Library for issuing a kustomize command."""
 72
 73    def __init__(self, cmds: list[Task]) -> None:
 74        """Initialize Kustomize, used internally for copying object."""
 75        self._cmds = cmds
 76
 77    def grep(self, expr: str, invert: bool = False) -> "Kustomize":
 78        """Filter resources based on an expression.
 79
 80        Example expressions:
 81          `kind=HelmRelease`
 82          `metadata.name=redis`
 83        """
 84        out = [KUSTOMIZE_BIN, "cfg", "grep", expr]
 85        if invert:
 86            out.append("--invert-match")
 87        return Kustomize(self._cmds + [Command(out, exc=KustomizeException)])
 88
 89    def grep_helm_release(
 90        self, helm_release: manifest.HelmRelease | None = None, invert: bool = False
 91    ) -> "Kustomize":
 92        """Filter the resources based on the specified HelmRelease."""
 93        if helm_release:
 94            if invert:
 95                raise InputException(
 96                    "Must specify either helm_release or invert but not both"
 97                )
 98            return (
 99                self.grep(f"metadata.namespace=^{helm_release.namespace}$")
100                .grep(f"metadata.name=^{helm_release.name}$")
101                .grep(f"kind=^{HELM_RELEASE}$")
102            )
103        if invert:
104            return self.grep(f"kind=^{HELM_RELEASE}$", invert=True)
105        raise InputException("Must specify either helm_release or invert")
106
107    async def run(self) -> str:
108        """Run the kustomize command and return the output as a string."""
109        return await run_piped(self._cmds)
110
111    async def _docs(
112        self, target_namespace: str | None = None
113    ) -> AsyncGenerator[dict[str, Any], None]:
114        """Run the kustomize command and return the result documents."""
115        out = await self.run()
116        for doc in yaml.safe_load_all(out):
117            if doc is None:
118                continue
119            if target_namespace is not None:
120                doc = update_namespace(doc, target_namespace)
121            yield doc
122
123    async def objects(
124        self, target_namespace: str | None = None
125    ) -> list[dict[str, Any]]:
126        """Run the kustomize command and return the result cluster objects as a list."""
127        try:
128            return [doc async for doc in self._docs(target_namespace=target_namespace)]
129        except yaml.YAMLError as err:
130            raise KustomizeException(
131                f"Unable to parse command output: {self._cmds}: {err}"
132            ) from err
133
134    def skip_resources(self, kinds: list[str]) -> "Kustomize":
135        """Skip resources kinds of the specified types."""
136        if not kinds:
137            return self
138        skip_re = "|".join(kinds)
139        return self.grep(f"kind=^({skip_re})$", invert=True)
140
141    def filter_resources(self, kinds: list[str]) -> "Kustomize":
142        """Skip resources kinds of the specified types."""
143        if not kinds:
144            return self
145        skip_re = "|".join(kinds)
146        return self.grep(f"kind=^({skip_re})$", invert=False)
147
148    async def stash(self) -> "Kustomize":
149        """Memoize the contents built so far for efficient reuse.
150
151        This is useful to serialize a chain of commands but allow further
152        chaining with multiple branches.
153        """
154        content = await self.run()
155        return Kustomize([Stash(content.encode("utf-8"))])

Library for issuing a kustomize command.

Kustomize(cmds: list[flux_local.command.Task])
73    def __init__(self, cmds: list[Task]) -> None:
74        """Initialize Kustomize, used internally for copying object."""
75        self._cmds = cmds

Initialize Kustomize, used internally for copying object.

def grep(self, expr: str, invert: bool = False) -> Kustomize:
77    def grep(self, expr: str, invert: bool = False) -> "Kustomize":
78        """Filter resources based on an expression.
79
80        Example expressions:
81          `kind=HelmRelease`
82          `metadata.name=redis`
83        """
84        out = [KUSTOMIZE_BIN, "cfg", "grep", expr]
85        if invert:
86            out.append("--invert-match")
87        return Kustomize(self._cmds + [Command(out, exc=KustomizeException)])

Filter resources based on an expression.

Example expressions: kind=HelmRelease metadata.name=redis

def grep_helm_release( self, helm_release: flux_local.manifest.HelmRelease | None = None, invert: bool = False) -> Kustomize:
 89    def grep_helm_release(
 90        self, helm_release: manifest.HelmRelease | None = None, invert: bool = False
 91    ) -> "Kustomize":
 92        """Filter the resources based on the specified HelmRelease."""
 93        if helm_release:
 94            if invert:
 95                raise InputException(
 96                    "Must specify either helm_release or invert but not both"
 97                )
 98            return (
 99                self.grep(f"metadata.namespace=^{helm_release.namespace}$")
100                .grep(f"metadata.name=^{helm_release.name}$")
101                .grep(f"kind=^{HELM_RELEASE}$")
102            )
103        if invert:
104            return self.grep(f"kind=^{HELM_RELEASE}$", invert=True)
105        raise InputException("Must specify either helm_release or invert")

Filter the resources based on the specified HelmRelease.

async def run(self) -> str:
107    async def run(self) -> str:
108        """Run the kustomize command and return the output as a string."""
109        return await run_piped(self._cmds)

Run the kustomize command and return the output as a string.

async def objects(self, target_namespace: str | None = None) -> list[dict[str, typing.Any]]:
123    async def objects(
124        self, target_namespace: str | None = None
125    ) -> list[dict[str, Any]]:
126        """Run the kustomize command and return the result cluster objects as a list."""
127        try:
128            return [doc async for doc in self._docs(target_namespace=target_namespace)]
129        except yaml.YAMLError as err:
130            raise KustomizeException(
131                f"Unable to parse command output: {self._cmds}: {err}"
132            ) from err

Run the kustomize command and return the result cluster objects as a list.

def skip_resources(self, kinds: list[str]) -> Kustomize:
134    def skip_resources(self, kinds: list[str]) -> "Kustomize":
135        """Skip resources kinds of the specified types."""
136        if not kinds:
137            return self
138        skip_re = "|".join(kinds)
139        return self.grep(f"kind=^({skip_re})$", invert=True)

Skip resources kinds of the specified types.

def filter_resources(self, kinds: list[str]) -> Kustomize:
141    def filter_resources(self, kinds: list[str]) -> "Kustomize":
142        """Skip resources kinds of the specified types."""
143        if not kinds:
144            return self
145        skip_re = "|".join(kinds)
146        return self.grep(f"kind=^({skip_re})$", invert=False)

Skip resources kinds of the specified types.

async def stash(self) -> Kustomize:
148    async def stash(self) -> "Kustomize":
149        """Memoize the contents built so far for efficient reuse.
150
151        This is useful to serialize a chain of commands but allow further
152        chaining with multiple branches.
153        """
154        content = await self.run()
155        return Kustomize([Stash(content.encode("utf-8"))])

Memoize the contents built so far for efficient reuse.

This is useful to serialize a chain of commands but allow further chaining with multiple branches.