From 5735add71dc76f8b4a36a2a19a01b380bb044cb5 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Fri, 14 Nov 2025 14:48:38 +0100
Subject: [PATCH 1/6] fixup

---
 .../spark/memory/TaskMemoryManagerSuite.java  |  7 ++----
 .../sql/execution/joins/HashedRelation.scala  | 21 +++++------------
 .../benchmark/AggregateBenchmark.scala        | 14 ++++-------
 .../HashedRelationMetricsBenchmark.scala      |  7 ++----
 .../execution/joins/HashedRelationSuite.scala | 23 ++++++++++++++-----
 5 files changed, 31 insertions(+), 41 deletions(-)

diff --git a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
index 25543690b832..3d1380d71aac 100644
--- a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
+++ b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
@@ -30,11 +30,8 @@ public class TaskMemoryManagerSuite {
   @Test
   public void leakedPageMemoryIsDetected() {
     final TaskMemoryManager manager = new TaskMemoryManager(
-      new UnifiedMemoryManager(
-        new SparkConf().set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false),
-        Long.MAX_VALUE,
-        Long.MAX_VALUE / 2,
-        1),
+      UnifiedMemoryManager.apply(
+        new SparkConf().set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false), 1),
       0);
     final MemoryConsumer c = new TestMemoryConsumer(manager);
     manager.allocatePage(4096, c); // leak memory
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 85c198290542..c3a3711cb4de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -141,11 +141,8 @@ private[execution] object HashedRelation {
       ignoresDuplicatedKey: Boolean = false): HashedRelation = {
     val mm = Option(taskMemoryManager).getOrElse {
       new TaskMemoryManager(
-        new UnifiedMemoryManager(
-          new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
-          Long.MaxValue,
-          Long.MaxValue / 2,
-          1),
+        UnifiedMemoryManager(
+          new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"), 1),
         0)
     }
 
@@ -399,11 +396,8 @@ private[joins] class UnsafeHashedRelation(
     // TODO(josh): This needs to be revisited before we merge this patch; making this change now
     // so that tests compile:
     val taskMemoryManager = new TaskMemoryManager(
-      new UnifiedMemoryManager(
-        new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
-        Long.MaxValue,
-        Long.MaxValue / 2,
-        1),
+      UnifiedMemoryManager(
+        new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"), 1),
       0)
 
     val pageSizeBytes = Option(SparkEnv.get).map(_.memoryManager.pageSizeBytes)
@@ -574,11 +568,8 @@ private[execution] final class LongToUnsafeRowMap(
   def this() = {
     this(
       new TaskMemoryManager(
-        new UnifiedMemoryManager(
-          new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
-          Long.MaxValue,
-          Long.MaxValue / 2,
-          1),
+        UnifiedMemoryManager(
+          new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"), 1),
         0),
       0)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
index b2f1ee31f9fb..9762bc041322 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
@@ -519,11 +519,8 @@ object AggregateBenchmark extends SqlBasedBenchmark {
       value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16)
       value.setInt(0, 555)
       val taskMemoryManager = new TaskMemoryManager(
-        new UnifiedMemoryManager(
-          new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
-          Long.MaxValue,
-          Long.MaxValue / 2,
-          1),
+        UnifiedMemoryManager(
+          new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"), 1),
         0)
       val map = new LongToUnsafeRowMap(taskMemoryManager, 64)
       while (i < 65536) {
@@ -550,12 +547,9 @@ object AggregateBenchmark extends SqlBasedBenchmark {
     Seq("off", "on").foreach { heap =>
       benchmark.addCase(s"BytesToBytesMap ($heap Heap)") { _ =>
         val taskMemoryManager = new TaskMemoryManager(
-          new UnifiedMemoryManager(
+          UnifiedMemoryManager(
             new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, s"${heap == "off"}")
-              .set(MEMORY_OFFHEAP_SIZE.key, "102400000"),
-            Long.MaxValue,
-            Long.MaxValue / 2,
-            1),
+              .set(MEMORY_OFFHEAP_SIZE.key, "102400000"), 1),
           0)
         val map = new BytesToBytesMap(taskMemoryManager, 1024, 64L << 20)
         val keyBytes = new Array[Byte](16)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/HashedRelationMetricsBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/HashedRelationMetricsBenchmark.scala
index 857a86ab1c67..b99fd15dcf7f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/HashedRelationMetricsBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/HashedRelationMetricsBenchmark.scala
@@ -44,11 +44,8 @@ object HashedRelationMetricsBenchmark extends SqlBasedBenchmark {
     val benchmark = new Benchmark("LongToUnsafeRowMap metrics", numRows, output = output)
     benchmark.addCase("LongToUnsafeRowMap") { iter =>
       val taskMemoryManager = new TaskMemoryManager(
-        new UnifiedMemoryManager(
-          new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
-          Long.MaxValue,
-          Long.MaxValue / 2,
-          1),
+        UnifiedMemoryManager(
+          new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"), 1),
         0)
       val unsafeProj = UnsafeProjection.create(Seq(BoundReference(0, LongType, false)))
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index b88a76bbfb57..a4dc12e7080c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -27,7 +27,8 @@ import org.apache.spark.SparkConf
 import org.apache.spark.SparkException
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Kryo._
-import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
+import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager, UnifiedMemoryManager}
+import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -40,11 +41,8 @@ import org.apache.spark.util.ArrayImplicits._
 import org.apache.spark.util.collection.CompactBuffer
 
 class HashedRelationSuite extends SharedSparkSession {
-  val umm = new UnifiedMemoryManager(
-    new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
-    Long.MaxValue,
-    Long.MaxValue / 2,
-    1)
+  val umm = UnifiedMemoryManager(
+    new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"), 1)
 
   val mm = new TaskMemoryManager(umm, 0)
 
@@ -753,4 +751,17 @@ class HashedRelationSuite extends SharedSparkSession {
       map.free()
     }
   }
+
+  test("UnsafeHashedRelation should throw OOM when there isn't enough memory") {
+    val relations = mutable.ArrayBuffer[HashedRelation]()
+    // We should finally see an OOM thrown since we are keeping allocating hashed relations.
+    assertThrows[SparkOutOfMemoryError] {
+      while (true) {
+        relations += UnsafeHashedRelation(
+          Iterator.empty, Nil, ByteUnit.MiB.toBytes(200).toInt, mm)
+      }
+    }
+    // Releases the allocated memory.
+    relations.foreach(_.close())
+  }
 }

From 43607e162d049e29a1deb074cb42d83f3681f8ed Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Fri, 14 Nov 2025 15:22:47 +0100
Subject: [PATCH 2/6] fixup

---
 .../spark/memory/TaskMemoryManagerSuite.java       |  7 +++++--
 .../execution/benchmark/AggregateBenchmark.scala   | 14 ++++++++++----
 .../benchmark/HashedRelationMetricsBenchmark.scala |  7 +++++--
 3 files changed, 20 insertions(+), 8 deletions(-)

diff --git a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
index 3d1380d71aac..25543690b832 100644
--- a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
+++ b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
@@ -30,8 +30,11 @@ public class TaskMemoryManagerSuite {
   @Test
   public void leakedPageMemoryIsDetected() {
     final TaskMemoryManager manager = new TaskMemoryManager(
-      UnifiedMemoryManager.apply(
-        new SparkConf().set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false), 1),
+      new UnifiedMemoryManager(
+        new SparkConf().set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false),
+        Long.MAX_VALUE,
+        Long.MAX_VALUE / 2,
+        1),
       0);
     final MemoryConsumer c = new TestMemoryConsumer(manager);
     manager.allocatePage(4096, c); // leak memory
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
index 9762bc041322..b2f1ee31f9fb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
@@ -519,8 +519,11 @@ object AggregateBenchmark extends SqlBasedBenchmark {
       value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16)
       value.setInt(0, 555)
       val taskMemoryManager = new TaskMemoryManager(
-        UnifiedMemoryManager(
-          new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"), 1),
+        new UnifiedMemoryManager(
+          new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
+          Long.MaxValue,
+          Long.MaxValue / 2,
+          1),
         0)
       val map = new LongToUnsafeRowMap(taskMemoryManager, 64)
       while (i < 65536) {
@@ -547,9 +550,12 @@ object AggregateBenchmark extends SqlBasedBenchmark {
     Seq("off", "on").foreach { heap =>
       benchmark.addCase(s"BytesToBytesMap ($heap Heap)") { _ =>
         val taskMemoryManager = new TaskMemoryManager(
-          UnifiedMemoryManager(
+          new UnifiedMemoryManager(
             new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, s"${heap == "off"}")
-              .set(MEMORY_OFFHEAP_SIZE.key, "102400000"), 1),
+              .set(MEMORY_OFFHEAP_SIZE.key, "102400000"),
+            Long.MaxValue,
+            Long.MaxValue / 2,
+            1),
           0)
         val map = new BytesToBytesMap(taskMemoryManager, 1024, 64L << 20)
         val keyBytes = new Array[Byte](16)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/HashedRelationMetricsBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/HashedRelationMetricsBenchmark.scala
index b99fd15dcf7f..857a86ab1c67 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/HashedRelationMetricsBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/HashedRelationMetricsBenchmark.scala
@@ -44,8 +44,11 @@ object HashedRelationMetricsBenchmark extends SqlBasedBenchmark {
     val benchmark = new Benchmark("LongToUnsafeRowMap metrics", numRows, output = output)
     benchmark.addCase("LongToUnsafeRowMap") { iter =>
       val taskMemoryManager = new TaskMemoryManager(
-        UnifiedMemoryManager(
-          new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"), 1),
+        new UnifiedMemoryManager(
+          new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
+          Long.MaxValue,
+          Long.MaxValue / 2,
+          1),
         0)
       val unsafeProj = UnsafeProjection.create(Seq(BoundReference(0, LongType, false)))
 

From bc272e5eab7b827f88efb3812552e04f2bf7395f Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Fri, 14 Nov 2025 15:25:03 +0100
Subject: [PATCH 3/6] fixup

---
 .../sql/execution/joins/HashedRelation.scala  | 18 ++++++++++++------
 .../execution/joins/HashedRelationSuite.scala |  6 ++++--
 2 files changed, 16 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index c3a3711cb4de..76a9e62d804c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -141,8 +141,10 @@ private[execution] object HashedRelation {
       ignoresDuplicatedKey: Boolean = false): HashedRelation = {
     val mm = Option(taskMemoryManager).getOrElse {
       new TaskMemoryManager(
-        UnifiedMemoryManager(
-          new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"), 1),
+        new UnifiedMemoryManager(
+          new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
+          Runtime.getRuntime.maxMemory,
+          Runtime.getRuntime.maxMemory / 2, 1),
         0)
     }
 
@@ -396,8 +398,10 @@ private[joins] class UnsafeHashedRelation(
     // TODO(josh): This needs to be revisited before we merge this patch; making this change now
     // so that tests compile:
     val taskMemoryManager = new TaskMemoryManager(
-      UnifiedMemoryManager(
-        new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"), 1),
+      new UnifiedMemoryManager(
+        new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
+        Runtime.getRuntime.maxMemory,
+        Runtime.getRuntime.maxMemory / 2, 1),
       0)
 
     val pageSizeBytes = Option(SparkEnv.get).map(_.memoryManager.pageSizeBytes)
@@ -568,8 +572,10 @@ private[execution] final class LongToUnsafeRowMap(
   def this() = {
     this(
       new TaskMemoryManager(
-        UnifiedMemoryManager(
-          new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"), 1),
+        new UnifiedMemoryManager(
+          new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
+          Runtime.getRuntime.maxMemory,
+          Runtime.getRuntime.maxMemory / 2, 1),
         0),
       0)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index a4dc12e7080c..ea4dfbdf458d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -41,8 +41,10 @@ import org.apache.spark.util.ArrayImplicits._
 import org.apache.spark.util.collection.CompactBuffer
 
 class HashedRelationSuite extends SharedSparkSession {
-  val umm = UnifiedMemoryManager(
-    new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"), 1)
+  val umm = new UnifiedMemoryManager(
+    new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
+    Runtime.getRuntime.maxMemory,
+    Runtime.getRuntime.maxMemory / 2, 1)
 
   val mm = new TaskMemoryManager(umm, 0)
 

From 7a6ea2105f26faf20567dbc6c6a5b6327fd3014a Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Fri, 14 Nov 2025 15:34:16 +0100
Subject: [PATCH 4/6] fixup

---
 .../spark/sql/execution/joins/HashedRelationSuite.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index ea4dfbdf458d..2e2b661b5061 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -28,7 +28,6 @@ import org.apache.spark.SparkException
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Kryo._
 import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager, UnifiedMemoryManager}
-import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -759,8 +758,8 @@ class HashedRelationSuite extends SharedSparkSession {
     // We should finally see an OOM thrown since we are keeping allocating hashed relations.
     assertThrows[SparkOutOfMemoryError] {
       while (true) {
-        relations += UnsafeHashedRelation(
-          Iterator.empty, Nil, ByteUnit.MiB.toBytes(200).toInt, mm)
+        // Allocates ~128 MiB each time.
+        relations += UnsafeHashedRelation(Iterator.empty, Nil, 1 << 22, mm)
       }
     }
     // Releases the allocated memory.

From 69149abebe0797b38df94f6234603334297f8b20 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Fri, 14 Nov 2025 15:48:51 +0100
Subject: [PATCH 5/6] fixup

---
 .../apache/spark/sql/execution/joins/HashedRelationSuite.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index 2e2b661b5061..b091f871d080 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -764,5 +764,6 @@ class HashedRelationSuite extends SharedSparkSession {
     }
     // Releases the allocated memory.
     relations.foreach(_.close())
+    mm.cleanUpAllAllocatedMemory
   }
 }

From 2c557ff728b88d4f1f5055bcef5cb325c8108186 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Wed, 19 Nov 2025 11:02:52 +0100
Subject: [PATCH 6/6] remove unstable test

---
 .../sql/execution/joins/HashedRelationSuite.scala | 14 --------------
 1 file changed, 14 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index b091f871d080..d4743afbaa93 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -752,18 +752,4 @@ class HashedRelationSuite extends SharedSparkSession {
       map.free()
     }
   }
-
-  test("UnsafeHashedRelation should throw OOM when there isn't enough memory") {
-    val relations = mutable.ArrayBuffer[HashedRelation]()
-    // We should finally see an OOM thrown since we are keeping allocating hashed relations.
-    assertThrows[SparkOutOfMemoryError] {
-      while (true) {
-        // Allocates ~128 MiB each time.
-        relations += UnsafeHashedRelation(Iterator.empty, Nil, 1 << 22, mm)
-      }
-    }
-    // Releases the allocated memory.
-    relations.foreach(_.close())
-    mm.cleanUpAllAllocatedMemory
-  }
 }
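
For reference, the two UnifiedMemoryManager construction paths that this series flips between
look roughly like the sketch below. It is a minimal sketch, assuming it is compiled inside
Spark's own test sources (where the four-argument constructor is accessible); the trailing 0
passed to TaskMemoryManager is the task attempt id:

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config._
    import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}

    val conf = new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false")

    // Factory method: pool sizes are derived from the running JVM (or spark.testing.memory
    // when set), so a runaway allocation can actually fail with SparkOutOfMemoryError.
    val bounded = new TaskMemoryManager(UnifiedMemoryManager(conf, 1), 0)

    // Explicit-limit constructor, the form the reverts restore: effectively unbounded pools,
    // so code built on it never exercises the allocation-failure path.
    val unbounded = new TaskMemoryManager(
      new UnifiedMemoryManager(conf, Long.MaxValue, Long.MaxValue / 2, 1), 0)

Because the four-argument form takes the maximum heap size and the on-heap storage region size
explicitly, PATCH 3/6 can bound them with Runtime.getRuntime.maxMemory instead of Long.MaxValue
while leaving the shape of the call sites unchanged.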