Commit 5fb072e

gaogaotiantian authored and zhengruifeng committed
[SPARK-54285][PYTHON] Cache timezone info to avoid expensive timestamp conversion
### What changes were proposed in this pull request?

Cache the tzinfo on the local machine for timestamp conversion, to avoid extra latency when calling `datetime.datetime.fromtimestamp()`.

### Why are the changes needed?

In Python, a forked process on Unix (with glibc, presumably) can end up with a bad lock/cache state for the timezone, which makes `datetime.datetime.fromtimestamp()` extremely slow (about 2000x slower on my machine).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Tested locally by hand to confirm that the timestamp result is the same and the performance is back to normal.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52980 from gaogaotiantian/fix-timestamp-convert.

Authored-by: Tian Gao <gaogaotiantian@hotmail.com>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
1 parent d823ccf commit 5fb072e

File tree

2 files changed: +19 −2 lines changed

python/pyspark/sql/types.py (12 additions, 1 deletion)

```diff
@@ -441,6 +441,12 @@ def __repr__(self) -> str:
 class TimestampType(DatetimeType, metaclass=DataTypeSingleton):
     """Timestamp (datetime.datetime) data type."""

+    # We need to cache the timezone info for datetime.datetime.fromtimestamp,
+    # otherwise the forked process will be extremely slow to convert the timestamp.
+    # This is probably a glibc issue - the forked process will have a bad cache/lock
+    # state for the timezone info.
+    tz_info = None
+
     def needConversion(self) -> bool:
         return True

@@ -454,7 +460,12 @@ def toInternal(self, dt: datetime.datetime) -> int:
     def fromInternal(self, ts: int) -> datetime.datetime:
         if ts is not None:
             # using int to avoid precision loss in float
-            return datetime.datetime.fromtimestamp(ts // 1000000).replace(microsecond=ts % 1000000)
+            # If TimestampType.tz_info is not None, we need to use it to convert the timestamp.
+            # Otherwise, we need to use the default timezone.
+            # We need to replace the tzinfo with None to keep backward compatibility.
+            return datetime.datetime.fromtimestamp(ts // 1000000, self.tz_info).replace(
+                microsecond=ts % 1000000, tzinfo=None
+            )
```
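The change above can be sketched standalone: passing a pre-computed local tzinfo to `fromtimestamp()` skips the per-call local-timezone lookup, and stripping the tzinfo afterwards reproduces the old naive result (for timestamps whose UTC offset matches the cached one, e.g. current timestamps). A minimal sketch; `cached_tz`, `old_convert`, and `new_convert` are hypothetical names, not part of PySpark.

```python
import datetime
import time

# Cache the local timezone info once, as the patched worker does at startup.
cached_tz = datetime.datetime.now().astimezone().tzinfo

def old_convert(ts: int) -> datetime.datetime:
    # Original path: naive local time; triggers a timezone lookup on every call.
    return datetime.datetime.fromtimestamp(ts // 1000000).replace(microsecond=ts % 1000000)

def new_convert(ts: int) -> datetime.datetime:
    # Patched path: convert with the cached tzinfo, then drop it to stay naive.
    return datetime.datetime.fromtimestamp(ts // 1000000, cached_tz).replace(
        microsecond=ts % 1000000, tzinfo=None
    )

# For a current timestamp the two paths agree, since the cached offset is current.
ts = int(time.time() * 1_000_000)
assert old_convert(ts) == new_convert(ts)
assert new_convert(ts).tzinfo is None
```

Note that the cached tzinfo is a fixed-offset `datetime.timezone`, so timestamps on the other side of a DST transition would convert with the startup offset rather than their historical one; the PR accepts that trade-off for the worker's lifetime.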

python/pyspark/worker.py (7 additions, 1 deletion)

```diff
@@ -18,6 +18,7 @@
 """
 Worker that receives input from Piped RDD.
 """
+import datetime
 import itertools
 import os
 import sys
@@ -71,7 +72,7 @@
     ArrowStreamUDTFSerializer,
     ArrowStreamArrowUDTFSerializer,
 )
-from pyspark.sql.pandas.types import to_arrow_type
+from pyspark.sql.pandas.types import to_arrow_type, TimestampType
 from pyspark.sql.types import (
     ArrayType,
     BinaryType,
@@ -3302,6 +3303,11 @@ def main(infile, outfile):
     if split_index == -1:  # for unit tests
         sys.exit(-1)
     start_faulthandler_periodic_traceback()
+
+    # Use the local timezone to convert the timestamp
+    tz = datetime.datetime.now().astimezone().tzinfo
+    TimestampType.tz_info = tz
+
     check_python_version(infile)

     # read inputs only for a barrier task
```
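The worker-side hookup can be illustrated with a stripped-down stand-in for `TimestampType`: the tzinfo is computed once at worker startup and stored as a class attribute, so every later `fromInternal()` call reuses it instead of doing the slow local-timezone lookup in the forked process. A hedged sketch; `MiniTimestampType` and `worker_main` are illustrative names, not PySpark code.

```python
import datetime
import time

class MiniTimestampType:
    """Illustrative stand-in for pyspark.sql.types.TimestampType."""

    # Class-level cache shared by all instances; None means "use local time".
    tz_info = None

    def fromInternal(self, ts: int) -> datetime.datetime:
        if ts is not None:
            # Convert with the cached tzinfo, then drop it to keep the
            # result naive, matching the pre-patch behavior.
            return datetime.datetime.fromtimestamp(ts // 1000000, self.tz_info).replace(
                microsecond=ts % 1000000, tzinfo=None
            )

def worker_main() -> None:
    # Runs once at worker startup, before any rows are deserialized.
    MiniTimestampType.tz_info = datetime.datetime.now().astimezone().tzinfo

worker_main()
ts = int(time.time() * 1_000_000)
dt = MiniTimestampType().fromInternal(ts)
assert dt.microsecond == ts % 1_000_000
assert dt.tzinfo is None  # result stays naive for backward compatibility
```

Storing the cache on the class (rather than an instance) matters because `TimestampType` is a singleton via `DataTypeSingleton`: one assignment in `main()` is visible to every conversion in the worker.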
