Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 113 additions & 8 deletions floatcsep/infrastructure/environments.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from abc import ABC, abstractmethod
from typing import Union

import docker
from docker.errors import ImageNotFound, NotFound, APIError
from packaging.specifiers import SpecifierSet

log = logging.getLogger("floatLogger")
Expand Down Expand Up @@ -387,20 +389,123 @@ class DockerManager(EnvironmentManager):
"""

def __init__(self, base_name: str, model_directory: str) -> None:
self.base_name = base_name
self.base_name = base_name.replace(" ", "_")
self.model_directory = model_directory

def create_environment(self, force=False) -> None:
pass
# use a lower-case slug for tags
slug = self.base_name.lower()
self.image_tag = f"{slug}_image"
self.container_name = f"{slug}_container"

def env_exists(self) -> None:
pass
# Docker SDK client
self.client = docker.from_env()

def run_command(self, command) -> None:
pass
def create_environment(self, force: bool = False) -> None:
    """
    Build the Docker image for this model, rebuilding it when requested.

    Nothing is done when the image already exists and ``force`` is False.
    With ``force=True`` an existing image is removed first (a failed
    removal is only logged) and the image is always built again.

    Args:
        force (bool): If True, discard any existing image and rebuild.

    Raises:
        RuntimeError: If the build stream reports an ``errorDetail`` chunk.
    """
    if force:
        # A forced rebuild starts by discarding the image currently tagged
        if self.env_exists():
            log.info(f"[{self.base_name}] Removing existing image '{self.image_tag}'")
            try:
                self.client.images.remove(self.image_tag, force=True)
            except APIError as e:
                log.warning(f"[{self.base_name}] Could not remove image: {e}")
    elif self.env_exists():
        # Image already present and no rebuild requested
        return

    build_path = os.path.abspath(self.model_directory)
    # Host user/group ids are handed to the Dockerfile as build arguments
    build_args = dict(USER_UID=str(os.getuid()), USER_GID=str(os.getgid()))
    log.info(f"[{self.base_name}] Building image '{self.image_tag}' from {build_path}")

    build_logs = self.client.api.build(
        path=build_path,
        tag=self.image_tag,
        rm=True,
        decode=True,
        buildargs=build_args,
        nocache=False,  # todo: create model arg for --no-cache
    )

    # Relay the decoded build output chunk by chunk
    for chunk in build_logs:
        if "stream" in chunk:
            for line in chunk["stream"].splitlines():
                log.debug(f"[{self.base_name}][build] {line}")
            continue
        if "errorDetail" not in chunk:
            continue
        msg = chunk["errorDetail"].get("message", "").strip()
        log.error(f"[{self.base_name}][build error] {msg}")
        raise RuntimeError(f"Docker build error: {msg}")

    log.info(f"[{self.base_name}] Successfully built '{self.image_tag}'")

def env_exists(self) -> bool:
    """
    Checks if a Docker image with this manager's tag already exists locally.

    Returns:
        bool: True if the Docker image exists, False otherwise.
    """
    try:
        # The lookup raises ImageNotFound when no image carries the tag
        self.client.images.get(self.image_tag)
        return True
    except ImageNotFound:
        return False

def run_command(self, command=None) -> None:
    """
    Runs the model's Docker container with input/ and forecasts/ mounted.

    The container is started detached, its logs are streamed to the logger,
    and its exit status is checked. The container is always removed once it
    has been started, even if streaming the logs or waiting for it fails.

    Args:
        command: Accepted for interface compatibility; it is not used here,
            so the container runs whatever command the image defines.

    Raises:
        RuntimeError: If the container cannot be started, or if it exits
            with a non-zero status code.
    """
    model_root = os.path.abspath(self.model_directory)
    mounts = {
        os.path.join(model_root, "input"): {'bind': '/app/input', 'mode': 'rw'},
        os.path.join(model_root, "forecasts"): {'bind': '/app/forecasts', 'mode': 'rw'},
    }

    # Run as the host user inside the container
    uid, gid = os.getuid(), os.getgid()

    log.info(f"[{self.base_name}] Launching container {self.container_name}")

    try:
        container = self.client.containers.run(
            self.image_tag,
            remove=False,
            volumes=mounts,
            detach=True,
            user=f"{uid}:{gid}",
        )
    except docker.errors.APIError as e:
        # Chain the original error so the Docker traceback is not lost
        raise RuntimeError(f"[{self.base_name}] Failed to start container: {e}") from e

    try:
        # Log output live; undecodable bytes are replaced instead of aborting the run
        for line in container.logs(stream=True):
            log.info(f"[{self.base_name}] {line.decode(errors='replace').rstrip()}")

        # Wait for exit; a missing status code is treated as a failure
        exit_code = container.wait().get("StatusCode", 1)
    finally:
        # Clean up even when streaming or waiting raised
        container.remove(force=True)

    if exit_code != 0:
        raise RuntimeError(f"[{self.base_name}] Container exited with code {exit_code}")

    log.info(f"[{self.base_name}] Container finished successfully.")

def install_dependencies(self) -> None:
pass
"""
Installs dependencies for Docker-based models. This is typically handled by the Dockerfile,
so no additional action is needed here.
"""
log.info("No additional dependency installation required for Docker environments.")


class EnvironmentFactory:
Expand Down
14 changes: 11 additions & 3 deletions floatcsep/infrastructure/registries.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ def __init__(
database: str = None,
args_file: str = None,
input_cat: str = None,
fmt: str = None,
) -> None:
"""

Expand All @@ -111,6 +112,8 @@ def __init__(
self.input_cat = input_cat
self.forecasts = {}

self._fmt = fmt

def get(self, *args: Sequence[str]) -> str:
"""
Args:
Expand Down Expand Up @@ -161,7 +164,11 @@ def fmt(self) -> str:
if self.database:
return os.path.splitext(self.database)[1][1:]
else:
return os.path.splitext(self.path)[1][1:]
ext = os.path.splitext(self.path)[1][1:]
if ext:
return ext
else:
return self._fmt

def as_dict(self) -> dict:
"""
Expand Down Expand Up @@ -199,7 +206,7 @@ def build_tree(
model_class: str = "TimeIndependentModel",
prefix: str = None,
args_file: str = None,
input_cat: str = None,
input_cat: str = None
) -> None:
"""
Creates the run directory, and reads the file structure inside.
Expand All @@ -210,6 +217,7 @@ def build_tree(
prefix (str): prefix of the model forecast filenames if TD
args_file (str, bool): input arguments path of the model if TD
input_cat (str, bool): input catalog path of the model if TD
fmt (str, bool): forecast file format (extension), for time-dependent models

"""

Expand All @@ -235,7 +243,7 @@ def build_tree(

# set forecast names
self.forecasts = {
win: join(dirtree["forecasts"], f"{prefix}_{win}.csv") for win in windows
win: join(dirtree["forecasts"], f"{prefix}_{win}.{self.fmt}") for win in windows
}

def log_tree(self) -> None:
Expand Down
38 changes: 36 additions & 2 deletions floatcsep/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import List, Callable, Union, Sequence

import git
import yaml
from csep.core.forecasts import GriddedForecast, CatalogForecast

from floatcsep.utils.accessors import from_zenodo, from_git
Expand Down Expand Up @@ -106,6 +107,7 @@ def get_source(self, zenodo_id: int = None, giturl: str = None, **kwargs) -> Non
from_git(
giturl,
self.registry.dir if self.registry.fmt else self.registry.path,
force=self.force_stage,
**kwargs,
)
except (git.NoSuchPathError, git.CommandError) as msg:
Expand All @@ -122,7 +124,7 @@ def get_source(self, zenodo_id: int = None, giturl: str = None, **kwargs) -> Non
f"structure"
)

def as_dict(self, excluded=("name", "repository", "workdir")):
def as_dict(self, excluded=("name", "repository", "workdir", "environment")):
"""
Returns:
Dictionary with relevant attributes. Model can be re-instantiated from this dict
Expand Down Expand Up @@ -295,6 +297,7 @@ def __init__(
model_path: str,
func: Union[str, Callable] = None,
func_kwargs: dict = None,
fmt: str = 'csv',
**kwargs,
) -> None:
"""
Expand All @@ -317,7 +320,9 @@ def __init__(
self.func = func
self.func_kwargs = func_kwargs or {}

self.registry = ForecastRegistry(kwargs.get("workdir", os.getcwd()), model_path)
self.registry = ForecastRegistry(workdir=kwargs.get("workdir", os.getcwd()),
path=model_path,
fmt=fmt)
self.repository = ForecastRepository.factory(
self.registry, model_class=self.__class__.__name__, **kwargs
)
Expand Down Expand Up @@ -451,3 +456,32 @@ def replace_arg(arg, val, fp):

with open(filepath, "w") as file_:
json.dump(args, file_, indent=2)

elif fmt == ".yml" or fmt == ".yaml":

def nested_update(dest: dict, src: dict, max_depth: int = 3, _level: int = 1):
    """
    Merge ``src`` into ``dest`` in place, descending at most ``max_depth`` levels.

    A key whose value is a dict on both sides is merged recursively while the
    depth limit allows it; in every other case the value from ``src``
    replaces (or creates) the entry in ``dest``.
    """
    # The depth check is the same for every key at this level
    may_descend = _level < max_depth
    for name, incoming in src.items():
        existing = dest.get(name)
        if not (may_descend and isinstance(existing, dict) and isinstance(incoming, dict)):
            dest[name] = incoming
            continue
        nested_update(existing, incoming, max_depth, _level + 1)


with open(filepath, "r") as file_:
args = yaml.safe_load(file_)
args["start_date"] = start.isoformat()
args["end_date"] = end.isoformat()

nested_update(args, self.func_kwargs)
with open(filepath, "w") as file_:
yaml.safe_dump(args, file_, indent=2)
14 changes: 9 additions & 5 deletions floatcsep/utils/accessors.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@
import sys
import shutil

HOST_CATALOG = "https://service.iris.edu/fdsnws/event/1/query?"
TIMEOUT = 180


def from_zenodo(record_id, folder, force=False):
"""
Download data from a Zenodo repository.
Expand Down Expand Up @@ -50,7 +46,7 @@ def from_zenodo(record_id, folder, force=False):
sys.exit(-1)


def from_git(url, path, branch=None, depth=1, **kwargs):
def from_git(url, path, branch=None, depth=1, force=False, **kwargs):
"""
Clones a shallow repository from a git url.

Expand All @@ -59,6 +55,7 @@ def from_git(url, path, branch=None, depth=1, **kwargs):
path (str): path/folder where to clone the repo
branch (str): repository's branch to clone (default: main)
depth (int): depth history of commits
force (bool): If True, deletes existing path before cloning
**kwargs: keyword args passed to Repo.clone_from

Returns:
Expand All @@ -68,6 +65,13 @@ def from_git(url, path, branch=None, depth=1, **kwargs):
kwargs.update({"depth": depth})
git.refresh()

if os.path.exists(path):
if force:
shutil.rmtree(path)
elif os.listdir(path):
raise ValueError(f"Cannot clone into non-empty directory: {path}")
os.makedirs(path, exist_ok=True)

try:
repo = git.Repo(path)
except (git.NoSuchPathError, git.InvalidGitRepositoryError):
Expand Down
1 change: 0 additions & 1 deletion floatcsep/utils/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,6 @@ def timewindows_ti(
timelimits = pandas.date_range(
start=start_date, end=end_date, periods=periods, freq=frequency
)
print(timelimits)
timelimits = timelimits.to_pydatetime()
except ValueError as e_:
raise ValueError(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
FROM ubuntu:22.04
CMD ["/bin/false"]
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
FROM nonexistingimage:latest
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
FROM ubuntu:22.04
CMD ["asd"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
FROM ubuntu:22.04
CMD ["touch", "/root/forbidden"]
2 changes: 2 additions & 0 deletions tests/artifacts/models/docker_test/valid/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
FROM ubuntu:22.04
CMD ["echo", "Hello from valid container"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
FROM ubuntu:22.04
RUN useradd -u 1234 -m modeler
USER modeler
CMD ["id"]
Loading