diff --git a/.env.example b/.env.example index 086d66f..ca3603b 100644 --- a/.env.example +++ b/.env.example @@ -14,4 +14,7 @@ LLM_MODEL=gpt-3.5-turbo LLM_API_KEY=your-api-key-here LLM_ENGINE_PORT=8001 +# Query Router Configuration +QUERY_ROUTER_PORT=8002 + RUST_LOG=debug \ No newline at end of file diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 41cc5fa..8b7202a 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -46,7 +46,7 @@ jobs: cache-from: type=gha cache-to: type=gha,mode=max - build-query-runner: + build-query-router: runs-on: ubuntu-latest steps: - name: Checkout repository @@ -55,18 +55,18 @@ jobs: - name: Set up Docker Buildx uses: docker/setup-buildx-action@v2 - - name: Build Query Runner service + - name: Build Query Router service uses: docker/build-push-action@v4 with: - context: ./query_runner + context: ./query_router push: false load: true - tags: lucidata-query-runner:latest + tags: lucidata-query-router:latest cache-from: type=gha cache-to: type=gha,mode=max system-test: - needs: [build-api, build-llm-engine, build-query-runner] + needs: [build-api, build-llm-engine, build-query-router] runs-on: ubuntu-latest steps: - name: Checkout repository @@ -95,20 +95,20 @@ jobs: cache-from: type=gha outputs: type=docker,dest=/tmp/llm-engine-image.tar - - name: Download Query Runner image + - name: Download Query Router image uses: docker/build-push-action@v4 with: - context: ./query_runner + context: ./query_router load: true - tags: lucidata-query-runner:latest + tags: lucidata-query-router:latest cache-from: type=gha - outputs: type=docker,dest=/tmp/query-runner-image.tar + outputs: type=docker,dest=/tmp/query-router-image.tar - name: Load saved images run: | docker load < /tmp/api-image.tar docker load < /tmp/llm-engine-image.tar - docker load < /tmp/query-runner-image.tar + docker load < /tmp/query-router-image.tar docker images - name: Start services with docker compose diff --git 
a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f9f7168..8bb703e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -62,8 +62,8 @@ jobs: working-directory: ./llm_engine run: cargo test - query_runner: - name: Query Runner Service + query_router: + name: Query Router Service runs-on: ubuntu-latest steps: @@ -76,9 +76,9 @@ jobs: override: true - name: Run cargo check - working-directory: ./query_runner + working-directory: ./query_router run: cargo check - name: Run cargo test - working-directory: ./query_runner + working-directory: ./query_router run: cargo test diff --git a/README.md b/README.md index d831fa9..ce9c8ba 100644 --- a/README.md +++ b/README.md @@ -4,37 +4,62 @@ Lucidata is an LLM based query tool designed to democratize data access. It translates natural language questions into SQL/API queries over structured datasets, returning clear, traceable answers and exports. -## Features (WIP) +## Features - Natural Language Interface: Ask questions in plain English -- Query Translation: Automatic conversion to SQL/API queries -- Result Visualization: Clear tables and charts -- Export Options: Download results in various formats (CSV, Excel, etc.) -- Query Transparency: Track and export generated queries +- Query Translation: Automatic conversion to SQL queries +- Query Transparency: Track and export generated queries, explanations, and model confidence + +### Road-Map + +- Support for Generic WebAPI queries +- Result Visualization ## Getting Started ### Prerequisites - `docker` installed -- OpenAPI `API_KEY` +- An OpenAI `API_KEY` ### Usage 1. Clone the repository ```bash - git clone https://github.com/jdhoffa/lucidata.git + gh repo clone jdhoffa/lucidata cd lucidata ``` -2. Start the application with Docker Compose +2. Build and start the application with `docker compose`: ```bash + docker compose build # it can take a while to compile, be patient :-) docker compose up ``` -3. 
Enter your natural language query in the input field and click "Submit" +3. Send your query to the query_router endpoint, and check out the results! +``` bash +curl -X POST "http://localhost:8002/translate-and-execute" \ + -H "Content-Type: application/json" \ + -d '{ + "natural_query": "Show me the cars with the best power-to-weight ratio, sorted from highest to lowest" + }' +``` -4. Review the results and use the export options as needed +4. (Optional) Pipe the output to the `jq` CLI: +``` bash +curl -X POST "http://localhost:8002/translate-and-execute" \ + -H "Content-Type: application/json" \ + -d '{ + "natural_query": "Show me the cars with the best power-to-weight ratio, sorted from highest to lowest" + }' | jq + +# you can also select a specific field +curl -X POST "http://localhost:8002/translate-and-execute" \ + -H "Content-Type: application/json" \ + -d '{ + "natural_query": "Show me the cars with the best power-to-weight ratio, sorted from highest to lowest" + }' | jq '.results' +``` ## System Architecture @@ -77,9 +102,18 @@ graph TD ## Example Queries ``` -"What is the projected energy mix in 2030 according to IEA's Net Zero scenario?" +# Query #1 tests mathematical operations (division of hp/wt) +"Show me the cars with the best power-to-weight ratio, sorted from highest to lowest." + +# Query #2 tests sorting and multi-column selection +"Compare fuel efficiency (MPG) and horsepower for all cars, sorted by MPG." + +# Query #3 tests aggregation functions with grouping +"What's the average horsepower and MPG for automatic vs manual transmission cars?" -"How does natural gas production in the US compare to China over the next decade in WoodMac's base case?" +# Query #4 tests more complex aggregation and grouping +"Show me the relationship between number of cylinders and fuel efficiency with average MPG by cylinder count" -"Show me the top 5 countries by renewable energy growth in the next 5 years." 
+# Query #5 tests limiting results and specific column selection +"Find the top 5 cars with the highest horsepower and their quarter-mile time (qsec)" ``` diff --git a/api/Dockerfile b/api/Dockerfile index bac0cbf..f6c8a2d 100644 --- a/api/Dockerfile +++ b/api/Dockerfile @@ -34,11 +34,6 @@ RUN apt-get update && \ # Copy the binary from the builder stage COPY --from=builder /app/target/release/lucidata-api /app/lucidata-api -# Create a .env file only if environment variables aren't provided -RUN touch .env && \ - echo "DATABASE_URL=\${DATABASE_URL:-postgres://postgres:postgres@db:5432/pbtar}" > .env && \ - echo "RUST_LOG=\${RUST_LOG:-info}" >> .env - EXPOSE 8000 CMD ["/app/lucidata-api"] diff --git a/docker-compose.yml b/docker-compose.yml index 8e631b9..aef96b9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -37,7 +37,7 @@ services: db: condition: service_healthy healthcheck: - test: ["CMD", "curl", "-f", "http://api:${API_PORT}/api/health"] + test: ["CMD", "curl", "-f", "http://localhost:${API_PORT}/api/health"] interval: 10s timeout: 5s retries: 5 @@ -48,7 +48,7 @@ services: build: context: ./llm_engine ports: - - "8001:8001" + - "${LLM_ENGINE_PORT}:${LLM_ENGINE_PORT}" dns: - 8.8.8.8 - 1.1.1.1 @@ -66,26 +66,33 @@ services: condition: service_healthy api: condition: service_healthy - tty: true - stdin_open: true healthcheck: - test: ["CMD", "curl", "-f", "http://localhost:8001/health"] + test: ["CMD", "curl", "-f", "http://localhost:${LLM_ENGINE_PORT}/health"] interval: 10s timeout: 5s retries: 3 start_period: 5s - query_runner: + query_router: build: - context: ./query_runner + context: ./query_router + ports: + - "${QUERY_ROUTER_PORT}:${QUERY_ROUTER_PORT}" env_file: - ./.env environment: DATABASE_URL: ${DATABASE_URL} API_URL: ${API_URL} + QUERY_ROUTER_PORT: ${QUERY_ROUTER_PORT} + LLM_ENGINE_URL: ${LLM_ENGINE_URL} + RUST_LOG: ${RUST_LOG} depends_on: + db: + condition: service_healthy api: - condition: service_started + condition: service_healthy + 
llm_engine: + condition: service_healthy volumes: postgres_data: diff --git a/llm_engine/Dockerfile b/llm_engine/Dockerfile index 796edff..aaf408b 100644 --- a/llm_engine/Dockerfile +++ b/llm_engine/Dockerfile @@ -35,9 +35,6 @@ RUN apt-get update && \ # Copy the binary from the builder stage COPY --from=builder /app/target/release/llm_engine /app/llm_engine -# Set environment variables -ENV LLM_ENGINE_PORT=8001 - # Expose the port EXPOSE 8001 diff --git a/query_runner/Cargo.toml b/query_router/Cargo.toml similarity index 89% rename from query_runner/Cargo.toml rename to query_router/Cargo.toml index 3f6d0b2..de9187b 100644 --- a/query_runner/Cargo.toml +++ b/query_router/Cargo.toml @@ -1,15 +1,16 @@ [package] -name = "query_runner" +name = "query_router" version = "0.1.0" edition = "2021" [dependencies] tokio = { version = "1.29", features = ["full"] } -axum = "0.6.20" +axum = "0.6" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } +tower = "0.4" tower-http = { version = "0.4", features = ["cors"] } reqwest = { version = "0.11", features = ["json"] } dotenvy = "0.15" @@ -18,3 +19,4 @@ thiserror = "1.0" tokio-postgres = { version = "0.7", features = ["with-serde_json-1"] } futures = "0.3" chrono = { version = "0.4", features = ["serde"] } +dotenv = "0.15" diff --git a/query_runner/Dockerfile b/query_router/Dockerfile similarity index 87% rename from query_runner/Dockerfile rename to query_router/Dockerfile index 46aee17..105c8fa 100644 --- a/query_runner/Dockerfile +++ b/query_router/Dockerfile @@ -32,7 +32,9 @@ RUN apt-get update && \ rm -rf /var/lib/apt/lists/* # Copy the binary from the builder stage -COPY --from=builder /app/target/release/query_runner /app/query_runner +COPY --from=builder /app/target/release/query_router /app/query_router + +EXPOSE 8002 # Command to run the application -CMD ["/app/query_runner"] +CMD ["/app/query_router"] diff --git 
a/query_router/src/main.rs b/query_router/src/main.rs new file mode 100644 index 0000000..8594ebe --- /dev/null +++ b/query_router/src/main.rs @@ -0,0 +1,264 @@ +use std::sync::Arc; +use std::{env, net::SocketAddr, time::Instant}; +use axum::{ + extract::State, + http::StatusCode, + response::{IntoResponse, Response}, + routing::{get, post}, + Json, Router, +}; +use reqwest::Client; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use tower::ServiceBuilder; +use tower_http::cors::{Any, CorsLayer}; +use tracing::error; + +// Models for requests and responses +#[derive(Debug, Deserialize)] +struct TranslateAndExecuteRequest { + natural_query: String, + #[serde(default = "default_model")] + model: String, +} + +fn default_model() -> String { + "gpt-3.5-turbo".to_string() +} + +#[derive(Debug, Serialize)] +struct TranslateAndExecuteResponse { + natural_query: String, + sql_query: String, + results: Value, + explanation: String, + metadata: ResponseMetadata, +} + +#[derive(Debug, Serialize)] +struct ResponseMetadata { + confidence: f64, + execution_time_ms: u64, + llm_processing_time_ms: u64, + total_time_ms: u64, +} + +// LLM Engine response structure +#[derive(Debug, Deserialize)] +struct LlmResponse { + sql_query: String, + explanation: String, + confidence: f64, +} + +// Application state +struct AppState { + client: Client, + llm_engine_url: String, + api_url: String, +} + +// Error types +#[derive(thiserror::Error, Debug)] +enum AppError { + #[error("Failed to call LLM engine: {0}")] + LlmEngineError(#[from] reqwest::Error), + + #[error("Failed to process LLM response: {0}")] + LlmResponseError(String), + + #[error("Failed to execute SQL query: {0}")] + SqlExecutionError(String), +} + +// Convert AppError to Axum Response +impl IntoResponse for AppError { + fn into_response(self) -> Response { + let (status, error_message) = match self { + AppError::LlmEngineError(e) => (StatusCode::BAD_GATEWAY, format!("LLM engine error: {}", e)), + 
AppError::LlmResponseError(msg) => (StatusCode::BAD_REQUEST, msg), + AppError::SqlExecutionError(msg) => (StatusCode::BAD_REQUEST, msg), + }; + + let body = Json(json!({ + "error": error_message + })); + + (status, body).into_response() + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Load environment variables, don't require .env file in Docker + let _ = dotenv::dotenv(); + + // Initialize logging + use tracing_subscriber::prelude::*; + tracing_subscriber::registry() + .with(tracing_subscriber::EnvFilter::new( + std::env::var("RUST_LOG").unwrap_or_else(|_| "info".into()), + )) + .with(tracing_subscriber::fmt::layer()) + .init(); + + // Initialize application state + let llm_engine_url = env::var("LLM_ENGINE_URL") + .expect("LLM_ENGINE_URL must be set"); + + let api_url = env::var("API_URL") + .expect("API_URL must be set"); + + let state = Arc::new(AppState { + client: Client::new(), + llm_engine_url, + api_url, + }); + + // Create middleware stack with CORS + let middleware = ServiceBuilder::new() + .layer( + CorsLayer::new() + .allow_origin(Any) + .allow_methods(Any) + .allow_headers(Any) + ); + + // Create the router + let app = Router::new() + .route("/health", get(health_check)) + .route("/translate-and-execute", post(translate_and_execute)) + .with_state(state) + .layer(middleware); + + // Get port from environment variable or use default + let port = env::var("QUERY_ROUTER_PORT") + .ok() + .and_then(|p| p.parse::().ok()) + .unwrap_or(8002); + + let addr = SocketAddr::from(([0, 0, 0, 0], port)); + tracing::info!("Query Router server starting on {}", addr); + + // Start the server with graceful error handling for address-in-use errors + match axum::Server::try_bind(&addr) { + Ok(server) => { + server.serve(app.into_make_service()).await?; + }, + Err(err) => { + if err.to_string().contains("Address already in use") { + tracing::error!("Port {} is already in use. 
Try setting a different port with the QUERY_ROUTER_PORT environment variable.", port); + std::process::exit(1); + } else { + return Err(err.into()); + } + } + }; + + Ok(()) +} + +// Health check endpoint +async fn health_check() -> &'static str { + "OK" +} + +// Main endpoint for translating natural language to SQL and executing it +async fn translate_and_execute( + State(state): State>, + Json(request): Json, +) -> Result, AppError> { + let start_time = Instant::now(); + + // Step 1: Forward the natural language query to the LLM engine + let llm_start_time = Instant::now(); + let llm_response = call_llm_engine(&state, &request).await?; + let llm_processing_time = llm_start_time.elapsed().as_millis() as u64; + + // Step 2: Send the generated SQL to the API execution endpoint + let execution_start_time = Instant::now(); + let query_result = execute_sql_query(&state, &llm_response.sql_query).await?; + let execution_time = execution_start_time.elapsed().as_millis() as u64; + + // Step 3: Combine results and return to client + let total_time = start_time.elapsed().as_millis() as u64; + + let response = TranslateAndExecuteResponse { + natural_query: request.natural_query, + sql_query: llm_response.sql_query, + results: query_result, + explanation: llm_response.explanation, + metadata: ResponseMetadata { + confidence: llm_response.confidence, + execution_time_ms: execution_time, + llm_processing_time_ms: llm_processing_time, + total_time_ms: total_time, + }, + }; + + Ok(Json(response)) +} + +// Call LLM engine to convert natural language to SQL +async fn call_llm_engine( + state: &AppState, + request: &TranslateAndExecuteRequest +) -> Result { + let url = format!("{}/process-query", state.llm_engine_url); + + let llm_request = json!({ + "query": request.natural_query, + "model": request.model + }); + + let response = state.client + .post(&url) + .json(&llm_request) + .send() + .await + .map_err(AppError::LlmEngineError)?; + + if !response.status().is_success() { + let 
status = response.status(); + let error_text = response.text().await.unwrap_or_else(|_| "Unknown error".to_string()); + return Err(AppError::LlmResponseError(format!("LLM engine returned error ({}): {}", status, error_text))); + } + + let llm_response = response.json::().await + .map_err(|e| AppError::LlmResponseError(format!("Failed to parse LLM response: {}", e)))?; + + Ok(llm_response) +} + +// Execute SQL using the API service +async fn execute_sql_query(state: &AppState, sql_query: &str) -> Result { + let url = format!("{}/api/query", state.api_url); + + let query_request = json!({ + "query": sql_query + }); + + let response = state.client + .post(&url) + .json(&query_request) + .send() + .await + .map_err(|e| AppError::SqlExecutionError(e.to_string()))?; + + if !response.status().is_success() { + let status = response.status(); + let error_text = response.text().await.unwrap_or_else(|_| "Unknown error".to_string()); + return Err(AppError::SqlExecutionError(format!("SQL execution failed ({}): {}", status, error_text))); + } + + let json_response: serde_json::Value = response.json().await + .map_err(|e| AppError::SqlExecutionError(format!("Failed to parse API response: {}", e)))?; + + // Extract the results field from the API response + let result = json_response["result"].clone(); + if result.is_null() { + return Err(AppError::SqlExecutionError("API returned null result".to_string())); + } + + Ok(result) +} diff --git a/query_runner/src/main.rs b/query_runner/src/main.rs deleted file mode 100644 index 4d1c45d..0000000 --- a/query_runner/src/main.rs +++ /dev/null @@ -1,255 +0,0 @@ -use std::env; -use std::net::SocketAddr; -use axum::{ - routing::{get, post}, - Router, - Json, - http::StatusCode, - extract::State, - response::IntoResponse, -}; -use serde::{Deserialize, Serialize}; -use serde_json::{Value, json}; -use tower_http::cors::{CorsLayer, Any}; -use tracing::{info, error}; -use anyhow::Result; -use std::sync::Arc; -use std::collections::HashMap; -use 
tokio_postgres::{NoTls, Row}; - -// Initialize tracing -fn setup_logging() { - tracing_subscriber::fmt::init(); -} - -#[derive(Clone)] -struct AppState { - db_url: String, -} - -// Request model -#[derive(Deserialize)] -struct ExecuteQueryRequest { - query: String, -} - -// Response model -#[derive(Serialize)] -struct ExecuteQueryResponse { - results: Vec>, - metadata: HashMap, -} - -// Error handling -enum AppError { - InternalError(String), - BadRequest(String), -} - -impl IntoResponse for AppError { - fn into_response(self) -> axum::response::Response { - match self { - AppError::InternalError(msg) => { - (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({ - "error": msg - }))).into_response() - }, - AppError::BadRequest(msg) => { - (StatusCode::BAD_REQUEST, Json(json!({ - "error": msg - }))).into_response() - } - } - } -} - -// Root endpoint -async fn root() -> impl IntoResponse { - Json(json!({ - "status": "ok", - "message": "Query Runner Service is running" - })) -} - -// Health check endpoint -async fn health_check() -> impl IntoResponse { - Json(json!({ - "status": "ok" - })) -} - -// Execute query endpoint -async fn execute_query( - State(state): State>, - Json(request): Json, -) -> Result { - info!("Executing query: {}", request.query); - - // Connect to the database - let client = match tokio_postgres::connect(&state.db_url, NoTls).await { - Ok((client, connection)) => { - // Handle the connection in the background - tokio::spawn(async move { - if let Err(e) = connection.await { - error!("Connection error: {}", e); - } - }); - - client - }, - Err(e) => { - let error_msg = format!("Database connection error: {}", e); - error!("{}", error_msg); - return Err(AppError::InternalError(error_msg)); - } - }; - - // Execute the query - let start_time = std::time::Instant::now(); - - let rows = match client.query(&request.query, &[]).await { - Ok(rows) => rows, - Err(e) => { - let error_msg = format!("Query execution error: {}", e); - error!("{}", error_msg); - 
return Err(AppError::BadRequest(error_msg)); - } - }; - - let query_time = start_time.elapsed().as_millis(); - - // Convert rows to the expected output format - let results = match rows_to_json(rows) { - Ok(data) => data, - Err(e) => { - let error_msg = format!("Error converting query results: {}", e); - error!("{}", error_msg); - return Err(AppError::InternalError(error_msg)); - } - }; - - // Get column names if available - let column_names = if !results.is_empty() { - let first_row = &results[0]; - first_row.keys().map(|k| k.to_string()).collect::>() - } else { - Vec::new() - }; - - // Create the response - let metadata = HashMap::from([ - ("row_count".to_string(), json!(results.len())), - ("query_execution_time_ms".to_string(), json!(query_time)), - ("column_names".to_string(), json!(column_names)), - ]); - - let response = ExecuteQueryResponse { - results, - metadata, - }; - - Ok(Json(response)) -} - -// Helper function to convert Postgres rows to JSON-compatible format -fn rows_to_json(rows: Vec) -> Result>, anyhow::Error> { - let mut result = Vec::new(); - - for row in rows { - let mut row_data = HashMap::new(); - - for i in 0..row.len() { - let column_name = match row.columns()[i].name() { - name => name.to_string(), - }; - - // Handle different data types - let value = if let Ok(v) = row.try_get::<_, Option>(i) { - match v { - Some(val) => json!(val), - None => Value::Null, - } - } else if let Ok(v) = row.try_get::<_, Option>(i) { - match v { - Some(val) => json!(val), - None => Value::Null, - } - } else if let Ok(v) = row.try_get::<_, Option>(i) { - match v { - Some(val) => json!(val), - None => Value::Null, - } - } else if let Ok(v) = row.try_get::<_, Option>(i) { - match v { - Some(val) => json!(val), - None => Value::Null, - } - } else if let Ok(v) = row.try_get::<_, Option>(i) { - match v { - Some(val) => json!(val), - None => Value::Null, - } - } else { - // If we can't determine the type, convert to string representation - json!(format!("{:?}", 
row.get::<_, Value>(i))) - }; - - row_data.insert(column_name, value); - } - - result.push(row_data); - } - - Ok(result) -} - -#[tokio::main] -async fn main() { - // Load environment variables - dotenvy::dotenv().ok(); - - // Set up logging - setup_logging(); - - // Get database URL from environment - let db_url = match env::var("DATABASE_URL") { - Ok(url) => url, - Err(_) => { - error!("DATABASE_URL environment variable not set"); - std::process::exit(1); - } - }; - - // Create app state - let state = Arc::new(AppState { - db_url, - }); - - // Build our application with routes - let app = Router::new() - .route("/", get(root)) - .route("/health", get(health_check)) - .route("/execute-query", post(execute_query)) - .layer( - CorsLayer::new() - .allow_origin(Any) - .allow_headers(Any) - .allow_methods(Any) - ) - .with_state(state); - - // Get the port from environment or use default - let port = env::var("QUERY_RUNNER_PORT") - .unwrap_or_else(|_| "8003".to_string()) - .parse::() - .expect("PORT must be a valid number"); - - let addr = SocketAddr::from(([0, 0, 0, 0], port)); - info!("Query Runner listening on {}", addr); - - // Run the server - axum::Server::bind(&addr) - .serve(app.into_make_service()) - .await - .unwrap(); -}