flux_local.git_repo
Library for operating on a local repo and building Manifests.
This will build a manifest.Manifest
from a cluster repo. This follows the
pattern of building kustomizations, then reading helm releases (though it will
not evaluate the templates). The resulting Manifest
contains all the necessary
information to do basic checks on objects in the cluster (e.g. run templates
from unit tests).
Example usage:
```python
from flux_local import git_repo

selector = git_repo.ResourceSelector(
    helm_release=git_repo.MetadataSelector(
        namespace="podinfo"
    )
)
manifest = await git_repo.build_manifest(selector=selector)
for cluster in manifest.clusters:
    print(f"Found cluster: {cluster.path}")
    for kustomization in cluster.kustomizations:
        print(f"Found kustomization: {kustomization.path}")
        for release in kustomization.helm_releases:
            print(f"Found helm release: {release.release_name}")
```
1"""Library for operating on a local repo and building Manifests. 2 3This will build a `manifest.Manifest` from a cluster repo. This follows the 4pattern of building kustomizations, then reading helm releases (though it will 5not evaluate the templates). The resulting `Manifest` contains all the necessary 6information to do basic checks on objects in the cluster (e.g. run templates 7from unit tests). 8 9Example usage: 10 11```python 12from flux_local import git_repo 13 14 15selector = git_repo.Selector( 16 helm_release=git_repo.MetadataSelector( 17 namespace="podinfo" 18 ) 19) 20manifest = await git_repo.build_manifest(selector) 21for cluster in manifest: 22 print(f"Found cluster: {cluster.path}") 23 for kustomization in cluster.kustomizations: 24 print(f"Found kustomization: {kustomization.path}") 25 for release in kustomization.helm_releases: 26 print(f"Found helm release: {release.release_name}") 27``` 28 29""" 30 31import asyncio 32import contextlib 33from dataclasses import dataclass, field 34import logging 35import os 36import tempfile 37from collections import deque 38from collections.abc import Callable, Awaitable 39from functools import cache 40from pathlib import Path 41from typing import Any, Generator 42 43import git 44 45from aiofiles.ospath import isdir 46from . 
import kustomize, values 47from .exceptions import FluxException, KustomizePathException 48from .manifest import ( 49 CRD_KIND, 50 FLUXTOMIZE_DOMAIN, 51 KUSTOMIZE_DOMAIN, 52 Cluster, 53 HelmRelease, 54 HelmRepository, 55 Kustomization, 56 Manifest, 57 ConfigMap, 58 Secret, 59 SECRET_KIND, 60 CONFIG_MAP_KIND, 61 OCIRepository, 62 HELM_REPO_KIND, 63) 64from .exceptions import InputException 65from .context import trace_context 66 67__all__ = [ 68 "build_manifest", 69 "ResourceSelector", 70 "PathSelector", 71 "MetadataSelector", 72 "Options", 73] 74 75_LOGGER = logging.getLogger(__name__) 76 77CLUSTER_KUSTOMIZE_KIND = "Kustomization" 78HELM_RELEASE_KIND = "HelmRelease" 79GIT_REPO_KIND = "GitRepository" 80OCI_REPO_KIND = "OCIRepository" 81DEFAULT_NAMESPACE = "flux-system" 82DEFAULT_NAME = "flux-system" 83GREP_SOURCE_REF_KIND = f"spec.sourceRef.kind={GIT_REPO_KIND}|{OCI_REPO_KIND}" 84ERROR_DETAIL_BAD_PATH = "Try specifying another path within the git repo?" 85ERROR_DETAIL_BAD_KS = "Is a Kustomization pointing to a path that does not exist?" 86 87 88@dataclass 89class Source: 90 """A source is a named mapping from a k8s object name to a path in the git repo. 91 92 This is needed to map the location within a reop if it's not the root. For example, 93 you may have a `GitRepository` that is relative to `/` and all if of the 94 `Kustomization`s inside may reference paths within it e.g. `/k8s/namespaces/`. But 95 you may also have an `OCIRepository` that was built relative to `/k8s/` where the 96 `Kustomization`s inside may reference the path relative to that like `/namespaces/`. 
97 """ 98 99 name: str 100 """The name of the repository source.""" 101 102 root: Path | None 103 """The path name within the repo root.""" 104 105 namespace: str | None 106 """The namespace of the repository source.""" 107 108 @property 109 def source_name(self) -> str: 110 """Return the full name of the source.""" 111 return f"{self.namespace}/{self.name}" 112 113 @classmethod 114 def from_str(self, value: str) -> "Source": 115 """Parse a Source from key=value string.""" 116 root: Path | None = None 117 if "=" in value: 118 name, root_str = value.split("=") 119 root = Path(root_str) 120 else: 121 name = value 122 namespace: str | None = None 123 if "/" in name: 124 namespace, name = name.split("/") 125 return Source(name=name, root=root, namespace=namespace) 126 127 128@cache 129def git_repo(path: Path | None = None) -> git.repo.Repo: 130 """Return the local git repo path.""" 131 try: 132 if path is None: 133 return git.repo.Repo(os.getcwd(), search_parent_directories=True) 134 return git.repo.Repo(str(path), search_parent_directories=True) 135 except git.GitError as err: 136 raise InputException(f"Unable to find input path {path}: {err}") from err 137 138 139@cache 140def repo_root(repo: git.repo.Repo | None = None) -> Path: 141 """Return the local git repo path.""" 142 if repo is None: 143 repo = git_repo() 144 return Path(repo.git.rev_parse("--show-toplevel")) 145 146 147def domain_filter(version: str) -> Callable[[dict[str, Any]], bool]: 148 """Return a yaml doc filter for specified resource version.""" 149 150 def func(doc: dict[str, Any]) -> bool: 151 if api_version := doc.get("apiVersion"): 152 if api_version.startswith(version): 153 return True 154 return False 155 156 return func 157 158 159FLUXTOMIZE_DOMAIN_FILTER = domain_filter(FLUXTOMIZE_DOMAIN) 160KUSTOMIZE_DOMAIN_FILTER = domain_filter(KUSTOMIZE_DOMAIN) 161 162 163@dataclass 164class PathSelector: 165 """A pathlib.Path inside of a git repo.""" 166 167 path: Path | None = None 168 """The path within 
a repo.""" 169 170 sources: list[Source] | None = None 171 """A list of repository sources for building relative paths.""" 172 173 @property 174 def repo(self) -> git.repo.Repo: 175 """Return the local git repo.""" 176 return git_repo(self.path) 177 178 @property 179 def root(self) -> Path: 180 """Return the local git repo root.""" 181 return repo_root(self.repo) 182 183 @property 184 def relative_path(self) -> Path: 185 """Return the relative path within the repo. 186 187 This is used to translate a relative path specified onto the command 188 line into a relative path in the repo. The path on the command line may 189 be relative to the current working directory, but we want to translate 190 it into a relative path in the repo. 191 192 This is also used when transposing this path on a worktree. 193 """ 194 arg_path = self.path or Path(os.getcwd()) 195 resolved_path = arg_path.resolve() 196 return resolved_path.relative_to(self.root.resolve()) 197 198 199@dataclass 200class ResourceVisitor: 201 """Invoked when a resource is visited to the caller can intercept.""" 202 203 func: Callable[ 204 [ 205 Path, 206 Kustomization | HelmRelease | HelmRepository | OCIRepository, 207 kustomize.Kustomize | None, 208 ], 209 Awaitable[None], 210 ] 211 """Function called with the resource and optional content. 212 213 The function arguments are: 214 - path: This is the cluster and kustomization paths needed to disambiguate 215 when there are multiple clusters in the repository. 216 - doc: The resource object (e.g. Kustomization, HelmRelease, etc) 217 - cmd: A Kustomize object that can produce the specified object. Only supported 218 for Kustomizations. 219 """ 220 221 222@dataclass 223class DocumentVisitor: 224 """Invoked when a document is visited so the caller can intercept. 225 226 This is similar to a resource visitor, but it visits the unparsed documents 227 since they may not have explicit schemas in this project. 
228 """ 229 230 kinds: list[str] 231 """The resource kinds of documents to visit.""" 232 233 func: Callable[[str, dict[str, Any]], None] 234 """Function called with the resource and optional content. 235 236 The function arguments are: 237 - parent: The namespaced name of the Fluxtomization or HelmRelease 238 - doc: The resource object (e.g. Pod, ConfigMap, HelmRelease, etc) 239 """ 240 241 242@dataclass 243class MetadataSelector: 244 """A filter for objects to select from the cluster.""" 245 246 enabled: bool = True 247 """If true, this selector may return objects.""" 248 249 name: str | None = None 250 """Resources returned will match this name.""" 251 252 namespace: str | None = None 253 """Resources returned will be from this namespace.""" 254 255 label_selector: dict[str, str] | None = None 256 """Resources returned must have these labels.""" 257 258 skip_crds: bool = True 259 """If false, CRDs may be processed, depending on the resource type.""" 260 261 skip_secrets: bool = True 262 """If false, Secrets may be processed, depending on the resource type.""" 263 264 skip_kinds: list[str] | None = None 265 """A list of potential CRDs to skip when emitting objects.""" 266 267 visitor: ResourceVisitor | None = None 268 """Visitor for the specified object type that can be used for building.""" 269 270 @property 271 def predicate( 272 self, 273 ) -> Callable[ 274 [Kustomization | HelmRelease | HelmRepository | OCIRepository], 275 bool, 276 ]: 277 """A predicate that selects Kustomization objects.""" 278 279 def predicate( 280 obj: Kustomization | HelmRelease | HelmRepository | OCIRepository, 281 ) -> bool: 282 if not self.enabled: 283 return False 284 if self.name and obj.name != self.name: 285 return False 286 if self.namespace and obj.namespace != self.namespace: 287 return False 288 if self.label_selector and isinstance(obj, (Kustomization, HelmRelease)): 289 obj_labels = obj.labels or {} 290 for name, value in self.label_selector.items(): 291 
_LOGGER.debug("Checking %s=%s", name, value) 292 if ( 293 obj_value := obj_labels.get(name) 294 ) is None or obj_value != value: 295 _LOGGER.debug("mismatch v=%s", obj_value) 296 return False 297 return True 298 299 return predicate 300 301 302def cluster_metadata_selector() -> MetadataSelector: 303 """Create a new MetadataSelector for Kustomizations.""" 304 return MetadataSelector(name=DEFAULT_NAME, namespace=DEFAULT_NAMESPACE) 305 306 307def ks_metadata_selector() -> MetadataSelector: 308 """Create a new MetadataSelector for Kustomizations.""" 309 return MetadataSelector(namespace=DEFAULT_NAMESPACE) 310 311 312@dataclass 313class Options: 314 """Options for the resource selector for building manifets.""" 315 316 kustomize_flags: list[str] = field(default_factory=list) 317 skip_kustomize_path_validation: bool = False 318 319 320@dataclass 321class ResourceSelector: 322 """A filter for objects to select from the cluster. 323 324 This is invoked when iterating over objects in the cluster to decide which 325 resources should be inflated and returned, to avoid iterating over 326 unnecessary resources. 
327 """ 328 329 path: PathSelector = field(default_factory=PathSelector) 330 """Path to find a repo of local flux Kustomization objects""" 331 332 cluster: MetadataSelector = field(default_factory=cluster_metadata_selector) 333 """Cluster names to return.""" 334 335 kustomization: MetadataSelector = field(default_factory=ks_metadata_selector) 336 """Kustomization names to return.""" 337 338 helm_repo: MetadataSelector = field(default_factory=MetadataSelector) 339 """HelmRepository objects to return.""" 340 341 helm_release: MetadataSelector = field(default_factory=MetadataSelector) 342 """HelmRelease objects to return.""" 343 344 oci_repo: MetadataSelector = field(default_factory=MetadataSelector) 345 """OCIRepository objects to return.""" 346 347 doc_visitor: DocumentVisitor | None = None 348 """Raw objects to visit.""" 349 350 351def is_allowed_source(sources: list[Source]) -> Callable[[Kustomization], bool]: 352 """Return true if this Kustomization is from an allowed source.""" 353 354 def _filter(doc: Kustomization) -> bool: 355 if not sources: 356 return True 357 for source in sources: 358 if source.name == doc.source_name and ( 359 source.namespace is None or source.namespace == doc.source_namespace 360 ): 361 return True 362 return False 363 364 return _filter 365 366 367def adjust_ks_path(doc: Kustomization, selector: PathSelector) -> Path | None: 368 """Make adjustments to the Kustomizations path.""" 369 if doc.source_kind == OCI_REPO_KIND or doc.source_kind == GIT_REPO_KIND: 370 for source in selector.sources or []: 371 if source.name == doc.source_name: 372 _LOGGER.debug( 373 "Updated Source for %s %s: %s", doc.source_kind, doc.name, doc.path 374 ) 375 if not source.root: 376 _LOGGER.info( 377 "%s source for %s has no root specified", 378 doc.source_kind, 379 doc.name, 380 ) 381 break 382 return source.root / doc.path 383 384 # No match so if OCI we can't do anything. If Git we assume its the root 385 # of the repository. 
386 if doc.source_kind == OCI_REPO_KIND: 387 _LOGGER.info( 388 "Unknown cluster source for %s %s: %s", 389 doc.source_kind, 390 doc.name, 391 doc.path, 392 ) 393 return None 394 395 # Source path is relative to the search path. Update to have the 396 # full prefix relative to the root. 397 if not doc.path: 398 _LOGGER.debug("Assigning implicit path %s", selector.relative_path) 399 return selector.relative_path 400 401 path = Path(doc.path) 402 if path.is_absolute(): 403 return path.relative_to("/") 404 return path 405 406 407class CachableBuilder: 408 """Wrapper around flux_build that caches contents.""" 409 410 def __init__(self) -> None: 411 """Initialize CachableBuilder.""" 412 self._cache: dict[str, kustomize.Kustomize] = {} 413 414 async def build( 415 self, kustomization: Kustomization, path: Path 416 ) -> kustomize.Kustomize: 417 key = f"{kustomization.namespaced_name} @ {path}" 418 if cmd := self._cache.get(key): 419 return cmd 420 cmd = kustomize.flux_build(kustomization, path) 421 cmd = await cmd.stash() 422 self._cache[key] = cmd 423 return cmd 424 425 def remove(self, kustomization: Kustomization) -> None: 426 """Remove the kustomization value from the cache.""" 427 target_key = f"{kustomization.namespaced_name} @" 428 for key in list(self._cache.keys()): 429 if key.startswith(target_key): 430 _LOGGER.debug("Invalidated cache %s", key) 431 del self._cache[key] 432 433 434@dataclass 435class VisitResult: 436 """Result of visiting a kustomization.""" 437 438 kustomizations: list[Kustomization] 439 config_maps: list[ConfigMap] 440 secrets: list[Secret] 441 442 def __post_init__(self) -> None: 443 """Validate the object""" 444 unique = {ks.namespaced_name for ks in self.kustomizations} 445 if len(unique) != len(self.kustomizations): 446 ks_names = [ks.namespaced_name for ks in self.kustomizations] 447 dupes = list(filter(lambda x: ks_names.count(x) > 1, ks_names)) 448 raise FluxException( 449 f"Detected multiple Fluxtomizations with the same name: {dupes}. 
" 450 "This indicates either (1) an incorrect Kustomization which needs to be fixed " 451 "or (2) a multi-cluster setup which requires flux-local to run with a more strict --path." 452 ) 453 454 455async def visit_kustomization( 456 selector: PathSelector, 457 builder: CachableBuilder, 458 path: Path, 459 visit_ks: Kustomization | None, 460 options: Options, 461) -> VisitResult: 462 """Visit a path and return a list of Kustomizations.""" 463 464 _LOGGER.debug("Visiting path (%s) %s", selector.path, path) 465 label = visit_ks.namespaced_name if visit_ks else str(path) 466 467 kinds = [CLUSTER_KUSTOMIZE_KIND, CONFIG_MAP_KIND, SECRET_KIND] 468 469 with trace_context(f"Kustomization '{label}'"): 470 cmd: kustomize.Kustomize 471 if visit_ks is None: 472 cmd = kustomize.filter_resources(kinds, selector.root / path) 473 else: 474 if not await isdir(selector.root / path): 475 if options.skip_kustomize_path_validation: 476 _LOGGER.debug( 477 "Skipping Kustomization '%s' since path does not exist: %s", 478 visit_ks.namespaced_name, 479 selector.root / path, 480 ) 481 return VisitResult(kustomizations=[], config_maps=[], secrets=[]) 482 raise KustomizePathException( 483 f"Kustomization '{visit_ks.namespaced_name}' path field '{visit_ks.path or ''}' is not a directory: {selector.root / path}" 484 ) 485 cmd = await builder.build(visit_ks, selector.root / path) 486 cmd = cmd.filter_resources(kinds) 487 cmd = await cmd.stash() 488 ks_cmd = cmd.grep(GREP_SOURCE_REF_KIND) 489 cfg_cmd = cmd.filter_resources([CONFIG_MAP_KIND, SECRET_KIND]) 490 491 try: 492 ks_docs = await ks_cmd.objects() 493 cfg_docs = await cfg_cmd.objects() 494 except KustomizePathException as err: 495 raise FluxException(err) from err 496 except FluxException as err: 497 if visit_ks is None: 498 raise FluxException( 499 f"Error building Fluxtomization in '{selector.root}' " 500 f"path '{path}': {ERROR_DETAIL_BAD_PATH} {err}" 501 ) from err 502 raise FluxException( 503 f"Error building Fluxtomization 
'{visit_ks.namespaced_name}' " 504 f"path '{path}': {ERROR_DETAIL_BAD_KS} {err}" 505 ) from err 506 507 return VisitResult( 508 kustomizations=list( 509 filter( 510 is_allowed_source(selector.sources or []), 511 [ 512 Kustomization.parse_doc(doc) 513 for doc in filter(FLUXTOMIZE_DOMAIN_FILTER, ks_docs) 514 ], 515 ) 516 ), 517 config_maps=[ 518 ConfigMap.parse_doc(doc) 519 for doc in cfg_docs 520 if doc.get("kind") == CONFIG_MAP_KIND 521 ], 522 secrets=[ 523 Secret.parse_doc(doc) for doc in cfg_docs if doc.get("kind") == SECRET_KIND 524 ], 525 ) 526 527 528async def kustomization_traversal( 529 selector: PathSelector, builder: CachableBuilder, options: Options 530) -> list[Kustomization]: 531 """Search for kustomizations in the specified path.""" 532 533 response_kustomizations: list[Kustomization] = [] 534 visited_paths: set[Path] = set() # Relative paths within the cluster 535 visited_ks: set[str] = set() 536 537 path_queue: deque[tuple[Path, Kustomization | None]] = deque() 538 path_queue.append((selector.relative_path, None)) 539 cluster_config = values.cluster_config([], []) 540 while path_queue: 541 # Fully empty the queue, running all tasks in parallel 542 tasks = [] 543 while path_queue: 544 (path, visit_ks) = path_queue.popleft() 545 546 if path in visited_paths: 547 _LOGGER.debug("Already visited %s", path) 548 continue 549 visited_paths.add(path) 550 551 tasks.append( 552 visit_kustomization(selector, builder, path, visit_ks, options) 553 ) 554 555 # Find new kustomizations 556 kustomizations = [] 557 for result in await asyncio.gather(*tasks): 558 cluster_config = values.merge_cluster_config( 559 cluster_config, result.secrets, result.config_maps 560 ) 561 for ks in result.kustomizations: 562 if ks.namespaced_name in visited_ks: 563 continue 564 kustomizations.append(ks) 565 visited_ks.add(ks.namespaced_name) 566 _LOGGER.debug("Found %s new Kustomizations", len(kustomizations)) 567 568 # Queue up new paths to visit to find more kustomizations 569 for ks 
in kustomizations: 570 _LOGGER.debug( 571 "Kustomization '%s' sourceRef.kind '%s' of '%s'", 572 ks.name, 573 ks.source_kind, 574 ks.source_name, 575 ) 576 if not (ks_path := adjust_ks_path(ks, selector)): 577 continue 578 ks.path = str(ks_path) 579 if ks.postbuild_substitute_from: 580 values.expand_postbuild_substitute_reference( 581 ks, 582 cluster_config, 583 ) 584 585 path_queue.append((ks_path, ks)) 586 response_kustomizations.append(ks) 587 588 response_kustomizations.sort(key=lambda x: (x.namespace, x.name)) 589 return response_kustomizations 590 591 592def node_name(ks: Kustomization) -> str: 593 """Return a unique node name for the Kustomization. 594 595 This includes the path since it needs to be unique within the entire 596 repository since we support multi-cluster. 597 """ 598 return f"{ks.namespaced_name} @ {ks.id_name}" 599 600 601async def build_kustomization( 602 kustomization: Kustomization, 603 cluster_path: Path, 604 selector: ResourceSelector, 605 kustomize_flags: list[str], 606 builder: CachableBuilder, 607) -> None: 608 """Build helm objects for the Kustomization and update state.""" 609 root: Path = selector.path.root 610 kustomization_selector: MetadataSelector = selector.kustomization 611 helm_repo_selector: MetadataSelector = selector.helm_repo 612 oci_repo_selector: MetadataSelector = selector.oci_repo 613 helm_release_selector: MetadataSelector = selector.helm_release 614 if ( 615 not kustomization_selector.enabled 616 and not helm_repo_selector.enabled 617 and not oci_repo_selector.visitor 618 and not helm_release_selector.enabled 619 and not selector.doc_visitor 620 ): 621 return 622 623 with trace_context(f"Build '{kustomization.namespaced_name}'"): 624 cmd = await builder.build(kustomization, root / kustomization.path) 625 skips = [] 626 if kustomization_selector.skip_crds: 627 skips.append(CRD_KIND) 628 if kustomization_selector.skip_secrets: 629 skips.append(SECRET_KIND) 630 if kustomization_selector.skip_kinds: 631 
skips.extend(kustomization_selector.skip_kinds) 632 cmd = cmd.skip_resources(skips) 633 try: 634 cmd = await cmd.stash() 635 except FluxException as err: 636 raise FluxException( 637 f"Error while building Kustomization " 638 f"'{kustomization.namespace}/{kustomization.name}' " 639 f"(path={kustomization.source_path}): {err}" 640 ) from err 641 642 if kustomization_selector.visitor: 643 await kustomization_selector.visitor.func( 644 Path(kustomization.path), 645 kustomization, 646 cmd, 647 ) 648 649 kinds = [] 650 # Needed for expanding postbuild substitutions and value references 651 kinds.append(CONFIG_MAP_KIND) 652 if helm_repo_selector.enabled: 653 kinds.append(HELM_REPO_KIND) 654 if oci_repo_selector.enabled: 655 kinds.append(OCI_REPO_KIND) 656 if helm_release_selector.enabled: 657 kinds.append(HELM_RELEASE_KIND) 658 # Needed for expanding value references 659 kinds.append(SECRET_KIND) 660 if selector.doc_visitor: 661 kinds.extend(selector.doc_visitor.kinds) 662 if not kinds: 663 return 664 665 docs = await cmd.filter_resources(kinds).objects( 666 target_namespace=kustomization.target_namespace 667 ) 668 669 if selector.doc_visitor: 670 doc_kinds = set(selector.doc_visitor.kinds) 671 for doc in docs: 672 if doc.get("kind") not in doc_kinds: 673 continue 674 selector.doc_visitor.func(kustomization.namespaced_name, doc) 675 676 kustomization.helm_repos = list( 677 filter( 678 helm_repo_selector.predicate, 679 [ 680 HelmRepository.parse_doc(doc) 681 for doc in docs 682 if doc.get("kind") == HELM_REPO_KIND 683 ], 684 ) 685 ) 686 kustomization.oci_repos = list( 687 filter( 688 oci_repo_selector.predicate, 689 [ 690 OCIRepository.parse_doc(doc) 691 for doc in docs 692 if doc.get("kind") == OCI_REPO_KIND 693 ], 694 ) 695 ) 696 kustomization.helm_releases = list( 697 filter( 698 helm_release_selector.predicate, 699 [ 700 HelmRelease.parse_doc(doc) 701 for doc in docs 702 if doc.get("kind") == HELM_RELEASE_KIND 703 ], 704 ) 705 ) 706 kustomization.config_maps = [ 707 
ConfigMap.parse_doc(doc) 708 for doc in docs 709 if doc.get("kind") == CONFIG_MAP_KIND 710 ] 711 kustomization.secrets = [ 712 Secret.parse_doc(doc) for doc in docs if doc.get("kind") == SECRET_KIND 713 ] 714 715 716def _ready_kustomizations( 717 kustomizations: list[Kustomization], visited: set[str] 718) -> tuple[list[Kustomization], list[Kustomization]]: 719 """Split the kustomizations into those that are ready vs pending.""" 720 ready = [] 721 pending = [] 722 for kustomization in kustomizations: 723 if not_ready := (set(kustomization.depends_on or {}) - visited): 724 _LOGGER.debug( 725 "Kustomization %s waiting for %s", 726 kustomization.namespaced_name, 727 not_ready, 728 ) 729 pending.append(kustomization) 730 else: 731 ready.append(kustomization) 732 return (ready, pending) 733 734 735async def build_manifest( 736 path: Path | None = None, 737 selector: ResourceSelector = ResourceSelector(), 738 options: Options = Options(), 739) -> Manifest: 740 """Build a Manifest object from the local cluster. 741 742 This will locate all Kustomizations that represent clusters, then find all 743 the Kustomizations within that cluster, as well as all relevant Helm 744 resources. 745 746 The path input parameter is deprecated. Use the PathSelector in `selector` instead. 
747 """ 748 if path: 749 selector.path = PathSelector(path=path) 750 751 _LOGGER.debug("Processing cluster with selector %s", selector) 752 if not selector.cluster.enabled: 753 return Manifest(clusters=[]) 754 755 builder = CachableBuilder() 756 757 with trace_context(f"Cluster '{str(selector.path.path)}'"): 758 results = await kustomization_traversal(selector.path, builder, options) 759 clusters = [ 760 Cluster( 761 path=str(selector.path.relative_path), 762 kustomizations=[ 763 ks for ks in results if selector.kustomization.predicate(ks) 764 ], 765 ) 766 ] 767 768 async def update_kustomization(cluster: Cluster) -> None: 769 queue = [*cluster.kustomizations] 770 visited: set[str] = set() 771 while queue: 772 build_tasks = [] 773 (ready, pending) = _ready_kustomizations(queue, visited) 774 for kustomization in ready: 775 _LOGGER.debug( 776 "Processing kustomization '%s': %s", 777 kustomization.name, 778 kustomization.path, 779 ) 780 if not await isdir(selector.path.root / kustomization.path): 781 if options.skip_kustomize_path_validation: 782 _LOGGER.debug( 783 "Skipping Kustomization '%s' since path does not exist: %s", 784 kustomization.namespaced_name, 785 selector.path.root / kustomization.path, 786 ) 787 continue 788 789 if kustomization.postbuild_substitute_from: 790 values.expand_postbuild_substitute_reference( 791 kustomization, 792 values.ks_cluster_config(cluster.kustomizations), 793 ) 794 # Clear the cache to remove any previous builds that are 795 # missing the postbuild substitutions. 
796 builder.remove(kustomization) 797 798 build_tasks.append( 799 build_kustomization( 800 kustomization, 801 Path(cluster.path), 802 selector, 803 options.kustomize_flags, 804 builder, 805 ) 806 ) 807 if not build_tasks: 808 raise FluxException( 809 "Internal error: Unexpected loop without build tasks" 810 ) 811 await asyncio.gather(*build_tasks) 812 visited.update([ks.namespaced_name for ks in ready]) 813 queue = pending 814 815 # Validate all Kustomizations have valid dependsOn attributes since later 816 # we'll be using them to order processing. 817 for cluster in clusters: 818 all_ks = set([ks.namespaced_name for ks in cluster.kustomizations]) 819 for ks in cluster.kustomizations: 820 ks.validate_depends_on(all_ks) 821 822 kustomization_tasks = [] 823 # Expand and visit Kustomizations 824 for cluster in clusters: 825 kustomization_tasks.append(update_kustomization(cluster)) 826 await asyncio.gather(*kustomization_tasks) 827 828 # Handle any HelmRelease value references 829 for cluster in clusters: 830 for kustomization in cluster.kustomizations: 831 kustomization.helm_releases = [ 832 values.expand_value_references(helm_release, kustomization) 833 for helm_release in kustomization.helm_releases 834 ] 835 836 # Visit Helm resources 837 for cluster in clusters: 838 if selector.helm_repo.visitor: 839 for kustomization in cluster.kustomizations: 840 for helm_repo in kustomization.helm_repos: 841 await selector.helm_repo.visitor.func( 842 Path(kustomization.path), 843 helm_repo, 844 None, 845 ) 846 847 if selector.oci_repo.visitor: 848 for kustomization in cluster.kustomizations: 849 for oci_repo in kustomization.oci_repos: 850 await selector.oci_repo.visitor.func( 851 Path(kustomization.path), 852 oci_repo, 853 None, 854 ) 855 856 if selector.helm_release.visitor: 857 for kustomization in cluster.kustomizations: 858 for helm_release in kustomization.helm_releases: 859 await selector.helm_release.visitor.func( 860 Path(kustomization.path), 861 helm_release, 862 
None, 863 ) 864 865 return Manifest(clusters=clusters) 866 867 868@contextlib.contextmanager 869def create_worktree( 870 repo: git.repo.Repo, existing_branch: str | None = None 871) -> Generator[Path, None, None]: 872 """Create a ContextManager for a new git worktree in the current repo. 873 874 This is used to get a fork of the current repo without any local changes 875 in order to produce a diff. 876 Specifying existing_branch allows to compare the current state with the existing branch. 877 """ 878 orig = os.getcwd() 879 with tempfile.TemporaryDirectory() as tmp_dir: 880 _LOGGER.debug("Creating worktree in %s", tmp_dir) 881 if existing_branch is None: 882 # Add --detach to avoid creating a branch since we will not make modifications 883 repo.git.worktree("add", "--detach", str(tmp_dir)) 884 else: 885 repo.git.worktree("add", str(tmp_dir), existing_branch) 886 os.chdir(tmp_dir) 887 yield Path(tmp_dir) 888 _LOGGER.debug("Restoring to %s", orig) 889 # The temp directory should now be removed and this prunes the worktree 890 repo.git.worktree("prune") 891 os.chdir(orig)
async def build_manifest(
    path: Path | None = None,
    selector: ResourceSelector | None = None,
    options: Options | None = None,
) -> Manifest:
    """Build a Manifest object from the local cluster.

    This will locate all Kustomizations that represent clusters, then find all
    the Kustomizations within that cluster, as well as all relevant Helm
    resources.

    The path input parameter is deprecated. Use the PathSelector in `selector` instead.

    A fresh `ResourceSelector` / `Options` is created on each call when the
    argument is omitted, so state never leaks between calls.
    """
    # Defaults are created per call: `selector.path` is assigned below, and a
    # shared default instance would carry that path over into later calls.
    if selector is None:
        selector = ResourceSelector()
    if options is None:
        options = Options()
    if path:
        selector.path = PathSelector(path=path)

    _LOGGER.debug("Processing cluster with selector %s", selector)
    if not selector.cluster.enabled:
        return Manifest(clusters=[])

    builder = CachableBuilder()

    with trace_context(f"Cluster '{str(selector.path.path)}'"):
        results = await kustomization_traversal(selector.path, builder, options)
    clusters = [
        Cluster(
            path=str(selector.path.relative_path),
            kustomizations=[
                ks for ks in results if selector.kustomization.predicate(ks)
            ],
        )
    ]

    async def update_kustomization(cluster: Cluster) -> None:
        """Build the cluster's Kustomizations in dependsOn order."""
        queue = [*cluster.kustomizations]
        visited: set[str] = set()
        while queue:
            build_tasks = []
            (ready, pending) = _ready_kustomizations(queue, visited)
            # No progress is possible if nothing is ready. This is checked on
            # `ready` rather than `build_tasks` because every ready
            # Kustomization may legitimately be skipped below.
            if not ready:
                raise FluxException(
                    "Internal error: Unexpected loop without build tasks"
                )
            for kustomization in ready:
                _LOGGER.debug(
                    "Processing kustomization '%s': %s",
                    kustomization.name,
                    kustomization.path,
                )
                if not await isdir(selector.path.root / kustomization.path):
                    if options.skip_kustomize_path_validation:
                        _LOGGER.debug(
                            "Skipping Kustomization '%s' since path does not exist: %s",
                            kustomization.namespaced_name,
                            selector.path.root / kustomization.path,
                        )
                        continue

                if kustomization.postbuild_substitute_from:
                    values.expand_postbuild_substitute_reference(
                        kustomization,
                        values.ks_cluster_config(cluster.kustomizations),
                    )
                    # Clear the cache to remove any previous builds that are
                    # missing the postbuild substitutions.
                    builder.remove(kustomization)

                build_tasks.append(
                    build_kustomization(
                        kustomization,
                        Path(cluster.path),
                        selector,
                        options.kustomize_flags,
                        builder,
                    )
                )
            await asyncio.gather(*build_tasks)
            visited.update([ks.namespaced_name for ks in ready])
            queue = pending

    # Validate all Kustomizations have valid dependsOn attributes since later
    # we'll be using them to order processing.
    for cluster in clusters:
        all_ks = {ks.namespaced_name for ks in cluster.kustomizations}
        for ks in cluster.kustomizations:
            ks.validate_depends_on(all_ks)

    kustomization_tasks = []
    # Expand and visit Kustomizations
    for cluster in clusters:
        kustomization_tasks.append(update_kustomization(cluster))
    await asyncio.gather(*kustomization_tasks)

    # Handle any HelmRelease value references
    for cluster in clusters:
        for kustomization in cluster.kustomizations:
            kustomization.helm_releases = [
                values.expand_value_references(helm_release, kustomization)
                for helm_release in kustomization.helm_releases
            ]

    # Visit Helm resources
    for cluster in clusters:
        if selector.helm_repo.visitor:
            for kustomization in cluster.kustomizations:
                for helm_repo in kustomization.helm_repos:
                    await selector.helm_repo.visitor.func(
                        Path(kustomization.path),
                        helm_repo,
                        None,
                    )

        if selector.oci_repo.visitor:
            for kustomization in cluster.kustomizations:
                for oci_repo in kustomization.oci_repos:
                    await selector.oci_repo.visitor.func(
                        Path(kustomization.path),
                        oci_repo,
                        None,
                    )

        if selector.helm_release.visitor:
            for kustomization in cluster.kustomizations:
                for helm_release in kustomization.helm_releases:
                    await selector.helm_release.visitor.func(
                        Path(kustomization.path),
                        helm_release,
                        None,
                    )

    return Manifest(clusters=clusters)
Build a Manifest object from the local cluster.
This will locate all Kustomizations that represent clusters, then find all the Kustomizations within that cluster, as well as all relevant Helm resources.
The path input parameter is deprecated. Use the PathSelector in selector
instead.
@dataclass
class ResourceSelector:
    """Selection criteria applied while walking the objects of a cluster.

    Each field narrows one kind of object so that only the resources of
    interest are inflated and returned, rather than everything in the repo.
    """

    path: PathSelector = field(default_factory=PathSelector)
    """Where in the repo to look for local flux Kustomization objects."""

    cluster: MetadataSelector = field(default_factory=cluster_metadata_selector)
    """Which clusters to return."""

    kustomization: MetadataSelector = field(default_factory=ks_metadata_selector)
    """Which Kustomizations to return."""

    helm_repo: MetadataSelector = field(default_factory=MetadataSelector)
    """Which HelmRepository objects to return."""

    helm_release: MetadataSelector = field(default_factory=MetadataSelector)
    """Which HelmRelease objects to return."""

    oci_repo: MetadataSelector = field(default_factory=MetadataSelector)
    """Which OCIRepository objects to return."""

    doc_visitor: DocumentVisitor | None = None
    """Optional visitor for raw objects."""
A filter for objects to select from the cluster.
This is invoked when iterating over objects in the cluster to decide which resources should be inflated and returned, to avoid iterating over unnecessary resources.
@dataclass
class PathSelector:
    """A location (pathlib.Path) that lives inside a git repo."""

    path: Path | None = None
    """Path inside the repo; the current working directory is used when unset."""

    sources: list[Source] | None = None
    """Repository sources used when building relative paths."""

    @property
    def repo(self) -> git.repo.Repo:
        """Return the local git repo that `path` resolves to."""
        local_repo = git_repo(self.path)
        return local_repo

    @property
    def root(self) -> Path:
        """Return the root directory of the local git repo."""
        top_level = repo_root(self.repo)
        return top_level

    @property
    def relative_path(self) -> Path:
        """Return `path` expressed relative to the repo root.

        A path typed on the command line may be relative to the current
        working directory; this converts it into a path relative to the
        repo so it can also be transposed onto a worktree.
        """
        start = self.path if self.path else Path(os.getcwd())
        absolute = start.resolve()
        repo_top = self.root.resolve()
        return absolute.relative_to(repo_top)
A pathlib.Path inside of a git repo.
A list of repository sources for building relative paths.
@property
def repo(self) -> git.repo.Repo:
    """Return the local git repo that `path` resolves to."""
    local_repo = git_repo(self.path)
    return local_repo
Return the local git repo.
@property
def root(self) -> Path:
    """Return the root directory of the local git repo."""
    top_level = repo_root(self.repo)
    return top_level
Return the local git repo root.
@property
def relative_path(self) -> Path:
    """Return `path` expressed relative to the repo root.

    A path typed on the command line may be relative to the current
    working directory; this converts it into a path relative to the
    repo so it can also be transposed onto a worktree.
    """
    start = self.path if self.path else Path(os.getcwd())
    absolute = start.resolve()
    repo_top = self.root.resolve()
    return absolute.relative_to(repo_top)
Return the relative path within the repo.
This is used to translate a relative path specified onto the command line into a relative path in the repo. The path on the command line may be relative to the current working directory, but we want to translate it into a relative path in the repo.
This is also used when transposing this path on a worktree.
@dataclass
class MetadataSelector:
    """Metadata-based criteria for choosing objects from the cluster."""

    enabled: bool = True
    """When false, the predicate rejects every object."""

    name: str | None = None
    """Only objects with this name are selected."""

    namespace: str | None = None
    """Only objects in this namespace are selected."""

    label_selector: dict[str, str] | None = None
    """Labels a Kustomization or HelmRelease must carry to be selected."""

    skip_crds: bool = True
    """If false, CRDs may be processed, depending on the resource type."""

    skip_secrets: bool = True
    """If false, Secrets may be processed, depending on the resource type."""

    skip_kinds: list[str] | None = None
    """Kinds (potential CRDs) to leave out when emitting objects."""

    visitor: ResourceVisitor | None = None
    """Visitor for this object type that can be used for building."""

    @property
    def predicate(
        self,
    ) -> Callable[
        [Kustomization | HelmRelease | HelmRepository | OCIRepository],
        bool,
    ]:
        """Return a function that tells whether an object is selected.

        The function accepts a Kustomization, HelmRelease, HelmRepository or
        OCIRepository. Labels are only compared for Kustomization and
        HelmRelease objects.
        """

        def _selected(
            resource: Kustomization | HelmRelease | HelmRepository | OCIRepository,
        ) -> bool:
            if not self.enabled:
                return False
            if self.name and resource.name != self.name:
                return False
            if self.namespace and resource.namespace != self.namespace:
                return False
            # No label criteria, or a kind whose labels are not compared.
            if not self.label_selector or not isinstance(
                resource, (Kustomization, HelmRelease)
            ):
                return True
            labels = resource.labels or {}
            for key, expected in self.label_selector.items():
                _LOGGER.debug("Checking %s=%s", key, expected)
                actual = labels.get(key)
                if actual is None or actual != expected:
                    _LOGGER.debug("mismatch v=%s", actual)
                    return False
            return True

        return _selected
A filter for objects to select from the cluster.
Visitor for the specified object type that can be used for building.
@property
def predicate(
    self,
) -> Callable[
    [Kustomization | HelmRelease | HelmRepository | OCIRepository],
    bool,
]:
    """Return a function that tells whether an object is selected.

    The function accepts a Kustomization, HelmRelease, HelmRepository or
    OCIRepository. Labels are only compared for Kustomization and
    HelmRelease objects.
    """

    def _selected(
        resource: Kustomization | HelmRelease | HelmRepository | OCIRepository,
    ) -> bool:
        if not self.enabled:
            return False
        if self.name and resource.name != self.name:
            return False
        if self.namespace and resource.namespace != self.namespace:
            return False
        # No label criteria, or a kind whose labels are not compared.
        if not self.label_selector or not isinstance(
            resource, (Kustomization, HelmRelease)
        ):
            return True
        labels = resource.labels or {}
        for key, expected in self.label_selector.items():
            _LOGGER.debug("Checking %s=%s", key, expected)
            actual = labels.get(key)
            if actual is None or actual != expected:
                _LOGGER.debug("mismatch v=%s", actual)
                return False
        return True

    return _selected
A predicate that selects Kustomization, HelmRelease, HelmRepository, or OCIRepository objects.
@dataclass
class Options:
    """Options for the resource selector for building manifests."""

    kustomize_flags: list[str] = field(default_factory=list)
    """Extra flags forwarded to the kustomization build step."""

    skip_kustomize_path_validation: bool = False
    """If true, Kustomizations whose path is not a directory are skipped."""
Options for the resource selector for building manifests.