Source code for hydromodpy._api

"""Top-level functional API for HydroModPy.

Mirrors the CLI verbs so ``hmp run config.toml`` and ``hmp.run("config.toml")``
execute the same workflow. Kept as a private module so the package facade
stays minimal.
"""

from __future__ import annotations

import importlib
import platform
import shutil
from pathlib import Path
from typing import TYPE_CHECKING, Any

from hydromodpy.core.version import __version__

if TYPE_CHECKING:
    # Imported only while a static type checker analyses the module; at
    # runtime ``TYPE_CHECKING`` is False, so none of these packages is loaded
    # when this module is imported.
    import geopandas as gpd
    import pandas as pd
    import xarray as xr

    # Union of the pandas / xarray / geopandas containers listed below.
    # NOTE(review): presumably the static return type intended for ``read``;
    # no annotation in the visible part of this module references it — confirm
    # whether it is still needed.
    Readable = xr.DataArray | pd.Series | pd.DataFrame | gpd.GeoDataFrame


def open(workspace: Any, *, create: bool = False) -> Any:
    """Open the simulation catalog of a HydroModPy project.

    This is the single entry point to a workspace catalog. The returned
    :class:`hydromodpy.results.catalog.SimulationCatalog` is backed by
    ``catalog.duckdb`` and offers object access (``latest``, ``best``,
    ``find``, ``cat[ref]``), tabular access (``frame``, ``sql``,
    ``list_simulations``), schema discovery (``describe``, ``tables``,
    ``columns``, ``variables``, ``metrics``, ``stations``), per-id reads
    (``read``) and the simulation writers used by the workflow engine.

    Parameters
    ----------
    workspace
        Project directory holding ``catalog.duckdb``, or a direct path to the
        ``.duckdb`` file. ``~`` is expanded and the path is made absolute.
    create
        When ``False`` (default), a missing catalog file raises
        :class:`FileNotFoundError` so that no phantom catalog is created.
        When ``True``, the existence check is skipped and an empty catalog
        is opened and initialised.

    Returns
    -------
    hydromodpy.results.catalog.SimulationCatalog
        Catalog handle for the project.

    Raises
    ------
    FileNotFoundError
        If no ``catalog.duckdb`` is found and ``create`` is ``False``.
    hydromodpy.core.exceptions.CatalogError
        If the DuckDB catalog file is locked, corrupted, or unreadable.

    Examples
    --------
    >>> import hydromodpy as hmp
    >>> cat = hmp.open("~/ws/projects/naizin")  # doctest: +SKIP
    >>> cat.latest()  # doctest: +SKIP

    See Also
    --------
    hydromodpy.index
        Machine-wide federation across registered workspaces.
    """
    from hydromodpy.core.state.paths import CATALOG_FILENAME, find_catalog_root
    from hydromodpy.results.catalog import SimulationCatalog

    target = Path(workspace).expanduser().resolve()

    # A ``.duckdb`` suffix means the caller pointed straight at the database
    # file; otherwise the catalog root is located from the given directory.
    db_file = (
        target
        if target.suffix == ".duckdb"
        else find_catalog_root(target) / CATALOG_FILENAME
    )

    if not (create or db_file.is_file()):
        raise FileNotFoundError(
            f"No catalog at {db_file.parent}. Run a workflow there first, "
            f"or pass create=True to initialise an empty catalog."
        )

    # NOTE(review): the existence check uses the root located by
    # ``find_catalog_root`` but the handle is built from the resolved input
    # path — confirm SimulationCatalog performs the same root lookup.
    return SimulationCatalog(target)
def index(db_path: Any = None, *, read_only: bool = False) -> Any:
    """Open the machine-wide global index federating registered workspaces.

    Parameters
    ----------
    db_path
        Optional path to the index DuckDB file; ``~`` is expanded and the
        path is made absolute. ``None`` is passed through unchanged so the
        index uses its default machine-state location.
    read_only
        Open the index in read-only mode. Writes (``register_workspace``,
        ``forget``, ``prune``) will raise. Pure reads (``search``, ``find``,
        ``list_workspaces``) keep working while another process holds the
        write-lock.

    Returns
    -------
    GlobalIndex
        Index object exposing ``register_workspace``, ``find``, ``search``,
        ``prune`` and ``forget``.

    Raises
    ------
    RuntimeError
        If a mutating method is called on a read-only handle.
    duckdb.IOException
        If the index database cannot be opened due to non-lock I/O errors.

    Examples
    --------
    >>> import hydromodpy as hmp
    >>> idx = hmp.index(read_only=True)  # doctest: +SKIP
    >>> idx.list_workspaces()  # doctest: +SKIP

    See Also
    --------
    hydromodpy.core.state.global_index.GlobalIndex
        Underlying federation implementation.
    """
    from hydromodpy.core.state.global_index import GlobalIndex

    if db_path is None:
        location = None
    else:
        location = Path(db_path).expanduser().resolve()
    return GlobalIndex(location, read_only=read_only)
def run(config: Any, **kwargs: Any) -> Any:
    """Run a HydroModPy workflow from Python.

    A ``str`` / ``Path`` input is treated as a TOML file: the workflow it
    declares is resolved and dispatched. Any other input is treated as a
    configuration object, wrapped in a :class:`Project` and simulated.

    Simulation workflows return a :class:`~hydromodpy.results.run.Run` (or
    ``None`` when nothing was persisted, e.g. ``dry_run``). Overview,
    calibration, comparison and testbed workflows return their adapter
    ``dict`` summary.

    Parameters
    ----------
    config
        TOML path or validated configuration object.
    kwargs
        Runtime options forwarded to the selected workflow. The ``headless``
        keyword (default ``False``) is always removed from ``kwargs`` first.
        On the config-object branch it is handed to ``Project``; on the TOML
        path branch this function does not forward it.

    Returns
    -------
    Run or None or dict
        ``Run`` instance (or ``None``) for the ``simulation`` workflow.
        ``dict`` summary for ``overview``, ``calibration``, ``comparison``
        and ``testbed`` workflows.

    Raises
    ------
    FileNotFoundError
        If the TOML path does not exist.
    hydromodpy.core.exceptions.ConfigError
        If the TOML payload fails Pydantic validation.
    hydromodpy.core.exceptions.PipelineError
        If a workflow step raises during execution.
    hydromodpy.core.exceptions.SolverError
        If the configured solver fails to converge or crashes.

    Examples
    --------
    >>> import hydromodpy as hmp
    >>> run = hmp.run("run_transient_nwt.toml", name="baseline")  # doctest: +SKIP

    See Also
    --------
    hydromodpy.project.Project.simulate
        Object-oriented form for repeated runs from one project.
    """
    # Consumed up front so it never leaks into the workflow keyword arguments.
    headless = bool(kwargs.pop("headless", False))

    if not isinstance(config, (str, Path)):
        from hydromodpy.project import Project

        with Project(config, headless=headless) as project:
            return project.simulate(**kwargs)

    from hydromodpy.project.dispatch.workflow import dispatch_workflow
    from hydromodpy.workflow.dispatch import resolve_workflow

    toml_path = Path(config).expanduser().resolve()
    selected = resolve_workflow(
        toml_path,
        cli_workflow=None,
        require_toml_field=True,
    )
    # NOTE(review): ``headless`` is dropped on this branch although the API
    # was previously documented as honoring it for TOML paths too — confirm
    # whether dispatch_workflow should receive it.
    return dispatch_workflow(selected, toml_path, **kwargs)
def calibrate(config: Any, **kwargs: Any) -> Any:
    """Run a calibration workflow from a TOML file or a config object.

    A ``str`` / ``Path`` input is routed directly to
    :func:`run_calibration_cli`. Any other input is wrapped in a
    :class:`Project` and calibrated through ``Project.calibrate``.

    Parameters
    ----------
    config
        Calibration TOML path or validated configuration object.
    kwargs
        Options forwarded to the underlying calibration runner. The
        ``headless`` keyword (default ``True``) configures the ``Project``
        on the config-object branch. On the TOML branch, which builds no
        project, it is discarded.

    Returns
    -------
    Any
        Calibration report or workflow-specific result.

    Raises
    ------
    FileNotFoundError
        If the calibration TOML path does not exist.
    hydromodpy.core.exceptions.ConfigMissingError
        If neither ``config_path`` nor ``parameters`` is supplied.
    hydromodpy.core.exceptions.CalibrationError
        If the optimizer or objective evaluation fails.

    Examples
    --------
    >>> import hydromodpy as hmp
    >>> report = hmp.calibrate("calibration.toml")  # doctest: +SKIP

    See Also
    --------
    hydromodpy.calibration.cli_runner.run_calibration_cli
        TOML entry point used by the path branch.
    hydromodpy.calibration.programmatic_runner.run_calibration_programmatic
        Python entry point used by the config-object branch.
    hydromodpy.calibration.CalibrationReport
        Structured calibration result.
    """
    if not isinstance(config, (str, Path)):
        from hydromodpy.project import Project

        project_headless = bool(kwargs.pop("headless", True))
        with Project(config, headless=project_headless) as project:
            return project.calibrate(**kwargs)

    from hydromodpy.calibration.cli_runner import run_calibration_cli

    # No project is built for a TOML path, so the flag must not reach the
    # CLI runner.
    kwargs.pop("headless", None)
    toml_path = Path(config).expanduser().resolve()
    return run_calibration_cli(toml_path, **kwargs)
def compare_pair(sim_a: Any, sim_b: Any, *, workspace: Any = None) -> Any:
    """Compare two simulations given by id or by result object.

    Thin wrapper that delegates to
    :func:`hydromodpy.analysis.comparison.pairwise.compare_pair`.

    Parameters
    ----------
    sim_a, sim_b
        Simulation ids or objects accepted by the comparison runtime.
    workspace
        Optional workspace used to resolve simulation ids.

    Returns
    -------
    pandas.DataFrame
        Side-by-side comparison table.

    Raises
    ------
    hydromodpy.results.errors.RunNotFoundError
        If either simulation id cannot be resolved in the workspace.

    Examples
    --------
    >>> import hydromodpy as hmp
    >>> table = hmp.compare_pair(
    ...     "ab12cd34", "ef56gh78", workspace="~/hmp_workspace"
    ... )  # doctest: +SKIP

    See Also
    --------
    hydromodpy.analysis.comparison
        Comparison package used by this helper.
    """
    from hydromodpy.analysis.comparison.pairwise import (
        compare_pair as pairwise_compare,
    )

    comparison = pairwise_compare(sim_a, sim_b, workspace=workspace)
    return comparison
def report(session_id_or_prefix: Any = None, *, workspace: Any = None) -> Any:
    """Render the HTML report of a calibration session.

    ``session_id_or_prefix`` accepts a full UUID, a unique hex prefix, or
    ``None`` to fall back to the most recently started session.

    When ``workspace`` is ``None`` the workspace is the nearest directory,
    starting at the current working directory and walking up its ancestors,
    that contains ``catalog.duckdb``. If none does, the current working
    directory itself is used.

    Parameters
    ----------
    session_id_or_prefix
        Full session UUID, unique hex prefix, or ``None`` for the latest
        session.
    workspace
        Optional workspace directory; ``~`` is expanded and the path is made
        absolute.

    Returns
    -------
    Any
        Report rendering result.

    Raises
    ------
    hydromodpy.results.errors.RunNotFoundError
        If no calibration session matches ``session_id_or_prefix``.
    hydromodpy.core.exceptions.DisplayError
        If the report template or one of its figures fails to render.

    Examples
    --------
    >>> import hydromodpy as hmp
    >>> hmp.report()  # latest session in the current workspace  # doctest: +SKIP
    >>> hmp.report("ab12cd34", workspace="~/hmp_workspace")  # doctest: +SKIP
    """
    from hydromodpy.calibration.report import resolve_calibration_session_id
    from hydromodpy.core.state.paths import CATALOG_FILENAME
    from hydromodpy.results.catalog import SimulationCatalog
    from hydromodpy.workflow.steps.calibration import step_render_calibration_report

    if workspace is not None:
        root = Path(workspace).expanduser().resolve()
    else:
        here = Path.cwd()
        # First directory on the way up that holds the catalog file; the
        # search is lazy and falls back to the starting directory.
        root = next(
            (
                folder
                for folder in (here, *here.parents)
                if (folder / CATALOG_FILENAME).exists()
            ),
            here,
        )

    with SimulationCatalog(root) as catalog:
        session_id = resolve_calibration_session_id(catalog, session_id_or_prefix)
        return step_render_calibration_report(
            catalog=catalog,
            session_id=session_id,
            workspace_root=root,
        )
def read(
    sim: Any,
    var: str,
    *,
    time: int | slice | None = None,
    layer: int | None = None,
    sel: dict | None = None,
    bbox: tuple[float, float, float, float] | None = None,
) -> Any:
    """Read one variable from a simulation Run, dispatching on storage kind.

    Single entry point for reads on a :class:`~hydromodpy.results.run.Run`;
    the work is delegated to
    :func:`hydromodpy.results.reading.read_variable`. The return type
    follows one rule:

    - Zarr field -> ``xr.DataArray`` (lazy); when ``time`` is an ``int``,
      the eager ``np.ndarray`` of that single timestep instead.
    - timeseries -> ``pd.Series``.
    - geographic feature -> ``gpd.GeoDataFrame``.

    To read by reference (id / unique prefix / name) instead of a ``Run``,
    use ``cat.read(ref, var)`` on the catalog returned by ``hmp.open``.

    Parameters
    ----------
    sim
        A :class:`~hydromodpy.results.run.Run` (e.g. ``cat.latest()`` or
        ``cat[ref]``).
    var
        Variable name, resolved against the field registry, then the DuckDB
        ``timeseries`` table, then the geographic features.
    time
        Timestep index (``int``) or ``slice`` for Zarr fields. ``None``
        loads every persisted timestep lazily.
    layer
        Optional layer index for three-dimensional fields.
    sel
        Optional selectors forwarded to the reader: ``{"station": ...}``
        for timeseries, ``{"period": ...}`` for a time window.
    bbox
        Optional ``(xmin, ymin, xmax, ymax)`` in the simulation CRS;
        restricts Zarr fields to faces whose centroid lies in the box.

    Returns
    -------
    xarray.DataArray or numpy.ndarray or pandas.Series or geopandas.GeoDataFrame
        See the rule above.

    Raises
    ------
    TypeError
        If ``sim`` is not a :class:`Run` instance.
    hydromodpy.results.errors.FieldNotFoundError
        If ``var`` could not be resolved by any backend.

    Examples
    --------
    >>> import hydromodpy as hmp
    >>> cat = hmp.open("~/ws/projects/naizin")  # doctest: +SKIP
    >>> run = cat.latest()  # doctest: +SKIP
    >>> da = hmp.read(run, "head")  # lazy DataArray  # doctest: +SKIP
    >>> arr = hmp.read(run, "head", time=-1, layer=0)  # ndarray  # doctest: +SKIP
    >>> ts = hmp.read(run, "discharge", sel={"station": "outlet"})  # doctest: +SKIP
    >>> gdf = hmp.read(run, "watershed_polygon")  # doctest: +SKIP
    """
    from hydromodpy.results.reading import read_variable

    # All selectors are passed through untouched; validation happens in the
    # reader.
    selectors = {"time": time, "layer": layer, "sel": sel, "bbox": bbox}
    return read_variable(sim, var, **selectors)
def export(
    sim: Any,
    var: str | list[str],
    dest: Any,
    *,
    fmt: str | None = None,
    time: int | str | None = None,
    layer: int | None = None,
    resolution: float | None = None,
    crs: str | None = None,
    nodata: float = -9999.0,
) -> Path:
    """Export one or several variables of a simulation to a standalone file.

    Functional mirror of :func:`read`: same selector (``sim`` / ``var`` /
    ``time`` / ``layer``) plus an output format and destination. After the
    type check, every argument is handed unchanged to ``Run.export``.

    ``fmt`` is optional when ``dest`` carries a known extension (``.nc`` ->
    netcdf, ``.tif`` -> geotiff, ``.csv`` -> csv, ``.shp`` -> shapefile,
    ``.vtu`` -> vtu, ``.hmp`` -> portable package).

    Parameters
    ----------
    sim
        A :class:`~hydromodpy.results.run.Run`, as returned by
        ``hmp.open(workspace)[ref]`` or ``catalog.latest()``.
    var
        Variable name, or list of variable names, to export.
    dest
        Destination passed to ``Run.export``.
    fmt
        Output format; may be omitted when ``dest`` has a known extension.
    time
        Timestep selector forwarded to ``Run.export``; the examples below
        use ``"last"`` and ``"all"``.
    layer
        Optional layer selector forwarded to ``Run.export``.
    resolution, crs, nodata
        Forwarded unchanged to ``Run.export``.
        NOTE(review): presumably raster grid options — confirm against
        ``Run.export``.

    Returns
    -------
    pathlib.Path
        Value returned by ``Run.export``.

    Raises
    ------
    TypeError
        If ``sim`` is not a :class:`~hydromodpy.results.run.Run`.

    Examples
    --------
    >>> import hydromodpy as hmp
    >>> run = hmp.open("~/hmp_workspace")["transient_nwt"]  # doctest: +SKIP
    >>> hmp.export(
    ...     run, "head", "head.tif", time="last", resolution=50
    ... )  # doctest: +SKIP
    >>> hmp.export(
    ...     run, ["head", "watertable_depth"], "fields.nc", time="all"
    ... )  # doctest: +SKIP
    """
    from hydromodpy.results.run import Run

    if isinstance(sim, Run):
        return sim.export(
            var,
            dest,
            fmt=fmt,
            time=time,
            layer=layer,
            resolution=resolution,
            crs=crs,
            nodata=nodata,
        )

    raise TypeError(
        f"hmp.export expects a Run object as first argument, got {type(sim).__name__}. "
        f"Obtain one with hmp.open(workspace)[ref] or catalog.latest()."
    )
def audit_prune(workspace: Any = None, *, apply: bool = False) -> dict[str, int]:
    """Apply ``retention_policies`` to ``audit_log`` in the workspace catalog.

    Parameters
    ----------
    workspace
        Path to a workspace or project directory. Resolved via
        :func:`hydromodpy.core.state.paths.find_catalog_root` so any path
        under the project tree works. ``None`` (or any other falsy value)
        resolves to the current directory.
    apply
        ``False`` (default) counts rows that would be removed without
        modifying the file. ``True`` actually deletes rows.

    Returns
    -------
    dict[str, int]
        Mapping ``event_type -> rows_affected``. Empty when no retention
        policy is registered.

    Raises
    ------
    FileNotFoundError
        If the workspace does not host a ``catalog.duckdb``.
    """
    from hydromodpy.core.state.paths import CATALOG_FILENAME, find_catalog_root
    from hydromodpy.results.catalog import SimulationCatalog
    from hydromodpy.results.catalog.audit import apply_retention

    if workspace:
        start = Path(workspace).expanduser().resolve()
    else:
        start = Path.cwd().resolve()
    root = find_catalog_root(start)

    if not (root / CATALOG_FILENAME).is_file():
        raise FileNotFoundError(f"No catalog at {root}")

    with SimulationCatalog(root) as catalog:
        # Retention runs as a dry run unless the caller opted in with
        # ``apply=True``.
        return apply_retention(catalog.connection, dry_run=not apply)
def doctor() -> dict:
    """Return a lightweight diagnostic of the runtime environment.

    Quick by design: no solver is invoked. Each listed package is imported
    to read its ``__version__``, and each solver executable is only looked
    up on ``PATH``.

    Returns
    -------
    dict
        Payload with four keys:

        - ``"python"``: Python version string.
        - ``"hydromodpy"``: HydroModPy version string.
        - ``"solvers"``: executable name -> path found by
          :func:`shutil.which`, or ``None`` when it is not on ``PATH``.
        - ``"optional"``: package name -> its ``__version__``, ``"?"`` when
          the package defines none, or ``None`` when importing it failed.

    Examples
    --------
    >>> import hydromodpy as hmp
    >>> hmp.doctor()["hydromodpy"]  # doctest: +SKIP
    """
    probed_packages = (
        "numpy",
        "pandas",
        "scipy",
        "duckdb",
        "zarr",
        "pyproj",
        "rasterio",
        "shapely",
        "xarray",
        "flopy",
        "pydantic",
        "pint",
        "matplotlib",
        "gmsh",
        "whitebox_workflows",
        "geopandas",
        "pyvista",
    )
    solver_names = ("mf2005", "mfnwt", "mf6", "mp6", "mp7", "mt3dusgs")

    python_version = platform.python_version()

    package_versions: dict = {}
    for name in probed_packages:
        try:
            package_versions[name] = getattr(
                importlib.import_module(name), "__version__", "?"
            )
        except Exception:
            # Best-effort probe: a broken or missing package may fail with
            # more than ImportError, and must never break the diagnostic.
            package_versions[name] = None

    solver_paths = {name: shutil.which(name) for name in solver_names}

    return {
        "python": python_version,
        "hydromodpy": __version__,
        "solvers": solver_paths,
        "optional": package_versions,
    }