[FEA] Lightweight event log runtime detector (#2082)#2086
sayedbilalbari merged 29 commits into NVIDIA:dev
Conversation
Add design spec for a Python-side detector that classifies a Spark event log as SPARK / SPARK_RAPIDS / PHOTON / AURON without running the full qualification or profiling pipeline. Scoped to single-app inputs (files and rolling directories) for V1, targeting aether's per-job pre-flight decision. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
- Extend scanner scope to include SparkListenerJobStart and SparkListenerSQLExecutionStart events, merging job-level properties and modifiedConfigs so Python classification stays in sync with the Scala pipeline (closes divergence where spark.plugins / Databricks tags are set after env-update). - Specify Databricks rolling-dir file ordering: mirror Scala's date parse with bare `eventlog` treated as latest and earliest chunk read first (the one carrying ApplicationStart). - Correct storage API references: CspFs.list_all_files / CspPath.open_input_stream. - Add `.lzf` to supported codecs (matches Scala and existing on-prem docs); use correct distribution name `spark-rapids-user-tools` and flag the `[compression]` extra as a new addition introduced by this feature. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
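As a rough sketch, the merge rule described in that commit amounts to folding each event's config source into one running dict; the helper name here is hypothetical, while the JSON keys follow Spark's event-log format:

    def merge_event_properties(spark_properties: dict, event: dict) -> None:
        # Fold later config sources into the running property map.
        name = event.get('Event')
        if name == 'SparkListenerEnvironmentUpdate':
            # Startup properties arrive here.
            spark_properties.update(event.get('Spark Properties') or {})
        elif name == 'org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart':
            # modifiedConfigs can set spark.plugins / Databricks tags after startup.
            spark_properties.update(event.get('modifiedConfigs') or {})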
Incorporate second review pass: the previous spec drifted toward exact Scala parity and accumulated scope (rolling dirs, five compression codecs, four-way taxonomy as primary output). Pull it back to what the issue actually asked for. - Primary contract is now a Route enum (QUALIFICATION / PROFILING / UNKNOWN); spark_runtime demotes to auxiliary metadata. - Framed explicitly as best-effort early routing, not exact parity. Inconclusive logs return UNKNOWN; caller falls back to the full tool. - V1 input shape reduced to single-file and Databricks rolling dir (kept because aether requires it). Spark-native rolling-dir support is now out of scope. - V1 codecs reduced to plain / gz / zstd. No new `[compression]` extra. - Scanner still consumes SQLExecutionStart.modifiedConfigs (covers logs that enable spark.plugins late) but intentionally skips the narrow JobStart job-level plugin re-evaluation path, documented as a deliberate divergence. - Errors condensed to three types; "no env-update seen" becomes Route.UNKNOWN instead of an exception. - Section 13 records the evolution so reviewers can see what was pulled in and what was pulled back out. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
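The primary contract this pass settles on can be sketched in a few lines; the Route names come from this commit, while the DetectionResult fields beyond the route are assumptions (the PR description later surfaces the enum as ToolExecution):

    from dataclasses import dataclass
    from enum import Enum, auto
    from typing import Optional

    class Route(Enum):
        QUALIFICATION = auto()
        PROFILING = auto()
        UNKNOWN = auto()

    @dataclass
    class DetectionResult:
        route: Route                          # the primary routing decision
        spark_runtime: Optional[str] = None   # auxiliary metadata only
        reason: str = ''                      # human-readable explanation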
Third review pass. Three targeted changes, no scope expansion. - Asymmetric decision rule: PROFILING on a decisive non-SPARK signal, QUALIFICATION only after walking the full log (EXHAUSTED) with no GPU signal, UNKNOWN when the cap is hit or env-update was never seen. Previously the spec promoted "no GPU signal in prefix" to QUALIFICATION, which is unsafe because Scala can promote runtime later via SQLExecutionStart.modifiedConfigs. - Databricks rolling dir resolves to the full ordered file list and the scanner walks all files under one shared event budget. Picking only the earliest file undercut the scanner's own mitigation, since GPU markers set via modifiedConfigs can land in later rolled files. - Stream opener is explicitly a `@contextmanager` that yields an Iterator[str] and owns closing the codec / text / byte streams. Previous signature vs usage were inconsistent. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
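The opener contract fixed here looks roughly like the following, assuming the CspPath.open_input_stream() API named earlier in this PR and leaving codec selection out of the sketch:

    import io
    from contextlib import contextmanager
    from typing import Iterator

    @contextmanager
    def open_event_log_stream(path) -> Iterator[Iterator[str]]:
        byte_stream = path.open_input_stream()   # CspPath storage API
        # A real implementation would wrap byte_stream in a codec decoder
        # first (.gz / .zst / .zstd); plain text is shown for brevity.
        text_stream = io.TextIOWrapper(byte_stream, encoding='utf-8')
        try:
            yield text_stream    # iterating the handle yields str lines
        finally:
            text_stream.close()  # closing the wrapper closes the byte stream too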
Fourth review pass, two low-risk clarifications. - Memory contract stated explicitly at the top of section 7: streaming only, no full-file reads, no raw-event accumulation, bounded per-invocation state (scalars + one mutable spark_properties dict). Prevents an implementation from drifting into read() / readlines() / full-log buffering and silently breaking the "lightweight" promise. - max_events_scanned reframed as the primary CPU/I-O cost cap, not just an ambiguity tie-breaker. Large CPU logs hitting the cap will terminate as UNKNOWN; that is expected behavior, not a failure mode. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
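In loop form, that contract is roughly the following; the state and outcome names are illustrative and mirror the scan outcomes in the flowchart further down this page:

    from typing import Iterator, Tuple

    def scan_bounded(lines: Iterator[str], max_events_scanned: int) -> Tuple[str, dict]:
        events_scanned = 0
        spark_properties: dict = {}   # the one mutable dict of per-invocation state
        for raw in lines:             # streaming: never read() or readlines()
            if events_scanned >= max_events_scanned:
                return 'CAP_HIT', spark_properties   # detector maps this to UNKNOWN
            events_scanned += 1
            # ... parse `raw`, merge properties, return early on a decisive signal
        return 'EXHAUSTED', spark_properties         # whole log walked, no cap hit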
Twelve bite-sized tasks with TDD flow. Covers: zstandard dep, package skeleton, types/enums/exceptions, Scala-pinned markers, classifier, codec-aware streamer, resolver (single file + Databricks rolling), bounded multi-file scanner, top-level detect_spark_runtime, fixture anchor tests, full-suite + lint run, and a final spec doc update capturing the realized fixture inventory. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
Replace lazy __getattr__ re-exports with direct imports in __init__.py (fixes E0603 undefined-all-variable and downstream E0611 in tests). Remove unused imports, add class docstrings, move inline json imports to module level, replace list comprehensions with list(), use dict literals, add pylint: disable=too-few-public-methods for single-test-method classes. Pylint score: 10.00/10. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
Replace the two placeholder fixture bullets with the concrete three-item inventory actually used by test_detector_fixtures.py, and clarify that Databricks-rolling coverage is handled via synthesized fixtures in test_detector.py since core/src/test/resources/ has no reusable Databricks-rolling-shape fixture. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
One missing blank line between the module-level constants block and _classify_suffix. Flake8 E302 fix; no behavior change. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
Mirror Scala's ``Try { s.toBoolean }.getOrElse(default)`` exactly: only
``"true"``/``"false"`` (case-insensitive) are valid boolean strings;
``"yes"``, ``"no"``, ``"1"``, ``"0"`` and everything else falls back to
the default. Removes the overly broad ``_TRUE_STRINGS``/``_FALSE_STRINGS``
sets that caused false-negatives (e.g. ``spark.rapids.sql.enabled=no``
routed as CPU by us but GPU by Scala). Adds a parametrized test to
document parity with Scala behaviour.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
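In Python the parity rule reduces to roughly this (a sketch matching the commit's description, not necessarily the merged body):

    def _parse_bool(raw: str, default: bool) -> bool:
        # Mirror Scala's Try { s.toBoolean }.getOrElse(default): only
        # 'true'/'false' (case-insensitive) parse; everything else,
        # including 'yes', 'no', '1', '0', takes the default. No strip():
        # Scala's toBoolean rejects padded strings as well.
        lowered = raw.lower()
        if lowered == 'true':
            return True
        if lowered == 'false':
            return False
        return default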
Replace the narrow ``_UNSUPPORTED_CODEC_SUFFIXES`` blacklist with an explicit ``_SUPPORTED_SUFFIXES`` whitelist (plain, .inprogress, .gz, .zstd, .zst). Any unrecognised file suffix now raises ``UnsupportedCompressionError`` rather than silently falling through as plain text, which could cause garbled reads on unknown compressed formats. Adds ``_PLAIN_SUFFIXES`` constant and updates the else-branch comment to reflect that unknown-codec fallthrough is no longer possible. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
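A sketch of the whitelist shape, with constant names taken from the commit message and the exception stub plus dispatch strings as illustrative assumptions:

    import os

    class UnsupportedCompressionError(Exception):
        # Stub standing in for the detector's typed error.
        pass

    _PLAIN_SUFFIXES = ('', '.inprogress')
    _SUPPORTED_SUFFIXES = _PLAIN_SUFFIXES + ('.gz', '.zstd', '.zst')

    def _classify_suffix(file_name: str) -> str:
        suffix = os.path.splitext(file_name)[1].lower()
        if suffix not in _SUPPORTED_SUFFIXES:
            # No silent plain-text fallthrough for unknown codecs.
            raise UnsupportedCompressionError(f'unsupported suffix: {suffix!r}')
        return 'plain' if suffix in _PLAIN_SUFFIXES else suffix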
Add ``last_scanned_path`` to ``_ScanResult`` and update it in ``_scan_events_across`` as each file is opened. In ``detector.py``, prefer ``scan.last_scanned_path`` over ``files[0]`` so that ``DetectionResult.event_log_path`` reflects the file that actually contained the decisive signal, not always the first file in a rolling directory. Extends the Databricks rolling-dir test to assert the correct path is reported. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
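Roughly, with _ScanResult fields other than the two named here treated as assumptions:

    from dataclasses import dataclass
    from typing import Iterable, Optional

    @dataclass
    class _ScanResult:
        events_scanned: int = 0
        last_scanned_path: Optional[str] = None  # file holding the decisive signal

    def _scan_events_across(files: Iterable[str]) -> _ScanResult:
        result = _ScanResult()
        for path in files:
            result.last_scanned_path = path   # updated as each file is opened
            # ... stream and scan this file under the shared event budget ...
        return result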
The plan document was a development aid for the implementation phase and does not belong in the published change set. The design spec remains under docs/superpowers/specs/ as the authoritative reference. Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
The design spec was a development artifact and does not belong in the published change set. Module- and function-level docstrings describe the detector contract in the code itself. Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
Greptile Summary
This PR adds a lightweight, streaming Python detector that classifies a single Spark application event log as a QUALIFICATION, PROFILING, or UNKNOWN route.
Confidence Score: 4/5
Safe to merge after fixing the non-dict JSON guard in scanner.py; all other logic is correct and well-tested. One P1 bug: a valid-JSON non-dict line (e.g. a bare JSON array) causes an unhandled AttributeError that bypasses the EventLogDetectionError hierarchy. All other components (classification, streaming, path resolution, decision logic) look correct and are covered by tests. Score is capped at 4 by the single P1. In user_tools/src/spark_rapids_tools/tools/eventlog_detector/scanner.py, the non-dict JSON guard fix is isolated to the scan_events function.
Important Files Changed
Flowchart
%%{init: {'theme': 'neutral'}}%%
flowchart TD
A[detect_spark_runtime] --> B[resolve_event_log_files]
B -->|single file| C[1-element file list]
B -->|eventlog_v2_dir| D[sorted events_N_ files]
B -->|other dir| E([UnsupportedInputError])
C --> F[scan_events_across]
D --> F
F --> G[open_event_log_stream per file]
G -->|.gz or .zst| H[PyArrow auto-decompress]
G -->|.zstd| I[zstandard manual decompress]
G -->|plain or .inprogress| J[raw bytes]
G -->|other suffix| K([UnsupportedCompressionError])
H --> L[scan_events JSON line loop]
I --> L
J --> L
L -->|SparkRapidsBuildInfoEvent| M([DECISIVE SPARK_RAPIDS])
L -->|EnvironmentUpdate classifies SPARK_RAPIDS| M
L -->|EnvironmentUpdate no RAPIDS markers fast-path| N([CPU_FAST_PATH SPARK])
L -->|budget exhausted| O([CAP_HIT])
L -->|all lines read| P([EXHAUSTED])
M --> Q{detector decision}
N --> Q
O --> Q
P --> Q
Q -->|SPARK_RAPIDS| R([PROFILING])
Q -->|CPU_FAST_PATH and SPARK| S([QUALIFICATION])
Q -->|EXHAUSTED and env_update_seen| S
Q -->|otherwise| T([UNKNOWN])
Reviews (9). Last reviewed commit: "test: trim event log detector coverage o..."
Tighten module/function docstrings and inline comments across the detector package: - Drop references to internal planning docs and consumer-specific projects. - Shorten verbose narrative in module docstrings to the essentials. - Keep the Scala source pointers in markers.py (they are the actionable pin-points for keeping the two sides in sync). - Collapse long "why we do X" comments where the surrounding code already makes the intent obvious. Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
- resolver: invalid date components (e.g. month=13) in a Databricks
rolled filename now return None instead of letting ValueError escape
as a non-domain exception. The bad name sorts last alongside bare
`eventlog`.
- stream: narrow the context-manager's yield-time exception handler
to OSError so caller-side logic errors propagate untouched instead
of being reclassified as EventLogReadError.
- classifier: compile the Auron extension regex with re.DOTALL so a
newline-separated spark.sql.extensions value still matches.
The fourth Greptile finding (Auron enabled-flag should use _parse_bool)
is intentionally not applied: Scala's AuronParseHelper.isAuronTurnedOn
uses equalsIgnoreCase("true") — strict string equality — not
Try { toBoolean }.getOrElse(true). Keeping the Python check strict
preserves Scala parity.
Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
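The resolver fix can be sketched as below; the Databricks rolled-filename pattern is an assumption inferred from this PR's discussion, not a verified format string:

    import re
    from datetime import datetime
    from typing import Optional

    # Hypothetical pattern for names like eventlog-2024-01-31--16-00
    _ROLLED_NAME = re.compile(r'eventlog-(\d{4})-(\d{2})-(\d{2})--(\d{2})-(\d{2})')

    def _rolled_sort_key(name: str) -> Optional[datetime]:
        match = _ROLLED_NAME.match(name)
        if match is None:
            return None                 # bare `eventlog` sorts last
        try:
            return datetime(*(int(g) for g in match.groups()))
        except ValueError:              # e.g. month=13 in a malformed name
            return None                 # bad names sort last too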
@greptile review
c1cc6ad to 1641290
@greptile review
from spark_rapids_tools.tools.eventlog_detector.types import SparkRuntime

def _parse_bool(raw: str, default: bool) -> bool:
This logic is copied from how the Scala implementation in tools is
1641290 to 7cc58ba
Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
7cc58ba to 3b140f1
Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
| "spark.rapids.sql.enabled": "true", | ||
| }, | ||
| ]) | ||
| def test_rapids_plugin_configs_count_as_markers(self, props): |
minor nit: can we combine test_disabled_rapids_plugin_still_counts_as_marker and test_rapids_enabled_key_alone_counts_as_marker here too? they seem to test the same thing
Updated. Also removing a number of other redundant or low-value tests while at it.
Removed these tests ->
- test_detector.py::test_fast_path_does_not_fire_when_rapids_marker_present
Removed because scanner already tests this exact branch: RAPIDS marker present in env update prevents CPU fast path, then later SQL config makes
it decisive.
- test_detector.py::test_reason_mentions_runtime_on_profiling
Removed because it only checks a human-readable reason string. Routing behavior is already tested.
- test_detector.py::test_reason_mentions_full_log_on_strict_qualification
Removed for the same reason: brittle reason-string assertion, not core behavior.
- test_resolver.py::test_generic_multi_app_dir_raises
Removed because it hits the same resolver branch as test_non_oss_rolling_dir_raises: directory basename does not start with eventlog_v2_.
- test_types.py::test_base_is_exception_not_value_error
Removed because it asserts an implementation choice rather than an integration contract. We kept the important part: detector-specific errors
subclass EventLogDetectionError.
These tests have been consolidated ->
- test_classifier.py marker tests
Previously separate tests checked plain Spark, enabled-key-only, plugin-only, plugin-list, plugin-plus-enabled, and lookalike plugin. These are
now one parametrized test:
test_detects_rapids_marker_configs(props, expected)
This keeps all cases but removes repeated test structure.
- test_stream.py unsupported compression tests
Previously .lz4, .snappy, .lzf, and unknown suffix each had a separate test. These are now one parametrized test:
test_unsupported_suffix_raises(suffix)
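For reference, the consolidated suffix test is roughly shaped like this; the import path and the exact argument type taken by the opener are assumptions:

    import pytest

    # Assumed to be importable from the detector package.
    from spark_rapids_tools.tools.eventlog_detector import (
        UnsupportedCompressionError, open_event_log_stream)

    @pytest.mark.parametrize('suffix', ['.lz4', '.snappy', '.lzf', '.unknown'])
    def test_unsupported_suffix_raises(tmp_path, suffix):
        log = tmp_path / f'eventlog{suffix}'
        log.write_bytes(b'not a supported codec stream')
        with pytest.raises(UnsupportedCompressionError):
            with open_event_log_stream(str(log)):
                pass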
Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
@sayedbilalbari To confirm the high-level goal of the parser:
cindyyuanjiang left a comment
Thanks @sayedbilalbari! LGTM!
@cindyyuanjiang Yes. With the current fast-path default, this is the behavior. If false positives must be avoided, toggle the fast path off (the trade-off: in some cases, running the qualification and failing takes longer than spending some extra time at detection).
try:
    event = json.loads(raw)
except (json.JSONDecodeError, ValueError):
    # Tolerate trailing partial lines in live logs; count them so
    # a pathological log can't keep us scanning forever.
    result.events_scanned += 1
    continue

result.events_scanned += 1
name = event.get("Event")
Non-dict JSON values escape the try/except and raise AttributeError
json.loads(raw) can succeed and return a non-dict JSON value (e.g., a JSON array […] or a bare number). The try/except (json.JSONDecodeError, ValueError) only guards the parse call on line 71; line 79's event.get("Event") is outside that block and raises AttributeError for any non-dict value. That AttributeError propagates all the way through scan_events_across and detect_spark_runtime as a bare Python exception — not an EventLogDetectionError — breaking callers that key their fallback on the detector's typed error hierarchy.
A minimal guard keeps the handling consistent with the existing "skip unrecognised lines" approach:
try:
    event = json.loads(raw)
except (json.JSONDecodeError, ValueError):
    result.events_scanned += 1
    continue

if not isinstance(event, dict):
    result.events_scanned += 1
    continue

result.events_scanned += 1
name = event.get("Event")
Summary
Adds a lightweight Python event-log detector that classifies a single Spark application log as one of:
- PROFILING when a RAPIDS runtime signal is found.
- QUALIFICATION when startup properties indicate standard OSS Spark with no RAPIDS markers.
- UNKNOWN when the bounded scan cannot make a safe decision.

The detector is intended as an early routing check before invoking the full tools pipeline. It scans event logs directly from the Python layer and does not invoke the Scala tools runtime for this decision.
Resolves #2082.
Public API
Callers can use ToolExecution.UNKNOWN or any EventLogDetectionError subclass as a fallback signal for the full tools pipeline.
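A hypothetical caller, using the names given in this description; the result attribute spellings are assumptions:

    from spark_rapids_tools.tools.eventlog_detector import (
        EventLogDetectionError, ToolExecution, detect_spark_runtime)

    def route_event_log(event_log_path: str) -> str:
        try:
            result = detect_spark_runtime(event_log_path)
        except EventLogDetectionError:
            return 'full-pipeline'   # typed errors: fall back cleanly
        if result.tool_execution == ToolExecution.UNKNOWN:
            return 'full-pipeline'   # bounded scan was inconclusive
        return result.tool_execution.name.lower()   # 'qualification' or 'profiling'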
Detection Contract
The detector performs best-effort early routing for RAPIDS and standard OSS Spark event logs:
- PROFILING on SparkRapidsBuildInfoEvent or RAPIDS Spark configuration markers.
- QUALIFICATION via the CPU fast path on SparkListenerEnvironmentUpdate when startup properties contain no RAPIDS markers.
- UNKNOWN when the bounded scan reaches max_events_scanned before seeing enough information.

The scanner is streaming and bounded. It reads line-by-line through CspPath.open_input_stream(), stops as soon as a decision is available, and retains only runtime metadata plus accumulated Spark properties.

Supported Inputs
- Single event log files and rolling directories in the eventlog_v2_*/events_* layout.
- Plain, .inprogress, .gz, .zst, and .zstd event logs.
- Any storage backend exposed through the CspPath storage layer.

Unsupported input shapes and codecs raise typed EventLogDetectionError subclasses so callers can fall back cleanly.

Implementation Notes
- README.md documents the detection flow, RAPIDS markers, CPU fast path, and streaming memory contract.
- markers.py keeps RAPIDS and Spark event constants pinned to the relevant Scala sources.
- resolver.py resolves a single-app input into an ordered file list.
- stream.py provides codec-aware streaming without loading the full log.
- scanner.py performs bounded event scanning across one or more files.
- detector.py exposes the top-level detect_spark_runtime entry point.

Dependencies
Adds zstandard==0.25.0 for .zstd event logs. .gz and .zst continue to use the existing PyArrow-backed I/O path.

Test Plan
- PYTHONPATH=src python -m pytest -q tests/spark_rapids_tools_ut/tools/eventlog_detector
- tox -e pylint,flake8
- Unit tests cover .inprogress event logs.