Commit cd1601b

micheal-o authored and anishshri-db committed
[SPARK-54346][SS] Introduce state repartition API and repartition runner
### What changes were proposed in this pull request?

Introduces the API for offline repartitioning of streaming state. It is not yet exposed, since it is still in development. This also implements some of the core functionality of the repartition batch runner, which validates the checkpoint, creates the repartition batch, and commits it. Subsequent PRs will build on this and add the Spark Connect and PySpark APIs.

It also introduces the StreamingCheckpointManager for performing operations on a streaming checkpoint. This is likewise not yet exposed, since it is still in development.

### Why are the changes needed?

To support streaming state repartitioning.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

A new test suite was added.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #53056 from micheal-o/repartition_api.

Authored-by: micheal-o <micheal.okutubo@gmail.com>
Signed-off-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
1 parent 6227fba commit cd1601b
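
The runner described above validates the checkpoint, creates the repartition batch, and commits it. A hedged outline of that control flow (only `OfflineStateRepartitionRunner` and its `run()` method appear in this commit; the three step methods are assumed names for illustration):

```scala
// Hedged outline of the repartition batch runner's flow, per the PR
// description above. Helper method names are assumptions, not Spark API.
class OfflineStateRepartitionRunnerOutline {
  def run(): Unit = {
    validateCheckpoint()      // e.g., last batch committed, OffsetSeq readable
    createRepartitionBatch()  // write batch N+1 holding the repartitioned state
    commitBatch()             // record batch N+1 in the commit log
  }

  private def validateCheckpoint(): Unit = ???
  private def createRepartitionBatch(): Unit = ???
  private def commitBatch(): Unit = ???
}
```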

File tree: 13 files changed, +1020 −11 lines

common/utils-java/src/main/java/org/apache/spark/internal/LogKeys.java

Lines changed: 1 addition & 0 deletions
```diff
@@ -192,6 +192,7 @@ public enum LogKeys implements LogKey {
     END_INDEX,
     END_POINT,
     END_VERSION,
+    ENFORCE_EXACTLY_ONCE,
     ENGINE,
     EPOCH,
     ERROR,
```
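
For context, Spark's structured logging references these keys through the `MDC` helper. A minimal sketch of how the new `ENFORCE_EXACTLY_ONCE` key could be used at a call site (only the key comes from this diff; the log message and class are illustrative assumptions):

```scala
import org.apache.spark.internal.{Logging, LogKeys, MDC}

class RepartitionLoggingExample extends Logging {
  // Hypothetical call site for the new key added above.
  def logRun(enforceExactlyOnceSink: Boolean): Unit = {
    logInfo(log"Starting state repartition with enforceExactlyOnceSink=" +
      log"${MDC(LogKeys.ENFORCE_EXACTLY_ONCE, enforceExactlyOnceSink)}")
  }
}
```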

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 79 additions & 0 deletions
```diff
@@ -5428,6 +5428,85 @@
     ],
     "sqlState" : "42802"
   },
+  "STATE_REPARTITION_INVALID_CHECKPOINT" : {
+    "message" : [
+      "The provided checkpoint location '<checkpointLocation>' is in an invalid state."
+    ],
+    "subClass" : {
+      "LAST_BATCH_ABANDONED_REPARTITION" : {
+        "message" : [
+          "The last batch ID <lastBatchId> is a repartition batch with <lastBatchShufflePartitions> shuffle partitions and didn't finish successfully.",
+          "You're now requesting to repartition to <numPartitions> shuffle partitions.",
+          "Please retry with the same number of shuffle partitions as the previous attempt.",
+          "Once that completes successfully, you can repartition to another number of shuffle partitions."
+        ]
+      },
+      "LAST_BATCH_FAILED" : {
+        "message" : [
+          "The last batch ID <lastBatchId> didn't finish successfully. Please make sure the streaming query finishes successfully before repartitioning.",
+          "If using the ProcessingTime trigger, you can use the AvailableNow trigger instead, which ensures the query terminates successfully by itself.",
+          "If you want to skip this check, set the enforceExactlyOnceSink parameter in repartition to false.",
+          "But this can cause duplicate output records from the failed batch when using exactly-once sinks."
+        ]
+      },
+      "MISSING_OFFSET_SEQ_METADATA" : {
+        "message" : [
+          "The OffsetSeq (v<version>) metadata is missing for batch ID <batchId>. Please make sure the checkpoint is from a supported Spark version (Spark 4.0+)."
+        ]
+      },
+      "NO_BATCH_FOUND" : {
+        "message" : [
+          "No microbatch has been recorded in the checkpoint location. Make sure the streaming query has successfully completed at least one microbatch before repartitioning."
+        ]
+      },
+      "NO_COMMITTED_BATCH" : {
+        "message" : [
+          "There is no committed microbatch. Make sure the streaming query has successfully completed at least one microbatch before repartitioning."
+        ]
+      },
+      "OFFSET_SEQ_NOT_FOUND" : {
+        "message" : [
+          "Offset sequence entry for batch ID <batchId> not found. You might have set a very low value for the",
+          "'spark.sql.streaming.minBatchesToRetain' config during the streaming query execution, or you deleted files in the checkpoint location."
+        ]
+      },
+      "SHUFFLE_PARTITIONS_ALREADY_MATCH" : {
+        "message" : [
+          "The number of shuffle partitions in the last committed batch (id=<batchId>) is the same as the requested <numPartitions> partitions.",
+          "Hence the state already has the requested number of partitions, and this is a no-op."
+        ]
+      },
+      "UNSUPPORTED_OFFSET_SEQ_VERSION" : {
+        "message" : [
+          "Unsupported offset sequence version <version>. Please make sure the checkpoint is from a supported Spark version (Spark 4.0+)."
+        ]
+      }
+    },
+    "sqlState" : "55019"
+  },
+  "STATE_REPARTITION_INVALID_PARAMETER" : {
+    "message" : [
+      "The repartition parameter <parameter> is invalid:"
+    ],
+    "subClass" : {
+      "IS_EMPTY" : {
+        "message" : [
+          "cannot be empty."
+        ]
+      },
+      "IS_NOT_GREATER_THAN_ZERO" : {
+        "message" : [
+          "must be greater than zero."
+        ]
+      },
+      "IS_NULL" : {
+        "message" : [
+          "cannot be null."
+        ]
+      }
+    },
+    "sqlState" : "42616"
+  },
   "STATE_STORE_CANNOT_CREATE_COLUMN_FAMILY_WITH_RESERVED_CHARS" : {
     "message" : [
       "Failed to create column family with unsupported starting character and name=<colFamilyName>."
```
org/apache/spark/sql/streaming/StreamingCheckpointManager.scala (new file)

Lines changed: 54 additions & 0 deletions

```diff
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+/**
+ * A class to manage operations on streaming query checkpoints.
+ */
+private[spark] abstract class StreamingCheckpointManager {
+
+  /**
+   * Repartition the stateful streaming operators' state in the streaming checkpoint to have
+   * `numPartitions` partitions. The streaming query MUST NOT be running. If `numPartitions` is
+   * the same as the current number of partitions, no repartitioning is needed, and an exception
+   * is thrown to signal the no-op.
+   *
+   * This produces a new microbatch in the checkpoint that contains the repartitioned state,
+   * i.e. if the last streaming batch was batch `N`, this will create batch `N+1` with the
+   * repartitioned state. Note that this new batch doesn't read input data from sources; it
+   * only represents the repartition operation. The next time the streaming query is started,
+   * it will pick up from this new batch.
+   *
+   * This returns only when the repartitioning completes or fails.
+   *
+   * @note
+   *   This operation should only be performed after the streaming query has been stopped.
+   *   Otherwise, it can lead to undefined behavior or checkpoint corruption.
+   * @param checkpointLocation
+   *   The checkpoint location of the streaming query; should be the `checkpointLocation`
+   *   option on the DataStreamWriter.
+   * @param numPartitions
+   *   The target number of state partitions.
+   * @param enforceExactlyOnceSink
+   *   Whether to disallow skipping failed batches, to avoid duplicate output in exactly-once
+   *   sinks.
+   */
+  private[spark] def repartition(
+      checkpointLocation: String,
+      numPartitions: Int,
+      enforceExactlyOnceSink: Boolean = true): Unit
+}
```
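
A hedged usage sketch of this API. Since it is `private[spark]` and not yet exposed, this only compiles from code inside `org.apache.spark` packages; the checkpoint path and partition count are placeholders:

```scala
import org.apache.spark.sql.classic.SparkSession

val spark = SparkSession.builder().getOrCreate()

// The streaming query owning this checkpoint must be stopped first
// (see the @note above); otherwise the checkpoint can be corrupted.
spark.streamingCheckpointManager.repartition(
  checkpointLocation = "/tmp/checkpoints/my_query", // hypothetical path
  numPartitions = 32,                               // target state partition count
  enforceExactlyOnceSink = true)                    // fail if the last batch didn't commit
```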

sql/core/src/main/scala/org/apache/spark/sql/classic/SQLContext.scala

Lines changed: 6 additions & 0 deletions
```diff
@@ -145,6 +145,12 @@ class SQLContext private[sql] (override val sparkSession: SparkSession)
    */
   def streams: StreamingQueryManager = sparkSession.streams
 
+  /**
+   * Returns a `StreamingCheckpointManager` that allows managing any streaming checkpoint.
+   */
+  private[spark] def streamingCheckpointManager: StreamingCheckpointManager =
+    sparkSession.streamingCheckpointManager
+
   /** @inheritdoc */
   override def sparkContext: SparkContext = super.sparkContext
```

sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -223,6 +223,8 @@ class SparkSession private(
   @Unstable
   def streams: StreamingQueryManager = sessionState.streamingQueryManager
 
+  private[spark] def streamingCheckpointManager = sessionState.streamingCheckpointManager
+
   /**
    * Returns an `ArtifactManager` that supports adding, managing and using session-scoped artifacts
    * (jars, classfiles, etc).
```
org/apache/spark/sql/classic/StreamingCheckpointManager.scala (new file)

Lines changed: 55 additions & 0 deletions

```diff
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.classic
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.streaming.state.{OfflineStateRepartitionErrors, OfflineStateRepartitionRunner}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming
+
+/** @inheritdoc */
+private[spark] class StreamingCheckpointManager(
+    sparkSession: SparkSession,
+    sqlConf: SQLConf) extends streaming.StreamingCheckpointManager with Logging {
+
+  /** @inheritdoc */
+  override private[spark] def repartition(
+      checkpointLocation: String,
+      numPartitions: Int,
+      enforceExactlyOnceSink: Boolean = true): Unit = {
+    checkpointLocation match {
+      case null =>
+        throw OfflineStateRepartitionErrors.parameterIsNullError("checkpointLocation")
+      case "" =>
+        throw OfflineStateRepartitionErrors.parameterIsEmptyError("checkpointLocation")
+      case _ => // Valid case, no action needed
+    }
+
+    if (numPartitions <= 0) {
+      throw OfflineStateRepartitionErrors.parameterIsNotGreaterThanZeroError("numPartitions")
+    }
+
+    val runner = new OfflineStateRepartitionRunner(
+      sparkSession,
+      checkpointLocation,
+      numPartitions,
+      enforceExactlyOnceSink)
+    runner.run()
+  }
+}
```
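
The parameter checks above map one-to-one onto the `STATE_REPARTITION_INVALID_PARAMETER` subclasses added in error-conditions.json. A hypothetical test shape (the actual suite added by this PR is not in this excerpt, and the exact exception type is assumed):

```scala
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSparkSession

class StreamingCheckpointManagerValidationSketch extends QueryTest with SharedSparkSession {
  test("repartition rejects invalid parameters") {
    val mgr = spark.streamingCheckpointManager
    intercept[Exception] { mgr.repartition(null, 10) }        // IS_NULL
    intercept[Exception] { mgr.repartition("", 10) }          // IS_EMPTY
    intercept[Exception] { mgr.repartition("/tmp/ckpt", 0) }  // IS_NOT_GREATER_THAN_ZERO
  }
}
```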

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala

Lines changed: 3 additions & 9 deletions
```diff
@@ -42,6 +42,7 @@ import org.apache.spark.sql.execution.streaming.operators.stateful.transformwith
 import org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConstants.DIR_NAME_STATE
 import org.apache.spark.sql.execution.streaming.runtime.StreamingQueryCheckpointMetadata
 import org.apache.spark.sql.execution.streaming.state.{InMemoryStateSchemaProvider, KeyStateEncoderSpec, NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec, StateSchemaCompatibilityChecker, StateSchemaMetadata, StateSchemaProvider, StateStore, StateStoreColFamilySchema, StateStoreConf, StateStoreId, StateStoreProviderId}
+import org.apache.spark.sql.execution.streaming.utils.StreamingUtils
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.streaming.TimeMode
 import org.apache.spark.sql.types.StructType
@@ -481,7 +482,8 @@ object StateSourceOptions extends DataSourceOptions {
       throw StateDataSourceErrors.conflictOptions(Seq(JOIN_SIDE, STORE_NAME))
     }
 
-    val resolvedCpLocation = resolvedCheckpointLocation(hadoopConf, checkpointLocation)
+    val resolvedCpLocation = StreamingUtils.resolvedCheckpointLocation(
+      hadoopConf, checkpointLocation)
 
     var batchId = Option(options.get(BATCH_ID)).map(_.toLong)
 
@@ -617,14 +619,6 @@ object StateSourceOptions extends DataSourceOptions {
     startOperatorStateUniqueIds, endOperatorStateUniqueIds)
   }
 
-  private def resolvedCheckpointLocation(
-      hadoopConf: Configuration,
-      checkpointLocation: String): String = {
-    val checkpointPath = new Path(checkpointLocation)
-    val fs = checkpointPath.getFileSystem(hadoopConf)
-    checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri.toString
-  }
-
   private def getLastCommittedBatch(session: SparkSession, checkpointLocation: String): Long = {
     val commitLog = new StreamingQueryCheckpointMetadata(session, checkpointLocation).commitLog
     commitLog.getLatest() match {
```
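
This hunk moves the private `resolvedCheckpointLocation` helper into a shared `StreamingUtils` object so the repartition code can reuse it. The new file is not part of this excerpt; a sketch of its likely shape, with the method body taken verbatim from the code removed above (the surrounding object definition is assumed):

```scala
package org.apache.spark.sql.execution.streaming.utils

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object StreamingUtils {
  // Resolves a checkpoint location to a fully qualified URI string.
  def resolvedCheckpointLocation(
      hadoopConf: Configuration,
      checkpointLocation: String): String = {
    val checkpointPath = new Path(checkpointLocation)
    val fs = checkpointPath.getFileSystem(hadoopConf)
    checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri.toString
  }
}
```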
