From d62843e963b044cc3f74cd6a736a12d7b7c6ad8d Mon Sep 17 00:00:00 2001 From: Allison Wang Date: Wed, 22 Oct 2025 19:53:25 -0400 Subject: [PATCH] update --- CLAUDE.md | 248 --------- README.md | 192 ++++--- contributing/DEVELOPMENT.md | 373 ++++++++++++++ RELEASE.md => contributing/RELEASE.md | 0 docs/api-reference.md | 557 +++++++++++++++++++++ docs/building-data-sources.md | 692 ++++++++++++++++++++++++++ docs/data-sources-guide.md | 498 ++++++++++++++++++ docs/datasources/arrow.md | 6 - docs/datasources/fake.md | 6 - docs/datasources/github.md | 3 - docs/datasources/googlesheets.md | 3 - docs/datasources/huggingface.md | 5 - docs/datasources/jsonplaceholder.md | 3 - docs/datasources/kaggle.md | 5 - docs/datasources/lance.md | 6 - docs/datasources/opensky.md | 5 - docs/datasources/robinhood.md | 6 - docs/datasources/salesforce.md | 6 - docs/datasources/simplejson.md | 3 - docs/datasources/stock.md | 3 - docs/datasources/weather.md | 5 - docs/index.md | 45 -- mkdocs.yml | 44 -- 23 files changed, 2229 insertions(+), 485 deletions(-) delete mode 100644 CLAUDE.md create mode 100644 contributing/DEVELOPMENT.md rename RELEASE.md => contributing/RELEASE.md (100%) create mode 100644 docs/api-reference.md create mode 100644 docs/building-data-sources.md create mode 100644 docs/data-sources-guide.md delete mode 100644 docs/datasources/arrow.md delete mode 100644 docs/datasources/fake.md delete mode 100644 docs/datasources/github.md delete mode 100644 docs/datasources/googlesheets.md delete mode 100644 docs/datasources/huggingface.md delete mode 100644 docs/datasources/jsonplaceholder.md delete mode 100644 docs/datasources/kaggle.md delete mode 100644 docs/datasources/lance.md delete mode 100644 docs/datasources/opensky.md delete mode 100644 docs/datasources/robinhood.md delete mode 100644 docs/datasources/salesforce.md delete mode 100644 docs/datasources/simplejson.md delete mode 100644 docs/datasources/stock.md delete mode 100644 docs/datasources/weather.md delete mode 100644 docs/index.md delete mode 100644 mkdocs.yml diff --git a/CLAUDE.md b/CLAUDE.md deleted file mode 100644 index b6bad61..0000000 --- a/CLAUDE.md +++ /dev/null @@ -1,248 +0,0 @@ -# PySpark Data Sources - Project Context for Claude - -## Project Overview -This is a demonstration library showcasing custom Spark data sources built using Apache Spark 4.0's new Python Data Source API. The project provides various data source connectors for reading from external APIs and services. - -**Important**: This is a demo/educational project and not intended for production use. 
- -## Tech Stack -- **Language**: Python (3.9-3.12) -- **Framework**: Apache Spark 4.0+ (PySpark) -- **Package Management**: Poetry -- **Documentation**: MkDocs with Material theme -- **Testing**: pytest -- **Dependencies**: PyArrow, requests, faker, and optional extras - -## Project Structure -``` -pyspark_datasources/ -├── __init__.py # Main package exports -├── fake.py # Fake data generator using Faker -├── github.py # GitHub repository data connector -├── googlesheets.py # Public Google Sheets reader -├── huggingface.py # Hugging Face datasets connector -├── kaggle.py # Kaggle datasets connector -├── lance.py # Lance vector database connector -├── opensky.py # OpenSky flight data connector -├── simplejson.py # JSON writer for Databricks DBFS -├── stock.py # Alpha Vantage stock data reader -└── weather.py # Weather data connector -``` - -## Available Data Sources -| Short Name | File | Description | Dependencies | -|---------------|------|-------------|--------------| -| `fake` | fake.py | Generate fake data using Faker | faker | -| `github` | github.py | Read GitHub repository PRs | None | -| `googlesheets`| googlesheets.py | Read public Google Sheets | None | -| `huggingface` | huggingface.py | Access Hugging Face datasets | datasets | -| `kaggle` | kaggle.py | Read Kaggle datasets | kagglehub, pandas | -| `opensky` | opensky.py | Flight data from OpenSky Network | None | -| `simplejson` | simplejson.py | Write JSON to Databricks DBFS | databricks-sdk | -| `stock` | stock.py | Stock data from Alpha Vantage | None | - -## Development Commands - -### Environment Setup -```bash -poetry install # Install dependencies -poetry install --extras all # Install all optional dependencies -poetry shell # Activate virtual environment -``` - -### Testing -```bash -# Note: On macOS, set this environment variable to avoid fork safety issues -export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES - -pytest # Run all tests -pytest tests/test_data_sources.py # Run specific test file -pytest tests/test_data_sources.py::test_arrow_datasource_single_file -v # Run a specific test -``` - -### Documentation -```bash -mkdocs serve # Start local docs server -mkdocs build # Build static documentation -``` - -### Code Formatting -This project uses [Ruff](https://github.com/astral-sh/ruff) for code formatting and linting. - -```bash -poetry run ruff format . # Format code -poetry run ruff check . # Run linter -poetry run ruff check . --fix # Run linter with auto-fix -``` - -### Package Management -Please refer to RELEASE.md for more details. 
-```bash -poetry build # Build package -poetry publish # Publish to PyPI (requires auth) -poetry add # Add new dependency -poetry update # Update dependencies -``` - -## Usage Patterns -All data sources follow the Spark Data Source API pattern: - -```python -from pyspark_datasources import FakeDataSource - -# Register the data source -spark.dataSource.register(FakeDataSource) - -# Batch reading -df = spark.read.format("fake").option("numRows", 100).load() - -# Streaming (where supported) -stream = spark.readStream.format("fake").load() -``` - -## Testing Strategy -- Tests use pytest with PySpark session fixtures -- Each data source has basic functionality tests -- Tests verify data reading and schema validation -- Some tests may require external API access - -## Key Implementation Details -- All data sources inherit from Spark's DataSource base class -- Implements reader() method for batch reading -- Some implement streamReader() for streaming -- Schema is defined using PySpark StructType -- Options are passed via Spark's option() method - -## External Dependencies -- **GitHub API**: Uses public API, no auth required -- **Alpha Vantage**: Stock data API (may require API key) -- **Google Sheets**: Public sheets only, no auth -- **Kaggle**: Requires Kaggle API credentials -- **Databricks**: SDK for DBFS access -- **OpenSky**: Public flight data API - -## Common Issues -- Ensure PySpark >= 4.0.0 is installed -- Some data sources require API keys/credentials -- Network connectivity required for external APIs -- Rate limiting may affect some external services - -## Python Data Source API Specification - -### Core Abstract Base Classes - -#### DataSource -Primary abstract base class for custom data sources supporting read/write operations. - -**Key Methods:** -- `__init__(self, options: Dict[str, str])` - Initialize with user options (Optional; base class provides default) -- `name() -> str` - Return format name (Optional to override; defaults to class name) -- `schema() -> StructType` - Define data source schema (Required) -- `reader(schema: StructType) -> DataSourceReader` - Create batch reader (Required if batch read is supported) -- `writer(schema: StructType, overwrite: bool) -> DataSourceWriter` - Create batch writer (Required if batch write is supported) -- `streamReader(schema: StructType) -> DataSourceStreamReader` - Create streaming reader (Required if streaming read is supported and `simpleStreamReader` is not implemented) -- `streamWriter(schema: StructType, overwrite: bool) -> DataSourceStreamWriter` - Create streaming writer (Required if streaming write is supported) -- `simpleStreamReader(schema: StructType) -> SimpleDataSourceStreamReader` - Create simple streaming reader (Required if streaming read is supported and `streamReader` is not implemented) - -#### DataSourceReader -Abstract base class for reading data from sources. - -**Key Methods:** -- `read(partition) -> Iterator` - Read data from partition, returns tuples/Rows/pyarrow.RecordBatch (Required) -- `partitions() -> List[InputPartition]` - Return input partitions for parallel reading (Optional; defaults to a single partition) - -#### DataSourceStreamReader -Abstract base class for streaming data sources with offset management. 
- -**Key Methods:** -- `initialOffset() -> dict` - Return starting offset (Required) -- `latestOffset() -> dict` - Return latest available offset (Required) -- `partitions(start: dict, end: dict) -> List[InputPartition]` - Get partitions for offset range (Required) -- `read(partition) -> Iterator` - Read data from partition (Required) -- `commit(end: dict) -> None` - Mark offsets as processed (Optional) -- `stop() -> None` - Clean up resources (Optional) - -#### SimpleDataSourceStreamReader -Simplified streaming reader interface without partition planning. - -**Key Methods:** -- `initialOffset() -> dict` - Return starting offset (Required) -- `read(start: dict) -> Tuple[Iterator, dict]` - Read from start offset; return an iterator and the next start offset (Required) -- `readBetweenOffsets(start: dict, end: dict) -> Iterator` - Deterministic replay between offsets for recovery (Optional; recommended for reliable recovery) -- `commit(end: dict) -> None` - Mark offsets as processed (Optional) - -#### DataSourceWriter -Abstract base class for writing data to external sources. - -**Key Methods:** -- `write(iterator) -> WriteResult` - Write data from iterator (Required) -- `abort(messages: List[WriterCommitMessage])` - Handle write failures (Optional) -- `commit(messages: List[WriterCommitMessage]) -> WriteResult` - Commit successful writes (Required) - -#### DataSourceStreamWriter -Abstract base class for writing data to external sinks in streaming queries. - -**Key Methods:** -- `write(iterator) -> WriterCommitMessage` - Write data for a partition and return a commit message (Required) -- `commit(messages: List[WriterCommitMessage], batchId: int) -> None` - Commit successful microbatch writes (Required) -- `abort(messages: List[WriterCommitMessage], batchId: int) -> None` - Handle write failures for a microbatch (Optional) - -#### DataSourceArrowWriter -Optimized writer using PyArrow RecordBatch for improved performance. - -### Implementation Requirements - -1. **Serialization**: All classes must be pickle serializable -2. **Schema Definition**: Use PySpark StructType for schema specification -3. **Data Types**: Support standard Spark SQL data types -4. **Error Handling**: Implement proper exception handling -5. **Resource Management**: Clean up resources properly in streaming sources -6. **Use load() for paths**: Specify file paths in `load("/path")`, not `option("path", "/path")` - -### Usage Patterns - -```python -# Custom data source implementation -class MyDataSource(DataSource): - def __init__(self, options): - self.options = options - - def name(self): - return "myformat" - - def schema(self): - return StructType([StructField("id", IntegerType(), True)]) - - def reader(self, schema): - return MyDataSourceReader(self.options, schema) - -# Registration and usage -spark.dataSource.register(MyDataSource) -df = spark.read.format("myformat").option("key", "value").load() -``` - -### Performance Optimizations - -1. **Arrow Integration**: Return `pyarrow.RecordBatch` for better serialization -2. **Partitioning**: Implement `partitions()` for parallel processing -3. 
**Lazy Evaluation**: Defer expensive operations until read time - -## Documentation -- Main docs: https://allisonwang-db.github.io/pyspark-data-sources/ -- Individual data source docs in `docs/datasources/` -- Spark Data Source API: https://spark.apache.org/docs/4.0.0/api/python/tutorial/sql/python_data_source.html -- API Source Code: https://github.com/apache/spark/blob/master/python/pyspark/sql/datasource.py - -### Data Source Docstring Guidelines -When creating new data sources, include these sections in the class docstring: - -**Required Sections:** -- Brief description and `Name: "format_name"` -- `Options` section documenting all parameters with types/defaults -- `Examples` section with registration and basic usage - -**Key Guidelines:** -- **Include schema output**: Show `df.printSchema()` results for clarity -- **Document error cases**: Show what happens with missing files/invalid options -- **Document partitioning strategy**: Show how data sources leverage partitioning to increase performance -- **Document Arrow optimization**: Show how data data sources use Arrow to transmit data diff --git a/README.md b/README.md index c8120f7..fe790fb 100644 --- a/README.md +++ b/README.md @@ -3,122 +3,148 @@ [![pypi](https://img.shields.io/pypi/v/pyspark-data-sources.svg?color=blue)](https://pypi.org/project/pyspark-data-sources/) [![code style: ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff) -This repository showcases custom Spark data sources built using the new [**Python Data Source API**](https://spark.apache.org/docs/4.0.0/api/python/tutorial/sql/python_data_source.html) introduced in Apache Spark 4.0. -For an in-depth understanding of the API, please refer to the [API source code](https://github.com/apache/spark/blob/master/python/pyspark/sql/datasource.py). -Note this repo is demo only and please be aware that it is not intended for production use. -Contributions and feedback are welcome to help improve the examples. +Custom Apache Spark data sources using the [Python Data Source API](https://spark.apache.org/docs/4.0.0/api/python/tutorial/sql/python_data_source.html) (Spark 4.0+). Learn by example and build your own data sources. +## Quick Start -## Installation -``` +### Installation + +```bash pip install pyspark-data-sources -``` -## Usage -Make sure you have pyspark >= 4.0.0 installed. +# Install with specific extras +pip install pyspark-data-sources[faker] # For FakeDataSource -``` -pip install pyspark +pip install pyspark-data-sources[all] # All optional dependencies ``` -Or use [Databricks Runtime 15.4 LTS](https://docs.databricks.com/aws/en/release-notes/runtime/15.4lts) or above versions, or [Databricks Serverless](https://docs.databricks.com/aws/en/compute/serverless/). 
+### Requirements +- Apache Spark 4.0+ or [Databricks Runtime 15.4 LTS](https://docs.databricks.com/aws/en/release-notes/runtime/15.4lts)+ +- Python 3.9-3.12 +### Basic Usage ```python -from pyspark_datasources.fake import FakeDataSource +from pyspark.sql import SparkSession +from pyspark_datasources import FakeDataSource + +# Create Spark session +spark = SparkSession.builder.appName("datasource-demo").getOrCreate() # Register the data source spark.dataSource.register(FakeDataSource) -spark.read.format("fake").load().show() - -# For streaming data generation -spark.readStream.format("fake").load().writeStream.format("console").start() +# Read batch data +df = spark.read.format("fake").option("numRows", 5).load() +df.show() +# +--------------+----------+-------+------------+ +# | name| date|zipcode| state| +# +--------------+----------+-------+------------+ +# | Pam Mitchell|1988-10-20| 23788| Tennessee| +# |Melissa Turner|1996-06-14| 30851| Nevada| +# | Brian Ramsey|2021-08-21| 55277| Washington| +# | Caitlin Reed|1983-06-22| 89813|Pennsylvania| +# | Douglas James|2007-01-18| 46226| Alabama| +# +--------------+----------+-------+------------+ + +# Stream data +stream = spark.readStream.format("fake").load() +query = stream.writeStream.format("console").start() ``` -## Example Data Sources - -| Data Source | Short Name | Type | Description | Dependencies | Example | -|-------------------------------------------------------------------------|----------------|----------------|-----------------------------------------------|-----------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| **Batch Read** | | | | | | -| [ArrowDataSource](pyspark_datasources/arrow.py) | `arrow` | Batch Read | Read Apache Arrow files (.arrow) | `pyarrow` | `pip install pyspark-data-sources[arrow]`
`spark.read.format("arrow").load("/path/to/file.arrow")` | -| [FakeDataSource](pyspark_datasources/fake.py) | `fake` | Batch/Streaming Read | Generate fake data using the `Faker` library | `faker` | `pip install pyspark-data-sources[fake]`
`spark.read.format("fake").load()` or `spark.readStream.format("fake").load()` | -| [GithubDataSource](pyspark_datasources/github.py) | `github` | Batch Read | Read pull requests from a Github repository | None | `pip install pyspark-data-sources`
`spark.read.format("github").load("apache/spark")` | -| [GoogleSheetsDataSource](pyspark_datasources/googlesheets.py) | `googlesheets` | Batch Read | Read table from public Google Sheets | None | `pip install pyspark-data-sources`
`spark.read.format("googlesheets").load("https://docs.google.com/spreadsheets/d/...")` | -| [HuggingFaceDatasets](pyspark_datasources/huggingface.py) | `huggingface` | Batch Read | Read datasets from HuggingFace Hub | `datasets` | `pip install pyspark-data-sources[huggingface]`
`spark.read.format("huggingface").load("imdb")` | -| [KaggleDataSource](pyspark_datasources/kaggle.py) | `kaggle` | Batch Read | Read datasets from Kaggle | `kagglehub`, `pandas` | `pip install pyspark-data-sources[kaggle]`
`spark.read.format("kaggle").load("titanic")` | -| [StockDataSource](pyspark_datasources/stock.py) | `stock` | Batch Read | Read stock data from Alpha Vantage | None | `pip install pyspark-data-sources`
`spark.read.format("stock").option("symbols", "AAPL,GOOGL").option("api_key", "key").load()` | -| **Batch Write** | | | | | | -| [LanceSink](pyspark_datasources/lance.py) | `lance` | Batch Write | Write data in Lance format | `lance` | `pip install pyspark-data-sources[lance]`
`df.write.format("lance").mode("append").save("/tmp/lance_data")` | -| **Streaming Read** | | | | | | -| [OpenSkyDataSource](pyspark_datasources/opensky.py) | `opensky` | Streaming Read | Read from OpenSky Network. | None | `pip install pyspark-data-sources`
`spark.readStream.format("opensky").option("region", "EUROPE").load()` | -| [WeatherDataSource](pyspark_datasources/weather.py) | `weather` | Streaming Read | Fetch weather data from tomorrow.io | None | `pip install pyspark-data-sources`
`spark.readStream.format("weather").option("locations", "[(37.7749, -122.4194)]").option("apikey", "key").load()` | -| **Streaming Write** | | | | | | -| [SalesforceDataSource](pyspark_datasources/salesforce.py) | `pyspark.datasource.salesforce` | Streaming Write | Streaming datasource for writing data to Salesforce | `simple-salesforce` | `pip install pyspark-data-sources[salesforce]`
`df.writeStream.format("pyspark.datasource.salesforce").option("username", "user").start()` | +## Available Data Sources -See more here: https://allisonwang-db.github.io/pyspark-data-sources/. +| Data Source | Type | Description | Install | +|-------------|------|-------------|---------| +| `fake` | Batch/Stream | Generate synthetic test data using Faker | `pip install pyspark-data-sources[faker]` | +| `github` | Batch | Read GitHub pull requests | Built-in | +| `googlesheets` | Batch | Read public Google Sheets | Built-in | +| `huggingface` | Batch | Load Hugging Face datasets | `[huggingface]` | +| `stock` | Batch | Fetch stock market data (Alpha Vantage) | Built-in | +| `opensky` | Batch/Stream | Live flight tracking data | Built-in | +| `kaggle` | Batch | Load Kaggle datasets | `[kaggle]` | +| `arrow` | Batch | Read Apache Arrow files | `[arrow]` | +| `lance` | Batch Write | Write Lance vector format | `[lance]` | -## Official Data Sources +📚 **[See detailed examples for all data sources →](docs/data-sources-guide.md)** -For production use, consider these official data source implementations built with the Python Data Source API: +## Example: Generate Fake Data -| Data Source | Repository | Description | Features | -|--------------------------|-----------------------------------------------------------------------------------------------|----------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------| -| **HuggingFace Datasets** | [@huggingface/pyspark_huggingface](https://github.com/huggingface/pyspark_huggingface) | Production-ready Spark Data Source for 🤗 Hugging Face Datasets | • Stream datasets as Spark DataFrames
• Select subsets/splits with filters
• Authentication support
• Save DataFrames to Hugging Face
| +```python +from pyspark_datasources import FakeDataSource -## Data Source Naming Convention +spark.dataSource.register(FakeDataSource) -When creating custom data sources using the Python Data Source API, follow these naming conventions for the `short_name` parameter: +# Generate synthetic data with custom schema +df = spark.read.format("fake") \ + .schema("name string, email string, company string") \ + .option("numRows", 5) \ + .load() + +df.show(truncate=False) +# +------------------+-------------------------+-----------------+ +# |name |email |company | +# +------------------+-------------------------+-----------------+ +# |Christine Sampson |johnsonjeremy@example.com|Hernandez-Nguyen | +# |Yolanda Brown |williamlowe@example.net |Miller-Hernandez | +# +------------------+-------------------------+-----------------+ +``` -### Recommended Approach -- **Use the system name directly**: Use lowercase system names like `huggingface`, `opensky`, `googlesheets`, etc. -- This provides clear, intuitive naming that matches the service being integrated +## Building Your Own Data Source -### Conflict Resolution -- **If there's a naming conflict**: Use the format `pyspark.datasource.` -- Example: `pyspark.datasource.salesforce` if "salesforce" conflicts with existing naming +Here's a minimal example to get started: -### Examples from this repository: ```python -# Direct system naming (preferred) -spark.read.format("github").load() # GithubDataSource -spark.read.format("googlesheets").load() # GoogleSheetsDataSource -spark.read.format("opensky").load() # OpenSkyDataSource - -# Namespaced format (when conflicts exist) -spark.read.format("pyspark.datasource.opensky").load() +from pyspark.sql.datasource import DataSource, DataSourceReader +from pyspark.sql.types import StructType, StructField, StringType, IntegerType + +class MyCustomDataSource(DataSource): + def name(self): + return "mycustom" + + def schema(self): + return StructType([ + StructField("id", IntegerType()), + StructField("name", StringType()) + ]) + + def reader(self, schema): + return MyCustomReader(self.options, schema) + +class MyCustomReader(DataSourceReader): + def __init__(self, options, schema): + self.options = options + self.schema = schema + + def read(self, partition): + # Your data reading logic here + for i in range(10): + yield (i, f"name_{i}") + +# Register and use +spark.dataSource.register(MyCustomDataSource) +df = spark.read.format("mycustom").load() ``` -## Contributing -We welcome and appreciate any contributions to enhance and expand the custom data sources.: +📖 **[Complete guide with advanced patterns →](docs/building-data-sources.md)** -- **Add New Data Sources**: Want to add a new data source using the Python Data Source API? Submit a pull request or open an issue. -- **Suggest Enhancements**: If you have ideas to improve a data source or the API, we'd love to hear them! -- **Report Bugs**: Found something that doesn't work as expected? Let us know by opening an issue. 
+## Documentation +- 📚 **[Data Sources Guide](docs/data-sources-guide.md)** - Detailed examples for each data source +- 🔧 **[Building Data Sources](docs/building-data-sources.md)** - Complete tutorial with advanced patterns +- 📖 **[API Reference](docs/api-reference.md)** - Full API specification and method signatures +- 💻 **[Development Guide](contributing/DEVELOPMENT.md)** - Contributing and development setup -## Development -### Environment Setup -``` -poetry install -poetry env activate -``` +## Requirements -### Build Docs -``` -mkdocs serve -``` +- Apache Spark 4.0+ or Databricks Runtime 15.4 LTS+ +- Python 3.9-3.12 -### Code Formatting -This project uses [Ruff](https://github.com/astral-sh/ruff) for code formatting and linting. +## Contributing -```bash -# Format code -poetry run ruff format . +We welcome contributions! See our [Development Guide](contributing/DEVELOPMENT.md) for details. -# Run linter -poetry run ruff check . +## Resources -# Run linter with auto-fix -poetry run ruff check . --fix -``` +- [Python Data Source API Documentation](https://spark.apache.org/docs/4.0.0/api/python/tutorial/sql/python_data_source.html) +- [API Source Code](https://github.com/apache/spark/blob/master/python/pyspark/sql/datasource.py) diff --git a/contributing/DEVELOPMENT.md b/contributing/DEVELOPMENT.md new file mode 100644 index 0000000..c5f2a6c --- /dev/null +++ b/contributing/DEVELOPMENT.md @@ -0,0 +1,373 @@ +# Development Guide + +## Environment Setup + +### Prerequisites +- Python 3.9-3.12 +- Poetry for dependency management +- Apache Spark 4.0+ (or Databricks Runtime 15.4 LTS+) + +### Installation + +```bash +# Clone the repository +git clone https://github.com/allisonwang-db/pyspark-data-sources.git +cd pyspark-data-sources + +# Install dependencies +poetry install + +# Install with all optional dependencies +poetry install --extras all + +# Activate virtual environment +poetry shell +``` + +### macOS Setup +On macOS, you may encounter fork safety issues with PyArrow. Set this environment variable: + +```bash +export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES + +# Add to your shell profile for persistence +echo 'export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES' >> ~/.zshrc +``` + +## Testing + +### Running Tests + +```bash +# Run all tests +pytest + +# Run specific test file +pytest tests/test_data_sources.py + +# Run specific test with verbose output +pytest tests/test_data_sources.py::test_fake_datasource -v + +# Run with coverage +pytest --cov=pyspark_datasources --cov-report=html +``` + +### Writing Tests + +Tests follow this pattern: + +```python +import pytest +from pyspark.sql import SparkSession + +@pytest.fixture +def spark(): + return SparkSession.builder \ + .appName("test") \ + .config("spark.sql.shuffle.partitions", "1") \ + .getOrCreate() + +def test_my_datasource(spark): + from pyspark_datasources import MyDataSource + spark.dataSource.register(MyDataSource) + + df = spark.read.format("myformat").load() + assert df.count() > 0 + assert len(df.columns) == expected_columns +``` + +## Code Quality + +### Formatting with Ruff + +This project uses [Ruff](https://github.com/astral-sh/ruff) for code formatting and linting. + +```bash +# Format code +poetry run ruff format . + +# Run linter +poetry run ruff check . + +# Run linter with auto-fix +poetry run ruff check . 
--fix + +# Check specific file +poetry run ruff check pyspark_datasources/fake.py +``` + +### Pre-commit Hooks (Optional) + +```bash +# Install pre-commit hooks +poetry add --group dev pre-commit +pre-commit install + +# Run manually +pre-commit run --all-files +``` + +## Documentation + +### Building Documentation + +The project previously used MkDocs for documentation. Documentation now lives primarily in: +- README.md - Main documentation +- Docstrings in source code +- Contributing guides in /contributing + +### Writing Docstrings + +Follow this pattern for data source docstrings: + +```python +class MyDataSource(DataSource): + """ + Brief description of the data source. + + Longer description explaining what it does and any important details. + + Name: `myformat` + + Options + ------- + option1 : str, optional + Description of option1 (default: "value") + option2 : int, required + Description of option2 + + Examples + -------- + Register and use the data source: + + >>> from pyspark_datasources import MyDataSource + >>> spark.dataSource.register(MyDataSource) + >>> df = spark.read.format("myformat").option("option2", 100).load() + >>> df.show() + +---+-----+ + | id|value| + +---+-----+ + | 1| foo| + | 2| bar| + +---+-----+ + + >>> df.printSchema() + root + |-- id: integer (nullable = true) + |-- value: string (nullable = true) + """ +``` + +## Adding New Data Sources + +### Step 1: Create the Data Source File + +Create a new file in `pyspark_datasources/`: + +```python +# pyspark_datasources/mynewsource.py +from pyspark.sql.datasource import DataSource, DataSourceReader +from pyspark.sql.types import StructType, StructField, StringType + +class MyNewDataSource(DataSource): + def name(self): + return "mynewformat" + + def schema(self): + return StructType([ + StructField("field1", StringType(), True), + StructField("field2", StringType(), True) + ]) + + def reader(self, schema): + return MyNewReader(self.options, schema) + +class MyNewReader(DataSourceReader): + def __init__(self, options, schema): + self.options = options + self.schema = schema + + def read(self, partition): + # Implement reading logic + for i in range(10): + yield ("value1", "value2") +``` + +### Step 2: Add to __init__.py + +```python +# pyspark_datasources/__init__.py +from pyspark_datasources.mynewsource import MyNewDataSource + +__all__ = [ + # ... existing exports ... + "MyNewDataSource", +] +``` + +### Step 3: Add Tests + +```python +# tests/test_data_sources.py +def test_mynew_datasource(spark): + from pyspark_datasources import MyNewDataSource + spark.dataSource.register(MyNewDataSource) + + df = spark.read.format("mynewformat").load() + assert df.count() == 10 + assert df.columns == ["field1", "field2"] +``` + +### Step 4: Update Documentation + +Add your data source to the table in README.md with examples. + +## Package Management + +### Adding Dependencies + +```bash +# Add required dependency +poetry add requests + +# Add optional dependency +poetry add --optional faker + +# Add dev dependency +poetry add --group dev pytest-cov + +# Update dependencies +poetry update +``` + +### Managing Extras + +Edit `pyproject.toml` to add optional dependency groups: + +```toml +[tool.poetry.extras] +mynewsource = ["special-library"] +all = ["faker", "datasets", "special-library", ...] 
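+# Users can then opt in with `pip install pyspark-data-sources[mynewsource]`
+# (or `poetry install --extras mynewsource` while developing).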
+``` + +## Debugging + +### Enable Spark Logging + +```python +import logging +logging.basicConfig(level=logging.INFO) + +# Or in Spark config +spark = SparkSession.builder \ + .config("spark.sql.execution.arrow.pyspark.enabled", "true") \ + .config("spark.log.level", "INFO") \ + .getOrCreate() +``` + +### Debug Data Source + +```python +class DebugReader(DataSourceReader): + def read(self, partition): + print(f"Reading partition: {partition.value if hasattr(partition, 'value') else partition}") + print(f"Options: {self.options}") + + # Your reading logic + for row in data: + print(f"Yielding: {row}") + yield row +``` + +### Common Issues + +1. **Serialization errors**: Ensure all class attributes are pickle-able +2. **Schema mismatch**: Verify returned data matches declared schema +3. **Missing dependencies**: Use try/except to provide helpful error messages +4. **API rate limits**: Implement backoff and retry logic + +## Performance Optimization + +### Use Partitioning + +```python +class OptimizedReader(DataSourceReader): + def partitions(self): + # Split work into multiple partitions + num_partitions = int(self.options.get("numPartitions", "4")) + return [InputPartition(i) for i in range(num_partitions)] + + def read(self, partition): + # Each partition processes its subset + partition_id = partition.value + # Process only this partition's data +``` + +### Use Arrow Format + +```python +import pyarrow as pa + +class ArrowOptimizedReader(DataSourceReader): + def read(self, partition): + # Return pyarrow.RecordBatch for better performance + arrays = [ + pa.array(["value1", "value2"]), + pa.array([1, 2]) + ] + batch = pa.RecordBatch.from_arrays(arrays, names=["col1", "col2"]) + yield batch +``` + +## Continuous Integration + +The project uses GitHub Actions for CI/CD. Workflows are defined in `.github/workflows/`. + +### Running CI Locally + +```bash +# Install act (GitHub Actions locally) +brew install act # macOS + +# Run workflows +act -j test +``` + +## Troubleshooting + +### PyArrow Issues on macOS + +```bash +# Set environment variable +export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES + +# Or in Python +import os +os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"] = "YES" +``` + +### Poetry Issues + +```bash +# Clear cache +poetry cache clear pypi --all + +# Update lock file +poetry lock --no-update + +# Reinstall +poetry install --remove-untracked +``` + +### Spark Session Issues + +```python +# Stop existing session +from pyspark.sql import SparkSession +spark = SparkSession.getActiveSession() +if spark: + spark.stop() + +# Create new session +spark = SparkSession.builder.appName("debug").getOrCreate() +``` \ No newline at end of file diff --git a/RELEASE.md b/contributing/RELEASE.md similarity index 100% rename from RELEASE.md rename to contributing/RELEASE.md diff --git a/docs/api-reference.md b/docs/api-reference.md new file mode 100644 index 0000000..17a8693 --- /dev/null +++ b/docs/api-reference.md @@ -0,0 +1,557 @@ +# Python Data Source API Reference + +Complete reference for the Python Data Source API introduced in Apache Spark 4.0. + +## Core Abstract Base Classes + +### DataSource + +The primary abstract base class for custom data sources supporting read/write operations. 
+ +```python +class DataSource: + def __init__(self, options: Dict[str, str]) + def name() -> str + def schema() -> StructType + def reader(schema: StructType) -> DataSourceReader + def writer(schema: StructType, overwrite: bool) -> DataSourceWriter + def streamReader(schema: StructType) -> DataSourceStreamReader + def streamWriter(schema: StructType, overwrite: bool) -> DataSourceStreamWriter + def simpleStreamReader(schema: StructType) -> SimpleDataSourceStreamReader +``` + +#### Methods + +| Method | Required | Description | Default | +|--------|----------|-------------|---------| +| `__init__(options)` | No | Initialize with user options | Base class provides default | +| `name()` | No | Return format name for registration | Class name | +| `schema()` | Yes | Define the data source schema | No default | +| `reader(schema)` | If batch read | Create batch reader | Not implemented | +| `writer(schema, overwrite)` | If batch write | Create batch writer | Not implemented | +| `streamReader(schema)` | If streaming* | Create streaming reader | Not implemented | +| `streamWriter(schema, overwrite)` | If stream write | Create streaming writer | Not implemented | +| `simpleStreamReader(schema)` | If streaming* | Create simple streaming reader | Not implemented | + +*For streaming read, implement either `streamReader` or `simpleStreamReader`, not both. + +### DataSourceReader + +Abstract base class for reading data from sources in batch mode. + +```python +class DataSourceReader: + def read(partition) -> Iterator + def partitions() -> List[InputPartition] +``` + +#### Methods + +| Method | Required | Description | Default | +|--------|----------|-------------|---------| +| `read(partition)` | Yes | Read data from partition | No default | +| `partitions()` | No | Return input partitions for parallel reading | Single partition | + +#### Return Types for read() + +The `read()` method can return: +- **Tuples**: Matching the schema field order +- **Row objects**: `from pyspark.sql import Row` +- **pyarrow.RecordBatch**: For better performance + +### DataSourceWriter + +Abstract base class for writing data to external sources in batch mode. + +```python +class DataSourceWriter: + def write(iterator) -> WriterCommitMessage + def commit(messages: List[WriterCommitMessage]) -> WriteResult + def abort(messages: List[WriterCommitMessage]) -> None +``` + +#### Methods + +| Method | Required | Description | +|--------|----------|-------------| +| `write(iterator)` | Yes | Write data from iterator, return commit message | +| `commit(messages)` | Yes | Commit successful writes | +| `abort(messages)` | No | Handle write failures and cleanup | + +### DataSourceStreamReader + +Abstract base class for streaming data sources with full offset management and partition planning. 
+ +```python +class DataSourceStreamReader: + def initialOffset() -> dict + def latestOffset() -> dict + def partitions(start: dict, end: dict) -> List[InputPartition] + def read(partition) -> Iterator + def commit(end: dict) -> None + def stop() -> None +``` + +#### Methods + +| Method | Required | Description | +|--------|----------|-------------| +| `initialOffset()` | Yes | Return starting offset | +| `latestOffset()` | Yes | Return latest available offset | +| `partitions(start, end)` | Yes | Get partitions for offset range | +| `read(partition)` | Yes | Read data from partition | +| `commit(end)` | No | Mark offsets as processed | +| `stop()` | No | Clean up resources | + +### SimpleDataSourceStreamReader + +Simplified streaming reader interface without partition planning. Choose this over `DataSourceStreamReader` when your data source doesn't naturally partition. + +```python +class SimpleDataSourceStreamReader: + def initialOffset() -> dict + def read(start: dict) -> Tuple[Iterator, dict] + def readBetweenOffsets(start: dict, end: dict) -> Iterator + def commit(end: dict) -> None +``` + +#### Methods + +| Method | Required | Description | +|--------|----------|-------------| +| `initialOffset()` | Yes | Return starting offset | +| `read(start)` | Yes | Read from start offset, return (iterator, next_offset) | +| `readBetweenOffsets(start, end)` | Recommended | Deterministic replay between offsets | +| `commit(end)` | No | Mark offsets as processed | + +### DataSourceStreamWriter + +Abstract base class for writing data to external sinks in streaming queries. + +```python +class DataSourceStreamWriter: + def write(iterator) -> WriterCommitMessage + def commit(messages: List[WriterCommitMessage], batchId: int) -> None + def abort(messages: List[WriterCommitMessage], batchId: int) -> None +``` + +#### Methods + +| Method | Required | Description | +|--------|----------|-------------| +| `write(iterator)` | Yes | Write data for a partition | +| `commit(messages, batchId)` | Yes | Commit successful microbatch writes | +| `abort(messages, batchId)` | No | Handle write failures for a microbatch | + +## Helper Classes + +### InputPartition + +Represents a partition of input data. + +```python +class InputPartition: + def __init__(self, value: Any) +``` + +The `value` can be any serializable Python object that identifies the partition (int, dict, tuple, etc.). + +### WriterCommitMessage + +Message returned from successful write operations. + +```python +class WriterCommitMessage: + def __init__(self, value: Any) +``` + +The `value` typically contains metadata about the write (e.g., file path, row count). + +### WriteResult + +Final result after committing all writes. + +```python +class WriteResult: + def __init__(self, **kwargs) +``` + +Can contain any metadata about the completed write operation. + +## Implementation Requirements + +### 1. Serialization + +All classes must be pickle-serializable. This means: + +```python +# ❌ BAD - Connection objects can't be pickled +class BadReader(DataSourceReader): + def __init__(self, options): + import psycopg2 + self.connection = psycopg2.connect(options['url']) + +# ✅ GOOD - Store configuration, create connections in read() +class GoodReader(DataSourceReader): + def __init__(self, options): + self.connection_url = options['url'] + + def read(self, partition): + import psycopg2 + conn = psycopg2.connect(self.connection_url) + try: + # Use connection + yield from fetch_data(conn) + finally: + conn.close() +``` + +### 2. 
Schema Definition + +Use PySpark's type system: + +```python +from pyspark.sql.types import * + +def schema(self): + return StructType([ + StructField("id", IntegerType(), nullable=False), + StructField("name", StringType(), nullable=True), + StructField("tags", ArrayType(StringType()), nullable=True), + StructField("metadata", MapType(StringType(), StringType()), nullable=True), + StructField("created_at", TimestampType(), nullable=True), + ]) +``` + +### 3. Data Types Support + +Supported Spark SQL data types: + +| Python Type | Spark Type | Notes | +|-------------|------------|-------| +| `int` | `IntegerType`, `LongType` | Use LongType for large integers | +| `float` | `FloatType`, `DoubleType` | DoubleType is default for decimals | +| `str` | `StringType` | Unicode strings | +| `bool` | `BooleanType` | True/False | +| `datetime.date` | `DateType` | Date without time | +| `datetime.datetime` | `TimestampType` | Date with time | +| `bytes` | `BinaryType` | Raw bytes | +| `list` | `ArrayType(elementType)` | Homogeneous arrays | +| `dict` | `MapType(keyType, valueType)` | Key-value pairs | +| `tuple/Row` | `StructType([fields])` | Nested structures | + +### 4. Error Handling + +Implement proper exception handling: + +```python +def read(self, partition): + try: + data = fetch_external_data() + yield from data + except NetworkError as e: + if self.options.get("failOnError", "true") == "true": + raise DataSourceError(f"Failed to fetch data: {e}") + else: + # Log and continue + logger.warning(f"Skipping partition due to error: {e}") + return # Empty iterator +``` + +### 5. Resource Management + +Clean up resources properly: + +```python +class ManagedReader(DataSourceStreamReader): + def stop(self): + """Called when streaming query stops.""" + if hasattr(self, 'connection_pool'): + self.connection_pool.close() + if hasattr(self, 'temp_files'): + for f in self.temp_files: + os.remove(f) +``` + +### 6. File Path Convention + +When dealing with file paths: + +```python +# ✅ CORRECT - Path in load() +df = spark.read.format("myformat").load("/path/to/data") + +# ❌ INCORRECT - Path in option() +df = spark.read.format("myformat").option("path", "/path/to/data").load() +``` + +### 7. Lazy Initialization + +Defer expensive operations: + +```python +class LazyReader(DataSourceReader): + def __init__(self, options): + # ✅ Just store configuration + self.api_url = options['url'] + self.api_key = options['apiKey'] + + def read(self, partition): + # ✅ Expensive operations here + session = create_authenticated_session(self.api_key) + data = session.get(self.api_url) + yield from parse_data(data) +``` + +## Performance Optimization Techniques + +### 1. Arrow Integration + +Return `pyarrow.RecordBatch` for better serialization: + +```python +import pyarrow as pa + +def read(self, partition): + # Read data efficiently + data = fetch_data_batch() + + # Convert to Arrow + batch = pa.RecordBatch.from_pandas(data) + yield batch +``` + +Benefits: +- Zero-copy data transfer when possible +- Columnar format efficiency +- Better memory usage + +### 2. 
Partitioning Strategy + +Effective partitioning for parallelism: + +```python +def partitions(self): + # Example: Partition by date ranges + dates = get_date_range(self.start_date, self.end_date) + return [InputPartition({"date": date}) for date in dates] + +def read(self, partition): + date = partition.value["date"] + # Read only data for this date + yield from read_data_for_date(date) +``` + +Guidelines: +- Aim for equal-sized partitions +- Consider data locality +- Balance between too many small partitions and too few large ones + +### 3. Batch Processing + +Process data in batches: + +```python +def read(self, partition): + batch_size = 1000 + offset = 0 + + while True: + batch = fetch_batch(offset, batch_size) + if not batch: + break + + # Process entire batch at once + processed = process_batch(batch) + yield from processed + + offset += batch_size +``` + +### 4. Connection Pooling + +Reuse connections within partitions: + +```python +def read(self, partition): + # Create connection once per partition + conn = create_connection() + try: + # Reuse for all records in partition + for record_id in partition.value["ids"]: + data = fetch_with_connection(conn, record_id) + yield data + finally: + conn.close() +``` + +### 5. Caching Strategies + +Cache frequently accessed data: + +```python +class CachedReader(DataSourceReader): + _cache = {} # Class-level cache + + def read(self, partition): + cache_key = partition.value["key"] + + if cache_key not in self._cache: + self._cache[cache_key] = fetch_expensive_data(cache_key) + + data = self._cache[cache_key] + yield from process_data(data) +``` + +## Usage Examples + +### Registration and Basic Usage + +```python +from pyspark.sql import SparkSession + +spark = SparkSession.builder.appName("datasource-example").getOrCreate() + +# Register the data source +spark.dataSource.register(MyDataSource) + +# Use with format() +df = spark.read.format("myformat").option("key", "value").load() + +# Use with streaming +stream = spark.readStream.format("myformat").load() +``` + +### Schema Handling + +```python +# Let data source define schema +df = spark.read.format("myformat").load() + +# Override with custom schema +custom_schema = StructType([ + StructField("id", LongType()), + StructField("value", DoubleType()) +]) +df = spark.read.format("myformat").schema(custom_schema).load() + +# Schema subset - read only specific columns +df = spark.read.format("myformat").load().select("id", "name") +``` + +### Options Pattern + +```python +df = spark.read.format("myformat") \ + .option("url", "https://api.example.com") \ + .option("apiKey", "secret-key") \ + .option("timeout", "30") \ + .option("retries", "3") \ + .option("batchSize", "1000") \ + .load() +``` + +## Common Pitfalls and Solutions + +### Pitfall 1: Non-Serializable Classes + +**Problem**: Storing database connections, HTTP clients, or other non-serializable objects as instance variables. + +**Solution**: Create these objects in `read()` method. + +### Pitfall 2: Ignoring Schema Parameter + +**Problem**: Always using `self.schema()` instead of the schema parameter passed to `reader()`. + +**Solution**: Respect user-specified schema for column pruning. + +### Pitfall 3: Blocking in Constructor + +**Problem**: Making API calls or heavy I/O in `__init__`. + +**Solution**: Defer to `read()` method for lazy evaluation. + +### Pitfall 4: No Partitioning + +**Problem**: Not implementing `partitions()`, limiting parallelism. + +**Solution**: Implement logical partitioning based on your data source. 
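+
+As a combined illustration of Pitfalls 2 and 4, here is a minimal sketch of a source whose reader honors the schema handed to `reader()` and splits work into one logical partition per day. The format name (`dailyevents`) and the `startDate`/`days`/`rowsPerDay` options are invented for this example:
+
+```python
+from datetime import date, timedelta
+
+from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition
+from pyspark.sql.types import StructType, StructField, StringType, DateType, IntegerType
+
+
+class DailyEventsDataSource(DataSource):
+    """Hypothetical source that exposes one logical partition per day."""
+
+    @classmethod
+    def name(cls):
+        # name() is a classmethod on the DataSource base class
+        return "dailyevents"
+
+    def schema(self):
+        return StructType([
+            StructField("event_date", DateType()),
+            StructField("event_id", IntegerType()),
+            StructField("payload", StringType()),
+        ])
+
+    def reader(self, schema):
+        # Pass along the (possibly user-specified) schema instead of re-reading self.schema()
+        return DailyEventsReader(self.options, schema)
+
+
+class DailyEventsReader(DataSourceReader):
+    def __init__(self, options, schema):
+        self.schema = schema  # respect the schema the user asked for
+        self.start = date.fromisoformat(options.get("startDate", "2024-01-01"))
+        self.days = int(options.get("days", "7"))
+        self.rows_per_day = int(options.get("rowsPerDay", "100"))
+
+    def partitions(self):
+        # One logical partition per day so Spark can read the days in parallel
+        return [InputPartition((self.start + timedelta(days=i)).isoformat())
+                for i in range(self.days)]
+
+    def read(self, partition):
+        day = date.fromisoformat(partition.value)
+        columns = [f.name for f in self.schema.fields]
+        for i in range(self.rows_per_day):
+            full_row = {"event_date": day, "event_id": i, "payload": f"event_{i}"}
+            # Emit only the requested columns, in the order of the supplied schema
+            yield tuple(full_row[name] for name in columns)
+```
+
+After `spark.dataSource.register(DailyEventsDataSource)`, a call like `spark.read.format("dailyevents").schema("event_id int, payload string").option("days", "30").load()` would produce 30 daily partitions, each emitting only the two requested columns.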
+ +### Pitfall 5: Poor Error Messages + +**Problem**: Generic exceptions without context. + +**Solution**: Provide detailed, actionable error messages. + +### Pitfall 6: Resource Leaks + +**Problem**: Not cleaning up connections, files, or memory. + +**Solution**: Use try/finally blocks and implement `stop()` for streaming. + +### Pitfall 7: Inefficient Data Transfer + +**Problem**: Yielding individual rows for large datasets. + +**Solution**: Use Arrow RecordBatch or batch processing. + +## Testing Guidelines + +### Unit Testing + +```python +import pytest +from pyspark.sql import SparkSession + +@pytest.fixture(scope="session") +def spark(): + return SparkSession.builder \ + .appName("test") \ + .master("local[2]") \ + .config("spark.sql.shuffle.partitions", "2") \ + .getOrCreate() + +def test_reader(spark): + spark.dataSource.register(MyDataSource) + df = spark.read.format("myformat").load() + assert df.count() > 0 + assert df.schema == expected_schema +``` + +### Integration Testing + +```python +def test_with_real_data(spark, real_api_key): + spark.dataSource.register(MyDataSource) + + df = spark.read.format("myformat") \ + .option("apiKey", real_api_key) \ + .load() + + # Verify real data + assert df.count() > 0 + df.write.mode("overwrite").parquet("/tmp/test_output") +``` + +### Performance Testing + +```python +def test_performance(spark, benchmark): + spark.dataSource.register(MyDataSource) + + with benchmark("read_performance"): + df = spark.read.format("myformat") \ + .option("numRows", "1000000") \ + .load() + count = df.count() + + assert count == 1000000 + benchmark.assert_timing(max_seconds=10) +``` + +## Additional Resources + +- [Apache Spark Python Data Source API](https://spark.apache.org/docs/4.0.0/api/python/tutorial/sql/python_data_source.html) +- [Source Code](https://github.com/apache/spark/blob/master/python/pyspark/sql/datasource.py) +- [Example Implementations](https://github.com/allisonwang-db/pyspark-data-sources) +- [Simple Stream Reader Architecture](../contributing/simple-stream-reader-architecture.md) \ No newline at end of file diff --git a/docs/building-data-sources.md b/docs/building-data-sources.md new file mode 100644 index 0000000..d32e069 --- /dev/null +++ b/docs/building-data-sources.md @@ -0,0 +1,692 @@ +# Building Custom Data Sources + +This guide teaches you how to create your own custom data sources using the Python Data Source API in Apache Spark 4.0+. + +## Table of Contents +- [Basic Implementation Pattern](#basic-implementation-pattern) +- [Advanced Features](#advanced-features) +- [Best Practices](#best-practices) +- [Common Patterns](#common-patterns) +- [Debugging and Testing](#debugging-and-testing) + +## Basic Implementation Pattern + +Every custom data source starts with two classes: a `DataSource` and a `DataSourceReader`. 
+ +### Minimal Example + +```python +from pyspark.sql.datasource import DataSource, DataSourceReader +from pyspark.sql.types import StructType, StructField, StringType, IntegerType + +class MyCustomDataSource(DataSource): + """Custom data source implementation.""" + + def __init__(self, options): + """Initialize with user-provided options.""" + self.options = options + + def name(self): + """Return the short name for this data source.""" + return "mycustom" + + def schema(self): + """Define the schema for this data source.""" + return StructType([ + StructField("id", IntegerType(), nullable=False), + StructField("name", StringType(), nullable=True), + StructField("value", StringType(), nullable=True) + ]) + + def reader(self, schema): + """Create a reader for batch data.""" + return MyCustomReader(self.options, schema) + +class MyCustomReader(DataSourceReader): + """Reader implementation for the custom data source.""" + + def __init__(self, options, schema): + self.options = options + self.schema = schema + + def read(self, partition): + """ + Read data from the source. + Yields tuples matching the schema. + """ + # Example: Read from an API, file, database, etc. + for i in range(10): + yield (i, f"name_{i}", f"value_{i}") + +# Register and use +spark.dataSource.register(MyCustomDataSource) +df = spark.read.format("mycustom").load() +df.show() +``` + +## Advanced Features + +### 1. Partitioned Reading for Parallelism + +Split your data into partitions to leverage Spark's parallel processing capabilities. + +```python +from pyspark.sql.datasource import InputPartition + +class PartitionedReader(DataSourceReader): + def __init__(self, options, schema): + self.options = options + self.schema = schema + self.total_records = int(options.get("totalRecords", "1000")) + + def partitions(self): + """Split data into partitions for parallel processing.""" + num_partitions = int(self.options.get("numPartitions", "4")) + records_per_partition = self.total_records // num_partitions + + partitions = [] + for i in range(num_partitions): + start = i * records_per_partition + end = start + records_per_partition if i < num_partitions - 1 else self.total_records + partitions.append(InputPartition({"start": start, "end": end})) + + return partitions + + def read(self, partition): + """Read only this partition's data.""" + partition_info = partition.value + start = partition_info["start"] + end = partition_info["end"] + + for i in range(start, end): + yield (i, f"data_{i}", f"value_{i}") +``` + +### 2. Streaming Data Source + +Implement streaming capabilities with offset management. 
+ +```python +from pyspark.sql.datasource import DataSourceStreamReader, InputPartition +import time +import json + +class MyStreamReader(DataSourceStreamReader): + def __init__(self, options, schema): + self.options = options + self.schema = schema + self.rate = int(options.get("rowsPerSecond", "10")) + + def initialOffset(self): + """Return the initial offset to start reading from.""" + return {"timestamp": 0, "offset": 0} + + def latestOffset(self): + """Return the latest available offset.""" + current_time = int(time.time()) + # Calculate how many records should be available + if not hasattr(self, '_start_time'): + self._start_time = current_time + self._last_offset = 0 + + elapsed = current_time - self._start_time + latest = elapsed * self.rate + return {"timestamp": current_time, "offset": latest} + + def partitions(self, start, end): + """Return partitions between start and end offsets.""" + start_offset = start["offset"] + end_offset = end["offset"] + + # Create a single partition for simplicity + # In production, you'd split this into multiple partitions + return [InputPartition({"start": start_offset, "end": end_offset})] + + def read(self, partition): + """Read data for the partition.""" + partition_data = partition.value + start_offset = partition_data["start"] + end_offset = partition_data["end"] + + # Generate data between offsets + for offset in range(int(start_offset), int(end_offset)): + timestamp = self._start_time + (offset // self.rate) + yield (offset, f"stream_data_{offset}", timestamp) + + def commit(self, end): + """Commit the offset after successful processing.""" + print(f"Committed offset: {end}") + + def stop(self): + """Clean up resources when streaming stops.""" + print("Streaming stopped") +``` + +### 3. Simple Streaming Reader (Without Partitions) + +For simpler streaming scenarios where partitioning adds unnecessary complexity. + +```python +from pyspark.sql.datasource import SimpleDataSourceStreamReader +import time + +class SimpleStreamReader(SimpleDataSourceStreamReader): + def __init__(self, options, schema): + self.options = options + self.schema = schema + self.batch_size = int(options.get("batchSize", "100")) + + def initialOffset(self): + """Return the initial offset.""" + return {"offset": 0} + + def read(self, start): + """Read from start offset and return data with next offset.""" + current_offset = start["offset"] + + # Generate batch of data + data = [] + for i in range(self.batch_size): + record_id = current_offset + i + data.append((record_id, f"data_{record_id}", time.time())) + + # Return data iterator and next offset + next_offset = {"offset": current_offset + self.batch_size} + return iter(data), next_offset + + def readBetweenOffsets(self, start, end): + """Read deterministically between offsets for recovery.""" + start_offset = start["offset"] + end_offset = end["offset"] + + data = [] + for i in range(start_offset, end_offset): + data.append((i, f"data_{i}", "replay")) + + return iter(data) + + def commit(self, end): + """Optional: Mark offset as processed.""" + print(f"Processed up to offset: {end}") +``` + +### 4. Arrow-Optimized Reader + +Use PyArrow for better performance with large datasets. 
+ +```python +import pyarrow as pa +import numpy as np + +class ArrowOptimizedReader(DataSourceReader): + def __init__(self, options, schema): + self.options = options + self.schema = schema + self.batch_size = int(options.get("batchSize", "10000")) + + def read(self, partition): + """Return data as PyArrow RecordBatch for better performance.""" + # Generate data efficiently using NumPy + ids = np.arange(self.batch_size) + names = [f"name_{i}" for i in ids] + values = np.random.random(self.batch_size) + + # Create Arrow RecordBatch + batch = pa.RecordBatch.from_arrays([ + pa.array(ids), + pa.array(names), + pa.array(values) + ], names=["id", "name", "value"]) + + yield batch + + def partitions(self): + """Create multiple partitions for parallel Arrow processing.""" + num_partitions = int(self.options.get("numPartitions", "4")) + return [InputPartition(i) for i in range(num_partitions)] +``` + +### 5. Data Source with Write Support + +Implement both reading and writing capabilities. + +```python +from pyspark.sql.datasource import DataSourceWriter, WriterCommitMessage +import json + +class MyDataSourceWithWrite(DataSource): + def writer(self, schema, overwrite): + """Create a writer for batch data.""" + return MyWriter(self.options, schema, overwrite) + +class MyWriter(DataSourceWriter): + def __init__(self, options, schema, overwrite): + self.options = options + self.schema = schema + self.overwrite = overwrite + self.path = options.get("path", "/tmp/output") + + def write(self, iterator): + """Write data from iterator to destination.""" + import tempfile + import os + + # Write to a temporary file + with tempfile.NamedTemporaryFile(mode='w', delete=False, + suffix='.json', dir=self.path) as f: + count = 0 + for row in iterator: + # Convert row to dict and write as JSON + row_dict = row.asDict() if hasattr(row, 'asDict') else dict(zip(self.schema.names, row)) + json.dump(row_dict, f) + f.write('\n') + count += 1 + + temp_path = f.name + + # Return commit message with metadata + return WriterCommitMessage({"file": temp_path, "count": count}) + + def commit(self, messages): + """Commit all successful writes.""" + total_count = 0 + files = [] + + for msg in messages: + total_count += msg.value["count"] + files.append(msg.value["file"]) + + print(f"Successfully wrote {total_count} records to {len(files)} files") + return {"status": "success", "files": files, "total_count": total_count} + + def abort(self, messages): + """Clean up on failure.""" + import os + for msg in messages: + if msg and "file" in msg.value: + try: + os.remove(msg.value["file"]) + except: + pass + print("Write aborted, temporary files cleaned up") +``` + +## Best Practices + +### 1. Always Validate Options + +```python +def __init__(self, options): + # Validate required options + if "api_key" not in options: + raise ValueError("api_key is required for this data source") + + # Parse options with defaults and type checking + self.api_key = options["api_key"] + self.batch_size = int(options.get("batchSize", "1000")) + self.timeout = float(options.get("timeout", "30.0")) + + # Validate option values + if self.batch_size <= 0: + raise ValueError("batchSize must be positive") + + self.options = options +``` + +### 2. 
Handle Errors Gracefully + +```python +def read(self, partition): + max_retries = 3 + retry_delay = 1.0 + + for attempt in range(max_retries): + try: + # Attempt to fetch data + data = self._fetch_data_from_api() + for record in data: + yield self._transform_record(record) + return # Success, exit + + except ConnectionError as e: + if attempt < max_retries - 1: + print(f"Connection failed, retrying in {retry_delay}s...") + time.sleep(retry_delay) + retry_delay *= 2 # Exponential backoff + else: + # Final attempt failed + if self.options.get("failOnError", "true").lower() == "true": + raise + else: + print(f"Skipping partition due to error: {e}") + return # Return empty iterator + +def _fetch_data_from_api(self): + """Fetch data with proper error handling.""" + import requests + + response = requests.get( + self.api_url, + timeout=self.timeout, + headers={"Authorization": f"Bearer {self.api_key}"} + ) + response.raise_for_status() # Raise on HTTP errors + return response.json() +``` + +### 3. Make Classes Serializable + +```python +class SerializableReader(DataSourceReader): + def __init__(self, options, schema): + # ✅ Store only serializable data + self.connection_string = options.get("connectionString") + self.query = options.get("query") + self.schema = schema + + # ❌ Don't store non-serializable objects + # self.connection = psycopg2.connect(connection_string) + # self.http_client = requests.Session() + + def read(self, partition): + # Create non-serializable objects locally in read() + import psycopg2 + + conn = psycopg2.connect(self.connection_string) + try: + cursor = conn.cursor() + cursor.execute(self.query) + + for row in cursor: + yield row + finally: + conn.close() +``` + +### 4. Implement Resource Management + +```python +class ResourceManagedReader(DataSourceStreamReader): + def __init__(self, options, schema): + self.options = options + self.schema = schema + self._resources = [] + + def _acquire_resource(self, resource_id): + """Track resources for cleanup.""" + resource = self._create_resource(resource_id) + self._resources.append(resource) + return resource + + def stop(self): + """Clean up all resources when streaming stops.""" + for resource in self._resources: + try: + resource.close() + except Exception as e: + print(f"Error closing resource: {e}") + + self._resources.clear() + print("All resources cleaned up") + + def __del__(self): + """Ensure cleanup even if stop() isn't called.""" + if self._resources: + self.stop() +``` + +### 5. 
Optimize for Performance + +```python +class PerformantReader(DataSourceReader): + def read(self, partition): + # Use batch fetching instead of row-by-row + batch_size = 1000 + offset = 0 + + while True: + # Fetch batch of records + batch = self._fetch_batch(offset, batch_size) + if not batch: + break + + # Use generator for memory efficiency + for record in batch: + yield record + + offset += batch_size + + def _fetch_batch(self, offset, limit): + """Fetch a batch of records efficiently.""" + # Example: Use SQL LIMIT/OFFSET + query = f"SELECT * FROM table LIMIT {limit} OFFSET {offset}" + return self._execute_query(query) +``` + +## Common Patterns + +### Reading from REST APIs + +```python +class RestApiReader(DataSourceReader): + def __init__(self, options, schema): + self.base_url = options.get("url") + self.auth_token = options.get("authToken") + self.page_size = int(options.get("pageSize", "100")) + self.schema = schema + + def read(self, partition): + import requests + + headers = {"Authorization": f"Bearer {self.auth_token}"} if self.auth_token else {} + page = 1 + + while True: + response = requests.get( + f"{self.base_url}?page={page}&size={self.page_size}", + headers=headers + ) + response.raise_for_status() + + data = response.json() + if not data.get("items"): + break + + for item in data["items"]: + yield self._transform_to_row(item) + + if not data.get("hasMore", False): + break + + page += 1 +``` + +### Reading from Databases + +```python +class DatabaseReader(DataSourceReader): + def __init__(self, options, schema): + self.jdbc_url = options.get("url") + self.table = options.get("table") + self.username = options.get("username") + self.password = options.get("password") + self.schema = schema + + def partitions(self): + """Partition by ID ranges for parallel reads.""" + import jaydebeapi + + conn = self._get_connection() + cursor = conn.cursor() + + # Get min and max IDs + cursor.execute(f"SELECT MIN(id), MAX(id) FROM {self.table}") + min_id, max_id = cursor.fetchone() + conn.close() + + # Create partitions + num_partitions = 4 + range_size = (max_id - min_id + 1) // num_partitions + + partitions = [] + for i in range(num_partitions): + start = min_id + (i * range_size) + end = start + range_size if i < num_partitions - 1 else max_id + 1 + partitions.append(InputPartition({"start": start, "end": end})) + + return partitions + + def read(self, partition): + conn = self._get_connection() + cursor = conn.cursor() + + bounds = partition.value + query = f""" + SELECT * FROM {self.table} + WHERE id >= {bounds['start']} AND id < {bounds['end']} + """ + + cursor.execute(query) + for row in cursor: + yield row + + conn.close() + + def _get_connection(self): + import jaydebeapi + return jaydebeapi.connect( + "com.mysql.jdbc.Driver", + self.jdbc_url, + [self.username, self.password] + ) +``` + +### Handling Authentication + +```python +class AuthenticatedDataSource(DataSource): + def __init__(self, options): + # Handle different authentication methods + if "api_key" in options: + self.auth_method = "api_key" + self.credentials = options["api_key"] + elif "oauth_token" in options: + self.auth_method = "oauth" + self.credentials = options["oauth_token"] + elif "username" in options and "password" in options: + self.auth_method = "basic" + self.credentials = (options["username"], options["password"]) + else: + raise ValueError("No valid authentication method provided") + + self.options = options + + def reader(self, schema): + return AuthenticatedReader(self.options, self.auth_method, 
self.credentials, schema) +``` + +## Debugging and Testing + +### Enable Debug Logging + +```python +class DebuggableReader(DataSourceReader): + def __init__(self, options, schema): + self.options = options + self.schema = schema + self.debug = options.get("debug", "false").lower() == "true" + + def read(self, partition): + if self.debug: + print(f"Starting read for partition: {partition.value if hasattr(partition, 'value') else 'single'}") + print(f"Options: {self.options}") + print(f"Schema: {self.schema}") + + count = 0 + for record in self._read_data(): + if self.debug and count < 5: + print(f"Record {count}: {record}") + + yield record + count += 1 + + if self.debug: + print(f"Finished reading {count} records") +``` + +### Unit Testing Your Data Source + +```python +import pytest +from pyspark.sql import SparkSession +from pyspark.sql.types import StructType, StructField, IntegerType, StringType + +@pytest.fixture +def spark(): + return SparkSession.builder \ + .appName("test") \ + .config("spark.sql.shuffle.partitions", "1") \ + .getOrCreate() + +def test_basic_read(spark): + # Register your data source + spark.dataSource.register(MyCustomDataSource) + + # Read data + df = spark.read.format("mycustom") \ + .option("numRows", "10") \ + .load() + + # Verify results + assert df.count() == 10 + assert len(df.columns) == 3 + assert df.schema == expected_schema + +def test_partitioned_read(spark): + spark.dataSource.register(MyCustomDataSource) + + df = spark.read.format("mycustom") \ + .option("numPartitions", "4") \ + .load() + + # Check partitioning + assert df.rdd.getNumPartitions() == 4 + + # Verify data integrity + assert df.select("id").distinct().count() == df.count() + +def test_error_handling(spark): + spark.dataSource.register(MyCustomDataSource) + + # Test missing required option + with pytest.raises(ValueError, match="api_key is required"): + spark.read.format("mycustom").load() + +def test_streaming(spark): + spark.dataSource.register(MyStreamingDataSource) + + # Start streaming query + query = spark.readStream.format("mystreaming") \ + .load() \ + .writeStream \ + .format("memory") \ + .queryName("test_stream") \ + .start() + + # Wait for some data + query.processAllAvailable() + + # Verify streamed data + result = spark.table("test_stream") + assert result.count() > 0 + + query.stop() +``` + +## Next Steps + +- Review the [API Reference](api-reference.md) for complete method signatures +- Study the [example data sources](data-sources-guide.md) for real-world patterns +- Check the [Development Guide](../contributing/DEVELOPMENT.md) for testing and debugging tips + +Remember that building good data sources is iterative. Start simple, test thoroughly, and gradually add features like partitioning, streaming, and optimization as needed. \ No newline at end of file diff --git a/docs/data-sources-guide.md b/docs/data-sources-guide.md new file mode 100644 index 0000000..08113ca --- /dev/null +++ b/docs/data-sources-guide.md @@ -0,0 +1,498 @@ +# Data Sources Guide + +This guide provides detailed examples and usage patterns for all available data sources in the PySpark Data Sources library. + +## Table of Contents +1. [FakeDataSource - Generate Synthetic Data](#1-fakedatasource---generate-synthetic-data) +2. [GitHubDataSource - Read GitHub Pull Requests](#2-githubdatasource---read-github-pull-requests) +3. [GoogleSheetsDataSource - Read Public Google Sheets](#3-googlesheetsdatasource---read-public-google-sheets) +4. 
[HuggingFaceDataSource - Load Datasets from Hugging Face Hub](#4-huggingfacedatasource---load-datasets-from-hugging-face-hub) +5. [StockDataSource - Fetch Stock Market Data](#5-stockdatasource---fetch-stock-market-data) +6. [OpenSkyDataSource - Stream Live Flight Data](#6-openskydatasource---stream-live-flight-data) +7. [KaggleDataSource - Load Kaggle Datasets](#7-kaggledatasource---load-kaggle-datasets) +8. [ArrowDataSource - Read Apache Arrow Files](#8-arrowdatasource---read-apache-arrow-files) +9. [LanceDataSource - Vector Database Format](#9-lancedatasource---vector-database-format) + +## 1. FakeDataSource - Generate Synthetic Data + +Generate fake data using the Faker library for testing and development. + +### Installation +```bash +pip install pyspark-data-sources[faker] +``` + +### Basic Usage +```python +from pyspark_datasources import FakeDataSource + +spark.dataSource.register(FakeDataSource) + +# Basic usage with default schema +df = spark.read.format("fake").load() +df.show() +# +-----------+----------+-------+-------+ +# | name| date|zipcode| state| +# +-----------+----------+-------+-------+ +# |Carlos Cobb|2018-07-15| 73003|Indiana| +# | Eric Scott|1991-08-22| 10085| Idaho| +# | Amy Martin|1988-10-28| 68076| Oregon| +# +-----------+----------+-------+-------+ +``` + +### Custom Schema +Field names must match Faker provider methods. See [Faker documentation](https://faker.readthedocs.io/en/master/providers.html) for available providers. + +```python +# Custom schema - field names must match Faker provider methods +df = spark.read.format("fake") \ + .schema("name string, email string, phone_number string, company string") \ + .option("numRows", 5) \ + .load() +df.show(truncate=False) +# +------------------+-------------------------+----------------+-----------------+ +# |name |email |phone_number |company | +# +------------------+-------------------------+----------------+-----------------+ +# |Christine Sampson |johnsonjeremy@example.com|+1-673-684-4608 |Hernandez-Nguyen | +# |Yolanda Brown |williamlowe@example.net |(262)562-4152 |Miller-Hernandez | +# |Joshua Hernandez |mary38@example.com |7785366623 |Davis Group | +# |Joseph Gallagher |katiepatterson@example.net|+1-648-619-0997 |Brown-Fleming | +# |Tina Morrison |johnbell@example.com |(684)329-3298 |Sherman PLC | +# +------------------+-------------------------+----------------+-----------------+ +``` + +### Streaming Usage +```python +# Streaming usage - generates continuous data +stream = spark.readStream.format("fake") \ + .option("rowsPerMicrobatch", 10) \ + .load() + +query = stream.writeStream \ + .format("console") \ + .outputMode("append") \ + .trigger(processingTime="5 seconds") \ + .start() +``` + +## 2. GitHubDataSource - Read GitHub Pull Requests + +Fetch pull request data from GitHub repositories using the public GitHub API. 
+ +### Basic Usage +```python +from pyspark_datasources import GithubDataSource + +spark.dataSource.register(GithubDataSource) + +# Read pull requests from a public repository +df = spark.read.format("github").load("apache/spark") +df.select("number", "title", "state", "user", "created_at").show(truncate=False) +# +------+--------------------------------------------+------+---------------+--------------------+ +# |number|title |state |user |created_at | +# +------+--------------------------------------------+------+---------------+--------------------+ +# |48730 |[SPARK-49998] Fix JSON parsing regression |open |john_doe |2024-11-20T10:15:30Z| +# |48729 |[SPARK-49997] Improve error messages |merged|contributor123 |2024-11-19T15:22:45Z| +# +------+--------------------------------------------+------+---------------+--------------------+ + +# Schema of the GitHub data source +df.printSchema() +# root +# |-- number: integer (nullable = true) +# |-- title: string (nullable = true) +# |-- state: string (nullable = true) +# |-- user: string (nullable = true) +# |-- created_at: string (nullable = true) +# |-- updated_at: string (nullable = true) +``` + +### Notes +- Uses the public GitHub API (rate limited to 60 requests/hour for unauthenticated requests) +- Returns the most recent pull requests +- Repository format: "owner/repository" + +## 3. GoogleSheetsDataSource - Read Public Google Sheets + +Read data from publicly accessible Google Sheets. + +### Basic Usage +```python +from pyspark_datasources import GoogleSheetsDataSource + +spark.dataSource.register(GoogleSheetsDataSource) + +# Read from a public Google Sheet +sheet_url = "https://docs.google.com/spreadsheets/d/1H7bKPGpAXbPRhTYFxqg1h6FmKl5ZhCTM_5OlAqCHfVs" +df = spark.read.format("googlesheets").load(sheet_url) +df.show() +# +------+-------+--------+ +# |Name |Age |City | +# +------+-------+--------+ +# |Alice |25 |NYC | +# |Bob |30 |LA | +# |Carol |28 |Chicago | +# +------+-------+--------+ +``` + +### Specify Sheet Name or Index +```python +# Read a specific sheet by name +df = spark.read.format("googlesheets") \ + .option("sheetName", "Sheet2") \ + .load(sheet_url) + +# Or by index (0-based) +df = spark.read.format("googlesheets") \ + .option("sheetIndex", "1") \ + .load(sheet_url) +``` + +### Requirements +- Sheet must be publicly accessible (anyone with link can view) +- First row is used as column headers +- Data is read as strings by default + +## 4. HuggingFaceDataSource - Load Datasets from Hugging Face Hub + +Access thousands of datasets from the Hugging Face Hub. + +### Installation +```bash +pip install pyspark-data-sources[huggingface] +``` + +### Basic Usage +```python +from pyspark_datasources import HuggingFaceDataSource + +spark.dataSource.register(HuggingFaceDataSource) + +# Load a dataset from Hugging Face +df = spark.read.format("huggingface").load("imdb") +df.select("text", "label").show(2, truncate=False) +# +--------------------------------------------------+-----+ +# |text |label| +# +--------------------------------------------------+-----+ +# |I rented this movie last night and I must say... |0 | +# |This film is absolutely fantastic! The acting... 
|1 | +# +--------------------------------------------------+-----+ +``` + +### Advanced Options +```python +# Load specific split +df = spark.read.format("huggingface") \ + .option("split", "train") \ + .load("squad") + +# Load dataset with configuration +df = spark.read.format("huggingface") \ + .option("config", "plain_text") \ + .load("wikipedia") + +# Load specific subset +df = spark.read.format("huggingface") \ + .option("split", "validation") \ + .option("config", "en") \ + .load("wikipedia") +``` + +### Performance Tips +- The data source automatically partitions large datasets for parallel processing +- First load may be slow as it downloads and caches the dataset +- Subsequent reads use the cached data + +## 5. StockDataSource - Fetch Stock Market Data + +Get stock market data from Alpha Vantage API. + +### Setup +Obtain a free API key from [Alpha Vantage](https://www.alphavantage.co/support/#api-key). + +### Basic Usage +```python +from pyspark_datasources import StockDataSource + +spark.dataSource.register(StockDataSource) + +# Fetch stock data (requires Alpha Vantage API key) +df = spark.read.format("stock") \ + .option("symbols", "AAPL,GOOGL,MSFT") \ + .option("api_key", "YOUR_API_KEY") \ + .load() + +df.show() +# +------+----------+------+------+------+------+--------+ +# |symbol|timestamp |open |high |low |close |volume | +# +------+----------+------+------+------+------+--------+ +# |AAPL |2024-11-20|175.50|178.25|175.00|177.80|52341200| +# |GOOGL |2024-11-20|142.30|143.90|141.50|143.25|18234500| +# |MSFT |2024-11-20|425.10|428.75|424.50|427.90|12456300| +# +------+----------+------+------+------+------+--------+ + +# Schema +df.printSchema() +# root +# |-- symbol: string (nullable = true) +# |-- timestamp: string (nullable = true) +# |-- open: double (nullable = true) +# |-- high: double (nullable = true) +# |-- low: double (nullable = true) +# |-- close: double (nullable = true) +# |-- volume: long (nullable = true) +``` + +### Options +- `symbols`: Comma-separated list of stock symbols +- `api_key`: Your Alpha Vantage API key +- `function`: Time series function (default: "TIME_SERIES_DAILY") + +## 6. OpenSkyDataSource - Stream Live Flight Data + +Stream real-time flight tracking data from OpenSky Network. + +### Streaming Usage +```python +from pyspark_datasources import OpenSkyDataSource + +spark.dataSource.register(OpenSkyDataSource) + +# Stream flight data for a specific region +stream = spark.readStream.format("opensky") \ + .option("region", "EUROPE") \ + .load() + +# Process the stream +query = stream.select("icao24", "callsign", "origin_country", "longitude", "latitude", "altitude") \ + .writeStream \ + .format("console") \ + .outputMode("append") \ + .trigger(processingTime="10 seconds") \ + .start() + +# Sample output: +# +------+--------+--------------+---------+--------+--------+ +# |icao24|callsign|origin_country|longitude|latitude|altitude| +# +------+--------+--------------+---------+--------+--------+ +# |4b1806|SWR123 |Switzerland |8.5123 |47.3765 |10058.4 | +# |3c6750|DLH456 |Germany |13.4050 |52.5200 |11887.2 | +# +------+--------+--------------+---------+--------+--------+ +``` + +### Batch Usage +```python +# Read current flight data as batch +df = spark.read.format("opensky") \ + .option("region", "USA") \ + .load() + +df.count() # Number of flights currently in USA airspace +``` + +### Region Options +- `EUROPE`: European airspace +- `USA`: United States airspace +- `ASIA`: Asian airspace +- `WORLD`: Global (all flights) + +## 7. 
KaggleDataSource - Load Kaggle Datasets + +Download and read datasets from Kaggle. + +### Setup +1. Install Kaggle API: `pip install pyspark-data-sources[kaggle]` +2. Get API credentials from https://www.kaggle.com/account +3. Place credentials in `~/.kaggle/kaggle.json` + +### Basic Usage +```python +from pyspark_datasources import KaggleDataSource + +spark.dataSource.register(KaggleDataSource) + +# Load a Kaggle dataset +df = spark.read.format("kaggle").load("titanic") +df.select("PassengerId", "Name", "Age", "Survived").show(5) +# +-----------+--------------------+----+--------+ +# |PassengerId|Name |Age |Survived| +# +-----------+--------------------+----+--------+ +# |1 |Braund, Mr. Owen |22.0|0 | +# |2 |Cumings, Mrs. John |38.0|1 | +# |3 |Heikkinen, Miss Laina|26.0|1 | +# |4 |Futrelle, Mrs Jacques|35.0|1 | +# |5 |Allen, Mr. William |35.0|0 | +# +-----------+--------------------+----+--------+ +``` + +### Multi-file Datasets +```python +# Load specific file from multi-file dataset +df = spark.read.format("kaggle") \ + .option("file", "train.csv") \ + .load("competitionname/datasetname") + +# Load all CSV files +df = spark.read.format("kaggle") \ + .option("pattern", "*.csv") \ + .load("multi-file-dataset") +``` + +## 8. ArrowDataSource - Read Apache Arrow Files + +Efficiently read Arrow format files with zero-copy operations. + +### Installation +```bash +pip install pyspark-data-sources[arrow] +``` + +### Usage +```python +from pyspark_datasources import ArrowDataSource + +spark.dataSource.register(ArrowDataSource) + +# Read Arrow files +df = spark.read.format("arrow").load("/path/to/data.arrow") + +# Read multiple Arrow files +df = spark.read.format("arrow").load("/path/to/arrow/files/*.arrow") + +# The Arrow reader is optimized for performance +df.count() # Fast counting due to Arrow metadata + +# Schema is preserved from Arrow files +df.printSchema() +``` + +### Performance Benefits +- Zero-copy reads when possible +- Preserves Arrow metadata +- Efficient columnar data access +- Automatic schema inference + +## 9. LanceDataSource - Vector Database Format + +Read and write Lance format data, optimized for vector/ML workloads. 
+ +### Installation +```bash +pip install pyspark-data-sources[lance] +``` + +### Write Data +```python +from pyspark_datasources import LanceDataSource + +spark.dataSource.register(LanceDataSource) + +# Prepare your DataFrame +df = spark.range(1000).selectExpr( + "id", + "array(rand(), rand(), rand()) as vector", + "rand() as score" +) + +# Write DataFrame to Lance format +df.write.format("lance") \ + .mode("overwrite") \ + .save("/path/to/lance/dataset") +``` + +### Read Data +```python +# Read Lance dataset +lance_df = spark.read.format("lance") \ + .load("/path/to/lance/dataset") + +lance_df.show(5) + +# Lance preserves vector columns efficiently +lance_df.printSchema() +# root +# |-- id: long (nullable = false) +# |-- vector: array (nullable = true) +# | |-- element: double (containsNull = true) +# |-- score: double (nullable = true) +``` + +### Features +- Optimized for vector/embedding data +- Efficient storage of array columns +- Fast random access +- Version control built-in + +## Common Patterns + +### Error Handling +Most data sources will raise informative errors when required options are missing: + +```python +# This will raise an error about missing API key +df = spark.read.format("stock") \ + .option("symbols", "AAPL") \ + .load() +# ValueError: api_key option is required for StockDataSource +``` + +### Schema Inference vs Specification +Some data sources support both automatic schema inference and explicit schema specification: + +```python +# Automatic schema inference +df = spark.read.format("fake").load() + +# Explicit schema specification +from pyspark.sql.types import StructType, StructField, StringType + +schema = StructType([ + StructField("first_name", StringType()), + StructField("last_name", StringType()), + StructField("email", StringType()) +]) + +df = spark.read.format("fake") \ + .schema(schema) \ + .load() +``` + +### Partitioning for Performance +Many data sources automatically partition data for parallel processing: + +```python +# HuggingFace automatically partitions large datasets +df = spark.read.format("huggingface").load("wikipedia") +df.rdd.getNumPartitions() # Returns number of partitions + +# You can control partitioning in some data sources +df = spark.read.format("fake") \ + .option("numPartitions", "8") \ + .load() +``` + +## Troubleshooting + +### Common Issues + +1. **Missing Dependencies** + ```bash + # Install specific extras + pip install pyspark-data-sources[faker,huggingface,kaggle] + ``` + +2. **API Rate Limits** + - GitHub: 60 requests/hour (unauthenticated) + - Alpha Vantage: 5 requests/minute (free tier) + - Use caching or implement retry logic + +3. **Network Issues** + - Most data sources require internet access + - Check firewall/proxy settings + - Some sources support offline mode after initial cache + +4. **Memory Issues with Large Datasets** + - Use partitioning for large datasets + - Consider sampling: `df.sample(0.1)` + - Increase Spark executor memory + +For more help, see the [Development Guide](../contributing/DEVELOPMENT.md) or open an issue on GitHub. \ No newline at end of file diff --git a/docs/datasources/arrow.md b/docs/datasources/arrow.md deleted file mode 100644 index c64d848..0000000 --- a/docs/datasources/arrow.md +++ /dev/null @@ -1,6 +0,0 @@ -# ArrowDataSource - -> Requires the [`PyArrow`](https://arrow.apache.org/docs/python/) library. You can install it manually: `pip install pyarrow` -> or use `pip install pyspark-data-sources[arrow]`. 
- -::: pyspark_datasources.arrow.ArrowDataSource diff --git a/docs/datasources/fake.md b/docs/datasources/fake.md deleted file mode 100644 index acb3ddc..0000000 --- a/docs/datasources/fake.md +++ /dev/null @@ -1,6 +0,0 @@ -# FakeDataSource - -> Requires the [`Faker`](https://github.com/joke2k/faker) library. You can install it manually: `pip install faker` -> or use `pip install pyspark-data-sources[faker]`. - -::: pyspark_datasources.fake.FakeDataSource diff --git a/docs/datasources/github.md b/docs/datasources/github.md deleted file mode 100644 index 22daa7f..0000000 --- a/docs/datasources/github.md +++ /dev/null @@ -1,3 +0,0 @@ -# GithubDataSource - -::: pyspark_datasources.github.GithubDataSource diff --git a/docs/datasources/googlesheets.md b/docs/datasources/googlesheets.md deleted file mode 100644 index 084191b..0000000 --- a/docs/datasources/googlesheets.md +++ /dev/null @@ -1,3 +0,0 @@ -# GoogleSheetsDataSource - -::: pyspark_datasources.googlesheets.GoogleSheetsDataSource diff --git a/docs/datasources/huggingface.md b/docs/datasources/huggingface.md deleted file mode 100644 index f4937ab..0000000 --- a/docs/datasources/huggingface.md +++ /dev/null @@ -1,5 +0,0 @@ -# HuggingFaceDatasets - -> Requires the [`datasets`](https://huggingface.co/docs/datasets/en/index) library. - -::: pyspark_datasources.huggingface.HuggingFaceDatasets diff --git a/docs/datasources/jsonplaceholder.md b/docs/datasources/jsonplaceholder.md deleted file mode 100644 index a175dd9..0000000 --- a/docs/datasources/jsonplaceholder.md +++ /dev/null @@ -1,3 +0,0 @@ -# JSONPlaceholderDataSource - -::: pyspark_datasources.jsonplaceholder.JSONPlaceholderDataSource \ No newline at end of file diff --git a/docs/datasources/kaggle.md b/docs/datasources/kaggle.md deleted file mode 100644 index b031ad0..0000000 --- a/docs/datasources/kaggle.md +++ /dev/null @@ -1,5 +0,0 @@ -# KaggleDataSource - -> Requires the [`kagglehub`](https://github.com/Kaggle/kagglehub) library. - -::: pyspark_datasources.kaggle.KaggleDataSource diff --git a/docs/datasources/lance.md b/docs/datasources/lance.md deleted file mode 100644 index e6c7848..0000000 --- a/docs/datasources/lance.md +++ /dev/null @@ -1,6 +0,0 @@ -# LanceSink - -> Requires the [`Lance`](https://lancedb.github.io/lance/) library. You can install it manually: `pip install lance` -> or use `pip install pyspark-data-sources[lance]`. - -::: pyspark_datasources.lance.LanceSink diff --git a/docs/datasources/opensky.md b/docs/datasources/opensky.md deleted file mode 100644 index f611186..0000000 --- a/docs/datasources/opensky.md +++ /dev/null @@ -1,5 +0,0 @@ -# OpenSkyDataSource - -> No additional dependencies required. Uses the OpenSky Network REST API for real-time aircraft tracking data. - -::: pyspark_datasources.opensky.OpenSkyDataSource diff --git a/docs/datasources/robinhood.md b/docs/datasources/robinhood.md deleted file mode 100644 index ccfb33e..0000000 --- a/docs/datasources/robinhood.md +++ /dev/null @@ -1,6 +0,0 @@ -# RobinhoodDataSource - -> Requires the [`pynacl`](https://github.com/pyca/pynacl) library for cryptographic signing. You can install it manually: `pip install pynacl` -> or use `pip install pyspark-data-sources[robinhood]`. 
- -::: pyspark_datasources.robinhood.RobinhoodDataSource diff --git a/docs/datasources/salesforce.md b/docs/datasources/salesforce.md deleted file mode 100644 index 688b61c..0000000 --- a/docs/datasources/salesforce.md +++ /dev/null @@ -1,6 +0,0 @@ -# SalesforceDataSource - -> Requires the [`simple-salesforce`](https://github.com/simple-salesforce/simple-salesforce) library. You can install it manually: `pip install simple-salesforce` -> or use `pip install pyspark-data-sources[salesforce]`. - -::: pyspark_datasources.salesforce.SalesforceDataSource \ No newline at end of file diff --git a/docs/datasources/simplejson.md b/docs/datasources/simplejson.md deleted file mode 100644 index c72e846..0000000 --- a/docs/datasources/simplejson.md +++ /dev/null @@ -1,3 +0,0 @@ -# SimpleJsonDataSource - -::: pyspark_datasources.simplejson.SimpleJsonDataSource diff --git a/docs/datasources/stock.md b/docs/datasources/stock.md deleted file mode 100644 index b6b506e..0000000 --- a/docs/datasources/stock.md +++ /dev/null @@ -1,3 +0,0 @@ -# StockDataSource - -::: pyspark_datasources.stock.StockDataSource \ No newline at end of file diff --git a/docs/datasources/weather.md b/docs/datasources/weather.md deleted file mode 100644 index f7f5258..0000000 --- a/docs/datasources/weather.md +++ /dev/null @@ -1,5 +0,0 @@ -# WeatherDataSource - -> No additional dependencies required. Uses the Tomorrow.io API for weather data. Requires an API key. - -::: pyspark_datasources.weather.WeatherDataSource diff --git a/docs/index.md b/docs/index.md deleted file mode 100644 index ed53cd7..0000000 --- a/docs/index.md +++ /dev/null @@ -1,45 +0,0 @@ -# PySpark Data Sources - -Custom Spark data sources for reading and writing data in Apache Spark, using the Python Data Source API. - -## Installation - -```bash -pip install pyspark-data-sources -``` - -If you want to install all extra dependencies, use: - -```bash -pip install pyspark-data-sources[all] -``` - -## Usage - -```python -from pyspark_datasources.fake import FakeDataSource - -# Register the data source -spark.dataSource.register(FakeDataSource) - -spark.read.format("fake").load().show() - -# For streaming data generation -spark.readStream.format("fake").load().writeStream.format("console").start() -``` - - -## Data Sources - -| Data Source | Short Name | Description | Dependencies | -| ------------------------------------------------------- | -------------- | --------------------------------------------- | --------------------- | -| [GithubDataSource](./datasources/github.md) | `github` | Read pull requests from a Github repository | None | -| [FakeDataSource](./datasources/fake.md) | `fake` | Generate fake data using the `Faker` library | `faker` | -| [HuggingFaceDatasets](./datasources/huggingface.md) | `huggingface` | Read datasets from the HuggingFace Hub | `datasets` | -| [StockDataSource](./datasources/stock.md) | `stock` | Read stock data from Alpha Vantage | None | -| [SalesforceDataSource](./datasources/salesforce.md) | `pyspark.datasource.salesforce` | Write streaming data to Salesforce objects |`simple-salesforce` | -| [GoogleSheetsDataSource](./datasources/googlesheets.md) | `googlesheets` | Read table from public Google Sheets document | None | -| [KaggleDataSource](./datasources/kaggle.md) | `kaggle` | Read datasets from Kaggle | `kagglehub`, `pandas` | -| [JSONPlaceHolder](./datasources/jsonplaceholder.md) | `jsonplaceholder` | Read JSON data for testing and prototyping | None | -| [RobinhoodDataSource](./datasources/robinhood.md) | `robinhood` | Read 
cryptocurrency market data from Robinhood API | `pynacl` | -| [SalesforceDataSource](./datasources/salesforce.md) | `salesforce` | Write streaming data to Salesforce objects |`simple-salesforce` | \ No newline at end of file diff --git a/mkdocs.yml b/mkdocs.yml deleted file mode 100644 index 6d0d8c0..0000000 --- a/mkdocs.yml +++ /dev/null @@ -1,44 +0,0 @@ -# yaml-language-server: $schema=https://squidfunk.github.io/mkdocs-material/schema.json - -site_name: PySpark Data Sources -site_url: https://allisonwang-db.github.io/pyspark-data-sources -repo_url: https://github.com/allisonwang-db/pyspark-data-sources -theme: - name: material - -plugins: - - mkdocstrings: - default_handler: python - handlers: - python: - options: - docstring_style: numpy - - search - -nav: - - Index: index.md - - Data Sources: - - datasources/github.md - - datasources/fake.md - - datasources/huggingface.md - - datasources/stock.md - - datasources/simplejson.md - - datasources/salesforce.md - - datasources/googlesheets.md - - datasources/kaggle.md - - datasources/jsonplaceholder.md - - datasources/robinhood.md - -markdown_extensions: - - pymdownx.highlight: - anchor_linenums: true - - pymdownx.inlinehilite - - pymdownx.snippets - - admonition - - pymdownx.arithmatex: - generic: true - - footnotes - - pymdownx.details - - pymdownx.superfences - - pymdownx.mark - - attr_list