
Commit 8985da2

Add PyArrow data source (#14)
* add CLAUDE.md
* add arrow data source
* update
1 parent 85d9aa1 commit 8985da2

File tree: 5 files changed (+486, -0 lines)

CLAUDE.md

Lines changed: 222 additions & 0 deletions
@@ -0,0 +1,222 @@
# PySpark Data Sources - Project Context for Claude

## Project Overview
This is a demonstration library showcasing custom Spark data sources built using Apache Spark 4.0's new Python Data Source API. The project provides various data source connectors for reading from external APIs and services.

**Important**: This is a demo/educational project and not intended for production use.

## Tech Stack
- **Language**: Python (3.9-3.12)
- **Framework**: Apache Spark 4.0+ (PySpark)
- **Package Management**: Poetry
- **Documentation**: MkDocs with Material theme
- **Testing**: pytest
- **Dependencies**: PyArrow, requests, faker, and optional extras

## Project Structure
```
pyspark_datasources/
├── __init__.py       # Main package exports
├── fake.py           # Fake data generator using Faker
├── github.py         # GitHub repository data connector
├── googlesheets.py   # Public Google Sheets reader
├── huggingface.py    # Hugging Face datasets connector
├── kaggle.py         # Kaggle datasets connector
├── lance.py          # Lance vector database connector
├── opensky.py        # OpenSky flight data connector
├── simplejson.py     # JSON writer for Databricks DBFS
├── stock.py          # Alpha Vantage stock data reader
└── weather.py        # Weather data connector
```

## Available Data Sources
| Short Name     | File            | Description                       | Dependencies      |
|----------------|-----------------|-----------------------------------|-------------------|
| `fake`         | fake.py         | Generate fake data using Faker    | faker             |
| `github`       | github.py       | Read GitHub repository PRs        | None              |
| `googlesheets` | googlesheets.py | Read public Google Sheets         | None              |
| `huggingface`  | huggingface.py  | Access Hugging Face datasets      | datasets          |
| `kaggle`       | kaggle.py       | Read Kaggle datasets              | kagglehub, pandas |
| `opensky`      | opensky.py      | Flight data from OpenSky Network  | None              |
| `simplejson`   | simplejson.py   | Write JSON to Databricks DBFS     | databricks-sdk    |
| `stock`        | stock.py        | Stock data from Alpha Vantage     | None              |

## Development Commands

### Environment Setup
```bash
poetry install                # Install dependencies
poetry install --extras all   # Install all optional dependencies
poetry shell                  # Activate virtual environment
```

### Testing
```bash
# Note: On macOS, set this environment variable to avoid fork safety issues
export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES

pytest                                                                    # Run all tests
pytest tests/test_data_sources.py                                         # Run specific test file
pytest tests/test_data_sources.py::test_arrow_datasource_single_file -v  # Run a specific test
```

### Documentation
```bash
mkdocs serve   # Start local docs server
mkdocs build   # Build static documentation
```

### Package Management
Please refer to RELEASE.md for more details.
```bash
poetry build           # Build package
poetry publish         # Publish to PyPI (requires auth)
poetry add <package>   # Add new dependency
poetry update          # Update dependencies
```

## Usage Patterns
All data sources follow the Spark Data Source API pattern:

```python
from pyspark_datasources import FakeDataSource

# Register the data source
spark.dataSource.register(FakeDataSource)

# Batch reading
df = spark.read.format("fake").option("numRows", 100).load()

# Streaming (where supported)
stream = spark.readStream.format("fake").load()
```

## Testing Strategy
- Tests use pytest with PySpark session fixtures (see the fixture sketch below)
- Each data source has basic functionality tests
- Tests verify data reading and schema validation
- Some tests may require external API access

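A minimal sketch of such a session-scoped fixture, assuming a plain local SparkSession (the actual fixture in `tests/` may differ):

```python
# conftest.py (illustrative) - shared SparkSession for the test suite
import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark():
    # Local session reused by all tests; stopped once the suite finishes.
    session = SparkSession.builder.appName("pyspark-datasources-tests").getOrCreate()
    yield session
    session.stop()
```
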
## Key Implementation Details
- All data sources inherit from Spark's DataSource base class
- Each implements the `reader()` method for batch reading
- Some also implement `streamReader()` for streaming
- Schemas are defined using PySpark `StructType`
- Options are passed via Spark's `option()` method

## External Dependencies
- **GitHub API**: Uses public API, no auth required
- **Alpha Vantage**: Stock data API (may require API key)
- **Google Sheets**: Public sheets only, no auth
- **Kaggle**: Requires Kaggle API credentials
- **Databricks**: SDK for DBFS access
- **OpenSky**: Public flight data API

## Common Issues
- Ensure PySpark >= 4.0.0 is installed
- Some data sources require API keys/credentials
- Network connectivity is required for external APIs
- Rate limiting may affect some external services

## Python Data Source API Specification

### Core Abstract Base Classes

#### DataSource
Primary abstract base class for custom data sources supporting read/write operations.

**Key Methods:**
- `__init__(self, options: Dict[str, str])` - Initialize with user options
- `name() -> str` - Return format name (defaults to class name)
- `schema() -> StructType` - Define data source schema
- `reader(schema: StructType) -> DataSourceReader` - Create batch reader
- `writer(schema: StructType, overwrite: bool) -> DataSourceWriter` - Create batch writer
- `streamReader(schema: StructType) -> DataSourceStreamReader` - Create streaming reader
- `streamWriter(schema: StructType, overwrite: bool) -> DataSourceStreamWriter` - Create streaming writer
- `simpleStreamReader(schema: StructType) -> SimpleDataSourceStreamReader` - Create simple streaming reader

#### DataSourceReader
Abstract base class for reading data from sources.

**Key Methods:**
- `read(partition) -> Iterator` - Read data from partition, returns tuples/Rows/pyarrow.RecordBatch
- `partitions() -> List[InputPartition]` - Return input partitions for parallel reading

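For orientation, a minimal batch reader sketch (the class, values, and partition scheme are made up for illustration):

```python
from pyspark.sql.datasource import DataSourceReader, InputPartition


class RangeReader(DataSourceReader):
    """Hypothetical reader that emits integer rows across two partitions."""

    def partitions(self):
        # Each InputPartition value is pickled and shipped to an executor.
        return [InputPartition(0), InputPartition(1)]

    def read(self, partition):
        # Yield plain tuples that match the schema declared by the DataSource.
        start = partition.value * 5
        for i in range(start, start + 5):
            yield (i,)
```
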
#### DataSourceStreamReader
Abstract base class for streaming data sources with offset management.

**Key Methods:**
- `initialOffset() -> Offset` - Return starting offset
- `latestOffset() -> Offset` - Return latest available offset
- `partitions(start: Offset, end: Offset) -> List[InputPartition]` - Get partitions for offset range
- `read(partition) -> Iterator` - Read data from partition
- `commit(end: Offset)` - Mark offsets as processed
- `stop()` - Clean up resources

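A hedged sketch of that contract; in the Python API, offsets are plain dictionaries that Spark serializes to JSON (the counter logic here is purely illustrative):

```python
from pyspark.sql.datasource import DataSourceStreamReader, InputPartition


class CounterStreamReader(DataSourceStreamReader):
    """Hypothetical stream reader that emits one row per offset increment."""

    def __init__(self):
        self.current = 0

    def initialOffset(self):
        return {"offset": 0}

    def latestOffset(self):
        self.current += 1
        return {"offset": self.current}

    def partitions(self, start, end):
        # One partition covering the offset range of this micro-batch.
        return [InputPartition((start["offset"], end["offset"]))]

    def read(self, partition):
        lo, hi = partition.value
        for i in range(lo, hi):
            yield (i,)

    def commit(self, end):
        pass  # nothing external to acknowledge in this sketch

    def stop(self):
        pass  # no resources to release
```
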
#### DataSourceWriter
Abstract base class for writing data to external sources.

**Key Methods:**
- `write(iterator) -> WriteResult` - Write data from iterator
- `abort(messages: List[WriterCommitMessage])` - Handle write failures
- `commit(messages: List[WriterCommitMessage]) -> WriteResult` - Commit successful writes

#### DataSourceArrowWriter
Optimized writer using PyArrow RecordBatch for improved performance.

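A minimal writer sketch, assuming local mode where executors share the driver's filesystem (the path, class name, and JSON-lines format are hypothetical); `write` runs once per partition on executors, while `commit`/`abort` run on the driver:

```python
import json

from pyspark.sql.datasource import DataSourceWriter, WriterCommitMessage


class JsonLinesWriter(DataSourceWriter):
    """Hypothetical writer that appends rows as JSON lines to a local file."""

    def __init__(self, options):
        self.path = options.get("path", "/tmp/output.jsonl")

    def write(self, iterator):
        # Executor-side: persist this partition's rows.
        with open(self.path, "a") as f:
            for row in iterator:
                f.write(json.dumps(row.asDict()) + "\n")
        return WriterCommitMessage()

    def commit(self, messages):
        pass  # driver-side: all partitions reported success

    def abort(self, messages):
        pass  # driver-side: a partition failed; clean up if needed
```
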
### Implementation Requirements

1. **Serialization**: All classes must be pickle serializable
2. **Schema Definition**: Use PySpark StructType for schema specification
3. **Data Types**: Support standard Spark SQL data types
4. **Error Handling**: Implement proper exception handling
5. **Resource Management**: Clean up resources properly in streaming sources
6. **Use load() for paths**: Specify file paths in `load("/path")`, not `option("path", "/path")` (see the snippet below)

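To make requirement 6 concrete (format name and path are placeholders):

```python
# Preferred: pass the path positionally to load()
df = spark.read.format("myformat").load("/path/to/data")

# Avoid: smuggling the path in as an option
# df = spark.read.format("myformat").option("path", "/path/to/data").load()
```
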
### Usage Patterns

```python
from pyspark.sql.datasource import DataSource
from pyspark.sql.types import IntegerType, StructField, StructType


# Custom data source implementation
class MyDataSource(DataSource):
    def __init__(self, options):
        self.options = options

    @classmethod
    def name(cls):
        return "myformat"

    def schema(self):
        return StructType([StructField("id", IntegerType(), True)])

    def reader(self, schema):
        return MyDataSourceReader(self.options, schema)


# Registration and usage
spark.dataSource.register(MyDataSource)
df = spark.read.format("myformat").option("key", "value").load()
```

### Performance Optimizations

1. **Arrow Integration**: Return `pyarrow.RecordBatch` for better serialization (see the sketch below)
2. **Partitioning**: Implement `partitions()` for parallel processing
3. **Lazy Evaluation**: Defer expensive operations until read time

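As a sketch of the Arrow path (the columns and values are made up), `read()` may yield `pyarrow.RecordBatch` objects instead of tuples:

```python
import pyarrow as pa

from pyspark.sql.datasource import DataSourceReader, InputPartition


class ArrowBatchReader(DataSourceReader):
    """Hypothetical reader that hands data to Spark as Arrow RecordBatches."""

    def partitions(self):
        return [InputPartition(0)]

    def read(self, partition):
        # Building a columnar batch avoids per-row Python object overhead.
        yield pa.RecordBatch.from_pydict(
            {"id": [1, 2, 3], "name": ["a", "b", "c"]}
        )
```
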
## Documentation
- Main docs: https://allisonwang-db.github.io/pyspark-data-sources/
- Individual data source docs in `docs/datasources/`
- Spark Data Source API: https://spark.apache.org/docs/4.0.0/api/python/tutorial/sql/python_data_source.html
- API Source Code: https://github.com/apache/spark/blob/master/python/pyspark/sql/datasource.py

### Data Source Docstring Guidelines
When creating new data sources, include these sections in the class docstring (a skeletal template follows the guidelines below):

**Required Sections:**
- Brief description and `Name: "format_name"`
- `Options` section documenting all parameters with types/defaults
- `Examples` section with registration and basic usage

**Key Guidelines:**
- **Include schema output**: Show `df.printSchema()` results for clarity
- **Document error cases**: Show what happens with missing files/invalid options
- **Document partitioning strategy**: Show how data sources leverage partitioning to increase performance
- **Document Arrow optimization**: Show how data sources use Arrow to transmit data
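
A skeletal docstring sketch following these guidelines (format name, options, and schema are placeholders):

```python
from pyspark.sql.datasource import DataSource


class ExampleDataSource(DataSource):
    """
    Read example records from a hypothetical backend.

    Name: "example"

    Options
    -------
    path : str
        Location to read from. Required.

    Examples
    --------
    >>> spark.dataSource.register(ExampleDataSource)
    >>> df = spark.read.format("example").load("/tmp/example-data")
    >>> df.printSchema()
    root
     |-- id: integer (nullable = true)
    """
```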

RELEASE.md

Lines changed: 15 additions & 0 deletions
@@ -119,6 +119,21 @@ Then follow steps 2-4 above.
- Authenticate: `gh auth login`
- Check repository access: `gh repo view`

### PyArrow Compatibility Issues

If you see `objc_initializeAfterForkError` crashes on macOS, set this environment variable:

```bash
# For single commands
OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES python your_script.py

# For Poetry environment
OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES poetry run python your_script.py

# To set permanently in your shell (add to ~/.zshrc or ~/.bash_profile):
export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES
```

## Useful Poetry Version Commands

pyspark_datasources/__init__.py

Lines changed: 1 addition & 0 deletions
@@ -1,3 +1,4 @@
```python
from .arrow import ArrowDataSource
from .fake import FakeDataSource
from .github import GithubDataSource
from .googlesheets import GoogleSheetsDataSource
```
