Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/tsn_adapters/blocks/tn_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def _retry_condition(task: Task[Any, Any], task_run: TaskRun, state: State[Any])
except TNNodeNetworkError:
# Always retry on network errors
return True
except Exception as exc:
except Exception:
# For non-network errors, use the task_run's run_count to decide.
if task_run.run_count <= max_other_error_retries:
return True
Expand Down
1 change: 1 addition & 0 deletions src/tsn_adapters/common/trufnetwork/models/tn_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class Config(pa.DataFrameModel.Config):
coerce = True
strict = "filter"


class StreamLocatorModel(pa.DataFrameModel):
stream_id: Series[str]
data_provider: Series[pa.String] = pa.Field(
Expand Down
24 changes: 12 additions & 12 deletions src/tsn_adapters/flows/fmp/historical_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,44 +145,44 @@ def get_earliest_data_date(tn_block: TNAccessBlock, stream_id: str) -> Optional[
raise TNQueryError(str(e)) from e


def ensure_unix_timestamp(dt: pd.Series) -> pd.Series: # type: ignore -> pd Series doesn't fit with any
def ensure_unix_timestamp(dt: pd.Series) -> pd.Series:  # type: ignore -> pd Series doesn't fit with any
    """Convert datetime series to Unix timestamp (seconds since epoch).

    Handles both datetime-typed series and series that must first be parsed
    (strings, objects), always producing integer seconds since epoch and
    validating that the result lands in a plausible range.

    Args:
        dt: A pandas Series containing datetime data in various formats

    Returns:
        A pandas Series containing Unix timestamps (seconds since epoch)

    Raises:
        ValueError: If the resulting timestamps are outside the valid range
            (roughly 1970 to 2100) after conversion
    """
    # Parse anything that is not already a datetime dtype; UTC keeps the
    # epoch arithmetic below unambiguous.
    if not pd.api.types.is_datetime64_any_dtype(dt):
        dt = pd.to_datetime(dt, utc=True)

    # A datetime64 series viewed as int64 is nanoseconds since epoch;
    # floor-divide down to whole seconds.
    seconds = dt.astype("int64") // 10**9

    # Basic sanity check: Unix timestamps should fall between 1970 and ~2100.
    lower_bound = 0  # 1970-01-01
    upper_bound = 4102444800  # 2100-01-01
    out_of_range = (seconds < lower_bound) | (seconds > upper_bound)

    if out_of_range.any():
        raise ValueError(
            f"Converted timestamps outside valid range: "
            f"min={seconds.min()}, max={seconds.max()}"
        )

    return seconds


Expand Down Expand Up @@ -292,7 +292,7 @@ def run_ticker_pipeline(
Exceptions during processing are logged and raised.
"""

def process_ticker(row: pd.Series) -> TickerResult: # type: ignore -> pd Series doesn't fit with any
def process_ticker(row: pd.Series) -> TickerResult: # type: ignore -> pd Series doesn't fit with any
"""Process a single ticker row."""
symbol = row["source_id"]
stream_id = row["stream_id"]
Expand Down
19 changes: 9 additions & 10 deletions src/tsn_adapters/flows/fmp/real_time_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,21 +97,22 @@ def convert_quotes_to_tn_data(

return DataFrame[TnDataRowModel](result_df)


def ensure_unix_timestamp(time: int) -> int:
"""
Ensure the timestamp is a valid unix timestamp (seconds since epoch).

This function validates and converts timestamps to ensure they are in
seconds since epoch format. It handles cases where the input might be
in milliseconds or microseconds.

Args:
time: Integer timestamp that might be in seconds, milliseconds,
microseconds, or nanoseconds since epoch

Returns:
Integer timestamp in seconds since epoch

Raises:
ValueError: If the timestamp is invalid or outside the reasonable range
"""
Expand All @@ -121,7 +122,7 @@ def ensure_unix_timestamp(time: int) -> int:
# Define valid range for seconds since epoch
min_valid_timestamp = 0 # 1970-01-01
max_valid_timestamp = 4102444800 # 2100-01-01

# Convert to seconds if in a larger unit
converted_time = time
if time > max_valid_timestamp:
Expand All @@ -134,13 +135,11 @@ def ensure_unix_timestamp(time: int) -> int:
converted_time = time // 10**3
else: # assume seconds but with some future date
converted_time = time // 10**9 # aggressive conversion to be safe

# Validate the range after conversion
if converted_time < min_valid_timestamp or converted_time > max_valid_timestamp:
raise ValueError(
f"Timestamp outside valid range (1970-2100): {converted_time}"
)

raise ValueError(f"Timestamp outside valid range (1970-2100): {converted_time}")

return converted_time


Expand Down
4 changes: 2 additions & 2 deletions src/tsn_adapters/flows/stream_deploy_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
concurrency limiter and then waits for confirmation using tn-read concurrency via TNAccessBlock.
"""

import time
from typing import Literal

from pandera.typing import DataFrame
Expand All @@ -32,6 +31,7 @@ class DeployStreamResult(TypedDict):
stream_id: str
status: Literal["deployed", "skipped"]


@task(tags=["tn", "tn-write"], retries=3, retry_delay_seconds=5)
def check_and_deploy_stream(stream_id: str, tna_block: TNAccessBlock, is_unix: bool = False) -> DeployStreamResult:
"""
Expand Down Expand Up @@ -109,7 +109,7 @@ def deploy_streams_flow(
logger.info(f"Found {len(stream_ids)} stream descriptors.")

# we will deploy in batches of 500 to avoid infinite threads creation
batches = [stream_ids[i:i+batch_size] for i in range(0, len(stream_ids), batch_size)]
batches = [stream_ids[i : i + batch_size] for i in range(0, len(stream_ids), batch_size)]

aggregated_results: list[DeployStreamResult] = []
for batch in batches[start_from_batch:]:
Expand Down
2 changes: 2 additions & 0 deletions src/tsn_adapters/tasks/argentina/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
from tsn_adapters.tasks.argentina.models.sepa import SepaWebsiteDataItem
from tsn_adapters.tasks.argentina.models.sepa.website_item import SepaWebsiteScraper
from tsn_adapters.tasks.argentina.utils.processors import SepaDirectoryProcessor
from tsn_adapters.tasks.argentina.errors import errors

__all__ = [
"SepaWebsiteDataItem",
"SepaWebsiteScraper",
"SepaDirectoryProcessor",
"errors",
]
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
"""

from tsn_adapters.tasks.argentina.aggregate.uncategorized import get_uncategorized_products
from tsn_adapters.tasks.argentina.errors import (
EmptyCategoryMapError,
ErrorAccumulator,
UncategorizedProductsError,
)
from tsn_adapters.tasks.argentina.types import AggregatedPricesDF, AvgPriceDF, CategoryMapDF, UncategorizedDF
from tsn_adapters.utils.logging import get_logger_safe

Expand Down Expand Up @@ -40,11 +45,16 @@ def aggregate_prices_by_category(
1. Merges product prices with their category assignments
2. Groups the data by category and date
3. Computes the mean price for each category-date combination

Raises
------
EmptyCategoryMapError
If the product category mapping DataFrame is empty
"""
# Input validation and logging
if product_category_map_df.empty:
logger.error("Product category mapping DataFrame is empty")
raise ValueError("Cannot aggregate prices: product category mapping is empty")
raise EmptyCategoryMapError(url="<unknown>")

if avg_price_product_df.empty:
logger.warning("Average price product DataFrame is empty")
Expand All @@ -56,17 +66,6 @@ def aggregate_prices_by_category(
f"and {len(avg_price_product_df)} price records"
)

# Check for products without categories before merge
unique_products_with_prices = set(avg_price_product_df["id_producto"].unique())
unique_products_with_categories = set(product_category_map_df["id_producto"].unique())

products_without_categories = unique_products_with_prices - unique_products_with_categories
if products_without_categories:
logger.warning(
f"Found {len(products_without_categories)} products with prices but no category mapping. "
"These will be excluded from aggregation."
)

# Merge the product categories with prices
merged_df = avg_price_product_df.merge(
product_category_map_df,
Expand Down Expand Up @@ -94,4 +93,13 @@ def aggregate_prices_by_category(

uncategorized_df = get_uncategorized_products(avg_price_product_df, product_category_map_df)

if not uncategorized_df.empty:
logger.warning(f"Found {len(uncategorized_df)} uncategorized products")
accumulator = ErrorAccumulator.get_or_create_from_context()
accumulator.add_error(
UncategorizedProductsError(
count=len(uncategorized_df),
)
)

return AggregatedPricesDF(aggregated_df), uncategorized_df
3 changes: 0 additions & 3 deletions src/tsn_adapters/tasks/argentina/aggregate/uncategorized.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,4 @@ def get_uncategorized_products(
"""
diff_df = data[~data["id_producto"].isin(category_map["id_producto"])]

# get data without id_producto (=null)
data[data["id_producto"].isnull()]

return UncategorizedDF(diff_df)
33 changes: 33 additions & 0 deletions src/tsn_adapters/tasks/argentina/errors/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""
Error handling for Argentina SEPA processing.
"""

from tsn_adapters.tasks.argentina.errors.accumulator import ErrorAccumulator
from tsn_adapters.tasks.argentina.errors.errors import (
AccountableRole,
ArgentinaSEPAError,
DateMismatchError,
EmptyCategoryMapError,
InvalidCategorySchemaError,
InvalidCSVSchemaError,
InvalidDateFormatError,
InvalidStructureZIPError,
InvalidProductsError,
MissingProductosCSVError,
UncategorizedProductsError,
)

__all__ = [
"ErrorAccumulator",
"AccountableRole",
"ArgentinaSEPAError",
"DateMismatchError",
"EmptyCategoryMapError",
"InvalidCategorySchemaError",
"InvalidCSVSchemaError",
"InvalidDateFormatError",
"InvalidStructureZIPError",
"InvalidProductsError",
"MissingProductosCSVError",
"UncategorizedProductsError",
]
71 changes: 71 additions & 0 deletions src/tsn_adapters/tasks/argentina/errors/accumulator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Any

from prefect.artifacts import create_markdown_artifact

from tsn_adapters.tasks.argentina.errors.errors import ArgentinaSEPAError

# Context-local slot holding the accumulator for the current processing run.
# default=None is essential: without a default, ContextVar.get() raises
# LookupError when the variable was never set, which would make the
# "create" path in get_or_create_from_context unreachable.
error_ctx = ContextVar["ErrorAccumulator | None"]("argentina_sepa_errors", default=None)


class ErrorAccumulator:
    """Collects ArgentinaSEPAError instances raised during a processing run.

    One accumulator is bound per context via the ``error_ctx`` ContextVar so
    that tasks running in the same flow context share a single error list.
    """

    def __init__(self):
        # Errors recorded so far, in insertion order.
        self.errors: list[ArgentinaSEPAError] = []

    def add_error(self, error: ArgentinaSEPAError):
        """Record a single error for later reporting."""
        self.errors.append(error)

    def model_dump(self):
        """Serialize the accumulated errors to a list of plain dicts."""
        return [error.to_dict() for error in self.errors]

    def model_load(self, data: list[dict[str, Any]]):
        """Restore errors from their serialized (dict) form.

        NOTE(review): every entry is reconstructed as the ArgentinaSEPAError
        base class, so subclass identity is lost on a dump/load round trip —
        confirm that consumers only rely on the dict payload.
        """
        self.errors = [ArgentinaSEPAError(**error) for error in data]

    @classmethod
    def get_or_create_from_context(cls) -> "ErrorAccumulator":
        """Return the accumulator bound to the current context, creating and
        installing a fresh one if none exists yet."""
        accumulator = error_ctx.get()
        if accumulator is None:
            accumulator = cls()
            accumulator.set_to_context()
        return accumulator

    def set_to_context(self):
        """Bind this accumulator to the current context."""
        error_ctx.set(self)

@contextmanager
def error_collection():
    """Context manager for collecting errors during processing.

    Installs a fresh ErrorAccumulator into the current context, records any
    ArgentinaSEPAError that escapes the body (then re-raises it), and on exit
    publishes a markdown summary artifact when at least one error was seen.
    """
    collector = ErrorAccumulator()
    collector.set_to_context()
    try:
        yield collector
    except ArgentinaSEPAError as err:
        # Record the escaping error before propagating it to the caller.
        collector.add_error(err)
        raise
    finally:
        # Publish a summary artifact whether or not the body raised.
        if collector.errors:
            parts = [
                "# Processing Errors\n",
                "The following errors occurred during processing:\n",
            ]
            for item in collector.errors:
                details = item.to_dict()
                parts += [
                    f"\n## {details['code']}: {details['message']}\n",
                    f"Responsibility: {details['responsibility'].value}\n",
                    "Context:\n",
                    "```json\n",
                    f"{details['context']}\n",
                    "```\n",
                ]

            create_markdown_artifact(
                key="processing-errors",
                markdown="".join(parts),
                description="Errors encountered during SEPA data processing",
            )
47 changes: 47 additions & 0 deletions src/tsn_adapters/tasks/argentina/errors/context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from collections.abc import Generator
from contextlib import contextmanager
import contextvars
from typing import Any

# A context variable to hold a dictionary of key/value pairs for error context.
_error_context_var = contextvars.ContextVar("error_context", default={})


def set_error_context(key: str, value: Any) -> None:
"""
Set a key/value pair in the global error context.
"""
current = _error_context_var.get().copy()
current[key] = value
_error_context_var.set(current)


def get_error_context() -> dict[str, Any]:
"""
Retrieve the current global error context.
"""
return _error_context_var.get()


def clear_error_context() -> None:
"""
Clear the global error context.
"""
_error_context_var.set({})


@contextmanager
def error_context(**kwargs: Any) -> Generator[None, None, None]:
"""
Context manager to temporarily update the error context.
Example:
with error_context(store_id="STORE-123", user_id="USER-456"):
...your logic...
"""
current = get_error_context().copy()
current.update(kwargs)
token = _error_context_var.set(current)
try:
yield
finally:
_error_context_var.reset(token)
Loading