-
Notifications
You must be signed in to change notification settings - Fork 34
Performance Tuning
rUv edited this page Jul 31, 2025
·
1 revision
Optimize FACT for maximum performance across all platforms and use cases.
FACT is designed for sub-100ms responses on cache hits (cache misses target <140ms); the key performance metrics are:
| Metric | Target | Typical | Best Case |
|---|---|---|---|
| Cache Hit Latency | <50ms | 25-40ms | 10ms |
| Cache Miss Latency | <140ms | 80-120ms | 50ms |
| Template Processing | <50ms | 15-30ms | 5ms |
| Memory Usage | <500MB | 200-300MB | 100MB |
| Cache Hit Rate | >85% | 85-95% | 99% |
# Python
config = Config(
cache_enabled=True,
cache_max_size=10000, # Increase for better hit rate
cache_ttl=7200, # 2 hours for stable data
)// Rust
// Rust — equivalent cache tuning via the CacheConfig section.
let config = Config {
    cache: CacheConfig {
        max_size: 10000,          // entries
        ttl_seconds: 7200,        // 2 hours for stable data
        enable_disk_cache: true,  // persist entries across restarts
    },
    ..Default::default()
};
# Python - Database connection pooling
DATABASE_CONFIG = {
'pool_size': 20,
'max_overflow': 30,
'pool_timeout': 30,
'pool_recycle': 3600,
}// Rust - Parallel execution
// Enable parallel query execution, sizing the thread pool to the
// machine's logical CPU count (num_cpus crate).
let fact = Fact::with_config(Config {
    performance: PerformanceConfig {
        parallel_operations: true,
        max_threads: num_cpus::get(),
    },
    ..Default::default()
})?;
Calculate optimal cache size based on your workload:
def calculate_optimal_cache_size(
    daily_unique_queries: int,
    avg_result_size_kb: float,
    desired_hit_rate: float = 0.85,
) -> "tuple[int, float]":
    """Estimate cache capacity (entries) and its memory footprint (MB).

    Args:
        daily_unique_queries: Distinct queries observed per day.
        avg_result_size_kb: Average cached result size in KB.
        desired_hit_rate: Target cache hit rate, 0 < rate < 1.

    Returns:
        Tuple of (cache_size_entries, memory_usage_mb).  The original
        annotation said ``int`` but the function has always returned a
        pair — see the example caller that unpacks two values.
    """
    # Pareto principle: assume ~20% of queries form the "active set"
    # that accounts for most repeat traffic.
    active_set = daily_unique_queries * 0.2
    # Scale the active set so the miss budget (1 - hit rate) is met.
    cache_size = int(active_set / (1 - desired_hit_rate))
    # Estimated memory cost of holding that many entries.
    memory_usage_mb = (cache_size * avg_result_size_kb) / 1024
    return cache_size, memory_usage_mb
# Example: 10,000 daily queries, 5KB average size
size, memory = calculate_optimal_cache_size(10000, 5)
print(f"Recommended cache size: {size} entries ({memory:.1f} MB)")Pre-populate cache with common queries:
async def warm_cache(driver: FACTDriver, queries: List[str]):
"""Warm cache with common queries"""
tasks = []
for query in queries:
task = driver.process_query(query, use_cache=True)
tasks.append(task)
# Process in batches to avoid overload
batch_size = 10
for i in range(0, len(tasks), batch_size):
batch = tasks[i:i + batch_size]
await asyncio.gather(*batch)def optimize_cache_key(query: str, context: dict) -> str:
"""Generate optimized cache key"""
# Normalize query
normalized = query.lower().strip()
normalized = re.sub(r'\s+', ' ', normalized)
# Extract important context only
key_context = {
k: v for k, v in context.items()
if k in ['user_type', 'data_version', 'filters']
}
# Use fast hash
key_data = f"{normalized}:{json.dumps(key_context, sort_keys=True)}"
return hashlib.blake2b(key_data.encode(), digest_size=16).hexdigest()class QueryOptimizer:
"""Optimize queries for performance"""
def __init__(self):
self.query_cache = {}
self.execution_stats = defaultdict(list)
def optimize(self, query: str) -> str:
"""Optimize query for execution"""
# Check if we've seen similar query
query_pattern = self._extract_pattern(query)
if query_pattern in self.query_cache:
# Reuse execution plan
return self.query_cache[query_pattern]
# Analyze query complexity
complexity = self._analyze_complexity(query)
# Choose optimization strategy
if complexity > 0.8:
optimized = self._optimize_complex_query(query)
else:
optimized = self._optimize_simple_query(query)
self.query_cache[query_pattern] = optimized
return optimizedasync def batch_process_queries(
driver: FACTDriver,
queries: List[str],
batch_size: int = 10
) -> List[str]:
"""Process queries in optimized batches"""
results = []
# Sort by estimated complexity
sorted_queries = sorted(queries, key=lambda q: len(q))
# Process in parallel batches
for i in range(0, len(sorted_queries), batch_size):
batch = sorted_queries[i:i + batch_size]
batch_results = await asyncio.gather(
*[driver.process_query(q) for q in batch]
)
results.extend(batch_results)
return resultsimport gc
import psutil
import tracemalloc
class MemoryManager:
"""Manage memory usage"""
def __init__(self, max_memory_mb: int = 500):
self.max_memory_mb = max_memory_mb
tracemalloc.start()
def check_memory(self):
"""Check current memory usage"""
process = psutil.Process()
memory_mb = process.memory_info().rss / 1024 / 1024
if memory_mb > self.max_memory_mb:
self._reduce_memory()
return memory_mb
def _reduce_memory(self):
"""Reduce memory usage"""
# Force garbage collection
gc.collect()
# Clear caches
if hasattr(self, 'cache'):
self.cache.evict_oldest(count=1000)
# Log memory snapshot
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')[:10]
for stat in top_stats:
logger.debug(f"Memory: {stat}")use std::alloc::{GlobalAlloc, Layout, System};
use std::sync::atomic::{AtomicUsize, Ordering};

/// Global allocator shim that keeps a running total of live heap bytes.
struct MemoryTracker;

static ALLOCATED: AtomicUsize = AtomicUsize::new(0);

unsafe impl GlobalAlloc for MemoryTracker {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        // Delegates to the system allocator; only counts on success.
        let ret = System.alloc(layout);
        if !ret.is_null() {
            // Relaxed is fine: the counter is advisory, not a sync point.
            ALLOCATED.fetch_add(layout.size(), Ordering::Relaxed);
        }
        ret
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        System.dealloc(ptr, layout);
        ALLOCATED.fetch_sub(layout.size(), Ordering::Relaxed);
    }
}

#[global_allocator]
static GLOBAL: MemoryTracker = MemoryTracker;

// Monitor memory usage.
/// Current live heap usage in bytes, as tracked by `MemoryTracker`.
fn get_memory_usage() -> usize {
    ALLOCATED.load(Ordering::Relaxed)
}
import asyncio
from asyncio import Semaphore
class AsyncOptimizer:
"""Optimize async operations"""
def __init__(self, max_concurrent: int = 10):
self.semaphore = Semaphore(max_concurrent)
self.running_tasks = set()
async def run_with_limit(self, coro):
"""Run coroutine with concurrency limit"""
async with self.semaphore:
task = asyncio.create_task(coro)
self.running_tasks.add(task)
try:
return await task
finally:
self.running_tasks.remove(task)
async def gather_with_progress(self, coros, callback=None):
"""Gather with progress callback"""
tasks = [self.run_with_limit(coro) for coro in coros]
results = []
for i, task in enumerate(asyncio.as_completed(tasks)):
result = await task
results.append(result)
if callback:
callback(i + 1, len(tasks))
return resultsuse tokio::sync::Semaphore;
use futures::stream::{self, StreamExt};

/// Processes batches of items concurrently, capped by a semaphore.
/// NOTE(review): assumes `Arc`, `Future`, `Result` and `Value` are
/// already in scope (e.g. `std::sync::Arc`, a crate Result alias) —
/// confirm against the full module.
pub struct AsyncProcessor {
    semaphore: Arc<Semaphore>,
    max_concurrent: usize,
}

impl AsyncProcessor {
    pub fn new(max_concurrent: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(max_concurrent)),
            max_concurrent,
        }
    }

    /// Run `processor` over every item with at most `max_concurrent`
    /// in flight; results are collected in completion order.
    pub async fn process_batch<T, F, Fut>(
        &self,
        items: Vec<T>,
        processor: F,
    ) -> Vec<Result<Value>>
    where
        F: Fn(T) -> Fut + Clone,
        Fut: Future<Output = Result<Value>>,
        T: Send + 'static,
    {
        let semaphore = self.semaphore.clone();
        stream::iter(items)
            .map(move |item| {
                let processor = processor.clone();
                // acquire_owned ties the permit's lifetime to the task.
                let permit = semaphore.clone().acquire_owned();
                async move {
                    // NOTE(review): `?` requires the error type to
                    // convert from tokio's AcquireError — verify the
                    // crate's Result alias supports this.
                    let _permit = permit.await?;
                    processor(item).await
                }
            })
            .buffer_unordered(self.max_concurrent)
            .collect()
            .await
    }
}
import time
from dataclasses import dataclass
from typing import Dict, List
import prometheus_client as prom
# Define metrics
query_duration = prom.Histogram(
'fact_query_duration_seconds',
'Query processing duration',
['query_type', 'cache_hit']
)
cache_operations = prom.Counter(
'fact_cache_operations_total',
'Cache operations',
['operation', 'result']
)
@dataclass
class PerformanceMonitor:
"""Monitor performance metrics"""
def __init__(self):
self.metrics: Dict[str, List[float]] = defaultdict(list)
def measure(self, name: str):
"""Context manager for measuring duration"""
return TimeMeasure(self, name)
def record(self, name: str, duration: float):
"""Record a measurement"""
self.metrics[name].append(duration)
# Update Prometheus metrics
query_duration.labels(
query_type=name,
cache_hit='unknown'
).observe(duration)
def get_stats(self, name: str) -> dict:
"""Get statistics for a metric"""
values = self.metrics.get(name, [])
if not values:
return {}
return {
'count': len(values),
'mean': sum(values) / len(values),
'min': min(values),
'max': max(values),
'p50': sorted(values)[len(values) // 2],
'p95': sorted(values)[int(len(values) * 0.95)],
'p99': sorted(values)[int(len(values) * 0.99)],
}
class TimeMeasure:
"""Context manager for timing"""
def __init__(self, monitor: PerformanceMonitor, name: str):
self.monitor = monitor
self.name = name
self.start_time = None
def __enter__(self):
self.start_time = time.perf_counter()
return self
def __exit__(self, *args):
duration = time.perf_counter() - self.start_time
self.monitor.record(self.name, duration)def create_performance_dashboard():
"""Create performance monitoring dashboard"""
dashboard = {
'queries': {
'total': query_duration._count.sum(),
'rate': query_duration._count.rate(),
'latency_p50': query_duration.percentile(0.5),
'latency_p99': query_duration.percentile(0.99),
},
'cache': {
'hit_rate': cache_hit_rate(),
'operations': cache_operations._count.sum(),
'memory_usage': get_cache_memory_usage(),
},
'system': {
'cpu_percent': psutil.cpu_percent(),
'memory_percent': psutil.virtual_memory().percent,
'disk_io': psutil.disk_io_counters(),
}
}
return dashboard# Use PyPy for CPU-intensive operations
# Use uvloop for better async performance
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# Use msgpack for faster serialization
import msgpack
def fast_serialize(data):
return msgpack.packb(data, use_bin_type=True)
def fast_deserialize(data):
return msgpack.unpackb(data, raw=False)
# Use lru_cache for expensive computations
from functools import lru_cache
@lru_cache(maxsize=1000)
def expensive_computation(input_data: str) -> dict:
# Expensive processing
return result// Compile with optimizations
// In Cargo.toml: release-profile settings for maximum optimization.
[profile.release]
opt-level = 3     # highest optimization level
lto = "fat"       # whole-program link-time optimization
codegen-units = 1 # better codegen at the cost of compile time
panic = "abort"   # smaller, faster code; no unwinding

// Use parking_lot for faster locks
use parking_lot::{RwLock, Mutex};
// Use ahash for faster hashing
use ahash::AHashMap;
// Use smallvec for stack allocation of small vectors
use smallvec::SmallVec;

// Profile-guided optimization hook.
// NOTE(review): `#[profile(always)]` is not a standard Rust attribute;
// presumably a project macro gated by the `pgo` feature — confirm.
#[cfg(feature = "pgo")]
#[profile(always)]
fn hot_path_function() {
    // Critical path code
}
// Use Web Workers for parallel processing
class WASMWorkerPool {
constructor(workerCount = 4) {
this.workers = Array(workerCount).fill(null).map(() =>
new Worker('fact-worker.js')
);
this.taskQueue = [];
this.busyWorkers = new Set();
}
async process(data) {
const worker = await this.getAvailableWorker();
this.busyWorkers.add(worker);
return new Promise((resolve, reject) => {
worker.onmessage = (e) => {
this.busyWorkers.delete(worker);
resolve(e.data);
};
worker.onerror = reject;
worker.postMessage(data);
});
}
}
// Use SharedArrayBuffer for zero-copy
if (typeof SharedArrayBuffer !== 'undefined') {
const buffer = new SharedArrayBuffer(1024 * 1024); // 1MB
const view = new Float32Array(buffer);
// Share buffer between workers
}# config/performance.yaml
cache:
  max_size: 10000          # entries
  ttl_seconds: 7200        # 2 hours
  eviction_policy: lru
  warm_on_startup: true
  persist_to_disk: true
database:
  connection_pool_size: 20
  query_timeout: 5         # seconds
  enable_query_cache: true
async:
  max_concurrent_queries: 50
  worker_threads: 8
  io_threads: 4
monitoring:
  enable_metrics: true
  metrics_port: 9090
  profile_enabled: false   # enable only while diagnosing; adds overhead
# Python performance
export FACT_WORKERS=8
export FACT_ASYNC_POOL_SIZE=100
export FACT_CACHE_MEMORY_MB=500
export PYTHONOPTIMIZE=2   # strip asserts and docstrings

# Rust performance
export FACT_THREADS=16
export FACT_CACHE_SHARDS=8
export RUST_LOG=warn      # reduce logging overhead
export FACT_PROFILE=release

# System tuning (requires root)
ulimit -n 65536                          # increase file descriptors
echo 1 > /proc/sys/vm/overcommit_memory  # always allow memory overcommit
# Python benchmarks
python -m benchmarking.framework \
    --suite all \
    --iterations 10000 \
    --concurrent 100 \
    --output results.json

# Rust benchmarks
fact benchmark \
    --operations 100000 \
    --threads 8 \
    --warmup 1000 \
    --output bench.json

# Compare results against a saved baseline
python compare_benchmarks.py baseline.json optimized.json
import asyncio
import statistics
from typing import List, Callable
async def benchmark_operation(
operation: Callable,
iterations: int = 1000,
warmup: int = 100
) -> dict:
"""Benchmark an operation"""
# Warmup
for _ in range(warmup):
await operation()
# Measure
times = []
for _ in range(iterations):
start = time.perf_counter()
await operation()
times.append(time.perf_counter() - start)
return {
'iterations': iterations,
'mean': statistics.mean(times),
'median': statistics.median(times),
'stdev': statistics.stdev(times),
'min': min(times),
'max': max(times),
'p95': sorted(times)[int(len(times) * 0.95)],
'p99': sorted(times)[int(len(times) * 0.99)],
}Symptoms: Hit rate below 80%
Solutions:
- Increase cache size
- Improve cache key generation
- Increase TTL for stable data
- Implement cache warming
Memory leaks — Symptoms: steadily growing memory usage
Solutions:
- Enable garbage collection monitoring
- Use memory profiling tools
- Implement object pooling
- Set memory limits
High query latency — Symptoms: P95 latency above 200ms
Solutions:
- Enable query caching
- Optimize database indexes
- Use query batching
- Enable parallel processing
CPU saturation — Symptoms: sustained CPU usage above 80%
Solutions:
- Scale horizontally
- Optimize hot paths
- Use more efficient algorithms
- Enable hardware acceleration
Production tuning checklist:
- Cache enabled with appropriate size
- Connection pooling configured
- Async operations optimized
- Memory limits set
- Monitoring enabled
- Benchmarks established
- Query patterns analyzed
- Indexes optimized
- Hardware acceleration enabled
- Load testing completed
For more optimization techniques, see the Architecture and Best Practices guides.