diff --git a/elt-common/pyproject.toml b/elt-common/pyproject.toml
index 93522d6c..7ecd66df 100644
--- a/elt-common/pyproject.toml
+++ b/elt-common/pyproject.toml
@@ -8,7 +8,10 @@
 version = "0.1.0"
 description = "A set of common utility routines for data pipelines."
 readme = "README.md"
 requires-python = ">=3.13"
-dependencies = ["dlt[parquet,s3]~=1.15.0", "pyiceberg~=0.9.1"]
+dependencies = [
+    "dlt[parquet,s3]~=1.20.0",
+    "pyiceberg~=0.9.1",
+]
 
 [project.optional-dependencies]
diff --git a/elt-common/src/elt_common/dlt_destinations/pyiceberg/helpers.py b/elt-common/src/elt_common/dlt_destinations/pyiceberg/helpers.py
index 2b6a978e..c6a21363 100644
--- a/elt-common/src/elt_common/dlt_destinations/pyiceberg/helpers.py
+++ b/elt-common/src/elt_common/dlt_destinations/pyiceberg/helpers.py
@@ -29,6 +29,7 @@
     PrimitiveType,
     StringType,
     TimeType,
+    TimestampType,
     TimestamptzType,
 )
 from pyiceberg.typedef import Identifier
@@ -107,7 +108,10 @@ def dlt_type_to_iceberg(column: TColumnType) -> PrimitiveType:
            raise TypeError(
                f"Iceberg v1 & v2 does not support timestamps in '{TIMESTAMP_PRECISION_TO_UNIT[9]}' precision."  # type:ignore
            )
-        return TimestamptzType()
+        if column.get("timezone", True):
+            return TimestamptzType()
+        else:
+            return TimestampType()
     elif dlt_type == "binary":
         return BinaryType()
     else:
@@ -137,9 +141,14 @@ def iceberg_to_dlt_type(field: NestedField) -> TColumnType:
         dlt_type["data_type"] = "date"
     elif isinstance(field_type, TimeType):
         dlt_type["data_type"] = "time"
+    elif isinstance(field_type, TimestampType):
+        dlt_type["data_type"] = "timestamp"
+        dlt_type["precision"] = 6
+        dlt_type["timezone"] = False
     elif isinstance(field_type, TimestamptzType):
         dlt_type["data_type"] = "timestamp"
         dlt_type["precision"] = 6
+        dlt_type["timezone"] = True
     elif isinstance(field_type, BinaryType):
         dlt_type["data_type"] = "binary"
     else:
diff --git a/elt-common/tests/unit_tests/dlt_destinations/pyiceberg/test_helpers.py b/elt-common/tests/unit_tests/dlt_destinations/pyiceberg/test_helpers.py
index 807da082..67f03584 100644
--- a/elt-common/tests/unit_tests/dlt_destinations/pyiceberg/test_helpers.py
+++ b/elt-common/tests/unit_tests/dlt_destinations/pyiceberg/test_helpers.py
@@ -25,6 +25,7 @@ def dlt_schema() -> PreparedTableSchema:
         row_id=TColumnSchema(data_type="bigint", nullable=False, primary_key=True),
         entry_name=TColumnSchema(data_type="text", nullable=False),
         entry_timestamp=TColumnSchema(data_type="timestamp", nullable=False),
+        entry_timestamp_notz=TColumnSchema(data_type="timestamp", nullable=False, timezone=False),
         entry_weight=TColumnSchema(data_type="double"),
     )
     return PreparedTableSchema(name="table_name", columns=columns)
@@ -81,6 +82,7 @@ def test_iceberg_field_to_dlt_type_gives_known_types():
     for type_name in (
         "StringType",
         "DoubleType",
+        "TimestampType",
         "TimestamptzType",
         "DateType",
         "TimeType",
@@ -92,6 +94,8 @@
         dlt_column = iceberg_to_dlt_type(field_required)
         assert dlt_column.get("data_type") in DLT_DATA_TYPES
         assert not dlt_column.get("nullable")
+        if type_name.startswith("Timestamp"):
+            assert dlt_column.get("timezone") == (type_name == "TimestamptzType")
 
         field_not_required = pyiceberg.types.NestedField(
             1, "name", getattr(pyiceberg.types, type_name)(), required=False
@@ -125,6 +129,7 @@ def test_create_iceberg_schema(dlt_schema: PreparedTableSchema):
         row_id=(pyiceberg.types.LongType, True),
         entry_name=(pyiceberg.types.StringType, True),
         entry_timestamp=(pyiceberg.types.TimestamptzType, True),
+        entry_timestamp_notz=(pyiceberg.types.TimestampType, True),
         entry_weight=(pyiceberg.types.DoubleType, False),
     )
     for field in iceberg_schema.fields:
diff --git a/elt-common/tests/unit_tests/dlt_sources/test_m365.py b/elt-common/tests/unit_tests/dlt_sources/test_m365.py
index 1ea57412..42eb52b5 100644
--- a/elt-common/tests/unit_tests/dlt_sources/test_m365.py
+++ b/elt-common/tests/unit_tests/dlt_sources/test_m365.py
@@ -22,8 +22,7 @@ def test_extract_sharepoint_source_raises_error_without_config(pipeline: dlt.Pip
 
     config_exc = exc.value.exception
     assert isinstance(config_exc, ConfigFieldMissingException)
-    for field in ("tenant_id", "client_id", "client_secret"):
-        assert field in config_exc.fields
+    assert "credentials" in config_exc.fields
 
     # then other config
     with pytest.raises(PipelineStepFailed) as exc:
diff --git a/elt-common/uv.lock b/elt-common/uv.lock
index e897e9d6..dfa3da6f 100644
--- a/elt-common/uv.lock
+++ b/elt-common/uv.lock
@@ -302,17 +302,16 @@ wheels = [
 
 [[package]]
 name = "dlt"
-version = "1.15.0"
+version = "1.20.0"
 source = { registry = "https://pypi.org/simple" }
 dependencies = [
     { name = "click" },
     { name = "fsspec" },
     { name = "gitpython" },
     { name = "giturlparse" },
-    { name = "hexbytes" },
     { name = "humanize" },
     { name = "jsonpath-ng" },
-    { name = "orjson", marker = "python_full_version >= '3.14' or sys_platform != 'emscripten'" },
+    { name = "orjson", marker = "(python_full_version >= '3.14' and platform_python_implementation == 'PyPy') or (python_full_version >= '3.14' and sys_platform == 'emscripten') or (platform_python_implementation != 'PyPy' and sys_platform != 'emscripten')" },
     { name = "packaging" },
     { name = "pathvalidate" },
     { name = "pendulum" },
@@ -332,9 +331,9 @@ dependencies = [
     { name = "typing-extensions" },
     { name = "tzdata" },
 ]
-sdist = { url = "https://files.pythonhosted.org/packages/78/62/0ec88b393e5e00a1b15bdc72216893a29261489475671c4d6d23fc2388a5/dlt-1.15.0.tar.gz", hash = "sha256:3dff1419649c984c183ba2ae53bfa60f4d0d7cf3590c1388997886dbe7bfee97", size = 790948, upload-time = "2025-08-05T17:48:18.886Z" }
+sdist = { url = "https://files.pythonhosted.org/packages/76/29/a1ff457d07abf09930f317f8875cfe31dcc58cdc46a417adfffe04524433/dlt-1.20.0.tar.gz", hash = "sha256:be5feb887efe9cb33fbca1402c2069517b343b22edb4a09a389a6455ff68e4fb", size = 882567, upload-time = "2025-12-09T22:57:48.777Z" }
 wheels = [
-    { url = "https://files.pythonhosted.org/packages/b8/a8/37c8f5888cb41ab9460356c43be36f16cf987f9673274833e8777b847de0/dlt-1.15.0-py3-none-any.whl", hash = "sha256:fdc1e8a47b6daae9d7f235de1146427a40518960f46089c3ae2b3c7ce5f66cd9", size = 1009448, upload-time = "2025-08-05T17:48:15.23Z" },
+    { url = "https://files.pythonhosted.org/packages/30/ad/5e75d9b5c63c5191bf520e8e8baf68357f8a336dd3cd7188c8db88a2ea39/dlt-1.20.0-py3-none-any.whl", hash = "sha256:894968cd485474f438753cd5f6ab4f43ae1dab6bc06e746abc6c70fd5754f350", size = 1120030, upload-time = "2025-12-09T22:57:46.507Z" },
 ]
 
 [package.optional-dependencies]
@@ -384,7 +383,7 @@ dev = [
 requires-dist = [
     { name = "authlib", marker = "extra == 'm365'", specifier = "~=1.6.1" },
     { name = "click", marker = "extra == 'iceberg-maintenance'", specifier = "~=8.2.1" },
-    { name = "dlt", extras = ["parquet", "s3"], specifier = "~=1.15.0" },
+    { name = "dlt", extras = ["parquet", "s3"], specifier = "~=1.20.0" },
     { name = "httpx", marker = "extra == 'm365'", specifier = "~=0.28.1" },
     { name = "pyiceberg", specifier = "~=0.9.1" },
     { name = "sqlalchemy", marker = "extra == 'iceberg-maintenance'", specifier = "~=2.0.43" },
@@ -505,6 +504,8 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/ee/43/3cecdc0349359e1a527cbf2e3e28e5f8f06d3343aaf82ca13437a9aa290f/greenlet-3.2.4-cp313-cp313-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:23768528f2911bcd7e475210822ffb5254ed10d71f4028387e5a99b4c6699671", size = 610497, upload-time = "2025-08-07T13:18:31.636Z" },
     { url = "https://files.pythonhosted.org/packages/b8/19/06b6cf5d604e2c382a6f31cafafd6f33d5dea706f4db7bdab184bad2b21d/greenlet-3.2.4-cp313-cp313-musllinux_1_1_aarch64.whl", hash = "sha256:00fadb3fedccc447f517ee0d3fd8fe49eae949e1cd0f6a611818f4f6fb7dc83b", size = 1121662, upload-time = "2025-08-07T13:42:41.117Z" },
     { url = "https://files.pythonhosted.org/packages/a2/15/0d5e4e1a66fab130d98168fe984c509249c833c1a3c16806b90f253ce7b9/greenlet-3.2.4-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:d25c5091190f2dc0eaa3f950252122edbbadbb682aa7b1ef2f8af0f8c0afefae", size = 1149210, upload-time = "2025-08-07T13:18:24.072Z" },
+    { url = "https://files.pythonhosted.org/packages/1c/53/f9c440463b3057485b8594d7a638bed53ba531165ef0ca0e6c364b5cc807/greenlet-3.2.4-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:6e343822feb58ac4d0a1211bd9399de2b3a04963ddeec21530fc426cc121f19b", size = 1564759, upload-time = "2025-11-04T12:42:19.395Z" },
+    { url = "https://files.pythonhosted.org/packages/47/e4/3bb4240abdd0a8d23f4f88adec746a3099f0d86bfedb623f063b2e3b4df0/greenlet-3.2.4-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:ca7f6f1f2649b89ce02f6f229d7c19f680a6238af656f61e0115b24857917929", size = 1634288, upload-time = "2025-11-04T12:42:21.174Z" },
     { url = "https://files.pythonhosted.org/packages/0b/55/2321e43595e6801e105fcfdee02b34c0f996eb71e6ddffca6b10b7e1d771/greenlet-3.2.4-cp313-cp313-win_amd64.whl", hash = "sha256:554b03b6e73aaabec3745364d6239e9e012d64c68ccd0b8430c64ccc14939a8b", size = 299685, upload-time = "2025-08-07T13:24:38.824Z" },
     { url = "https://files.pythonhosted.org/packages/22/5c/85273fd7cc388285632b0498dbbab97596e04b154933dfe0f3e68156c68c/greenlet-3.2.4-cp314-cp314-macosx_11_0_universal2.whl", hash = "sha256:49a30d5fda2507ae77be16479bdb62a660fa51b1eb4928b524975b3bde77b3c0", size = 273586, upload-time = "2025-08-07T13:16:08.004Z" },
     { url = "https://files.pythonhosted.org/packages/d1/75/10aeeaa3da9332c2e761e4c50d4c3556c21113ee3f0afa2cf5769946f7a3/greenlet-3.2.4-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:299fd615cd8fc86267b47597123e3f43ad79c9d8a22bebdce535e53550763e2f", size = 686346, upload-time = "2025-08-07T13:42:59.944Z" },
@@ -512,6 +513,8 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/dc/8b/29aae55436521f1d6f8ff4e12fb676f3400de7fcf27fccd1d4d17fd8fecd/greenlet-3.2.4-cp314-cp314-manylinux2014_s390x.manylinux_2_17_s390x.whl", hash = "sha256:b4a1870c51720687af7fa3e7cda6d08d801dae660f75a76f3845b642b4da6ee1", size = 694659, upload-time = "2025-08-07T13:53:17.759Z" },
     { url = "https://files.pythonhosted.org/packages/92/2e/ea25914b1ebfde93b6fc4ff46d6864564fba59024e928bdc7de475affc25/greenlet-3.2.4-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:061dc4cf2c34852b052a8620d40f36324554bc192be474b9e9770e8c042fd735", size = 695355, upload-time = "2025-08-07T13:18:34.517Z" },
     { url = "https://files.pythonhosted.org/packages/72/60/fc56c62046ec17f6b0d3060564562c64c862948c9d4bc8aa807cf5bd74f4/greenlet-3.2.4-cp314-cp314-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:44358b9bf66c8576a9f57a590d5f5d6e72fa4228b763d0e43fee6d3b06d3a337", size = 657512, upload-time = "2025-08-07T13:18:33.969Z" },
+    { url = "https://files.pythonhosted.org/packages/23/6e/74407aed965a4ab6ddd93a7ded3180b730d281c77b765788419484cdfeef/greenlet-3.2.4-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:2917bdf657f5859fbf3386b12d68ede4cf1f04c90c3a6bc1f013dd68a22e2269", size = 1612508, upload-time = "2025-11-04T12:42:23.427Z" },
+    { url = "https://files.pythonhosted.org/packages/0d/da/343cd760ab2f92bac1845ca07ee3faea9fe52bee65f7bcb19f16ad7de08b/greenlet-3.2.4-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:015d48959d4add5d6c9f6c5210ee3803a830dce46356e3bc326d6776bde54681", size = 1680760, upload-time = "2025-11-04T12:42:25.341Z" },
     { url = "https://files.pythonhosted.org/packages/e3/a5/6ddab2b4c112be95601c13428db1d8b6608a8b6039816f2ba09c346c08fc/greenlet-3.2.4-cp314-cp314-win_amd64.whl", hash = "sha256:e37ab26028f12dbb0ff65f29a8d3d44a765c61e729647bf2ddfbbed621726f01", size = 303425, upload-time = "2025-08-07T13:32:27.59Z" },
 ]
 
@@ -524,15 +527,6 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/04/4b/29cac41a4d98d144bf5f6d33995617b185d14b22401f75ca86f384e87ff1/h11-0.16.0-py3-none-any.whl", hash = "sha256:63cf8bbe7522de3bf65932fda1d9c2772064ffb3dae62d55932da54b31cb6c86", size = 37515, upload-time = "2025-04-24T03:35:24.344Z" },
 ]
 
-[[package]]
-name = "hexbytes"
-version = "1.3.1"
-source = { registry = "https://pypi.org/simple" }
-sdist = { url = "https://files.pythonhosted.org/packages/7f/87/adf4635b4b8c050283d74e6db9a81496063229c9263e6acc1903ab79fbec/hexbytes-1.3.1.tar.gz", hash = "sha256:a657eebebdfe27254336f98d8af6e2236f3f83aed164b87466b6cf6c5f5a4765", size = 8633, upload-time = "2025-05-14T16:45:17.5Z" }
-wheels = [
-    { url = "https://files.pythonhosted.org/packages/8d/e0/3b31492b1c89da3c5a846680517871455b30c54738486fc57ac79a5761bd/hexbytes-1.3.1-py3-none-any.whl", hash = "sha256:da01ff24a1a9a2b1881c4b85f0e9f9b0f51b526b379ffa23832ae7899d29c2c7", size = 5074, upload-time = "2025-05-14T16:45:16.179Z" },
-]
-
 [[package]]
 name = "httpcore"
 version = "1.0.9"
diff --git a/warehouses/accelerator/extract_load/opralogweb/extract_and_load.py b/warehouses/accelerator/extract_load/opralogweb/extract_and_load.py
index f6b9f4c3..15c84ec1 100755
--- a/warehouses/accelerator/extract_load/opralogweb/extract_and_load.py
+++ b/warehouses/accelerator/extract_load/opralogweb/extract_and_load.py
@@ -21,11 +21,11 @@
 from dlt.sources.sql_database import sql_table
 from html2text import html2text
 import pyarrow as pa
-from sqlalchemy.sql import Select
+import sqlalchemy as sa
 
 import elt_common.cli as cli_utils
 
-OPRALOG_EPOCH = dt.datetime(2017, 4, 25, 0, 0, 0, tzinfo=dt.UTC)
+OPRALOG_EPOCH = dt.datetime(2017, 4, 25, 0, 0, 0)
 SQL_TABLE_KWARGS = dict(
     schema=dlt.config.value,
     backend="pyarrow",
@@ -36,8 +36,19 @@
 EXTRACTED_ENTRY_IDS: List[int] = []
 
 
+def with_resource_limit(
+    resource: DltResource, limit_max_items: int | None = None
+) -> DltResource:
+    if limit_max_items is not None:
+        resource.add_limit(limit_max_items)
+
+    return resource
+
+
 @dlt.source()
-def opralogwebdb(chunk_size: int = 50000) -> Generator[DltResource]:
+def opralogwebdb(
+    chunk_size: int = 50000, limit_max_items: int | None = None
+) -> Generator[DltResource]:
     """Opralog usage began in 04/2017. We split tables into two categories:
     - append-only tables: previous records are never updated, use 'append' write_disposition
     - entries-related tables: 'Entries' & 'MoreEntryColumns' require a merge (upsert) strategy based on which Entries
@@ -48,6 +59,8 @@
       were updated.
     We use the 'LastChangedDate' column of 'Entries' to find the list of new or updated
     EntryId values and load the MoreEntryColumn records for these Entries into the destination.
+
+    `chunk_size` and `limit_max_items` are primarily used for testing and debugging
     """
 
     tables_append_records = {
@@ -65,13 +78,13 @@
             chunk_size=chunk_size,
             **SQL_TABLE_KWARGS,
         )
-        yield resource
+        yield with_resource_limit(resource, limit_max_items)
 
     # Now the Entries table, with incremental cursor, that tells us what EntryIds have been updated
-    yield entries_table(chunk_size)
+    yield with_resource_limit(entries_table(chunk_size), limit_max_items)
 
     # Finally the MoreEntryColumns table based on the loaded EntryIds
-    yield more_entry_columns_table(chunk_size)
+    yield with_resource_limit(more_entry_columns_table(chunk_size), limit_max_items)
 
 
 def entries_table(chunk_size: int) -> DltResource:
@@ -123,7 +136,7 @@ def store_extracted_entry_ids(table: pa.Table) -> pa.Table:
 
 def more_entry_columns_table(chunk_size: int) -> DltResource:
     """Return a resource wrapper for the MoreEntryColumns table"""
 
-    def more_entry_columns_query(query: Select, table):
+    def more_entry_columns_query(query: sa.Select, table):
         return query.filter(table.c.EntryId.in_(EXTRACTED_ENTRY_IDS))
 
     resource = sql_table(