Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGES.rst
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
0.8.15 (unreleased)
-------------------

- Nothing changed yet.
- Add support for Iceberg string and binary types
(`Pull #297 <https://github.com/sqlalchemy-redshift/sqlalchemy-redshift/pull/297>`_)


0.8.14 (2023-04-07)
Expand Down
67 changes: 67 additions & 0 deletions sqlalchemy_redshift/dialect.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,9 @@ class RedshiftImpl(postgresql.PostgresqlImpl):
"""


MAX_VARCHAR_LENGTH = 65535


class RedshiftTypeEngine(TypeEngine):

def _default_dialect(self, default=None):
Expand Down Expand Up @@ -410,13 +413,77 @@ def get_dbapi_type(self, dbapi):
return dbapi.HLLSKETCH


class ICEBERG_STRING(sa.types.TypeDecorator):
"""
ICEBERG_STRING is used to represent iceberg strings in a
Redshift/PostgreSQL database using SQLAlchemy.
"""

impl = sa.types.String

@property
def python_type(self):
return str

def load_dialect_impl(self, dialect):
return sa.dialects.postgresql.VARCHAR(length=MAX_VARCHAR_LENGTH)


class ICEBERG_BINARY(sa.types.TypeDecorator):
"""
ICEBERG_BINARY is used to represent iceberg binary data in a
Redshift/PostgreSQL database using SQLAlchemy.
"""

impl = sa.types.LargeBinary

@property
def python_type(self):
return bytes

def process_bind_param(self, value, dialect):
if value is None:
return value

if isinstance(value, bytes):
return value

if isinstance(value, str):
encoding = getattr(dialect, 'encoding', 'utf-8')
return value.encode(encoding)

return value

def result_processor(self, dialect, coltype):
def process(value):
if value is None:
return value

if isinstance(value, bytes):
return value

if isinstance(value, str):
encoding = getattr(dialect, 'encoding', 'utf-8')
return value.encode(encoding)

raise TypeError(
"Unexpected type for value in result_processor.process: ",
type(value)
)

return process


# Mapping for database schema inspection of Amazon Redshift datatypes
REDSHIFT_ISCHEMA_NAMES = {
"geometry": GEOMETRY,
"super": SUPER,
"time with time zone": TIMETZ,
"timestamp with time zone": TIMESTAMPTZ,
"hllsketch": HLLSKETCH,
# iceberg types
"string": ICEBERG_STRING,
"binary": ICEBERG_BINARY,
}


Expand Down
31 changes: 30 additions & 1 deletion tests/test_column_loading.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from sqlalchemy.types import NullType, VARCHAR

from sqlalchemy_redshift.dialect import (
RedshiftDialect_psycopg2, RedshiftDialect_psycopg2cffi
RedshiftDialect_psycopg2, RedshiftDialect_psycopg2cffi,
ICEBERG_STRING, ICEBERG_BINARY
)

sa_version = Version(sa.__version__)
Expand Down Expand Up @@ -47,3 +48,31 @@ def test_varchar_as_nulltype(self):
identity=None
)
assert isinstance(varchar_info['type'], VARCHAR)

iceberg_string_info = dialect._get_column_info(
name='Iceberg String Column',
format_type='string',
default=None,
notnull=False,
domains={},
enums=[],
schema='default',
encode='',
comment='test column',
identity=None
)
assert isinstance(iceberg_string_info['type'], ICEBERG_STRING)

iceberg_binary_info = dialect._get_column_info(
name='Iceberg Binary Column',
format_type='binary',
default=None,
notnull=False,
domains={},
enums=[],
schema='default',
encode='',
comment='test column',
identity=None
)
assert isinstance(iceberg_binary_info['type'], ICEBERG_BINARY)