5 changes: 4 additions & 1 deletion elt-common/pyproject.toml
@@ -8,7 +8,10 @@ version = "0.1.0"
description = "A set of common utility routines for data pipelines."
readme = "README.md"
requires-python = ">=3.13"
dependencies = ["dlt[parquet,s3]~=1.15.0", "pyiceberg~=0.9.1"]
dependencies = [
"dlt[parquet,s3]~=1.20.0",
"pyiceberg~=0.9.1",
]


[project.optional-dependencies]
11 changes: 10 additions & 1 deletion elt-common/src/elt_common/dlt_destinations/pyiceberg/helpers.py
@@ -29,6 +29,7 @@
PrimitiveType,
StringType,
TimeType,
TimestampType,
TimestamptzType,
)
from pyiceberg.typedef import Identifier
@@ -107,7 +108,10 @@ def dlt_type_to_iceberg(column: TColumnType) -> PrimitiveType:
raise TypeError(
f"Iceberg v1 & v2 does not support timestamps in '{TIMESTAMP_PRECISION_TO_UNIT[9]}' precision." # type:ignore
)
return TimestamptzType()
if column.get("timezone", True):
return TimestamptzType()
else:
return TimestampType()
elif dlt_type == "binary":
return BinaryType()
else:
@@ -137,9 +141,14 @@ def iceberg_to_dlt_type(field: NestedField) -> TColumnType:
dlt_type["data_type"] = "date"
elif isinstance(field_type, TimeType):
dlt_type["data_type"] = "time"
elif isinstance(field_type, TimestampType):
dlt_type["data_type"] = "timestamp"
dlt_type["precision"] = 6
dlt_type["timezone"] = False
elif isinstance(field_type, TimestamptzType):
dlt_type["data_type"] = "timestamp"
dlt_type["precision"] = 6
dlt_type["timezone"] = True
elif isinstance(field_type, BinaryType):
dlt_type["data_type"] = "binary"
else:
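As a reviewer aid, here is a minimal standalone sketch (not part of the diff) of the round-trip the new timezone handling is meant to preserve; the helper import path and the precision value are assumptions based on the surrounding code.

# Sketch only: exercises the updated mapping in both directions.
import pyiceberg.types as pit

from elt_common.dlt_destinations.pyiceberg.helpers import (  # assumed module path
    dlt_type_to_iceberg,
    iceberg_to_dlt_type,
)

# Microsecond precision, which is what Iceberg v1/v2 supports.
tz_col = {"data_type": "timestamp", "precision": 6, "timezone": True}
naive_col = {"data_type": "timestamp", "precision": 6, "timezone": False}

# timezone=True (the dlt default) still maps to timestamptz ...
assert isinstance(dlt_type_to_iceberg(tz_col), pit.TimestamptzType)
# ... while timezone=False now maps to the naive Iceberg timestamp type.
assert isinstance(dlt_type_to_iceberg(naive_col), pit.TimestampType)

# Reading an Iceberg field back yields a dlt column carrying the same flag.
field = pit.NestedField(1, "created_at", pit.TimestampType(), required=True)
column = iceberg_to_dlt_type(field)
assert column["data_type"] == "timestamp" and column["timezone"] is False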
@@ -25,6 +25,7 @@ def dlt_schema() -> PreparedTableSchema:
row_id=TColumnSchema(data_type="bigint", nullable=False, primary_key=True),
entry_name=TColumnSchema(data_type="text", nullable=False),
entry_timestamp=TColumnSchema(data_type="timestamp", nullable=False),
entry_timestamp_notz=TColumnSchema(data_type="timestamp", nullable=False, timezone=False),
entry_weight=TColumnSchema(data_type="double"),
)
return PreparedTableSchema(name="table_name", columns=columns)
@@ -81,6 +82,7 @@ def test_iceberg_field_to_dlt_type_gives_known_types():
for type_name in (
"StringType",
"DoubleType",
"TimestampType",
"TimestamptzType",
"DateType",
"TimeType",
Expand All @@ -92,6 +94,8 @@ def test_iceberg_field_to_dlt_type_gives_known_types():
dlt_column = iceberg_to_dlt_type(field_required)
assert dlt_column.get("data_type") in DLT_DATA_TYPES
assert not dlt_column.get("nullable")
if type_name.startswith("Timestamp"):
assert dlt_column.get("timezone") == (type_name == "TimestamptzType")

field_not_required = pyiceberg.types.NestedField(
1, "name", getattr(pyiceberg.types, type_name)(), required=False
@@ -125,6 +129,7 @@ def test_create_iceberg_schema(dlt_schema: PreparedTableSchema):
row_id=(pyiceberg.types.LongType, True),
entry_name=(pyiceberg.types.StringType, True),
entry_timestamp=(pyiceberg.types.TimestamptzType, True),
entry_timestamp_notz=(pyiceberg.types.TimestampType, True),
entry_weight=(pyiceberg.types.DoubleType, False),
)
for field in iceberg_schema.fields:
3 changes: 1 addition & 2 deletions elt-common/tests/unit_tests/dlt_sources/test_m365.py
@@ -22,8 +22,7 @@ def test_extract_sharepoint_source_raises_error_without_config(pipeline: dlt.Pip

config_exc = exc.value.exception
assert isinstance(config_exc, ConfigFieldMissingException)
for field in ("tenant_id", "client_id", "client_secret"):
assert field in config_exc.fields
assert "credentials" in config_exc.fields

# then other config
with pytest.raises(PipelineStepFailed) as exc:
24 changes: 9 additions & 15 deletions elt-common/uv.lock

Some generated files are not rendered by default.

27 changes: 20 additions & 7 deletions warehouses/accelerator/extract_load/opralogweb/extract_and_load.py
@@ -21,11 +21,11 @@
from dlt.sources.sql_database import sql_table
from html2text import html2text
import pyarrow as pa
from sqlalchemy.sql import Select
import sqlalchemy as sa

import elt_common.cli as cli_utils

OPRALOG_EPOCH = dt.datetime(2017, 4, 25, 0, 0, 0, tzinfo=dt.UTC)
OPRALOG_EPOCH = dt.datetime(2017, 4, 25, 0, 0, 0)
SQL_TABLE_KWARGS = dict(
schema=dlt.config.value,
backend="pyarrow",
@@ -36,8 +36,19 @@
EXTRACTED_ENTRY_IDS: List[int] = []


def with_resource_limit(
resource: DltResource, limit_max_items: int | None = None
) -> DltResource:
if limit_max_items is not None:
resource.add_limit(limit_max_items)

return resource


@dlt.source()
def opralogwebdb(chunk_size: int = 50000) -> Generator[DltResource]:
def opralogwebdb(
chunk_size: int = 50000, limit_max_items: int | None = None
) -> Generator[DltResource]:
"""Opralog usage began in 04/2017. We split tables into two categories:

- append-only tables: previous records are never updated, use 'append' write_disposition
@@ -48,6 +59,8 @@ def opralogwebdb(chunk_size: int = 50000) -> Generator[DltResource]:
were updated. We use the 'LastChangedDate' column of 'Entries' to find the list of
new or updated EntryId values and load the MoreEntryColumn records for these Entries
into the destination.

`chunk_size` and `limit_max_items` are primarily used for testing and debugging
"""

tables_append_records = {
@@ -65,13 +78,13 @@
chunk_size=chunk_size,
**SQL_TABLE_KWARGS,
)
yield resource
yield with_resource_limit(resource, limit_max_items)

# Now the Entries table, with incremental cursor, that tells us what EntryIds have been updated
yield entries_table(chunk_size)
yield with_resource_limit(entries_table(chunk_size), limit_max_items)

# Finally the MoreEntryColumns table based on the loaded EntryIds
yield more_entry_columns_table(chunk_size)
yield with_resource_limit(more_entry_columns_table(chunk_size), limit_max_items)


def entries_table(chunk_size: int) -> DltResource:
@@ -123,7 +136,7 @@ def store_extracted_entry_ids(table: pa.Table) -> pa.Table:
def more_entry_columns_table(chunk_size: int) -> DltResource:
"""Return a resource wrapper for the MoreEntryColumns table"""

def more_entry_columns_query(query: Select, table):
def more_entry_columns_query(query: sa.Select, table):
return query.filter(table.c.EntryId.in_(EXTRACTED_ENTRY_IDS))

resource = sql_table(
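To illustrate the new `limit_max_items` knob added in this file, here is a hedged usage sketch for a local smoke test; the pipeline name, destination, and local import are placeholders rather than anything taken from the repository, and the usual sql_database credentials are assumed to be configured in secrets.toml.

# Sketch only: run the source with every resource capped for quick debugging.
import dlt

from extract_and_load import opralogwebdb  # assumed local import for illustration

pipeline = dlt.pipeline(
    pipeline_name="opralogweb_debug",  # hypothetical name
    destination="duckdb",              # any destination works for a smoke test
    dataset_name="opralogweb",
)

# add_limit stops each resource after 10 yielded items (record batches with the
# pyarrow backend); with the default limit_max_items=None the limit is skipped
# and the full tables are extracted as before.
info = pipeline.run(opralogwebdb(chunk_size=1_000, limit_max_items=10))
print(info)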