@@ -36,17 +36,17 @@ sealed trait CatalystIdentifier {
    */
   private def quoteIdentifier(name: String): String = name.replace("`", "``")
 
-  def resolvedId: String = quoteIdentifier(identifier)
-  def resolvedDb: Option[String] = database.map(quoteIdentifier)
-  def resolvedCatalog: Option[String] = catalog.map(quoteIdentifier)
-
   def quotedString: String = {
-    if (resolvedCatalog.isDefined && resolvedDb.isDefined) {
-      s"`${resolvedCatalog.get}`.`${resolvedDb.get}`.`$resolvedId`"
-    } else if (resolvedDb.isDefined) {
-      s"`${resolvedDb.get}`.`$resolvedId`"
+    val replacedId = quoteIdentifier(identifier)
+    val replacedDb = database.map(quoteIdentifier)
+    val replacedCatalog = catalog.map(quoteIdentifier)
+
+    if (replacedCatalog.isDefined && replacedDb.isDefined) {
+      s"`${replacedCatalog.get}`.`${replacedDb.get}`.`$replacedId`"
+    } else if (replacedDb.isDefined) {
+      s"`${replacedDb.get}`.`$replacedId`"
     } else {
-      s"`$resolvedId`"
+      s"`$replacedId`"
     }
   }
 
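
The refactor above folds the one-off resolved* helpers into quotedString, so backtick escaping happens only when an identifier is rendered as SQL text. A minimal standalone sketch of the same behavior (QuotedStringSketch is a hypothetical stand-in, not the Spark trait itself):

    object QuotedStringSketch {
      // Backticks inside a name part are escaped by doubling them.
      private def quoteIdentifier(name: String): String = name.replace("`", "``")

      def quotedString(
          identifier: String,
          database: Option[String],
          catalog: Option[String]): String = {
        val replacedId = quoteIdentifier(identifier)
        val replacedDb = database.map(quoteIdentifier)
        val replacedCatalog = catalog.map(quoteIdentifier)
        (replacedCatalog, replacedDb) match {
          case (Some(c), Some(d)) => s"`$c`.`$d`.`$replacedId`" // catalog.db.table
          case (_, Some(d)) => s"`$d`.`$replacedId`" // db.table
          case _ => s"`$replacedId`" // bare table
        }
      }

      def main(args: Array[String]): Unit = {
        // A name containing a backtick renders with the backtick doubled:
        println(quotedString("a`b", Some("db"), None)) // prints: `db`.`a``b`
      }
    }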
@@ -84,11 +84,11 @@ private[connect] object PipelinesHandler extends Logging {
         val resolvedDataset =
           defineDataset(cmd.getDefineDataset, sessionHolder)
         val identifierBuilder = ResolvedIdentifier.newBuilder()
-        resolvedDataset.resolvedCatalog.foreach(identifierBuilder.setCatalogName)
-        resolvedDataset.resolvedDb.foreach { ns =>
+        resolvedDataset.catalog.foreach(identifierBuilder.setCatalogName)
+        resolvedDataset.database.foreach { ns =>
           identifierBuilder.addNamespace(ns)
         }
-        identifierBuilder.setTableName(resolvedDataset.resolvedId)
+        identifierBuilder.setTableName(resolvedDataset.identifier)
         val identifier = identifierBuilder.build()
         PipelineCommandResult
           .newBuilder()
@@ -103,11 +103,11 @@ private[connect] object PipelinesHandler extends Logging {
         val resolvedFlow =
           defineFlow(cmd.getDefineFlow, transformRelationFunc, sessionHolder)
         val identifierBuilder = ResolvedIdentifier.newBuilder()
-        resolvedFlow.resolvedCatalog.foreach(identifierBuilder.setCatalogName)
-        resolvedFlow.resolvedDb.foreach { ns =>
+        resolvedFlow.catalog.foreach(identifierBuilder.setCatalogName)
+        resolvedFlow.database.foreach { ns =>
           identifierBuilder.addNamespace(ns)
         }
-        identifierBuilder.setTableName(resolvedFlow.resolvedId)
+        identifierBuilder.setTableName(resolvedFlow.identifier)
         val identifier = identifierBuilder.build()
         PipelineCommandResult
           .newBuilder()
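
This is the substantive fix: ResolvedIdentifier is a structured message, so its fields should carry raw name parts, while the old resolved* helpers applied the backtick escaping meant for SQL text. A small sketch of the difference, using a stand-in case class rather than the generated proto builder:

    // Stand-in for the generated ResolvedIdentifier message (illustration only).
    final case class ResolvedIdentifierStandIn(
        catalogName: String,
        namespace: Seq[String],
        tableName: String)

    def quoteIdentifier(name: String): String = name.replace("`", "``")

    val database = Some("my`db") // a database name that contains a backtick
    val identifier = "tb"

    // Before this change: escaped parts leaked into the structured identifier.
    val before = ResolvedIdentifierStandIn(
      "spark_catalog", database.map(quoteIdentifier).toSeq, quoteIdentifier(identifier))
    // before.namespace == Seq("my``db") -- the doubled backtick is wrong here

    // After this change: raw parts; quoting stays a rendering concern.
    val after = ResolvedIdentifierStandIn(
      "spark_catalog", database.toSeq, identifier)
    // after.namespace == Seq("my`db")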
@@ -495,6 +495,7 @@ class SparkDeclarativePipelinesServerSuite
       datasetName: String,
       defaultCatalog: String = "",
       defaultDatabase: String = "",
+      expectedResolvedDatasetName: String,
       expectedResolvedCatalog: String,
       expectedResolvedNamespace: Seq[String])
@@ -503,18 +504,21 @@ class SparkDeclarativePipelinesServerSuite
       name = "TEMPORARY_VIEW",
       datasetType = DatasetType.TEMPORARY_VIEW,
       datasetName = "tv",
+      expectedResolvedDatasetName = "tv",
       expectedResolvedCatalog = "",
       expectedResolvedNamespace = Seq.empty),
     DefineDatasetTestCase(
       name = "TABLE",
       datasetType = DatasetType.TABLE,
-      datasetName = "tb",
+      datasetName = "`tb`",
+      expectedResolvedDatasetName = "tb",
       expectedResolvedCatalog = "spark_catalog",
       expectedResolvedNamespace = Seq("default")),
     DefineDatasetTestCase(
       name = "MV",
       datasetType = DatasetType.MATERIALIZED_VIEW,
       datasetName = "mv",
+      expectedResolvedDatasetName = "mv",
       expectedResolvedCatalog = "spark_catalog",
       expectedResolvedNamespace = Seq("default"))).map(tc => tc.name -> tc).toMap

@@ -525,22 +529,25 @@ class SparkDeclarativePipelinesServerSuite
       datasetName = "tv",
       defaultCatalog = "custom_catalog",
       defaultDatabase = "custom_db",
+      expectedResolvedDatasetName = "tv",
       expectedResolvedCatalog = "",
       expectedResolvedNamespace = Seq.empty),
     DefineDatasetTestCase(
       name = "TABLE",
       datasetType = DatasetType.TABLE,
-      datasetName = "tb",
-      defaultCatalog = "my_catalog",
-      defaultDatabase = "my_db",
-      expectedResolvedCatalog = "my_catalog",
-      expectedResolvedNamespace = Seq("my_db")),
+      datasetName = "`tb`",
+      defaultCatalog = "`my_catalog`",
+      defaultDatabase = "`my_db`",
+      expectedResolvedDatasetName = "tb",
+      expectedResolvedCatalog = "`my_catalog`",
+      expectedResolvedNamespace = Seq("`my_db`")),
     DefineDatasetTestCase(
       name = "MV",
       datasetType = DatasetType.MATERIALIZED_VIEW,
       datasetName = "mv",
       defaultCatalog = "another_catalog",
       defaultDatabase = "another_db",
+      expectedResolvedDatasetName = "mv",
       expectedResolvedCatalog = "another_catalog",
       expectedResolvedNamespace = Seq("another_db")))
     .map(tc => tc.name -> tc)
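
The new expectedResolvedDatasetName field exists because datasetName is SQL identifier text: an input written as `tb` (with backticks) should resolve to the bare name part tb. Assuming the server parses these names through the usual Catalyst parser (an assumption; the resolution code is not part of this diff), the behavior looks like:

    import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

    // Parsing strips the surrounding backticks and undoes doubled-backtick escaping.
    CatalystSqlParser.parseMultipartIdentifier("`tb`")   // Seq("tb")
    CatalystSqlParser.parseMultipartIdentifier("`a``b`") // Seq("a`b")
    CatalystSqlParser.parseMultipartIdentifier("db.tb")  // Seq("db", "tb")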
@@ -571,7 +578,7 @@ class SparkDeclarativePipelinesServerSuite
 
       assert(identifier.getCatalogName == testCase.expectedResolvedCatalog)
       assert(identifier.getNamespaceList.asScala == testCase.expectedResolvedNamespace)
-      assert(identifier.getTableName == testCase.datasetName)
+      assert(identifier.getTableName == testCase.expectedResolvedDatasetName)
     }
   }

@@ -608,7 +615,7 @@ class SparkDeclarativePipelinesServerSuite
 
       assert(identifier.getCatalogName == testCase.expectedResolvedCatalog)
       assert(identifier.getNamespaceList.asScala == testCase.expectedResolvedNamespace)
-      assert(identifier.getTableName == testCase.datasetName)
+      assert(identifier.getTableName == testCase.expectedResolvedDatasetName)
     }
   }

@@ -618,16 +625,18 @@ class SparkDeclarativePipelinesServerSuite
       flowName: String,
       defaultCatalog: String,
       defaultDatabase: String,
+      expectedResolvedFlowName: String,
       expectedResolvedCatalog: String,
       expectedResolvedNamespace: Seq[String])
 
   private val defineFlowDefaultTests = Seq(
     DefineFlowTestCase(
       name = "MV",
       datasetType = DatasetType.MATERIALIZED_VIEW,
-      flowName = "mv",
-      defaultCatalog = "spark_catalog",
-      defaultDatabase = "default",
+      flowName = "`mv`",
+      defaultCatalog = "`spark_catalog`",
+      defaultDatabase = "`default`",
+      expectedResolvedFlowName = "mv",
       expectedResolvedCatalog = "spark_catalog",
       expectedResolvedNamespace = Seq("default")),
     DefineFlowTestCase(
@@ -636,6 +645,7 @@ class SparkDeclarativePipelinesServerSuite
       flowName = "tv",
       defaultCatalog = "spark_catalog",
       defaultDatabase = "default",
+      expectedResolvedFlowName = "tv",
       expectedResolvedCatalog = "",
       expectedResolvedNamespace = Seq.empty)).map(tc => tc.name -> tc).toMap

@@ -646,6 +656,7 @@ class SparkDeclarativePipelinesServerSuite
       flowName = "mv",
       defaultCatalog = "custom_catalog",
       defaultDatabase = "custom_db",
+      expectedResolvedFlowName = "mv",
       expectedResolvedCatalog = "custom_catalog",
       expectedResolvedNamespace = Seq("custom_db")),
     DefineFlowTestCase(
@@ -654,6 +665,7 @@ class SparkDeclarativePipelinesServerSuite
       flowName = "tv",
       defaultCatalog = "custom_catalog",
       defaultDatabase = "custom_db",
+      expectedResolvedFlowName = "tv",
       expectedResolvedCatalog = "",
       expectedResolvedNamespace = Seq.empty)).map(tc => tc.name -> tc).toMap

@@ -711,7 +723,7 @@ class SparkDeclarativePipelinesServerSuite
 
       assert(identifier.getCatalogName == testCase.expectedResolvedCatalog)
       assert(identifier.getNamespaceList.asScala == testCase.expectedResolvedNamespace)
-      assert(identifier.getTableName == testCase.flowName)
+      assert(identifier.getTableName == testCase.expectedResolvedFlowName)
     }
   }

@@ -775,7 +787,7 @@ class SparkDeclarativePipelinesServerSuite
 
       assert(identifier.getCatalogName == testCase.expectedResolvedCatalog)
       assert(identifier.getNamespaceList.asScala == testCase.expectedResolvedNamespace)
-      assert(identifier.getTableName == testCase.flowName)
+      assert(identifier.getTableName == testCase.expectedResolvedFlowName)
     }
   }
 }
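
Putting the pieces together: for the TABLE case under the default catalog and database, the suite now expects an identifier whose fields match the following, built with the same calls the handler uses above (a sketch, not suite code):

    val expected = ResolvedIdentifier
      .newBuilder()
      .setCatalogName("spark_catalog")
      .addNamespace("default")
      .setTableName("tb") // bare name: the backticks in "`tb`" are quoting, not part of the name
      .build()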