Dev #51 (Merged)

40 changes: 25 additions & 15 deletions rust/examples/index_single.rs
@@ -20,33 +20,43 @@ async fn main() -> vectorless::Result<()> {
 .await
 .map_err(|e| vectorless::Error::Config(e.to_string()))?;
 
-let content = r#"# Project Overview
+let content = r#"# Distributed Data Processing Platform
 
 ## Introduction
 
-This document describes the architecture of a distributed system
-designed for high-throughput data processing.
+This document provides a comprehensive overview of the distributed data processing platform architecture. The system is designed to handle petabyte-scale data workloads with sub-second query latency, supporting both real-time streaming and batch processing paradigms. The architecture follows a microservices-based approach with independent scaling capabilities for each component, enabling cost-effective resource utilization across varying workload patterns.
 
-## Components
 
-### API Gateway
+## System Architecture
 
-Handles authentication, rate limiting, and request routing.
-Supports both REST and gRPC protocols.
+The platform follows a layered architecture pattern with clear separation of concerns between ingestion, processing, storage, and serving layers. Each layer can be independently deployed, scaled, and upgraded without affecting other layers, following the principle of bounded contexts from domain-driven design. Inter-layer communication uses a combination of asynchronous message passing for data flow and synchronous gRPC calls for control plane operations.
 
-### Worker Pool
+### Ingestion Layer
 
-Processes tasks from the message queue. Each worker handles
-one task at a time with configurable timeout.
+The ingestion layer serves as the entry point for all data entering the platform. It supports multiple protocols including HTTP REST, gRPC, Apache Kafka, and AWS Kinesis. The layer is responsible for data validation, schema enforcement, initial transformation, and routing to downstream processing pipelines. Built on a reactive architecture using backpressure-aware operators, the ingestion layer gracefully handles burst traffic patterns without overwhelming downstream services.
 
-## Performance
 
-Under load testing, the system achieves 50k requests/second
-with p99 latency under 200ms.
+### Processing Engine
 
-## Conclusion
+The processing engine is the core computational component of the platform, responsible for transforming, enriching, aggregating, and analyzing ingested data. It supports both stream processing for real-time analytics and batch processing for historical analysis. The engine is built on a custom execution framework that optimizes query plans based on data statistics and available compute resources.
 
-The modular design allows independent scaling of each component.
+### Storage Layer
+
+The storage layer provides a unified abstraction over multiple storage backends, each optimized for different access patterns. The hot tier uses an in-memory columnar cache for frequently accessed dimensions and recent fact data, providing microsecond-level access latency. The warm tier uses a distributed key-value store backed by NVMe SSDs for data accessed within the past 30 days. The cold tier uses object storage with Parquet file format for historical data, achieving cost efficiency at the expense of higher access latency.
+
+Data is automatically tiered based on configurable policies that consider access frequency, data age, and query patterns. The tiering engine runs as a background service that continuously monitors access patterns and migrates data between tiers. Metadata about data placement is maintained in a distributed metadata service built on etcd, which provides consistent reads and writes with linearizable semantics.
+
+### Query Serving Layer
+
+The query serving layer provides the external-facing API for executing analytical queries against the processed data. It supports SQL queries via a PostgreSQL-compatible wire protocol, making it accessible to a wide range of BI tools and existing applications without requiring driver changes. The query router analyzes incoming queries and determines the optimal execution strategy, considering which storage tiers contain the relevant data and whether partial results can be served from cached aggregations.
+
+Query results are optionally materialized in a result cache that uses a time-to-live (TTL) policy combined with lazy invalidation based on upstream data freshness markers. The cache achieves a hit rate of approximately 85% for dashboard workloads, significantly reducing the computational load on the processing engine for repetitive query patterns.
+
+## Deployment and Operations
+
+The platform is deployed on Kubernetes with Helm charts that encapsulate all deployment configurations, resource limits, and scaling policies. Each microservice is packaged as a container image with multi-stage builds that minimize image size and attack surface. The CI/CD pipeline uses a GitOps workflow with ArgoCD, ensuring that all changes to production are auditable, reproducible, and reversible.
+
+Monitoring is implemented using a Prometheus and Grafana stack, with custom metrics exported by each service using a shared instrumentation library. Key performance indicators including query latency percentiles, ingestion throughput, processing lag, and error rates are tracked on operational dashboards with automated alerting through PagerDuty integration. Distributed tracing using OpenTelemetry provides end-to-end visibility into request flows across microservices, enabling rapid diagnosis of performance anomalies and error root causes.
 "#;
 
 // Index from content string
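
The comment above marks where the example (elided by the diff viewer) feeds this string into the indexer. A minimal sketch of that flow follows; `index_content` and `client` are assumed names standing in for the elided code, and only `content` itself comes from the diff:

    // Sketch under assumptions: the real entry point is not shown in this diff.
    let doc = client.index_content(content).await?;
    // `pages` is a real field of IndexedDocument (see rust/src/client/types.rs below).
    println!("Indexed {} pages", doc.pages.len());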
3 changes: 3 additions & 0 deletions rust/src/client/indexer.rs
@@ -319,6 +319,8 @@ impl IndexerClient {
 .with_tree(tree)
 .with_metrics(result.metrics);
 
+doc.reasoning_index = result.reasoning_index;
+
 if let Some(p) = path {
 doc = doc.with_source_path(p);
 }
@@ -444,6 +446,7 @@ impl IndexerClient {
 persisted.add_page(page.page, &page.content);
 }
 
+persisted.reasoning_index = doc.reasoning_index;
 persisted.meta.update_processing_stats(node_count, summary_tokens, duration_ms);
 
 persisted
4 changes: 4 additions & 0 deletions rust/src/client/types.rs
@@ -48,6 +48,9 @@ pub struct IndexedDocument {
 
 /// Indexing pipeline metrics.
 pub metrics: Option<IndexMetrics>,
+
+/// Pre-computed reasoning index for retrieval acceleration.
+pub reasoning_index: Option<crate::document::ReasoningIndex>,
 }
 
 impl IndexedDocument {
@@ -64,6 +67,7 @@ impl IndexedDocument {
 tree: None,
 pages: Vec::new(),
 metrics: None,
+reasoning_index: None,
 }
 }
 
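
Together with the indexer.rs change above, this threads the pipeline's reasoning index through to API consumers. Since the field is an Option, downstream code has to tolerate documents indexed before this change; a minimal sketch of the access pattern, with both branch bodies assumed:

    // `reasoning_index` is Option<ReasoningIndex>, so older documents
    // (or runs without the stage) simply yield None.
    if let Some(ri) = &doc.reasoning_index {
        // hypothetical: use the pre-computed index to accelerate retrieval
        let _ = ri;
    } else {
        // hypothetical fallback: traverse the document tree directly
    }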
3 changes: 3 additions & 0 deletions rust/src/index/stages/enhance.rs
@@ -289,6 +289,9 @@ impl IndexStage for EnhanceStage {
 if summary.is_empty() {
 failed += 1;
 } else {
+ctx.metrics.add_tokens_generated(
+crate::utils::estimate_tokens(&summary),
+);
 tree.set_summary(node_id, &summary);
 generated += 1;
 ctx.metrics.increment_summaries();
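
The new call charges each generated summary's tokens to the pipeline metrics via crate::utils::estimate_tokens, whose implementation is not part of this diff. A plausible shape, assuming the common chars-per-token heuristic:

    // Assumption only: many estimators approximate one token per ~4 characters
    // of English text. The real crate::utils::estimate_tokens may differ.
    fn estimate_tokens(text: &str) -> usize {
        (text.chars().count() + 3) / 4
    }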
2 changes: 1 addition & 1 deletion rust/src/index/summary/strategy.rs
@@ -34,7 +34,7 @@ impl Default for SummaryStrategyConfig {
 max_tokens: 200,
 min_content_tokens: 50,
 persist_lazy: false,
-shortcut_threshold: 200,
+shortcut_threshold: 50,
 }
 }
 }
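
The default shortcut_threshold drops from 200 to 50. The field's exact semantics are not visible in this diff, but callers who depended on the old default can pin it explicitly, assuming the config fields remain publicly constructible:

    // Restore the pre-change value; all other fields keep their defaults.
    let config = SummaryStrategyConfig {
        shortcut_threshold: 200,
        ..Default::default()
    };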
24 changes: 21 additions & 3 deletions rust/src/llm/executor.rs
@@ -376,20 +376,38 @@ impl LlmExecutor {
 let request =
 request.map_err(|e| LlmError::Request(format!("Failed to build request: {}", e)))?;
 
-debug!("Sending LLM request to {} with model {}", endpoint, model);
-
+info!(
+"LLM request → endpoint: {}, model: {}, system: {} chars, user: {} chars",
+endpoint,
+model,
+system.len(),
+truncated.len()
+);
+
+let request_start = std::time::Instant::now();
 let response = client.chat().create(request).await.map_err(|e| {
 let msg = e.to_string();
 LlmError::from_api_message(&msg)
 })?;
+let request_elapsed = request_start.elapsed();
 
+let usage = response.usage.as_ref();
+let prompt_tokens = usage.map(|u| u.prompt_tokens).unwrap_or(0);
+let completion_tokens = usage.map(|u| u.completion_tokens).unwrap_or(0);
+
 let content = response
 .choices
 .first()
 .and_then(|choice| choice.message.content.clone())
 .ok_or(LlmError::NoContent)?;
 
-debug!("LLM response length: {} chars", content.len());
+info!(
+"LLM response ← {}ms, tokens: {} prompt + {} completion, content: {} chars",
+request_elapsed.as_millis(),
+prompt_tokens,
+completion_tokens,
+content.len()
+);
 
 Ok(content)
 }
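
With the level raised from debug to info and the timing and token fields added, each round trip now emits a pair of lines shaped like the following (values illustrative, not from a real run):

    LLM request → endpoint: https://api.example.com/v1, model: gpt-4o-mini, system: 312 chars, user: 4096 chars
    LLM response ← 1843ms, tokens: 1102 prompt + 187 completion, content: 956 chars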
8 changes: 6 additions & 2 deletions rust/src/storage/workspace.rs
@@ -482,7 +482,7 @@ impl Workspace {
 
 /// Get the storage key for a document.
 fn doc_key(id: &str) -> String {
-format!("doc:{}", id)
+id.to_string()
 }
 
 /// Load the meta index from backend.
@@ -516,7 +516,11 @@ impl Workspace {
 /// Rebuild the meta index from existing documents.
 fn rebuild_meta_index(inner: &mut WorkspaceInner) -> Result<()> {
 let keys = inner.backend.keys()?;
-let doc_keys: Vec<_> = keys.iter().filter(|k| k.starts_with("doc:")).collect();
+let reserved = ["meta", "_graph"];
+let doc_keys: Vec<_> = keys
+.iter()
+.filter(|k| !reserved.contains(&k.as_str()))
+.collect();
 
 for key in doc_keys {
 if let Some(bytes) = inner.backend.get(key)? {
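
Because document keys are no longer prefixed with doc:, the rebuild pass can no longer filter by prefix and instead excludes the known bookkeeping keys. A self-contained sketch of the new selection logic (key names from the diff; the document id is illustrative):

    // Everything that is not a reserved bookkeeping key is treated as a document.
    let keys = vec!["meta".to_string(), "_graph".to_string(), "1938cb46".to_string()];
    let reserved = ["meta", "_graph"];
    let doc_keys: Vec<&String> = keys
        .iter()
        .filter(|k| !reserved.contains(&k.as_str()))
        .collect();
    assert_eq!(doc_keys, ["1938cb46"]);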
1 change: 1 addition & 0 deletions samples/1938cb46-4085-4a70-b9e6-70b97d3c8ba9.bin

Large diffs are not rendered by default.
