Commit 1aa8d5a
[SPARK-54375][CONNECT][TESTS][FOLLOWUP] Make PythonPipelineSuite perform a default check for PythonTestDepsChecker.isConnectDepsAvailable
### What changes were proposed in this pull request?

This PR aims to make the test cases in `PythonPipelineSuite` perform a default check for `PythonTestDepsChecker.isConnectDepsAvailable`.

### Why are the changes needed?

Simplify the dependency checks for Python modules in test cases within `PythonPipelineSuite`.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #53106 from LuciferYang/refactor-PythonPipelineSuite.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
1 parent dff0620 · commit 1aa8d5a
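For context on what the removed guards did: ScalaTest's `assume` cancels a test at runtime when its condition is false, reporting it as canceled rather than failed. A minimal sketch of that old per-test style (the suite name and environment check below are illustrative, not Spark code):

```scala
import org.scalatest.funsuite.AnyFunSuite

class AssumeStyleSuite extends AnyFunSuite {
  test("guarded test") {
    // Old style: each test body starts with its own guard. When the
    // condition is false, assume throws TestCanceledException and the
    // rest of the test body never runs.
    assume(sys.env.contains("SOME_OPTIONAL_DEP")) // illustrative condition
    assert(1 + 1 == 2)
  }
}
```

The commit replaces this per-test boilerplate with a single override of `test`, shown in the diff below.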

File tree

1 file changed: +12 −10


sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala

Lines changed: 12 additions & 10 deletions
```diff
@@ -26,6 +26,9 @@ import java.util.concurrent.TimeUnit
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
 
+import org.scalactic.source.Position
+import org.scalatest.Tag
+
 import org.apache.spark.api.python.PythonUtils
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -49,7 +52,6 @@ class PythonPipelineSuite
     with EventVerificationTestHelpers {
 
   def buildGraph(pythonText: String): DataflowGraph = {
-    assume(PythonTestDepsChecker.isConnectDepsAvailable)
     val indentedPythonText = pythonText.linesIterator.map(" " + _).mkString("\n")
     // create a unique identifier to allow identifying the session and dataflow graph
     val customSessionIdentifier = UUID.randomUUID().toString
@@ -530,7 +532,6 @@ class PythonPipelineSuite
       "eager analysis or execution will fail")(
     Seq("""spark.sql("SELECT * FROM src")""", """spark.read.table("src").collect()""")) {
     command =>
-      assume(PythonTestDepsChecker.isConnectDepsAvailable)
       val ex = intercept[RuntimeException] {
         buildGraph(s"""
                      |@dp.materialized_view
@@ -549,7 +550,6 @@ class PythonPipelineSuite
   }
 
   test("create dataset with the same name will fail") {
-    assume(PythonTestDepsChecker.isConnectDepsAvailable)
     val ex = intercept[AnalysisException] {
       buildGraph(s"""
                    |@dp.materialized_view
@@ -623,7 +623,6 @@ class PythonPipelineSuite
   }
 
   test("create datasets with three part names") {
-    assume(PythonTestDepsChecker.isConnectDepsAvailable)
     val graphTry = Try {
       buildGraph(s"""
                    |@dp.table(name = "some_catalog.some_schema.mv")
@@ -676,7 +675,6 @@ class PythonPipelineSuite
   }
 
   test("create named flow with multipart name will fail") {
-    assume(PythonTestDepsChecker.isConnectDepsAvailable)
     val ex = intercept[RuntimeException] {
       buildGraph(s"""
                    |@dp.table
@@ -825,7 +823,6 @@ class PythonPipelineSuite
   }
 
   test("create pipeline without table will throw RUN_EMPTY_PIPELINE exception") {
-    assume(PythonTestDepsChecker.isConnectDepsAvailable)
     checkError(
       exception = intercept[AnalysisException] {
         buildGraph(s"""
@@ -837,7 +834,6 @@ class PythonPipelineSuite
   }
 
   test("create pipeline with only temp view will throw RUN_EMPTY_PIPELINE exception") {
-    assume(PythonTestDepsChecker.isConnectDepsAvailable)
     checkError(
       exception = intercept[AnalysisException] {
         buildGraph(s"""
@@ -851,7 +847,6 @@ class PythonPipelineSuite
   }
 
   test("create pipeline with only flow will throw RUN_EMPTY_PIPELINE exception") {
-    assume(PythonTestDepsChecker.isConnectDepsAvailable)
     checkError(
       exception = intercept[AnalysisException] {
         buildGraph(s"""
@@ -1048,7 +1043,6 @@ class PythonPipelineSuite
 
   gridTest("Unsupported SQL command outside query function should result in a failure")(
     unsupportedSqlCommandList) { unsupportedSqlCommand =>
-    assume(PythonTestDepsChecker.isConnectDepsAvailable)
     val ex = intercept[RuntimeException] {
       buildGraph(s"""
                    |spark.sql("$unsupportedSqlCommand")
@@ -1063,7 +1057,6 @@ class PythonPipelineSuite
 
   gridTest("Unsupported SQL command inside query function should result in a failure")(
     unsupportedSqlCommandList) { unsupportedSqlCommand =>
-    assume(PythonTestDepsChecker.isConnectDepsAvailable)
     val ex = intercept[RuntimeException] {
       buildGraph(s"""
                    |@dp.materialized_view()
@@ -1111,4 +1104,13 @@ class PythonPipelineSuite
                  | return spark.range(5)
                  |""".stripMargin)
   }
+
+  override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
+      pos: Position): Unit = {
+    if (PythonTestDepsChecker.isConnectDepsAvailable) {
+      super.test(testName, testTags: _*)(testFun)
+    } else {
+      super.ignore(testName, testTags: _*)(testFun)
+    }
+  }
 }
```
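As a self-contained sketch of the pattern this commit adopts (the suite name and `FakeDeps` checker below are illustrative stand-ins for the Spark test classes, not real Spark code), overriding `test` routes every registration in the suite through one predicate:

```scala
import org.scalactic.source.Position
import org.scalatest.Tag
import org.scalatest.funsuite.AnyFunSuite

// Illustrative stand-in for PythonTestDepsChecker.isConnectDepsAvailable.
object FakeDeps {
  val available: Boolean = sys.env.contains("FAKE_DEPS_HOME") // assumed env var
}

class GatedSuite extends AnyFunSuite {

  // Every test(...) call in this suite passes through here: when the
  // dependency is missing, the test is registered as ignored instead
  // of being registered normally, so it shows up as ignored in reports.
  override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
      pos: Position): Unit = {
    if (FakeDeps.available) {
      super.test(testName, testTags: _*)(testFun)
    } else {
      super.ignore(testName, testTags: _*)(testFun)
    }
  }

  test("runs only when the dependency is available") {
    assert(1 + 1 == 2)
  }
}
```

This also covers helpers like `gridTest`, provided they ultimately delegate to `test`; a helper that registers tests through some other path would bypass the gate. One behavioral difference from the old guards: gated tests are reported as ignored rather than canceled.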
