181 changes: 181 additions & 0 deletions examples/src/main/python/sql/streaming/structured_blockchain_admission_control.py
@@ -0,0 +1,181 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Demonstrates admission control in Python streaming data sources.

This example implements a simple blockchain-like streaming source that generates
sequential blocks and shows how to use admission control to limit batch sizes. # noqa: E501

Usage: structured_blockchain_admission_control.py [<max-blocks-per-batch>]
<max-blocks-per-batch> Maximum number of blocks to process per microbatch
(default: 10)

Run the example:
`$ bin/spark-submit examples/src/main/python/sql/streaming/\\
structured_blockchain_admission_control.py 5`

The example will process blocks in controlled batches of 5,
demonstrating admission control.
"""
import sys
import time
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union

from pyspark.sql import SparkSession
from pyspark.sql.datasource import (
    DataSource,
    DataSourceStreamReader,
    InputPartition,
)
from pyspark.sql.types import StructType


class SimpleBlockchainReader(DataSourceStreamReader):
"""A simple streaming source that generates sequential blockchain blocks.""" # noqa: E501

def __init__(self, max_block: int = 1000) -> None:
self.max_block = max_block
self.current_block = 0

def initialOffset(self) -> Dict[str, int]:
"""Start from block 0."""
return {"block": self.current_block}

def latestOffset(
self,
start: Optional[Dict[str, int]] = None,
limit: Optional[Dict[str, Any]] = None,
) -> Union[Dict[str, int], Tuple[Dict[str, int], Dict[str, int]]]:
"""
Return the latest offset, respecting admission control limits.

This demonstrates the key admission control pattern:
- Without limit: process all available blocks
- With maxRows limit: cap the end block to respect batch size
"""
# Determine where we are now
if start is None:
start_block = self.current_block
else:
start_block = start["block"]

# Simulate blockchain growth - advance by 20 blocks each time
latest_available = min(start_block + 20, self.max_block)

# Apply admission control if configured
if limit and limit.get("type") == "maxRows":
max_blocks = limit["maxRows"]
# Cap at the configured limit
end_block = min(start_block + max_blocks, latest_available)
print(
f" [Admission Control] Start: {start_block}, "
f"Available: {latest_available}, Capped: {end_block} "
f"(limit: {max_blocks})"
)
# Return tuple: (capped_offset, true_latest_offset)
return ({"block": end_block}, {"block": latest_available})
else:
# No limit - process all available
end_block = latest_available
print(f" [No Limit] Start: {start_block}, End: {end_block}")
return {"block": end_block}

def partitions(
self, start: Dict[str, int], end: Dict[str, int]
) -> List[InputPartition]:
"""Create a single partition for the block range."""
start_block = start["block"]
end_block = end["block"]
return [InputPartition(f"{start_block}:{end_block}".encode())]

def read(self, partition: InputPartition) -> Iterator[Tuple[int, int, str]]:
"""Generate block data for the partition."""
# Parse the block range
range_str = partition.value.decode()
start_block, end_block = map(int, range_str.split(":"))

# Generate block data
for block_num in range(start_block, end_block):
# Simulate block data: block number, timestamp, simple hash
yield (
block_num,
int(time.time() * 1000),
f"0x{'0' * 60}{block_num:04x}",
)

def commit(self, end: Dict[str, int]) -> None:
"""Mark this offset as committed."""
pass


class SimpleBlockchainSource(DataSource):
"""Data source for simple blockchain streaming."""

@classmethod
def name(cls) -> str:
return "simple_blockchain"

def schema(self) -> str:
return "block_number INT, timestamp LONG, block_hash STRING"

def streamReader(self, schema: StructType) -> SimpleBlockchainReader:
return SimpleBlockchainReader(max_block=1000)


if __name__ == "__main__":
    max_blocks_per_batch = int(sys.argv[1]) if len(sys.argv) > 1 else 10

    print(
        f"""
=================================================================
Blockchain Streaming with Admission Control
=================================================================
Configuration:
- Max blocks per batch: {max_blocks_per_batch}
- Total blocks to generate: 1000

Watch how admission control limits each microbatch to process
only {max_blocks_per_batch} blocks at a time, even when more data is available.
=================================================================
"""
    )
    # fmt: off
    spark = SparkSession.builder.appName("StructuredBlockchainAdmissionControl").getOrCreate()  # noqa: E501
    # fmt: on

    # Register the custom data source
    spark.dataSource.register(SimpleBlockchainSource)

    # Create streaming DataFrame with admission control
    blocks = (
        spark.readStream.format("simple_blockchain")
        .option("maxRecordsPerBatch", str(max_blocks_per_batch))
        .load()
    )

    # Show block statistics per microbatch
    query = (
        blocks.writeStream.outputMode("append")
        .format("console")
        .option("numRows", "20")
        .option("truncate", "false")
        .trigger(processingTime="3 seconds")
        .start()
    )

    query.awaitTermination()
95 changes: 95 additions & 0 deletions python/docs/source/tutorial/sql/python_data_source.rst
@@ -487,6 +487,101 @@ We can also use the same data source in streaming reader and writer

    query = spark.readStream.format("fake").load().writeStream.format("fake").start("/output_path")

**Admission Control for Streaming Sources**

Spark supports admission control for streaming sources to limit the amount of data processed in each micro-batch. This helps control resource usage and maintain consistent processing times. Python streaming data sources support three types of admission control options:

- **maxRecordsPerBatch**: Limit the maximum number of rows per batch
- **maxFilesPerBatch**: Limit the maximum number of files per batch
- **maxBytesPerBatch**: Limit the maximum number of bytes per batch

These options can be specified when reading from a streaming source:

.. code-block:: python

    # Limit to 1000 rows per batch
    query = spark.readStream \
        .format("fake") \
        .option("maxRecordsPerBatch", "1000") \
        .load() \
        .writeStream \
        .format("console") \
        .start()

    # Limit to 100 files per batch
    query = spark.readStream \
        .format("fake") \
        .option("maxFilesPerBatch", "100") \
        .load() \
        .writeStream \
        .format("console") \
        .start()

    # Limit to 10 MB per batch
    query = spark.readStream \
        .format("fake") \
        .option("maxBytesPerBatch", str(10 * 1024 * 1024)) \
        .load() \
        .writeStream \
        .format("console") \
        .start()

**Note**:

- Only one admission control option should be specified at a time
- All admission control values must be positive integers. If an invalid value is provided (negative, zero, or non-numeric), an ``IllegalArgumentException`` will be thrown (see the sketch after this list)
- These options apply to both ``streamReader()`` and ``simpleStreamReader()`` implementations
- This behavior is consistent with Spark's built-in streaming sources (e.g., ``maxFilesPerTrigger``, ``maxBytesPerTrigger`` for file sources)
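
As a rough illustration of the validation behavior described in the note above, the sketch below wraps query construction and start-up in a ``try`` block. It reuses the ``fake`` source from the earlier examples; exactly where the invalid value is rejected (at ``load()``, at ``start()``, or once the query is running) is not pinned down here, so a broad ``except`` is used.

.. code-block:: python

    try:
        # "0" is not a positive integer, so this admission control value is invalid.
        bad_stream = (
            spark.readStream.format("fake")
            .option("maxRecordsPerBatch", "0")
            .load()
        )
        bad_stream.writeStream.format("console").start()
    except Exception as e:
        # Typically an IllegalArgumentException, as described in the note above.
        print(f"Invalid admission control option rejected: {e}")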

**Implementing Admission Control in Custom Data Sources (Advanced)**

For users implementing the full ``DataSourceStreamReader`` API (not ``SimpleDataSourceStreamReader``), admission control is enabled through optional parameters on the ``latestOffset()`` method. As of Spark 4.2, ``latestOffset()`` accepts optional ``start`` and ``limit`` parameters:

.. code-block:: python

    from pyspark.sql.datasource import DataSourceStreamReader
    from typing import Optional, Union, Tuple

    class MyStreamReader(DataSourceStreamReader):
        def latestOffset(
            self,
            start: Optional[dict] = None,
            limit: Optional[dict] = None
        ) -> Union[dict, Tuple[dict, dict]]:
            # For backward compatibility: old implementations without parameters still work
            if start is None and limit is None:
                # Old behavior: return latest offset without admission control
                return {"offset": self.get_latest_offset()}

            # New behavior: with admission control support
            true_latest = self.get_latest_offset()

            # Apply admission control if configured
            if limit and limit.get("type") == "maxRows":
                max_rows = limit["maxRows"]
                capped_offset = self.calculate_capped_offset(start, max_rows)
                # Return tuple: (capped_offset, true_latest_offset)
                return (capped_offset, true_latest)
            elif limit and limit.get("type") == "maxFiles":
                max_files = limit["maxFiles"]
                capped_offset = self.calculate_capped_offset_by_files(start, max_files)
                return (capped_offset, true_latest)
            else:
                # No limit or allAvailable
                return (true_latest, true_latest)

**Key Points:**

- **Backward Compatibility**: Old implementations that don't accept parameters continue to work without modification
- **Optional Parameters**: Both ``start`` and ``limit`` are optional; if not provided, implement old behavior
- **Return Type**: Return a single ``dict`` for old behavior, or a ``Tuple[dict, dict]`` for new behavior with admission control
- **Limit Structure**: The ``limit`` parameter is a dictionary with:

  - ``{"type": "maxRows", "maxRows": N}`` for row-based limits
  - ``{"type": "maxFiles", "maxFiles": N}`` for file-based limits
  - ``{"type": "maxBytes", "maxBytes": N}`` for byte-based limits (a sketch handling this case follows the list)
  - ``{"type": "allAvailable"}`` for no limit

- **SimpleDataSourceStreamReader**: Users of the simple API don't need to implement this; the framework handles admission control automatically
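
The advanced example above handles ``maxRows`` and ``maxFiles``; the remaining limit types follow the same pattern. The sketch below is illustrative only: it is written as an alternative ``latestOffset`` body for the ``MyStreamReader`` class above, and ``get_latest_offset``, ``estimate_bytes_per_record``, and ``calculate_capped_offset`` are hypothetical helpers, not part of the API.

.. code-block:: python

    def latestOffset(self, start=None, limit=None):
        if start is None and limit is None:
            # Old calling convention: no admission control.
            return self.get_latest_offset()

        true_latest = self.get_latest_offset()

        if limit is None or limit.get("type") == "allAvailable":
            # No cap requested: the capped offset and the true latest coincide.
            return (true_latest, true_latest)

        if limit["type"] == "maxBytes":
            # Convert the byte budget into a record budget using a per-record
            # size estimate, then cap the offset accordingly.
            max_records = max(1, limit["maxBytes"] // self.estimate_bytes_per_record())
            capped_offset = self.calculate_capped_offset(start, max_records)
            return (capped_offset, true_latest)

        # maxRows / maxFiles can be handled as in the previous example.
        return (true_latest, true_latest)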

Python Data Source Reader with direct Arrow Batch support for improved performance
----------------------------------------------------------------------------------
The Python Datasource Reader supports direct yielding of Arrow Batches, which can significantly improve data processing performance. By using the efficient Arrow format,
36 changes: 31 additions & 5 deletions python/pyspark/sql/datasource.py
@@ -714,20 +714,46 @@ def initialOffset(self) -> dict:
            messageParameters={"feature": "initialOffset"},
        )

    def latestOffset(self) -> dict:
    def latestOffset(
        self, start: Optional[dict] = None, limit: Optional[dict] = None
    ) -> Union[dict, Tuple[dict, dict]]:
        """
        Returns the most recent offset available.

        The optional ``start`` and ``limit`` parameters were added in Spark 4.2
        to support admission control.

        Parameters
        ----------
        start : dict, optional
            The starting offset. Enables admission control when provided.
        limit : dict, optional
            Admission control limit with structure:

            - ``{"type": "maxRows", "maxRows": N}``
            - ``{"type": "maxFiles", "maxFiles": N}``
            - ``{"type": "maxBytes", "maxBytes": N}``
            - ``{"type": "allAvailable"}``

        Returns
        -------
        dict
            A dict or recursive dict whose key and value are primitive types, which includes
            Integer, String and Boolean.
        dict or Tuple[dict, dict]
            - Old behavior (no parameters): returns a single offset dict
            - New behavior (with parameters): returns (capped_offset, true_latest_offset)

        Examples
        --------
        Old implementation (backward compatible):

        >>> def latestOffset(self):
        ...     return {"parititon-1": {"index": 3, "closed": True}, "partition-2": {"index": 5}}
        ...     return {"partition-1": {"index": 3, "closed": True}, "partition-2": {"index": 5}}

        New implementation (with admission control):

        >>> def latestOffset(self, start=None, limit=None):
        ...     if start is None or limit is None:
        ...         return {"offset": self.get_latest()}
        ...     if limit["type"] == "maxRows":
        ...         capped = self.calc_capped(start, limit["maxRows"])
        ...         true_latest = self.get_latest()
        ...         return (capped, true_latest)
        ...     return (self.get_latest(), self.get_latest())
        """
        raise PySparkNotImplementedError(
            errorClass="NOT_IMPLEMENTED",