Merged
29 commits
c0431b4
docs(spec): lightweight event log runtime detector (issue #2082)
sayedbilalbari Apr 23, 2026
2877e6f
docs(spec): address review findings on event log runtime detector spec
sayedbilalbari Apr 23, 2026
3eb4564
docs(spec): reframe event log detector as best-effort early routing
sayedbilalbari Apr 23, 2026
df8b53b
docs(spec): tighten decision rule and Databricks-rolling scan shape
sayedbilalbari Apr 23, 2026
68889da
docs(spec): pin memory contract and frame max_events_scanned as cost cap
sayedbilalbari Apr 23, 2026
327f94b
docs(plan): event log runtime detector implementation plan
sayedbilalbari Apr 23, 2026
c5f445a
build(user_tools): add zstandard dep for event log detector
sayedbilalbari Apr 23, 2026
be1dedc
feat(eventlog_detector): package skeleton and public re-exports
sayedbilalbari Apr 23, 2026
997ef43
feat(eventlog_detector): routing types and exception hierarchy
sayedbilalbari Apr 23, 2026
e18661e
feat(eventlog_detector): Scala-pinned markers module
sayedbilalbari Apr 23, 2026
f40ad36
feat(eventlog_detector): classifier mirroring Scala priority
sayedbilalbari Apr 23, 2026
265f576
feat(eventlog_detector): context-managed codec-aware line streamer
sayedbilalbari Apr 23, 2026
8c4449d
feat(eventlog_detector): single-file and Databricks rolling-dir resolver
sayedbilalbari Apr 23, 2026
5d49e10
feat(eventlog_detector): bounded streaming scanner across files
sayedbilalbari Apr 23, 2026
60dc011
feat(eventlog_detector): top-level detect_spark_runtime entry point
sayedbilalbari Apr 23, 2026
5f3270e
test(eventlog_detector): anchor parity tests on Scala fixtures
sayedbilalbari Apr 23, 2026
8c32cf6
chore(eventlog_detector): resolve lint findings from full-suite run
sayedbilalbari Apr 23, 2026
8906da0
docs(spec): record realized fixture inventory for event log detector
sayedbilalbari Apr 23, 2026
351d9f4
chore(eventlog_detector): fix E302 blank-line lint in stream.py
sayedbilalbari Apr 23, 2026
6c81f37
fix(eventlog_detector): narrow _parse_bool to Scala toBoolean semantics
sayedbilalbari Apr 23, 2026
b2486a3
fix(eventlog_detector): whitelist supported codec suffixes
sayedbilalbari Apr 23, 2026
d8be464
fix(eventlog_detector): track last-scanned file for event_log_path
sayedbilalbari Apr 23, 2026
3bebb11
docs: remove local implementation plan from the PR
sayedbilalbari Apr 23, 2026
3871a14
docs: remove local design spec from the PR
sayedbilalbari Apr 23, 2026
4c1de13
docs(eventlog_detector): trim docstrings and drop internal references
sayedbilalbari Apr 23, 2026
be0ac3d
fix(eventlog_detector): address review findings
sayedbilalbari Apr 23, 2026
3b140f1
feat: streamline event log runtime detection
sayedbilalbari Apr 24, 2026
e635db8
test: address event log detector review feedback
sayedbilalbari Apr 28, 2026
11ae2ee
test: trim event log detector coverage overlap
sayedbilalbari Apr 28, 2026
2 changes: 2 additions & 0 deletions user_tools/pyproject.toml
@@ -85,6 +85,8 @@ dependencies = [
"scikit-learn==1.7.0",
# used for retrieving available memory on the host
"psutil==7.0.0",
# used to read zstd-compressed spark event logs
"zstandard==0.25.0",
# pyspark for distributed computing
"pyspark>=3.5.7,<4.0.0",
# Jproperties used to handle Java properties file (added for the Tools API)
@@ -0,0 +1,57 @@
# Event Log Runtime Detector

This package provides a lightweight Python detector for deciding which full
tools flow should handle a single Spark application event log.

The detector is an early routing check. It scans a bounded prefix of an event
log, stops as soon as it has enough information, and returns one of:

- `PROFILING`: a RAPIDS runtime signal was found.
- `QUALIFICATION`: startup properties indicate standard OSS Spark with no
RAPIDS markers.
- `UNKNOWN`: the scan did not reach enough information within the event budget.

## Detection Flow

1. `resolver.py` resolves the input into ordered event-log files.
Supported inputs are a single event-log file or an Apache Spark rolling
event-log directory using the `eventlog_v2_*` / `events_*` layout.
2. `stream.py` opens each file through `CspPath.open_input_stream()` and yields
one decoded event-log line at a time. The full log is not loaded into memory.
3. `scanner.py` parses events until a decision is available or the
`max_events_scanned` budget is reached.
4. `classifier.py` classifies the accumulated Spark properties as `SPARK` or
`SPARK_RAPIDS`.
5. `detector.py` maps the scan result to `ToolExecution`.
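
Putting the flow together, a minimal usage sketch (the entry point and enums
are re-exported at the package root; the event-log path below is a
placeholder):

```python
from spark_rapids_tools.tools.eventlog_detector import (
    ToolExecution,
    detect_spark_runtime,
)

# Placeholder path: any local file/dir or cloud URI that CspPath can resolve.
result = detect_spark_runtime(
    "/tmp/spark-events/app-20260423-0001",
    max_events_scanned=500,  # default cost cap
)

if result.tool_execution is ToolExecution.PROFILING:
    print(f"RAPIDS log, route to Profiling ({result.reason})")
elif result.tool_execution is ToolExecution.QUALIFICATION:
    print(f"CPU log, route to Qualification ({result.reason})")
else:
    print(f"Undecided: {result.reason}")
```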

## RAPIDS Detection

RAPIDS logs are detected from either of these signals:

- `SparkRapidsBuildInfoEvent`, emitted by the RAPIDS plugin (24.06+).
- Spark properties showing that `spark.plugins` contains
  `com.nvidia.spark.SQLPlugin` and that `spark.rapids.sql.enabled` is not `false`.

The `spark.rapids.sql.enabled` parse matches the Scala tools behavior:
missing or unparseable values default to `true`.
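
For illustration, here is how these rules play out through `classify_runtime`
(the property maps are made-up examples; the outcomes follow the classifier
code in this PR):

```python
from spark_rapids_tools.tools.eventlog_detector.classifier import classify_runtime
from spark_rapids_tools.tools.eventlog_detector.types import SparkRuntime

# Plugin configured, enablement key absent: defaults to enabled.
assert classify_runtime(
    {"spark.plugins": "com.nvidia.spark.SQLPlugin"}
) is SparkRuntime.SPARK_RAPIDS

# Plugin configured but explicitly disabled.
assert classify_runtime(
    {"spark.plugins": "com.nvidia.spark.SQLPlugin",
     "spark.rapids.sql.enabled": "false"}
) is SparkRuntime.SPARK

# Unparseable value ("yes" is not a Scala boolean) falls back to the
# default of true, matching Try(...).getOrElse(default) semantics.
assert classify_runtime(
    {"spark.plugins": "com.nvidia.spark.SQLPlugin",
     "spark.rapids.sql.enabled": "yes"}
) is SparkRuntime.SPARK_RAPIDS
```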

## CPU Fast Path

When `SparkListenerEnvironmentUpdate` is reached and startup Spark properties
contain no RAPIDS-related configuration, the detector can return
`QUALIFICATION` immediately. This applies to both single-file and OSS rolling
event logs.

If RAPIDS-related configuration is present but not decisive, the scanner keeps
reading within the configured event budget. This avoids treating a log as CPU
when a later `modifiedConfigs` update may activate the RAPIDS configuration.
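
The gating predicate is `has_rapids_conf_markers`; a short sketch of the
distinction (example property maps are made up):

```python
from spark_rapids_tools.tools.eventlog_detector.classifier import (
    classify_runtime,
    has_rapids_conf_markers,
)
from spark_rapids_tools.tools.eventlog_detector.types import SparkRuntime

# Disabled RAPIDS config: not classified as RAPIDS, but the marker's
# presence still blocks the CPU fast path.
props = {"spark.plugins": "com.nvidia.spark.SQLPlugin",
         "spark.rapids.sql.enabled": "false"}
assert classify_runtime(props) is SparkRuntime.SPARK
assert has_rapids_conf_markers(props)  # keep scanning, no early QUALIFICATION

# No RAPIDS-related keys at all: eligible for the CPU fast path.
assert not has_rapids_conf_markers({"spark.app.name": "etl-job"})
```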

## Streaming And Memory

The detector streams one line at a time. Memory is bounded to:

- a small set of runtime metadata fields,
- the accumulated Spark properties map,
- the current decoded event record.

It does not retain raw events or read entire event-log files into memory.
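
`stream.py` itself is not part of this excerpt; as a rough sketch of the
bounded-memory pattern described above, assuming `CspPath.open_input_stream()`
returns a file-like byte stream that `io.TextIOWrapper` accepts (the helper
name is hypothetical):

```python
import io
from typing import Iterator

from spark_rapids_tools.storagelib import CspPath


def iter_event_lines(path: CspPath) -> Iterator[str]:
    # Hypothetical helper, not the actual stream.py implementation.
    # Only the current decoded line is held in memory at any time.
    with path.open_input_stream() as raw:
        # Decode lazily so the full log is never loaded at once.
        for line in io.TextIOWrapper(raw, encoding="utf-8"):
            yield line.rstrip("\n")
```
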
@@ -0,0 +1,44 @@
# Copyright (c) 2026, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Lightweight event log runtime detector.

Scans a bounded prefix of a Spark event log and returns a tool execution
decision (``QUALIFICATION`` / ``PROFILING`` / ``UNKNOWN``) plus best-effort
runtime metadata, without invoking the full tools pipeline.

Public entry point: :func:`detect_spark_runtime`.
"""

from .detector import detect_spark_runtime
from .types import (
    DetectionResult,
    EventLogDetectionError,
    EventLogReadError,
    SparkRuntime,
    ToolExecution,
    UnsupportedCompressionError,
    UnsupportedInputError,
)

__all__ = [
    "DetectionResult",
    "EventLogDetectionError",
    "EventLogReadError",
    "SparkRuntime",
    "ToolExecution",
    "UnsupportedCompressionError",
    "UnsupportedInputError",
    "detect_spark_runtime",
]
@@ -0,0 +1,72 @@
# Copyright (c) 2026, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Classify Spark runtime from accumulated Spark properties.

The scanner extracts Spark properties from event-log records and passes the
merged map to this module. This module only answers whether those properties
represent standard Spark or a RAPIDS-enabled application.
"""

from typing import Mapping

from spark_rapids_tools.tools.eventlog_detector import markers as m
from spark_rapids_tools.tools.eventlog_detector.types import SparkRuntime


Review comment (sayedbilalbari, Collaborator/Author): This logic is copied from the Scala implementation in tools.

def _parse_bool(raw: str, default: bool) -> bool:
"""Parse Spark boolean strings with Scala-compatible fallback behavior.

Scala's ``String.toBoolean`` accepts only ``true`` and ``false``. The
Scala tools wrap that parse in ``Try(...).getOrElse(default)``, so values
such as ``yes``, ``1``, or an empty string must return ``default`` rather
than using Python truthiness.
"""
stripped = raw.strip().lower()
if stripped == "true":
return True
if stripped == "false":
return False
return default


def _is_spark_rapids(props: Mapping[str, str]) -> bool:
"""Return true when Spark properties show the RAPIDS SQL plugin is active."""
plugins = props.get(m.GPU_PLUGIN_KEY, "")
if m.GPU_PLUGIN_CLASS_SUBSTRING not in plugins:
return False
raw = props.get(m.GPU_ENABLED_KEY)
if raw is None:
return m.GPU_ENABLED_DEFAULT
return _parse_bool(raw, default=m.GPU_ENABLED_DEFAULT)


def has_rapids_conf_markers(props: Mapping[str, str]) -> bool:
"""Return true when properties contain any RAPIDS-related configuration.

This is intentionally broader than ``_is_spark_rapids``. A disabled or
incomplete RAPIDS configuration is not classified as RAPIDS, but its
presence should prevent early CPU routing because later events may update
the effective configuration.
"""
if m.GPU_PLUGIN_CLASS_SUBSTRING in props.get(m.GPU_PLUGIN_KEY, ""):
return True
return m.GPU_ENABLED_KEY in props


def classify_runtime(props: Mapping[str, str]) -> SparkRuntime:
"""Classify accumulated Spark properties into the supported runtime enum."""
if _is_spark_rapids(props):
return SparkRuntime.SPARK_RAPIDS
return SparkRuntime.SPARK
@@ -0,0 +1,97 @@
# Copyright (c) 2026, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Top-level event log runtime detector."""

from typing import Optional, Union

from spark_rapids_tools.storagelib import CspPath
from spark_rapids_tools.tools.eventlog_detector.classifier import classify_runtime
from spark_rapids_tools.tools.eventlog_detector.resolver import resolve_event_log_files
from spark_rapids_tools.tools.eventlog_detector.scanner import scan_events_across
from spark_rapids_tools.tools.eventlog_detector.types import (
    DetectionResult,
    SparkRuntime,
    Termination,
    ToolExecution,
)


def detect_spark_runtime(
    event_log: Union[str, CspPath],
    *,
    max_events_scanned: int = 500,
    allow_cpu_fast_path: bool = True,
) -> DetectionResult:
"""Classify a single-app event log into a tool execution decision.

Returns ``PROFILING`` when a RAPIDS marker is found, ``QUALIFICATION`` when
the log appears to be OSS Spark/CPU, and ``UNKNOWN`` when the bounded scan
cannot make a decision.

``max_events_scanned`` caps CPU/IO cost. Logs that do not expose a RAPIDS
marker or ``SparkListenerEnvironmentUpdate`` within the cap remain
``UNKNOWN``.

``allow_cpu_fast_path`` enables early CPU routing when startup properties
contain no RAPIDS markers. Disable it to require EOF before returning
``QUALIFICATION``.
"""
# Keep the caller's input verbatim in source_path (cloud URI schemes
# would otherwise be stripped by CspPath normalisation).
source_path = event_log if isinstance(event_log, str) else str(event_log)
path = event_log if isinstance(event_log, CspPath) else CspPath(str(event_log))
_, files = resolve_event_log_files(path)

scan = scan_events_across(
files,
budget=max_events_scanned,
allow_cpu_fast_path=allow_cpu_fast_path,
)

runtime: Optional[SparkRuntime]
if scan.rapids_build_info_seen:
runtime = SparkRuntime.SPARK_RAPIDS
elif scan.env_update_seen:
runtime = classify_runtime(scan.spark_properties)
else:
runtime = None

if runtime is SparkRuntime.SPARK_RAPIDS:
tool_execution = ToolExecution.PROFILING
reason = f"decisive: classified as {runtime.value}"
elif scan.termination is Termination.CPU_FAST_PATH and runtime is SparkRuntime.SPARK:
tool_execution = ToolExecution.QUALIFICATION
reason = "startup properties classify as SPARK with no RAPIDS markers"
elif scan.termination is Termination.EXHAUSTED and scan.env_update_seen:
tool_execution = ToolExecution.QUALIFICATION
reason = "walked full log, no RAPIDS signal"
else:
tool_execution = ToolExecution.UNKNOWN
reason = (
"no decisive signal within bounded scan"
if scan.env_update_seen
else "no SparkListenerEnvironmentUpdate reached"
)

resolved_path = scan.last_scanned_path or (str(files[0]) if files else source_path)
return DetectionResult(
tool_execution=tool_execution,
spark_runtime=runtime,
app_id=scan.app_id,
spark_version=scan.spark_version,
event_log_path=resolved_path,
source_path=source_path,
reason=reason,
)
@@ -0,0 +1,45 @@
# Copyright (c) 2026, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Property keys and constants used by the runtime detector.

Each block carries a Scala source reference so the two implementations
can be kept in sync when the Scala detection rules change.
"""

# GPU (SPARK_RAPIDS) markers.
# Scala: org/apache/spark/sql/rapids/tool/ToolUtils.scala :: isPluginEnabled
GPU_PLUGIN_KEY: str = "spark.plugins"
GPU_PLUGIN_CLASS_SUBSTRING: str = "com.nvidia.spark.SQLPlugin"
GPU_ENABLED_KEY: str = "spark.rapids.sql.enabled"
# Defaults to true when missing or unparseable.
GPU_ENABLED_DEFAULT: bool = True

# RAPIDS 24.06+ plugin marker.
# Scala: com/nvidia/spark/rapids/SparkRapidsBuildInfoEvent.scala
EVENT_SPARK_RAPIDS_BUILD_INFO: str = "com.nvidia.spark.rapids.SparkRapidsBuildInfoEvent"
EVENT_SPARK_RAPIDS_BUILD_INFO_SHORTNAME: str = "SparkRapidsBuildInfoEvent"

# Apache Spark rolling event-log directory layout.
# Scala: com/nvidia/spark/rapids/tool/EventLogPathProcessor.scala :: isEventLogDir
OSS_EVENT_LOG_DIR_PREFIX: str = "eventlog_v2_"
OSS_EVENT_LOG_FILE_PREFIX: str = "events_"

# Spark listener event names consumed by the scanner.
EVENT_LOG_START: str = "SparkListenerLogStart"
EVENT_APPLICATION_START: str = "SparkListenerApplicationStart"
EVENT_ENVIRONMENT_UPDATE: str = "SparkListenerEnvironmentUpdate"
EVENT_SQL_EXECUTION_START: str = "org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart"
# Unqualified event name accepted by the scanner for compatibility.
EVENT_SQL_EXECUTION_START_SHORTNAME: str = "SparkListenerSQLExecutionStart"