From 4155ecd665ab2d9c0bb7a98664f83c6f2b6025b4 Mon Sep 17 00:00:00 2001
From: manu-sj
Date: Mon, 28 Apr 2025 06:25:42 +0200
Subject: [PATCH 1/3] handling missing values in label encoder

---
 python/hsfs/builtin_transformations.py        |  4 +-
 ...t_python_spark_transformation_functions.py | 66 +++++++++++++++++++
 2 files changed, 69 insertions(+), 1 deletion(-)

diff --git a/python/hsfs/builtin_transformations.py b/python/hsfs/builtin_transformations.py
index 4426268ccf..cb62cc4261 100644
--- a/python/hsfs/builtin_transformations.py
+++ b/python/hsfs/builtin_transformations.py
@@ -45,7 +45,9 @@ def robust_scaler(feature: pd.Series, statistics=feature_statistics) -> pd.Serie
 
 @udf(int, drop=["feature"], mode="pandas")
 def label_encoder(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
-    unique_data = sorted([value for value in statistics.feature.unique_values])
+    unique_data = sorted(
+        [value for value in statistics.feature.unique_values if not pd.isna(value)]
+    )
     value_to_index = {value: index for index, value in enumerate(unique_data)}
     # Unknown categories not present in training dataset are encoded as -1.
     return pd.Series(
diff --git a/python/tests/engine/test_python_spark_transformation_functions.py b/python/tests/engine/test_python_spark_transformation_functions.py
index bf81642909..27c7efbd06 100644
--- a/python/tests/engine/test_python_spark_transformation_functions.py
+++ b/python/tests/engine/test_python_spark_transformation_functions.py
@@ -610,6 +610,72 @@ def tf_fun(col_0):
             td, spark_df, expected_spark_df, transformation_functions
         )
 
+    def test_apply_builtin_label_encoder(self, mocker):
+        # Arrange
+        mocker.patch("hopsworks_common.client.get_instance")
+        mocker.patch("hsfs.core.statistics_engine.StatisticsEngine._save_statistics")
+        spark_engine = spark.Engine()
+
+        schema = StructType(
+            [
+                StructField("col_0", IntegerType(), True),
+                StructField("col_1", StringType(), True),
+                StructField("col_2", BooleanType(), True),
+            ]
+        )
+        df = pd.DataFrame(
+            data={
+                "col_0": [1, 2, 3],
+                "col_1": ["test_1", "test_2", "test_3"],
+                "col_2": [True, False, True],
+            }
+        )
+        spark_df = spark_engine._spark_session.createDataFrame(df, schema=schema)
+
+        expected_schema = StructType(
+            [
+                StructField("col_0", IntegerType(), True),
+                StructField("col_2", BooleanType(), True),
+                StructField("label_encoder_col_1", IntegerType(), True),
+            ]
+        )
+        expected_df = pd.DataFrame(
+            data={
+                "col_0": [1, 2, 3],
+                "col_2": [True, False, True],
+                "label_encoder_col_1": [0, 1, 0],
+            }
+        )
+        expected_spark_df = spark_engine._spark_session.createDataFrame(
+            expected_df, schema=expected_schema
+        )
+
+        # Arrange
+        from hsfs.builtin_transformations import label_encoder
+
+        td = self._create_training_dataset()
+
+        transformation_functions = [
+            transformation_function.TransformationFunction(
+                hopsworks_udf=label_encoder("col_1"),
+                featurestore_id=99,
+                transformation_type=TransformationType.MODEL_DEPENDENT,
+            )
+        ]
+
+        extended_statistics = {"unique_values": ["test_1", "test_2"]}
+        transformation_functions[0].transformation_statistics = [
+            FeatureDescriptiveStatistics(
+                feature_name="col_0", extended_statistics=extended_statistics
+            )
+        ]
+
+        # Assert
+        self._validate_on_python_engine(td, df, expected_df, transformation_functions)
+        self._validate_on_spark_engine(
+            td, spark_df, expected_spark_df, transformation_functions
+        )
+
     def test_apply_plus_one_int_python(self, mocker):
         # Arrange
         mocker.patch("hopsworks_common.client.get_instance")

From fd6dbf43841250dbae0479683a1124705183419f Mon Sep 17 00:00:00 2001
From: manu-sj
Date: Mon, 28 Apr 2025 09:12:04 +0200
Subject: [PATCH 2/3] fixing unit tests

---
 .../engine/test_python_spark_transformation_functions.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/python/tests/engine/test_python_spark_transformation_functions.py b/python/tests/engine/test_python_spark_transformation_functions.py
index 27c7efbd06..04a6f88b90 100644
--- a/python/tests/engine/test_python_spark_transformation_functions.py
+++ b/python/tests/engine/test_python_spark_transformation_functions.py
@@ -626,7 +626,7 @@ def test_apply_builtin_label_encoder(self, mocker):
         df = pd.DataFrame(
             data={
                 "col_0": [1, 2, 3],
-                "col_1": ["test_1", "test_2", "test_3"],
+                "col_1": ["test_1", "test_2", "test_1"],
                 "col_2": [True, False, True],
             }
         )
@@ -636,14 +636,14 @@ def test_apply_builtin_label_encoder(self, mocker):
             [
                 StructField("col_0", IntegerType(), True),
                 StructField("col_2", BooleanType(), True),
-                StructField("label_encoder_col_1", IntegerType(), True),
+                StructField("label_encoder_col_1_", IntegerType(), True),
             ]
         )
         expected_df = pd.DataFrame(
             data={
                 "col_0": [1, 2, 3],
                 "col_2": [True, False, True],
-                "label_encoder_col_1": [0, 1, 0],
+                "label_encoder_col_1_": [0, 1, 0],
             }
         )
         expected_spark_df = spark_engine._spark_session.createDataFrame(
@@ -666,7 +666,7 @@ def test_apply_builtin_label_encoder(self, mocker):
         extended_statistics = {"unique_values": ["test_1", "test_2"]}
         transformation_functions[0].transformation_statistics = [
             FeatureDescriptiveStatistics(
-                feature_name="col_0", extended_statistics=extended_statistics
+                feature_name="col_1", extended_statistics=extended_statistics
             )
         ]
 

From c71be942ac9b7686f98309c9494ea4c605f920df Mon Sep 17 00:00:00 2001
From: manu-sj
Date: Tue, 29 Apr 2025 09:47:26 +0200
Subject: [PATCH 3/3] adding unit tests

---
 ...t_python_spark_transformation_functions.py | 70 ++++++++++++++++++-
 1 file changed, 69 insertions(+), 1 deletion(-)

diff --git a/python/tests/engine/test_python_spark_transformation_functions.py b/python/tests/engine/test_python_spark_transformation_functions.py
index 04a6f88b90..1057d7dc0f 100644
--- a/python/tests/engine/test_python_spark_transformation_functions.py
+++ b/python/tests/engine/test_python_spark_transformation_functions.py
@@ -16,6 +16,7 @@
 from __future__ import annotations
 
 import datetime
+import math
 import os
 import statistics
 
@@ -636,7 +637,7 @@ def test_apply_builtin_label_encoder(self, mocker):
             [
                 StructField("col_0", IntegerType(), True),
                 StructField("col_2", BooleanType(), True),
-                StructField("label_encoder_col_1_", IntegerType(), True),
+                StructField("label_encoder_col_1_", LongType(), True),
             ]
         )
         expected_df = pd.DataFrame(
@@ -677,6 +678,73 @@ def test_apply_builtin_label_encoder(self, mocker):
             td, spark_df, expected_spark_df, transformation_functions
         )
 
+    def test_apply_builtin_label_encoder_null_values(self, mocker):
+        # Arrange
+        mocker.patch("hopsworks_common.client.get_instance")
+        mocker.patch("hsfs.core.statistics_engine.StatisticsEngine._save_statistics")
+        spark_engine = spark.Engine()
+
+        schema = StructType(
+            [
+                StructField("col_0", IntegerType(), True),
+                StructField("col_1", StringType(), True),
+                StructField("col_2", BooleanType(), True),
+            ]
+        )
+        df = pd.DataFrame(
+            data={
+                "col_0": [1, 2, 3, 4],
+                "col_1": ["test_1", "test_2", None, "test_1"],
+                "col_2": [True, False, True, True],
+            }
+        )
+        spark_df = spark_engine._spark_session.createDataFrame(df, schema=schema)
+
+        expected_schema = StructType(
+            [
+                StructField("col_0", IntegerType(), True),
+                StructField("col_2", BooleanType(), True),
+                StructField("label_encoder_col_1_", LongType(), True),
+            ]
+        )
+        expected_df = pd.DataFrame(
+            data={
+                "col_0": [1, 2, 3, 4],
+                "col_2": [True, False, True, True],
+                "label_encoder_col_1_": [0, 1, math.nan, 0],
+            }
+        )
+        data = [(1, True, 0), (2, False, 1), (3, True, None), (4, True, 0)]
+        expected_spark_df = spark_engine._spark_session.createDataFrame(
+            data, schema=expected_schema
+        )
+
+        # Arrange
+        from hsfs.builtin_transformations import label_encoder
+
+        td = self._create_training_dataset()
+
+        transformation_functions = [
+            transformation_function.TransformationFunction(
+                hopsworks_udf=label_encoder("col_1"),
+                featurestore_id=99,
+                transformation_type=TransformationType.MODEL_DEPENDENT,
+            )
+        ]
+
+        extended_statistics = {"unique_values": ["test_1", None, "test_2"]}
+        transformation_functions[0].transformation_statistics = [
+            FeatureDescriptiveStatistics(
+                feature_name="col_1", extended_statistics=extended_statistics
+            )
+        ]
+
+        # Assert
+        self._validate_on_python_engine(td, df, expected_df, transformation_functions)
+        self._validate_on_spark_engine(
+            td, spark_df, expected_spark_df, transformation_functions
+        )
+
     def test_apply_plus_one_int_python(self, mocker):
         # Arrange
         mocker.patch("hopsworks_common.client.get_instance")