diff --git a/docs/datasources/jsonplaceholder.md b/docs/datasources/jsonplaceholder.md
new file mode 100644
index 0000000..a175dd9
--- /dev/null
+++ b/docs/datasources/jsonplaceholder.md
@@ -0,0 +1,3 @@
+# JSONPlaceholderDataSource
+
+::: pyspark_datasources.jsonplaceholder.JSONPlaceholderDataSource
\ No newline at end of file
diff --git a/docs/index.md b/docs/index.md
index bec6116..9aa6888 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -41,3 +41,5 @@ spark.readStream.format("fake").load().writeStream.format("console").start()
 | [SalesforceDataSource](./datasources/salesforce.md) | `pyspark.datasource.salesforce` | Write streaming data to Salesforce objects |`simple-salesforce` |
 | [GoogleSheetsDataSource](./datasources/googlesheets.md) | `googlesheets` | Read table from public Google Sheets document | None |
 | [KaggleDataSource](./datasources/kaggle.md) | `kaggle` | Read datasets from Kaggle | `kagglehub`, `pandas` |
+| [JSONPlaceholderDataSource](./datasources/jsonplaceholder.md) | `jsonplaceholder` | Read JSON data for testing and prototyping | None |
+| [SalesforceDataSource](./datasources/salesforce.md) | `salesforce` | Write streaming data to Salesforce objects |`simple-salesforce` |
diff --git a/mkdocs.yml b/mkdocs.yml
index 8fde0b5..16b3ceb 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -26,6 +26,7 @@ nav:
   - datasources/salesforce.md
   - datasources/googlesheets.md
   - datasources/kaggle.md
+  - datasources/jsonplaceholder.md
 
 markdown_extensions:
   - pymdownx.highlight:
diff --git a/poetry.lock b/poetry.lock
index 5fcc9e3..56b452f 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1,4 +1,4 @@
-# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand.
+# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand.
 
 [[package]]
 name = "aiohappyeyeballs"
diff --git a/pyspark_datasources/__init__.py b/pyspark_datasources/__init__.py
index 3076200..e1d1f18 100644
--- a/pyspark_datasources/__init__.py
+++ b/pyspark_datasources/__init__.py
@@ -8,3 +8,4 @@
 from .salesforce import SalesforceDataSource
 from .simplejson import SimpleJsonDataSource
 from .stock import StockDataSource
+from .jsonplaceholder import JSONPlaceholderDataSource
diff --git a/pyspark_datasources/jsonplaceholder.py b/pyspark_datasources/jsonplaceholder.py
new file mode 100644
index 0000000..42edf17
--- /dev/null
+++ b/pyspark_datasources/jsonplaceholder.py
@@ -0,0 +1,224 @@
+from typing import Dict, Any, List, Iterator
+import requests
+from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition
+from pyspark.sql.types import StructType
+from pyspark.sql import Row
+
+
+class JSONPlaceholderDataSource(DataSource):
+    """
+    A PySpark data source for the JSONPlaceholder API.
+
+    JSONPlaceholder is a free fake REST API for testing and prototyping.
+    This data source provides access to posts, users, todos, comments, albums, and photos.
+
+    Supported endpoints:
+    - posts: Blog posts with userId, id, title, body
+    - users: User profiles with complete information
+    - todos: Todo items with userId, id, title, completed
+    - comments: Comments with postId, id, name, email, body
+    - albums: Albums with userId, id, title
+    - photos: Photos with albumId, id, title, url, thumbnailUrl
+
+    Name: `jsonplaceholder`
+
+    Examples
+    --------
+    Register the data source:
+
+    >>> spark.dataSource.register(JSONPlaceholderDataSource)
+
+    Read posts (the default endpoint):
+
+    >>> spark.read.format("jsonplaceholder").load().show()
+
+    Read users:
+
+    >>> spark.read.format("jsonplaceholder").option("endpoint", "users").load().show()
+
+    Read with a row limit:
+
+    >>> spark.read.format("jsonplaceholder").option("endpoint", "todos").option("limit", "5").load().show()
+
+    Read a specific item by id:
+
+    >>> spark.read.format("jsonplaceholder").option("endpoint", "posts").option("id", "1").load().show()
+
+    Referential Integrity
+    ---------------------
+    The endpoints share keys, so related datasets can be joined:
+
+    1. Posts and users (posts.userId = users.id):
+
+    >>> posts_df = spark.read.format("jsonplaceholder").option("endpoint", "posts").load()
+    >>> users_df = spark.read.format("jsonplaceholder").option("endpoint", "users").load()
+    >>> posts_with_authors = posts_df.join(users_df, posts_df.userId == users_df.id)
+
+    2. Posts and comments (comments.postId = posts.id):
+
+    >>> comments_df = spark.read.format("jsonplaceholder").option("endpoint", "comments").load()
+    >>> posts_with_comments = posts_df.join(comments_df, posts_df.id == comments_df.postId)
+
+    3. Users, albums, and photos (albums.userId = users.id, photos.albumId = albums.id):
+
+    >>> albums_df = spark.read.format("jsonplaceholder").option("endpoint", "albums").load()
+    >>> photos_df = spark.read.format("jsonplaceholder").option("endpoint", "photos").load()
+    >>> user_albums = users_df.join(albums_df, users_df.id == albums_df.userId)
+    >>> user_photos = user_albums.join(photos_df, albums_df.id == photos_df.albumId)
+    """
+
+    @classmethod
+    def name(cls) -> str:
+        return "jsonplaceholder"
+
+    def __init__(self, options=None):
+        self.options = options or {}
+
+    def schema(self) -> str:
+        """Return the DDL schema string for the selected endpoint."""
+        schemas = {
+            "posts": "userId INT, id INT, title STRING, body STRING",
+            "users": ("id INT, name STRING, username STRING, email STRING, phone STRING, "
+                      "website STRING, address_street STRING, address_suite STRING, "
+                      "address_city STRING, address_zipcode STRING, address_geo_lat STRING, "
+                      "address_geo_lng STRING, company_name STRING, company_catchPhrase STRING, "
+                      "company_bs STRING"),
+            "todos": "userId INT, id INT, title STRING, completed BOOLEAN",
+            "comments": "postId INT, id INT, name STRING, email STRING, body STRING",
+            "albums": "userId INT, id INT, title STRING",
+            "photos": "albumId INT, id INT, title STRING, url STRING, thumbnailUrl STRING"
+        }
+
+        endpoint = self.options.get("endpoint", "posts")
+        return schemas.get(endpoint, schemas["posts"])
+
+    def reader(self, schema: StructType) -> DataSourceReader:
+        return JSONPlaceholderReader(self.options)
+
+
+class JSONPlaceholderReader(DataSourceReader):
+    """Reader implementation for the JSONPlaceholder API."""
+
+    def __init__(self, options: Dict[str, str]):
+        self.options = options
+        self.base_url = "https://jsonplaceholder.typicode.com"
+
+        self.endpoint = self.options.get("endpoint", "posts")
+        self.limit = self.options.get("limit")
+        self.id = self.options.get("id")
+
+    def partitions(self) -> List[InputPartition]:
+        return [InputPartition(0)]
+
+    def read(self, partition: InputPartition) -> Iterator[Row]:
+        url = f"{self.base_url}/{self.endpoint}"
+
+        if self.id:
+            url += f"/{self.id}"
+
+        params = {}
+        if self.limit and not self.id:
+            params["_limit"] = self.limit
+
+        try:
+            response = requests.get(url, params=params, timeout=30)
+            response.raise_for_status()
+
+            data = response.json()
+
+            if isinstance(data, dict):
+                data = [data]
+            elif not isinstance(data, list):
+                data = []
+
+            return iter([self._process_item(item) for item in data])
+
+        except requests.RequestException as e:
+            print(f"Failed to fetch data from {url}: {e}")
+            return iter([])
+        except ValueError as e:
+            print(f"Failed to parse JSON from {url}: {e}")
+            return iter([])
+        except Exception as e:
+            print(f"Unexpected error while reading data: {e}")
+            return iter([])
+
+    def _process_item(self, item: Dict[str, Any]) -> Row:
+        """Convert a single JSON object into a Row based on the endpoint type."""
+
+        def _process_posts(item):
+            return Row(
+                userId=item.get("userId"),
+                id=item.get("id"),
+                title=item.get("title", ""),
+                body=item.get("body", "")
+            )
+
+        def _process_users(item):
+            # Flatten the nested address, geo, and company objects into top-level columns.
+            address = item.get("address", {})
+            geo = address.get("geo", {})
+            company = item.get("company", {})
+
+            return Row(
+                id=item.get("id"),
+                name=item.get("name", ""),
+                username=item.get("username", ""),
+                email=item.get("email", ""),
+                phone=item.get("phone", ""),
+                website=item.get("website", ""),
+                address_street=address.get("street", ""),
+                address_suite=address.get("suite", ""),
+                address_city=address.get("city", ""),
+                address_zipcode=address.get("zipcode", ""),
+                address_geo_lat=geo.get("lat", ""),
+                address_geo_lng=geo.get("lng", ""),
+                company_name=company.get("name", ""),
+                company_catchPhrase=company.get("catchPhrase", ""),
+                company_bs=company.get("bs", "")
+            )
+
+        def _process_todos(item):
+            return Row(
+                userId=item.get("userId"),
+                id=item.get("id"),
+                title=item.get("title", ""),
+                completed=item.get("completed", False)
+            )
+
+        def _process_comments(item):
+            return Row(
+                postId=item.get("postId"),
+                id=item.get("id"),
+                name=item.get("name", ""),
+                email=item.get("email", ""),
+                body=item.get("body", "")
+            )
+
+        def _process_albums(item):
+            return Row(
+                userId=item.get("userId"),
+                id=item.get("id"),
+                title=item.get("title", "")
+            )
+
+        def _process_photos(item):
+            return Row(
+                albumId=item.get("albumId"),
+                id=item.get("id"),
+                title=item.get("title", ""),
+                url=item.get("url", ""),
+                thumbnailUrl=item.get("thumbnailUrl", "")
+            )
+
+        processors = {
+            "posts": _process_posts,
+            "users": _process_users,
+            "todos": _process_todos,
+            "comments": _process_comments,
+            "albums": _process_albums,
+            "photos": _process_photos
+        }
+
+        processor = processors.get(self.endpoint, _process_posts)
+        return processor(item)
\ No newline at end of file
diff --git a/tests/test_data_sources.py b/tests/test_data_sources.py
index 69924f4..94c382b 100644
--- a/tests/test_data_sources.py
+++ b/tests/test_data_sources.py
@@ -72,7 +72,6 @@ def test_opensky_datasource_stream(spark):
     assert len(result.columns) == 18  # Check schema has expected number of fields
     assert result.count() > 0  # Verify we got some data
 
-
 def test_salesforce_datasource_registration(spark):
     """Test that Salesforce DataSource can be registered and validates required options."""
     spark.dataSource.register(SalesforceDataSource)
@@ -176,3 +175,17 @@ def test_arrow_datasource_multiple_files(spark):
     rows = df.collect()
     names = {row["name"] for row in rows}
     assert names == {"Alice", "Bob", "Charlie", "Diana"}
+
+def test_jsonplaceholder_posts(spark):
+    spark.dataSource.register(JSONPlaceholderDataSource)
+    posts_df = spark.read.format("jsonplaceholder").option("endpoint", "posts").load()
+    assert posts_df.count() > 0  # Ensure we have some posts
+
+
+def test_jsonplaceholder_referential_integrity(spark):
+    spark.dataSource.register(JSONPlaceholderDataSource)
+    users_df = spark.read.format("jsonplaceholder").option("endpoint", "users").load()
+    assert users_df.count() > 0  # Ensure we have some users
+    posts_df = spark.read.format("jsonplaceholder").option("endpoint", "posts").load()
+    posts_with_authors = posts_df.join(users_df, posts_df.userId == users_df.id)
+    assert posts_with_authors.count() > 0  # Ensure join is valid and we have posts with authors