sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -143,9 +143,8 @@ private[execution] object HashedRelation {
       new TaskMemoryManager(
         new UnifiedMemoryManager(
           new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
-          Long.MaxValue,
-          Long.MaxValue / 2,
-          1),
+          Runtime.getRuntime.maxMemory,
+          Runtime.getRuntime.maxMemory / 2, 1),
         0)
   }
A Contributor suggested keeping one argument per line on the added code:

Suggested change
-          Runtime.getRuntime.maxMemory / 2, 1),
+          Runtime.getRuntime.maxMemory / 2,
+          1),
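For reference, here is the pattern this diff applies at each of its three call sites, as a self-contained sketch: the locally constructed TaskMemoryManager (apparently a fallback for when no SparkEnv is around, cf. the Option(SparkEnv.get) guard further down) is now capped at the real JVM heap instead of Long.MaxValue, so its bookkeeping reflects what the process can actually allocate. The helper name below is hypothetical, and the snippet assumes it lives under org.apache.spark, since UnifiedMemoryManager is private[spark]; the constructor arguments mirror the diff.

import org.apache.spark.SparkConf
import org.apache.spark.internal.config._
import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}

// Hypothetical helper; the arguments are the same as in the diff above.
def heapBoundedTaskMemoryManager(): TaskMemoryManager = {
  val heapBytes = Runtime.getRuntime.maxMemory // the JVM's -Xmx-derived ceiling
  new TaskMemoryManager(
    new UnifiedMemoryManager(
      new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"), // on-heap only
      heapBytes,     // maxHeapMemory: the total pool, bounded by the real heap
      heapBytes / 2, // onHeapStorageRegionSize: half reserved for storage
      1),            // numCores
    0)               // taskAttemptId
}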

@@ -401,9 +400,8 @@ private[joins] class UnsafeHashedRelation(
     val taskMemoryManager = new TaskMemoryManager(
       new UnifiedMemoryManager(
         new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
-        Long.MaxValue,
-        Long.MaxValue / 2,
-        1),
+        Runtime.getRuntime.maxMemory,
+        Runtime.getRuntime.maxMemory / 2, 1),
       0)

     val pageSizeBytes = Option(SparkEnv.get).map(_.memoryManager.pageSizeBytes)
@yaooqinn (Member) commented on Nov 19, 2025:

Do you know if the per-JVM memory manager here can be used?
@zhztheplayer (Member, Author) replied on Nov 19, 2025:

Not right now, but I think that is the final target. We cannot simply swap in the per-JVM memory manager here, because Spark currently has to keep BHJ's memory allocation on the JVM heap, while the per-JVM memory manager may be running in off-heap mode.

#52817 will improve SHJ to make sure it follows the per-JVM memory manager's memory mode, but we will need a separate solution for BHJ in the future (one that relies on the read code path marked here).
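To make that constraint concrete, here is a hypothetical guard, not from this PR, written against Spark-internal APIs (MemoryManager is private[spark], so this only compiles inside the org.apache.spark namespace): UnifiedMemoryManager fixes its Tungsten memory mode from spark.memory.offHeap.enabled at construction, so the per-JVM manager could only back BHJ's heap-resident relations when that mode happens to be ON_HEAP.

import org.apache.spark.memory.{MemoryManager, MemoryMode}

// Hypothetical check, not part of this PR: BHJ's memory must stay on the JVM
// heap (see the reply above), so an OFF_HEAP-mode manager cannot account for it.
def canBackBroadcastHashJoin(mm: MemoryManager): Boolean =
  mm.tungstenMemoryMode == MemoryMode.ON_HEAP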

@@ -576,9 +574,8 @@ private[execution] final class LongToUnsafeRowMap(
       new TaskMemoryManager(
         new UnifiedMemoryManager(
           new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
-          Long.MaxValue,
-          Long.MaxValue / 2,
-          1),
+          Runtime.getRuntime.maxMemory,
+          Runtime.getRuntime.maxMemory / 2, 1),
         0),
       0)
   }
sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.SparkException
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Kryo._
-import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
+import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager, UnifiedMemoryManager}
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -42,9 +42,8 @@ import org.apache.spark.util.collection.CompactBuffer
 class HashedRelationSuite extends SharedSparkSession {
   val umm = new UnifiedMemoryManager(
     new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
-    Long.MaxValue,
-    Long.MaxValue / 2,
-    1)
+    Runtime.getRuntime.maxMemory,
+    Runtime.getRuntime.maxMemory / 2, 1)

   val mm = new TaskMemoryManager(umm, 0)
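The newly imported SparkOutOfMemoryError hints at why the bound matters: with the pool capped at the real heap, an oversized request is only partially granted, and MemoryConsumer's allocate helpers turn such a short grant into a catchable SparkOutOfMemoryError instead of letting it "succeed" on paper under a Long.MaxValue limit. A minimal sketch of the bookkeeping, assuming the mm defined above; this is not the PR's actual test:

import org.apache.spark.memory.{MemoryConsumer, MemoryMode}

// Hypothetical test, not from this PR.
test("an oversized reservation is truncated by the heap-bounded pool") {
  // Throwaway consumer that cannot spill; used only to exercise bookkeeping.
  val consumer = new MemoryConsumer(mm, mm.pageSizeBytes(), MemoryMode.ON_HEAP) {
    override def spill(size: Long, trigger: MemoryConsumer): Long = 0L
  }
  val requested = Long.MaxValue / 2
  // Under the old Long.MaxValue pool this request would be granted in full;
  // bounded by Runtime.getRuntime.maxMemory, at most the real heap is granted.
  val granted = mm.acquireExecutionMemory(requested, consumer)
  assert(granted < requested)
  mm.releaseExecutionMemory(granted, consumer)
}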
