From fc030b325d5b27378a28045d3102a056d1d60932 Mon Sep 17 00:00:00 2001 From: tm Date: Wed, 15 Apr 2026 17:47:27 +0200 Subject: [PATCH 1/8] add pipe method to DataFrame --- src/snowflake/snowpark/dataframe.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/src/snowflake/snowpark/dataframe.py b/src/snowflake/snowpark/dataframe.py index c7f7b07e96..3f3fb3b9ef 100644 --- a/src/snowflake/snowpark/dataframe.py +++ b/src/snowflake/snowpark/dataframe.py @@ -7027,6 +7027,33 @@ def print_schema(self, level: Optional[int] = None) -> None: # naturalJoin = natural_join # withColumns = with_columns + def pipe(self, f, *args, **kwargs): + """ + Applies a function to the DataFrame and returns the result. + + Args: + f: A function to apply to the DataFrame. + *args: Additional positional arguments to pass to the function. + **kwargs: Additional keyword arguments to pass to the function. + + Returns: + The result of applying the function to the DataFrame. + + Examples:: + >>> df = session.create_dataframe([(1, "a"), (2, "b")], schema=["col1", "col2"]) + >>> def add_column(df): + ... 
return df.with_column("col3", lit(3)) + >>> new_df = df.pipe(add_column) + >>> new_df.show() + -------------------- + |"COL1" |"COL2" |"COL3" | + -------------------- + |1 |a |3 | + |2 |b |3 | + -------------------- + """ + return f(self, *args, **kwargs) + def map( dataframe: DataFrame, From 0ca8ba8017974f696d7e7d2b58fa2e6bb6a00889 Mon Sep 17 00:00:00 2001 From: tm Date: Wed, 15 Apr 2026 18:11:20 +0200 Subject: [PATCH 2/8] added unit test --- tests/unit/test_dataframe.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tests/unit/test_dataframe.py b/tests/unit/test_dataframe.py index 4d3e94f5f2..f86c4534a8 100644 --- a/tests/unit/test_dataframe.py +++ b/tests/unit/test_dataframe.py @@ -409,6 +409,24 @@ def test_dataFrame_printSchema(capfd, mock_server_connection): ) +def test_dataFrame_pipe(mock_server_connection): + session = snowflake.snowpark.session.Session(mock_server_connection) + df = session.create_dataframe([[1, ""], [3, None]]) + df._plan._metadata = PlanMetadata( + attributes=[ + Attribute("A", IntegerType(), False), + Attribute("B", StringType()), + ], + quoted_identifiers=None, + ) + + def test_func(df): + return df + + result_df, expected_result = df.pipe(test_func), test_func(df) + assert result_df == expected_result + + def test_session(): fake_session = mock.create_autospec(Session, _session_id=123456) fake_session._analyzer = mock.Mock() From 6e887cde3e90ea005e1cd2e11adc25393a21d1a9 Mon Sep 17 00:00:00 2001 From: Tijoxa Date: Thu, 16 Apr 2026 16:43:54 +0200 Subject: [PATCH 3/8] Added documentation for pipe method --- CHANGELOG.md | 4 ++++ docs/source/snowpark/dataframe.rst | 1 + 2 files changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index cddf0a0796..a08f4bdd18 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ ### Snowpark Python API Updates +#### New Features + +- Added support for `DataFrame.pipe`. 
+ #### Bug Fixes - Fixed a bug where CTE optimization incorrectly deduplicated subtrees containing non-deterministic data generation functions (e.g. `uuid_string()`). diff --git a/docs/source/snowpark/dataframe.rst b/docs/source/snowpark/dataframe.rst index a62bab7bc2..81a34a9839 100644 --- a/docs/source/snowpark/dataframe.rst +++ b/docs/source/snowpark/dataframe.rst @@ -67,6 +67,7 @@ DataFrame DataFrame.natural_join DataFrame.orderBy DataFrame.order_by + DataFrame.pipe DataFrame.pivot DataFrame.print_schema DataFrame.printSchema From a0be0133bffddc09c28de50f8f3dcfea850fd08d Mon Sep 17 00:00:00 2001 From: Tijoxa Date: Thu, 16 Apr 2026 16:47:20 +0200 Subject: [PATCH 4/8] Added typing to the pipe method --- src/snowflake/snowpark/dataframe.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/snowflake/snowpark/dataframe.py b/src/snowflake/snowpark/dataframe.py index 3f3fb3b9ef..9874495595 100644 --- a/src/snowflake/snowpark/dataframe.py +++ b/src/snowflake/snowpark/dataframe.py @@ -24,6 +24,7 @@ Optional, Set, Tuple, + TypeVar, Union, overload, ) @@ -251,6 +252,9 @@ _UNALIASED_REGEX = re.compile(f"""._[a-zA-Z0-9]{{{_NUM_PREFIX_DIGITS}}}_(.*)""") +T = TypeVar("T") + + def _generate_prefix(prefix: str) -> str: return f"{prefix}_{generate_random_alphanumeric(_NUM_PREFIX_DIGITS)}_" @@ -7027,14 +7031,14 @@ def print_schema(self, level: Optional[int] = None) -> None: # naturalJoin = natural_join # withColumns = with_columns - def pipe(self, f, *args, **kwargs): + def pipe(self, function: Callable[..., T], *args, **kwargs) -> T: """ Applies a function to the DataFrame and returns the result. Args: - f: A function to apply to the DataFrame. - *args: Additional positional arguments to pass to the function. - **kwargs: Additional keyword arguments to pass to the function. + function: A callable to apply to the DataFrame; it receives the DataFrame as its first argument. + *args: Additional positional arguments to pass to the callable.
+ **kwargs: Additional keyword arguments to pass to the callable. Returns: The result of applying the function to the DataFrame. @@ -7052,7 +7056,7 @@ def pipe(self, f, *args, **kwargs): |2 |b |3 | -------------------- """ - return f(self, *args, **kwargs) + return function(self, *args, **kwargs) def map( From 0616463f9da2a1820a1e1bdfd84d354b40117caa Mon Sep 17 00:00:00 2001 From: Tijoxa Date: Thu, 16 Apr 2026 17:34:25 +0200 Subject: [PATCH 5/8] Improved docstring and improve test quality --- src/snowflake/snowpark/dataframe.py | 29 +++++++++++++++-------------- tests/unit/test_dataframe.py | 28 ++++++++++++++-------------- 2 files changed, 29 insertions(+), 28 deletions(-) diff --git a/src/snowflake/snowpark/dataframe.py b/src/snowflake/snowpark/dataframe.py index 9874495595..c238a7e6f1 100644 --- a/src/snowflake/snowpark/dataframe.py +++ b/src/snowflake/snowpark/dataframe.py @@ -7032,8 +7032,7 @@ def print_schema(self, level: Optional[int] = None) -> None: # withColumns = with_columns def pipe(self, function: Callable[..., T], *args, **kwargs) -> T: - """ - Applies a function to the DataFrame and returns the result. + """Applies a function to the DataFrame and returns the result. Args: function: A callable to apply to the DataFrame; it receives the DataFrame as its first argument. @@ -7043,18 +7042,20 @@ def pipe(self, function: Callable[..., T], *args, **kwargs) -> T: Returns: The result of applying the function to the DataFrame. - Examples:: - >>> df = session.create_dataframe([(1, "a"), (2, "b")], schema=["col1", "col2"]) - >>> def add_column(df): - ... return df.with_column("col3", lit(3)) - >>> new_df = df.pipe(add_column) - >>> new_df.show() - -------------------- - |"COL1" |"COL2" |"COL3" | - -------------------- - |1 |a |3 | - |2 |b |3 | - -------------------- + Example:: + + >>> df = session.create_dataframe([[1, 2], [3, 4]], schema=["a", "b"]) + >>> def test_function(df: DataFrame, col: str, threshold: float = 0): + ... df = df.filter(df[col] > threshold) + ...
return df.collect() + >>> result = df.pipe(test_function, "a", threshold=1) + >>> result.show() + ------------- + |"A" |"B" | + ------------- + |3 |4 | + ------------- + """ return function(self, *args, **kwargs) diff --git a/tests/unit/test_dataframe.py b/tests/unit/test_dataframe.py index f86c4534a8..ec4f583da3 100644 --- a/tests/unit/test_dataframe.py +++ b/tests/unit/test_dataframe.py @@ -409,22 +409,22 @@ def test_dataFrame_printSchema(capfd, mock_server_connection): ) -def test_dataFrame_pipe(mock_server_connection): - session = snowflake.snowpark.session.Session(mock_server_connection) - df = session.create_dataframe([[1, ""], [3, None]]) - df._plan._metadata = PlanMetadata( - attributes=[ - Attribute("A", IntegerType(), False), - Attribute("B", StringType()), - ], - quoted_identifiers=None, - ) +def test_dataframe_pipe(session): + df: DataFrame = session.create_dataframe([[1, 2], [3, 4]], schema=["a", "b"]) + + # test normal function + def test_function(df: DataFrame, col: str, threshold: float = 0.0): + df = df.filter(df[col] > threshold) + return df.collect(), df.count() + + result, expected_result = df.pipe(test_function, "a", threshold=1), test_function(df, "a", 1) + + assert result == expected_result - def test_func(df): - return df + # test lambda function + result, expected_result = df.pipe(lambda x: int(x.count())), (lambda x: int(x.count()))(df) - result_df, expected_result = df.pipe(test_func), test_func(df) - assert result_df == expected_result + assert result == expected_result def test_session(): From 401fadc0b8b7436c775750355c27f2b4b4a908aa Mon Sep 17 00:00:00 2001 From: Tijoxa Date: Thu, 16 Apr 2026 22:19:37 +0200 Subject: [PATCH 6/8] Updated type hint --- src/snowflake/snowpark/dataframe.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/snowflake/snowpark/dataframe.py b/src/snowflake/snowpark/dataframe.py index c238a7e6f1..e1f9c594ee 100644 --- a/src/snowflake/snowpark/dataframe.py +++ 
b/src/snowflake/snowpark/dataframe.py @@ -242,9 +242,15 @@ from collections.abc import Iterable if TYPE_CHECKING: + from typing import Concatenate, ParamSpec + import modin.pandas # pragma: no cover from table import Table # pragma: no cover + T = TypeVar("T") + P = ParamSpec("P") + + _logger = getLogger(__name__) _ONE_MILLION = 1000000 @@ -252,9 +258,6 @@ _UNALIASED_REGEX = re.compile(f"""._[a-zA-Z0-9]{{{_NUM_PREFIX_DIGITS}}}_(.*)""") -T = TypeVar("T") - - def _generate_prefix(prefix: str) -> str: return f"{prefix}_{generate_random_alphanumeric(_NUM_PREFIX_DIGITS)}_" @@ -7031,7 +7034,7 @@ def print_schema(self, level: Optional[int] = None) -> None: # naturalJoin = natural_join # withColumns = with_columns - def pipe(self, function: Callable[..., T], *args, **kwargs) -> T: + def pipe(self, function: Callable[Concatenate["DataFrame", P], T], *args: P.args, **kwargs: P.kwargs) -> T: """Applies a function to the DataFrame and returns the result. Args: From 523c9887f76190b626bebbe1d09ccd4eb2320d43 Mon Sep 17 00:00:00 2001 From: Tijoxa Date: Fri, 17 Apr 2026 21:43:44 +0200 Subject: [PATCH 7/8] fixed type_checking for python 3.9 --- src/snowflake/snowpark/dataframe.py | 20 ++++++++++++++++---- tests/unit/test_dataframe.py | 8 ++++++-- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/src/snowflake/snowpark/dataframe.py b/src/snowflake/snowpark/dataframe.py index e1f9c594ee..461167a7b8 100644 --- a/src/snowflake/snowpark/dataframe.py +++ b/src/snowflake/snowpark/dataframe.py @@ -241,14 +241,21 @@ else: from collections.abc import Iterable -if TYPE_CHECKING: +# Python 3.9 needs to use typing_extensions.ParamSpec and typing_extensions.Concatenate +# Python 3.10+ can use typing.ParamSpec and typing.Concatenate because they are available in the standard library +if sys.version_info < (3, 10): + from typing_extensions import Concatenate, ParamSpec +else: from typing import Concatenate, ParamSpec + +if TYPE_CHECKING: import modin.pandas # pragma: no cover from 
table import Table # pragma: no cover - T = TypeVar("T") - P = ParamSpec("P") + +T = TypeVar("T") +P = ParamSpec("P") _logger = getLogger(__name__) @@ -7034,7 +7041,12 @@ def print_schema(self, level: Optional[int] = None) -> None: # naturalJoin = natural_join # withColumns = with_columns - def pipe(self, function: Callable[Concatenate["DataFrame", P], T], *args: P.args, **kwargs: P.kwargs) -> T: + def pipe( + self, + function: Callable[Concatenate["DataFrame", P], T], + *args: P.args, + **kwargs: P.kwargs, + ) -> T: """Applies a function to the DataFrame and returns the result. Args: diff --git a/tests/unit/test_dataframe.py b/tests/unit/test_dataframe.py index ec4f583da3..aa34d75999 100644 --- a/tests/unit/test_dataframe.py +++ b/tests/unit/test_dataframe.py @@ -417,12 +417,16 @@ def test_function(df: DataFrame, col: str, threshold: float = 0.0): df = df.filter(df[col] > threshold) return df.collect(), df.count() - result, expected_result = df.pipe(test_function, "a", threshold=1), test_function(df, "a", 1) + result, expected_result = df.pipe(test_function, "a", threshold=1), test_function( + df, "a", 1 + ) assert result == expected_result # test lambda function - result, expected_result = df.pipe(lambda x: int(x.count())), (lambda x: int(x.count()))(df) + result, expected_result = df.pipe(lambda x: int(x.count())), ( + lambda x: int(x.count()) + )(df) assert result == expected_result From 9165712f5ab0de9a2af7493c2467b07bba410e75 Mon Sep 17 00:00:00 2001 From: Jonathan Shi Date: Tue, 21 Apr 2026 10:01:17 -0700 Subject: [PATCH 8/8] fix doctest --- src/snowflake/snowpark/dataframe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/snowflake/snowpark/dataframe.py b/src/snowflake/snowpark/dataframe.py index 461167a7b8..468d643b47 100644 --- a/src/snowflake/snowpark/dataframe.py +++ b/src/snowflake/snowpark/dataframe.py @@ -7062,7 +7062,7 @@ def pipe( >>> df = session.create_dataframe([[1, 2], [3, 4]], schema=["a", "b"]) >>> def test_function(df: 
DataFrame, col: str, threshold: float = 0): ... df = df.filter(df[col] > threshold) - ... return df.collect() + ... return df >>> result = df.pipe(test_function, "a", threshold=1) >>> result.show() -------------