From f5f2ac96f19ff07a35c96c795dbe9db5b89ae96c Mon Sep 17 00:00:00 2001
From: Allison Wang
Date: Thu, 14 Aug 2025 12:47:02 -0700
Subject: [PATCH 1/3] update README

---
 README.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/README.md b/README.md
index c8120f7..ce746c8 100644
--- a/README.md
+++ b/README.md
@@ -50,6 +50,7 @@ spark.readStream.format("fake").load().writeStream.format("console").start()
| [StockDataSource](pyspark_datasources/stock.py) | `stock` | Batch Read | Read stock data from Alpha Vantage | None | `pip install pyspark-data-sources`<br>`spark.read.format("stock").option("symbols", "AAPL,GOOGL").option("api_key", "key").load()` |
| **Batch Write** | | | | | |
| [LanceSink](pyspark_datasources/lance.py) | `lance` | Batch Write | Write data in Lance format | `lance` | `pip install pyspark-data-sources[lance]`<br>`df.write.format("lance").mode("append").save("/tmp/lance_data")` |
+| [SimpleJsonDataSource](pyspark_datasources/simplejson.py) | `simplejson` | Batch Write | Write JSON data to Databricks DBFS | `databricks-sdk` | `pip install pyspark-data-sources[simplejson]`<br>`df.write.format("simplejson").save("/path/to/output")` |
| **Streaming Read** | | | | | |
| [OpenSkyDataSource](pyspark_datasources/opensky.py) | `opensky` | Streaming Read | Read from OpenSky Network. | None | `pip install pyspark-data-sources`<br>`spark.readStream.format("opensky").option("region", "EUROPE").load()` |
| [WeatherDataSource](pyspark_datasources/weather.py) | `weather` | Streaming Read | Fetch weather data from tomorrow.io | None | `pip install pyspark-data-sources`<br>`spark.readStream.format("weather").option("locations", "[(37.7749, -122.4194)]").option("apikey", "key").load()` |

From ca97e9c8dba233b2d7ce34edec90cd5a1d1ff5a6 Mon Sep 17 00:00:00 2001
From: Allison Wang
Date: Fri, 3 Oct 2025 13:18:16 -0700
Subject: [PATCH 2/3] docs

---
 docs/simple-stream-reader-architecture.md | 262 ++++++++++++++++++++++
 1 file changed, 262 insertions(+)
 create mode 100644 docs/simple-stream-reader-architecture.md

diff --git a/docs/simple-stream-reader-architecture.md b/docs/simple-stream-reader-architecture.md
new file mode 100644
index 0000000..64b3ca4
--- /dev/null
+++ b/docs/simple-stream-reader-architecture.md
@@ -0,0 +1,262 @@
# SimpleDataSourceStreamReader Architecture

## Overview

`SimpleDataSourceStreamReader` is a lightweight streaming data source reader in PySpark designed for scenarios with small data volumes and low throughput requirements. Unlike the standard `DataSourceStreamReader`, it executes entirely on the driver node, trading scalability for simplicity.

## Key Architecture Components

### Python-Side Components

#### SimpleDataSourceStreamReader (datasource.py:816-911)
The user-facing API with three core methods:
- `initialOffset()`: Returns the starting position for a new streaming query
- `read(start)`: Reads all available data from a given offset and returns both the data and the next offset
- `readBetweenOffsets(start, end)`: Re-reads data deterministically for failure recovery

#### _SimpleStreamReaderWrapper (datasource_internal.py)
A private wrapper that implements the prefetch-and-cache pattern:
- Maintains `current_offset` to track reading progress
- Caches prefetched data in memory on the driver
- Converts simple reader interface to standard streaming reader interface

### Scala-Side Components

#### PythonMicroBatchStream (PythonMicroBatchStream.scala:31-111)
Manages the micro-batch execution:
- Creates and manages `PythonStreamingSourceRunner` for Python communication
- Stores prefetched data in BlockManager with `PythonStreamBlockId`
- Handles offset management and partition planning

#### PythonStreamingSourceRunner (PythonStreamingSourceRunner.scala:63-268)
The bridge between JVM and Python:
- Spawns a Python worker process running `python_streaming_source_runner.py`
- Serializes/deserializes data using Arrow format
- Manages RPC-style communication for method invocations

## Data Flow and Lifecycle

### Query Initialization
1. Spark creates `PythonMicroBatchStream` when a streaming query starts
2. `PythonStreamingSourceRunner` spawns a Python worker process
3. Python worker instantiates the `SimpleDataSourceStreamReader`
4. Initial offset is obtained via `initialOffset()` call

### Micro-batch Execution (per trigger)

#### 1. Offset Discovery (Driver)
- Spark calls `latestOffset()` on `PythonMicroBatchStream`
- Runner invokes Python's `latestOffset()` via RPC
- Wrapper calls `simple_reader.read(current_offset)` to prefetch data
- Data and new offset are returned and cached

#### 2. Data Caching (Driver)
- Prefetched records are converted to Arrow batches
- Data is stored in BlockManager with a unique `PythonStreamBlockId`
- Cache entry maintains mapping of (start_offset, end_offset) → data

#### 3. Partition Planning (Driver)
- `planInputPartitions(start, end)` creates a single `PythonStreamingInputPartition`
- Partition references the cached block ID
- No actual data distribution happens (single partition on driver)

#### 4. Data Reading (Executor)
- Executor retrieves cached data from BlockManager using block ID
- Data is already in Arrow format for efficient processing
- Records are converted to internal rows for downstream processing

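The prefetch-and-cache flow above can be pictured with a small conceptual sketch. This is **not** the actual `_SimpleStreamReaderWrapper` implementation (which also handles Arrow conversion, checkpoint restore, and cache cleanup); it only illustrates the idea that each `latestOffset()` call drives one `read()` on the simple reader and remembers the result keyed by its offset range:

```python
# Conceptual sketch only -- for illustration, not the real PySpark internals.
class PrefetchingWrapperSketch:
    def __init__(self, simple_reader):
        self.simple_reader = simple_reader
        self.current_offset = None   # reading progress, tracked on the driver
        self.cache = {}              # (start, end) -> prefetched records

    def latest_offset(self):
        if self.current_offset is None:
            self.current_offset = self.simple_reader.initialOffset()
        start = self.current_offset
        records, end = self.simple_reader.read(start)    # prefetch happens here
        self.cache[(str(start), str(end))] = list(records)
        self.current_offset = end
        return end

    def get_batch(self, start, end):
        # Serve the planned batch from the driver-side cache; fall back to a
        # deterministic re-read if the entry is gone (e.g. after a restart).
        key = (str(start), str(end))
        if key in self.cache:
            return iter(self.cache[key])
        return self.simple_reader.readBetweenOffsets(start, end)
```
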
## Integration with Spark Structured Streaming APIs

### User API Integration

```python
from pyspark.sql.datasource import DataSource, SimpleDataSourceStreamReader

# User defines a SimpleDataSourceStreamReader.
# fetch_data_since / fetch_data_between are placeholders for source-specific logic.
class MyStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        return {"position": 0}

    def read(self, start):
        # Read all data available since the start offset
        data = fetch_data_since(start["position"])
        new_offset = {"position": start["position"] + len(data)}
        return (iter(data), new_offset)

    def readBetweenOffsets(self, start, end):
        # Re-read deterministically for failure recovery
        return iter(fetch_data_between(start["position"], end["position"]))

# Register and use with Spark
class MyDataSource(DataSource):
    @classmethod
    def name(cls):
        return "my_source"      # the name used with format("my_source")

    def schema(self):
        return "value string"   # schema of the rows produced by the reader

    def simpleStreamReader(self, schema):
        return MyStreamReader()

spark.dataSource.register(MyDataSource)
df = spark.readStream.format("my_source").load()
query = df.writeStream.format("console").start()
```

### Streaming Engine Integration

1. **Trigger Processing**: Works with all trigger modes (ProcessingTime, Once, AvailableNow)
2. **Offset Management**: Offsets are checkpointed to the WAL for exactly-once semantics
3. **Failure Recovery**: Uses `readBetweenOffsets()` to replay uncommitted batches
4. **Commit Protocol**: After a successful batch, `commit(offset)` is called for cleanup

## Execution Flow Diagram

```
Driver Node                        Python Worker                   Executors
-----------                        -------------                   ---------
PythonMicroBatchStream
    |
    ├─> latestOffset() ──────────> PythonStreamingSourceRunner
    |                                  |
    |                                  ├─> RPC: LATEST_OFFSET ──> SimpleStreamReaderWrapper
    |                                  |                             |
    |                                  |                             ├─> read(current_offset)
    |                                  |                             |     └─> (data, new_offset)
    |                                  |                             |
    |                                  |<── Arrow batches ───────────┘
    |                                  |
    ├─> Cache in BlockManager <────────┘
    |   (PythonStreamBlockId)
    |
    ├─> planInputPartitions()
    |     └─> Single partition with BlockId
    |
    └─> createReaderFactory() ──────────────────────────────────> Read from BlockManager
                                                                      |
                                                                      └─> Process records
```

## Key Design Decisions and Trade-offs

### Advantages
- **Simplicity**: No need to implement partitioning logic
- **Consistency**: All data reading happens in one place (driver)
- **Efficiency for small data**: Avoids overhead of distributed execution
- **Easy offset management**: Single reader maintains consistent view of progress
- **Quick development**: Minimal boilerplate for simple streaming sources

### Limitations
- **Not scalable**: All data flows through driver (bottleneck)
- **Memory constraints**: Driver must cache entire micro-batch
- **Single point of failure**: Driver failure affects data reading
- **Network overhead**: Data must be transferred from driver to executors
- **Throughput ceiling**: Limited by driver's processing capacity

### Important Note from Source Code
From datasource.py:823-827:
> "Because SimpleDataSourceStreamReader read records in Spark driver node to determine end offset of each batch without partitioning, it is only supposed to be used in lightweight use cases where input rate and batch size is small."

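Because every record in a micro-batch passes through (and is cached on) the driver, it helps to bound how much a single `read()` call can return; this is the "Batch Size Control" tip discussed under Performance Considerations below. The following is a minimal sketch of that pattern, assuming a hypothetical `fetch_records(position, limit)` helper that stands in for real source-specific logic:

```python
from pyspark.sql.datasource import SimpleDataSourceStreamReader

class RateLimitedStreamReader(SimpleDataSourceStreamReader):
    """Sketch: bound each micro-batch so prefetched data fits in driver memory."""

    def __init__(self, max_records_per_batch: int = 500):
        self.max_records_per_batch = max_records_per_batch

    def initialOffset(self):
        return {"position": 0}

    def read(self, start):
        pos = start["position"]
        # Hypothetical helper: returns at most `limit` records starting at `pos`.
        records = fetch_records(pos, limit=self.max_records_per_batch)
        return iter(records), {"position": pos + len(records)}

    def readBetweenOffsets(self, start, end):
        # Deterministic replay of a previously planned offset range.
        count = end["position"] - start["position"]
        return iter(fetch_records(start["position"], limit=count))
```
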
## Use Cases

### Ideal for:
- Configuration change streams
- Small lookup table updates
- Low-volume event streams (< 1000 records/sec)
- Prototyping and testing streaming applications
- REST API polling with low frequency
- File system monitoring for small files
- Message queue consumers with low throughput

### Not suitable for:
- High-throughput data sources (use `DataSourceStreamReader` instead)
- Large batch sizes that exceed driver memory
- Sources requiring parallel reads for performance
- Production workloads with high availability requirements
- Kafka topics with high message rates
- Large file streaming

## Implementation Example: File Monitor

```python
import os
from typing import Iterator, Tuple, Dict
from pyspark.sql.datasource import SimpleDataSourceStreamReader

class FileMonitorStreamReader(SimpleDataSourceStreamReader):
    def __init__(self, path: str):
        self.path = path

    def initialOffset(self) -> Dict:
        # Start with empty file list
        return {"processed_files": []}

    def read(self, start: Dict) -> Tuple[Iterator[Tuple], Dict]:
        processed = set(start.get("processed_files", []))
        current_files = set(os.listdir(self.path))
        new_files = current_files - processed

        # Read content from new files
        data = []
        for filename in new_files:
            filepath = os.path.join(self.path, filename)
            if os.path.isfile(filepath):
                with open(filepath, 'r') as f:
                    content = f.read()
                data.append((filename, content))

        # Update offset
        new_offset = {"processed_files": list(current_files)}

        return (iter(data), new_offset)

    def readBetweenOffsets(self, start: Dict, end: Dict) -> Iterator[Tuple]:
        # For recovery: re-read files that were added between start and end
        start_files = set(start.get("processed_files", []))
        end_files = set(end.get("processed_files", []))
        files_to_read = end_files - start_files

        data = []
        for filename in files_to_read:
            filepath = os.path.join(self.path, filename)
            if os.path.exists(filepath):
                with open(filepath, 'r') as f:
                    content = f.read()
                data.append((filename, content))

        return iter(data)
```

## Performance Considerations

### Memory Management
- Driver memory should be configured to handle maximum expected batch size
- Use `spark.driver.memory` and `spark.driver.maxResultSize` appropriately
- Monitor driver memory usage during streaming

### Optimization Tips
1. **Batch Size Control**: Implement rate limiting in `read()` method
2. **Data Compression**: Use efficient serialization formats (Parquet, Arrow)
3. **Offset Design**: Keep offset structure simple and small
4. **Caching Strategy**: Clear old cache entries in `commit()` method
5. **Error Handling**: Implement robust error handling in read methods

### Monitoring
Key metrics to monitor:
- Driver memory usage
- Batch processing time
- Records per batch
- Offset checkpoint frequency
- Cache hit/miss ratio

## Comparison with DataSourceStreamReader

| Feature | SimpleDataSourceStreamReader | DataSourceStreamReader |
|---------|------------------------------|------------------------|
| Execution Location | Driver only | Distributed (executors) |
| Partitioning | Single partition | Multiple partitions |
| Scalability | Limited | High |
| Implementation Complexity | Low | High |
| Memory Requirements | Driver-heavy | Distributed |
| Suitable Data Volume | < 10MB per batch | Unlimited |
| Parallelism | None | Configurable |
| Use Case | Prototyping, small streams | Production, large streams |

## Conclusion

`SimpleDataSourceStreamReader` provides an elegant solution for integrating small-scale streaming data sources with Spark's Structured Streaming engine. By executing entirely on the driver, it simplifies development while maintaining full compatibility with Spark's streaming semantics. However, users must carefully consider the scalability limitations and ensure their use case fits within the architectural constraints of driver-side execution.

For production systems with high throughput requirements, the standard `DataSourceStreamReader` with proper partitioning should be used instead.
\ No newline at end of file

From 97a3600cbb991547ef40bbd9a627c4044f483e3d Mon Sep 17 00:00:00 2001
From: Allison Wang
Date: Fri, 3 Oct 2025 13:33:43 -0700
Subject: [PATCH 3/3] update

---
 README.md                                 | 1 -
 docs/index.md                             | 1 -
 docs/simple-stream-reader-architecture.md | 8 ++++----
 3 files changed, 4 insertions(+), 6 deletions(-)

diff --git a/README.md b/README.md
index ce746c8..c8120f7 100644
--- a/README.md
+++ b/README.md
@@ -50,7 +50,6 @@ spark.readStream.format("fake").load().writeStream.format("console").start()
| [StockDataSource](pyspark_datasources/stock.py) | `stock` | Batch Read | Read stock data from Alpha Vantage | None | `pip install pyspark-data-sources`<br>`spark.read.format("stock").option("symbols", "AAPL,GOOGL").option("api_key", "key").load()` |
| **Batch Write** | | | | | |
| [LanceSink](pyspark_datasources/lance.py) | `lance` | Batch Write | Write data in Lance format | `lance` | `pip install pyspark-data-sources[lance]`<br>`df.write.format("lance").mode("append").save("/tmp/lance_data")` |
-| [SimpleJsonDataSource](pyspark_datasources/simplejson.py) | `simplejson` | Batch Write | Write JSON data to Databricks DBFS | `databricks-sdk` | `pip install pyspark-data-sources[simplejson]`<br>`df.write.format("simplejson").save("/path/to/output")` |
| **Streaming Read** | | | | | |
| [OpenSkyDataSource](pyspark_datasources/opensky.py) | `opensky` | Streaming Read | Read from OpenSky Network. | None | `pip install pyspark-data-sources`<br>`spark.readStream.format("opensky").option("region", "EUROPE").load()` |
| [WeatherDataSource](pyspark_datasources/weather.py) | `weather` | Streaming Read | Fetch weather data from tomorrow.io | None | `pip install pyspark-data-sources`<br>`spark.readStream.format("weather").option("locations", "[(37.7749, -122.4194)]").option("apikey", "key").load()` |

diff --git a/docs/index.md b/docs/index.md
index ee7983b..ed53cd7 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -37,7 +37,6 @@ spark.readStream.format("fake").load().writeStream.format("console").start()
| [FakeDataSource](./datasources/fake.md) | `fake` | Generate fake data using the `Faker` library | `faker` |
| [HuggingFaceDatasets](./datasources/huggingface.md) | `huggingface` | Read datasets from the HuggingFace Hub | `datasets` |
| [StockDataSource](./datasources/stock.md) | `stock` | Read stock data from Alpha Vantage | None |
-| [SimpleJsonDataSource](./datasources/simplejson.md) | `simplejson` | Write JSON data to Databricks DBFS | `databricks-sdk` |
| [SalesforceDataSource](./datasources/salesforce.md) | `pyspark.datasource.salesforce` | Write streaming data to Salesforce objects |`simple-salesforce` |
| [GoogleSheetsDataSource](./datasources/googlesheets.md) | `googlesheets` | Read table from public Google Sheets document | None |
| [KaggleDataSource](./datasources/kaggle.md) | `kaggle` | Read datasets from Kaggle | `kagglehub`, `pandas` |

diff --git a/docs/simple-stream-reader-architecture.md b/docs/simple-stream-reader-architecture.md
index 64b3ca4..10a6845 100644
--- a/docs/simple-stream-reader-architecture.md
+++ b/docs/simple-stream-reader-architecture.md
@@ -8,7 +8,7 @@
### Python-Side Components
-#### SimpleDataSourceStreamReader (datasource.py:816-911)
+#### SimpleDataSourceStreamReader (datasource.py)
The user-facing API with three core methods:
- `initialOffset()`: Returns the starting position for a new streaming query
- `read(start)`: Reads all available data from a given offset and returns both the data and the next offset
- `readBetweenOffsets(start, end)`: Re-reads data deterministically for failure recovery
@@ -22,13 +22,13 @@ A private wrapper that implements the prefetch-and-cache pattern:
### Scala-Side Components
-#### PythonMicroBatchStream (PythonMicroBatchStream.scala:31-111)
+#### PythonMicroBatchStream (PythonMicroBatchStream.scala)
Manages the micro-batch execution:
- Creates and manages `PythonStreamingSourceRunner` for Python communication
- Stores prefetched data in BlockManager with `PythonStreamBlockId`
- Handles offset management and partition planning
-#### PythonStreamingSourceRunner (PythonStreamingSourceRunner.scala:63-268)
+#### PythonStreamingSourceRunner (PythonStreamingSourceRunner.scala)
The bridge between JVM and Python:
- Spawns a Python worker process running `python_streaming_source_runner.py`
- Serializes/deserializes data using Arrow format
- Manages RPC-style communication for method invocations
@@ -146,7 +146,7 @@
### Important Note from Source Code
-From datasource.py:823-827:
+From datasource.py:
> "Because SimpleDataSourceStreamReader read records in Spark driver node to determine end offset of each batch without partitioning, it is only supposed to be used in lightweight use cases where input rate and batch size is small."

## Use Cases