18 changes: 18 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -2198,6 +2198,24 @@
},
"sqlState" : "KD000"
},
"INCOMPATIBLE_FLOW_READ" : {
"message" : [
"Incompatible read issued for flow <flowIdentifier>."
],
"subClass" : {
"BATCH_READ_ON_STREAMING_FLOW" : {
"message" : [
"Attempted to issue batch read on a streaming flow."
]
},
"STREAMING_READ_ON_BATCH_FLOW" : {
"message" : [
"Attempted to issue a streaming read on a batch flow"
]
}
},
"sqlState": "42000"
},
"INCOMPATIBLE_JOIN_TYPES" : {
"message" : [
"The join types <joinType1> and <joinType2> are incompatible."
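For reference, a minimal sketch of how one of the new sub-conditions surfaces as an AnalysisException. The flow identifier is a made-up value, and the constructor call simply mirrors the pattern used in the Flow.scala change further down in this diff (this constructor is used by Spark's own code; user code would normally only catch the exception):

import org.apache.spark.sql.AnalysisException

// Hypothetical flow identifier, for illustration only.
val flowName = "`my_catalog`.`my_schema`.`my_flow`"

// The error class selects the message template from error-conditions.json, and
// <flowIdentifier> is substituted from the parameter map.
val ex = new AnalysisException(
  "INCOMPATIBLE_FLOW_READ.STREAMING_READ_ON_BATCH_FLOW",
  Map("flowIdentifier" -> flowName))

// ex.errorClass is Some("INCOMPATIBLE_FLOW_READ.STREAMING_READ_ON_BATCH_FLOW") and
// ex.getMessage renders roughly as:
// [INCOMPATIBLE_FLOW_READ.STREAMING_READ_ON_BATCH_FLOW] Incompatible read issued for
// flow `my_catalog`.`my_schema`.`my_flow`. Attempted to issue a streaming read on a batch flow.
assert(ex.errorClass.contains("INCOMPATIBLE_FLOW_READ.STREAMING_READ_ON_BATCH_FLOW"))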

This file was deleted.

@@ -86,7 +86,8 @@ class CoreDataflowNodeProcessor(rawGraph: DataflowGraph) {
identifier = table.identifier,
specifiedSchema = table.specifiedSchema,
incomingFlowIdentifiers = flowsToTable.map(_.identifier).toSet,
availableFlows = resolvedFlowsToTable
availableFlows = resolvedFlowsToTable,
isStreamingTable = table.isStreamingTable
)
resolvedInputs.put(table.identifier, virtualTableInput)
Seq(table)
@@ -20,10 +20,9 @@ package org.apache.spark.sql.pipelines.graph
import scala.util.Try

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.classic.DataFrame
import org.apache.spark.sql.pipelines.AnalysisWarning
import org.apache.spark.sql.pipelines.util.InputReadOptions
import org.apache.spark.sql.types.StructType

/**
@@ -99,8 +98,7 @@ case class FlowFunctionResult(
streamingInputs: Set[ResolvedInput],
usedExternalInputs: Set[TableIdentifier],
dataFrame: Try[DataFrame],
sqlConf: Map[String, String],
analysisWarnings: Seq[AnalysisWarning] = Nil) {
sqlConf: Map[String, String]) {

/**
* Returns the names of all of the [[Input]]s used when resolving this [[Flow]]. If the
@@ -165,7 +163,22 @@ trait ResolvedFlow extends ResolutionCompletedFlow with Input {

/** Returns the schema of the output of this [[Flow]]. */
def schema: StructType = df.schema
override def load(readOptions: InputReadOptions): DataFrame = df

override def load(asStreaming: Boolean): DataFrame = {
if (asStreaming && !df.isStreaming) {
throw new AnalysisException(
"INCOMPATIBLE_FLOW_READ.BATCH_READ_ON_STREAMING_FLOW",
Map("flowIdentifier" -> identifier.quotedString)
)
}
if (!asStreaming && df.isStreaming) {
throw new AnalysisException(
"INCOMPATIBLE_FLOW_READ.STREAMING_READ_ON_BATCH_FLOW",
Map("flowIdentifier" -> identifier.quotedString)
)
}
df
}
def inputs: Set[TableIdentifier] = funcResult.inputs
}

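For context, the new load override keys entirely on Dataset.isStreaming to tell the two kinds of flows apart. A minimal standalone sketch of that property (the local master and the built-in rate source are just illustrative choices):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("is-streaming-demo").getOrCreate()

// Batch plan: isStreaming is false, so load(asStreaming = true) against such a flow fails
// with INCOMPATIBLE_FLOW_READ.STREAMING_READ_ON_BATCH_FLOW.
val batchDF = spark.range(10).toDF("id")
assert(!batchDF.isStreaming)

// Streaming plan: isStreaming is true, so load(asStreaming = false) against such a flow fails
// with INCOMPATIBLE_FLOW_READ.BATCH_READ_ON_STREAMING_FLOW.
val streamDF = spark.readStream.format("rate").load()
assert(streamDF.isStreaming)

spark.stop()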
@@ -17,16 +17,14 @@

package org.apache.spark.sql.pipelines.graph

import scala.util.Try
import scala.util.{Failure, Success, Try}

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{CTESubstitution, UnresolvedRelation}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.classic.{DataFrame, Dataset, DataStreamReader, SparkSession}
import org.apache.spark.sql.pipelines.AnalysisWarning
import org.apache.spark.sql.classic.{DataFrame, DataFrameReader, Dataset, DataStreamReader, SparkSession}
import org.apache.spark.sql.pipelines.graph.GraphIdentifierManager.{ExternalDatasetIdentifier, InternalDatasetIdentifier}
import org.apache.spark.sql.pipelines.util.{BatchReadOptions, InputReadOptions, StreamingReadOptions}


object FlowAnalysis {
@@ -67,8 +65,7 @@ object FlowAnalysis {
streamingInputs = ctx.streamingInputs.toSet,
usedExternalInputs = ctx.externalInputs.toSet,
dataFrame = df,
sqlConf = confs,
analysisWarnings = ctx.analysisWarnings.toList
sqlConf = confs
)
}
}
@@ -116,8 +113,7 @@ object FlowAnalysis {
val resolved = readStreamInput(
context,
name = IdentifierHelper.toQuotedString(u.multipartIdentifier),
spark.readStream,
streamingReadOptions = StreamingReadOptions()
streamReader = spark.readStream.options(u.options)
).queryExecution.analyzed
// Spark Connect requires the PLAN_ID_TAG to be propagated to the resolved plan
// to allow correct analysis of the parent plan that contains this subquery
@@ -128,7 +124,7 @@
val resolved = readBatchInput(
context,
name = IdentifierHelper.toQuotedString(u.multipartIdentifier),
batchReadOptions = BatchReadOptions()
batchReader = spark.read.options(u.options)
).queryExecution.analyzed
// Spark Connect requires the PLAN_ID_TAG to be propagated to the resolved plan
// to allow correct analysis of the parent plan that contains this subquery
@@ -147,23 +143,25 @@
* All the public APIs that read from a dataset should call this function to read the dataset.
*
* @param name the name of the Dataset to be read.
* @param batchReadOptions Options for this batch read
* @param batchReader the [[DataFrameReader]], possibly carrying user-specified options, used to
* execute the read.
* @return batch DataFrame that represents data from the specified Dataset.
*/
final private def readBatchInput(
context: FlowAnalysisContext,
name: String,
batchReadOptions: BatchReadOptions
batchReader: DataFrameReader
): DataFrame = {
GraphIdentifierManager.parseAndQualifyInputIdentifier(context, name) match {
case inputIdentifier: InternalDatasetIdentifier =>
readGraphInput(context, inputIdentifier, batchReadOptions)
readGraphInput(context, inputIdentifier, isStreamingRead = false)

case inputIdentifier: ExternalDatasetIdentifier =>
readExternalBatchInput(
context,
inputIdentifier = inputIdentifier,
name = name
name = name,
batchReader = batchReader
)
}
}
@@ -177,21 +175,19 @@
*
* @param name the name of the Dataset to be read.
* @param streamReader The [[DataStreamReader]] that may hold read options specified by the user.
* @param streamingReadOptions Options for this streaming read.
* @return streaming DataFrame that represents data from the specified Dataset.
*/
final private def readStreamInput(
context: FlowAnalysisContext,
name: String,
streamReader: DataStreamReader,
streamingReadOptions: StreamingReadOptions
streamReader: DataStreamReader
): DataFrame = {
GraphIdentifierManager.parseAndQualifyInputIdentifier(context, name) match {
case inputIdentifier: InternalDatasetIdentifier =>
readGraphInput(
context,
inputIdentifier,
streamingReadOptions
isStreamingRead = true
)

case inputIdentifier: ExternalDatasetIdentifier =>
@@ -208,13 +204,13 @@
* Internal helper to reference dataset defined in the same [[DataflowGraph]].
*
* @param inputIdentifier The identifier of the Dataset to be read.
* @param readOptions Options for this read (may be either streaming or batch options)
* @param isStreamingRead Whether this is a streaming read or batch read.
* @return streaming or batch DataFrame that represents data from the specified Dataset.
*/
final private def readGraphInput(
ctx: FlowAnalysisContext,
inputIdentifier: InternalDatasetIdentifier,
readOptions: InputReadOptions
isStreamingRead: Boolean
): DataFrame = {
val datasetIdentifier = inputIdentifier.identifier

@@ -231,17 +227,38 @@
ctx.availableInput(datasetIdentifier)
}

val inputDF = i.load(readOptions)
val inputDF = Try {
i.load(asStreaming = isStreamingRead)
} match {
case Success(df) => df
case Failure(ex: AnalysisException) => ex.errorClass match {
// Views are simply resolved as the flows they read from during graph construction, so we
// know a flow load exception here directly corresponds to reading a view specifically.
// Rethrow relevant exceptions appropriately, with the view's identifier.
case Some("INCOMPATIBLE_FLOW_READ.BATCH_READ_ON_STREAMING_FLOW") =>
throw new AnalysisException(
"INCOMPATIBLE_BATCH_VIEW_READ",
Map("datasetIdentifier" -> datasetIdentifier.toString)
)
case Some("INCOMPATIBLE_FLOW_READ.STREAMING_READ_ON_BATCH_FLOW") =>
throw new AnalysisException(
"INCOMPATIBLE_STREAMING_VIEW_READ",
Map("datasetIdentifier" -> datasetIdentifier.toString)
)
case _ =>
throw ex
}
case Failure(ex: Throwable) =>
throw ex
}

i match {
// If the referenced input is a [[Flow]], because the query plans will be fused
// together, we also need to fuse their confs.
case f: Flow => f.sqlConf.foreach { case (k, v) => ctx.setConf(k, v) }
case _ =>
}

val incompatibleViewReadCheck =
ctx.spark.conf.get("pipelines.incompatibleViewCheck.enabled", "true").toBoolean

// Wrap the DF in an alias so that columns in the DF can be referenced with
// the following in the query:
// - <catalog>.<schema>.<dataset>.<column>
@@ -252,30 +269,10 @@
qualifier = Seq(datasetIdentifier.catalog, datasetIdentifier.database).flatten
)

readOptions match {
case sro: StreamingReadOptions =>
if (!inputDF.isStreaming && incompatibleViewReadCheck) {
throw new AnalysisException(
"INCOMPATIBLE_BATCH_VIEW_READ",
Map("datasetIdentifier" -> datasetIdentifier.toString)
)
}

if (sro.droppedUserOptions.nonEmpty) {
ctx.analysisWarnings += AnalysisWarning.StreamingReaderOptionsDropped(
sourceName = datasetIdentifier.unquotedString,
droppedOptions = sro.droppedUserOptions.keys.toSeq
)
}
ctx.streamingInputs += ResolvedInput(i, aliasIdentifier)
case _ =>
if (inputDF.isStreaming && incompatibleViewReadCheck) {
throw new AnalysisException(
"INCOMPATIBLE_STREAMING_VIEW_READ",
Map("datasetIdentifier" -> datasetIdentifier.toString)
)
}
ctx.batchInputs += ResolvedInput(i, aliasIdentifier)
if (isStreamingRead) {
ctx.streamingInputs += ResolvedInput(i, aliasIdentifier)
} else {
ctx.batchInputs += ResolvedInput(i, aliasIdentifier)
}
Dataset.ofRows(
ctx.spark,
@@ -293,11 +290,11 @@
final private def readExternalBatchInput(
context: FlowAnalysisContext,
inputIdentifier: ExternalDatasetIdentifier,
name: String): DataFrame = {
name: String,
batchReader: DataFrameReader): DataFrame = {

val spark = context.spark
context.externalInputs += inputIdentifier.identifier
spark.read.table(inputIdentifier.identifier.quotedString)
batchReader.table(inputIdentifier.identifier.quotedString)
}

/**
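The reader-based plumbing above replaces the dropped BatchReadOptions/StreamingReadOptions: options captured on the unresolved relation (u.options) now travel on the DataFrameReader or DataStreamReader and are applied when an external table is resolved. A rough standalone sketch of that pattern; the option key, temp view, and table name are illustrative only:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("reader-options-demo").getOrCreate()

// Stand-in for the options that FlowAnalysis lifts off the unresolved relation.
val userOptions = Map("mergeSchema" -> "true")

// Batch path: attach the options to the DataFrameReader, then resolve the table through it,
// as readExternalBatchInput now does with the passed-in batchReader.
spark.range(5).toDF("id").createOrReplaceTempView("demo_input")
val batchDF = spark.read.options(userOptions).table("demo_input")
batchDF.show()

// Streaming path follows the same shape on the DataStreamReader, e.g.
// spark.readStream.options(userOptions).table("some_streaming_table")  // hypothetical table name

spark.stop()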
@@ -18,11 +18,9 @@
package org.apache.spark.sql.pipelines.graph

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.classic.SparkSession
import org.apache.spark.sql.pipelines.AnalysisWarning

/**
* A context used when evaluating a `Flow`'s query into a concrete DataFrame.
@@ -44,7 +42,6 @@ private[pipelines] case class FlowAnalysisContext(
streamingInputs: mutable.HashSet[ResolvedInput] = mutable.HashSet.empty,
requestedInputs: mutable.HashSet[TableIdentifier] = mutable.HashSet.empty,
shouldLowerCaseNames: Boolean = false,
analysisWarnings: mutable.Buffer[AnalysisWarning] = new ListBuffer[AnalysisWarning],
spark: SparkSession,
externalInputs: mutable.HashSet[TableIdentifier] = mutable.HashSet.empty
) {
@@ -212,7 +212,7 @@ trait GraphValidations extends Logging {
}

protected def validateUserSpecifiedSchemas(): Unit = {
flows.flatMap(f => table.get(f.identifier)).foreach { t: TableInput =>
flows.flatMap(f => table.get(f.identifier)).foreach { t: TableElement =>
// The output inferred schema of a table is the declared schema merged with the
// schema of all incoming flows. This must be equivalent to the declared schema.
val inferredSchema = SchemaInferenceUtils
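To make the invariant in the comment above concrete (the declared schema merged with the schemas of all incoming flows must remain equivalent to the declared schema), here is a tiny sketch using plain StructType equality; SchemaInferenceUtils itself is internal, so the merge step is only mimicked:

import org.apache.spark.sql.types.{LongType, StringType, StructType}

// Declared (user-specified) schema of the table.
val declared = new StructType().add("id", LongType).add("name", StringType)

// Schema produced by an incoming flow; when it matches, merging adds nothing new.
val fromFlow = new StructType().add("id", LongType).add("name", StringType)

// The validation passes only when the inferred (merged) schema is equivalent to the
// declared one; a mismatching flow schema would fail the check.
assert(declared == fromFlow)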
@@ -29,7 +29,9 @@ object State extends Logging {
* @param graph The graph to reset.
* @param env The current update context.
*/
private def findElementsToReset(graph: DataflowGraph, env: PipelineUpdateContext): Seq[Input] = {
private def findElementsToReset(
graph: DataflowGraph,
env: PipelineUpdateContext): Seq[GraphElement] = {
// If tableFilter is an instance of SomeTables, this is a refresh selection and all tables
// to reset should be resettable; Otherwise, this is a full graph update, and we reset all
// tables that are resettable.
@@ -71,8 +73,8 @@
* - Clearing checkpoint data
* - Truncating table data
*/
def reset(resolvedGraph: DataflowGraph, env: PipelineUpdateContext): Seq[Input] = {
val elementsToReset: Seq[Input] = findElementsToReset(resolvedGraph, env)
def reset(resolvedGraph: DataflowGraph, env: PipelineUpdateContext): Seq[GraphElement] = {
val elementsToReset: Seq[GraphElement] = findElementsToReset(resolvedGraph, env)

elementsToReset.foreach {
case f: ResolvedFlow => reset(f, env, resolvedGraph)