|
| 1 | +# SimpleDataSourceStreamReader Architecture |
| 2 | + |
| 3 | +## Overview |
| 4 | + |
| 5 | +`SimpleDataSourceStreamReader` is a lightweight streaming data source reader in PySpark designed for scenarios with small data volumes and low throughput requirements. Unlike the standard `DataSourceStreamReader`, it executes entirely on the driver node, trading scalability for simplicity. |
| 6 | + |
| 7 | +## Key Architecture Components |
| 8 | + |
| 9 | +### Python-Side Components |
| 10 | + |
| 11 | +#### SimpleDataSourceStreamReader (datasource.py) |
| 12 | +The user-facing API with three core methods: |
| 13 | +- `initialOffset()`: Returns the starting position for a new streaming query |
| 14 | +- `read(start)`: Reads all available data from a given offset and returns both the data and the next offset |
| 15 | +- `readBetweenOffsets(start, end)`: Re-reads data deterministically for failure recovery |
| 16 | + |
| 17 | +#### _SimpleStreamReaderWrapper (datasource_internal.py) |
| 18 | +A private wrapper that implements the prefetch-and-cache pattern: |
| 19 | +- Maintains `current_offset` to track reading progress |
| 20 | +- Caches prefetched data in memory on the driver |
| 21 | +- Converts simple reader interface to standard streaming reader interface |
| 22 | + |
| 23 | +### Scala-Side Components |
| 24 | + |
| 25 | +#### PythonMicroBatchStream (PythonMicroBatchStream.scala) |
| 26 | +Manages the micro-batch execution: |
| 27 | +- Creates and manages `PythonStreamingSourceRunner` for Python communication |
| 28 | +- Stores prefetched data in BlockManager with `PythonStreamBlockId` |
| 29 | +- Handles offset management and partition planning |
| 30 | + |
| 31 | +#### PythonStreamingSourceRunner (PythonStreamingSourceRunner.scala) |
| 32 | +The bridge between JVM and Python: |
| 33 | +- Spawns a Python worker process running `python_streaming_source_runner.py` |
| 34 | +- Serializes/deserializes data using Arrow format |
| 35 | +- Manages RPC-style communication for method invocations |
| 36 | + |
| 37 | +## Data Flow and Lifecycle |
| 38 | + |
| 39 | +### Query Initialization |
| 40 | +1. Spark creates `PythonMicroBatchStream` when a streaming query starts |
| 41 | +2. `PythonStreamingSourceRunner` spawns a Python worker process |
| 42 | +3. Python worker instantiates the `SimpleDataSourceStreamReader` |
| 43 | +4. Initial offset is obtained via `initialOffset()` call |
| 44 | + |
| 45 | +### Micro-batch Execution (per trigger) |
| 46 | + |
| 47 | +#### 1. Offset Discovery (Driver) |
| 48 | +- Spark calls `latestOffset()` on `PythonMicroBatchStream` |
| 49 | +- Runner invokes Python's `latestOffset()` via RPC |
| 50 | +- Wrapper calls `simple_reader.read(current_offset)` to prefetch data |
| 51 | +- Data and new offset are returned and cached |
| 52 | + |
| 53 | +#### 2. Data Caching (Driver) |
| 54 | +- Prefetched records are converted to Arrow batches |
| 55 | +- Data is stored in BlockManager with a unique `PythonStreamBlockId` |
| 56 | +- Cache entry maintains mapping of (start_offset, end_offset) → data |
| 57 | + |
| 58 | +#### 3. Partition Planning (Driver) |
| 59 | +- `planInputPartitions(start, end)` creates a single `PythonStreamingInputPartition` |
| 60 | +- Partition references the cached block ID |
| 61 | +- No actual data distribution happens (single partition on driver) |
| 62 | + |
| 63 | +#### 4. Data Reading (Executor) |
| 64 | +- Executor retrieves cached data from BlockManager using block ID |
| 65 | +- Data is already in Arrow format for efficient processing |
| 66 | +- Records are converted to internal rows for downstream processing |
| 67 | + |
| 68 | +## Integration with Spark Structured Streaming APIs |
| 69 | + |
| 70 | +### User API Integration |
| 71 | + |
| 72 | +```python |
| 73 | +# User defines a SimpleDataSourceStreamReader |
| 74 | +class MyStreamReader(SimpleDataSourceStreamReader): |
| 75 | + def initialOffset(self): |
| 76 | + return {"position": 0} |
| 77 | + |
| 78 | + def read(self, start): |
| 79 | + # Read data from source |
| 80 | + data = fetch_data_since(start["position"]) |
| 81 | + new_offset = {"position": start["position"] + len(data)} |
| 82 | + return (iter(data), new_offset) |
| 83 | + |
| 84 | + def readBetweenOffsets(self, start, end): |
| 85 | + # Re-read for failure recovery |
| 86 | + return fetch_data_between(start["position"], end["position"]) |
| 87 | + |
| 88 | +# Register and use with Spark |
| 89 | +class MyDataSource(DataSource): |
| 90 | + def simpleStreamReader(self, schema): |
| 91 | + return MyStreamReader() |
| 92 | + |
| 93 | +spark.dataSource.register(MyDataSource) |
| 94 | +df = spark.readStream.format("my_source").load() |
| 95 | +query = df.writeStream.format("console").start() |
| 96 | +``` |
| 97 | + |
| 98 | +### Streaming Engine Integration |
| 99 | + |
| 100 | +1. **Trigger Processing**: Works with all trigger modes (ProcessingTime, Once, AvailableNow) |
| 101 | +2. **Offset Management**: Offsets are checkpointed to WAL for exactly-once semantics |
| 102 | +3. **Failure Recovery**: Uses `readBetweenOffsets()` to replay uncommitted batches |
| 103 | +4. **Commit Protocol**: After successful batch, `commit(offset)` is called for cleanup |
| 104 | + |
| 105 | +## Execution Flow Diagram |
| 106 | + |
| 107 | +``` |
| 108 | +Driver Node Python Worker Executors |
| 109 | +----------- ------------- --------- |
| 110 | +PythonMicroBatchStream |
| 111 | + | |
| 112 | + ├─> latestOffset() ──────────> PythonStreamingSourceRunner |
| 113 | + | | |
| 114 | + | ├─> RPC: LATEST_OFFSET ──> SimpleStreamReaderWrapper |
| 115 | + | | | |
| 116 | + | | ├─> read(current_offset) |
| 117 | + | | | └─> (data, new_offset) |
| 118 | + | | | |
| 119 | + | |<── Arrow batches ────────────┘ |
| 120 | + | | |
| 121 | + ├─> Cache in BlockManager <───────────┘ |
| 122 | + | (PythonStreamBlockId) |
| 123 | + | |
| 124 | + ├─> planInputPartitions() |
| 125 | + | └─> Single partition with BlockId |
| 126 | + | |
| 127 | + └─> createReaderFactory() ─────────────────────────────────────> Read from BlockManager |
| 128 | + | |
| 129 | + └─> Process records |
| 130 | +``` |
| 131 | + |
| 132 | +## Key Design Decisions and Trade-offs |
| 133 | + |
| 134 | +### Advantages |
| 135 | +- **Simplicity**: No need to implement partitioning logic |
| 136 | +- **Consistency**: All data reading happens in one place (driver) |
| 137 | +- **Efficiency for small data**: Avoids overhead of distributed execution |
| 138 | +- **Easy offset management**: Single reader maintains consistent view of progress |
| 139 | +- **Quick development**: Minimal boilerplate for simple streaming sources |
| 140 | + |
| 141 | +### Limitations |
| 142 | +- **Not scalable**: All data flows through driver (bottleneck) |
| 143 | +- **Memory constraints**: Driver must cache entire micro-batch |
| 144 | +- **Single point of failure**: Driver failure affects data reading |
| 145 | +- **Network overhead**: Data must be transferred from driver to executors |
| 146 | +- **Throughput ceiling**: Limited by driver's processing capacity |
| 147 | + |
| 148 | +### Important Note from Source Code |
| 149 | +From datasource.py: |
| 150 | +> "Because SimpleDataSourceStreamReader read records in Spark driver node to determine end offset of each batch without partitioning, it is only supposed to be used in lightweight use cases where input rate and batch size is small." |
| 151 | +
|
| 152 | +## Use Cases |
| 153 | + |
| 154 | +### Ideal for: |
| 155 | +- Configuration change streams |
| 156 | +- Small lookup table updates |
| 157 | +- Low-volume event streams (< 1000 records/sec) |
| 158 | +- Prototyping and testing streaming applications |
| 159 | +- REST API polling with low frequency |
| 160 | +- File system monitoring for small files |
| 161 | +- Message queue consumers with low throughput |
| 162 | + |
| 163 | +### Not suitable for: |
| 164 | +- High-throughput data sources (use `DataSourceStreamReader` instead) |
| 165 | +- Large batch sizes that exceed driver memory |
| 166 | +- Sources requiring parallel reads for performance |
| 167 | +- Production workloads with high availability requirements |
| 168 | +- Kafka topics with high message rates |
| 169 | +- Large file streaming |
| 170 | + |
| 171 | +## Implementation Example: File Monitor |
| 172 | + |
| 173 | +```python |
| 174 | +import os |
| 175 | +import json |
| 176 | +from typing import Iterator, Tuple, Dict |
| 177 | +from pyspark.sql.datasource import SimpleDataSourceStreamReader |
| 178 | + |
| 179 | +class FileMonitorStreamReader(SimpleDataSourceStreamReader): |
| 180 | + def __init__(self, path: str): |
| 181 | + self.path = path |
| 182 | + |
| 183 | + def initialOffset(self) -> Dict: |
| 184 | + # Start with empty file list |
| 185 | + return {"processed_files": []} |
| 186 | + |
| 187 | + def read(self, start: Dict) -> Tuple[Iterator[Tuple], Dict]: |
| 188 | + processed = set(start.get("processed_files", [])) |
| 189 | + current_files = set(os.listdir(self.path)) |
| 190 | + new_files = current_files - processed |
| 191 | + |
| 192 | + # Read content from new files |
| 193 | + data = [] |
| 194 | + for filename in new_files: |
| 195 | + filepath = os.path.join(self.path, filename) |
| 196 | + if os.path.isfile(filepath): |
| 197 | + with open(filepath, 'r') as f: |
| 198 | + content = f.read() |
| 199 | + data.append((filename, content)) |
| 200 | + |
| 201 | + # Update offset |
| 202 | + new_offset = {"processed_files": list(current_files)} |
| 203 | + |
| 204 | + return (iter(data), new_offset) |
| 205 | + |
| 206 | + def readBetweenOffsets(self, start: Dict, end: Dict) -> Iterator[Tuple]: |
| 207 | + # For recovery: re-read files that were added between start and end |
| 208 | + start_files = set(start.get("processed_files", [])) |
| 209 | + end_files = set(end.get("processed_files", [])) |
| 210 | + files_to_read = end_files - start_files |
| 211 | + |
| 212 | + data = [] |
| 213 | + for filename in files_to_read: |
| 214 | + filepath = os.path.join(self.path, filename) |
| 215 | + if os.path.exists(filepath): |
| 216 | + with open(filepath, 'r') as f: |
| 217 | + content = f.read() |
| 218 | + data.append((filename, content)) |
| 219 | + |
| 220 | + return iter(data) |
| 221 | +``` |
| 222 | + |
| 223 | +## Performance Considerations |
| 224 | + |
| 225 | +### Memory Management |
| 226 | +- Driver memory should be configured to handle maximum expected batch size |
| 227 | +- Use `spark.driver.memory` and `spark.driver.maxResultSize` appropriately |
| 228 | +- Monitor driver memory usage during streaming |
| 229 | + |
| 230 | +### Optimization Tips |
| 231 | +1. **Batch Size Control**: Implement rate limiting in `read()` method |
| 232 | +2. **Data Compression**: Use efficient serialization formats (Parquet, Arrow) |
| 233 | +3. **Offset Design**: Keep offset structure simple and small |
| 234 | +4. **Caching Strategy**: Clear old cache entries in `commit()` method |
| 235 | +5. **Error Handling**: Implement robust error handling in read methods |
| 236 | + |
| 237 | +### Monitoring |
| 238 | +Key metrics to monitor: |
| 239 | +- Driver memory usage |
| 240 | +- Batch processing time |
| 241 | +- Records per batch |
| 242 | +- Offset checkpoint frequency |
| 243 | +- Cache hit/miss ratio |
| 244 | + |
| 245 | +## Comparison with DataSourceStreamReader |
| 246 | + |
| 247 | +| Feature | SimpleDataSourceStreamReader | DataSourceStreamReader | |
| 248 | +|---------|------------------------------|------------------------| |
| 249 | +| Execution Location | Driver only | Distributed (executors) | |
| 250 | +| Partitioning | Single partition | Multiple partitions | |
| 251 | +| Scalability | Limited | High | |
| 252 | +| Implementation Complexity | Low | High | |
| 253 | +| Memory Requirements | Driver-heavy | Distributed | |
| 254 | +| Suitable Data Volume | < 10MB per batch | Unlimited | |
| 255 | +| Parallelism | None | Configurable | |
| 256 | +| Use Case | Prototyping, small streams | Production, large streams | |
| 257 | + |
| 258 | +## Conclusion |
| 259 | + |
| 260 | +`SimpleDataSourceStreamReader` provides an elegant solution for integrating small-scale streaming data sources with Spark's Structured Streaming engine. By executing entirely on the driver, it simplifies development while maintaining full compatibility with Spark's streaming semantics. However, users must carefully consider the scalability limitations and ensure their use case fits within the architectural constraints of driver-side execution. |
| 261 | + |
| 262 | +For production systems with high throughput requirements, the standard `DataSourceStreamReader` with proper partitioning should be used instead. |
0 commit comments