From 1a240e546595d55e5b9840e18529447cf8aacb7a Mon Sep 17 00:00:00 2001 From: anupksingh Date: Mon, 1 Sep 2025 07:29:00 +0000 Subject: [PATCH] BigDecimalSplitter rounding fix. --- .../db/source/DataDrivenETLDBInputFormat.java | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/database-commons/src/main/java/io/cdap/plugin/db/source/DataDrivenETLDBInputFormat.java b/database-commons/src/main/java/io/cdap/plugin/db/source/DataDrivenETLDBInputFormat.java index 3416360a9..b870ba537 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/source/DataDrivenETLDBInputFormat.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/source/DataDrivenETLDBInputFormat.java @@ -26,19 +26,24 @@ import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.db.BigDecimalSplitter; import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; +import org.apache.hadoop.mapreduce.lib.db.DBSplitter; import org.apache.hadoop.mapreduce.lib.db.DBWritable; import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.math.BigDecimal; +import java.math.RoundingMode; import java.sql.Connection; import java.sql.Driver; import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; +import java.sql.Types; import java.util.Properties; /** @@ -169,6 +174,26 @@ public void close() throws IOException { }; } + @Override + protected DBSplitter getSplitter(int sqlDataType) { + if (sqlDataType == Types.NUMERIC || sqlDataType == Types.DECIMAL) { + return new CustomBigDecimalSplitter(); + } + return super.getSplitter(sqlDataType); + } + + static class CustomBigDecimalSplitter extends BigDecimalSplitter { + @Override + protected BigDecimal tryDivide(BigDecimal numerator, BigDecimal denominator) { + BigDecimal size = numerator.divide(denominator, RoundingMode.HALF_UP); + if (size.compareTo(new BigDecimal("0")) <= 0) { + int effectiveScale = Math.max(numerator.scale(), denominator.scale()) + 5; + return numerator.divide(denominator, effectiveScale, RoundingMode.HALF_UP); + } + return size; + } + } + @Override protected void closeConnection() { super.closeConnection();