Skip to content

Commit 9b95164

Browse files
authored
Implement input chunking to handle large json input (#29)
1 parent fb80de1 commit 9b95164

File tree

9 files changed

+191
-56
lines changed

9 files changed

+191
-56
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,3 +157,4 @@ test-results/
157157

158158
# SBOM file generated by cargo-cyclonedx
159159
**/*.cdx.json
160+
.vscode/settings.json

kic-script-gen/src/back_end/client_server.rs

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use actix_ws::{Message, Session};
44
use futures::StreamExt;
55
use script_gen_manager::script_component::script::ScriptModel;
66
use serde::{Deserialize, Serialize};
7+
use std::collections::HashMap;
78
use std::sync::Arc;
89
use tokio::sync::{broadcast, Mutex};
910

@@ -101,6 +102,7 @@ async fn ws_index(
101102
app_state: web::Data<Arc<AppState>>,
102103
) -> Result<HttpResponse, Error> {
103104
let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?;
105+
//msg_stream = msg_stream.max_frame_size(50 * 1024 * 1024); // 50MB
104106

105107
// Use the app_state here
106108
{
@@ -110,6 +112,8 @@ async fn ws_index(
110112

111113
let gen_script_tx = app_state.gen_script_tx.clone();
112114

115+
let mut chunk_buffers: HashMap<String, Vec<Option<String>>> = HashMap::new();
116+
113117
actix_web::rt::spawn(async move {
114118
while let Some(Ok(msg)) = msg_stream.next().await {
115119
match msg {
@@ -118,8 +122,55 @@ async fn ws_index(
118122
return;
119123
}
120124
}
121-
Message::Text(msg) => {
122-
// println!("Received input from client in session: {msg}");
125+
Message::Text(mut msg) => {
126+
let mut is_chunked = false;
127+
{
128+
use serde_json::Value;
129+
if let Ok(value) = serde_json::from_str::<Value>(&msg) {
130+
if let (
131+
Some(msg_id),
132+
Some(chunk_index),
133+
Some(total_chunks),
134+
Some(data),
135+
) = (
136+
value.get("msg_id").and_then(|v| v.as_str()),
137+
value.get("chunk_index").and_then(|v| v.as_u64()),
138+
value.get("total_chunks").and_then(|v| v.as_u64()),
139+
value.get("data").and_then(|v| v.as_str()),
140+
) {
141+
is_chunked = true;
142+
let entry = chunk_buffers
143+
.entry(msg_id.to_string())
144+
.or_insert_with(|| vec![None; total_chunks as usize]);
145+
entry[chunk_index as usize] = Some(data.to_string());
146+
if entry.iter().all(|c| c.is_some()) {
147+
let full_msg = entry
148+
.iter()
149+
.map(|c| c.as_ref().unwrap().as_str())
150+
.collect::<String>();
151+
chunk_buffers.remove(msg_id);
152+
println!(
153+
"Received complete chunked message of size: {} bytes",
154+
full_msg.len()
155+
);
156+
msg = full_msg.into();
157+
} else {
158+
println!(
159+
"Received chunk {}/{} for msg_id {}",
160+
chunk_index + 1,
161+
total_chunks,
162+
msg_id
163+
);
164+
continue;
165+
}
166+
}
167+
}
168+
}
169+
// --- End chunked message reassembly logic ---
170+
// Only fall through to normal processing if not a chunked message or if chunk is complete
171+
if is_chunked && msg.is_empty() {
172+
continue;
173+
}
123174
match serde_json::from_str::<IpcData>(&msg) {
124175
Ok(ipc_data) => {
125176
if ipc_data.request_type == "get_data" {
@@ -128,7 +179,6 @@ async fn ws_index(
128179
let mut data_model = app_state.data_model.lock().await;
129180
let response =
130181
data_model.process_data_from_client(ipc_data.json_value);
131-
println!("processed data from client {response}");
132182
// Send generate script signal
133183
if let Err(e) = gen_script_tx.send(()) {
134184
eprintln!("Failed to send signal: {e}");
@@ -173,6 +223,7 @@ async fn ws_index(
173223
_ => (),
174224
}
175225
}
226+
println!("WebSocket message loop ended - connection lost or closed");
176227
});
177228

178229
Ok(response)

kic-script-gen/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use script_gen_manager::{catalog, script_component::script::ScriptModel};
33

44
#[actix_web::main]
55
async fn main() -> anyhow::Result<()> {
6-
eprintln!("Welcome to KIC Script Generator!");
6+
println!("Welcome to KIC Script Generator!");
77

88
let mut catalog = catalog::Catalog::new();
99
catalog.refresh_function_metadata();

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,4 @@
2525
"ts-node": "^10.9.2",
2626
"typescript": "^5.3.3"
2727
}
28-
}
28+
}

0 commit comments

Comments
 (0)