Skip to content

Commit 5a25b05

Browse files
sryzadongjoon-hyun
authored andcommitted
[SPARK-54358][SDP] Checkpoint dirs collide when streaming tables in different schemas have same name
### What changes were proposed in this pull request? Updates the per-streaming-table checkpoint path to use the fully qualified table path, instead of just the table's name. ### Why are the changes needed? A streaming table is a table fed by a stream. Streaming tables have checkpoint directories underneath their pipeline's storage root. These directories don't currently take the table namespace into account, which means that two tables with the same name in different schemas will be mapped to the same checkpoint directory. Such a collision could cause data loss. ### Does this PR introduce _any_ user-facing change? Yes, but for an unreleased feature. ### How was this patch tested? Added a test for the collision case. Verified that it fails with the prior logic and now passes. ### Was this patch authored or co-authored using generative AI tooling? Closes #53070 from sryza/collide. Authored-by: Sandy Ryza <sandy.ryza@databricks.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit e09c999) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent 724ef24 commit 5a25b05

File tree

2 files changed

+37
-4
lines changed

2 files changed

+37
-4
lines changed

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SystemMetadata.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,14 @@ case class FlowSystemMetadata(
4040
* which is storage/_checkpoints/fully_qualified_flow_destination_table_path/flow_name.
4141
* @return the checkpoint root directory for `flow`
4242
*/
43-
private def flowCheckpointsDirOpt(): Option[Path] = {
43+
def flowCheckpointsDirOpt(): Option[Path] = {
4444
Option(if (graph.table.contains(flow.destinationIdentifier) ||
4545
graph.sink.contains(flow.destinationIdentifier)) {
4646
val checkpointRoot = new Path(context.storageRoot, "_checkpoints")
47-
val flowTableName = flow.destinationIdentifier.table
47+
val flowTableId = flow.destinationIdentifier.nameParts.mkString(Path.SEPARATOR)
4848
val flowName = flow.identifier.table
4949
val checkpointDir = new Path(
50-
new Path(checkpointRoot, flowTableName),
50+
new Path(checkpointRoot, flowTableId),
5151
flowName
5252
)
5353
logInfo(

sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/SystemMetadataSuite.scala

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,38 @@ class SystemMetadataSuite
227227
updateContext2
228228
)
229229
}
230+
231+
test("checkpoint dirs for tables with same name but different schema don't collide") {
232+
val session = spark
233+
import session.implicits._
234+
235+
// create a pipeline with two STs that share a table name but live in different schemas
236+
val graph = new TestGraphRegistrationContext(spark) {
237+
implicit val sparkSession: SparkSession = spark
238+
val mem: MemoryStream[Int] = MemoryStream[Int]
239+
mem.addData(1, 2, 3)
240+
registerView("a", query = dfFlowFunc(mem.toDF()))
241+
registerTable("st")
242+
registerFlow("st", "st", query = readStreamFlowFunc("a"))
243+
registerTable("schema2.st")
244+
registerFlow("schema2.st", "schema2.st", query = readStreamFlowFunc("a"))
245+
}.toDataflowGraph
246+
247+
val updateContext = TestPipelineUpdateContext(
248+
unresolvedGraph = graph,
249+
spark = spark,
250+
storageRoot = storageRoot,
251+
failOnErrorEvent = true
252+
)
253+
254+
val stFlow = graph.flow(fullyQualifiedIdentifier("st"))
255+
val schema2StFlow = graph.flow(fullyQualifiedIdentifier("st", database = Option("schema2")))
256+
val stSystemMetadata = FlowSystemMetadata(updateContext, stFlow, graph)
257+
val schema2StSystemMetadata = FlowSystemMetadata(updateContext, schema2StFlow, graph)
258+
assert(
259+
stSystemMetadata.flowCheckpointsDirOpt() != schema2StSystemMetadata.flowCheckpointsDirOpt()
260+
)
261+
}
230262
}
231263

232264
trait SystemMetadataTestHelpers {
@@ -242,8 +274,9 @@ trait SystemMetadataTestHelpers {
242274
): Path = {
243275
val expectedRawCheckPointDir = tableOrSinkElement match {
244276
case t if t.isInstanceOf[Table] || t.isInstanceOf[Sink] =>
277+
val tableId = t.identifier.nameParts.mkString(Path.SEPARATOR)
245278
new Path(updateContext.storageRoot)
246-
.suffix(s"/_checkpoints/${t.identifier.table}/${flowElement.identifier.table}")
279+
.suffix(s"/_checkpoints/$tableId/${flowElement.identifier.table}")
247280
.toString
248281
case _ =>
249282
fail(

0 commit comments

Comments
 (0)