flux_local.git_repo

Library for operating on a local repo and building Manifests.

This will build a manifest.Manifest from a cluster repo. This follows the pattern of building kustomizations, then reading helm releases (though it will not evaluate the templates). The resulting Manifest contains all the necessary information to do basic checks on objects in the cluster (e.g. run templates from unit tests).

Example usage:

from flux_local import git_repo


selector = git_repo.Selector(
    helm_release=git_repo.MetadataSelector(
        namespace="podinfo"
    )
)
manifest = await git_repo.build_manifest(selector)
for cluster in manifest:
    print(f"Found cluster: {cluster.path}")
    for kustomization in cluster.kustomizations:
        print(f"Found kustomization: {kustomization.path}")
        for release in kustomization.helm_releases:
            print(f"Found helm release: {release.release_name}")
  1"""Library for operating on a local repo and building Manifests.
  2
  3This will build a `manifest.Manifest` from a cluster repo. This follows the
  4pattern of building kustomizations, then reading helm releases (though it will
  5not evaluate the templates). The resulting `Manifest` contains all the necessary
  6information to do basic checks on objects in the cluster (e.g. run templates
  7from unit tests).
  8
  9Example usage:
 10
 11```python
 12from flux_local import git_repo
 13
 14
 15selector = git_repo.Selector(
 16    helm_release=git_repo.MetadataSelector(
 17        namespace="podinfo"
 18    )
 19)
 20manifest = await git_repo.build_manifest(selector)
 21for cluster in manifest:
 22    print(f"Found cluster: {cluster.path}")
 23    for kustomization in cluster.kustomizations:
 24        print(f"Found kustomization: {kustomization.path}")
 25        for release in kustomization.helm_releases:
 26            print(f"Found helm release: {release.release_name}")
 27```
 28
 29"""
 30
 31import asyncio
 32import contextlib
 33from dataclasses import dataclass, field
 34import logging
 35import os
 36import tempfile
 37from collections import deque
 38from collections.abc import Callable, Awaitable
 39from functools import cache
 40from pathlib import Path
 41from typing import Any, Generator
 42
 43import git
 44
 45from aiofiles.ospath import isdir
 46from . import kustomize, values
 47from .exceptions import FluxException, KustomizePathException
 48from .manifest import (
 49    CRD_KIND,
 50    FLUXTOMIZE_DOMAIN,
 51    KUSTOMIZE_DOMAIN,
 52    Cluster,
 53    HelmRelease,
 54    HelmRepository,
 55    Kustomization,
 56    Manifest,
 57    ConfigMap,
 58    Secret,
 59    SECRET_KIND,
 60    CONFIG_MAP_KIND,
 61    OCIRepository,
 62    HELM_REPO_KIND,
 63)
 64from .exceptions import InputException
 65from .context import trace_context
 66
 67__all__ = [
 68    "build_manifest",
 69    "ResourceSelector",
 70    "PathSelector",
 71    "MetadataSelector",
 72    "Options",
 73]
 74
 75_LOGGER = logging.getLogger(__name__)
 76
 77CLUSTER_KUSTOMIZE_KIND = "Kustomization"
 78HELM_RELEASE_KIND = "HelmRelease"
 79GIT_REPO_KIND = "GitRepository"
 80OCI_REPO_KIND = "OCIRepository"
 81DEFAULT_NAMESPACE = "flux-system"
 82DEFAULT_NAME = "flux-system"
 83GREP_SOURCE_REF_KIND = f"spec.sourceRef.kind={GIT_REPO_KIND}|{OCI_REPO_KIND}"
 84ERROR_DETAIL_BAD_PATH = "Try specifying another path within the git repo?"
 85ERROR_DETAIL_BAD_KS = "Is a Kustomization pointing to a path that does not exist?"
 86
 87
 88@dataclass
 89class Source:
 90    """A source is a named mapping from a k8s object name to a path in the git repo.
 91
 92    This is needed to map the location within a reop if it's not the root. For example,
 93    you may have a `GitRepository` that is relative to `/` and all if of the
 94    `Kustomization`s inside may reference paths within it e.g. `/k8s/namespaces/`. But
 95    you may also have an `OCIRepository` that was built relative to `/k8s/` where the
 96    `Kustomization`s inside may reference the path relative to that like `/namespaces/`.
 97    """
 98
 99    name: str
100    """The name of the repository source."""
101
102    root: Path | None
103    """The path name within the repo root."""
104
105    namespace: str | None
106    """The namespace of the repository source."""
107
108    @property
109    def source_name(self) -> str:
110        """Return the full name of the source."""
111        return f"{self.namespace}/{self.name}"
112
113    @classmethod
114    def from_str(self, value: str) -> "Source":
115        """Parse a Source from key=value string."""
116        root: Path | None = None
117        if "=" in value:
118            name, root_str = value.split("=")
119            root = Path(root_str)
120        else:
121            name = value
122        namespace: str | None = None
123        if "/" in name:
124            namespace, name = name.split("/")
125        return Source(name=name, root=root, namespace=namespace)
126
127
128@cache
129def git_repo(path: Path | None = None) -> git.repo.Repo:
130    """Return the local git repo path."""
131    try:
132        if path is None:
133            return git.repo.Repo(os.getcwd(), search_parent_directories=True)
134        return git.repo.Repo(str(path), search_parent_directories=True)
135    except git.GitError as err:
136        raise InputException(f"Unable to find input path {path}: {err}") from err
137
138
139@cache
140def repo_root(repo: git.repo.Repo | None = None) -> Path:
141    """Return the local git repo path."""
142    if repo is None:
143        repo = git_repo()
144    return Path(repo.git.rev_parse("--show-toplevel"))
145
146
147def domain_filter(version: str) -> Callable[[dict[str, Any]], bool]:
148    """Return a yaml doc filter for specified resource version."""
149
150    def func(doc: dict[str, Any]) -> bool:
151        if api_version := doc.get("apiVersion"):
152            if api_version.startswith(version):
153                return True
154        return False
155
156    return func
157
158
159FLUXTOMIZE_DOMAIN_FILTER = domain_filter(FLUXTOMIZE_DOMAIN)
160KUSTOMIZE_DOMAIN_FILTER = domain_filter(KUSTOMIZE_DOMAIN)
161
162
163@dataclass
164class PathSelector:
165    """A pathlib.Path inside of a git repo."""
166
167    path: Path | None = None
168    """The path within a repo."""
169
170    sources: list[Source] | None = None
171    """A list of repository sources for building relative paths."""
172
173    @property
174    def repo(self) -> git.repo.Repo:
175        """Return the local git repo."""
176        return git_repo(self.path)
177
178    @property
179    def root(self) -> Path:
180        """Return the local git repo root."""
181        return repo_root(self.repo)
182
183    @property
184    def relative_path(self) -> Path:
185        """Return the relative path within the repo.
186
187        This is used to translate a relative path specified onto the command
188        line into a relative path in the repo. The path on the command line may
189        be relative to the current working directory, but we want to translate
190        it into a relative path in the repo.
191
192        This is also used when transposing this path on a worktree.
193        """
194        arg_path = self.path or Path(os.getcwd())
195        resolved_path = arg_path.resolve()
196        return resolved_path.relative_to(self.root.resolve())
197
198
199@dataclass
200class ResourceVisitor:
201    """Invoked when a resource is visited to the caller can intercept."""
202
203    func: Callable[
204        [
205            Path,
206            Kustomization | HelmRelease | HelmRepository | OCIRepository,
207            kustomize.Kustomize | None,
208        ],
209        Awaitable[None],
210    ]
211    """Function called with the resource and optional content.
212
213    The function arguments are:
214      - path: This is the cluster and kustomization paths needed to disambiguate
215        when there are multiple clusters in the repository.
216      - doc: The resource object (e.g. Kustomization, HelmRelease, etc)
217      - cmd: A Kustomize object that can produce the specified object. Only supported
218        for Kustomizations.
219    """
220
221
222@dataclass
223class DocumentVisitor:
224    """Invoked when a document is visited so the caller can intercept.
225
226    This is similar to a resource visitor, but it visits the unparsed documents
227    since they may not have explicit schemas in this project.
228    """
229
230    kinds: list[str]
231    """The resource kinds of documents to visit."""
232
233    func: Callable[[str, dict[str, Any]], None]
234    """Function called with the resource and optional content.
235
236    The function arguments are:
237      - parent: The namespaced name of the Fluxtomization or HelmRelease
238      - doc: The resource object (e.g. Pod, ConfigMap, HelmRelease, etc)
239    """
240
241
242@dataclass
243class MetadataSelector:
244    """A filter for objects to select from the cluster."""
245
246    enabled: bool = True
247    """If true, this selector may return objects."""
248
249    name: str | None = None
250    """Resources returned will match this name."""
251
252    namespace: str | None = None
253    """Resources returned will be from this namespace."""
254
255    label_selector: dict[str, str] | None = None
256    """Resources returned must have these labels."""
257
258    skip_crds: bool = True
259    """If false, CRDs may be processed, depending on the resource type."""
260
261    skip_secrets: bool = True
262    """If false, Secrets may be processed, depending on the resource type."""
263
264    skip_kinds: list[str] | None = None
265    """A list of potential CRDs to skip when emitting objects."""
266
267    visitor: ResourceVisitor | None = None
268    """Visitor for the specified object type that can be used for building."""
269
270    @property
271    def predicate(
272        self,
273    ) -> Callable[
274        [Kustomization | HelmRelease | HelmRepository | OCIRepository],
275        bool,
276    ]:
277        """A predicate that selects Kustomization objects."""
278
279        def predicate(
280            obj: Kustomization | HelmRelease | HelmRepository | OCIRepository,
281        ) -> bool:
282            if not self.enabled:
283                return False
284            if self.name and obj.name != self.name:
285                return False
286            if self.namespace and obj.namespace != self.namespace:
287                return False
288            if self.label_selector and isinstance(obj, (Kustomization, HelmRelease)):
289                obj_labels = obj.labels or {}
290                for name, value in self.label_selector.items():
291                    _LOGGER.debug("Checking %s=%s", name, value)
292                    if (
293                        obj_value := obj_labels.get(name)
294                    ) is None or obj_value != value:
295                        _LOGGER.debug("mismatch v=%s", obj_value)
296                        return False
297            return True
298
299        return predicate
300
301
302def cluster_metadata_selector() -> MetadataSelector:
303    """Create a new MetadataSelector for Kustomizations."""
304    return MetadataSelector(name=DEFAULT_NAME, namespace=DEFAULT_NAMESPACE)
305
306
307def ks_metadata_selector() -> MetadataSelector:
308    """Create a new MetadataSelector for Kustomizations."""
309    return MetadataSelector(namespace=DEFAULT_NAMESPACE)
310
311
312@dataclass
313class Options:
314    """Options for the resource selector for building manifets."""
315
316    kustomize_flags: list[str] = field(default_factory=list)
317    skip_kustomize_path_validation: bool = False
318
319
320@dataclass
321class ResourceSelector:
322    """A filter for objects to select from the cluster.
323
324    This is invoked when iterating over objects in the cluster to decide which
325    resources should be inflated and returned, to avoid iterating over
326    unnecessary resources.
327    """
328
329    path: PathSelector = field(default_factory=PathSelector)
330    """Path to find a repo of local flux Kustomization objects"""
331
332    cluster: MetadataSelector = field(default_factory=cluster_metadata_selector)
333    """Cluster names to return."""
334
335    kustomization: MetadataSelector = field(default_factory=ks_metadata_selector)
336    """Kustomization names to return."""
337
338    helm_repo: MetadataSelector = field(default_factory=MetadataSelector)
339    """HelmRepository objects to return."""
340
341    helm_release: MetadataSelector = field(default_factory=MetadataSelector)
342    """HelmRelease objects to return."""
343
344    oci_repo: MetadataSelector = field(default_factory=MetadataSelector)
345    """OCIRepository objects to return."""
346
347    doc_visitor: DocumentVisitor | None = None
348    """Raw objects to visit."""
349
350
351def is_allowed_source(sources: list[Source]) -> Callable[[Kustomization], bool]:
352    """Return true if this Kustomization is from an allowed source."""
353
354    def _filter(doc: Kustomization) -> bool:
355        if not sources:
356            return True
357        for source in sources:
358            if source.name == doc.source_name and (
359                source.namespace is None or source.namespace == doc.source_namespace
360            ):
361                return True
362        return False
363
364    return _filter
365
366
367def adjust_ks_path(doc: Kustomization, selector: PathSelector) -> Path | None:
368    """Make adjustments to the Kustomizations path."""
369    if doc.source_kind == OCI_REPO_KIND or doc.source_kind == GIT_REPO_KIND:
370        for source in selector.sources or []:
371            if source.name == doc.source_name:
372                _LOGGER.debug(
373                    "Updated Source for %s %s: %s", doc.source_kind, doc.name, doc.path
374                )
375                if not source.root:
376                    _LOGGER.info(
377                        "%s source for %s has no root specified",
378                        doc.source_kind,
379                        doc.name,
380                    )
381                    break
382                return source.root / doc.path
383
384        # No match so if OCI we can't do anything. If Git we assume its the root
385        # of the repository.
386        if doc.source_kind == OCI_REPO_KIND:
387            _LOGGER.info(
388                "Unknown cluster source for %s %s: %s",
389                doc.source_kind,
390                doc.name,
391                doc.path,
392            )
393            return None
394
395    # Source path is relative to the search path. Update to have the
396    # full prefix relative to the root.
397    if not doc.path:
398        _LOGGER.debug("Assigning implicit path %s", selector.relative_path)
399        return selector.relative_path
400
401    path = Path(doc.path)
402    if path.is_absolute():
403        return path.relative_to("/")
404    return path
405
406
407class CachableBuilder:
408    """Wrapper around flux_build that caches contents."""
409
410    def __init__(self) -> None:
411        """Initialize CachableBuilder."""
412        self._cache: dict[str, kustomize.Kustomize] = {}
413
414    async def build(
415        self, kustomization: Kustomization, path: Path
416    ) -> kustomize.Kustomize:
417        key = f"{kustomization.namespaced_name} @ {path}"
418        if cmd := self._cache.get(key):
419            return cmd
420        cmd = kustomize.flux_build(kustomization, path)
421        cmd = await cmd.stash()
422        self._cache[key] = cmd
423        return cmd
424
425    def remove(self, kustomization: Kustomization) -> None:
426        """Remove the kustomization value from the cache."""
427        target_key = f"{kustomization.namespaced_name} @"
428        for key in list(self._cache.keys()):
429            if key.startswith(target_key):
430                _LOGGER.debug("Invalidated cache %s", key)
431                del self._cache[key]
432
433
434@dataclass
435class VisitResult:
436    """Result of visiting a kustomization."""
437
438    kustomizations: list[Kustomization]
439    config_maps: list[ConfigMap]
440    secrets: list[Secret]
441
442    def __post_init__(self) -> None:
443        """Validate the object"""
444        unique = {ks.namespaced_name for ks in self.kustomizations}
445        if len(unique) != len(self.kustomizations):
446            ks_names = [ks.namespaced_name for ks in self.kustomizations]
447            dupes = list(filter(lambda x: ks_names.count(x) > 1, ks_names))
448            raise FluxException(
449                f"Detected multiple Fluxtomizations with the same name: {dupes}. "
450                "This indicates either (1) an incorrect Kustomization which needs to be fixed "
451                "or (2) a multi-cluster setup which requires flux-local to run with a more strict --path."
452            )
453
454
455async def visit_kustomization(
456    selector: PathSelector,
457    builder: CachableBuilder,
458    path: Path,
459    visit_ks: Kustomization | None,
460    options: Options,
461) -> VisitResult:
462    """Visit a path and return a list of Kustomizations."""
463
464    _LOGGER.debug("Visiting path (%s) %s", selector.path, path)
465    label = visit_ks.namespaced_name if visit_ks else str(path)
466
467    kinds = [CLUSTER_KUSTOMIZE_KIND, CONFIG_MAP_KIND, SECRET_KIND]
468
469    with trace_context(f"Kustomization '{label}'"):
470        cmd: kustomize.Kustomize
471        if visit_ks is None:
472            cmd = kustomize.filter_resources(kinds, selector.root / path)
473        else:
474            if not await isdir(selector.root / path):
475                if options.skip_kustomize_path_validation:
476                    _LOGGER.debug(
477                        "Skipping Kustomization '%s' since path does not exist: %s",
478                        visit_ks.namespaced_name,
479                        selector.root / path,
480                    )
481                    return VisitResult(kustomizations=[], config_maps=[], secrets=[])
482                raise KustomizePathException(
483                    f"Kustomization '{visit_ks.namespaced_name}' path field '{visit_ks.path or ''}' is not a directory: {selector.root / path}"
484                )
485            cmd = await builder.build(visit_ks, selector.root / path)
486            cmd = cmd.filter_resources(kinds)
487        cmd = await cmd.stash()
488        ks_cmd = cmd.grep(GREP_SOURCE_REF_KIND)
489        cfg_cmd = cmd.filter_resources([CONFIG_MAP_KIND, SECRET_KIND])
490
491        try:
492            ks_docs = await ks_cmd.objects()
493            cfg_docs = await cfg_cmd.objects()
494        except KustomizePathException as err:
495            raise FluxException(err) from err
496        except FluxException as err:
497            if visit_ks is None:
498                raise FluxException(
499                    f"Error building Fluxtomization in '{selector.root}' "
500                    f"path '{path}': {ERROR_DETAIL_BAD_PATH} {err}"
501                ) from err
502            raise FluxException(
503                f"Error building Fluxtomization '{visit_ks.namespaced_name}' "
504                f"path '{path}': {ERROR_DETAIL_BAD_KS} {err}"
505            ) from err
506
507    return VisitResult(
508        kustomizations=list(
509            filter(
510                is_allowed_source(selector.sources or []),
511                [
512                    Kustomization.parse_doc(doc)
513                    for doc in filter(FLUXTOMIZE_DOMAIN_FILTER, ks_docs)
514                ],
515            )
516        ),
517        config_maps=[
518            ConfigMap.parse_doc(doc)
519            for doc in cfg_docs
520            if doc.get("kind") == CONFIG_MAP_KIND
521        ],
522        secrets=[
523            Secret.parse_doc(doc) for doc in cfg_docs if doc.get("kind") == SECRET_KIND
524        ],
525    )
526
527
528async def kustomization_traversal(
529    selector: PathSelector, builder: CachableBuilder, options: Options
530) -> list[Kustomization]:
531    """Search for kustomizations in the specified path."""
532
533    response_kustomizations: list[Kustomization] = []
534    visited_paths: set[Path] = set()  # Relative paths within the cluster
535    visited_ks: set[str] = set()
536
537    path_queue: deque[tuple[Path, Kustomization | None]] = deque()
538    path_queue.append((selector.relative_path, None))
539    cluster_config = values.cluster_config([], [])
540    while path_queue:
541        # Fully empty the queue, running all tasks in parallel
542        tasks = []
543        while path_queue:
544            (path, visit_ks) = path_queue.popleft()
545
546            if path in visited_paths:
547                _LOGGER.debug("Already visited %s", path)
548                continue
549            visited_paths.add(path)
550
551            tasks.append(
552                visit_kustomization(selector, builder, path, visit_ks, options)
553            )
554
555        # Find new kustomizations
556        kustomizations = []
557        for result in await asyncio.gather(*tasks):
558            cluster_config = values.merge_cluster_config(
559                cluster_config, result.secrets, result.config_maps
560            )
561            for ks in result.kustomizations:
562                if ks.namespaced_name in visited_ks:
563                    continue
564                kustomizations.append(ks)
565                visited_ks.add(ks.namespaced_name)
566        _LOGGER.debug("Found %s new Kustomizations", len(kustomizations))
567
568        # Queue up new paths to visit to find more kustomizations
569        for ks in kustomizations:
570            _LOGGER.debug(
571                "Kustomization '%s' sourceRef.kind '%s' of '%s'",
572                ks.name,
573                ks.source_kind,
574                ks.source_name,
575            )
576            if not (ks_path := adjust_ks_path(ks, selector)):
577                continue
578            ks.path = str(ks_path)
579            if ks.postbuild_substitute_from:
580                values.expand_postbuild_substitute_reference(
581                    ks,
582                    cluster_config,
583                )
584
585            path_queue.append((ks_path, ks))
586            response_kustomizations.append(ks)
587
588    response_kustomizations.sort(key=lambda x: (x.namespace, x.name))
589    return response_kustomizations
590
591
592def node_name(ks: Kustomization) -> str:
593    """Return a unique node name for the Kustomization.
594
595    This includes the path since it needs to be unique within the entire
596    repository since we support multi-cluster.
597    """
598    return f"{ks.namespaced_name} @ {ks.id_name}"
599
600
601async def build_kustomization(
602    kustomization: Kustomization,
603    cluster_path: Path,
604    selector: ResourceSelector,
605    kustomize_flags: list[str],
606    builder: CachableBuilder,
607) -> None:
608    """Build helm objects for the Kustomization and update state."""
609    root: Path = selector.path.root
610    kustomization_selector: MetadataSelector = selector.kustomization
611    helm_repo_selector: MetadataSelector = selector.helm_repo
612    oci_repo_selector: MetadataSelector = selector.oci_repo
613    helm_release_selector: MetadataSelector = selector.helm_release
614    if (
615        not kustomization_selector.enabled
616        and not helm_repo_selector.enabled
617        and not oci_repo_selector.visitor
618        and not helm_release_selector.enabled
619        and not selector.doc_visitor
620    ):
621        return
622
623    with trace_context(f"Build '{kustomization.namespaced_name}'"):
624        cmd = await builder.build(kustomization, root / kustomization.path)
625        skips = []
626        if kustomization_selector.skip_crds:
627            skips.append(CRD_KIND)
628        if kustomization_selector.skip_secrets:
629            skips.append(SECRET_KIND)
630        if kustomization_selector.skip_kinds:
631            skips.extend(kustomization_selector.skip_kinds)
632        cmd = cmd.skip_resources(skips)
633        try:
634            cmd = await cmd.stash()
635        except FluxException as err:
636            raise FluxException(
637                f"Error while building Kustomization "
638                f"'{kustomization.namespace}/{kustomization.name}' "
639                f"(path={kustomization.source_path}): {err}"
640            ) from err
641
642        if kustomization_selector.visitor:
643            await kustomization_selector.visitor.func(
644                Path(kustomization.path),
645                kustomization,
646                cmd,
647            )
648
649        kinds = []
650        # Needed for expanding postbuild substitutions and value references
651        kinds.append(CONFIG_MAP_KIND)
652        if helm_repo_selector.enabled:
653            kinds.append(HELM_REPO_KIND)
654        if oci_repo_selector.enabled:
655            kinds.append(OCI_REPO_KIND)
656        if helm_release_selector.enabled:
657            kinds.append(HELM_RELEASE_KIND)
658            # Needed for expanding value references
659            kinds.append(SECRET_KIND)
660        if selector.doc_visitor:
661            kinds.extend(selector.doc_visitor.kinds)
662        if not kinds:
663            return
664
665        docs = await cmd.filter_resources(kinds).objects(
666            target_namespace=kustomization.target_namespace
667        )
668
669        if selector.doc_visitor:
670            doc_kinds = set(selector.doc_visitor.kinds)
671            for doc in docs:
672                if doc.get("kind") not in doc_kinds:
673                    continue
674                selector.doc_visitor.func(kustomization.namespaced_name, doc)
675
676        kustomization.helm_repos = list(
677            filter(
678                helm_repo_selector.predicate,
679                [
680                    HelmRepository.parse_doc(doc)
681                    for doc in docs
682                    if doc.get("kind") == HELM_REPO_KIND
683                ],
684            )
685        )
686        kustomization.oci_repos = list(
687            filter(
688                oci_repo_selector.predicate,
689                [
690                    OCIRepository.parse_doc(doc)
691                    for doc in docs
692                    if doc.get("kind") == OCI_REPO_KIND
693                ],
694            )
695        )
696        kustomization.helm_releases = list(
697            filter(
698                helm_release_selector.predicate,
699                [
700                    HelmRelease.parse_doc(doc)
701                    for doc in docs
702                    if doc.get("kind") == HELM_RELEASE_KIND
703                ],
704            )
705        )
706        kustomization.config_maps = [
707            ConfigMap.parse_doc(doc)
708            for doc in docs
709            if doc.get("kind") == CONFIG_MAP_KIND
710        ]
711        kustomization.secrets = [
712            Secret.parse_doc(doc) for doc in docs if doc.get("kind") == SECRET_KIND
713        ]
714
715
716def _ready_kustomizations(
717    kustomizations: list[Kustomization], visited: set[str]
718) -> tuple[list[Kustomization], list[Kustomization]]:
719    """Split the kustomizations into those that are ready vs pending."""
720    ready = []
721    pending = []
722    for kustomization in kustomizations:
723        if not_ready := (set(kustomization.depends_on or {}) - visited):
724            _LOGGER.debug(
725                "Kustomization %s waiting for %s",
726                kustomization.namespaced_name,
727                not_ready,
728            )
729            pending.append(kustomization)
730        else:
731            ready.append(kustomization)
732    return (ready, pending)
733
734
735async def build_manifest(
736    path: Path | None = None,
737    selector: ResourceSelector = ResourceSelector(),
738    options: Options = Options(),
739) -> Manifest:
740    """Build a Manifest object from the local cluster.
741
742    This will locate all Kustomizations that represent clusters, then find all
743    the Kustomizations within that cluster, as well as all relevant Helm
744    resources.
745
746    The path input parameter is deprecated. Use the PathSelector in `selector` instead.
747    """
748    if path:
749        selector.path = PathSelector(path=path)
750
751    _LOGGER.debug("Processing cluster with selector %s", selector)
752    if not selector.cluster.enabled:
753        return Manifest(clusters=[])
754
755    builder = CachableBuilder()
756
757    with trace_context(f"Cluster '{str(selector.path.path)}'"):
758        results = await kustomization_traversal(selector.path, builder, options)
759        clusters = [
760            Cluster(
761                path=str(selector.path.relative_path),
762                kustomizations=[
763                    ks for ks in results if selector.kustomization.predicate(ks)
764                ],
765            )
766        ]
767
768        async def update_kustomization(cluster: Cluster) -> None:
769            queue = [*cluster.kustomizations]
770            visited: set[str] = set()
771            while queue:
772                build_tasks = []
773                (ready, pending) = _ready_kustomizations(queue, visited)
774                for kustomization in ready:
775                    _LOGGER.debug(
776                        "Processing kustomization '%s': %s",
777                        kustomization.name,
778                        kustomization.path,
779                    )
780                    if not await isdir(selector.path.root / kustomization.path):
781                        if options.skip_kustomize_path_validation:
782                            _LOGGER.debug(
783                                "Skipping Kustomization '%s' since path does not exist: %s",
784                                kustomization.namespaced_name,
785                                selector.path.root / kustomization.path,
786                            )
787                            continue
788
789                    if kustomization.postbuild_substitute_from:
790                        values.expand_postbuild_substitute_reference(
791                            kustomization,
792                            values.ks_cluster_config(cluster.kustomizations),
793                        )
794                        # Clear the cache to remove any previous builds that are
795                        # missing the postbuild substitutions.
796                        builder.remove(kustomization)
797
798                    build_tasks.append(
799                        build_kustomization(
800                            kustomization,
801                            Path(cluster.path),
802                            selector,
803                            options.kustomize_flags,
804                            builder,
805                        )
806                    )
807                if not build_tasks:
808                    raise FluxException(
809                        "Internal error: Unexpected loop without build tasks"
810                    )
811                await asyncio.gather(*build_tasks)
812                visited.update([ks.namespaced_name for ks in ready])
813                queue = pending
814
815        # Validate all Kustomizations have valid dependsOn attributes since later
816        # we'll be using them to order processing.
817        for cluster in clusters:
818            all_ks = set([ks.namespaced_name for ks in cluster.kustomizations])
819            for ks in cluster.kustomizations:
820                ks.validate_depends_on(all_ks)
821
822        kustomization_tasks = []
823        # Expand and visit Kustomizations
824        for cluster in clusters:
825            kustomization_tasks.append(update_kustomization(cluster))
826        await asyncio.gather(*kustomization_tasks)
827
828        # Handle any HelmRelease value references
829        for cluster in clusters:
830            for kustomization in cluster.kustomizations:
831                kustomization.helm_releases = [
832                    values.expand_value_references(helm_release, kustomization)
833                    for helm_release in kustomization.helm_releases
834                ]
835
836        # Visit Helm resources
837        for cluster in clusters:
838            if selector.helm_repo.visitor:
839                for kustomization in cluster.kustomizations:
840                    for helm_repo in kustomization.helm_repos:
841                        await selector.helm_repo.visitor.func(
842                            Path(kustomization.path),
843                            helm_repo,
844                            None,
845                        )
846
847            if selector.oci_repo.visitor:
848                for kustomization in cluster.kustomizations:
849                    for oci_repo in kustomization.oci_repos:
850                        await selector.oci_repo.visitor.func(
851                            Path(kustomization.path),
852                            oci_repo,
853                            None,
854                        )
855
856            if selector.helm_release.visitor:
857                for kustomization in cluster.kustomizations:
858                    for helm_release in kustomization.helm_releases:
859                        await selector.helm_release.visitor.func(
860                            Path(kustomization.path),
861                            helm_release,
862                            None,
863                        )
864
865        return Manifest(clusters=clusters)
866
867
868@contextlib.contextmanager
869def create_worktree(
870    repo: git.repo.Repo, existing_branch: str | None = None
871) -> Generator[Path, None, None]:
872    """Create a ContextManager for a new git worktree in the current repo.
873
874    This is used to get a fork of the current repo without any local changes
875    in order to produce a diff.
876    Specifying existing_branch allows  to compare the current state with the existing branch.
877    """
878    orig = os.getcwd()
879    with tempfile.TemporaryDirectory() as tmp_dir:
880        _LOGGER.debug("Creating worktree in %s", tmp_dir)
881        if existing_branch is None:
882            # Add --detach to avoid creating a branch since we will not make modifications
883            repo.git.worktree("add", "--detach", str(tmp_dir))
884        else:
885            repo.git.worktree("add", str(tmp_dir), existing_branch)
886        os.chdir(tmp_dir)
887        yield Path(tmp_dir)
888    _LOGGER.debug("Restoring to %s", orig)
889    # The temp directory should now be removed and this prunes the worktree
890    repo.git.worktree("prune")
891    os.chdir(orig)
async def build_manifest( path: pathlib._local.Path | None = None, selector: ResourceSelector = ResourceSelector(path=PathSelector(path=None, sources=None), cluster=MetadataSelector(enabled=True, name='flux-system', namespace='flux-system', label_selector=None, skip_crds=True, skip_secrets=True, skip_kinds=None, visitor=None), kustomization=MetadataSelector(enabled=True, name=None, namespace='flux-system', label_selector=None, skip_crds=True, skip_secrets=True, skip_kinds=None, visitor=None), helm_repo=MetadataSelector(enabled=True, name=None, namespace=None, label_selector=None, skip_crds=True, skip_secrets=True, skip_kinds=None, visitor=None), helm_release=MetadataSelector(enabled=True, name=None, namespace=None, label_selector=None, skip_crds=True, skip_secrets=True, skip_kinds=None, visitor=None), oci_repo=MetadataSelector(enabled=True, name=None, namespace=None, label_selector=None, skip_crds=True, skip_secrets=True, skip_kinds=None, visitor=None), doc_visitor=None), options: Options = Options(kustomize_flags=[], skip_kustomize_path_validation=False)) -> flux_local.manifest.Manifest:
736async def build_manifest(
737    path: Path | None = None,
738    selector: ResourceSelector = ResourceSelector(),
739    options: Options = Options(),
740) -> Manifest:
741    """Build a Manifest object from the local cluster.
742
743    This will locate all Kustomizations that represent clusters, then find all
744    the Kustomizations within that cluster, as well as all relevant Helm
745    resources.
746
747    The path input parameter is deprecated. Use the PathSelector in `selector` instead.
748    """
749    if path:
750        selector.path = PathSelector(path=path)
751
752    _LOGGER.debug("Processing cluster with selector %s", selector)
753    if not selector.cluster.enabled:
754        return Manifest(clusters=[])
755
756    builder = CachableBuilder()
757
758    with trace_context(f"Cluster '{str(selector.path.path)}'"):
759        results = await kustomization_traversal(selector.path, builder, options)
760        clusters = [
761            Cluster(
762                path=str(selector.path.relative_path),
763                kustomizations=[
764                    ks for ks in results if selector.kustomization.predicate(ks)
765                ],
766            )
767        ]
768
769        async def update_kustomization(cluster: Cluster) -> None:
770            queue = [*cluster.kustomizations]
771            visited: set[str] = set()
772            while queue:
773                build_tasks = []
774                (ready, pending) = _ready_kustomizations(queue, visited)
775                for kustomization in ready:
776                    _LOGGER.debug(
777                        "Processing kustomization '%s': %s",
778                        kustomization.name,
779                        kustomization.path,
780                    )
781                    if not await isdir(selector.path.root / kustomization.path):
782                        if options.skip_kustomize_path_validation:
783                            _LOGGER.debug(
784                                "Skipping Kustomization '%s' since path does not exist: %s",
785                                kustomization.namespaced_name,
786                                selector.path.root / kustomization.path,
787                            )
788                            continue
789
790                    if kustomization.postbuild_substitute_from:
791                        values.expand_postbuild_substitute_reference(
792                            kustomization,
793                            values.ks_cluster_config(cluster.kustomizations),
794                        )
795                        # Clear the cache to remove any previous builds that are
796                        # missing the postbuild substitutions.
797                        builder.remove(kustomization)
798
799                    build_tasks.append(
800                        build_kustomization(
801                            kustomization,
802                            Path(cluster.path),
803                            selector,
804                            options.kustomize_flags,
805                            builder,
806                        )
807                    )
808                if not build_tasks:
809                    raise FluxException(
810                        "Internal error: Unexpected loop without build tasks"
811                    )
812                await asyncio.gather(*build_tasks)
813                visited.update([ks.namespaced_name for ks in ready])
814                queue = pending
815
816        # Validate all Kustomizations have valid dependsOn attributes since later
817        # we'll be using them to order processing.
818        for cluster in clusters:
819            all_ks = set([ks.namespaced_name for ks in cluster.kustomizations])
820            for ks in cluster.kustomizations:
821                ks.validate_depends_on(all_ks)
822
823        kustomization_tasks = []
824        # Expand and visit Kustomizations
825        for cluster in clusters:
826            kustomization_tasks.append(update_kustomization(cluster))
827        await asyncio.gather(*kustomization_tasks)
828
829        # Handle any HelmRelease value references
830        for cluster in clusters:
831            for kustomization in cluster.kustomizations:
832                kustomization.helm_releases = [
833                    values.expand_value_references(helm_release, kustomization)
834                    for helm_release in kustomization.helm_releases
835                ]
836
837        # Visit Helm resources
838        for cluster in clusters:
839            if selector.helm_repo.visitor:
840                for kustomization in cluster.kustomizations:
841                    for helm_repo in kustomization.helm_repos:
842                        await selector.helm_repo.visitor.func(
843                            Path(kustomization.path),
844                            helm_repo,
845                            None,
846                        )
847
848            if selector.oci_repo.visitor:
849                for kustomization in cluster.kustomizations:
850                    for oci_repo in kustomization.oci_repos:
851                        await selector.oci_repo.visitor.func(
852                            Path(kustomization.path),
853                            oci_repo,
854                            None,
855                        )
856
857            if selector.helm_release.visitor:
858                for kustomization in cluster.kustomizations:
859                    for helm_release in kustomization.helm_releases:
860                        await selector.helm_release.visitor.func(
861                            Path(kustomization.path),
862                            helm_release,
863                            None,
864                        )
865
866        return Manifest(clusters=clusters)

Build a Manifest object from the local cluster.

This will locate all Kustomizations that represent clusters, then find all the Kustomizations within that cluster, as well as all relevant Helm resources.

The path input parameter is deprecated. Use the PathSelector in selector instead.

@dataclass
class ResourceSelector:
321@dataclass
322class ResourceSelector:
323    """A filter for objects to select from the cluster.
324
325    This is invoked when iterating over objects in the cluster to decide which
326    resources should be inflated and returned, to avoid iterating over
327    unnecessary resources.
328    """
329
330    path: PathSelector = field(default_factory=PathSelector)
331    """Path to find a repo of local flux Kustomization objects"""
332
333    cluster: MetadataSelector = field(default_factory=cluster_metadata_selector)
334    """Cluster names to return."""
335
336    kustomization: MetadataSelector = field(default_factory=ks_metadata_selector)
337    """Kustomization names to return."""
338
339    helm_repo: MetadataSelector = field(default_factory=MetadataSelector)
340    """HelmRepository objects to return."""
341
342    helm_release: MetadataSelector = field(default_factory=MetadataSelector)
343    """HelmRelease objects to return."""
344
345    oci_repo: MetadataSelector = field(default_factory=MetadataSelector)
346    """OCIRepository objects to return."""
347
348    doc_visitor: DocumentVisitor | None = None
349    """Raw objects to visit."""

A filter for objects to select from the cluster.

This is invoked when iterating over objects in the cluster to decide which resources should be inflated and returned, to avoid iterating over unnecessary resources.

ResourceSelector( path: PathSelector = <factory>, cluster: MetadataSelector = <factory>, kustomization: MetadataSelector = <factory>, helm_repo: MetadataSelector = <factory>, helm_release: MetadataSelector = <factory>, oci_repo: MetadataSelector = <factory>, doc_visitor: flux_local.git_repo.DocumentVisitor | None = None)
path: PathSelector

Path to find a repo of local flux Kustomization objects

cluster: MetadataSelector

Cluster names to return.

kustomization: MetadataSelector

Kustomization names to return.

helm_repo: MetadataSelector

HelmRepository objects to return.

helm_release: MetadataSelector

HelmRelease objects to return.

oci_repo: MetadataSelector

OCIRepository objects to return.

doc_visitor: flux_local.git_repo.DocumentVisitor | None = None

Raw objects to visit.

@dataclass
class PathSelector:
164@dataclass
165class PathSelector:
166    """A pathlib.Path inside of a git repo."""
167
168    path: Path | None = None
169    """The path within a repo."""
170
171    sources: list[Source] | None = None
172    """A list of repository sources for building relative paths."""
173
174    @property
175    def repo(self) -> git.repo.Repo:
176        """Return the local git repo."""
177        return git_repo(self.path)
178
179    @property
180    def root(self) -> Path:
181        """Return the local git repo root."""
182        return repo_root(self.repo)
183
184    @property
185    def relative_path(self) -> Path:
186        """Return the relative path within the repo.
187
188        This is used to translate a relative path specified onto the command
189        line into a relative path in the repo. The path on the command line may
190        be relative to the current working directory, but we want to translate
191        it into a relative path in the repo.
192
193        This is also used when transposing this path on a worktree.
194        """
195        arg_path = self.path or Path(os.getcwd())
196        resolved_path = arg_path.resolve()
197        return resolved_path.relative_to(self.root.resolve())

A pathlib.Path inside of a git repo.

PathSelector( path: pathlib._local.Path | None = None, sources: list[flux_local.git_repo.Source] | None = None)
path: pathlib._local.Path | None = None

The path within a repo.

sources: list[flux_local.git_repo.Source] | None = None

A list of repository sources for building relative paths.

repo: git.repo.base.Repo
174    @property
175    def repo(self) -> git.repo.Repo:
176        """Return the local git repo."""
177        return git_repo(self.path)

Return the local git repo.

root: pathlib._local.Path
179    @property
180    def root(self) -> Path:
181        """Return the local git repo root."""
182        return repo_root(self.repo)

Return the local git repo root.

relative_path: pathlib._local.Path
184    @property
185    def relative_path(self) -> Path:
186        """Return the relative path within the repo.
187
188        This is used to translate a relative path specified onto the command
189        line into a relative path in the repo. The path on the command line may
190        be relative to the current working directory, but we want to translate
191        it into a relative path in the repo.
192
193        This is also used when transposing this path on a worktree.
194        """
195        arg_path = self.path or Path(os.getcwd())
196        resolved_path = arg_path.resolve()
197        return resolved_path.relative_to(self.root.resolve())

Return the relative path within the repo.

This is used to translate a relative path specified onto the command line into a relative path in the repo. The path on the command line may be relative to the current working directory, but we want to translate it into a relative path in the repo.

This is also used when transposing this path on a worktree.

@dataclass
class MetadataSelector:
243@dataclass
244class MetadataSelector:
245    """A filter for objects to select from the cluster."""
246
247    enabled: bool = True
248    """If true, this selector may return objects."""
249
250    name: str | None = None
251    """Resources returned will match this name."""
252
253    namespace: str | None = None
254    """Resources returned will be from this namespace."""
255
256    label_selector: dict[str, str] | None = None
257    """Resources returned must have these labels."""
258
259    skip_crds: bool = True
260    """If false, CRDs may be processed, depending on the resource type."""
261
262    skip_secrets: bool = True
263    """If false, Secrets may be processed, depending on the resource type."""
264
265    skip_kinds: list[str] | None = None
266    """A list of potential CRDs to skip when emitting objects."""
267
268    visitor: ResourceVisitor | None = None
269    """Visitor for the specified object type that can be used for building."""
270
271    @property
272    def predicate(
273        self,
274    ) -> Callable[
275        [Kustomization | HelmRelease | HelmRepository | OCIRepository],
276        bool,
277    ]:
278        """A predicate that selects Kustomization objects."""
279
280        def predicate(
281            obj: Kustomization | HelmRelease | HelmRepository | OCIRepository,
282        ) -> bool:
283            if not self.enabled:
284                return False
285            if self.name and obj.name != self.name:
286                return False
287            if self.namespace and obj.namespace != self.namespace:
288                return False
289            if self.label_selector and isinstance(obj, (Kustomization, HelmRelease)):
290                obj_labels = obj.labels or {}
291                for name, value in self.label_selector.items():
292                    _LOGGER.debug("Checking %s=%s", name, value)
293                    if (
294                        obj_value := obj_labels.get(name)
295                    ) is None or obj_value != value:
296                        _LOGGER.debug("mismatch v=%s", obj_value)
297                        return False
298            return True
299
300        return predicate

A filter for objects to select from the cluster.

MetadataSelector( enabled: bool = True, name: str | None = None, namespace: str | None = None, label_selector: dict[str, str] | None = None, skip_crds: bool = True, skip_secrets: bool = True, skip_kinds: list[str] | None = None, visitor: flux_local.git_repo.ResourceVisitor | None = None)
enabled: bool = True

If true, this selector may return objects.

name: str | None = None

Resources returned will match this name.

namespace: str | None = None

Resources returned will be from this namespace.

label_selector: dict[str, str] | None = None

Resources returned must have these labels.

skip_crds: bool = True

If false, CRDs may be processed, depending on the resource type.

skip_secrets: bool = True

If false, Secrets may be processed, depending on the resource type.

skip_kinds: list[str] | None = None

A list of potential CRDs to skip when emitting objects.

visitor: flux_local.git_repo.ResourceVisitor | None = None

Visitor for the specified object type that can be used for building.

predicate: Callable[[flux_local.manifest.Kustomization | flux_local.manifest.HelmRelease | flux_local.manifest.HelmRepository | flux_local.manifest.OCIRepository], bool]
271    @property
272    def predicate(
273        self,
274    ) -> Callable[
275        [Kustomization | HelmRelease | HelmRepository | OCIRepository],
276        bool,
277    ]:
278        """A predicate that selects Kustomization objects."""
279
280        def predicate(
281            obj: Kustomization | HelmRelease | HelmRepository | OCIRepository,
282        ) -> bool:
283            if not self.enabled:
284                return False
285            if self.name and obj.name != self.name:
286                return False
287            if self.namespace and obj.namespace != self.namespace:
288                return False
289            if self.label_selector and isinstance(obj, (Kustomization, HelmRelease)):
290                obj_labels = obj.labels or {}
291                for name, value in self.label_selector.items():
292                    _LOGGER.debug("Checking %s=%s", name, value)
293                    if (
294                        obj_value := obj_labels.get(name)
295                    ) is None or obj_value != value:
296                        _LOGGER.debug("mismatch v=%s", obj_value)
297                        return False
298            return True
299
300        return predicate

A predicate that selects Kustomization objects.

@dataclass
class Options:
313@dataclass
314class Options:
315    """Options for the resource selector for building manifets."""
316
317    kustomize_flags: list[str] = field(default_factory=list)
318    skip_kustomize_path_validation: bool = False

Options for the resource selector for building manifets.

Options( kustomize_flags: list[str] = <factory>, skip_kustomize_path_validation: bool = False)
kustomize_flags: list[str]
skip_kustomize_path_validation: bool = False