
Commit 53f0fb1

[SPARK-54305][SQL][PYTHON] Add admission control support to Python DataSource streaming API
This PR adds admission control support to Python streaming data sources by:

1. Enhanced Python API: Modified DataSourceStreamReader.latestOffset() to accept optional start and limit parameters
2. Scala Bridge: Updated PythonMicroBatchStream to validate admission control options and throw IllegalArgumentException for invalid values
3. Python-JVM Communication: Added function IDs and handlers in PythonStreamingSourceRunner for admission control parameters
4. Python Worker: Implemented latest_offset_with_report_func() with backward compatibility fallback
5. Documentation: Added comprehensive guide in python_data_source.rst
6. Tests: Added validation tests for invalid admission control parameters
7. Example: Created structured_blockchain_admission_control.py demonstrating the full API

Key Benefits:
- Predictable performance with controlled batch sizes
- Rate limiting and backpressure support
- Feature parity with Scala DataSource capabilities
- Full backward compatibility (all parameters optional)
1 parent 2e4708e commit 53f0fb1
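For orientation, the sketch below illustrates the calling convention the commit message describes: latestOffset() gains optional start and limit parameters, and readers still written against the old zero-argument signature keep working through a fallback. This is an illustrative sketch only, not the actual PythonStreamingSourceRunner or worker code; reader stands for any user-supplied DataSourceStreamReader.

from inspect import signature


def call_latest_offset(reader, start_offset, limit):
    """Illustrative fallback: prefer latestOffset(start, limit), but
    keep supporting readers that only implement latestOffset()."""
    accepts_params = len(signature(reader.latestOffset).parameters) >= 2
    if accepts_params:
        result = reader.latestOffset(start_offset, limit)
        # New-style readers may return (capped_offset, true_latest_offset).
        if isinstance(result, tuple):
            return result
        return result, result
    # Old-style reader: no admission control, a single offset dict.
    latest = reader.latestOffset()
    return latest, latest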

File tree

8 files changed: +946 -156 lines changed

examples/src/main/python/sql/streaming/structured_blockchain_admission_control.py

Lines changed: 168 additions & 0 deletions
@@ -0,0 +1,168 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Demonstrates admission control in Python streaming data sources.

This example implements a simple blockchain-like streaming source that generates
sequential blocks and shows how to use admission control to limit batch sizes.

Usage: structured_blockchain_admission_control.py [<max-blocks-per-batch>]
    <max-blocks-per-batch> Maximum number of blocks to process per microbatch (default: 10)

Run the example:
    `$ bin/spark-submit examples/src/main/python/sql/streaming/structured_blockchain_admission_control.py 5`

The example will process blocks in controlled batches of 5, demonstrating admission control.
"""
import sys
import time

from pyspark.sql import SparkSession
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, InputPartition
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, StringType


class SimpleBlockchainReader(DataSourceStreamReader):
    """A simple streaming source that generates sequential blockchain blocks."""

    def __init__(self, max_block=1000):
        self.max_block = max_block
        self.current_block = 0

    def initialOffset(self):
        """Start from block 0."""
        return {"block": self.current_block}

    def latestOffset(self, start=None, limit=None):
        """
        Return the latest offset, respecting admission control limits.

        This demonstrates the key admission control pattern:
        - Without limit: process all available blocks
        - With maxRows limit: cap the end block to respect batch size
        """
        # Determine where we are now
        if start is None:
            start_block = self.current_block
        else:
            start_block = start["block"]

        # Simulate blockchain growth - advance by 20 blocks each time
        latest_available = min(start_block + 20, self.max_block)

        # Apply admission control if configured
        if limit and limit.get("type") == "maxRows":
            max_blocks = limit["maxRows"]
            # Cap at the configured limit
            end_block = min(start_block + max_blocks, latest_available)
            print(
                f"  [Admission Control] Start: {start_block}, Available: {latest_available}, "
                f"Capped: {end_block} (limit: {max_blocks})"
            )
            # Return tuple: (capped_offset, true_latest_offset)
            return ({"block": end_block}, {"block": latest_available})
        else:
            # No limit - process all available
            end_block = latest_available
            print(f"  [No Limit] Start: {start_block}, End: {end_block}")
            return {"block": end_block}

    def partitions(self, start, end):
        """Create a single partition for the block range."""
        start_block = start["block"]
        end_block = end["block"]
        return [InputPartition(f"{start_block}:{end_block}".encode())]

    def read(self, partition):
        """Generate block data for the partition."""
        # Parse the block range
        range_str = partition.value.decode()
        start_block, end_block = map(int, range_str.split(":"))

        # Generate block data
        for block_num in range(start_block, end_block):
            # Simulate block data: block number, timestamp, simple hash
            yield (
                block_num,
                int(time.time() * 1000),
                f"0x{'0' * 60}{block_num:04x}",
            )

    def commit(self, end):
        """Mark this offset as committed."""
        pass


class SimpleBlockchainSource(DataSource):
    """Data source for simple blockchain streaming."""

    @classmethod
    def name(cls):
        return "simple_blockchain"

    def schema(self):
        return "block_number INT, timestamp LONG, block_hash STRING"

    def streamReader(self, schema):
        return SimpleBlockchainReader(max_block=1000)


if __name__ == "__main__":
    max_blocks_per_batch = int(sys.argv[1]) if len(sys.argv) > 1 else 10

    print(
        f"""
=================================================================
Blockchain Streaming with Admission Control
=================================================================
Configuration:
- Max blocks per batch: {max_blocks_per_batch}
- Total blocks to generate: 1000

Watch how admission control limits each microbatch to process
only {max_blocks_per_batch} blocks at a time, even when more data is available.
=================================================================
"""
    )

    spark = (
        SparkSession.builder.appName("StructuredBlockchainAdmissionControl").getOrCreate()
    )

    # Register the custom data source
    spark.dataSource.register(SimpleBlockchainSource)

    # Create streaming DataFrame with admission control
    blocks = (
        spark.readStream.format("simple_blockchain")
        .option("maxRecordsPerBatch", str(max_blocks_per_batch))
        .load()
    )

    # Show block statistics per microbatch
    query = (
        blocks.writeStream.outputMode("append")
        .format("console")
        .option("numRows", "20")
        .option("truncate", "false")
        .trigger(processingTime="3 seconds")
        .start()
    )

    query.awaitTermination()

python/docs/source/tutorial/sql/python_data_source.rst

Lines changed: 95 additions & 0 deletions
@@ -487,6 +487,101 @@ We can also use the same data source in streaming reader and writer
    query = spark.readStream.format("fake").load().writeStream.format("fake").start("/output_path")

**Admission Control for Streaming Sources**

Spark supports admission control for streaming sources to limit the amount of data processed in each micro-batch. This helps control resource usage and maintain consistent processing times. Python streaming data sources support three types of admission control options:

- **maxRecordsPerBatch**: Limit the maximum number of rows per batch
- **maxFilesPerBatch**: Limit the maximum number of files per batch
- **maxBytesPerBatch**: Limit the maximum bytes per batch (in bytes)

These options can be specified when reading from a streaming source:

.. code-block:: python

    # Limit to 1000 rows per batch
    query = spark.readStream \
        .format("fake") \
        .option("maxRecordsPerBatch", "1000") \
        .load() \
        .writeStream \
        .format("console") \
        .start()

    # Limit to 100 files per batch
    query = spark.readStream \
        .format("fake") \
        .option("maxFilesPerBatch", "100") \
        .load() \
        .writeStream \
        .format("console") \
        .start()

    # Limit to 10 MB per batch
    query = spark.readStream \
        .format("fake") \
        .option("maxBytesPerBatch", str(10 * 1024 * 1024)) \
        .load() \
        .writeStream \
        .format("console") \
        .start()

**Note**:

- Only one admission control option should be specified at a time
- All admission control values must be positive integers. If an invalid value is provided (negative, zero, or non-numeric), an ``IllegalArgumentException`` is thrown (see the sketch after this list)
- These options apply to both ``streamReader()`` and ``simpleStreamReader()`` implementations
- This behavior is consistent with Spark's built-in streaming sources (e.g., ``maxFilesPerTrigger`` and ``maxBytesPerTrigger`` for file sources)
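A minimal sketch of that failure mode, assuming a registered Python data source named "fake" and that the JVM-side IllegalArgumentException surfaces as a Python exception when the streaming query is started:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

try:
    (
        spark.readStream.format("fake")
        .option("maxRecordsPerBatch", "-5")  # negative value: rejected by validation
        .load()
        .writeStream.format("console")
        .start()
    )
except Exception as e:
    # The IllegalArgumentException thrown by the Scala bridge is expected to
    # surface here as a Python-side error (assumption about when it is raised).
    print(f"Rejected as expected: {e}")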
**Implementing Admission Control in Custom Data Sources (Advanced)**

For users implementing the full ``DataSourceStreamReader`` API (not ``SimpleDataSourceStreamReader``), admission control is enabled through optional parameters on the ``latestOffset()`` method. As of Spark 4.2, the ``latestOffset()`` method signature has been enhanced to accept optional parameters:

.. code-block:: python

    from pyspark.sql.datasource import DataSourceStreamReader
    from typing import Optional, Union, Tuple

    class MyStreamReader(DataSourceStreamReader):
        def latestOffset(
            self,
            start: Optional[dict] = None,
            limit: Optional[dict] = None,
        ) -> Union[dict, Tuple[dict, dict]]:
            # For backward compatibility: old implementations without parameters still work
            if start is None and limit is None:
                # Old behavior: return latest offset without admission control
                return {"offset": self.get_latest_offset()}

            # New behavior: with admission control support
            true_latest = self.get_latest_offset()

            # Apply admission control if configured
            if limit and limit.get("type") == "maxRows":
                max_rows = limit["maxRows"]
                capped_offset = self.calculate_capped_offset(start, max_rows)
                # Return tuple: (capped_offset, true_latest_offset)
                return (capped_offset, true_latest)
            elif limit and limit.get("type") == "maxFiles":
                max_files = limit["maxFiles"]
                capped_offset = self.calculate_capped_offset_by_files(start, max_files)
                return (capped_offset, true_latest)
            else:
                # No limit or allAvailable
                return (true_latest, true_latest)

**Key Points:**

- **Backward Compatibility**: Old implementations that don't accept parameters continue to work without modification
- **Optional Parameters**: Both ``start`` and ``limit`` are optional; if not provided, implement the old behavior
- **Return Type**: Return a single ``dict`` for old behavior, or a ``Tuple[dict, dict]`` for new behavior with admission control
- **Limit Structure**: The ``limit`` parameter is a dictionary with:

  - ``{"type": "maxRows", "maxRows": N}`` for row-based limits
  - ``{"type": "maxFiles", "maxFiles": N}`` for file-based limits
  - ``{"type": "maxBytes", "maxBytes": N}`` for byte-based limits
  - ``{"type": "allAvailable"}`` for no limit

- **SimpleDataSourceStreamReader**: Users of the simple API don't need to implement this; the framework handles admission control automatically
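As a concrete illustration of the row-based case, here is one way the calculate_capped_offset helper referenced in the example above might look for a source whose offsets are plain {"index": N} dictionaries. Both the offset layout and the get_latest_offset helper are assumptions made for illustration; they are not part of the data source API.

def calculate_capped_offset(self, start: dict, max_rows: int) -> dict:
    """Hypothetical helper: advance at most `max_rows` rows past `start`,
    never going beyond the true latest offset."""
    start_index = start["index"] if start else 0
    true_latest_index = self.get_latest_offset()["index"]  # assumed helper
    # Cap the batch so a single micro-batch never exceeds max_rows rows.
    return {"index": min(start_index + max_rows, true_latest_index)}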
Python Data Source Reader with direct Arrow Batch support for improved performance
----------------------------------------------------------------------------------

The Python Datasource Reader supports direct yielding of Arrow Batches, which can significantly improve data processing performance. By using the efficient Arrow format,

python/pyspark/sql/datasource.py

Lines changed: 31 additions & 5 deletions
@@ -714,20 +714,46 @@ def initialOffset(self) -> dict:
             messageParameters={"feature": "initialOffset"},
         )

-    def latestOffset(self) -> dict:
+    def latestOffset(
+        self, start: Optional[dict] = None, limit: Optional[dict] = None
+    ) -> Union[dict, Tuple[dict, dict]]:
         """
         Returns the most recent offset available.

+        Parameters (optional - added in Spark 4.2 for admission control)
+        ----------
+        start : dict, optional
+            The starting offset. Enables admission control when provided.
+        limit : dict, optional
+            Admission control limit with structure:
+            - {"type": "maxRows", "maxRows": N}
+            - {"type": "maxFiles", "maxFiles": N}
+            - {"type": "maxBytes", "maxBytes": N}
+            - {"type": "allAvailable"}
+
         Returns
         -------
-        dict
-            A dict or recursive dict whose key and value are primitive types, which includes
-            Integer, String and Boolean.
+        dict or Tuple[dict, dict]
+            - Old behavior (no params): returns single offset dict
+            - New behavior (with params): returns (capped_offset, true_latest_offset)

         Examples
         --------
+        Old implementation (backward compatible):
+
         >>> def latestOffset(self):
-        ...     return {"parititon-1": {"index": 3, "closed": True}, "partition-2": {"index": 5}}
+        ...     return {"partition-1": {"index": 3, "closed": True}, "partition-2": {"index": 5}}
+
+        New implementation (with admission control):
+
+        >>> def latestOffset(self, start=None, limit=None):
+        ...     if start is None or limit is None:
+        ...         return {"offset": self.get_latest()}
+        ...     if limit["type"] == "maxRows":
+        ...         capped = self.calc_capped(start, limit["maxRows"])
+        ...         true_latest = self.get_latest()
+        ...         return (capped, true_latest)
+        ...     return (self.get_latest(), self.get_latest())
         """
         raise PySparkNotImplementedError(
             errorClass="NOT_IMPLEMENTED",
