from typing import Dict, Any, List, Iterator

import requests
from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition
from pyspark.sql.types import StructType
from pyspark.sql import Row


class JSONPlaceholderDataSource(DataSource):
9+ """
10+ A PySpark data source for JSONPlaceholder API.
11+
12+ JSONPlaceholder is a free fake REST API for testing and prototyping.
13+ This data source provides access to posts, users, todos, comments, albums, and photos.
14+
15+ Supported endpoints:
16+ - posts: Blog posts with userId, id, title, body
17+ - users: User profiles with complete information
18+ - todos: Todo items with userId, id, title, completed
19+ - comments: Comments with postId, id, name, email, body
20+ - albums: Albums with userId, id, title
21+ - photos: Photos with albumId, id, title, url, thumbnailUrl
22+
23+ Name: `jsonplaceholder`
24+
25+ Examples
26+ --------
27+ Register the data source:
28+
29+ >>> spark.dataSource.register(JSONPlaceholderDataSource)
30+
31+ Read posts (default):
32+
33+ >>> spark.read.format("jsonplaceholder").load().show()
34+
35+ Read users:
36+
37+ >>> spark.read.format("jsonplaceholder").option("endpoint", "users").load().show()
38+
39+ Read with limit:
40+
41+ >>> spark.read.format("jsonplaceholder").option("endpoint", "todos").option("limit", "5").load().show()
42+
43+ Read specific item:
44+
45+ >>> spark.read.format("jsonplaceholder").option("endpoint", "posts").option("id", "1").load().show()
46+
47+ Referential Integrity
48+ -------------------
49+ The data source supports joining related datasets:
50+
51+ 1. Posts and Users relationship:
52+ posts.userId = users.id
53+ >>> posts_df = spark.read.format("jsonplaceholder").option("endpoint", "posts").load()
54+ >>> users_df = spark.read.format("jsonplaceholder").option("endpoint", "users").load()
55+ >>> posts_with_authors = posts_df.join(users_df, posts_df.userId == users_df.id)
56+
57+ 2. Posts and Comments relationship:
58+ comments.postId = posts.id
59+ >>> comments_df = spark.read.format("jsonplaceholder").option("endpoint", "comments").load()
60+ >>> posts_with_comments = posts_df.join(comments_df, posts_df.id == comments_df.postId)
61+
62+ 3. Users, Albums and Photos relationship:
63+ albums.userId = users.id
64+ photos.albumId = albums.id
65+ >>> albums_df = spark.read.format("jsonplaceholder").option("endpoint", "albums").load()
66+ >>> photos_df = spark.read.format("jsonplaceholder").option("endpoint", "photos").load()
67+ >>> user_albums = users_df.join(albums_df, users_df.id == albums_df.userId)
68+ >>> user_photos = user_albums.join(photos_df, albums_df.id == photos_df.albumId)
69+ """

    @classmethod
    def name(cls) -> str:
        return "jsonplaceholder"

    def __init__(self, options=None):
        self.options = options or {}

    def schema(self) -> str:
        """Returns the schema for the selected endpoint."""
        schemas = {
            "posts": "userId INT, id INT, title STRING, body STRING",
            "users": ("id INT, name STRING, username STRING, email STRING, phone STRING, "
                      "website STRING, address_street STRING, address_suite STRING, "
                      "address_city STRING, address_zipcode STRING, address_geo_lat STRING, "
                      "address_geo_lng STRING, company_name STRING, company_catchPhrase STRING, "
                      "company_bs STRING"),
            "todos": "userId INT, id INT, title STRING, completed BOOLEAN",
            "comments": "postId INT, id INT, name STRING, email STRING, body STRING",
            "albums": "userId INT, id INT, title STRING",
            "photos": "albumId INT, id INT, title STRING, url STRING, thumbnailUrl STRING"
        }

        endpoint = self.options.get("endpoint", "posts")
        return schemas.get(endpoint, schemas["posts"])

    def reader(self, schema: StructType) -> DataSourceReader:
        return JSONPlaceholderReader(self.options)


class JSONPlaceholderReader(DataSourceReader):
101+ """Reader implementation for JSONPlaceholder API"""
102+
103+ def __init__ (self , options : Dict [str , str ]):
104+ self .options = options
105+ self .base_url = "https://jsonplaceholder.typicode.com"
106+
107+ self .endpoint = self .options .get ("endpoint" , "posts" )
108+ self .limit = self .options .get ("limit" )
109+ self .id = self .options .get ("id" )
110+
111+ def partitions (self ) -> List [InputPartition ]:
112+ return [InputPartition (0 )]

    def read(self, partition: InputPartition) -> Iterator[Row]:
        url = f"{self.base_url}/{self.endpoint}"

        if self.id:
            url += f"/{self.id}"

        params = {}
        # The _limit query parameter only applies to collection requests,
        # not to single-item lookups by id.
        if self.limit and not self.id:
            params["_limit"] = self.limit

        try:
            response = requests.get(url, params=params, timeout=30)
            response.raise_for_status()

            data = response.json()

            # Single-item requests return one object; normalize to a list of records.
            if isinstance(data, dict):
                data = [data]
            elif not isinstance(data, list):
                data = []

            return iter([self._process_item(item) for item in data])

        except requests.RequestException as e:
            print(f"Failed to fetch data from {url}: {e}")
            return iter([])
        except ValueError as e:
            print(f"Failed to parse JSON from {url}: {e}")
            return iter([])
        except Exception as e:
            print(f"Unexpected error while reading data: {e}")
            return iter([])

    def _process_item(self, item: Dict[str, Any]) -> Row:
        """Process individual items based on endpoint type."""

        def _process_posts(item):
            return Row(
                userId=item.get("userId"),
                id=item.get("id"),
                title=item.get("title", ""),
                body=item.get("body", "")
            )

        def _process_users(item):
            address = item.get("address", {})
            geo = address.get("geo", {})
            company = item.get("company", {})

            return Row(
                id=item.get("id"),
                name=item.get("name", ""),
                username=item.get("username", ""),
                email=item.get("email", ""),
                phone=item.get("phone", ""),
                website=item.get("website", ""),
                address_street=address.get("street", ""),
                address_suite=address.get("suite", ""),
                address_city=address.get("city", ""),
                address_zipcode=address.get("zipcode", ""),
                address_geo_lat=geo.get("lat", ""),
                address_geo_lng=geo.get("lng", ""),
                company_name=company.get("name", ""),
                company_catchPhrase=company.get("catchPhrase", ""),
                company_bs=company.get("bs", "")
            )

        def _process_todos(item):
            return Row(
                userId=item.get("userId"),
                id=item.get("id"),
                title=item.get("title", ""),
                completed=item.get("completed", False)
            )

        def _process_comments(item):
            return Row(
                postId=item.get("postId"),
                id=item.get("id"),
                name=item.get("name", ""),
                email=item.get("email", ""),
                body=item.get("body", "")
            )

        def _process_albums(item):
            return Row(
                userId=item.get("userId"),
                id=item.get("id"),
                title=item.get("title", "")
            )

        def _process_photos(item):
            return Row(
                albumId=item.get("albumId"),
                id=item.get("id"),
                title=item.get("title", ""),
                url=item.get("url", ""),
                thumbnailUrl=item.get("thumbnailUrl", "")
            )

        processors = {
            "posts": _process_posts,
            "users": _process_users,
            "todos": _process_todos,
            "comments": _process_comments,
            "albums": _process_albums,
            "photos": _process_photos
        }

        processor = processors.get(self.endpoint, _process_posts)
        return processor(item)
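

# A minimal local usage sketch, not part of the data source itself. It assumes a
# PySpark 4.0+ runtime where the Python data source API (spark.dataSource.register)
# is available and where a local SparkSession can be created; the app name and the
# options chosen below are illustrative only.
if __name__ == "__main__":
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("jsonplaceholder-demo").getOrCreate()
    spark.dataSource.register(JSONPlaceholderDataSource)

    # Read a handful of posts (the default endpoint) plus the full user list,
    # then join them on the author id, as described in the class docstring.
    posts_df = spark.read.format("jsonplaceholder").option("limit", "10").load()
    users_df = spark.read.format("jsonplaceholder").option("endpoint", "users").load()
    posts_df.join(users_df, posts_df.userId == users_df.id).select(
        posts_df.id, posts_df.title, users_df.username
    ).show(truncate=False)

    spark.stop()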