
Commit ed23cc3

ueshin authored and HyukjinKwon committed
[SPARK-54344][PYTHON] Kill the worker if flush fails in daemon.py
### What changes were proposed in this pull request?

Kills the worker if flush fails in `daemon.py`.

- Spark conf: `spark.python.daemon.killWorkerOnFlushFailure` (default `true`)
- SQL conf: `spark.sql.execution.pyspark.udf.daemonKillWorkerOnFlushFailure` (fallback to the above)

Before the worker dies, it reuses the `faulthandler` feature to record a thread dump, which will appear in the error message if `faulthandler` is enabled.

```
WARN TaskSetManager: Lost task 3.0 in stage 1.0 (TID 8) (127.0.0.1 executor 1): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed): Current thread 0x00000001f0796140 (most recent call first):
  File "/.../python/pyspark/daemon.py", line 95 in worker
  File "/.../python/pyspark/daemon.py", line 228 in manager
  File "/.../python/pyspark/daemon.py", line 253 in <module>
  File "<frozen runpy>", line 88 in _run_code
  File "<frozen runpy>", line 198 in _run_module_as_main

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:679)
	...
```

Even when `faulthandler` is not enabled, the error will appear in the executor's `stderr` file.

```
Traceback (most recent call last):
  File "/.../python/pyspark/daemon.py", line 228, in manager
    code = worker(sock, authenticated)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/.../python/pyspark/daemon.py", line 88, in worker
    raise Exception("test")
Exception: test
```

When this is disabled, the behavior is the same as before, except that a log message is emitted.

### Why are the changes needed?

Currently an exception caused by an `outfile.flush()` failure in `daemon.py` is ignored, but if the last command in `worker_main` is still not flushed, it could leave a UDF stuck in Java waiting for the response from the Python worker. The worker should just die and let Spark retry the task.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually.

<details>
<summary>Test with the patch to emulate the case</summary>

```patch
% git diff
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index 54c9507..e107216d769 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -84,6 +84,8 @@ def worker(sock, authenticated):
         exit_code = compute_real_exit_code(exc.code)
     finally:
         try:
+            if worker_main.__globals__.get("TEST", False):
+                raise Exception("test")
             outfile.flush()
         except Exception:
             faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 6e34b04..ff210f4fd97 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -3413,7 +3413,14 @@ def main(infile, outfile):

         # check end of stream
         if read_int(infile) == SpecialLengths.END_OF_STREAM:
-            write_int(SpecialLengths.END_OF_STREAM, outfile)
+            import random
+
+            if random.random() < 0.1:
+                # emulate the last command is not flushed yet
+                global TEST
+                TEST = True
+            else:
+                write_int(SpecialLengths.END_OF_STREAM, outfile)
         else:
             # write a different value to tell JVM to not reuse this worker
             write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
@@ -3423,6 +3430,9 @@ def main(infile, outfile):
         faulthandler.cancel_dump_traceback_later()


+TEST = False
+
+
 if __name__ == "__main__":
     # Read information about how to connect back to the JVM from the environment.
     conn_info = os.environ.get(
```

</details>

With just `pass` (the behavior before this change), the task gets stuck; with this change, the worker dies and Spark retries the task.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #53055 from ueshin/issues/SPARK-54344/daemon_flush.
Lead-authored-by: Takuya Ueshin <ueshin@databricks.com>
Co-authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
1 parent 2115023 commit ed23cc3
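The failure mode described under "Why are the changes needed?" comes down to buffered I/O: the worker's final `END_OF_STREAM` frame sits in its output buffer until a flush succeeds, so a silently swallowed flush failure leaves the JVM blocked on a read. A minimal pure-Python sketch of that effect (the names here are illustrative, not Spark's):

```python
import io

# A BytesIO stands in for the socket back to the JVM; the BufferedWriter
# stands in for the worker's buffered output stream.
raw = io.BytesIO()
buffered = io.BufferedWriter(raw, buffer_size=1 << 16)

buffered.write((4).to_bytes(4, "big"))  # a small framed value, still buffered
print(raw.getvalue())  # b'' -- the "JVM" side has received nothing yet

buffered.flush()
print(raw.getvalue())  # b'\x00\x00\x00\x04' -- only now does the frame arrive
```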

File tree

11 files changed: +60 -2 lines changed


core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala

Lines changed: 7 additions & 1 deletion
```diff
@@ -188,12 +188,15 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
   protected val timelyFlushEnabled: Boolean = false
   protected val timelyFlushTimeoutNanos: Long = 0
   protected val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT)
+  private val useDaemon = conf.get(PYTHON_USE_DAEMON)
   private val reuseWorker = conf.get(PYTHON_WORKER_REUSE)
   protected val faultHandlerEnabled: Boolean = conf.get(PYTHON_WORKER_FAULTHANLDER_ENABLED)
   protected val idleTimeoutSeconds: Long = conf.get(PYTHON_WORKER_IDLE_TIMEOUT_SECONDS)
   protected val killOnIdleTimeout: Boolean = conf.get(PYTHON_WORKER_KILL_ON_IDLE_TIMEOUT)
   protected val tracebackDumpIntervalSeconds: Long =
     conf.get(PYTHON_WORKER_TRACEBACK_DUMP_INTERVAL_SECONDS)
+  protected val killWorkerOnFlushFailure: Boolean =
+    conf.get(PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE)
   protected val hideTraceback: Boolean = false
   protected val simplifiedTraceback: Boolean = false

@@ -294,13 +297,16 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     if (tracebackDumpIntervalSeconds > 0L) {
       envVars.put("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", tracebackDumpIntervalSeconds.toString)
     }
+    if (useDaemon && killWorkerOnFlushFailure) {
+      envVars.put("PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE", "1")
+    }
     // allow the user to set the batch size for the BatchedSerializer on UDFs
     envVars.put("PYTHON_UDF_BATCH_SIZE", batchSizeForPythonUDF.toString)

     envVars.put("SPARK_JOB_ARTIFACT_UUID", jobArtifactUUID.getOrElse("default"))

     val (worker: PythonWorker, handle: Option[ProcessHandle]) = env.createPythonWorker(
-      pythonExec, workerModule, daemonModule, envVars.asScala.toMap)
+      pythonExec, workerModule, daemonModule, envVars.asScala.toMap, useDaemon)
     // Whether is the worker released into idle pool or closed. When any codes try to release or
     // close a worker, they should use `releasedOrClosed.compareAndSet` to flip the state to make
     // sure there is only one winner that is going to release or close the worker.
```
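A detail worth noting in the hunk above: the JVM exports the flag (as `"1"`) only when the daemon is in use and the conf is enabled, because the daemon side checks plain string truthiness rather than parsing a boolean. A tiny standalone sketch of that check:

```python
import os

# The daemon reads `os.environ.get(FLAG, False)`: any non-empty string
# (even "0") would be truthy, so the JVM sets the variable only when the
# feature should be on, instead of exporting "true"/"false" values.
FLAG = "PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE"

os.environ.pop(FLAG, None)
print(bool(os.environ.get(FLAG, False)))  # False: variable absent

os.environ[FLAG] = "1"
print(bool(os.environ.get(FLAG, False)))  # True: variable present
```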

core/src/main/scala/org/apache/spark/internal/config/Python.scala

Lines changed: 12 additions & 0 deletions
```diff
@@ -138,4 +138,16 @@ private[spark] object Python {
       .intConf
       .checkValue(_ > 0, "If set, the idle worker max size must be > 0.")
       .createOptional
+
+  val PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE =
+    ConfigBuilder("spark.python.daemon.killWorkerOnFlushFailure")
+      .doc("When enabled, exceptions raised during output flush operations in the Python " +
+        "worker managed under Python daemon are not caught, causing the worker to terminate " +
+        "with the exception. This allows Spark to detect the failure and launch a new worker " +
+        "and retry the task. " +
+        "When disabled, flush exceptions are caught and logged but the worker continues, " +
+        "which could cause the worker to get stuck due to protocol mismatch.")
+      .version("4.1.0")
+      .booleanConf
+      .createWithDefault(true)
 }
```
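Since `spark.python.daemon.killWorkerOnFlushFailure` is a core Spark conf read when workers are launched, opting out of the new default is done at session creation. A minimal PySpark sketch, assuming a local session (the `local[2]` master is arbitrary):

```python
from pyspark.sql import SparkSession

# Restore the pre-change behavior (flush failures swallowed); the default
# is true. For a per-session runtime toggle, see the SQL conf further below.
spark = (
    SparkSession.builder
    .master("local[2]")
    .config("spark.python.daemon.killWorkerOnFlushFailure", "false")
    .getOrCreate()
)
```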

python/pyspark/daemon.py

Lines changed: 14 additions & 1 deletion
```diff
@@ -24,6 +24,7 @@
 import traceback
 import time
 import gc
+import faulthandler
 from errno import EINTR, EAGAIN
 from socket import AF_INET, AF_INET6, SOCK_STREAM, SOMAXCONN
 from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN, SIGINT
@@ -85,7 +86,19 @@ def worker(sock, authenticated):
         try:
             outfile.flush()
         except Exception:
-            pass
+            if os.environ.get("PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE", False):
+                faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
+                if faulthandler_log_path:
+                    faulthandler_log_path = os.path.join(faulthandler_log_path, str(os.getpid()))
+                    with open(faulthandler_log_path, "w") as faulthandler_log_file:
+                        faulthandler.dump_traceback(file=faulthandler_log_file)
+                raise
+            else:
+                print(
+                    "PySpark daemon failed to flush the output to the worker process:\n"
+                    + traceback.format_exc(),
+                    file=sys.stderr,
+                )
     return exit_code

```
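For reference, a standalone sketch of just the dump step above, using `tempfile.mkdtemp()` as a stand-in for the real `PYTHON_FAULTHANDLER_DIR` (an assumption for the demo): the traceback is written to a PID-named file, the layout the JVM reads when composing the "Python worker exited unexpectedly" message.

```python
import faulthandler
import os
import tempfile

# Stand-in for os.environ["PYTHON_FAULTHANDLER_DIR"] in the real daemon.
log_dir = tempfile.mkdtemp()
log_path = os.path.join(log_dir, str(os.getpid()))

# Dump the thread stacks to the PID-named file; reading it back shows the
# "Current thread 0x... (most recent call first)" format seen in the
# TaskSetManager error in the commit message.
with open(log_path, "w") as log_file:
    faulthandler.dump_traceback(file=log_file)

with open(log_path) as log_file:
    print(log_file.read())
```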
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 11 additions & 0 deletions
```diff
@@ -3985,6 +3985,14 @@ object SQLConf {
       .version("4.1.0")
       .fallbackConf(Python.PYTHON_WORKER_TRACEBACK_DUMP_INTERVAL_SECONDS)

+  val PYTHON_UDF_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE =
+    buildConf("spark.sql.execution.pyspark.udf.daemonKillWorkerOnFlushFailure")
+      .doc(
+        s"Same as ${Python.PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE.key} " +
+        "for Python execution with DataFrame and SQL. It can change during runtime.")
+      .version("4.1.0")
+      .fallbackConf(Python.PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE)
+
   val PYTHON_WORKER_LOGGING_ENABLED =
     buildConf("spark.sql.pyspark.worker.logging.enabled")
       .doc("When set to true, this configuration enables comprehensive logging within " +
@@ -7510,6 +7518,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
   def pythonUDFWorkerTracebackDumpIntervalSeconds: Long =
     getConf(PYTHON_UDF_WORKER_TRACEBACK_DUMP_INTERVAL_SECONDS)

+  def pythonUDFDaemonKillWorkerOnFlushFailure: Boolean =
+    getConf(PYTHON_UDF_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE)
+
   def pythonWorkerLoggingEnabled: Boolean = getConf(PYTHON_WORKER_LOGGING_ENABLED)

   def pythonUDFArrowConcurrencyLevel: Option[Int] = getConf(PYTHON_UDF_ARROW_CONCURRENCY_LEVEL)
```
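Because the SQL conf is a `fallbackConf` that "can change during runtime", it can be toggled per session without a restart. A minimal sketch, assuming an active `spark` session:

```python
# Disable the kill-on-flush-failure behavior for DataFrame/SQL Python
# execution in this session; Python workers launched afterwards pick it up.
spark.conf.set("spark.sql.execution.pyspark.udf.daemonKillWorkerOnFlushFailure", "false")
print(spark.conf.get("spark.sql.execution.pyspark.udf.daemonKillWorkerOnFlushFailure"))  # false
```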

sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -60,6 +60,8 @@ abstract class BaseArrowPythonRunner[IN, OUT <: AnyRef](
   override val killOnIdleTimeout: Boolean = SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
   override val tracebackDumpIntervalSeconds: Long =
     SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
+  override val killWorkerOnFlushFailure: Boolean =
+    SQLConf.get.pythonUDFDaemonKillWorkerOnFlushFailure

   override val errorOnDuplicatedFieldNames: Boolean = true
```

sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -83,6 +83,8 @@ class ArrowPythonUDTFRunner(
   override val killOnIdleTimeout: Boolean = SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
   override val tracebackDumpIntervalSeconds: Long =
     SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
+  override val killWorkerOnFlushFailure: Boolean =
+    SQLConf.get.pythonUDFDaemonKillWorkerOnFlushFailure

   override val errorOnDuplicatedFieldNames: Boolean = true
```

sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -71,6 +71,8 @@ class CoGroupedArrowPythonRunner(
   override val killOnIdleTimeout: Boolean = SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
   override val tracebackDumpIntervalSeconds: Long =
     SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
+  override val killWorkerOnFlushFailure: Boolean =
+    SQLConf.get.pythonUDFDaemonKillWorkerOnFlushFailure

   override val hideTraceback: Boolean = SQLConf.get.pysparkHideTraceback
   override val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback
```

sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala

Lines changed: 4 additions & 0 deletions
```diff
@@ -58,6 +58,7 @@ abstract class PythonPlannerRunner[T](func: PythonFunction) extends Logging {
   val idleTimeoutSeconds: Long = SQLConf.get.pythonUDFWorkerIdleTimeoutSeconds
   val killOnIdleTimeout: Boolean = SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
   val tracebackDumpIntervalSeconds: Long = SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
+  val killWorkerOnFlushFailure: Boolean = SQLConf.get.pythonUDFDaemonKillWorkerOnFlushFailure
   val hideTraceback: Boolean = SQLConf.get.pysparkHideTraceback
   val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback
   val workerMemoryMb = SQLConf.get.pythonPlannerExecMemory
@@ -98,6 +99,9 @@ abstract class PythonPlannerRunner[T](func: PythonFunction) extends Logging {
     if (tracebackDumpIntervalSeconds > 0L) {
       envVars.put("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", tracebackDumpIntervalSeconds.toString)
     }
+    if (useDaemon && killWorkerOnFlushFailure) {
+      envVars.put("PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE", "1")
+    }

     envVars.put("SPARK_JOB_ARTIFACT_UUID", jobArtifactUUID.getOrElse("default"))
     sessionUUID.foreach { uuid =>
```

sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -59,6 +59,8 @@ abstract class BasePythonUDFRunner(
   override val killOnIdleTimeout: Boolean = SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
   override val tracebackDumpIntervalSeconds: Long =
     SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
+  override val killWorkerOnFlushFailure: Boolean =
+    SQLConf.get.pythonUDFDaemonKillWorkerOnFlushFailure

   override val bufferSize: Int = SQLConf.get.getConf(SQLConf.PYTHON_UDF_BUFFER_SIZE)
   override val batchSizeForPythonUDF: Int =
```

sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -79,6 +79,8 @@ class ApplyInPandasWithStatePythonRunner(
   override val killOnIdleTimeout: Boolean = SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
   override val tracebackDumpIntervalSeconds: Long =
     SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
+  override val killWorkerOnFlushFailure: Boolean =
+    SQLConf.get.pythonUDFDaemonKillWorkerOnFlushFailure

   private val sqlConf = SQLConf.get
```
