diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 66e204fee44b..7f1dc7fc86fc 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -188,12 +188,15 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
   protected val timelyFlushEnabled: Boolean = false
   protected val timelyFlushTimeoutNanos: Long = 0
   protected val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT)
+  private val useDaemon = conf.get(PYTHON_USE_DAEMON)
   private val reuseWorker = conf.get(PYTHON_WORKER_REUSE)
   protected val faultHandlerEnabled: Boolean = conf.get(PYTHON_WORKER_FAULTHANLDER_ENABLED)
   protected val idleTimeoutSeconds: Long = conf.get(PYTHON_WORKER_IDLE_TIMEOUT_SECONDS)
   protected val killOnIdleTimeout: Boolean = conf.get(PYTHON_WORKER_KILL_ON_IDLE_TIMEOUT)
   protected val tracebackDumpIntervalSeconds: Long =
     conf.get(PYTHON_WORKER_TRACEBACK_DUMP_INTERVAL_SECONDS)
+  protected val killWorkerOnFlushFailure: Boolean =
+    conf.get(PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE)
   protected val hideTraceback: Boolean = false
   protected val simplifiedTraceback: Boolean = false
 
@@ -294,13 +297,16 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     if (tracebackDumpIntervalSeconds > 0L) {
       envVars.put("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", tracebackDumpIntervalSeconds.toString)
     }
+    if (useDaemon && killWorkerOnFlushFailure) {
+      envVars.put("PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE", "1")
+    }
     // allow the user to set the batch size for the BatchedSerializer on UDFs
     envVars.put("PYTHON_UDF_BATCH_SIZE", batchSizeForPythonUDF.toString)
 
     envVars.put("SPARK_JOB_ARTIFACT_UUID", jobArtifactUUID.getOrElse("default"))
 
     val (worker: PythonWorker, handle: Option[ProcessHandle]) = env.createPythonWorker(
-      pythonExec, workerModule, daemonModule, envVars.asScala.toMap)
+      pythonExec, workerModule, daemonModule, envVars.asScala.toMap, useDaemon)
     // Whether is the worker released into idle pool or closed. When any codes try to release or
     // close a worker, they should use `releasedOrClosed.compareAndSet` to flip the state to make
     // sure there is only one winner that is going to release or close the worker.
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Python.scala b/core/src/main/scala/org/apache/spark/internal/config/Python.scala
index de95e2fa1f7a..dc16d1ff255d 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Python.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Python.scala
@@ -138,4 +138,16 @@ private[spark] object Python {
       .intConf
       .checkValue(_ > 0, "If set, the idle worker max size must be > 0.")
       .createOptional
+
+  val PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE =
+    ConfigBuilder("spark.python.daemon.killWorkerOnFlushFailure")
+      .doc("When enabled, exceptions raised during output flush operations in the Python " +
+        "worker managed under Python daemon are not caught, causing the worker to terminate " +
+        "with the exception. This allows Spark to detect the failure and launch a new worker " +
+        "and retry the task. " +
+        "When disabled, flush exceptions are caught and logged but the worker continues, " +
+        "which could cause the worker to get stuck due to protocol mismatch.")
+      .version("4.1.0")
+      .booleanConf
+      .createWithDefault(true)
 }
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index ca33ce2c39ef..e75eca68fd0e 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -24,6 +24,7 @@
 import traceback
 import time
 import gc
+import faulthandler
 from errno import EINTR, EAGAIN
 from socket import AF_INET, AF_INET6, SOCK_STREAM, SOMAXCONN
 from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN, SIGINT
@@ -85,7 +86,19 @@ def worker(sock, authenticated):
         try:
             outfile.flush()
         except Exception:
-            pass
+            if os.environ.get("PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE", False):
+                faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
+                if faulthandler_log_path:
+                    faulthandler_log_path = os.path.join(faulthandler_log_path, str(os.getpid()))
+                    with open(faulthandler_log_path, "w") as faulthandler_log_file:
+                        faulthandler.dump_traceback(file=faulthandler_log_file)
+                raise
+            else:
+                print(
+                    "PySpark daemon failed to flush the output to the worker process:\n"
+                    + traceback.format_exc(),
+                    file=sys.stderr,
+                )
     return exit_code
 
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 4654dd39cd7a..86a90f23fac1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3985,6 +3985,14 @@ object SQLConf {
       .version("4.1.0")
       .fallbackConf(Python.PYTHON_WORKER_TRACEBACK_DUMP_INTERVAL_SECONDS)
 
+  val PYTHON_UDF_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE =
+    buildConf("spark.sql.execution.pyspark.udf.daemonKillWorkerOnFlushFailure")
+      .doc(
+        s"Same as ${Python.PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE.key} " +
+          "for Python execution with DataFrame and SQL. It can change during runtime.")
+      .version("4.1.0")
+      .fallbackConf(Python.PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE)
+
   val PYTHON_WORKER_LOGGING_ENABLED =
     buildConf("spark.sql.pyspark.worker.logging.enabled")
       .doc("When set to true, this configuration enables comprehensive logging within " +
@@ -7490,6 +7498,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
   def pythonUDFWorkerTracebackDumpIntervalSeconds: Long =
     getConf(PYTHON_UDF_WORKER_TRACEBACK_DUMP_INTERVAL_SECONDS)
 
+  def pythonUDFDaemonKillWorkerOnFlushFailure: Boolean =
+    getConf(PYTHON_UDF_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE)
+
   def pythonWorkerLoggingEnabled: Boolean = getConf(PYTHON_WORKER_LOGGING_ENABLED)
 
   def pythonUDFArrowConcurrencyLevel: Option[Int] = getConf(PYTHON_UDF_ARROW_CONCURRENCY_LEVEL)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
index b94e00bc11ef..f5f968ee9522 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
@@ -60,6 +60,8 @@ abstract class BaseArrowPythonRunner[IN, OUT <: AnyRef](
   override val killOnIdleTimeout: Boolean = SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
   override val tracebackDumpIntervalSeconds: Long =
     SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
+  override val killWorkerOnFlushFailure: Boolean =
+    SQLConf.get.pythonUDFDaemonKillWorkerOnFlushFailure
 
   override val errorOnDuplicatedFieldNames: Boolean = true
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
index 7b73818bf0ec..1d5df9bad924 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
@@ -83,6 +83,8 @@ class ArrowPythonUDTFRunner(
   override val killOnIdleTimeout: Boolean = SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
   override val tracebackDumpIntervalSeconds: Long =
     SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
+  override val killWorkerOnFlushFailure: Boolean =
+    SQLConf.get.pythonUDFDaemonKillWorkerOnFlushFailure
 
   override val errorOnDuplicatedFieldNames: Boolean = true
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
index 50013e533819..7f6efbae8881 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
@@ -71,6 +71,8 @@ class CoGroupedArrowPythonRunner(
   override val killOnIdleTimeout: Boolean = SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
   override val tracebackDumpIntervalSeconds: Long =
     SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
+  override val killWorkerOnFlushFailure: Boolean =
+    SQLConf.get.pythonUDFDaemonKillWorkerOnFlushFailure
 
   override val hideTraceback: Boolean = SQLConf.get.pysparkHideTraceback
   override val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala
index 0f4ac4ddad71..92e99cdc11d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala
@@ -58,6 +58,7 @@ abstract class PythonPlannerRunner[T](func: PythonFunction) extends Logging {
     val idleTimeoutSeconds: Long = SQLConf.get.pythonUDFWorkerIdleTimeoutSeconds
     val killOnIdleTimeout: Boolean = SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
     val tracebackDumpIntervalSeconds: Long = SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
+    val killWorkerOnFlushFailure: Boolean = SQLConf.get.pythonUDFDaemonKillWorkerOnFlushFailure
     val hideTraceback: Boolean = SQLConf.get.pysparkHideTraceback
     val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback
     val workerMemoryMb = SQLConf.get.pythonPlannerExecMemory
@@ -98,6 +99,9 @@ abstract class PythonPlannerRunner[T](func: PythonFunction) extends Logging {
     if (tracebackDumpIntervalSeconds > 0L) {
       envVars.put("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", tracebackDumpIntervalSeconds.toString)
     }
+    if (useDaemon && killWorkerOnFlushFailure) {
+      envVars.put("PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE", "1")
+    }
 
     envVars.put("SPARK_JOB_ARTIFACT_UUID", jobArtifactUUID.getOrElse("default"))
     sessionUUID.foreach { uuid =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
index 61f493deeee4..759aa998832d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
@@ -59,6 +59,8 @@ abstract class BasePythonUDFRunner(
   override val killOnIdleTimeout: Boolean = SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
   override val tracebackDumpIntervalSeconds: Long =
     SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
+  override val killWorkerOnFlushFailure: Boolean =
+    SQLConf.get.pythonUDFDaemonKillWorkerOnFlushFailure
 
   override val bufferSize: Int = SQLConf.get.getConf(SQLConf.PYTHON_UDF_BUFFER_SIZE)
   override val batchSizeForPythonUDF: Int =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
index 51d9f6f523a2..14054ba89a94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
@@ -79,6 +79,8 @@ class ApplyInPandasWithStatePythonRunner(
   override val killOnIdleTimeout: Boolean = SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
   override val tracebackDumpIntervalSeconds: Long =
     SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
+  override val killWorkerOnFlushFailure: Boolean =
+    SQLConf.get.pythonUDFDaemonKillWorkerOnFlushFailure
 
   private val sqlConf = SQLConf.get
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonForeachWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonForeachWriter.scala
index 37716d2d8413..cc7745210a4d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonForeachWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonForeachWriter.scala
@@ -106,6 +106,8 @@ class PythonForeachWriter(func: PythonFunction, schema: StructType)
   override val killOnIdleTimeout: Boolean = SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
   override val tracebackDumpIntervalSeconds: Long =
     SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
+  override val killWorkerOnFlushFailure: Boolean =
+    SQLConf.get.pythonUDFDaemonKillWorkerOnFlushFailure
 
   override val hideTraceback: Boolean = SQLConf.get.pysparkHideTraceback
   override val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback
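A minimal usage sketch of the two new configs, kept outside the patch itself. Only the config keys come from the diff above; the app name and the runtime toggle scenario are illustrative assumptions.

# Illustrative PySpark usage sketch, not part of the patch. Only the two config keys
# below come from this diff; the app name and the toggle scenario are assumptions.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("flush-failure-demo")  # hypothetical app name
    # Core conf added in Python.scala; it already defaults to true, set here only for clarity.
    .config("spark.python.daemon.killWorkerOnFlushFailure", "true")
    .getOrCreate()
)

# SQL-level fallback conf added in SQLConf.scala; per its doc it can change during runtime,
# so it can be flipped per session, e.g. to restore the old behavior of swallowing flush errors.
spark.conf.set("spark.sql.execution.pyspark.udf.daemonKillWorkerOnFlushFailure", "false")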