From 61f71244905b39fc0f665b502c821a37bb8f2bcb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Tue, 16 Jul 2024 14:05:31 -0600 Subject: [PATCH] feat: Emit Postgres schema as `_sdc_postgres_schema` --- tap_postgres/client.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tap_postgres/client.py b/tap_postgres/client.py index cd31eaa9..d8461d4f 100644 --- a/tap_postgres/client.py +++ b/tap_postgres/client.py @@ -199,6 +199,13 @@ class PostgresStream(SQLStream): # JSONB Objects won't be selected without type_conformance_level to ROOT_ONLY TYPE_CONFORMANCE_LEVEL = TypeConformanceLevel.ROOT_ONLY + @functools.cached_property + def schema(self) -> dict: + """Override schema adding _sdc columns.""" + schema_dict = self._singer_catalog_entry.schema.to_dict() + schema_dict["properties"]["_sdc_postgres_schema"] = {"type": ["string", "null"]} + return schema_dict + def max_record_count(self) -> int | None: """Return the maximum number of records to fetch in a single query.""" return self.config.get("max_record_count") @@ -264,6 +271,7 @@ def get_records(self, context: Context | None) -> t.Iterable[dict[str, t.Any]]: if transformed_record is None: # Record filtered out during post_process() continue + transformed_record["_sdc_postgres_schema"] = table.schema yield transformed_record