Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,8 @@ rivretrieve/data

**/*.pyc
**/*.png
**/*.env
**/*.env

.ipynb_checkpoints/
__pycache__/
.DS_Store
3 changes: 2 additions & 1 deletion docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ API Reference
fetchers/chile
fetchers/czech
fetchers/france
fetchers/germany_berlin
fetchers/japan
fetchers/norway
fetchers/poland
Expand All @@ -23,4 +24,4 @@ API Reference
fetchers/spain
fetchers/uk_ea
fetchers/uk_nrfa
fetchers/usa
fetchers/usa
5 changes: 5 additions & 0 deletions docs/fetchers/germany_berlin.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Germany Berlin Fetcher
======================

.. automodule:: rivretrieve.germany_berlin
:members:
58 changes: 58 additions & 0 deletions examples/test_germany_berlin_fetcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import logging

import matplotlib.pyplot as plt

from rivretrieve import GermanyBerlinFetcher, constants

logging.basicConfig(level=logging.INFO)

# Berlin gauge IDs (example: 5867601 = Tegeler See)
# NOTE(review): the bundled test fixture labels station 5867601 as
# "Eisstadion" on the Panke — confirm which station name is correct.
gauge_ids = ["5867601"]

# Variable to test — choose from available constants
variable = constants.DISCHARGE_DAILY_MEAN  # daily mean discharge (m³/s)

# Period to fetch (inclusive ISO dates)
start_date = "2023-10-01"
end_date = "2024-03-31"

plt.figure(figsize=(12, 6))

fetcher = GermanyBerlinFetcher()

for gauge_id in gauge_ids:
    print(f"Fetching data for {gauge_id} from {start_date} to {end_date}...")

    data = fetcher.get_data(
        gauge_id=gauge_id,
        variable=variable,
        start_date=start_date,
        end_date=end_date,
    )

    if not data.empty:
        print(f"\nData retrieved for gauge {gauge_id}")
        print(data.head())
        print(f"Time series from {data.index.min()} to {data.index.max()}")

        plt.plot(
            data.index,
            data[variable],
            label=gauge_id,
            marker="o",
        )
    else:
        print(f"\nNo data found for {gauge_id}")

# Label axes with the standardized column names used by the fetcher.
plt.xlabel(constants.TIME_INDEX)
plt.ylabel(f"{variable} ({'m³/s' if variable == constants.DISCHARGE_DAILY_MEAN else 'm'})")
plt.title(f"Berlin ({gauge_ids[0]}) — {variable} time series")
plt.legend()
plt.grid(True)
plt.tight_layout()

plot_path = "berlin_fetcher_plot.png"
plt.savefig(plot_path)
print(f"Plot saved to {plot_path}")

# print(fetcher.get_metadata())
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ tqdm
zarr>=3.0.7
sphinx>=8.0.0
sphinx-rtd-theme>=3.0.0
myst-parser>=4.0.0
myst-parser>=4.0.0
pyproj>=3.7.1
1 change: 1 addition & 0 deletions rivretrieve/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .chile import ChileFetcher
from .czech import CzechFetcher
from .france import FranceFetcher
from .germany_berlin import GermanyBerlinFetcher
from .japan import JapanFetcher
from .norway import NorwayFetcher
from .poland import PolandFetcher
Expand Down
190 changes: 190 additions & 0 deletions rivretrieve/cached_site_data/germany_berlin_sites.csv

Large diffs are not rendered by default.

227 changes: 227 additions & 0 deletions rivretrieve/germany_berlin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
"""Fetcher for Berlin river gauge data from Wasserportal Berlin."""

import logging
from io import StringIO
from typing import Optional

import numpy as np
import pandas as pd
import requests
from bs4 import BeautifulSoup
from pyproj import Transformer

from . import base, constants, utils

# Module-level logger, named after the module per stdlib logging convention.
logger = logging.getLogger(__name__)


class GermanyBerlinFetcher(base.RiverDataFetcher):
    """Fetches river gauge data from Wasserportal Berlin.

    Data source:
        https://wasserportal.berlin.de/

    Supported variables:
        - constants.STAGE_DAILY_MEAN (m)
        - constants.DISCHARGE_DAILY_MEAN (m³/s)
        - constants.WATER_TEMPERATURE_DAILY_MEAN (°C)
        - constants.STAGE_INSTANT (m)
        - constants.DISCHARGE_INSTANT (m³/s)

    Data description and API:
        - see https://wasserportal.berlin.de/download/

    Terms of use:
        - see https://daten.berlin.de/impressum
    """

    # Overview table listing all surface-water stations.
    METADATA_URL = "https://wasserportal.berlin.de/start.php?anzeige=tabelle_ow&messanzeige=ms_all"
    # Per-station download endpoint. `thema` selects the variable,
    # `sreihe` the time resolution, `sdatum` the first day requested.
    BASE_URL = (
        "https://wasserportal.berlin.de/station.php"
        "?anzeige=d&station={id}&thema={thema}&sreihe={frequency}&smode=c&sdatum={start_date}"
    )

    @staticmethod
    def get_cached_metadata() -> pd.DataFrame:
        """Retrieves cached metadata (if available)."""
        return utils.load_cached_metadata_csv("germany_berlin")

    def get_metadata(self) -> pd.DataFrame:
        """Downloads and parses site metadata from Wasserportal Berlin.

        Keeps all original columns, but renames and standardizes key fields.
        Converts UTM33N (EPSG:32633) → WGS84 (EPSG:4326) coordinates.

        Returns:
            One row per gauge with standardized metadata columns; an empty
            DataFrame if the download or parsing fails.
        """
        try:
            logger.info(f"Fetching Berlin metadata from {self.METADATA_URL}")
            resp = requests.get(self.METADATA_URL, timeout=20)
            resp.raise_for_status()

            soup = BeautifulSoup(resp.text, "html.parser")
            table = soup.find("table")
            if table is None:
                raise ValueError("No table found in metadata page.")

            # Wrap in StringIO: passing a literal HTML string to read_html
            # is deprecated since pandas 2.1.
            df = pd.read_html(StringIO(str(table)))[0]
            df.columns = [c.strip() for c in df.columns]

            # Headers carry soft-wrapped German labels, e.g. "Messstellen- nummer".
            rename_map = {
                "Messstellen- nummer": constants.GAUGE_ID,
                "Messstellen- name": constants.STATION_NAME,
                "Gewässer": constants.RIVER,
                "Rechts- wert": "utm_easting",
                "Hoch- wert": "utm_northing",
            }
            df = df.rename(columns=rename_map)

            # Convert numeric UTM coordinates
            if "utm_easting" in df.columns and "utm_northing" in df.columns:
                df["utm_easting"] = pd.to_numeric(df["utm_easting"], errors="coerce")
                df["utm_northing"] = pd.to_numeric(df["utm_northing"], errors="coerce")

                # Drop invalid coordinates
                df = df.dropna(subset=["utm_easting", "utm_northing"])

                # Fix scaling (some portal values are 10× larger than a
                # plausible UTM33 easting).
                mask_large = df["utm_easting"] > 1_000_000
                df.loc[mask_large, "utm_easting"] = df.loc[mask_large, "utm_easting"] / 10

                transformer = Transformer.from_crs("EPSG:32633", "EPSG:4326", always_xy=True)
                lon, lat = transformer.transform(df["utm_easting"].values, df["utm_northing"].values)
                df[constants.LONGITUDE] = lon
                df[constants.LATITUDE] = lat
            else:
                df[constants.LONGITUDE] = np.nan
                df[constants.LATITUDE] = np.nan

            # Add standardized metadata fields if missing
            for col in [
                constants.GAUGE_ID,
                constants.STATION_NAME,
                constants.RIVER,
                constants.LATITUDE,
                constants.LONGITUDE,
                constants.ALTITUDE,
                constants.AREA,
                constants.COUNTRY,
                constants.SOURCE,
            ]:
                if col not in df.columns:
                    df[col] = np.nan

            # Altitude/area are not provided by the overview table.
            df[constants.ALTITUDE] = None
            df[constants.AREA] = None
            df[constants.COUNTRY] = "Germany"
            df[constants.SOURCE] = "Wasserportal Berlin"

            # Clean and type-correct
            df[constants.GAUGE_ID] = df[constants.GAUGE_ID].astype(str).str.strip()
            df[constants.LATITUDE] = pd.to_numeric(df[constants.LATITUDE], errors="coerce")
            df[constants.LONGITUDE] = pd.to_numeric(df[constants.LONGITUDE], errors="coerce")

            # Drop duplicates
            df = df.drop_duplicates(subset=[constants.GAUGE_ID])

            logger.info(f"Fetched {len(df)} Berlin gauge metadata records.")
            return df.reset_index(drop=True)

        except Exception as e:
            logger.error(f"Failed to fetch metadata: {e}")
            return pd.DataFrame()

    @staticmethod
    def get_available_variables() -> tuple[str, ...]:
        """Returns the variables this fetcher can download."""
        return (
            constants.STAGE_DAILY_MEAN,
            constants.DISCHARGE_DAILY_MEAN,
            constants.WATER_TEMPERATURE_DAILY_MEAN,
            constants.STAGE_INSTANT,
            constants.DISCHARGE_INSTANT,
        )

    def _download_data(
        self,
        gauge_id: str,
        variable: str,
        start_date: str,
        end_date: str,
    ) -> pd.DataFrame:
        """Downloads CSV data for a gauge and variable.

        Note:
            The endpoint only accepts a start date (``sdatum``); ``end_date``
            is not sent to the server — ``get_data`` trims the tail instead.

        Raises:
            ValueError: If ``variable`` is not supported.
        """
        # Map each variable to the portal's (thema, sreihe) pair:
        # "tw" = daily series, "ew" = instantaneous series.
        thema_map = {
            # Daily
            constants.STAGE_DAILY_MEAN: ("ows", "tw"),
            constants.DISCHARGE_DAILY_MEAN: ("odf", "tw"),
            constants.WATER_TEMPERATURE_DAILY_MEAN: ("owt", "tw"),
            # Instantaneous
            constants.STAGE_INSTANT: ("ows", "ew"),
            constants.DISCHARGE_INSTANT: ("odf", "ew"),
        }

        if variable not in thema_map:
            raise ValueError(f"Unsupported variable: {variable}")

        thema, frequency = thema_map[variable]
        # The portal expects German-style dates (DD.MM.YYYY).
        start_date_fmt = pd.to_datetime(start_date).strftime("%d.%m.%Y")
        url = self.BASE_URL.format(id=gauge_id, thema=thema, frequency=frequency, start_date=start_date_fmt)

        logger.info(f"Fetching {variable} for {gauge_id} from {url}")
        r = requests.get(url, timeout=20)
        r.raise_for_status()

        csv_text = r.text.strip()
        # The portal answers errors with an HTML page (containing "Fehler")
        # rather than a CSV payload.
        if not csv_text or "<html" in csv_text or "Fehler" in csv_text:
            logger.warning(f"No data returned for {gauge_id} ({variable})")
            return pd.DataFrame()

        try:
            df = pd.read_csv(StringIO(csv_text), sep=";", decimal=",", encoding="utf-8")
            return df
        except Exception as e:
            logger.error(f"Error parsing CSV for {gauge_id}: {e}")
            return pd.DataFrame()

    def _parse_data(self, gauge_id: str, raw_data: pd.DataFrame, variable: str) -> pd.DataFrame:
        """Parses Wasserportal CSV to standardized DataFrame.

        Mutates ``raw_data`` in place (lower-cases headers, adds the
        standardized time/value columns).
        """
        if raw_data.empty:
            return pd.DataFrame(columns=[constants.TIME_INDEX, variable])

        raw_data.columns = [c.strip().lower() for c in raw_data.columns]
        # Timestamp column is labelled "Datum" (date) or contains "zeit";
        # fall back to the first column otherwise.
        time_col = next((c for c in raw_data.columns if "datum" in c or "zeit" in c), raw_data.columns[0])
        # First non-object (numeric) column that is not the timestamp.
        val_col = next(
            (c for c in raw_data.columns if c not in [time_col] and raw_data[c].dtype != "O"), raw_data.columns[1]
        )

        raw_data[constants.TIME_INDEX] = pd.to_datetime(raw_data[time_col], dayfirst=True, errors="coerce")
        raw_data[variable] = pd.to_numeric(raw_data[val_col], errors="coerce")

        if variable in (constants.STAGE_DAILY_MEAN, constants.STAGE_INSTANT):
            # Convert stage to the metres advertised in the class docstring.
            # Fix: previously only the daily mean was converted, leaving
            # STAGE_INSTANT in cm despite the documented unit.
            # TODO(review): confirm the instantaneous series is also in cm.
            raw_data[variable] = raw_data[variable] / 100.0  # cm → m

        df = (
            raw_data[[constants.TIME_INDEX, variable]]
            .dropna()
            .sort_values(by=constants.TIME_INDEX)
            .set_index(constants.TIME_INDEX)
        )
        return df

    def get_data(
        self,
        gauge_id: str,
        variable: str,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
    ) -> pd.DataFrame:
        """Fetches a time series for one gauge and variable.

        Args:
            gauge_id: Wasserportal station number (e.g. "5867601").
            variable: One of :meth:`get_available_variables`.
            start_date: Inclusive start date; defaulted by utils.format_start_date.
            end_date: Inclusive end date; defaulted by utils.format_end_date.

        Returns:
            DataFrame indexed by constants.TIME_INDEX with a single column
            named after ``variable``; empty on download/parse failure.

        Raises:
            ValueError: If ``variable`` is not supported.
        """
        start_date = utils.format_start_date(start_date)
        end_date = utils.format_end_date(end_date)

        if variable not in self.get_available_variables():
            raise ValueError(f"Unsupported variable: {variable}")

        try:
            raw_df = self._download_data(gauge_id, variable, start_date, end_date)
            df = self._parse_data(gauge_id, raw_df, variable)
            if df.empty:
                # An empty parse result has a plain RangeIndex; comparing it
                # against date strings would raise and log a spurious error.
                return df
            return df.loc[(df.index >= start_date) & (df.index <= end_date)]
        except Exception as e:
            logger.error(f"Failed to get data for site {gauge_id}, variable {variable}: {e}")
            return pd.DataFrame(columns=[constants.TIME_INDEX, variable])
4 changes: 4 additions & 0 deletions tests/test_data/germany_berlin_discharge_sample.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Datum;Tagesmittelwert;"Stationsnummer: 5867601";"Stationsname: Eisstadion";"Gewässer: Panke";"Durchfluss in m³/s";"Fehlwerte: -777"
01.01.2024;1,75
02.01.2024;1,75
03.01.2024;2,08
47 changes: 47 additions & 0 deletions tests/test_germany_berlin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import os
import unittest
from unittest.mock import MagicMock, patch

import pandas as pd
from pandas.testing import assert_frame_equal

from rivretrieve import GermanyBerlinFetcher, constants


class TestGermanyBerlinFetcher(unittest.TestCase):
    """Tests GermanyBerlinFetcher against a canned Wasserportal CSV response."""

    def setUp(self):
        self.fetcher = GermanyBerlinFetcher()
        self.test_data_dir = os.path.join(os.path.dirname(__file__), "test_data")

    def load_sample_data(self, filename):
        """Return the text contents of a fixture from the test_data directory."""
        fixture_path = os.path.join(self.test_data_dir, filename)
        with open(fixture_path, "r", encoding="utf-8") as fixture:
            return fixture.read()

    @patch("requests.get")
    def test_get_data_discharge(self, mock_get):
        """get_data parses a daily-discharge CSV into the standardized frame."""
        # Serve the fixture instead of hitting the network.
        fake_response = MagicMock()
        fake_response.text = self.load_sample_data("germany_berlin_discharge_sample.csv")
        fake_response.raise_for_status = MagicMock()
        mock_get.return_value = fake_response

        result_df = self.fetcher.get_data(
            "5867601",
            constants.DISCHARGE_DAILY_MEAN,
            "2024-01-01",
            "2024-01-03",
        )

        # Expected: a DatetimeIndex named TIME_INDEX and one value column.
        expected_index = pd.Index(
            pd.to_datetime(["2024-01-01", "2024-01-02", "2024-01-03"]),
            name=constants.TIME_INDEX,
        )
        expected_df = pd.DataFrame(
            {constants.DISCHARGE_DAILY_MEAN: [1.75, 1.75, 2.08]},
            index=expected_index,
        )

        assert_frame_equal(result_df, expected_df)
        mock_get.assert_called_once()


if __name__ == "__main__":
    # Allow running this test module directly, outside a test runner.
    unittest.main()