From f1cdc5246c7e893446fba293e10d5e81ec873527 Mon Sep 17 00:00:00 2001 From: "A. Fox" Date: Wed, 17 Jan 2024 06:48:29 -0800 Subject: [PATCH 1/8] add basic support for iceberg string and binary types --- sqlalchemy_redshift/dialect.py | 47 ++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/sqlalchemy_redshift/dialect.py b/sqlalchemy_redshift/dialect.py index ce755ea6..5d365443 100644 --- a/sqlalchemy_redshift/dialect.py +++ b/sqlalchemy_redshift/dialect.py @@ -290,6 +290,9 @@ class RedshiftImpl(postgresql.PostgresqlImpl): """ +MAX_VARCHAR_LENGTH = 65535 + + class RedshiftTypeEngine(TypeEngine): def _default_dialect(self, default=None): @@ -410,6 +413,47 @@ def get_dbapi_type(self, dbapi): return dbapi.HLLSKETCH +class IcebergString(sa.types.TypeDecorator): + impl = sa.types.String + + def load_dialect_impl(self, dialect): + return sa.dialects.postgresql.VARCHAR(length=MAX_VARCHAR_LENGTH) + + +class IcebergBinary(sa.types.TypeDecorator): + impl = sa.types.LargeBinary + + def process_bind_param(self, value, dialect): + if value is None: + return value + + encoding = getattr(dialect, 'encoding', 'utf-8') + + if isinstance(value, bytes): + return value + + if isinstance(value, str): + return value.encode(encoding) + + return value + + def result_processor(self, dialect, coltype): + def process(value): + if value is None: + return value + + if isinstance(value, bytes): + return value + + if isinstance(value, str): + encoding = getattr(dialect, 'encoding', 'utf-8') + return value.encode(encoding) + + raise TypeError(f"Unexpected type for value in result_processor.process: {type(value)}") + + return process + + # Mapping for database schema inspection of Amazon Redshift datatypes REDSHIFT_ISCHEMA_NAMES = { "geometry": GEOMETRY, @@ -417,6 +461,9 @@ def get_dbapi_type(self, dbapi): "time with time zone": TIMETZ, "timestamp with time zone": TIMESTAMPTZ, "hllsketch": HLLSKETCH, + # iceberg types + "string": IcebergString, + "binary": 
IcebergBinary, } From 39b95520cf9d7356c0b56188a70702bc2ea2a5d2 Mon Sep 17 00:00:00 2001 From: "A. Fox" Date: Wed, 17 Jan 2024 08:18:36 -0800 Subject: [PATCH 2/8] formatting and tests --- sqlalchemy_redshift/dialect.py | 7 +++++-- tests/test_column_loading.py | 31 ++++++++++++++++++++++++++++++- 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/sqlalchemy_redshift/dialect.py b/sqlalchemy_redshift/dialect.py index 5d365443..9ab74a24 100644 --- a/sqlalchemy_redshift/dialect.py +++ b/sqlalchemy_redshift/dialect.py @@ -422,7 +422,7 @@ def load_dialect_impl(self, dialect): class IcebergBinary(sa.types.TypeDecorator): impl = sa.types.LargeBinary - + def process_bind_param(self, value, dialect): if value is None: return value @@ -449,7 +449,10 @@ def process(value): encoding = getattr(dialect, 'encoding', 'utf-8') return value.encode(encoding) - raise TypeError(f"Unexpected type for value in result_processor.process: {type(value)}") + raise TypeError( + "Unexpected type for value in result_processor.process: ", + type(value) + ) return process diff --git a/tests/test_column_loading.py b/tests/test_column_loading.py index 349a31cc..cf5408a3 100644 --- a/tests/test_column_loading.py +++ b/tests/test_column_loading.py @@ -5,7 +5,8 @@ from sqlalchemy.types import NullType, VARCHAR from sqlalchemy_redshift.dialect import ( - RedshiftDialect_psycopg2, RedshiftDialect_psycopg2cffi + RedshiftDialect_psycopg2, RedshiftDialect_psycopg2cffi, + IcebergString, IcebergBinary ) sa_version = Version(sa.__version__) @@ -47,3 +48,31 @@ def test_varchar_as_nulltype(self): identity=None ) assert isinstance(varchar_info['type'], VARCHAR) + + iceberg_string_info = dialect._get_column_info( + name='Iceberg String Column', + format_type='string', + default=None, + notnull=False, + domains={}, + enums=[], + schema='default', + encode='', + comment='test column', + identity=None + ) + assert isinstance(iceberg_string_info['type'], IcebergString) + + iceberg_binary_info = 
dialect._get_column_info( + name='Iceberg Binary Column', + format_type='binary', + default=None, + notnull=False, + domains={}, + enums=[], + schema='default', + encode='', + comment='test column', + identity=None + ) + assert isinstance(iceberg_binary_info['type'], IcebergBinary) From 3839caf3198d63a9a0465a139bb5a86675fb3fed Mon Sep 17 00:00:00 2001 From: "A. Fox" Date: Wed, 17 Jan 2024 08:59:11 -0800 Subject: [PATCH 3/8] Column names should be uppercase since they are database-specific --- sqlalchemy_redshift/dialect.py | 8 ++++---- tests/test_column_loading.py | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sqlalchemy_redshift/dialect.py b/sqlalchemy_redshift/dialect.py index 9ab74a24..b5c25e52 100644 --- a/sqlalchemy_redshift/dialect.py +++ b/sqlalchemy_redshift/dialect.py @@ -413,14 +413,14 @@ def get_dbapi_type(self, dbapi): return dbapi.HLLSKETCH -class IcebergString(sa.types.TypeDecorator): +class ICEBERG_STRING(sa.types.TypeDecorator): impl = sa.types.String def load_dialect_impl(self, dialect): return sa.dialects.postgresql.VARCHAR(length=MAX_VARCHAR_LENGTH) -class IcebergBinary(sa.types.TypeDecorator): +class ICEBERG_BINARY(sa.types.TypeDecorator): impl = sa.types.LargeBinary def process_bind_param(self, value, dialect): @@ -465,8 +465,8 @@ def process(value): "timestamp with time zone": TIMESTAMPTZ, "hllsketch": HLLSKETCH, # iceberg types - "string": IcebergString, - "binary": IcebergBinary, + "string": ICEBERG_STRING, + "binary": ICEBERG_BINARY, } diff --git a/tests/test_column_loading.py b/tests/test_column_loading.py index cf5408a3..ccdd35fb 100644 --- a/tests/test_column_loading.py +++ b/tests/test_column_loading.py @@ -6,7 +6,7 @@ from sqlalchemy_redshift.dialect import ( RedshiftDialect_psycopg2, RedshiftDialect_psycopg2cffi, - IcebergString, IcebergBinary + ICEBERG_STRING, ICEBERG_BINARY ) sa_version = Version(sa.__version__) @@ -61,7 +61,7 @@ def test_varchar_as_nulltype(self): comment='test column', identity=None ) - 
assert isinstance(iceberg_string_info['type'], IcebergString) + assert isinstance(iceberg_string_info['type'], ICEBERG_STRING) iceberg_binary_info = dialect._get_column_info( name='Iceberg Binary Column', @@ -75,4 +75,4 @@ def test_varchar_as_nulltype(self): comment='test column', identity=None ) - assert isinstance(iceberg_binary_info['type'], IcebergBinary) + assert isinstance(iceberg_binary_info['type'], ICEBERG_BINARY) From e70fa20acc96d6b8e37a13a6d1534ed6bdabbf42 Mon Sep 17 00:00:00 2001 From: "A. Fox" Date: Wed, 17 Jan 2024 09:02:50 -0800 Subject: [PATCH 4/8] consistency --- sqlalchemy_redshift/dialect.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sqlalchemy_redshift/dialect.py b/sqlalchemy_redshift/dialect.py index b5c25e52..ef9f9e5e 100644 --- a/sqlalchemy_redshift/dialect.py +++ b/sqlalchemy_redshift/dialect.py @@ -427,12 +427,11 @@ def process_bind_param(self, value, dialect): if value is None: return value - encoding = getattr(dialect, 'encoding', 'utf-8') - if isinstance(value, bytes): return value if isinstance(value, str): + encoding = getattr(dialect, 'encoding', 'utf-8') return value.encode(encoding) return value From 26ffea3888ea279378395add67921dea7951986f Mon Sep 17 00:00:00 2001 From: "A. 
Fox" Date: Thu, 18 Jan 2024 12:54:42 -0800 Subject: [PATCH 5/8] add python_type --- sqlalchemy_redshift/dialect.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/sqlalchemy_redshift/dialect.py b/sqlalchemy_redshift/dialect.py index ef9f9e5e..adebf6e7 100644 --- a/sqlalchemy_redshift/dialect.py +++ b/sqlalchemy_redshift/dialect.py @@ -416,13 +416,20 @@ def get_dbapi_type(self, dbapi): class ICEBERG_STRING(sa.types.TypeDecorator): impl = sa.types.String + @property + def python_type(self): + return str + def load_dialect_impl(self, dialect): return sa.dialects.postgresql.VARCHAR(length=MAX_VARCHAR_LENGTH) - class ICEBERG_BINARY(sa.types.TypeDecorator): impl = sa.types.LargeBinary + @property + def python_type(self): + return bytes + def process_bind_param(self, value, dialect): if value is None: return value From 487173653d91c7c50acea19647873a697e0d319f Mon Sep 17 00:00:00 2001 From: "A. Fox" Date: Fri, 19 Jan 2024 06:53:07 -0800 Subject: [PATCH 6/8] Update CHANGES.rst --- CHANGES.rst | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGES.rst b/CHANGES.rst index 3eea33d0..73aa19a0 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,7 +1,8 @@ 0.8.15 (unreleased) ------------------- -- Nothing changed yet. +- Add support for Iceberg string and binary types + (`Pull #297 <https://github.com/sqlalchemy-redshift/sqlalchemy-redshift/pull/297>`_) 0.8.14 (2023-04-07) From 7a57ce200692b3f0d6c8db4501ba5b4d3727e3ef Mon Sep 17 00:00:00 2001 From: "A. Fox" Date: Fri, 19 Jan 2024 07:05:41 -0800 Subject: [PATCH 7/8] Add docstrings --- sqlalchemy_redshift/dialect.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/sqlalchemy_redshift/dialect.py b/sqlalchemy_redshift/dialect.py index adebf6e7..7ebf4f7e 100644 --- a/sqlalchemy_redshift/dialect.py +++ b/sqlalchemy_redshift/dialect.py @@ -414,6 +414,13 @@ def get_dbapi_type(self, dbapi): class ICEBERG_STRING(sa.types.TypeDecorator): + """ + A custom SQLAlchemy type decorator for representing iceberg strings.
+ + This type decorator is used to represent iceberg strings in a + Redshift/PostgreSQL database using SQLAlchemy. + """ + impl = sa.types.String @property @@ -423,13 +430,22 @@ def python_type(self): def load_dialect_impl(self, dialect): return sa.dialects.postgresql.VARCHAR(length=MAX_VARCHAR_LENGTH) + class ICEBERG_BINARY(sa.types.TypeDecorator): + """ + A custom SQLAlchemy type decorator for storing/querying binary data in an + Iceberg database. + + This type decorator is used to represent iceberg binary data in a + Redshift/PostgreSQL database using SQLAlchemy. + """ + impl = sa.types.LargeBinary @property def python_type(self): return bytes - + def process_bind_param(self, value, dialect): if value is None: return value From 11a7d323062183576d0cd0c2912b8051de42e3c8 Mon Sep 17 00:00:00 2001 From: "A. Fox" Date: Fri, 19 Jan 2024 07:09:18 -0800 Subject: [PATCH 8/8] update docstrings --- sqlalchemy_redshift/dialect.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/sqlalchemy_redshift/dialect.py b/sqlalchemy_redshift/dialect.py index 7ebf4f7e..16243fb5 100644 --- a/sqlalchemy_redshift/dialect.py +++ b/sqlalchemy_redshift/dialect.py @@ -415,9 +415,7 @@ def get_dbapi_type(self, dbapi): class ICEBERG_STRING(sa.types.TypeDecorator): """ - A custom SQLAlchemy type decorator for representing iceberg strings. - - This type decorator is used to represent iceberg strings in a + ICEBERG_STRING is used to represent iceberg strings in a Redshift/PostgreSQL database using SQLAlchemy. """ @@ -433,10 +431,7 @@ def load_dialect_impl(self, dialect): class ICEBERG_BINARY(sa.types.TypeDecorator): """ - A custom SQLAlchemy type decorator for storing/querying binary data in an - Iceberg database. - - This type decorator is used to represent iceberg binary data in a + ICEBERG_BINARY is used to represent iceberg binary data in a Redshift/PostgreSQL database using SQLAlchemy. """