2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -19,7 +19,7 @@ jobs:
       - name: Set up Python
         uses: actions/setup-python@v5
         with:
-          python-version: "3.10.14"
+          python-version: "3.11.14"
       - name: Set up Poetry
         uses: snok/install-poetry@v1
         with:
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -3,7 +3,7 @@ repos:
     rev: 24.8.0
     hooks:
       - id: black
-        language_version: python3.10
+        language_version: python3.11
   - repo: https://github.com/pre-commit/pre-commit-hooks
     rev: v4.6.0
     hooks:
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,4 +1,4 @@
-FROM python:3.10-slim
+FROM python:3.11-slim
 
 WORKDIR /app
 RUN useradd -l -m -s /bin/bash appuser
7 changes: 6 additions & 1 deletion data_access_service/core/api.py
@@ -26,6 +26,7 @@
     STR_LATITUDE_LOWER_CASE,
     STR_LONGITUDE_UPPER_CASE,
     STR_LONGITUDE_LOWER_CASE,
+    STR_TIME_UPPER_CASE,
 )
 from data_access_service.core.descriptor import Depth, Descriptor, Coordinate
 from urllib.parse import unquote_plus
@@ -753,9 +754,12 @@ def get_dataset(
                 lon_mapped = self.map_column_names(
                     uuid, key, [STR_LONGITUDE_UPPER_CASE]
                 )
-
+                time_mapped = self.map_column_names(
+                    uuid, key, [STR_TIME_UPPER_CASE]
+                )
                 lat_varname = lat_mapped[0] if lat_mapped else None
                 lon_varname = lon_mapped[0] if lon_mapped else None
+                time_varname = time_mapped[0] if time_mapped else None
 
                 # Accuracy to nanoseconds
                 result = ds.get_data(
@@ -769,6 +773,7 @@ def get_dataset(
                     self.map_column_names(uuid, key, columns),
                     lat_varname=lat_varname,
                     lon_varname=lon_varname,
+                    time_varname=time_varname,
                 )
 
                 return ddf.from_pandas(
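Note: the pattern in get_dataset is resolve-then-pass — each logical column name (TIME, LATITUDE, LONGITUDE) is mapped to the dataset-specific variable name before the call to ds.get_data. A minimal standalone sketch of that resolution step, with a hypothetical column_map dict standing in for the metadata lookup that the real map_column_names performs:

from typing import Optional

# Hypothetical mapping; the real map_column_names consults dataset metadata.
column_map = {"TIME": ["JULD"], "LATITUDE": ["lat"], "LONGITUDE": ["lon"]}

def resolve_varname(logical_name: str) -> Optional[str]:
    # Return the dataset-specific name for a logical column,
    # or None when the dataset has no matching column.
    mapped = column_map.get(logical_name, [])
    return mapped[0] if mapped else None

time_varname = resolve_varname("TIME")      # -> "JULD"
lat_varname = resolve_varname("LATITUDE")   # -> "lat"
depth_varname = resolve_varname("DEPTH")    # -> None (no mapping)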
10 changes: 6 additions & 4 deletions data_access_service/tasks/generate_dataset.py
@@ -11,7 +11,7 @@
 
 from data_access_service import API, init_log, Config
 from data_access_service.core.AWSHelper import AWSHelper
-from data_access_service.core.constants import PARTITION_KEY
+from data_access_service.core.constants import PARTITION_KEY, STR_TIME_UPPER_CASE
 from data_access_service.core.descriptor import Descriptor
 from data_access_service.tasks.data_file_upload import (
     upload_all_files_in_folder_to_temp_s3,
@@ -139,7 +139,9 @@ def _generate_partition_output(
 
     log.info(f"Saved table schema to {schema_path}")
 
-    checked_date_ranges = check_rows_with_date_range(datasource, date_ranges)
+    checked_date_ranges = check_rows_with_date_range(
+        api, uuid, key, datasource, date_ranges
+    )
 
     need_append = False
     for date_range in checked_date_ranges:
@@ -162,7 +164,7 @@ def _generate_partition_output(
 
             # Derive partition key without time
             time_key = api.map_column_names(
-                uuid=uuid, key=key, columns=["TIME"]
+                uuid=uuid, key=key, columns=[STR_TIME_UPPER_CASE]
             )[0]
             result[PARTITION_KEY] = result[time_key].dt.strftime("%Y-%m")
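Note: the partition key derived above is just the year-month of each row's timestamp, taken from the mapped time column. A small pandas sketch (hypothetical column names) of what result[PARTITION_KEY] = result[time_key].dt.strftime("%Y-%m") produces:

import pandas as pd

# "JULD" stands in for the time column name returned by api.map_column_names.
result = pd.DataFrame(
    {"JULD": pd.to_datetime(["2021-01-03", "2021-01-20", "2021-02-07"])}
)
result["partition"] = result["JULD"].dt.strftime("%Y-%m")
print(result["partition"].tolist())  # ['2021-01', '2021-01', '2021-02']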

@@ -192,7 +194,7 @@ def _generate_partition_output(
             need_append = True
         else:
             time_dim = api.map_column_names(
-                uuid=uuid, key=key, columns=["TIME"]
+                uuid=uuid, key=key, columns=[STR_TIME_UPPER_CASE]
             )[0]
             result.to_zarr(
                 output_path, mode="a", append_dim=time_dim, compute=True
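Note: in the else branch above, new monthly results are appended to an existing zarr store along the resolved time dimension instead of a hard-coded "TIME". A minimal xarray sketch of that append pattern, using a hypothetical local store path and dimension name:

import numpy as np
import pandas as pd
import xarray as xr

# Two consecutive chunks sharing a time dimension named "JULD"
# (a stand-in for the name resolved via map_column_names).
def month_chunk(start: str) -> xr.Dataset:
    times = pd.date_range(start, periods=3, freq="D")
    return xr.Dataset(
        {"temp": ("JULD", np.random.rand(3))}, coords={"JULD": times}
    )

store = "demo.zarr"  # hypothetical local output path
month_chunk("2021-01-01").to_zarr(store, mode="w", compute=True)
# Subsequent chunks are appended along the time dimension, mirroring
# result.to_zarr(output_path, mode="a", append_dim=time_dim) above.
month_chunk("2021-01-04").to_zarr(store, mode="a", append_dim="JULD", compute=True)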
53 changes: 29 additions & 24 deletions data_access_service/utils/date_time_utils.py
@@ -16,13 +16,15 @@
     get_timestamps_boundary_values,
     create_time_filter,
 )
+from aodn_cloud_optimised.lib.exceptions import DateOutOfRangeError
 
 from data_access_service import init_log, Config
 from dateutil.relativedelta import relativedelta
+from data_access_service.core.api import BaseAPI
 from data_access_service.core.constants import (
     PARQUET_SUBSET_ROW_NUMBER,
     MAX_PARQUET_SPLIT,
+    STR_TIME_UPPER_CASE,
 )
 
 YEAR_MONTH_DAY = "%Y-%m-%d"
@@ -90,13 +92,18 @@ def next_month_first_day(date: pd.Timestamp) -> pd.Timestamp:
     )
 
 
-def check_rows_with_date_range(ds: DataSource, date_ranges: list[dict]) -> list[dict]:
+def check_rows_with_date_range(
+    api: BaseAPI, uuid: str, key: str, ds: DataSource, date_ranges: list[dict]
+) -> list[dict]:
     """
     Count the number of rows within each monthly range, ignoring bbox.
     If the row count exceeds PARQUET_SUBSET_ROW_NUMBER, split the date range by binary division until the count
     falls under the safe threshold.
     If the row count is 0, remove the date range from date_ranges so that further querying is skipped.
     Args:
+        api: BaseAPI instance for column name mapping
+        uuid: Dataset UUID for metadata lookup
+        key: Metadata key for column mapping
         ds: DataSource fetched from cloud optimised library
         date_ranges: List of monthly intervals as dictionaries with 'start_date' and 'end_date' as UTC timestamps in
             'YYYY-MM-DD HH:MM:SS.fffffffff+00:00' format.
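Note: the docstring describes the core loop — count rows per month, drop empty months, and binary-split oversized ones. A self-contained sketch of that heap-driven division, with a fake row counter in place of dataset.count_rows and illustrative thresholds standing in for PARQUET_SUBSET_ROW_NUMBER and MAX_PARQUET_SPLIT:

import heapq
import pandas as pd

ROW_LIMIT = 10_000  # stands in for PARQUET_SUBSET_ROW_NUMBER
MAX_SPLITS = 8      # stands in for MAX_PARQUET_SPLIT

def count_rows(start: pd.Timestamp, end: pd.Timestamp) -> int:
    # Fake row counter: pretend density is 1000 rows per day.
    return int((end - start).days * 1000)

def split_range(start: pd.Timestamp, end: pd.Timestamp) -> list[dict]:
    accepted = []
    q = [(start, end, 0)]
    while q:
        s, e, depth = heapq.heappop(q)
        n = count_rows(s, e)
        if n == 0:
            continue  # empty interval: drop it entirely
        if n <= ROW_LIMIT or depth >= MAX_SPLITS:
            accepted.append({"start_date": s, "end_date": e})
        else:
            mid = s + (e - s) / 2  # binary division of the interval
            heapq.heappush(q, (s, mid, depth + 1))
            heapq.heappush(q, (mid, e, depth + 1))
    return accepted

ranges = split_range(pd.Timestamp("2021-01-01"), pd.Timestamp("2021-02-01"))
print(len(ranges))  # the month is split into sub-ranges under the row limit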
@@ -109,11 +116,14 @@ def check_rows_with_date_range(
         return date_ranges
 
     dataset = ds.dataset
-
     checked_date_ranges = []
     q = []
 
-    # go through monthly interval
+    time_dim = api.map_column_names(uuid=uuid, key=key, columns=[STR_TIME_UPPER_CASE])[
+        0
+    ]
+
+    # Go through monthly interval
     for date_range in date_ranges:
         month_start, month_end = date_range["start_date"], date_range["end_date"]
         if month_end < month_start:
@@ -132,28 +142,19 @@ def check_rows_with_date_range(
-
         try:
             time_filter = create_time_filter(
-                dataset, date_start=start_str, date_end=end_str
+                dataset=dataset,
+                date_start=start_str,
+                date_end=end_str,
+                time_varname=time_dim,
             )
-        except ValueError as e:
-            if "is out of range of dataset." in str(e):
-                try:
-                    time_filter = create_customised_time_filter(
-                        dataset, start=start, end=end
-                    )
-                except ValueError as e:
-                    log.warning(
-                        f"Warning: Could not create time filter for range {start} to {end}: {e}"
-                    )
-                    continue
-            else:
-                raise ValueError(f"Could not create time filter: {e}")
+        except DateOutOfRangeError:
+            time_filter = create_customised_time_filter(
+                dataset=dataset, start=start, end=end, time_varname=time_dim
+            )
 
         try:
             num_rows = dataset.count_rows(filter=time_filter)
         except Exception as e:
-            log.warning(
-                f"Warning: Could not count rows for range {start} to {end}: {e}"
-            )
+            log.warning(f"Could not count rows for range {start} to {end}: {e}")
             continue
 
         if num_rows == 0:
@@ -174,7 +175,7 @@ def check_rows_with_date_range(
             heapq.heappush(q, (split_mid, split_end, times_of_split + 1))
 
         except Exception as e:
-            log.warning(f"Warning: Could not split range {start} to {end}: {e}")
+            log.warning(f"Could not split range {start} to {end}: {e}")
             checked_date_ranges.append(
                 {
                     "start_date": start,
@@ -186,7 +187,10 @@ def check_rows_with_date_range(
 
 
 def create_customised_time_filter(
-    dataset, start: pd.Timestamp, end: pd.Timestamp
+    dataset: pyarrow.dataset.Dataset,
+    start: pd.Timestamp,
+    end: pd.Timestamp,
+    time_varname: str = None,
 ) -> pyarrow.dataset.Expression:
     """
     Creates a time filter using the actual dataset temporal extent instead of partition boundaries.
@@ -199,6 +203,7 @@ def create_customised_time_filter(
         dataset: PyArrow dataset object
         start: Query start timestamp
         end: Query end timestamp
+        time_varname: Time variable name (e.g., "JULD", "TIME", "detection_timestamp") if known, otherwise None
 
     Returns:
         PyArrow filter expression
@@ -208,7 +213,7 @@ def create_customised_time_filter(
     if end.tz is None:
         end = ensure_timezone(end)
 
-    timestamp_start, timestamp_end = get_temporal_extent(dataset)
+    timestamp_start, timestamp_end = get_temporal_extent(dataset, time_varname)
     timestamp_start = pd.to_datetime(timestamp_start)
     timestamp_end = pd.to_datetime(timestamp_end)
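Note: the idea behind create_customised_time_filter is to clamp a requested window to the dataset's actual temporal extent and then filter on the resolved time column. A sketch of that clamping, using a synthetic in-memory table and a hypothetical "JULD" column name; the real code obtains the extent via get_temporal_extent(dataset, time_varname):

import pandas as pd
import pyarrow as pa
import pyarrow.dataset as pds

# Synthetic dataset with a "JULD" time column (hypothetical name).
table = pa.table({"JULD": pd.date_range("2021-01-01", periods=10, freq="D", tz="UTC")})
dataset = pds.dataset(table)

time_varname = "JULD"
extent_start = pd.Timestamp("2021-01-01", tz="UTC")  # dataset's real extent...
extent_end = pd.Timestamp("2021-01-10", tz="UTC")    # ...rather than partition bounds

# Clamp the query window to the extent before building the filter.
query_start = max(pd.Timestamp("2020-12-15", tz="UTC"), extent_start)
query_end = min(pd.Timestamp("2021-01-05", tz="UTC"), extent_end)

time_filter = (pds.field(time_varname) >= query_start) & (
    pds.field(time_varname) <= query_end
)
print(dataset.count_rows(filter=time_filter))  # rows inside the clamped window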
4 changes: 2 additions & 2 deletions environment.yml
@@ -3,8 +3,8 @@ channels:
   - conda-forge
   - defaults
 dependencies:
-  - python=3.10.14
+  - python==3.11.14
   - pip<24.1
   - virtualenv==20.28.1 # https://github.com/python-poetry/poetry/issues/10056#issuecomment-2594269592
   - pip:
-      - poetry==2.0.1
+      - poetry==2.1.4
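Note: the 3.11 pin now lives in four places (workflow, pre-commit config, Dockerfile, environment.yml). Not part of this PR, but a small startup guard like the following sketch can catch drift between those pins and the interpreter actually running:

import sys

# Fail fast if the running interpreter is not from the pinned 3.11 series.
assert sys.version_info[:2] == (3, 11), (
    f"Expected Python 3.11.x, got {sys.version.split()[0]}"
)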