1+ # pyspark_datasources/jsonplaceholder.py
2+
3+ from typing import Dict , Any , List , Iterator
4+ import requests
5+ from pyspark .sql .datasource import DataSource , DataSourceReader , InputPartition
6+ from pyspark .sql .types import StructType
7+ from pyspark .sql import Row
8+
9+
class JSONPlaceholderDataSource(DataSource):
    """
    PySpark data source backed by the JSONPlaceholder API.

    JSONPlaceholder is a free fake REST API for testing and prototyping.
    The ``endpoint`` option selects which collection is read and defaults
    to ``posts``.

    Supported endpoints:
    - posts: Blog posts with userId, id, title, body
    - users: User profiles, exposed as flat columns (for example
      ``address_city`` and ``company_name``)
    - todos: Todo items with userId, id, title, completed
    - comments: Comments with postId, id, name, email, body
    - albums: Albums with userId, id, title
    - photos: Photos with albumId, id, title, url, thumbnailUrl

    Name: `jsonplaceholder`

    Examples
    --------
    Register the data source:

    >>> spark.dataSource.register(JSONPlaceholderDataSource)

    Read posts (default):

    >>> spark.read.format("jsonplaceholder").load().show()

    Read users:

    >>> spark.read.format("jsonplaceholder").option("endpoint", "users").load().show()

    Read with limit:

    >>> spark.read.format("jsonplaceholder").option("endpoint", "todos").option("limit", "5").load().show()

    Read specific item:

    >>> spark.read.format("jsonplaceholder").option("endpoint", "posts").option("id", "1").load().show()
    """

    @classmethod
    def name(cls) -> str:
        """Return the short format name used with ``spark.read.format``."""
        return "jsonplaceholder"

    def __init__(self, options=None):
        # A missing (or empty) options mapping is replaced by a fresh dict.
        self.options = options if options else {}

    def schema(self) -> str:
        """Return the DDL schema string for the configured endpoint.

        The endpoint is read from the ``endpoint`` option (default ``posts``).
        An endpoint without a dedicated entry gets the ``posts`` schema.
        """
        posts_ddl = "userId INT, id INT, title STRING, body STRING"
        ddl_by_endpoint = {
            "posts": posts_ddl,
            "users": (
                "id INT, name STRING, username STRING, email STRING, phone STRING, "
                "website STRING, address_street STRING, address_suite STRING, "
                "address_city STRING, address_zipcode STRING, address_geo_lat STRING, "
                "address_geo_lng STRING, company_name STRING, company_catchPhrase STRING, "
                "company_bs STRING"
            ),
            "todos": "userId INT, id INT, title STRING, completed BOOLEAN",
            "comments": "postId INT, id INT, name STRING, email STRING, body STRING",
            "albums": "userId INT, id INT, title STRING",
            "photos": "albumId INT, id INT, title STRING, url STRING, thumbnailUrl STRING",
        }

        selected = self.options.get("endpoint", "posts")
        return ddl_by_endpoint.get(selected, posts_ddl)

    def reader(self, schema: StructType) -> DataSourceReader:
        """Build the reader from this source's options.

        The ``schema`` argument is accepted but not consulted here.
        """
        reader_options = self.options
        return JSONPlaceholderReader(reader_options)
81+
82+
class JSONPlaceholderReader(DataSourceReader):
    """Reader that fetches one JSONPlaceholder endpoint and turns it into rows.

    Options read (all optional):
    - ``endpoint``: collection to fetch; defaults to ``posts``.
    - ``limit``: sent as the ``_limit`` query parameter when no ``id`` is set.
    - ``id``: when set, appended to the URL so a single item is requested.
    """

    # (JSON key, default) pairs for endpoints whose rows are copied straight
    # from top-level JSON keys. The pair order is the field order of the Row
    # and mirrors the column order of JSONPlaceholderDataSource.schema().
    _FLAT_FIELDS = {
        "posts": (("userId", None), ("id", None), ("title", ""), ("body", "")),
        "todos": (
            ("userId", None),
            ("id", None),
            ("title", ""),
            ("completed", False),
        ),
        "comments": (
            ("postId", None),
            ("id", None),
            ("name", ""),
            ("email", ""),
            ("body", ""),
        ),
        "albums": (("userId", None), ("id", None), ("title", "")),
        "photos": (
            ("albumId", None),
            ("id", None),
            ("title", ""),
            ("url", ""),
            ("thumbnailUrl", ""),
        ),
    }

    def __init__(self, options: Dict[str, str]):
        self.options = options
        self.base_url = "https://jsonplaceholder.typicode.com"

        self.endpoint = self.options.get("endpoint", "posts")
        self.limit = self.options.get("limit")
        self.id = self.options.get("id")

    def partitions(self) -> List[InputPartition]:
        """Return a single partition; the whole endpoint is read in one request."""
        return [InputPartition(0)]

    def read(self, partition: InputPartition) -> Iterator[Row]:
        """Fetch the configured endpoint and return an iterator of rows.

        The read is best-effort: any failure while requesting, decoding or
        converting the payload is logged (with traceback) and an empty
        iterator is returned instead of raising.
        """
        # Local import so this block does not depend on a module-level import.
        import logging

        # No whitespace may leak into the URL pieces.
        url = f"{self.base_url}/{self.endpoint}"
        if self.id:
            url += f"/{self.id}"

        # A limit only applies to collection requests, not single-item ones.
        params = {}
        if self.limit and not self.id:
            params["_limit"] = self.limit

        try:
            response = requests.get(url, params=params, timeout=30)
            response.raise_for_status()
            data = response.json()

            # A single-item response is one JSON object; wrap it so both
            # shapes go through the same path. Anything else yields no rows.
            if isinstance(data, dict):
                data = [data]
            elif not isinstance(data, list):
                data = []

            return iter([self._process_item(item) for item in data])
        except Exception:
            # Keep returning an empty result, but leave a trace so a failed
            # request is distinguishable from a genuinely empty endpoint.
            logging.getLogger(__name__).warning(
                "Failed to read JSONPlaceholder data from %s; returning no rows",
                url,
                exc_info=True,
            )
            return iter([])

    def _process_item(self, item: Dict[str, Any]) -> Row:
        """Convert one decoded JSON object into a Row for the current endpoint.

        ``users`` items have their nested ``address``, ``address.geo`` and
        ``company`` objects flattened into prefixed fields. Every other
        endpoint copies top-level keys according to ``_FLAT_FIELDS``; an
        endpoint with no entry there is shaped like ``posts``.
        """
        if self.endpoint == "users":
            # ``or {}`` also covers keys that are present but null.
            address = item.get("address") or {}
            geo = address.get("geo") or {}
            company = item.get("company") or {}

            return Row(
                id=item.get("id"),
                name=item.get("name", ""),
                username=item.get("username", ""),
                email=item.get("email", ""),
                phone=item.get("phone", ""),
                website=item.get("website", ""),
                address_street=address.get("street", ""),
                address_suite=address.get("suite", ""),
                address_city=address.get("city", ""),
                address_zipcode=address.get("zipcode", ""),
                address_geo_lat=geo.get("lat", ""),
                address_geo_lng=geo.get("lng", ""),
                company_name=company.get("name", ""),
                company_catchPhrase=company.get("catchPhrase", ""),
                company_bs=company.get("bs", ""),
            )

        fields = self._FLAT_FIELDS.get(self.endpoint, self._FLAT_FIELDS["posts"])
        return Row(**{key: item.get(key, default) for key, default in fields})
0 commit comments