diff --git a/CHANGES.md b/CHANGES.md index a9c39a7..8e33996 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -3,6 +3,7 @@ * Improve handling of environment file specification (#63) * Stop running container on SIGINT (#62) * `xcetool image run --server` prints server and viewer URLs (#46) +* Improve type annotations and checks (#69) ## Changes in 0.1.1 diff --git a/xcengine/cli.py b/xcengine/cli.py index cfb7f9a..d86d7a8 100644 --- a/xcengine/cli.py +++ b/xcengine/cli.py @@ -8,8 +8,8 @@ import os import pathlib import subprocess -import sys import tempfile +from typing import TypedDict import click import yaml @@ -86,7 +86,7 @@ def make_script( output_dir=output_dir, clear_output=clear ) if batch or server: - args = ["python3", output_dir / "execute.py"] + args: list[str | pathlib.Path] = ["python3", output_dir / "execute.py"] if batch: args.append("--batch") if server: @@ -102,7 +102,8 @@ def image_cli(): @image_cli.command( - help="Build, and optionally run, a compute engine as a Docker image" + help="Build a compute engine as a Docker image, optionally generating an " + "Application Package" ) @click.option( "-b", @@ -144,7 +145,11 @@ def build( ) -> None: if environment is None: LOGGER.info("No environment file specified on command line.") - init_args = dict(notebook=notebook, environment=environment, tag=tag) + class InitArgs(TypedDict): + notebook: pathlib.Path + environment: pathlib.Path + tag: str + init_args = InitArgs(notebook=notebook, environment=environment, tag=tag) if build_dir: image_builder = ImageBuilder(build_dir=build_dir, **init_args) os.makedirs(build_dir, exist_ok=True) @@ -156,11 +161,9 @@ def build( ) image = image_builder.build() if eoap: - class IndentDumper(yaml.Dumper): def increase_indent(self, flow=False, indentless=False): return super(IndentDumper, self).increase_indent(flow, False) - eoap.write_text( yaml.dump( image_builder.create_cwl(), @@ -212,7 +215,7 @@ def increase_indent(self, flow=False, indentless=False): def run( ctx: click.Context, batch: bool, - server: False, + server: bool, port: int, from_saved: bool, keep: bool, diff --git a/xcengine/core.py b/xcengine/core.py index 92fdc13..26dcca3 100755 --- a/xcengine/core.py +++ b/xcengine/core.py @@ -7,7 +7,6 @@ import os import shutil import signal -import socket import sys import tarfile import subprocess @@ -263,6 +262,7 @@ def export_conda_env() -> dict: ) pip_inspect = PipInspector() if pip_map: + assert pip_index is not None nonlocals = [] for pkg in pip_map["pip"]: if pip_inspect.is_local(pkg): @@ -340,7 +340,7 @@ def __init__( self, image: Image | str, output_dir: pathlib.Path | None, - client: docker.DockerClient = None, + client: docker.DockerClient | None = None, ): self._client = client match image: @@ -383,7 +383,7 @@ def run( ) + (["--from-saved"] if from_saved else []) ) - run_args = dict( + run_args: dict[str, Any] = dict( image=self.image, command=command, remove=False, detach=True ) if host_port is not None: @@ -429,6 +429,7 @@ def _tar_strip(member, path): def extract_output_from_container(self, container: Container) -> None: # This assumes the image-defined CWD, so it won't work in EOAP mode, # but EOAP has its own protocol for data stage-in/out anyway. + assert self.output_dir is not None bits, stat = container.get_archive("/home/mambauser/output") reader = io.BufferedReader(ChunkStream(bits)) with tarfile.open(name=None, mode="r|", fileobj=reader) as tar_fh: @@ -463,7 +464,7 @@ class PipInspector: local filesystem. """ - def __init__(self): + def __init__(self) -> None: environment = os.environ.copy() for varname in "FORCE_COLOR", "CLICOLOR", "CLICOLOR_FORCE": environment.pop(varname, None) diff --git a/xcengine/parameters.py b/xcengine/parameters.py index 291e4a2..f687192 100644 --- a/xcengine/parameters.py +++ b/xcengine/parameters.py @@ -2,12 +2,11 @@ import os import pathlib import typing -from typing import Any +from typing import Any, ClassVar import pystac import xarray as xr import yaml -from typing import ClassVar LOGGER = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO) @@ -24,7 +23,7 @@ class NotebookParameters: def __init__( self, params: dict[str, tuple[type, Any]], - config: dict[str, Any] = None, + config: dict[str, Any] | None = None, ): self.params = params self.config = {} if config is None else config @@ -73,7 +72,7 @@ def extract_variables( cls, code: str, setup_code: str | None = None ) -> dict[str, tuple[type, Any]]: if setup_code is None: - locals_ = {} + locals_: dict[str, object] = {} old_locals = {} else: exec(setup_code, globals(), locals_ := {}) @@ -135,13 +134,13 @@ def to_yaml(self) -> str: def read_params_combined( self, cli_args: list[str] | None - ) -> dict[str, str]: + ) -> dict[str, Any]: params = self.read_params_from_env() if cli_args: params.update(self.read_params_from_cli(cli_args)) return params - def read_params_from_env(self) -> dict[str, str]: + def read_params_from_env(self) -> dict[str, Any]: values = {} for param_name, (type_, _) in self.params.items(): env_var_name = "xce_" + param_name @@ -154,7 +153,7 @@ def read_params_from_env(self) -> dict[str, str]: ) return values - def read_params_from_cli(self, args: list[str]) -> dict[str, str]: + def read_params_from_cli(self, args: list[str]) -> dict[str, Any]: values = {} for param_name, (type_, _) in self.params.items(): arg_name = "--" + param_name.replace("_", "-") @@ -216,7 +215,7 @@ def read_staged_in_dataset( ), ) ) - asset = next(a for a in item.assets.values() if "data" in a.roles) + asset = next(a for a in item.assets.values() if "data" in (a.roles or [])) return xr.open_dataset(stage_in_path / asset.href) @staticmethod diff --git a/xcengine/util.py b/xcengine/util.py index 18376b8..472964c 100644 --- a/xcengine/util.py +++ b/xcengine/util.py @@ -1,13 +1,16 @@ # Copyright (c) 2024-2025 by Brockmann Consult GmbH # Permissions are hereby granted under the terms of the MIT License: # https://opensource.org/licenses/MIT. + from collections import namedtuple from datetime import datetime import pathlib import shutil +from typing import NamedTuple, Any, Mapping import pystac import xarray as xr +from xarray import Dataset def clear_directory(directory: pathlib.Path) -> None: @@ -19,7 +22,7 @@ def clear_directory(directory: pathlib.Path) -> None: def write_stac( - datasets: dict[str, xr.Dataset], stac_root: pathlib.Path + datasets: Mapping[str, xr.Dataset], stac_root: pathlib.Path ) -> None: catalog_path = stac_root / "catalog.json" if catalog_path.exists(): @@ -57,9 +60,13 @@ def write_stac( media_type="application/x-netcdf" if output_format == "netcdf" else "application/vnd.zarr", title=ds.attrs.get("title", ds_name), ) - bb = namedtuple("Bounds", ["left", "bottom", "right", "top"])( - 0, -90, 360, 90 - ) # TODO determine and set actual bounds here + class Bounds(NamedTuple): + left: float + bottom: float + right: float + top: float + # TODO determine and set actual bounds here + bb = Bounds(0, -90, 360, 90) item = pystac.Item( id=ds_name, geometry={ @@ -85,8 +92,8 @@ def write_stac( def save_datasets( - datasets, output_path: pathlib.Path, eoap_mode: bool -) -> dict[str, xr.Dataset]: + datasets: Mapping[str, Dataset], output_path: pathlib.Path, eoap_mode: bool +) -> dict[str, pathlib.Path]: saved_datasets = {} # EOAP doesn't require an "output" subdirectory (output can go anywhere # in the CWD) but it's used by xcetool's built-in runner. @@ -98,6 +105,7 @@ def save_datasets( suffix = "nc" if output_format == "netcdf" else "zarr" dataset_path = output_subpath / f"{ds_id}.{suffix}" saved_datasets[ds_id] = dataset_path + if output_format == "netcdf": ds.to_netcdf(dataset_path) else: