Skip to content

[DRAFT] Cluster mode HiveThriftServer2 #51899

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,4 @@ in the online documentation for an overview on how to configure Spark.

Please review the [Contribution to Spark guide](https://spark.apache.org/contributing.html)
for information on how to get started contributing to the project.

Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ import org.apache.spark.util.{ShutdownHookManager, Utils}

/**
* The main entry point for the Spark SQL port of HiveServer2. Starts up a `SparkSQLContext` and a
* `HiveThriftServer2` thrift server.
* `NRHiveThriftServer2` thrift server.
*/
object HiveThriftServer2 extends Logging {
object NRHiveThriftServer2 extends Logging {
var uiTab: Option[ThriftServerTab] = None
var listener: HiveThriftServer2Listener = _
var eventManager: HiveThriftServer2EventManager = _
Expand All @@ -52,15 +52,15 @@ object HiveThriftServer2 extends Logging {
* Starts a new thrift server with the given SparkSession.
*
* @param sparkSession SparkSession to use for the server
* @param exitOnError Whether to exit the JVM if HiveThriftServer2 fails to initialize. When true,
* @param exitOnError Whether to exit the JVM if NRHiveThriftServer2 fails to initialize. When true,
* the call logs the error and exits the JVM with exit code -1. When false, the
* call throws an exception instead.
*/
@Since("4.0.0")
@DeveloperApi
def startWithSparkSession(
sparkSession: SparkSession,
exitOnError: Boolean): HiveThriftServer2 = {
exitOnError: Boolean): NRHiveThriftServer2 = {
systemExitOnError.set(exitOnError)

val executionHive = HiveUtils.newClientForExecution(
Expand All @@ -69,11 +69,11 @@ object HiveThriftServer2 extends Logging {

// Cleanup the scratch dir before starting
ServerUtils.cleanUpScratchDir(executionHive.conf)
val server = new HiveThriftServer2(sparkSession)
val server = new NRHiveThriftServer2(sparkSession)

server.init(executionHive.conf)
server.start()
logInfo("HiveThriftServer2 started")
logInfo("NRHiveThriftServer2 started")
createListenerAndUI(server, sparkSession.sparkContext)
server
}
Expand All @@ -87,11 +87,11 @@ object HiveThriftServer2 extends Logging {
@deprecated("Use startWithSparkSession instead", since = "4.0.0")
@Since("2.0.0")
@DeveloperApi
def startWithContext(sqlContext: SQLContext): HiveThriftServer2 = {
def startWithContext(sqlContext: SQLContext): NRHiveThriftServer2 = {
startWithSparkSession(sqlContext.sparkSession, exitOnError = true)
}

private def createListenerAndUI(server: HiveThriftServer2, sc: SparkContext): Unit = {
private def createListenerAndUI(server: NRHiveThriftServer2, sc: SparkContext): Unit = {
val kvStore = sc.statusStore.store.asInstanceOf[ElementTrackingStore]
eventManager = new HiveThriftServer2EventManager(sc)
listener = new HiveThriftServer2Listener(kvStore, sc.conf, Some(server))
Expand Down Expand Up @@ -126,17 +126,21 @@ object HiveThriftServer2 extends Logging {

try {
startWithContext(SparkSQLEnv.sparkSession.sqlContext)
// If application was killed before HiveThriftServer2 start successfully then SparkSubmit
// If application was killed before NRHiveThriftServer2 start successfully then SparkSubmit
// process can not exit, so check whether if SparkContext was stopped.
if (SparkSQLEnv.sparkContext.stopped.get()) {
logError("SparkContext has stopped even if HiveServer2 has started, so exit")
System.exit(-1)
}
} catch {
case e: Exception =>
logError("Error starting HiveThriftServer2", e)
logError("Error starting NRHiveThriftServer2", e)
System.exit(-1)
}

while (true) {
Thread.sleep(9999999)
}
}

private[thriftserver] object ExecutionState extends Enumeration {
Expand All @@ -145,7 +149,7 @@ object HiveThriftServer2 extends Logging {
}
}

private[hive] class HiveThriftServer2(sparkSession: SparkSession)
private[hive] class NRHiveThriftServer2(sparkSession: SparkSession)
extends HiveServer2
with ReflectedCompositeService {
// state is tracked internally so that the server only attempts to shut down if it successfully
Expand Down