From 6573c0202ca74470d4a11872c07efd96d00262d9 Mon Sep 17 00:00:00 2001
From: Jessie Luo
Date: Mon, 29 Sep 2025 16:34:55 -0700
Subject: [PATCH 1/2] use unquoted for response fields

---
 .../spark/sql/catalyst/identifiers.scala      | 18 +++++++++---------
 .../connect/pipelines/PipelinesHandler.scala  | 12 ++++++------
 2 files changed, 15 insertions(+), 15 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
index 625a3272d11b5..ceced9313940a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
@@ -36,17 +36,17 @@ sealed trait CatalystIdentifier {
    */
   private def quoteIdentifier(name: String): String = name.replace("`", "``")
 
-  def resolvedId: String = quoteIdentifier(identifier)
-  def resolvedDb: Option[String] = database.map(quoteIdentifier)
-  def resolvedCatalog: Option[String] = catalog.map(quoteIdentifier)
-
   def quotedString: String = {
-    if (resolvedCatalog.isDefined && resolvedDb.isDefined) {
-      s"`${resolvedCatalog.get}`.`${resolvedDb.get}`.`$resolvedId`"
-    } else if (resolvedDb.isDefined) {
-      s"`${resolvedDb.get}`.`$resolvedId`"
+    val replacedId = quoteIdentifier(identifier)
+    val replacedDb = database.map(quoteIdentifier)
+    val replacedCatalog = catalog.map(quoteIdentifier)
+
+    if (replacedCatalog.isDefined && replacedDb.isDefined) {
+      s"`${replacedCatalog.get}`.`${replacedDb.get}`.`$replacedId`"
+    } else if (replacedDb.isDefined) {
+      s"`${replacedDb.get}`.`$replacedId`"
     } else {
-      s"`$resolvedId`"
+      s"`$replacedId`"
     }
   }
 
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
index 4ff9818d13d95..1b2e039be7159 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
@@ -84,11 +84,11 @@ private[connect] object PipelinesHandler extends Logging {
         val resolvedDataset = defineDataset(cmd.getDefineDataset, sessionHolder)
 
         val identifierBuilder = ResolvedIdentifier.newBuilder()
-        resolvedDataset.resolvedCatalog.foreach(identifierBuilder.setCatalogName)
-        resolvedDataset.resolvedDb.foreach { ns =>
+        resolvedDataset.catalog.foreach(identifierBuilder.setCatalogName)
+        resolvedDataset.database.foreach { ns =>
           identifierBuilder.addNamespace(ns)
         }
-        identifierBuilder.setTableName(resolvedDataset.resolvedId)
+        identifierBuilder.setTableName(resolvedDataset.identifier)
         val identifier = identifierBuilder.build()
         PipelineCommandResult
           .newBuilder()
@@ -103,11 +103,11 @@
         val resolvedFlow = defineFlow(cmd.getDefineFlow, transformRelationFunc, sessionHolder)
 
         val identifierBuilder = ResolvedIdentifier.newBuilder()
-        resolvedFlow.resolvedCatalog.foreach(identifierBuilder.setCatalogName)
-        resolvedFlow.resolvedDb.foreach { ns =>
+        resolvedFlow.catalog.foreach(identifierBuilder.setCatalogName)
+        resolvedFlow.database.foreach { ns =>
           identifierBuilder.addNamespace(ns)
         }
-        identifierBuilder.setTableName(resolvedFlow.resolvedId)
+        identifierBuilder.setTableName(resolvedFlow.identifier)
         val identifier = identifierBuilder.build()
         PipelineCommandResult
           .newBuilder()

From 3a27465c24496a8800f4d723aec96a0c2e248656 Mon Sep 17 00:00:00 2001
From: Jessie Luo
Date: Mon, 29 Sep 2025 16:56:42 -0700
Subject: [PATCH 2/2] edit test suite to reflect unquoted string

---
 ...SparkDeclarativePipelinesServerSuite.scala | 38 ++++++++++++-------
 1 file changed, 25 insertions(+), 13 deletions(-)

diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala
index 3f92997054e47..772b43656b141 100644
--- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala
@@ -495,6 +495,7 @@ class SparkDeclarativePipelinesServerSuite
       datasetName: String,
       defaultCatalog: String = "",
       defaultDatabase: String = "",
+      expectedResolvedDatasetName: String,
       expectedResolvedCatalog: String,
       expectedResolvedNamespace: Seq[String])
 
@@ -503,18 +504,21 @@
       name = "TEMPORARY_VIEW",
       datasetType = DatasetType.TEMPORARY_VIEW,
       datasetName = "tv",
+      expectedResolvedDatasetName = "tv",
       expectedResolvedCatalog = "",
       expectedResolvedNamespace = Seq.empty),
     DefineDatasetTestCase(
       name = "TABLE",
       datasetType = DatasetType.TABLE,
-      datasetName = "tb",
+      datasetName = "`tb`",
+      expectedResolvedDatasetName = "tb",
       expectedResolvedCatalog = "spark_catalog",
       expectedResolvedNamespace = Seq("default")),
     DefineDatasetTestCase(
       name = "MV",
       datasetType = DatasetType.MATERIALIZED_VIEW,
       datasetName = "mv",
+      expectedResolvedDatasetName = "mv",
       expectedResolvedCatalog = "spark_catalog",
       expectedResolvedNamespace = Seq("default"))).map(tc => tc.name -> tc).toMap
 
@@ -525,22 +529,25 @@
       datasetName = "tv",
       defaultCatalog = "custom_catalog",
       defaultDatabase = "custom_db",
+      expectedResolvedDatasetName = "tv",
       expectedResolvedCatalog = "",
       expectedResolvedNamespace = Seq.empty),
     DefineDatasetTestCase(
       name = "TABLE",
       datasetType = DatasetType.TABLE,
-      datasetName = "tb",
-      defaultCatalog = "my_catalog",
-      defaultDatabase = "my_db",
-      expectedResolvedCatalog = "my_catalog",
-      expectedResolvedNamespace = Seq("my_db")),
+      datasetName = "`tb`",
+      defaultCatalog = "`my_catalog`",
+      defaultDatabase = "`my_db`",
+      expectedResolvedDatasetName = "tb",
+      expectedResolvedCatalog = "`my_catalog`",
+      expectedResolvedNamespace = Seq("`my_db`")),
     DefineDatasetTestCase(
       name = "MV",
       datasetType = DatasetType.MATERIALIZED_VIEW,
       datasetName = "mv",
      defaultCatalog = "another_catalog",
       defaultDatabase = "another_db",
+      expectedResolvedDatasetName = "mv",
       expectedResolvedCatalog = "another_catalog",
       expectedResolvedNamespace = Seq("another_db")))
     .map(tc => tc.name -> tc)
 
@@ -571,7 +578,7 @@
 
           assert(identifier.getCatalogName == testCase.expectedResolvedCatalog)
           assert(identifier.getNamespaceList.asScala == testCase.expectedResolvedNamespace)
-          assert(identifier.getTableName == testCase.datasetName)
+          assert(identifier.getTableName == testCase.expectedResolvedDatasetName)
       }
     }
 
@@ -608,7 +615,7 @@
 
           assert(identifier.getCatalogName == testCase.expectedResolvedCatalog)
           assert(identifier.getNamespaceList.asScala == testCase.expectedResolvedNamespace)
-          assert(identifier.getTableName == testCase.datasetName)
+          assert(identifier.getTableName == testCase.expectedResolvedDatasetName)
       }
     }
 
@@ -618,6 +625,7 @@ class SparkDeclarativePipelinesServerSuite
       flowName: String,
       defaultCatalog: String,
       defaultDatabase: String,
+      expectedResolvedFlowName: String,
       expectedResolvedCatalog: String,
       expectedResolvedNamespace: Seq[String])
 
@@ -625,9 +633,10 @@
     DefineFlowTestCase(
       name = "MV",
       datasetType = DatasetType.MATERIALIZED_VIEW,
-      flowName = "mv",
-      defaultCatalog = "spark_catalog",
-      defaultDatabase = "default",
+      flowName = "`mv`",
+      defaultCatalog = "`spark_catalog`",
+      defaultDatabase = "`default`",
+      expectedResolvedFlowName = "mv",
       expectedResolvedCatalog = "spark_catalog",
       expectedResolvedNamespace = Seq("default")),
     DefineFlowTestCase(
@@ -636,6 +645,7 @@ class SparkDeclarativePipelinesServerSuite
       flowName = "tv",
       defaultCatalog = "spark_catalog",
       defaultDatabase = "default",
+      expectedResolvedFlowName = "tv",
       expectedResolvedCatalog = "",
       expectedResolvedNamespace = Seq.empty)).map(tc => tc.name -> tc).toMap
 
@@ -646,6 +656,7 @@ class SparkDeclarativePipelinesServerSuite
       flowName = "mv",
       defaultCatalog = "custom_catalog",
       defaultDatabase = "custom_db",
+      expectedResolvedFlowName = "mv",
       expectedResolvedCatalog = "custom_catalog",
       expectedResolvedNamespace = Seq("custom_db")),
     DefineFlowTestCase(
@@ -654,6 +665,7 @@ class SparkDeclarativePipelinesServerSuite
       flowName = "tv",
       defaultCatalog = "custom_catalog",
       defaultDatabase = "custom_db",
+      expectedResolvedFlowName = "tv",
       expectedResolvedCatalog = "",
       expectedResolvedNamespace = Seq.empty)).map(tc => tc.name -> tc).toMap
 
@@ -711,7 +723,7 @@ class SparkDeclarativePipelinesServerSuite
 
           assert(identifier.getCatalogName == testCase.expectedResolvedCatalog)
           assert(identifier.getNamespaceList.asScala == testCase.expectedResolvedNamespace)
-          assert(identifier.getTableName == testCase.flowName)
+          assert(identifier.getTableName == testCase.expectedResolvedFlowName)
       }
     }
 
@@ -775,7 +787,7 @@ class SparkDeclarativePipelinesServerSuite
 
           assert(identifier.getCatalogName == testCase.expectedResolvedCatalog)
           assert(identifier.getNamespaceList.asScala == testCase.expectedResolvedNamespace)
-          assert(identifier.getTableName == testCase.flowName)
+          assert(identifier.getTableName == testCase.expectedResolvedFlowName)
       }
     }
 }
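Note (illustrative only, not part of the patch): a minimal sketch of the behavior this series changes, assuming Spark's three-argument TableIdentifier(table, database, catalog), one concrete CatalystIdentifier. quotedString keeps backtick-escaping for display, while the raw identifier/database/catalog fields, which PipelinesHandler now copies into the ResolvedIdentifier response, stay unquoted:

import org.apache.spark.sql.catalyst.TableIdentifier

// An identifier whose table name itself contains a backtick.
val id = TableIdentifier("weird`table", Some("my_db"), Some("spark_catalog"))

// quotedString escapes embedded backticks and wraps each part for display:
// prints `spark_catalog`.`my_db`.`weird``table`
println(id.quotedString)

// The raw fields come back as-is; after this patch these unquoted values
// populate the response's table name, namespace, and catalog.
println(id.identifier) // weird`table
println(id.database)   // Some(my_db)
println(id.catalog)    // Some(spark_catalog)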