Conversation

@jiteshsoni commented Nov 16, 2025

What changes were proposed in this pull request?

This PR adds admission control support to Python streaming data sources, enabling users to control microbatch sizes through maxRecordsPerBatch, maxFilesPerBatch, and maxBytesPerBatch options.

Changes:

  • Enhanced DataSourceStreamReader.latestOffset() to accept optional start and limit parameters
  • Updated PythonMicroBatchStream to validate admission control options (throws IllegalArgumentException for invalid values)
  • Added Python-JVM communication handlers for admission control parameters
  • Implemented backward compatibility fallback logic in python_streaming_source_runner.py
  • Added comprehensive documentation and example (structured_blockchain_admission_control.py)
  • Added test cases validating invalid parameter handling

Why are the changes needed?

Python streaming data sources previously could not control microbatch sizes because latestOffset() had no parameters to receive configured limits. This forced Python sources to either process all available data (unpredictable resource usage) or artificially limit offsets (risking data loss). Scala sources have this capability via SupportsAdmissionControl.

Detailed Verification

✅ 1. Enhanced DataSourceStreamReader.latestOffset() API

Claim: Enhanced to accept optional start and limit parameters

Verification:

  • File: python/pyspark/sql/datasource.py (lines 717-761)
  • Implementation:
    def latestOffset(
        self, start: Optional[dict] = None, limit: Optional[dict] = None
    ) -> Union[dict, Tuple[dict, dict]]:
  • Features:
    • Optional parameters for admission control
    • Returns a single dict (old behavior) or a (capped, true_latest) tuple; see the sketch after this section
    • Fully documented with examples
  • Status: ✅ VERIFIED
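
To make the dual return behavior concrete, here is a minimal sketch of a reader using the new signature. It is illustrative only: a real source would subclass pyspark.sql.datasource.DataSourceStreamReader, the counter-based offsets are an assumption for this example, and the {"type": "maxRows", "maxRows": N} limit shape mirrors the example in section 6 below.

    # Minimal sketch, not the PR's exact code.
    from typing import Optional, Tuple, Union

    class CounterStreamReader:
        def __init__(self) -> None:
            self.available = 10_000  # pretend this many records exist upstream

        def initialOffset(self) -> dict:
            return {"index": 0}

        def latestOffset(
            self, start: Optional[dict] = None, limit: Optional[dict] = None
        ) -> Union[dict, Tuple[dict, dict]]:
            true_latest = {"index": self.available}
            if start is None or limit is None:
                # Old behavior: a single offset dict.
                return true_latest
            if limit.get("type") == "maxRows":
                capped = min(start["index"] + limit["maxRows"], self.available)
                # New behavior: (capped offset, true latest offset).
                return {"index": capped}, true_latest
            return true_latest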

✅ 2. Admission Control Validation in PythonMicroBatchStream

Claim: Validates admission control options and throws IllegalArgumentException for invalid values

Verification:

  • File: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonMicroBatchStream.scala (lines 92-125)
  • Implementation:
    • Validates maxRecordsPerBatch (Long, > 0)
    • Validates maxFilesPerBatch (Int, > 0)
    • Validates maxBytesPerBatch (Long, > 0)
    • Throws IllegalArgumentException for invalid values
  • Code Snippet:
    def parseLong(key: String): Long = {
      Try(options.get(key).toLong).toOption.filter(_ > 0).getOrElse {
        throw new IllegalArgumentException(
          s"Invalid value '${options.get(key)}' for option '$key', must be a positive integer")
      }
    }
  • Status: ✅ VERIFIED
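
As a usage-level illustration of the validation, the sketch below sets one of the options to an invalid value; "fake" stands in for any registered Python streaming source, spark for an active SparkSession, and the exact way the JVM exception is wrapped on the Python side is not guaranteed by this snippet.

    # Illustrative only: 'fake' is a placeholder format name.
    df = (
        spark.readStream
        .format("fake")
        .option("maxRecordsPerBatch", "0")      # zero is rejected: the value must be > 0
        # .option("maxRecordsPerBatch", "abc")  # non-numeric values are rejected too
        .load()
    )
    # Starting a query over this stream fails: PythonMicroBatchStream raises
    # IllegalArgumentException, which reaches the Python caller wrapped in the JVM error.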

✅ 3. Python-JVM Communication Handlers

Claim: Added Python-JVM communication handlers for admission control parameters

Verification:

  • File: python/pyspark/sql/streaming/python_streaming_source_runner.py
  • Implementation:
    • Function ID: LATEST_OFFSET_WITH_REPORT_FUNC_ID = 890
    • Handler: latest_offset_with_report_func() (lines 108-164)
    • Handles serialization of start offset and limit
    • Returns both capped and true latest offsets
  • Status: ✅ VERIFIED
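
Conceptually, the new handler reads the start offset and the limit sent from the JVM, calls the reader, and sends both offsets back. The sketch below illustrates that flow only; the real function ID wiring, socket framing, and names live in python_streaming_source_runner.py and differ in detail.

    import json

    def call_latest_offset_with_report(reader, start_json, limit_json):
        # The JVM side sends the start offset and the limit as JSON strings
        # (empty when absent, per the null-handling notes later in this PR).
        start = json.loads(start_json) if start_json else None
        limit = json.loads(limit_json) if limit_json else None
        # Assumes a new-style reader returning (capped, true_latest);
        # the fallback for old-style readers is covered in the next section.
        capped, true_latest = reader.latestOffset(start, limit)
        # Both offsets go back so the engine can plan up to the capped offset
        # while still reporting the true latest one.
        return json.dumps(capped), json.dumps(true_latest)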

✅ 4. Backward Compatibility Fallback Logic

Claim: Implemented backward compatibility fallback logic

Verification:

  • File: python/pyspark/sql/streaming/python_streaming_source_runner.py (lines 140-150)
  • Implementation:
    try:
        # Try calling with optional parameters (new signature)
        result = reader.latestOffset(start_offset, limit)
    except TypeError:
        # Old signature that doesn't accept parameters - fallback
        fallback_result = reader.latestOffset()
  • Features:
    • Catches TypeError for old implementations
    • Falls back to parameterless call
    • Handles both tuple and single-dict return types (see the fuller sketch below)
  • Status: ✅ VERIFIED
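
The excerpt above shows only the call sites; a fuller sketch of the fallback flow, with illustrative names rather than the runner's own, might look like this:

    def resolve_latest_offset(reader, start_offset, limit):
        try:
            # New-style readers accept (start, limit).
            result = reader.latestOffset(start_offset, limit)
        except TypeError:
            # Old-style readers take no arguments: fall back and ignore the limit.
            result = reader.latestOffset()
        if isinstance(result, tuple):
            capped, true_latest = result   # new return shape
        else:
            capped = true_latest = result  # old return shape: a single offset dict
        return capped, true_latest

Catching TypeError at the call site matches the fallback the PR describes; the usual trade-off of this pattern is that it can also swallow a TypeError raised inside a new-style implementation.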

✅ 5. Comprehensive Documentation

Claim: Added comprehensive documentation and examples

Verification:

  • File: python/docs/source/tutorial/sql/python_data_source.rst
  • Content: (+95 lines)
    • Section: "Admission Control for Streaming Sources"
    • Documents all three options: maxRecordsPerBatch, maxFilesPerBatch, maxBytesPerBatch
    • Provides usage examples for each option
    • Explains advanced implementation for DataSourceStreamReader
    • Notes on backward compatibility
    • Warning about validation and IllegalArgumentException
  • Example Code:
    query = spark.readStream \
        .format("fake") \
        .option("maxRecordsPerBatch", "1000") \
        .load()
  • Status: ✅ VERIFIED
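
For completeness, a hedged sketch of the other two options in the same style ("fake" is again a placeholder format and spark an active SparkSession):

    files_df = (
        spark.readStream.format("fake")
        .option("maxFilesPerBatch", "10")                   # cap files per microbatch
        .load()
    )

    bytes_df = (
        spark.readStream.format("fake")
        .option("maxBytesPerBatch", str(64 * 1024 * 1024))  # cap bytes per microbatch (64 MiB)
        .load()
    )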

✅ 6. Example Implementation

Claim: Added comprehensive example (structured_blockchain_admission_control.py)

Verification:

  • File: examples/src/main/python/sql/streaming/structured_blockchain_admission_control.py
  • Size: 181 lines (new file)
  • Features Demonstrated:
    • Custom DataSourceStreamReader implementation
    • latestOffset(start, limit) with admission control
    • Tuple return pattern: (capped_offset, true_latest_offset)
    • Command-line configurable batch size
    • Blockchain-themed data generation (blocks with timestamps and hashes)
  • Key Implementation:
    def latestOffset(self, start=None, limit=None):
        if limit and limit.get("type") == "maxRows":
            max_blocks = limit["maxRows"]
            end_block = min(start_block + max_blocks, latest_available)
            return ({"block": end_block}, {"block": latest_available})
  • Status: ✅ VERIFIED
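
Read as a fuller sketch, the method might look like the following; the start-offset handling and the _simulated_chain_tip() helper are assumptions for illustration, not the exact contents of structured_blockchain_admission_control.py.

    def latestOffset(self, start=None, limit=None):
        start_block = (start or self.initialOffset())["block"]
        latest_available = self._simulated_chain_tip()  # e.g. a steadily growing counter
        if limit and limit.get("type") == "maxRows":
            max_blocks = limit["maxRows"]
            end_block = min(start_block + max_blocks, latest_available)
            # Capped offset for this batch, plus the true latest offset for reporting.
            return ({"block": end_block}, {"block": latest_available})
        # No limit configured: keep the old single-offset behavior.
        return {"block": latest_available}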

✅ 7. Test Coverage

Claim: Added Scala test cases validating IllegalArgumentException for invalid admission control values

Verification:

  • File: sql/core/src/test/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingDataSourceSuite.scala
  • Tests Added:
    • admission control: maxRecordsPerBatch with SimpleDataSourceStreamReader (valid usage)
    • admission control: maxFilesPerBatch option (valid usage)
    • admission control: maxBytesPerBatch option (valid usage)
    • admission control: invalid maxRecordsPerBatch throws exception (non-numeric value)
    • admission control: negative maxRecordsPerBatch throws exception (negative value)
    • admission control: zero maxRecordsPerBatch throws exception (zero value)
    • admission control: decimal maxRecordsPerBatch throws exception (decimal value)
    • admission control: invalid maxFilesPerBatch throws exception (non-numeric value)
    • admission control: negative maxFilesPerBatch throws exception (negative value)
    • admission control: invalid maxBytesPerBatch throws exception (non-numeric value)
  • Test Implementation:
    assert(
      e.getCause.isInstanceOf[IllegalArgumentException],
      s"Expected IllegalArgumentException but got ${e.getCause.getClass}")
    Seq(option, value, "positive integer").foreach { s =>
      assert(e.getMessage.contains(s))
    }
  • Status: ✅ VERIFIED

Does this PR introduce any user-facing change?

Yes. Users can now implement admission control in custom Python streaming sources:

  • Simple API users get automatic admission control support
  • Full API users can implement latestOffset(start, limit) for fine-grained control
  • Fully backward compatible (old implementations continue to work)

How was this patch tested?

  • Added Scala test cases validating IllegalArgumentException for invalid admission control values
  • Existing streaming tests pass (verifying backward compatibility)
  • All linting/formatting checks pass (scalastyle, scalafmt, lint-scala, lint-python, Black, mypy)
  • Example structured_blockchain_admission_control.py demonstrates the feature

Was this patch authored or co-authored using generative AI tooling?

No.

Closes #SPARK-54305

…taSource streaming API

This PR adds admission control support to Python streaming data sources:

1. Enhanced Python API: Modified DataSourceStreamReader.latestOffset() to accept optional start and limit parameters
2. Scala Bridge: Updated PythonMicroBatchStream to validate admission control options and throw IllegalArgumentException for invalid values
3. Python-JVM Communication: Added function IDs and handlers in PythonStreamingSourceRunner for admission control parameters
4. Python Worker: Implemented latest_offset_with_report_func() with backward compatibility fallback
5. Documentation: Added comprehensive guide in python_data_source.rst
6. Tests: Added validation tests for invalid admission control parameters
7. Example: Created structured_blockchain_admission_control.py demonstrating the Full API

Key Benefits:
- Predictable performance with controlled batch sizes
- Rate limiting and backpressure support
- Feature parity with Scala DataSource capabilities
- Full backward compatibility (all parameters optional)
…ibility

Change latestOffset() from throwing UnsupportedOperationException to
bridging to latestOffset(Offset, ReadLimit) for backward compatibility.

This fixes test failures in PythonStreamingDataSourceSuite where the
old latestOffset() signature is still called by Spark's streaming
execution engine.
- Fix null handling in latestOffset bridge method
- Handle null start offset in PythonStreamingSourceRunner by converting to empty string
- Update Python side to convert empty string back to None
- Fix error action name to be 'latestOffset' instead of 'latestOffsetWithReport'

All PythonStreamingDataSourceSuite tests now pass (8/8).
Add noqa: E501 comments to lines where Black and flake8 conflict.
Black considers these lines correctly formatted, but they exceed 79 chars.
@jiteshsoni force-pushed the test-build-branch branch 2 times, most recently from 518b498 to ffb28fd on November 17, 2025 at 06:13
This commit addresses several issues flagged by the CI pipeline:

- Fixes Flake8 E501 (line too long) errors in 'structured_blockchain_admission_control.py' by refactoring docstrings and long lines.
- Applies Black formatting to 'datasource_internal.py' and 'python_streaming_source_runner.py' to resolve pre-existing formatting inconsistencies.
@allisonwang-db (Contributor)

cc @HeartSaVioR @jiateoh
