
Commit 1c283c0

Ubuntu authored and anishshri-db committed
[SPARK-54063][SS] Trigger snapshot for next batch when upload lag
## What changes were proposed in this pull request?

This PR adds functionality for a `StateStoreProvider` to force creating a snapshot on commit when, while creating a `StateStore`, it detects that the corresponding provider is lagging in snapshot creation (i.e. it has accumulated too many changelog files). This applies to both the RocksDB and HDFS-backed providers. The feature is only enabled when `STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG` is also true (introduced in [PR 50123](https://github.com/apache/spark/pull/50123/files#diff-7c577967a171f51523afddd7a5eca49806432fae2da8dc35c114d9699e6e3e40)).

We also add a `StateStoreCustomMetric`, `CUSTOM_METRIC_FORCE_SNAPSHOT`, to track the number of forced snapshots.

The changes made in this PR include:

- `StateStoreCoordinator` now keeps track of a set of lagging state stores and updates it on receiving `LogLaggingStateStores` (add to the set) and `DeactivateInstances` (remove all lagging stores from the set).
- `ReportActiveInstance` in `StateStoreCoordinator` now also returns `shouldForceSnapshot`, which indicates whether the current `StateStore` should create a snapshot on its next commit.
- A new parameter, `forceSnapshotOnCommit: Boolean = false`, is added to the `StateStoreProvider` method `getStore`, so inheriting providers can create state stores that force a snapshot on commit.
- When the `StateStore` singleton's `get()` is called, it calls `ReportActiveInstance` to check whether the store should create a snapshot on its next commit.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added an integration test in `StateStoreCoordinatorSuite` that verifies both `RocksDBStateStore` and `HDFSBackedStateStore` auto-trigger a snapshot on commit when a delay in snapshot upload is detected.

Also added a unit test in `StateStoreBaseSuite` to verify the metric is populated correctly when a snapshot is forced.

### Was this patch authored or co-authored using generative AI tooling?

Yes.

Closes #52773 from zifeif2/trigger-snapshot.

Authored-by: Ubuntu <zifei.feng@your.hostname.com>
Signed-off-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
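To make the description above concrete, here is a minimal enablement sketch (illustrative only, not part of this commit; it assumes a running `SparkSession` named `spark`):

```scala
// Both flags are internal configs. The lag-reporting flag must also be enabled,
// otherwise reading the force-snapshot flag throws IllegalArgumentException
// (see the SQLConf guard in the diff below).
spark.conf.set("spark.sql.streaming.stateStore.coordinatorReportSnapshotUploadLag", "true")
spark.conf.set("spark.sql.streaming.stateStore.forceSnapshotUploadOnLag", "true")
```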
1 parent 19ff859 commit 1c283c0

File tree

15 files changed: +467 −60 lines


common/utils-java/src/main/java/org/apache/spark/internal/LogKeys.java

Lines changed: 1 addition & 0 deletions
@@ -787,6 +787,7 @@ public enum LogKeys implements LogKey {
   STREAM_CHUNK_ID,
   STREAM_ID,
   STREAM_NAME,
+  STREAM_SHOULD_FORCE_SNAPSHOT,
   SUBMISSION_ID,
   SUBSAMPLING_RATE,
   SUB_QUERY,

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 23 additions & 0 deletions
@@ -2710,6 +2710,18 @@ object SQLConf {
      .checkValue(k => k >= 0, "Must be greater than or equal to 0")
      .createWithDefault(if (Utils.isTesting) 1 else 0)

+  val STATE_STORE_FORCE_SNAPSHOT_UPLOAD_ON_LAG =
+    buildConf("spark.sql.streaming.stateStore.forceSnapshotUploadOnLag")
+      .internal()
+      .doc(
+        "When enabled, state stores with lagging snapshot uploads will automatically trigger " +
+        "a snapshot on the next commit. Requires spark.sql.streaming.stateStore.coordinator" +
+        "ReportSnapshotUploadLag to be true."
+      )
+      .version("4.2.0")
+      .booleanConf
+      .createWithDefault(false)
+
  val STATEFUL_SHUFFLE_PARTITIONS_INTERNAL =
    buildConf("spark.sql.streaming.internal.stateStore.partitions")
      .doc("WARN: This config is used internally and is not intended to be user-facing. This " +

@@ -6921,6 +6933,17 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
  def stateStoreRowChecksumReadVerificationRatio: Long =
    getConf(STATE_STORE_ROW_CHECKSUM_READ_VERIFICATION_RATIO)

+  def stateStoreForceSnapshotUploadOnLag: Boolean = {
+    val value = getConf(STATE_STORE_FORCE_SNAPSHOT_UPLOAD_ON_LAG)
+    if (value && !stateStoreCoordinatorReportSnapshotUploadLag) {
+      throw new IllegalArgumentException(
+        "spark.sql.streaming.stateStore.forceSnapshotUploadOnLag can only be true if " +
+        "spark.sql.streaming.stateStore.coordinatorReportSnapshotUploadLag is also true."
+      )
+    }
+    value
+  }
+
  def checkpointLocation: Option[String] = getConf(CHECKPOINT_LOCATION)

  def checkpointFileChecksumEnabled: Boolean = getConf(STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED)
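The guard in `stateStoreForceSnapshotUploadOnLag` makes the config dependency explicit at read time. A hedged sketch of the failure mode (assumes a `SparkSession` named `spark`; `sessionState.conf` is the session's `SQLConf`):

```scala
// Sketch: setting the force flag without the lag-reporting flag is rejected when read.
spark.conf.set("spark.sql.streaming.stateStore.coordinatorReportSnapshotUploadLag", "false")
spark.conf.set("spark.sql.streaming.stateStore.forceSnapshotUploadOnLag", "true")
// Evaluating the accessor above now throws IllegalArgumentException:
// spark.sessionState.conf.stateStoreForceSnapshotUploadOnLag
```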

sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/ClientStreamingQuerySuite.scala

Lines changed: 1 addition & 0 deletions
@@ -101,6 +101,7 @@ class ClientStreamingQuerySuite extends QueryTest with RemoteSparkSession with L
    assert(lastProgress.stateOperators.nonEmpty)
    assert(
      lastProgress.stateOperators.head.customMetrics.keySet().asScala == Set(
+        "forceSnapshotCount",
        "loadedMapCacheHitCount",
        "loadedMapCacheMissCount",
        "numSnapshotsAutoRepaired",

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala

Lines changed: 23 additions & 9 deletions
@@ -109,7 +109,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
  /** Implementation of [[StateStore]] API which is backed by an HDFS-compatible file system */
  class HDFSBackedStateStore(
      val version: Long,
-      private val mapToUpdate: HDFSBackedStateStoreMap)
+      private val mapToUpdate: HDFSBackedStateStoreMap,
+      shouldForceSnapshot: Boolean = false)
    extends StateStore {

    /** Trait and classes representing the internal state of the store */

@@ -196,7 +197,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
    override def commit(): Long = {
      try {
        verify(state == UPDATING, "Cannot commit after already committed or aborted")
-        commitUpdates(newVersion, mapToUpdate, compressedStream)
+        commitUpdates(newVersion, mapToUpdate, compressedStream, shouldForceSnapshot)
        state = COMMITTED
        logInfo(log"Committed version ${MDC(LogKeys.COMMITTED_VERSION, newVersion)} " +
          log"for ${MDC(LogKeys.STATE_STORE_PROVIDER, this)} to file " +

@@ -254,7 +255,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
      val customMetrics = metricsFromProvider.flatMap { case (name, value) =>
        // just allow searching from list cause the list is small enough
        supportedCustomMetrics.find(_.name == name).map(_ -> value)
-      } + (metricStateOnCurrentVersionSizeBytes -> SizeEstimator.estimate(mapToUpdate))
+      } + (metricStateOnCurrentVersionSizeBytes -> SizeEstimator.estimate(mapToUpdate)) +
+        (metricForceSnapshot -> (if (shouldForceSnapshot) 1L else 0L))

      val instanceMetrics = Map(
        instanceMetricSnapshotLastUpload.withNewId(

@@ -317,16 +319,21 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
  }

  /** Get the state store for making updates to create a new `version` of the store. */
-  override def getStore(version: Long, uniqueId: Option[String] = None): StateStore = {
+  override def getStore(
+      version: Long,
+      uniqueId: Option[String] = None,
+      forceSnapshotOnCommit: Boolean = false): StateStore = {
    if (uniqueId.isDefined) {
      throw StateStoreErrors.stateStoreCheckpointIdsNotSupported(
        "HDFSBackedStateStoreProvider does not support checkpointFormatVersion > 1 " +
          "but a state store checkpointID is passed in")
    }
    val newMap = getLoadedMapForStore(version)
    logInfo(log"Retrieved version ${MDC(LogKeys.STATE_STORE_VERSION, version)} " +
-      log"of ${MDC(LogKeys.STATE_STORE_PROVIDER, HDFSBackedStateStoreProvider.this)} for update")
-    new HDFSBackedStateStore(version, newMap)
+      log"of ${MDC(LogKeys.STATE_STORE_PROVIDER, HDFSBackedStateStoreProvider.this)} " +
+      log"for update, forceSnapshotOnCommit=" +
+      log"${MDC(LogKeys.STREAM_SHOULD_FORCE_SNAPSHOT, forceSnapshotOnCommit)}")
+    new HDFSBackedStateStore(version, newMap, forceSnapshotOnCommit)
  }

  /** Get the state store for reading to specific `version` of the store. */

@@ -446,8 +453,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with

  override def supportedCustomMetrics: Seq[StateStoreCustomMetric] = {
    metricStateOnCurrentVersionSizeBytes :: metricLoadedMapCacheHit :: metricLoadedMapCacheMiss ::
-      metricNumSnapshotsAutoRepaired ::
-      Nil
+      metricNumSnapshotsAutoRepaired :: metricForceSnapshot :: Nil
  }

  override def supportedInstanceMetrics: Seq[StateStoreInstanceMetric] =

@@ -524,6 +530,10 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
    StateStoreCustomSumMetric("numSnapshotsAutoRepaired",
      "number of snapshots that were automatically repaired during store load")

+  private lazy val metricForceSnapshot: StateStoreCustomMetric =
+    StateStoreCustomSumMetric("forceSnapshotCount",
+      "number of stores that had forced snapshot")
+
  private lazy val instanceMetricSnapshotLastUpload: StateStoreInstanceMetric =
    StateStoreSnapshotLastUploadInstanceMetric()

@@ -532,9 +542,13 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
  private def commitUpdates(
      newVersion: Long,
      map: HDFSBackedStateStoreMap,
-      output: DataOutputStream): Unit = {
+      output: DataOutputStream,
+      shouldForceSnapshot: Boolean = false): Unit = {
    synchronized {
      finalizeDeltaFile(output)
+      if (shouldForceSnapshot) {
+        writeSnapshotFile(newVersion, map, "commit")
+      }
      putStateIntoStateCacheMap(newVersion, map)
    }
  }
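Taken together, the HDFS-backed path is: `getStore(..., forceSnapshotOnCommit = true)` threads the flag into the store, and `commitUpdates` writes a snapshot file right after finalizing the delta. A hedged usage sketch (the names `provider` and `version` are assumptions for illustration, not from this diff):

```scala
// Sketch: force the next commit of an HDFS-backed store to also write a snapshot.
val store = provider.getStore(version, uniqueId = None, forceSnapshotOnCommit = true)
// ... apply puts/removes to `store` ...
val newVersion = store.commit() // finalizes the delta, then writes a snapshot for newVersion
```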

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala

Lines changed: 5 additions & 1 deletion
@@ -1469,13 +1469,17 @@
   * - Create a RocksDB checkpoint in a new local dir
   * - Sync the checkpoint dir files to DFS
   */
-  def commit(): (Long, StateStoreCheckpointInfo) = {
+  def commit(forceSnapshot: Boolean = false): (Long, StateStoreCheckpointInfo) = {
    commitLatencyMs.clear()
    updateMemoryUsageIfNeeded()
    val newVersion = loadedVersion + 1
    try {
      logInfo(log"Flushing updates for ${MDC(LogKeys.VERSION_NUM, newVersion)}")

+      if (forceSnapshot) {
+        shouldForceSnapshot.set(true)
+      }
+
      var snapshot: Option[RocksDBSnapshot] = None
      if (shouldCreateSnapshot() || shouldForceSnapshot.get()) {
        val (newSnapshot, snapshotLatency) = createSnapshot(newVersion, sessionStateStoreCkptId)
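Note that the new parameter does not introduce a separate snapshot path: it latches the pre-existing `shouldForceSnapshot` flag so the existing `shouldCreateSnapshot() || shouldForceSnapshot.get()` check fires. A self-contained sketch of that latch (the class and callback names here are hypothetical):

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for the decision inside RocksDB.commit().
class SnapshotLatch(periodicSnapshotDue: () => Boolean) {
  private val shouldForceSnapshot = new AtomicBoolean(false)

  def snapshotNeeded(forceSnapshot: Boolean): Boolean = {
    if (forceSnapshot) shouldForceSnapshot.set(true) // latch, as in the diff above
    periodicSnapshotDue() || shouldForceSnapshot.get()
  }
}
```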

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala

Lines changed: 32 additions & 12 deletions
@@ -46,7 +46,8 @@ private[sql] class RocksDBStateStoreProvider
  class RocksDBStateStore(
      lastVersion: Long,
      private[RocksDBStateStoreProvider] val stamp: Long,
-      private[RocksDBStateStoreProvider] var readOnly: Boolean) extends StateStore {
+      private[RocksDBStateStoreProvider] var readOnly: Boolean,
+      private[RocksDBStateStoreProvider] var forceSnapshotOnCommit: Boolean) extends StateStore {

    private sealed trait OPERATION
    private case object UPDATE extends OPERATION

@@ -448,7 +449,7 @@
      validateState(UPDATING)
      try {
        stateMachine.verifyStamp(stamp)
-        val (newVersion, newCheckpointInfo) = rocksDB.commit()
+        val (newVersion, newCheckpointInfo) = rocksDB.commit(forceSnapshotOnCommit)
        checkpointInfo = Some(newCheckpointInfo)
        storedMetrics = rocksDB.metricsOpt
        validateAndTransitionState(COMMIT)

@@ -568,7 +569,8 @@
        CUSTOM_METRIC_NUM_INTERNAL_COL_FAMILIES_KEYS -> rocksDBMetrics.numInternalKeys,
        CUSTOM_METRIC_NUM_EXTERNAL_COL_FAMILIES -> internalColFamilyCnt(),
        CUSTOM_METRIC_NUM_INTERNAL_COL_FAMILIES -> externalColFamilyCnt(),
-        CUSTOM_METRIC_NUM_SNAPSHOTS_AUTO_REPAIRED -> rocksDBMetrics.numSnapshotsAutoRepaired
+        CUSTOM_METRIC_NUM_SNAPSHOTS_AUTO_REPAIRED -> rocksDBMetrics.numSnapshotsAutoRepaired,
+        CUSTOM_METRIC_FORCE_SNAPSHOT -> (if (forceSnapshotOnCommit) 1L else 0L)
      ) ++ rocksDBMetrics.zipFileBytesUncompressed.map(bytes =>
        Map(CUSTOM_METRIC_ZIP_FILE_BYTES_UNCOMPRESSED -> bytes)).getOrElse(Map())

@@ -716,13 +718,15 @@
   * @param uniqueId Optional unique identifier for checkpoint
   * @param readOnly Whether to open the store in read-only mode
   * @param existingStore Optional existing store to reuse instead of creating a new one
+   * @param forceSnapshotOnCommit Whether to force a snapshot upload on commit
   * @return The loaded state store
   */
  private def loadStateStore(
      version: Long,
      uniqueId: Option[String] = None,
      readOnly: Boolean,
-      existingStore: Option[RocksDBStateStore] = None): StateStore = {
+      existingStore: Option[RocksDBStateStore] = None,
+      forceSnapshotOnCommit: Boolean = false): StateStore = {
    var acquiredStamp: Option[Long] = None
    var storeLoaded = false
    try {

@@ -761,11 +765,12 @@
        case Some(store: RocksDBStateStore) =>
          // Mark store as being used for write operations
          store.readOnly = readOnly
+          store.forceSnapshotOnCommit = forceSnapshotOnCommit
          store
        case None =>
          // Create new store instance. The stamp should be defined
          // in this case
-          new RocksDBStateStore(version, stamp.get, readOnly)
+          new RocksDBStateStore(version, stamp.get, readOnly, forceSnapshotOnCommit)
      }
      storeLoaded = true
      store

@@ -792,23 +797,34 @@
    }

  override def getStore(
-      version: Long, uniqueId: Option[String] = None): StateStore = {
-    loadStateStore(version, uniqueId, readOnly = false)
+      version: Long,
+      uniqueId: Option[String] = None,
+      forceSnapshotOnCommit: Boolean = false): StateStore = {
+    loadStateStore(
+      version,
+      uniqueId,
+      readOnly = false,
+      forceSnapshotOnCommit = forceSnapshotOnCommit
+    )
  }

  override def upgradeReadStoreToWriteStore(
      readStore: ReadStateStore,
      version: Long,
-      uniqueId: Option[String] = None): StateStore = {
+      uniqueId: Option[String] = None,
+      forceSnapshotOnCommit: Boolean = false): StateStore = {
    assert(version == readStore.version,
      s"Can only upgrade readStore to writeStore with the same version," +
        s" readStoreVersion: ${readStore.version}, writeStoreVersion: ${version}")
    assert(this.stateStoreId == readStore.id, "Can only upgrade readStore to writeStore with" +
      " the same stateStoreId")
    assert(readStore.isInstanceOf[RocksDBStateStore], "Can only upgrade state store if it is a " +
      "RocksDBStateStore")
-    loadStateStore(version, uniqueId, readOnly = false, existingStore =
-      Some(readStore.asInstanceOf[RocksDBStateStore]))
+    loadStateStore(version,
+      uniqueId,
+      readOnly = false,
+      existingStore = Some(readStore.asInstanceOf[RocksDBStateStore]),
+      forceSnapshotOnCommit = forceSnapshotOnCommit)
  }

  override def getReadStore(

@@ -932,7 +948,7 @@
        endVersion,
        snapshotVersionStateStoreCkptId,
        endVersionStateStoreCkptId)
-      new RocksDBStateStore(endVersion, stamp, readOnly)
+      new RocksDBStateStore(endVersion, stamp, readOnly, forceSnapshotOnCommit = false)
    } catch {
      case e: Throwable =>
        stateMachine.releaseStamp(stamp)

@@ -1283,6 +1299,9 @@ object RocksDBStateStoreProvider {
    "rocksdbNumSnapshotsAutoRepaired",
    "RocksDB: number of snapshots that were automatically repaired during store load")

+  val CUSTOM_METRIC_FORCE_SNAPSHOT = StateStoreCustomSumMetric(
+    "rocksdbForceSnapshotCount", "RocksDB: number of stores that had forced snapshot on commit")
+
  val ALL_CUSTOM_METRICS = Seq(
    CUSTOM_METRIC_SST_FILE_SIZE, CUSTOM_METRIC_GET_TIME, CUSTOM_METRIC_PUT_TIME,
    CUSTOM_METRIC_FLUSH_TIME, CUSTOM_METRIC_COMMIT_COMPACT_TIME,

@@ -1297,7 +1316,8 @@
    CUSTOM_METRIC_PINNED_BLOCKS_MEM_USAGE, CUSTOM_METRIC_NUM_INTERNAL_COL_FAMILIES_KEYS,
    CUSTOM_METRIC_NUM_EXTERNAL_COL_FAMILIES, CUSTOM_METRIC_NUM_INTERNAL_COL_FAMILIES,
    CUSTOM_METRIC_LOAD_FROM_SNAPSHOT_TIME, CUSTOM_METRIC_LOAD_TIME, CUSTOM_METRIC_REPLAY_CHANGE_LOG,
-    CUSTOM_METRIC_NUM_REPLAY_CHANGE_LOG_FILES, CUSTOM_METRIC_NUM_SNAPSHOTS_AUTO_REPAIRED)
+    CUSTOM_METRIC_NUM_REPLAY_CHANGE_LOG_FILES, CUSTOM_METRIC_NUM_SNAPSHOTS_AUTO_REPAIRED,
+    CUSTOM_METRIC_FORCE_SNAPSHOT)

  val CUSTOM_INSTANCE_METRIC_SNAPSHOT_LAST_UPLOADED = StateStoreSnapshotLastUploadInstanceMetric()
