Closed
Changes from all commits (49 commits)
e3e79e5
[SPARK-50123][TESTS] Move BitmapExpressionUtilsSuite & ExpressionImpl…
yaooqinn Oct 25, 2024
2f5e0fa
[SPARK-50150][BUILD][3.5] Upgrade Jetty to 9.4.56.v20240826
dongjoon-hyun Oct 29, 2024
175b5e9
[MINOR][BUILD] Skip `deepspeed` in requirements on MacOS
zhengruifeng Aug 10, 2023
4205b79
[SPARK-50155][3.5] Move scala and java files to their default folders
panbingkun Oct 30, 2024
0229c0e
[SPARK-50176][CONNECT][3.5] Disallow reattaching after the session is…
Nov 1, 2024
6df1966
[SPARK-50195][CORE] Fix `StandaloneRestServer` to propagate `spark.ap…
dongjoon-hyun Nov 1, 2024
9d47266
[SPARK-50199][PYTHON][TESTS] Use Spark 3.4.4 instead of 3.0.1 in `tes…
dongjoon-hyun Nov 1, 2024
08023c0
[SPARK-50176][CONNECT][FOLLOWUP][3.5] Fix ReattachableExecuteSuite fa…
Nov 4, 2024
1ddf4a9
[MINOR][DOCS][3.5] Fix specified java versions in `install.rst`
dvorst Nov 4, 2024
9b172de
[SPARK-50210][CORE] Fix `SparkSubmit` to show REST API `kill` respons…
dongjoon-hyun Nov 3, 2024
acccf53
[SPARK-50212][BUILD][3.5] Fix the conditional check for executing the…
LuciferYang Nov 5, 2024
d39f5ab
[SPARK-50235][SQL] Clean up ColumnVector resource after processing al…
viirya Nov 6, 2024
8da6987
[SPARK-50300][BUILD] Use mirror host instead of `archive.apache.org`
dongjoon-hyun Nov 13, 2024
8502a22
[SPARK-50304][INFRA] Remove `(any|empty).proto` from RAT exclusion
dongjoon-hyun Nov 14, 2024
e0bdfee
[SPARK-50316][BUILD][3.5] Upgrade ORC to 1.9.5
dongjoon-hyun Nov 14, 2024
242d333
[SPARK-50312][SQL] SparkThriftServer createServer parameter passing e…
CuiYanxiang Nov 15, 2024
08b195c
[MINOR][DOCS] Fix a HTML/Markdown syntax error in sql-migration-guide.md
yaooqinn Nov 20, 2024
df9b9de
[SPARK-50258][SQL] Fix output column order changed issue after AQE op…
wangyum Nov 20, 2024
5ff129a
[SPARK-50430][CORE] Use the standard Properties.clone instead of manu…
HyukjinKwon Nov 27, 2024
5e51e2c
[SPARK-49294][UI] Add width attribute for shuffle-write-time checkbox
xunxunmimi5577 Dec 2, 2024
1d6f7ad
[SPARK-50433][DOCS][TESTS][3.5] Fix configuring log4j2 guide docs for…
pan3793 Dec 3, 2024
5dc927b
[SPARK-50487][DOCS] Update broken jira link
huangxiaopingRD Dec 4, 2024
acedb15
[SPARK-50498][PYTHON] Avoid unnecessary py4j call in `listFunctions`
zhengruifeng Dec 5, 2024
86e29e9
[SPARK-50505][DOCS] Fix `spark.storage.replication.proactive` default…
dongjoon-hyun Dec 5, 2024
d01f34f
[SPARK-50492][SS] Fix java.util.NoSuchElementException when event tim…
liviazhu Dec 6, 2024
153cd9e
[SPARK-50492][SS][FOLLOWUP][3.5] Change `def references` to `lazy val…
LuciferYang Dec 6, 2024
bf29ab9
[SPARK-50421][CORE][3.5] Fix executor related memory config incorrect…
zjuwangg Dec 6, 2024
d8f3afa
[SPARK-50514][DOCS] Add `IDENTIFIER clause` page to `menu-sql.yaml`
dongjoon-hyun Dec 7, 2024
305d2a0
[SPARK-49695][SQL][3.5] Postgres fix xor push-down
andrej-db Dec 7, 2024
a57f3c2
[SPARK-50483][CORE][SQL][3.5] BlockMissingException should be thrown …
wangyum Dec 8, 2024
929a19f
Preparing Spark release v3.5.4-rc1
LuciferYang Dec 9, 2024
8e6507a
Preparing development version 3.5.5-SNAPSHOT
LuciferYang Dec 9, 2024
deabe49
[SPARK-49134][INFRA][3.5] Support retry for deploying artifacts to Ne…
yaooqinn Dec 9, 2024
a3cf28e
[SPARK-50463][SQL][3.5] Fix `ConstantColumnVector` with Columnar to R…
richardc-db Dec 10, 2024
bb953f9
[SPARK-50510][CONNECT][3.5] Fix sporadic ReattachableExecuteSuite fai…
Dec 11, 2024
e97580a
[SPARK-50087][SQL][3.5] Robust handling of boolean expressions in CAS…
andrej-db Dec 12, 2024
92e650c
[SPARK-50545][CORE][SQL][3.5] `AccessControlException` should be thro…
pan3793 Dec 12, 2024
91af6f9
Preparing Spark release v3.5.4-rc2
LuciferYang Dec 16, 2024
a764524
Preparing development version 3.5.5-SNAPSHOT
LuciferYang Dec 16, 2024
8168ea8
[SPARK-50430][CORE][FOLLOW-UP] Keep the logic of manual putting key a…
HyukjinKwon Dec 3, 2024
0fbe292
Revert "[SPARK-50430][CORE][FOLLOW-UP] Keep the logic of manual putti…
HyukjinKwon Dec 16, 2024
f7c48fe
Revert "[SPARK-50430][CORE] Use the standard Properties.clone instead…
HyukjinKwon Dec 16, 2024
b0a7d4d
[SPARK-50587][INFRA][3.5] Remove unsupported `curl` option `--retry-a…
LuciferYang Dec 16, 2024
a6f220d
Preparing Spark release v3.5.4-rc3
LuciferYang Dec 17, 2024
bcaa5a9
Preparing development version 3.5.5-SNAPSHOT
LuciferYang Dec 17, 2024
45349b6
[SPARK-50510][CONNECT][TEST][3.5] Fix flaky ReattachableExecuteSuite
Dec 17, 2024
a3d23fd
[MINOR][SS] Minor update to watermark propagation comments
neilramaswamy Dec 18, 2024
5a91172
[SPARK-50483][SPARK-50545][DOC][FOLLOWUP][3.5] Mention behavior chang…
pan3793 Dec 23, 2024
fc601e2
Merge remote-tracking branch 'spark/branch-3.5' into spark-3.5
ejblanco Jan 7, 2025
2 changes: 1 addition & 1 deletion .github/workflows/build_and_test.yml
@@ -94,7 +94,7 @@ jobs:
tpcds=false
docker=false
fi
build=`./dev/is-changed.py -m "core,unsafe,kvstore,avro,utils,network-common,network-shuffle,repl,launcher,examples,sketch,graphx,catalyst,hive-thriftserver,streaming,sql-kafka-0-10,streaming-kafka-0-10,mllib-local,mllib,yarn,mesos,kubernetes,hadoop-cloud,spark-ganglia-lgpl,sql,hive"`
build=`./dev/is-changed.py -m "core,unsafe,kvstore,avro,utils,network-common,network-shuffle,repl,launcher,examples,sketch,graphx,catalyst,hive-thriftserver,streaming,sql-kafka-0-10,streaming-kafka-0-10,mllib-local,mllib,yarn,mesos,kubernetes,hadoop-cloud,spark-ganglia-lgpl,sql,hive,connect,protobuf,api"`
precondition="
{
\"build\": \"$build\",
2 changes: 1 addition & 1 deletion R/pkg/DESCRIPTION
@@ -1,6 +1,6 @@
Package: SparkR
Type: Package
Version: 3.5.3
Version: 3.5.4
Title: R Front End for 'Apache Spark'
Description: Provides an R Front end for 'Apache Spark' <https://spark.apache.org>.
Authors@R:
2 changes: 1 addition & 1 deletion assembly/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.5.3</version>
<version>3.5.4</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion build/mvn
@@ -56,7 +56,7 @@ install_app() {
local binary="${_DIR}/$6"
local remote_tarball="${mirror_host}/${url_path}${url_query}"
local local_checksum="${local_tarball}.${checksum_suffix}"
local remote_checksum="https://archive.apache.org/dist/${url_path}.${checksum_suffix}"
local remote_checksum="${mirror_host}/${url_path}.${checksum_suffix}${url_query}"

local curl_opts="--silent --show-error -L"
local wget_opts="--no-verbose"
2 changes: 1 addition & 1 deletion common/kvstore/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.5.3</version>
<version>3.5.4</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion common/network-common/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.5.3</version>
<version>3.5.4</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion common/network-shuffle/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.5.3</version>
<version>3.5.4</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion common/network-yarn/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.5.3</version>
<version>3.5.4</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion common/sketch/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.5.3</version>
<version>3.5.4</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion common/tags/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.5.3</version>
<version>3.5.4</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion common/unsafe/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.5.3</version>
<version>3.5.4</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion common/utils/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.5.3</version>
<version>3.5.4</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion connector/avro/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.5.3</version>
<version>3.5.4</version>
<relativePath>../../pom.xml</relativePath>
</parent>

@@ -29,6 +29,7 @@ import org.apache.avro.mapred.{AvroOutputFormat, FsInput}
import org.apache.avro.mapreduce.AvroJob
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.hdfs.BlockMissingException
import org.apache.hadoop.mapreduce.Job

import org.apache.spark.SparkException
@@ -140,6 +141,8 @@ private[sql] object AvroUtils extends Logging {
try {
Some(DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]()))
} catch {
case e: BlockMissingException =>
throw new SparkException(s"Could not read file: $path", e)
case e: IOException =>
if (ignoreCorruptFiles) {
logWarning(s"Skipped the footer in the corrupted file: $path", e)
2 changes: 1 addition & 1 deletion connector/connect/client/jvm/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.5.3</version>
<version>3.5.4</version>
<relativePath>../../../../pom.xml</relativePath>
</parent>

@@ -74,7 +74,7 @@ object IntegrationTestUtils {

// Redirect server log into console
"--conf",
s"spark.driver.extraJavaOptions=-Dlog4j.configuration=$log4j2")
s"spark.driver.extraJavaOptions=-Dlog4j.configurationFile=$log4j2")
} else Seq.empty
}

2 changes: 1 addition & 1 deletion connector/connect/common/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.5.3</version>
<version>3.5.4</version>
<relativePath>../../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion connector/connect/server/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.5.3</version>
<version>3.5.4</version>
<relativePath>../../../pom.xml</relativePath>
</parent>

@@ -21,6 +21,7 @@ import java.util.UUID

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.control.NonFatal

import org.apache.spark.{SparkEnv, SparkSQLException}
import org.apache.spark.connect.proto
@@ -237,7 +238,14 @@ private[connect] class ExecuteHolder(
// it does.
responseObserver.removeAll()
// post closed to UI
eventsManager.postClosed()
try {
eventsManager.postClosed()
} catch {
// Catching the exception to prevent the wrong error code from being returned to the
// user: SPARK-49688. The issue was fixed by completely refactoring the code in Spark 4.0.
case e: Throwable if NonFatal.apply(e) =>
logError(s"Error posting closed event to UI: ${e.getMessage()}")
}
}
// interrupt any attached grpcResponseSenders
grpcResponseSenders.foreach(_.interrupt())
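
The try/catch above keeps a failure while posting the closed event from changing the error code returned to the user. A generic, hedged version of the same best-effort pattern; the names below are illustrative, not part of the Connect API:

import scala.util.control.NonFatal

// Sketch: run a best-effort side action (such as posting a UI event) without letting a
// failure in it replace the outcome of the main operation. `logError` is passed in only
// to keep the example self-contained; the real code uses Spark's Logging trait.
def bestEffort(label: String)(body: => Unit)(logError: String => Unit): Unit =
  try {
    body
  } catch {
    case NonFatal(e) =>
      logError(s"Ignoring non-fatal failure in $label: ${e.getMessage}")
  }

// Hypothetical usage mirroring the diff above:
// bestEffort("postClosed")(eventsManager.postClosed())(println)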
@@ -29,6 +29,11 @@ class SparkConnectReattachExecuteHandler(
extends Logging {

def handle(v: proto.ReattachExecuteRequest): Unit = {
// An exception will be raised if the session is not available.
val sessionHolder =
SparkConnectService.getIsolatedSession(v.getUserContext.getUserId, v.getSessionId)
assert(sessionHolder != null)

val executeHolder = SparkConnectService.executionManager
.getExecuteHolder(ExecuteKey(v.getUserContext.getUserId, v.getSessionId, v.getOperationId))
.getOrElse {
@@ -360,6 +360,13 @@ object SparkConnectService extends Logging {
userSessionMapping.invalidateAll()
}

/**
* Used for testing
*/
private[connect] def invalidateSession(userId: String, sessionId: String): Unit = {
userSessionMapping.invalidate((userId, sessionId))
}

/**
* Used for testing.
*/
@@ -52,10 +52,6 @@ trait SparkConnectServerTest extends SharedSparkSession {
withSparkEnvConfs((Connect.CONNECT_GRPC_BINDING_PORT.key, serverPort.toString)) {
SparkConnectService.start(spark.sparkContext)
}
// register udf directly on the server, we're not testing client UDFs here...
val serverSession =
SparkConnectService.getOrCreateIsolatedSession(defaultUserId, defaultSessionId).session
serverSession.udf.register("sleep", ((ms: Int) => { Thread.sleep(ms); ms }))
}

override def afterAll(): Unit = {
@@ -56,6 +56,23 @@ class ReattachableExecuteSuite extends SparkConnectServerTest {
}
}

test("reattach after connection expired") {
withClient { client =>
withRawBlockingStub { stub =>
// emulate session expiration
SparkConnectService.invalidateSession(defaultUserId, defaultSessionId)

// session closed, bound to fail immediately
val operationId = UUID.randomUUID().toString
val iter = stub.reattachExecute(buildReattachExecuteRequest(operationId, None))
val e = intercept[StatusRuntimeException] {
iter.next()
}
assert(e.getMessage.contains("INVALID_HANDLE.SESSION_NOT_FOUND"))
}
}
}

test("raw interrupted RPC results in INVALID_CURSOR.DISCONNECTED error") {
withRawBlockingStub { stub =>
val iter = stub.executePlan(buildExecutePlanRequest(buildPlan(MEDIUM_RESULTS_QUERY)))
@@ -347,6 +364,10 @@ class ReattachableExecuteSuite extends SparkConnectServerTest {
}

test("long sleeping query") {
// register udf directly on the server, we're not testing client UDFs here...
val serverSession =
SparkConnectService.getOrCreateIsolatedSession(defaultUserId, defaultSessionId).session
serverSession.udf.register("sleep", ((ms: Int) => { Thread.sleep(ms); ms }))
// query will be sleeping and not returning results, while having multiple reattach
withSparkEnvConfs(
(Connect.CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION.key, "1s")) {
2 changes: 1 addition & 1 deletion connector/docker-integration-tests/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.5.3</version>
<version>3.5.4</version>
<relativePath>../../pom.xml</relativePath>
</parent>

@@ -22,7 +22,11 @@ import java.sql.Connection
import org.scalatest.time.SpanSugar._

import org.apache.spark.{SparkConf, SparkSQLFeatureNotSupportedException}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
import org.apache.spark.sql.jdbc.MsSQLServerDatabaseOnDocker
import org.apache.spark.sql.types._
@@ -39,6 +43,17 @@ import org.apache.spark.tags.DockerTest
@DockerTest
class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest {

def getExternalEngineQuery(executedPlan: SparkPlan): String = {
getExternalEngineRdd(executedPlan).asInstanceOf[JDBCRDD].getExternalEngineQuery
}

def getExternalEngineRdd(executedPlan: SparkPlan): RDD[InternalRow] = {
val queryNode = executedPlan.collect { case r: RowDataSourceScanExec =>
r
}.head
queryNode.rdd
}

override def excluded: Seq[String] = Seq(
"simple scan with OFFSET",
"simple scan with LIMIT and OFFSET",
@@ -137,4 +152,68 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JD
"WHERE (dept > 1 AND ((name LIKE 'am%') = (name LIKE '%y')))")
assert(df3.collect().length == 3)
}

test("SPARK-50087: SqlServer handle booleans in CASE WHEN test") {
val df = sql(
s"""|SELECT * FROM $catalogName.employee
|WHERE CASE WHEN name = 'Legolas' THEN name = 'Elf' ELSE NOT (name = 'Wizard') END
|""".stripMargin
)

// scalastyle:off
assert(getExternalEngineQuery(df.queryExecution.executedPlan) ==
"""SELECT "dept","name","salary","bonus" FROM "employee" WHERE (CASE WHEN ("name" = 'Legolas') THEN IIF(("name" = 'Elf'), 1, 0) ELSE IIF(("name" <> 'Wizard'), 1, 0) END = 1) """
)
// scalastyle:on
df.collect()
}

test("SPARK-50087: SqlServer handle booleans in CASE WHEN with always true test") {
val df = sql(
s"""|SELECT * FROM $catalogName.employee
|WHERE CASE WHEN (name = 'Legolas') THEN (name = 'Elf') ELSE (1=1) END
|""".stripMargin
)

// scalastyle:off
assert(getExternalEngineQuery(df.queryExecution.executedPlan) ==
"""SELECT "dept","name","salary","bonus" FROM "employee" WHERE (CASE WHEN ("name" = 'Legolas') THEN IIF(("name" = 'Elf'), 1, 0) ELSE 1 END = 1) """
)
// scalastyle:on
df.collect()
}

test("SPARK-50087: SqlServer handle booleans in nested CASE WHEN test") {
val df = sql(
s"""|SELECT * FROM $catalogName.employee
|WHERE CASE WHEN (name = 'Legolas') THEN
| CASE WHEN (name = 'Elf') THEN (name = 'Elrond') ELSE (name = 'Gandalf') END
| ELSE (name = 'Sauron') END
|""".stripMargin
)

// scalastyle:off
assert(getExternalEngineQuery(df.queryExecution.executedPlan) ==
"""SELECT "dept","name","salary","bonus" FROM "employee" WHERE (CASE WHEN ("name" = 'Legolas') THEN IIF((CASE WHEN ("name" = 'Elf') THEN IIF(("name" = 'Elrond'), 1, 0) ELSE IIF(("name" = 'Gandalf'), 1, 0) END = 1), 1, 0) ELSE IIF(("name" = 'Sauron'), 1, 0) END = 1) """
)
// scalastyle:on
df.collect()
}

test("SPARK-50087: SqlServer handle non-booleans in nested CASE WHEN test") {
val df = sql(
s"""|SELECT * FROM $catalogName.employee
|WHERE CASE WHEN (name = 'Legolas') THEN
| CASE WHEN (name = 'Elf') THEN 'Elf' ELSE 'Wizard' END
| ELSE 'Sauron' END = name
|""".stripMargin
)

// scalastyle:off
assert(getExternalEngineQuery(df.queryExecution.executedPlan) ==
"""SELECT "dept","name","salary","bonus" FROM "employee" WHERE ("name" IS NOT NULL) AND ((CASE WHEN "name" = 'Legolas' THEN CASE WHEN "name" = 'Elf' THEN 'Elf' ELSE 'Wizard' END ELSE 'Sauron' END) = "name") """
)
// scalastyle:on
df.collect()
}
}
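
The assertions in this suite compare the exact SQL that Spark ships to SQL Server; the helper at the top of the file digs a JDBCRDD out of the physical plan to get at that text. A hedged, standalone variant of that lookup (getExternalEngineQuery is the accessor the suite itself uses; the helper name and the table in the usage comment are illustrative):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.RowDataSourceScanExec
import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD

// Sketch: recover the query text that the JDBC data source will send to the external
// engine from a DataFrame's executed plan. Assumes the plan contains exactly one
// row-based JDBC scan, as it does in the tests above.
def externalEngineQuery(df: DataFrame): String = {
  val scan = df.queryExecution.executedPlan.collect {
    case r: RowDataSourceScanExec => r
  }.head
  scan.rdd.asInstanceOf[JDBCRDD].getExternalEngineQuery
}

// Hypothetical usage:
// val df = spark.sql("SELECT * FROM mssql.employee WHERE dept > 1")
// println(externalEngineQuery(df))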
@@ -22,6 +22,7 @@ import java.sql.Connection
import org.apache.spark.SparkConf
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.execution.FilterExec
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
import org.apache.spark.sql.jdbc.DatabaseOnDocker
import org.apache.spark.sql.types._
@@ -123,4 +124,13 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT
)
}
}

test("SPARK-49695: Postgres fix xor push-down") {
val df = spark.sql(s"select dept, name from $catalogName.employee where dept ^ 6 = 0")
val rows = df.collect()
assert(!df.queryExecution.sparkPlan.exists(_.isInstanceOf[FilterExec]))
assert(rows.length == 1)
assert(rows(0).getInt(0) === 6)
assert(rows(0).getString(1) === "jen")
}
}