diff --git a/CHANGES.rst b/CHANGES.rst index 3eea33d0..73aa19a0 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,7 +1,8 @@ 0.8.15 (unreleased) ------------------- -- Nothing changed yet. +- Add support for Iceberg string and binary types + (`Pull #297 `_) 0.8.14 (2023-04-07) diff --git a/sqlalchemy_redshift/dialect.py b/sqlalchemy_redshift/dialect.py index ce755ea6..16243fb5 100644 --- a/sqlalchemy_redshift/dialect.py +++ b/sqlalchemy_redshift/dialect.py @@ -290,6 +290,9 @@ class RedshiftImpl(postgresql.PostgresqlImpl): """ +MAX_VARCHAR_LENGTH = 65535 + + class RedshiftTypeEngine(TypeEngine): def _default_dialect(self, default=None): @@ -410,6 +413,67 @@ def get_dbapi_type(self, dbapi): return dbapi.HLLSKETCH +class ICEBERG_STRING(sa.types.TypeDecorator): + """ + ICEBERG_STRING is used to represent iceberg strings in a + Redshift/PostgreSQL database using SQLAlchemy. + """ + + impl = sa.types.String + + @property + def python_type(self): + return str + + def load_dialect_impl(self, dialect): + return sa.dialects.postgresql.VARCHAR(length=MAX_VARCHAR_LENGTH) + + +class ICEBERG_BINARY(sa.types.TypeDecorator): + """ + ICEBERG_BINARY is used to represent iceberg binary data in a + Redshift/PostgreSQL database using SQLAlchemy. + """ + + impl = sa.types.LargeBinary + + @property + def python_type(self): + return bytes + + def process_bind_param(self, value, dialect): + if value is None: + return value + + if isinstance(value, bytes): + return value + + if isinstance(value, str): + encoding = getattr(dialect, 'encoding', 'utf-8') + return value.encode(encoding) + + return value + + def result_processor(self, dialect, coltype): + def process(value): + if value is None: + return value + + if isinstance(value, bytes): + return value + + if isinstance(value, str): + encoding = getattr(dialect, 'encoding', 'utf-8') + return value.encode(encoding) + + raise TypeError( + "Unexpected type for value in result_processor.process: ", + type(value) + ) + + return process + + # Mapping for database schema inspection of Amazon Redshift datatypes REDSHIFT_ISCHEMA_NAMES = { "geometry": GEOMETRY, @@ -417,6 +481,9 @@ def get_dbapi_type(self, dbapi): "time with time zone": TIMETZ, "timestamp with time zone": TIMESTAMPTZ, "hllsketch": HLLSKETCH, + # iceberg types + "string": ICEBERG_STRING, + "binary": ICEBERG_BINARY, } diff --git a/tests/test_column_loading.py b/tests/test_column_loading.py index 349a31cc..ccdd35fb 100644 --- a/tests/test_column_loading.py +++ b/tests/test_column_loading.py @@ -5,7 +5,8 @@ from sqlalchemy.types import NullType, VARCHAR from sqlalchemy_redshift.dialect import ( - RedshiftDialect_psycopg2, RedshiftDialect_psycopg2cffi + RedshiftDialect_psycopg2, RedshiftDialect_psycopg2cffi, + ICEBERG_STRING, ICEBERG_BINARY ) sa_version = Version(sa.__version__) @@ -47,3 +48,31 @@ def test_varchar_as_nulltype(self): identity=None ) assert isinstance(varchar_info['type'], VARCHAR) + + iceberg_string_info = dialect._get_column_info( + name='Iceberg String Column', + format_type='string', + default=None, + notnull=False, + domains={}, + enums=[], + schema='default', + encode='', + comment='test column', + identity=None + ) + assert isinstance(iceberg_string_info['type'], ICEBERG_STRING) + + iceberg_binary_info = dialect._get_column_info( + name='Iceberg Binary Column', + format_type='binary', + default=None, + notnull=False, + domains={}, + enums=[], + schema='default', + encode='', + comment='test column', + identity=None + ) + assert isinstance(iceberg_binary_info['type'], ICEBERG_BINARY)