Conversation

@sryza sryza commented Nov 14, 2025

What changes were proposed in this pull request?

Updates the per-streaming table checkpoint path to use the fully qualified table path, instead of just its name.

Why are the changes needed?

A streaming table is a table fed by a stream. Streaming tables have checkpoint directories underneath their pipeline's storage root. These directories don't currently take the table namespace into account, which means that two tables with different schemas but the same name will be mapped to the same checkpoint directory. This could be very bad and cause data loss.
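The collision described above can be sketched as follows. This is a hypothetical, self-contained illustration (not the actual Spark code): the table identifiers, storage root, and object names are invented for the example, but it shows why keying the checkpoint directory on the bare table name collides across namespaces while keying on the fully qualified name does not.

```scala
// Hypothetical sketch: checkpoint-directory keying before and after the fix.
object CheckpointKeying {
  // Simplified stand-in for a multi-part table identifier.
  case class TableIdentifier(nameParts: Seq[String]) {
    def table: String = nameParts.last
  }

  val storageRoot = "/pipelines/storage"

  // Old behavior: only the bare table name keys the directory.
  def oldCheckpointDir(id: TableIdentifier): String =
    s"$storageRoot/_checkpoints/${id.table}"

  // New behavior: every name part (catalog, schema, table) keys the directory.
  def newCheckpointDir(id: TableIdentifier): String =
    s"$storageRoot/_checkpoints/${id.nameParts.mkString("/")}"

  def main(args: Array[String]): Unit = {
    val a = TableIdentifier(Seq("catalog", "schema_a", "events"))
    val b = TableIdentifier(Seq("catalog", "schema_b", "events"))
    // Same table name in different schemas: old keying collides.
    assert(oldCheckpointDir(a) == oldCheckpointDir(b))
    // Fully qualified keying keeps the two directories distinct.
    assert(newCheckpointDir(a) != newCheckpointDir(b))
    println(newCheckpointDir(a)) // /pipelines/storage/_checkpoints/catalog/schema_a/events
  }
}
```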

Does this PR introduce any user-facing change?

Yes, but for an unreleased feature.

How was this patch tested?

Added a test for the collision case. Verified that it fails with the prior logic and now passes.

Was this patch authored or co-authored using generative AI tooling?

@github-actions github-actions bot added the SQL label Nov 14, 2025
graph.sink.contains(flow.destinationIdentifier)) {
val checkpointRoot = new Path(context.storageRoot, "_checkpoints")
val flowTableName = flow.destinationIdentifier.table
val flowTableId = flow.destinationIdentifier.nameParts.mkString(Path.SEPARATOR)
Member:

`.nameParts.mkString(Path.SEPARATOR)` looks a little tacky to me. Could you add a function comment about this at line 41?

Member:

Does this work for Apache Iceberg databases and tables, too?

Contributor Author:

👍 will add a comment.

This is agnostic to the table format; it's just controlling the directory that we're storing streaming checkpoints in. That directory is keyed by the table name, but we're not actually putting it inside the table directory.

Member:

We can do it later as a follow-up. Let me merge this first.

Contributor Author:

👍 here's the follow-up: #53089. I slotted it under the same JIRA because it's a continuation of this PR, but let me know if it's preferred to file a new JIRA.

): Path = {
val expectedRawCheckPointDir = tableOrSinkElement match {
case t if t.isInstanceOf[Table] || t.isInstanceOf[Sink] =>
val tableId = t.identifier.nameParts.mkString(Path.SEPARATOR)
Member:

If we need this in multiple places, we had better add a utility method for this conversion to be safe, @sryza.
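A utility along the lines the reviewer suggests might look like the sketch below. The object and method names (`PathUtils`, `tableIdToRelativePath`) are illustrative assumptions, not the actual Spark API; the point is simply to centralize the `nameParts`-to-path conversion in one documented place.

```scala
// Hypothetical sketch of a shared utility for the identifier-to-path
// conversion; names are invented for illustration.
object PathUtils {
  /**
   * Converts a fully qualified identifier's name parts
   * (e.g. Seq("catalog", "db", "t")) into a relative path ("catalog/db/t"),
   * so that tables with the same name in different namespaces map to
   * distinct checkpoint directories.
   */
  def tableIdToRelativePath(nameParts: Seq[String], separator: String = "/"): String =
    nameParts.mkString(separator)
}
```

Callers would then use `PathUtils.tableIdToRelativePath(flow.destinationIdentifier.nameParts)` instead of repeating the `mkString` at each site.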

@dongjoon-hyun (Member) left a review comment:

+1, LGTM. Thank you, @sryza .

Merged to master/4.1 for Apache Spark 4.1.0.

dongjoon-hyun pushed a commit that referenced this pull request Nov 15, 2025
…ifferent schemas have same name

### What changes were proposed in this pull request?

Updates the per-streaming table checkpoint path to use the fully qualified table path, instead of just its name.

### Why are the changes needed?

A streaming table is a table fed by a stream. Streaming tables have checkpoint directories underneath their pipeline's storage root. These directories don't currently take the table namespace into account, which means that two tables with different schemas but the same name will be mapped to the same checkpoint directory. This could be very bad and cause data loss.

### Does this PR introduce _any_ user-facing change?

Yes, but for an unreleased feature.

### How was this patch tested?

Added a test for the collision case. Verified that it fails with the prior logic and now passes.

### Was this patch authored or co-authored using generative AI tooling?

Closes #53070 from sryza/collide.

Authored-by: Sandy Ryza <sandy.ryza@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit e09c999)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>