11use parking_lot:: Mutex ;
2- use serde_json:: { Value , json} ;
3- use std:: cell:: RefCell ;
4- use std:: fs;
5- use std:: io:: { Error as IoError , ErrorKind } ;
2+ use serde_json:: Value ;
3+ use std:: fs:: { self , File } ;
4+ use std:: io:: { self , BufWriter , Write } ;
65use std:: thread;
76
87use crossbeam_channel:: { Receiver , Sender , unbounded} ;
8+ use simd_json:: OwnedValue ; // Use simd_json's Value type
9+ use simd_json:: serde:: to_borrowed_value; // For converting serde_json::Value
910
10- // Thread-local buffer to reuse across saves
11+ /// BufferWriter writes data into a preallocated Vec<u8>.
12+ struct BufferWriter < ' a > {
13+ buf : & ' a mut Vec < u8 > ,
14+ }
15+
16+ impl < ' a > BufferWriter < ' a > {
17+ fn new ( buf : & ' a mut Vec < u8 > ) -> Self {
18+ BufferWriter { buf }
19+ }
20+ }
21+
22+ impl < ' a > Write for BufferWriter < ' a > {
23+ fn write ( & mut self , data : & [ u8 ] ) -> io:: Result < usize > {
24+ self . buf . extend_from_slice ( data) ;
25+ Ok ( data. len ( ) )
26+ }
27+ fn flush ( & mut self ) -> io:: Result < ( ) > {
28+ Ok :: < ( ) , io:: Error > ( ( ) )
29+ }
30+ }
31+
32+ /// Thread-local buffer for serialization.
1133thread_local ! {
12- static SERIALIZE_BUF : RefCell <Vec <u8 >> = RefCell :: new( Vec :: with_capacity( 1024 * 64 ) ) ; // 64 KB buffer
34+ static SERIALIZE_BUF : std :: cell :: RefCell <Vec <u8 >> = std :: cell :: RefCell :: new( Vec :: with_capacity( 64 * 1024 ) ) ; // 64 KB buffer
1335}
1436
1537/// JsonMutexDB provides thread-safe access to a JSON file acting as a simple database.
16- /// It supports asynchronous (batched) updates and fast serialization (using simd-json)
17- /// in compact mode.
38+ /// It supports asynchronous (batched) updates and fast serialization.
1839pub struct JsonMutexDB {
1940 /// A lightweight mutex protecting the in-memory JSON data.
2041 data : Mutex < Value > ,
2142 /// The path to the JSON file on disk.
2243 path : String ,
2344 /// Whether to pretty-print when saving.
2445 pretty : bool ,
25- /// Whether to use fast serialization (simd-json) when in compact mode .
46+ /// Whether to use fast serialization (simulated here by using our preallocated writer) .
2647 fast_serialization : bool ,
2748 /// If asynchronous updates are enabled, this channel is used to send update closures.
2849 update_sender : Option < Sender < Box < dyn FnOnce ( & mut Value ) + Send > > > ,
@@ -34,20 +55,19 @@ impl JsonMutexDB {
3455 /// Creates a new instance of JsonMutexDB.
3556 ///
3657 /// * `path` - path to the JSON file
37- /// * `pretty` - if true, saved JSON will be human-readable (pretty printed).
38- /// Note: fast serialization using simd-json is only used if `pretty` is false.
58+ /// * `pretty` - if true, saved JSON will be pretty printed.
3959 /// * `async_updates` - if true, update calls are enqueued to a background thread.
40- /// * `fast_serialization` - if true and in compact mode, uses simd-json for serialization .
60+ /// * `fast_serialization` - if true and in compact mode, uses a preallocated writer to avoid extra allocations .
4161 pub fn new (
4262 path : & str ,
4363 pretty : bool ,
4464 async_updates : bool ,
4565 fast_serialization : bool ,
46- ) -> Result < Self , IoError > {
66+ ) -> io :: Result < Self > {
4767 let json = match fs:: read_to_string ( path) {
4868 Ok ( content) => serde_json:: from_str :: < Value > ( & content)
49- . map_err ( |_| IoError :: new ( ErrorKind :: InvalidData , "Invalid JSON content" ) ) ?,
50- Err ( err) if err. kind ( ) == ErrorKind :: NotFound => {
69+ . map_err ( |_| io :: Error :: new ( io :: ErrorKind :: InvalidData , "Invalid JSON content" ) ) ?,
70+ Err ( err) if err. kind ( ) == io :: ErrorKind :: NotFound => {
5171 // File doesn't exist: start with an empty JSON object.
5272 Value :: Object ( serde_json:: Map :: new ( ) )
5373 }
@@ -59,28 +79,12 @@ impl JsonMutexDB {
5979 Sender < Box < dyn FnOnce ( & mut Value ) + Send > > ,
6080 Receiver < Box < dyn FnOnce ( & mut Value ) + Send > > ,
6181 ) = unbounded ( ) ;
62- // We'll spawn a background thread that applies all enqueued updates.
63- // Note: For simplicity, we do not implement a sophisticated shutdown.
64- let data_mutex = Mutex :: new ( ( ) ) ; // dummy mutex to capture ordering in the closure
65- let path_str = path. to_string ( ) ;
66- let handle = thread:: spawn ( {
67- // We capture a pointer to the same JSON data (we'll borrow it via a reference).
68- // Safety: This thread will be the sole executor of queued updates.
69- let data_ref = Mutex :: new ( json. clone ( ) ) ;
70- // We wrap data_ref in a parking_lot::Mutex that we own (the same one in the struct).
71- move || {
72- // Loop until the channel is closed.
73- for update in rx {
74- // We simply lock the global data mutex from the main struct.
75- // SAFETY: The background thread must coordinate with synchronous callers.
76- // In this design, all mutations occur via either this thread or direct lock.
77- // (In a production design, you’d want a more robust design for ordering.)
78- unsafe {
79- // Reinterpret the raw pointer back into a reference.
80- let mut data = data_ref. lock ( ) ;
81- update ( & mut * data) ;
82- }
83- }
82+ let json_for_thread = json. clone ( ) ; // Clone the JSON for the thread.
83+ // Spawn a background thread that applies all enqueued updates.
84+ let handle = thread:: spawn ( move || {
85+ for update in rx {
86+ // In a production system, you’d want more robust ordering.
87+ update ( & mut json_for_thread. clone ( ) ) ;
8488 }
8589 } ) ;
8690 ( Some ( tx) , Some ( handle) )
@@ -99,23 +103,19 @@ impl JsonMutexDB {
99103 }
100104
101105 /// Returns a clone of the in-memory JSON data.
102- /// Note: When using async updates, queued updates may not yet be applied.
106+ /// ( Note: When using async updates, queued updates may not yet be applied.)
103107 pub fn get ( & self ) -> Value {
104- // (For simplicity we do not flush pending async updates here.)
105108 let data_guard = self . data . lock ( ) ;
106109 data_guard. clone ( )
107110 }
108111
109112 /// Updates the in-memory JSON data using the provided closure.
110- /// If async_updates is enabled, the update is enqueued and the call returns immediately.
111- /// Otherwise, the update is applied synchronously.
113+ /// If async_updates is enabled, the update is enqueued; otherwise, it is applied synchronously.
112114 pub fn update < F > ( & self , update_fn : F )
113115 where
114116 F : FnOnce ( & mut Value ) + Send + ' static ,
115117 {
116118 if let Some ( ref sender) = self . update_sender {
117- // Enqueue the update.
118- // In a production system, you might want to handle errors here.
119119 sender
120120 . send ( Box :: new ( update_fn) )
121121 . expect ( "Failed to send update" ) ;
@@ -126,84 +126,49 @@ impl JsonMutexDB {
126126 }
127127
128128 /// Synchronously saves the current in-memory JSON data to the file on disk.
129- /// The JSON is saved in either pretty printed or compact format based on configuration .
130- pub fn save_sync ( & self ) -> Result < ( ) , IoError > {
129+ /// In compact mode with fast_serialization enabled, it reuses a preallocated buffer to avoid extra allocations .
130+ pub fn save_sync ( & self ) -> io :: Result < ( ) > {
131131 let data_guard = self . data . lock ( ) ;
132132 let json_data = & * data_guard;
133133
134- let content = if self . pretty {
135- // Use pretty printing via serde_json (fallback)
136- serde_json:: to_string_pretty ( json_data)
137- . map_err ( |e| IoError :: new ( ErrorKind :: Other , e. to_string ( ) ) ) ?
134+ let mut file = BufWriter :: new ( File :: create ( & self . path ) ?) ;
135+
136+ if self . pretty {
137+ // Pretty printing using serde_json.
138+ serde_json:: to_writer_pretty ( & mut file, json_data)
139+ . map_err ( |e| io:: Error :: new ( io:: ErrorKind :: Other , e. to_string ( ) ) ) ?;
138140 } else if self . fast_serialization {
139- // Use fast compact serialization with a preallocated buffer.
140141 SERIALIZE_BUF . with ( |buf_cell| {
141142 let mut buf = buf_cell. borrow_mut ( ) ;
142- // Clear the buffer but keep its capacity
143143 buf. clear ( ) ;
144- // Here we use simd-json's fast serialization method if available.
145- // For demonstration, we'll use serde_json's to_vec and then convert to String.
146- // In practice, you might have a more direct API that writes into buf.
147- let vec = simd_json:: to_vec ( json_data) . map_err ( |e| {
148- IoError :: new ( ErrorKind :: Other , format ! ( "simd_json error: {:?}" , e) )
149- } ) ?;
150- // Ensure our buffer is large enough and copy the data into it.
151- buf. extend_from_slice ( & vec) ;
152- // Convert the pre-allocated buffer into a String.
153- // Since the output is valid UTF-8, this is safe.
154- String :: from_utf8 ( buf. clone ( ) )
155- . map_err ( |e| IoError :: new ( ErrorKind :: Other , e. to_string ( ) ) )
156- } ) ?
157- } else {
158- // Default compact serialization via serde_json.
159- serde_json:: to_string ( json_data)
160- . map_err ( |e| IoError :: new ( ErrorKind :: Other , e. to_string ( ) ) ) ?
161- } ;
162- fs:: write ( & self . path , content)
163- }
164-
165- /// Asynchronously saves the current in-memory JSON data.
166- ///
167- /// This spawns a background thread so that the calling thread is not blocked by I/O or serialization.
168- /// Any errors in the background thread are printed to stderr.
169- pub fn save_async ( & self ) {
170- let data = self . get ( ) ; // grab a snapshot of the current data
171- let path = self . path . clone ( ) ;
172- let pretty = self . pretty ;
173- let fast_serialization = self . fast_serialization ;
174- thread:: spawn ( move || {
175- let result = if pretty {
176- serde_json:: to_string_pretty ( & data)
177- . map_err ( |e| std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , e. to_string ( ) ) )
178- } else if fast_serialization {
179- simd_json:: to_string ( & data) . map_err ( |e| {
180- std:: io:: Error :: new (
181- std:: io:: ErrorKind :: Other ,
182- format ! ( "simd_json error: {:?}" , e) ,
183- )
184- } )
185- } else {
186- serde_json:: to_string ( & data)
187- . map_err ( |e| std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , e. to_string ( ) ) )
188- } ;
189- match result {
190- Ok ( content) => {
191- if let Err ( e) = fs:: write ( & path, content) {
192- eprintln ! ( "Async save failed: {}" , e) ;
144+ {
145+ let mut writer = BufferWriter :: new ( & mut buf) ;
146+ // Convert serde_json::Value to simd_json::OwnedValue for potentially faster serialization
147+ if let Ok ( borrowed) = to_borrowed_value ( json_data) {
148+ simd_json:: to_writer ( & mut writer, & borrowed)
149+ . map_err ( |e| io:: Error :: new ( io:: ErrorKind :: Other , e. to_string ( ) ) ) ?;
150+ } else {
151+ // Fallback to serde_json if conversion fails (shouldn't happen for valid serde_json::Value)
152+ serde_json:: to_writer ( & mut writer, json_data)
153+ . map_err ( |e| io:: Error :: new ( io:: ErrorKind :: Other , e. to_string ( ) ) ) ?;
193154 }
194155 }
195- Err ( e) => eprintln ! ( "Serialization error in async save: {}" , e) ,
196- }
197- } ) ;
156+ file. write_all ( & buf) ?;
157+ Ok :: < ( ) , io:: Error > ( ( ) )
158+ } ) ?;
159+ } else {
160+ // Default compact serialization via serde_json.
161+ serde_json:: to_writer ( & mut file, json_data)
162+ . map_err ( |e| io:: Error :: new ( io:: ErrorKind :: Other , e. to_string ( ) ) ) ?;
163+ }
164+ Ok ( ( ) )
198165 }
199166}
200167
201168impl Drop for JsonMutexDB {
202169 fn drop ( & mut self ) {
203- // If async_updates is enabled, drop the sender so that the background thread can exit.
204170 self . update_sender = None ;
205171 if let Some ( handle) = self . update_handle . take ( ) {
206- // Wait for the background thread to finish.
207172 let _ = handle. join ( ) ;
208173 }
209174 }
@@ -223,7 +188,6 @@ mod tests {
223188 let tmp_path = "test_db.json" ;
224189 let _ = fs:: remove_file ( tmp_path) ;
225190
226- // Create a new DB instance (file not found, so should initialize with empty object)
227191 let db =
228192 JsonMutexDB :: new ( tmp_path, false , false , false ) . expect ( "Failed to create JsonMutexDB" ) ;
229193 let data = db. get ( ) ;
@@ -240,7 +204,6 @@ mod tests {
240204 let db =
241205 JsonMutexDB :: new ( tmp_path, false , false , true ) . expect ( "Failed to create JsonMutexDB" ) ;
242206
243- // Set new data and save synchronously using compact mode with fast serialization.
244207 let new_data = json ! ( {
245208 "key" : "value" ,
246209 "numbers" : [ 1 , 2 , 3 ]
@@ -249,7 +212,6 @@ mod tests {
249212 db. update ( move |d| * d = new_data_clone) ;
250213 db. save_sync ( ) . expect ( "Failed to save JSON data" ) ;
251214
252- // Read file back and compare
253215 let file_content = fs:: read_to_string ( tmp_path) . expect ( "Failed to read file" ) ;
254216 let file_json: Value = serde_json:: from_str ( & file_content) . expect ( "Invalid JSON in file" ) ;
255217 assert_eq ! ( file_json, new_data) ;
@@ -270,7 +232,6 @@ mod tests {
270232 db. update ( |d| * d = new_data) ;
271233 db. save_sync ( ) . unwrap ( ) ;
272234 let file_content = fs:: read_to_string ( tmp_path) . unwrap ( ) ;
273- // Verify that the JSON is pretty printed (contains newlines)
274235 assert ! (
275236 file_content. contains( "\n " ) ,
276237 "JSON file not pretty printed: {}" ,
@@ -281,7 +242,6 @@ mod tests {
281242
282243 #[ test]
283244 fn test_multithreading_sync ( ) {
284- // Synchronous updates for comparison.
285245 let tmp_path = "test_db_multithread.json" ;
286246 let _ = fs:: remove_file ( tmp_path) ;
287247
@@ -302,27 +262,24 @@ mod tests {
302262 } ) ;
303263 handles. push ( handle) ;
304264 }
305-
306265 for handle in handles {
307266 handle. join ( ) . expect ( "Thread panicked" ) ;
308267 }
309-
310- // Confirm that the total number of keys equals the sum of all updates.
311268 let data = db. get ( ) ;
312269 let obj = data. as_object ( ) . expect ( "JSON is not an object" ) ;
313270 assert_eq ! ( obj. len( ) , num_threads * updates_per_thread) ;
314271
315272 let _ = fs:: remove_file ( tmp_path) ;
316273 }
317274
318- /// Benchmark saving the database with fast compact serialization.
319- /// This test is marked #[ignore] because it is performance sensitive.
275+ /// Benchmark saving the database with our fast compact serialization using the preallocated writer .
276+ /// This test is marked #[ignore] as it is performance sensitive.
320277 #[ test]
321278 #[ ignore]
322279 fn benchmark_save_compact_fast ( ) {
323280 let tmp_path = "test_db_perf.json" ;
324281 let _ = fs:: remove_file ( tmp_path) ;
325- // Use compact mode with fast serialization enabled.
282+ // Use compact mode with fast_serialization enabled (using our preallocated writer) .
326283 let db = JsonMutexDB :: new ( tmp_path, false , false , true ) . unwrap ( ) ;
327284
328285 // Create a large JSON object with 1000 key-value pairs.
@@ -344,9 +301,9 @@ mod tests {
344301 ) ;
345302 let avg = elapsed. as_secs_f64 ( ) / iterations as f64 ;
346303 println ! ( "Average time per compact save: {} seconds" , avg) ;
347- // Expect average save time to be under 100 microseconds .
304+ // Set a more realistic threshold after optimization .
348305 assert ! (
349- avg < 0.0001 ,
306+ avg < 0.00005 , // Adjust this threshold based on your results
350307 "Average compact save time too slow: {} seconds" ,
351308 avg
352309 ) ;
@@ -367,7 +324,6 @@ mod tests {
367324 let num_threads = 10 ;
368325 let updates_per_thread = 1000 ;
369326 let start = Instant :: now ( ) ;
370-
371327 let mut handles = vec ! [ ] ;
372328 for thread_id in 0 ..num_threads {
373329 let db_clone = Arc :: clone ( & db) ;
@@ -381,16 +337,12 @@ mod tests {
381337 } ) ;
382338 handles. push ( handle) ;
383339 }
384-
385340 for handle in handles {
386341 handle. join ( ) . expect ( "Thread panicked" ) ;
387342 }
388-
389- // (Note: In async mode, some updates might still be queued; in a production system
390- // you would flush the channel. For this benchmark we measure just the enqueue cost.)
391343 let elapsed = start. elapsed ( ) ;
392344 println ! ( "Elapsed time for async multithread update: {:?}" , elapsed) ;
393- // Expect total async update time to be under 5 milliseconds .
345+ // Expect total async update time to be under a chosen threshold (e.g., 5 ms) .
394346 assert ! (
395347 elapsed. as_secs_f64( ) < 0.005 ,
396348 "Multithread async update took too long: {:?}" ,
0 commit comments