
Commit dce992b

aokolnychyi authored and dongjoon-hyun committed
[SPARK-54387][SQL] Fix recaching of DSv2 tables
### What changes were proposed in this pull request?

This PR fixes recaching of DSv2 tables.

### Why are the changes needed?

These changes are needed to restore correct caching behavior for DSv2 tables if a connector doesn't reuse table instances. Currently, the following use case is broken:

```
// create and populate table
sql("CREATE TABLE testcat.ns.tbl (id bigint, data string) USING foo")
Seq((1L, "a"), (2L, "b")).toDF("id", "data").write.insertInto("testcat.ns.tbl")

// cache table
val df1 = spark.table("testcat.ns.tbl")
df1.cache()
df1.show() // 1 -> a, 2 -> b

// insert more data, refreshing cache entry
Seq((3L, "c"), (4L, "d")).toDF("id", "data").write.insertInto("testcat.ns.tbl")

// query
val df2 = spark.table("testcat.ns.tbl")
df2.show() // CACHE MISS BEFORE CHANGE!
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing + new tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #53109 from aokolnychyi/spark-54387.

Authored-by: Anton Okolnychyi <aokolnychyi@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent aa387f3 · commit dce992b

File tree: 8 files changed, +47 −13 lines


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala

Lines changed: 1 addition & 1 deletion
@@ -692,7 +692,7 @@ case class ReplaceTableAsSelect(
       // RTAS may drop and recreate table before query execution, breaking self-references
       // refresh and pin versions here to read from original table versions instead of
       // newly created empty table that is meant to serve as target for append/overwrite
-      val refreshedQuery = V2TableRefreshUtil.refreshVersions(query)
+      val refreshedQuery = V2TableRefreshUtil.refresh(query, versionedOnly = true)
       val pinnedQuery = V2TableRefreshUtil.pinVersions(refreshedQuery)
       copy(query = pinnedQuery, isAnalyzed = true)
     }
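Note that both call sites that previously used `refreshVersions` (this one and the one in `QueryExecution` below) now pass `versionedOnly = true` to the renamed `refresh` method, so their behavior is unchanged; only the new default path used by `CacheManager` also refreshes unversioned tables.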

sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala

Lines changed: 2 additions & 0 deletions
@@ -133,6 +133,8 @@ case class DataSourceV2Relation(
 
   def autoSchemaEvolution(): Boolean =
     table.capabilities().contains(TableCapability.AUTOMATIC_SCHEMA_EVOLUTION)
+
+  def isVersioned: Boolean = table.currentVersion != null
 }
 
 /**
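The new `isVersioned` helper centralizes the `table.currentVersion != null` check that the refresh utility below previously inlined at each call site.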

sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2TableRefreshUtil.scala

Lines changed: 8 additions & 4 deletions
@@ -40,7 +40,7 @@ private[sql] object V2TableRefreshUtil extends SQLConfHelper with Logging {
   def pinVersions(plan: LogicalPlan): LogicalPlan = {
     plan transform {
       case r @ ExtractV2CatalogAndIdentifier(catalog, ident)
-          if r.table.currentVersion != null && r.timeTravelSpec.isEmpty =>
+          if r.isVersioned && r.timeTravelSpec.isEmpty =>
         val tableName = V2TableUtil.toQualifiedName(catalog, ident)
         val version = r.table.currentVersion
         logDebug(s"Pinning table version for $tableName to $version")
@@ -49,21 +49,25 @@ private[sql] object V2TableRefreshUtil extends SQLConfHelper with Logging {
   }
 
   /**
-   * Refreshes table metadata for all versioned tables in the plan.
+   * Refreshes table metadata for tables in the plan.
    *
    * This method reloads table metadata from the catalog and validates:
    * - Table identity: Ensures table ID has not changed
    * - Data columns: Verifies captured columns match the current schema
    * - Metadata columns: Checks metadata column consistency
    *
+   * Tables with time travel specifications are skipped as they reference a specific point
+   * in time and don't have to be refreshed.
+   *
    * @param plan the logical plan to refresh
+   * @param versionedOnly indicates whether to refresh only versioned tables
    * @return plan with refreshed table metadata
    */
-  def refreshVersions(plan: LogicalPlan): LogicalPlan = {
+  def refresh(plan: LogicalPlan, versionedOnly: Boolean = false): LogicalPlan = {
     val cache = mutable.HashMap.empty[(TableCatalog, Identifier), Table]
     plan transform {
       case r @ ExtractV2CatalogAndIdentifier(catalog, ident)
-          if r.table.currentVersion != null && r.timeTravelSpec.isEmpty =>
+          if (r.isVersioned || !versionedOnly) && r.timeTravelSpec.isEmpty =>
         val currentTable = cache.getOrElseUpdate((catalog, ident), {
           val tableName = V2TableUtil.toQualifiedName(catalog, ident)
           logDebug(s"Refreshing table metadata for $tableName")

sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

Lines changed: 6 additions & 4 deletions
@@ -36,6 +36,7 @@ import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation, LogicalRelationWithTable}
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, ExtractV2Table, FileTable}
+import org.apache.spark.sql.execution.datasources.v2.V2TableRefreshUtil
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
@@ -352,11 +353,12 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
     needToRecache.foreach { cd =>
       cd.cachedRepresentation.cacheBuilder.clearCache()
       val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
-      val newCache = sessionWithConfigsOff.withActive {
-        val qe = sessionWithConfigsOff.sessionState.executePlan(cd.plan)
-        InMemoryRelation(cd.cachedRepresentation.cacheBuilder, qe)
+      val (newKey, newCache) = sessionWithConfigsOff.withActive {
+        val refreshedPlan = V2TableRefreshUtil.refresh(cd.plan)
+        val qe = sessionWithConfigsOff.sessionState.executePlan(refreshedPlan)
+        qe.normalized -> InMemoryRelation(cd.cachedRepresentation.cacheBuilder, qe)
       }
-      val recomputedPlan = cd.copy(cachedRepresentation = newCache)
+      val recomputedPlan = cd.copy(plan = newKey, cachedRepresentation = newCache)
       this.synchronized {
         if (lookupCachedDataInternal(recomputedPlan.plan).nonEmpty) {
           logWarning("While recaching, data was already added to cache.")

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 1 addition & 1 deletion
@@ -207,7 +207,7 @@ class QueryExecution(
   // there may be delay between analysis and subsequent phases
   // therefore, refresh captured table versions to reflect latest data
   private val lazyTableVersionsRefreshed = LazyTry {
-    V2TableRefreshUtil.refreshVersions(commandExecuted)
+    V2TableRefreshUtil.refresh(commandExecuted, versionedOnly = true)
   }
 
   private[sql] def tableVersionsRefreshed: LogicalPlan = lazyTableVersionsRefreshed.get

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala

Lines changed: 4 additions & 3 deletions
@@ -406,11 +406,12 @@
   def apply(cacheBuilder: CachedRDDBuilder, qe: QueryExecution): InMemoryRelation = {
     val optimizedPlan = qe.optimizedPlan
     val serializer = cacheBuilder.serializer
-    val newBuilder = if (serializer.supportsColumnarInput(optimizedPlan.output)) {
-      cacheBuilder.copy(cachedPlan = serializer.convertToColumnarPlanIfPossible(qe.executedPlan))
+    val newCachedPlan = if (serializer.supportsColumnarInput(optimizedPlan.output)) {
+      serializer.convertToColumnarPlanIfPossible(qe.executedPlan)
     } else {
-      cacheBuilder.copy(cachedPlan = qe.executedPlan)
+      qe.executedPlan
     }
+    val newBuilder = cacheBuilder.copy(cachedPlan = newCachedPlan, logicalPlan = qe.logical)
     val relation = new InMemoryRelation(
       newBuilder.cachedPlan.output, newBuilder, optimizedPlan.outputOrdering)
     relation.statsOfPlanToCache = optimizedPlan.stats
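Presumably `logicalPlan` has to be carried over alongside `cachedPlan` because recaching can now hand `InMemoryRelation.apply` a refreshed query whose logical plan differs from the one the builder was created with; copying `qe.logical` keeps the builder consistent with the refreshed cache key.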

sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

Lines changed: 1 addition & 0 deletions
@@ -68,6 +68,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
 
   override def sparkConf: SparkConf = super.sparkConf
     .set("spark.sql.catalog.testcat", classOf[InMemoryCatalog].getName)
+    .set("spark.sql.catalog.testcat.copyOnLoad", "true")
 
   setupTestData()
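The `copyOnLoad` flag appears to make the in-memory test catalog return a copy of the table on every load, mimicking connectors that don't reuse `Table` instances, which is exactly the scenario the commit message describes as broken.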
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala

Lines changed: 24 additions & 0 deletions
@@ -1620,6 +1620,30 @@ class DataSourceV2DataFrameSuite
     }
   }
 
+  test("cached DSv2 table DataFrame is refreshed and reused after insert") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
+      val df1 = Seq((1L, "a"), (2L, "b")).toDF("id", "data")
+      df1.write.insertInto(t)
+
+      // cache DataFrame pointing to table
+      val readDF1 = spark.table(t)
+      readDF1.cache()
+      assertCached(readDF1)
+      checkAnswer(readDF1, Seq(Row(1L, "a"), Row(2L, "b")))
+
+      // insert more data, invalidating and refreshing cache entry
+      val df2 = Seq((3L, "c"), (4L, "d")).toDF("id", "data")
+      df2.write.insertInto(t)
+
+      // verify underlying plan is recached and picks up new data
+      val readDF2 = spark.table(t)
+      assertCached(readDF2)
+      checkAnswer(readDF2, Seq(Row(1L, "a"), Row(2L, "b"), Row(3L, "c"), Row(4L, "d")))
+    }
+  }
+
   private def pinTable(catalogName: String, ident: Identifier, version: String): Unit = {
     catalog(catalogName) match {
       case inMemory: BasicInMemoryTableCatalog => inMemory.pinTable(ident, version)
