diff --git a/CMakeLists.txt b/CMakeLists.txt index a05c76c..a9065f5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -11,12 +11,28 @@ set(CMAKE_POSITION_INDEPENDENT_CODE ON) option(POMAI_BUILD_TESTS "Build tests" OFF) option(POMAI_BUILD_BENCH "Build benchmarks" OFF) + # Prefer integer (SQ8/FP16) distance paths where data is quantized; reduces float use on embedded. option(POMAI_PREFER_INTEGER_MATH "Prefer integer/SQ8/FP16 paths for distance (embedded)" ON) +# Edge-oriented build profile for constrained devices. +# When enabled, we bias towards smaller binaries and lower memory/debug overhead. +option(POMAI_EDGE_BUILD "Optimize PomaiDB build for edge devices (size/footprint)" OFF) + +# Strict build mode: treat project warnings as errors (useful for CI/hardening). +# We keep a couple of warning categories non-fatal to tolerate intentional diagnostics +# from some vendored headers included by the core. +option(POMAI_STRICT "Treat warnings as errors for PomaiDB code" OFF) + # Ensure POMAI_PREFER_INTEGER_MATH is defined for source code (0 or 1) add_compile_definitions(POMAI_PREFER_INTEGER_MATH=$) +if (POMAI_EDGE_BUILD) + add_compile_definitions(POMAI_EDGE_BUILD=1) + # Favor size over speed and strip most debug info in edge builds. + add_compile_options(-Os -g0) +endif() + # ========================= # Native HNSW (Replaces FAISS HNSW) @@ -79,6 +95,13 @@ target_include_directories(pomai ${CMAKE_CURRENT_SOURCE_DIR}/third_party ) +# Vendored code: keep strict warnings for PomaiDB, but avoid breaking builds on +# third_party sources when POMAI_STRICT is enabled. 
+if (POMAI_STRICT AND NOT MSVC) + set_source_files_properties(third_party/pomaidb_hnsw/hnsw.cc PROPERTIES COMPILE_OPTIONS + "-Wno-error;-Wno-shadow;-Wno-unused-parameter;-Wno-unused-but-set-variable") +endif() + # OpenMP for parallel builds if needed find_package(OpenMP QUIET) if (OpenMP_CXX_FOUND) @@ -88,11 +111,29 @@ endif() if (MSVC) target_compile_options(pomai PRIVATE /W4 /permissive-) else() - # Strict warnings for pomaidb code; then suppress vendored third_party (simd) warnings + # Strict warnings for pomaidb code; then suppress vendored third_party (simd) warnings. + # We do not use / -Werror globally because some third_party headers intentionally emit #warning. target_compile_options(pomai PRIVATE -Wall -Wextra -Wpedantic -Wconversion -Wshadow + -Wunused-parameter -Wunused-variable -Wno-cpp -Wno-unknown-pragmas -Wno-conversion -Wno-float-conversion -Wno-unused-function + ) +endif() + +if (POMAI_STRICT) + if (MSVC) + target_compile_options(pomai PRIVATE /WX) + else() + target_compile_options(pomai PRIVATE + -Werror + # Some vendored headers use extensions that trigger pedantic/cpp warnings. + -Wno-error=pedantic + -Wno-error=cpp + # third_party/simd uses #warning; pre-C++23 this triggers -Wc++23-extensions. + # We disable that warning category entirely to keep strict builds usable. 
+ -Wno-c++23-extensions ) + endif() endif() @@ -133,8 +174,24 @@ if (MSVC) target_compile_options(pomai_c PRIVATE /W4 /permissive-) target_compile_options(pomai_c_static PRIVATE /W4 /permissive-) else() - target_compile_options(pomai_c PRIVATE -Wall -Wextra -Wpedantic -Wconversion -Wshadow) - target_compile_options(pomai_c_static PRIVATE -Wall -Wextra -Wpedantic -Wconversion -Wshadow) + target_compile_options(pomai_c PRIVATE + -Wall -Wextra -Wpedantic -Wconversion -Wshadow + -Wunused-parameter -Wunused-variable + ) + target_compile_options(pomai_c_static PRIVATE + -Wall -Wextra -Wpedantic -Wconversion -Wshadow + -Wunused-parameter -Wunused-variable + ) +endif() + +if (POMAI_STRICT) + if (MSVC) + target_compile_options(pomai_c PRIVATE /WX) + target_compile_options(pomai_c_static PRIVATE /WX) + else() + target_compile_options(pomai_c PRIVATE -Werror -Wno-error=pedantic -Wno-error=cpp -Wno-c++23-extensions) + target_compile_options(pomai_c_static PRIVATE -Werror -Wno-error=pedantic -Wno-error=cpp -Wno-c++23-extensions) + endif() endif() # ========================= @@ -269,6 +326,18 @@ if (POMAI_BUILD_TESTS) pomai_setup_test(db_partial_search_test) pomai_add_labeled_test(db_partial_search_test "integ") + add_executable(db_backpressure_test tests/integ/db_backpressure_test.cc) + pomai_setup_test(db_backpressure_test) + pomai_add_labeled_test(db_backpressure_test "integ") + + add_executable(db_edge_workload_test tests/integ/db_edge_workload_test.cc) + pomai_setup_test(db_edge_workload_test) + pomai_add_labeled_test(db_edge_workload_test "integ") + + add_executable(db_error_paths_test tests/integ/db_error_paths_test.cc) + pomai_setup_test(db_error_paths_test) + pomai_add_labeled_test(db_error_paths_test "integ") + add_executable(routing_engine_test tests/integ/routing_engine_test.cc) pomai_setup_test(routing_engine_test) pomai_add_labeled_test(routing_engine_test "integ") @@ -356,6 +425,10 @@ if (POMAI_BUILD_TESTS) pomai_setup_test(shard_runtime_tsan_test) 
pomai_add_labeled_test(shard_runtime_tsan_test "tsan") + add_executable(backpressure_tsan_test tests/tsan/backpressure_tsan_test.cc) + pomai_setup_test(backpressure_tsan_test) + pomai_add_labeled_test(backpressure_tsan_test "tsan") + add_executable(basic_workload_tsan_test tests/tsan/basic_workload_tsan_test.cc) pomai_setup_test(basic_workload_tsan_test) pomai_add_labeled_test(basic_workload_tsan_test "tsan") diff --git a/README.md b/README.md index d2b062d..52f0831 100644 --- a/README.md +++ b/README.md @@ -134,6 +134,13 @@ For the smallest footprint on embedded devices: 2. **Slim palloc submodule** (saves ~6MB): after clone, run `./scripts/slim_palloc_submodule.sh` so `third_party/palloc` omits `media/`, `test/`, `bench/`, and `contrib/`. 3. **Optional sparse checkout of pomaidb**: for a production embedded build you can exclude `benchmarks/`, `examples/`, or `tools/` via your own sparse-checkout if you do not need them at build time. +### Edge deployments & failure semantics + +For recommended settings on real edge devices (build flags, durability policies, backpressure, and how PomaiDB behaves on power loss), see: + +- `docs/EDGE_DEPLOYMENT.md` — **edge-device configuration & failure behavior** +- `docs/FAILURE_SEMANTICS.md` — low-level WAL / manifest crash semantics + ### Docker: run benchmarks Build the image, then run benchmarks in constrained (IoT/Edge) or server-style containers: diff --git a/benchmarks/palloc_env_stress.cc b/benchmarks/palloc_env_stress.cc index a6fc2c1..d269f95 100644 --- a/benchmarks/palloc_env_stress.cc +++ b/benchmarks/palloc_env_stress.cc @@ -236,6 +236,9 @@ void RunEnvB(EnvReport* report) { size_t total_verified = 0; bool all_ok = true; int failed_cycle = -1; + double first_cycle_throughput = 0.0; + double last_cycle_throughput = 0.0; + double min_cycle_throughput = 0.0; try { for (int cycle = 0; cycle < num_cycles; ++cycle) { @@ -245,13 +248,17 @@ void RunEnvB(EnvReport* report) { IngestResult r = IngestAndVerify(path, per_cycle); 
total_ingested += r.ingested; total_verified += r.verified; + if (cycle == 0) first_cycle_throughput = r.throughput_vec_per_sec; + last_cycle_throughput = r.throughput_vec_per_sec; + if (r.throughput_vec_per_sec > 0.0 && (min_cycle_throughput == 0.0 || r.throughput_vec_per_sec < min_cycle_throughput)) + min_cycle_throughput = r.throughput_vec_per_sec; if (r.verified != per_cycle || r.ingested != per_cycle) { all_ok = false; if (failed_cycle < 0) failed_cycle = cycle; } if (cycle == 0) rss_after_first = GetPeakRssBytes(); rss_after_last = GetPeakRssBytes(); - printf("ingested=%zu verified=%zu\n", r.ingested, r.verified); + printf("ingested=%zu verified=%zu %.1f Vec/s\n", r.ingested, r.verified, r.throughput_vec_per_sec); fflush(stdout); } } catch (const std::exception& e) { @@ -281,6 +288,18 @@ void RunEnvB(EnvReport* report) { if (total_ingested > 0 && elapsed_ns > 0) report->throughput_vec_per_sec = static_cast(total_ingested) * 1e9 / static_cast(elapsed_ns); + // Report per-cycle ingestion rate so we can check it does not degrade over time (constant vector size). + if (num_cycles > 0 && first_cycle_throughput > 0.0) { + printf(" Per-cycle ingestion (Vec/s): first=%.1f last=%.1f min=%.1f", + first_cycle_throughput, last_cycle_throughput, min_cycle_throughput); + const double ratio = (first_cycle_throughput > 0.0) ? 
(last_cycle_throughput / first_cycle_throughput) : 0.0; + if (ratio < 0.75) + printf(" [WARN: last cycle %.0f%% of first — ingestion rate reduced over time]\n", ratio * 100.0); + else + printf(" [OK: rate stable]\n"); + fflush(stdout); + } + if (!all_ok) { report->passed = 0; static std::string fail_msg; @@ -299,6 +318,10 @@ void RunEnvB(EnvReport* report) { if (growth <= 0.15) { report->passed = 1; report->message = "Peak RSS stable (no leak); all cycles verified"; + if (first_cycle_throughput > 0.0 && last_cycle_throughput >= 0.75 * first_cycle_throughput) + report->message = "Peak RSS stable (no leak); all cycles verified; ingestion rate stable"; + else if (first_cycle_throughput > 0.0 && last_cycle_throughput < 0.75 * first_cycle_throughput) + report->message = "Peak RSS stable (no leak); all cycles verified; WARN: ingestion rate degraded over cycles"; } else { report->passed = 0; report->message = "FAIL: RSS growth suggests leak"; diff --git a/docs/EDGE_DEPLOYMENT.md b/docs/EDGE_DEPLOYMENT.md new file mode 100644 index 0000000..851025d --- /dev/null +++ b/docs/EDGE_DEPLOYMENT.md @@ -0,0 +1,184 @@ +## PomaiDB on Edge Devices: Recommended Settings & Failure Semantics + +PomaiDB is designed first for embedded / edge workloads: single-process, local storage, constrained memory, and frequent power loss. This guide summarizes **recommended configuration presets** and **what happens on failure** so you can reason about behavior on devices like Raspberry Pi, Jetson, or custom ARM boards. + +This document focuses on the **embedded `pomai::Database` API** (single-instance engine) and the **sharded `pomai::DB` API** where relevant. + +--- + +### 1. 
Build profile and compiler settings + +- **Edge build profile (size-optimized):** + - Configure CMake with: + - `-DPOMAI_EDGE_BUILD=ON` (adds `-Os -g0` and defines `POMAI_EDGE_BUILD=1`) + - `-DCMAKE_BUILD_TYPE=Release` + - Recommended for production firmware images and containers where binary size and cold-start latency matter more than debug info. + +- **Strict warnings for development and CI:** + - Enable: + - `-DPOMAI_STRICT=ON` + - This turns most compiler warnings into errors for PomaiDB’s own code while keeping vendored dependencies (HNSW, SIMD kernels) lenient. + - Safe to combine with `POMAI_EDGE_BUILD` once your toolchain is stable; it helps surface misconfigurations early. + +--- + +### 2. Storage and durability settings + +PomaiDB stores all data under a **single directory** on local storage (e.g., SD card, eMMC, SSD). + +- **Filesystem & mount:** + - Prefer **ext4** or another journaling filesystem with barriers enabled. + - Avoid network filesystems for embedded use; PomaiDB assumes low-latency local I/O. + +- **Durability via `FsyncPolicy`:** + - For the sharded `pomai::DB` API (`pomai::DBOptions`): + - `FsyncPolicy::kNever`: + - Best for **cache-like or reconstructible** data. + - Power loss may drop recent writes still in OS buffers, but on-disk data remains self-consistent. + - `FsyncPolicy::kAlways`: + - Recommended when **data must survive power loss** and write rates are modest. + - Every WAL / manifest commit is fsynced; expect higher latency but strong durability. + - For the embedded `pomai::Database` API (`pomai::EmbeddedOptions`): + - Use `EmbeddedOptions::fsync` in the same way. + - On intermittently powered devices, prefer `kAlways` for critical logs and `kNever` where data can be rebuilt. + +- **Flush vs. Freeze:** + - `Flush()` ensures the **WAL is pushed to disk** according to `FsyncPolicy`. + - `Freeze()` moves the current memtable into an on-disk **segment** and updates manifests. 
+ - On edge devices, a common pattern from an event loop or watchdog is: + - Periodically call `Flush()` and `Freeze()` on a timer (e.g., every N seconds) or after M ingests. + - On clean shutdown, issue `Flush()` and `Freeze()` before `Close()`. + +For the detailed atomic commit protocol and WAL / manifest guarantees, see `docs/FAILURE_SEMANTICS.md`. + +--- + +### 3. Memory limits and backpressure (embedded `pomai::Database`) + +`pomai::Database` exposes **explicit backpressure controls** in `EmbeddedOptions`: + +- **Key fields:** + - `max_memtable_mb`: + - Hard cap for the memtable (in MiB). `0` = use environment or default: + - Default is tuned for edge and may differ between low-memory and normal builds. + - `pressure_threshold_percent`: + - Soft threshold (percent of `max_memtable_mb`) where pressure handling kicks in. `0` = default (typically 80%). + - `auto_freeze_on_pressure`: + - If `true`, when the memtable exceeds the pressure threshold, the engine will **call `Freeze()` internally** rather than returning an error. + - `memtable_flush_threshold_mb`: + - Absolute size in MiB where `auto_freeze_on_pressure` triggers, overriding the percentage. `0` = derive from `pressure_threshold_percent`. + +- **Recommended presets for edge:** + - **Tiny devices (≤ 256 MiB RAM):** + - `max_memtable_mb = 32`–`64` + - `pressure_threshold_percent = 70`–`80` + - `auto_freeze_on_pressure = true` + - `memtable_flush_threshold_mb = 32` (optional override) + - **Moderate devices (512 MiB – 1 GiB RAM):** + - `max_memtable_mb = 128`–`256` + - `pressure_threshold_percent = 80` + - `auto_freeze_on_pressure = true` (recommended) or `false` if you want manual control via `TryFreezeIfPressured()`. + +- **Environment overrides:** + - The embedded engine also honors: + - `POMAI_MAX_MEMTABLE_MB` – caps memtable size if `max_memtable_mb` is `0`. + - `POMAI_MEMTABLE_PRESSURE_THRESHOLD` – overrides `pressure_threshold_percent` for defaults. 
+ - `POMAI_BENCH_LOW_MEMORY` – switches to lower default memtable sizes for benchmarks / tests. + +- **Operational pattern:** + - In a single-threaded event loop, the typical pattern is: + - Call `AddVector()` / `AddVectorBatch()` for ingestion. + - Periodically call `TryFreezeIfPressured()` to keep memory use bounded. + - Inspect `GetMemTableBytesUsed()` for metrics / logging. + +--- + +### 4. Index and quantization presets for low memory + +PomaiDB’s `IndexParams` exposes presets tuned for edge workloads: + +- **Use `IndexParams::ForEdge()` wherever possible:** + - In `EmbeddedOptions`: + - `opt.index_params = pomai::IndexParams::ForEdge();` + - This preset reduces: + - IVF list count (`nlist`), probes (`nprobe`), + - HNSW degree / ef parameters, + - and other memory-heavy knobs. + - The goal is to keep index RAM usage predictable while still providing reasonable recall. + +- **Distance metric:** + - For most embedding-style workloads on edge devices: + - Use `MetricType::kL2` (squared L2) with SQ8 or FP16 quantization for compact storage. + - `MetricType::kInnerProduct` is also supported but may be more sensitive to quantization. + +- **Quantization knobs (when applicable):** + - Prefer SQ8 or FP16 quantization where your model tolerates some loss, especially for: + - Large corpora on devices with ≤ 512 MiB RAM. + - Scenarios where on-disk size is heavily constrained (e.g., SD cards with many tenants). + +--- + +### 5. Failure semantics on edge devices + +PomaiDB is built to **fail closed** rather than risking silent corruption. High-level behaviors (see `docs/FAILURE_SEMANTICS.md` for details): + +- **On `Open()` (embedded `Database::Open` / sharded `DB::Open`):** + - Invalid configuration (e.g., `dim == 0`, empty `path`) returns: + - `Status::InvalidArgument`. + - Filesystem errors (permissions, missing dirs that cannot be created) return: + - `Status::IOError`. + - WAL or manifest corruption: + - The engine attempts to **replay or recover**. 
+ - If recovery is not possible, `Open()` returns a non-OK `Status` (e.g., `Corruption`, `Aborted`, or `Internal` depending on context) and **does not start** the engine. + +- **During ingestion / search:** + - **Backpressure (embedded engine):** + - If the memtable exceeds `max_memtable_mb` and `auto_freeze_on_pressure` is `false`: + - `AddVector` / `AddVectorBatch` will return `Status::ResourceExhausted` with a message instructing callers to `Freeze()` or `TryFreezeIfPressured()`. + - If `auto_freeze_on_pressure` is `true`: + - The engine attempts to `Freeze()` internally once pressure is detected. + - If freeze fails (e.g., I/O error), the operation returns the corresponding failure `Status`. + - **I/O failures (ENOSPC, EIO, etc.):** + - Write failures on WAL / segments propagate as: + - `Status::IOError` or `Status::Aborted` / `Status::Internal`, depending on the layer. + - After a serious I/O error, affected shards / the embedded engine will refuse further operations until reopened, to avoid compounding corruption. + +- **Crash and restart behavior:** + - On restart, both APIs: + - Re-open WALs and attempt **replay up to the last valid record**. + - Validate manifests and segment files; fall back from `manifest.current` to `manifest.prev` if needed. + - Tests such as `recovery_test`, `manifest_corruption_test`, and WAL corruption scenarios validate the following guarantees: + - No silent acceptance of corrupted manifests or WAL segments. + - Either **recover to a consistent state** (possibly losing a tail of recent writes) or **fail to open** with a non-OK `Status`. + +--- + +### 6. Operational recommendations for real devices + +- **Choose a failure policy per device class:** + - For sensor nodes with upstream replicas: + - Prefer `FsyncPolicy::kNever`, small `max_memtable_mb`, and `auto_freeze_on_pressure = true`. + - Rely on upstream for long-term durability. + - For gateway / aggregation devices: + - Prefer `FsyncPolicy::kAlways` for critical data. 
+ - Use `IndexParams::ForEdge()` and conservative `max_memtable_mb` to bound RAM. + +- **Integrate health checks:** + - Treat **any non-OK `Status` from `Open()`** as a signal to: + - Log and raise an alert. + - Potentially rotate to a new storage path or device. + - Monitor: + - `GetMemTableBytesUsed()` + - Open / search error codes (e.g., `ResourceExhausted`, `IOError`, `Corruption`). + +- **Test on your actual target:** + - Run the existing integration, TSAN, and crash tests on: + - Your device type, filesystem, and kernel. + - Perform your own chaos test: + - Ingest + `Flush()` / `Freeze()` loop. + - Physically cut power or kill the process. + - Verify that: + - `Open()` either succeeds with intact historical data or fails with a clear error code. + +These guidelines are intentionally conservative: they aim to keep your edge deployments safe even under frequent power loss and tight memory budgets. + diff --git a/include/pomai/database.h b/include/pomai/database.h index 38eeb60..f84084d 100644 --- a/include/pomai/database.h +++ b/include/pomai/database.h @@ -30,6 +30,7 @@ struct EmbeddedOptions { std::uint32_t dim = 512; MetricType metric = MetricType::kL2; FsyncPolicy fsync = FsyncPolicy::kNever; + /** Index/quantization params. Use IndexParams::ForEdge() for low-memory edge devices. */ IndexParams index_params; /** Memtable backpressure: max size in MiB before rejecting Put (0 = from env or default). */ @@ -45,6 +46,11 @@ struct EmbeddedOptions { /** * Database: thin wrapper around one StorageEngine and one vector index. * Single-threaded only; caller serializes access or runs on one thread. + * + * Visibility and staleness: Search and Get use the latest published snapshot + * (updated after Freeze/Compact). During heavy ingestion, reads may be at most + * one freeze cycle behind the most recent Put. Use GetSnapshot + NewIterator + * for a fixed point-in-time view. Deletes are tombstones; "newest wins" per id. 
*/ class Database { public: diff --git a/include/pomai/options.h b/include/pomai/options.h index fd243fb..48f94fb 100644 --- a/include/pomai/options.h +++ b/include/pomai/options.h @@ -55,6 +55,29 @@ namespace pomai // Default: 0 = always use HNSW when available (rely on ef_search for recall). uint32_t adaptive_threshold = 5000; QuantizationType quant_type = QuantizationType::kNone; + + /** Default index params (balanced quality/memory). */ + static IndexParams Default() { + return IndexParams{}; + } + + /** + * Low-memory preset for edge/embedded devices. + * Fewer IVF centroids, smaller HNSW graph and ef, lower adaptive threshold + * so more segments use brute-force (predictable, smaller index memory). + */ + static IndexParams ForEdge() { + IndexParams p; + p.type = IndexType::kIvfFlat; + p.nlist = 16; + p.nprobe = 4; + p.hnsw_m = 16; + p.hnsw_ef_construction = 100; + p.hnsw_ef_search = 32; + p.adaptive_threshold = 2000; + p.quant_type = QuantizationType::kNone; + return p; + } }; struct DBOptions diff --git a/src/core/distance.cc b/src/core/distance.cc index 9bb67a3..9b94d17 100644 --- a/src/core/distance.cc +++ b/src/core/distance.cc @@ -16,6 +16,19 @@ // SimSIMD: by default uses compile-time dispatch (best for current arch). // For portable binaries with runtime dispatch (AVX2/AVX512/NEON/SVE), build with // -DSIMSIMD_DYNAMIC_DISPATCH=1 and link SimSIMD's dynamic dispatch object. 
+#if !((defined(__GNUC__) || defined(__clang__)) && (defined(__ARM_ARCH) || defined(__aarch64__)) && defined(__ARM_FP16_FORMAT_IEEE)) && \ + !(((defined(__GNUC__) || defined(__clang__)) && (defined(__x86_64__) || defined(__i386__)) && defined(__AVX512FP16__))) +#ifndef SIMSIMD_NATIVE_F16 +#define SIMSIMD_NATIVE_F16 0 +#endif +#endif + +#if !((defined(__GNUC__) || defined(__clang__)) && (defined(__ARM_ARCH) || defined(__aarch64__)) && defined(__ARM_BF16_FORMAT_ALTERNATIVE)) && \ + !(((defined(__GNUC__) || defined(__clang__)) && (defined(__x86_64__) || defined(__i386__)) && defined(__AVX512BF16__))) +#ifndef SIMSIMD_NATIVE_BF16 +#define SIMSIMD_NATIVE_BF16 0 +#endif +#endif #include "simd/simsimd.h" namespace pomai::core { diff --git a/src/core/quantization/pomai_pq.cc b/src/core/quantization/pomai_pq.cc index 4cc15c3..8baeca8 100644 --- a/src/core/quantization/pomai_pq.cc +++ b/src/core/quantization/pomai_pq.cc @@ -6,7 +6,6 @@ #include "core/quantization/pomai_pq.h" #include -#include #include #include #include @@ -21,12 +20,14 @@ namespace pomai::core { // ── Constructor ──────────────────────────────────────────────────────────────── ProductQuantizer::ProductQuantizer(uint32_t dim, uint32_t M, uint32_t nbits) : dim_(dim), M_(M), nbits_(nbits), - ksub_(1u << nbits), - dsub_(dim / M), - code_size_((M * nbits + 7) / 8) + ksub_(nbits == 8 ? (1u << 8) : 0u), + dsub_(M != 0 ? dim / M : 0), + code_size_(M != 0 && nbits != 0 ? 
(M * nbits + 7) / 8 : 0) { - assert(dim_ % M_ == 0 && "dim must be divisible by M"); - assert(nbits_ == 8 && "only PQ8 (8-bit codes) supported"); + if (M_ == 0 || dim_ % M_ != 0 || nbits_ != 8) { + invalid_ = true; + return; + } centroids_.resize(static_cast(M_) * ksub_ * dsub_, 0.0f); } @@ -92,6 +93,8 @@ void KMeans(const float* data, std::size_t n, uint32_t d, // ── Training ────────────────────────────────────────────────────────────────── pomai::Status ProductQuantizer::Train(const float* data, std::size_t n, int max_iter) { + if (invalid_) + return pomai::Status::InvalidArgument("ProductQuantizer: dim must be divisible by M and nbits must be 8"); if (n < ksub_) return pomai::Status::InvalidArgument( "PQ training requires at least ksub=" + std::to_string(ksub_) + " vectors"); @@ -120,6 +123,7 @@ pomai::Status ProductQuantizer::Train(const float* data, std::size_t n, int max_ // ── Encoding ────────────────────────────────────────────────────────────────── void ProductQuantizer::Encode(const float* x, uint8_t* code) const { + if (invalid_) return; for (uint32_t m = 0; m < M_; ++m) { const float* xm = x + m * dsub_; float best_d = std::numeric_limits::max(); @@ -139,6 +143,7 @@ void ProductQuantizer::Encode(const float* x, uint8_t* code) const void ProductQuantizer::EncodeBatch(const float* x, std::size_t n, uint8_t* codes) const { + if (invalid_) return; for (std::size_t i = 0; i < n; ++i) Encode(x + i * dim_, codes + i * code_size_); } @@ -146,6 +151,7 @@ void ProductQuantizer::EncodeBatch(const float* x, std::size_t n, // ── Decoding ────────────────────────────────────────────────────────────────── void ProductQuantizer::Decode(const uint8_t* code, float* x) const { + if (invalid_) return; for (uint32_t m = 0; m < M_; ++m) std::memcpy(x + m * dsub_, GetCentroid(m, code[m]), dsub_ * sizeof(float)); } @@ -153,6 +159,7 @@ void ProductQuantizer::Decode(const uint8_t* code, float* x) const // ── ADC: precompute distance tables 
────────────────────────────────────────── void ProductQuantizer::ComputeL2Table(const float* x, float* table) const { + if (invalid_) return; for (uint32_t m = 0; m < M_; ++m) { const float* xm = x + m * dsub_; float* tab_m = table + m * ksub_; @@ -169,6 +176,7 @@ void ProductQuantizer::ComputeL2Table(const float* x, float* table) const void ProductQuantizer::ComputeIPTable(const float* x, float* table) const { + if (invalid_) return; for (uint32_t m = 0; m < M_; ++m) { const float* xm = x + m * dsub_; float* tab_m = table + m * ksub_; @@ -184,6 +192,7 @@ void ProductQuantizer::ComputeIPTable(const float* x, float* table) const float ProductQuantizer::ScoreFromTable(const float* table, const uint8_t* code) const { + if (invalid_) return 0.0f; float s = 0.0f; for (uint32_t m = 0; m < M_; ++m) s += table[m * ksub_ + code[m]]; @@ -194,6 +203,7 @@ void ProductQuantizer::ScoreAllFromTable(const float* table, const uint8_t* codes, std::size_t n, float* out) const { + if (invalid_) return; for (std::size_t i = 0; i < n; ++i) out[i] = ScoreFromTable(table, codes + i * code_size_); } @@ -201,6 +211,8 @@ void ProductQuantizer::ScoreAllFromTable(const float* table, // ── Persistence ─────────────────────────────────────────────────────────────── pomai::Status ProductQuantizer::Save(const std::string& path) const { + if (invalid_) + return pomai::Status::InvalidArgument("ProductQuantizer: invalid configuration (dim % M != 0 or nbits != 8)"); std::ofstream f(path, std::ios::binary); if (!f) return pomai::Status::IOError("Cannot open PQ file for write: " + path); const uint32_t magic = 0x504D4151; // 'PMAQ' @@ -226,6 +238,8 @@ pomai::Status ProductQuantizer::Load(const std::string& path, f.read(reinterpret_cast(&nbits), sizeof(nbits)); if (magic != 0x504D4151u) return pomai::Status::Corruption("Bad PQ magic in " + path); + if (dim == 0 || M == 0 || (dim % M) != 0 || nbits != 8) + return pomai::Status::Corruption("Invalid PQ parameters in file (dim % M != 0 or nbits != 8)"); 
auto pq = std::make_unique(dim, M, nbits); const std::size_t cent_sz = pq->centroids_.size() * sizeof(float); f.read(reinterpret_cast(pq->centroids_.data()), cent_sz); diff --git a/src/core/quantization/pomai_pq.h b/src/core/quantization/pomai_pq.h index 939f4fb..a8a716d 100644 --- a/src/core/quantization/pomai_pq.h +++ b/src/core/quantization/pomai_pq.h @@ -35,6 +35,8 @@ class ProductQuantizer { uint32_t dsub() const { return dsub_; } uint32_t code_size() const { return code_size_; } bool trained() const { return trained_; } + /// False if constructor was given invalid args (dim not divisible by M, or nbits != 8). + bool is_valid() const { return !invalid_; } // ── Training ────────────────────────────────────────────────────────────── /// Train on `n` vectors. Each row is `dim` contiguous floats. @@ -76,6 +78,7 @@ class ProductQuantizer { private: uint32_t dim_, M_, nbits_, ksub_, dsub_, code_size_; bool trained_ = false; + bool invalid_ = false; // true when constructor args were invalid (no assert/abort) std::vector centroids_; // M × ksub × dsub (row-major: [m][k][d]) const float* GetCentroid(uint32_t m, uint32_t k) const { diff --git a/src/core/shard/runtime.cc b/src/core/shard/runtime.cc index c330ca0..9290ea1 100644 --- a/src/core/shard/runtime.cc +++ b/src/core/shard/runtime.cc @@ -32,7 +32,12 @@ namespace pomai::core namespace { constexpr std::chrono::milliseconds kBackgroundPoll{5}; + // Per-tick work budget for background freeze/compact (kept small so callers stay responsive). constexpr std::chrono::milliseconds kBackgroundBudget{2}; + // Upper bound on how long a single synchronous Freeze()/Compact() call will spend in background work + // before timing out and aborting the job. Prevents unbounded stalls on edge devices while leaving + // enough room for index builds on slower hardware. 
+ constexpr std::chrono::seconds kBackgroundMaxSyncDuration{120}; constexpr std::size_t kBackgroundMaxEntriesPerTick = 2048; constexpr std::size_t kMaxSegmentEntries = 20000; constexpr std::size_t kMaxFrozenMemtables = 4; @@ -653,15 +658,10 @@ namespace pomai::core if (!st.ok()) return st; - // Need to mark delete in frozen? - // Frozen memtables are immutable. We can't delete in them. - // We add a "Delete" record to active memtable (tombstone). - // Since we search Active AFTER Frozen? No, we search Newer first. - // Order: Active -> Frozen (New->Old) -> Segments. - // But `Search` using `Snapshot` does NOT see Active. - // So `Search` will see the OLD value in Frozen/Segments if Active has Tombstone. - // This is STALENESS. "Reads may observe a slightly stale snapshot". - // This is consistent. + // Tombstones live in the active memtable only; frozen/segments are immutable. + // Search order: Active -> Frozen (newest first) -> Segments. Snapshot does not + // include the active memtable's tail, so reads may lag by at most one freeze + // cycle. Visibility: "newest wins" per id; deletes are applied as tombstones. (void)ivf_->Delete(c.id); return pomai::Status::Ok(); @@ -702,10 +702,18 @@ namespace pomai::core auto job = std::make_unique(BackgroundJob::Type::kFreeze, std::move(state)); background_job_ = std::move(job); last_background_result_.reset(); + const auto start = std::chrono::steady_clock::now(); while (background_job_) { - PumpBackgroundWork(std::chrono::hours(1)); + PumpBackgroundWork(kBackgroundBudget); + if (std::chrono::steady_clock::now() - start > kBackgroundMaxSyncDuration) { + CancelBackgroundJob("Freeze timed out"); + break; + } + } + if (last_background_result_.has_value()) { + return last_background_result_; } - return last_background_result_.has_value() ? 
last_background_result_ : std::optional(pomai::Status::Ok()); + return std::optional(pomai::Status::Aborted("Freeze timed out")); } // ------------------------- @@ -751,10 +759,18 @@ namespace pomai::core auto job = std::make_unique(BackgroundJob::Type::kCompact, std::move(state)); background_job_ = std::move(job); last_background_result_.reset(); + const auto start = std::chrono::steady_clock::now(); while (background_job_) { - PumpBackgroundWork(std::chrono::hours(1)); + PumpBackgroundWork(kBackgroundBudget); + if (std::chrono::steady_clock::now() - start > kBackgroundMaxSyncDuration) { + CancelBackgroundJob("Compaction timed out"); + break; + } } - return last_background_result_.has_value() ? last_background_result_ : std::optional(pomai::Status::Ok()); + if (last_background_result_.has_value()) { + return last_background_result_; + } + return std::optional(pomai::Status::Aborted("Compaction timed out")); } IteratorReply VectorRuntime::HandleIterator(IteratorCmd &c) @@ -1188,18 +1204,19 @@ namespace pomai::core } } - std::vector query_sums(queries.size() / dim_, 0.0f); + const std::size_t num_queries = queries.size() / dim_; + search_query_sums_scratch_.resize(num_queries, 0.0f); for (uint32_t q_idx : query_indices) { std::span q(queries.data() + q_idx * dim_, dim_); float s = 0.0f; for (float f : q) s += f; - query_sums[q_idx] = s; + search_query_sums_scratch_[q_idx] = s; } // Sequential path (single-threaded event loop). 
for (uint32_t q_idx : query_indices) { std::span single_query(queries.data() + (q_idx * dim_), dim_); - float q_sum = query_sums[q_idx]; + float q_sum = search_query_sums_scratch_[q_idx]; auto st = SearchLocalInternal(active, snap, single_query, q_sum, topk, opts, shared_policy, use_visibility, &(*out_results)[q_idx], false); if (!st.ok()) return st; } @@ -1245,9 +1262,10 @@ namespace pomai::core } // ------------------------- - // Phase 2: Parallel scoring over authoritative sources + // Phase 2: Parallel scoring over authoritative sources (reuse scratch to reduce allocations) // ------------------------- - std::vector candidates; + search_candidates_scratch_.clear(); + search_candidates_scratch_.reserve(std::min(static_cast(topk) * 4, static_cast(4096))); bool has_filters = !opts.filters.empty(); uint32_t effective_nprobe = index_params_.nprobe == 0 ? 1 : index_params_.nprobe; @@ -1359,9 +1377,11 @@ namespace pomai::core float score = 0.0f; const bool is_ip = (this->metric_ == pomai::MetricType::kInnerProduct || this->metric_ == pomai::MetricType::kCosine); if (quant_type == pomai::QuantizationType::kSq8) { - score = pomai::core::DotSq8(query, std::span(codes_ptr, dim_), q_min, q_inv_scale, query_sum); - if (!is_ip) { - // TODO: Implement L2 for SQ8 or decode. For now IP only in fast path. 
+ if (is_ip) { + score = pomai::core::DotSq8(query, std::span(codes_ptr, dim_), q_min, q_inv_scale, query_sum); + } else { + const float q_max = q_min + 255.0f * q_inv_scale; + score = -pomai::core::L2SqSq8(query, std::span(codes_ptr, dim_), q_min, q_max); } } else if (quant_type == pomai::QuantizationType::kFp16) { if (is_ip) { @@ -1421,9 +1441,11 @@ namespace pomai::core float score = 0.0f; const bool is_ip = (this->metric_ == pomai::MetricType::kInnerProduct || this->metric_ == pomai::MetricType::kCosine); if (quant_type == pomai::QuantizationType::kSq8) { - score = pomai::core::DotSq8(query, codes, q_min, q_inv_scale, query_sum); - if (!is_ip) { - // IP only for SQ8 for now. + if (is_ip) { + score = pomai::core::DotSq8(query, codes, q_min, q_inv_scale, query_sum); + } else { + const float q_max = q_min + 255.0f * q_inv_scale; + score = -pomai::core::L2SqSq8(query, codes, q_min, q_max); } } else if (quant_type == pomai::QuantizationType::kFp16) { if (is_ip) { @@ -1509,38 +1531,39 @@ namespace pomai::core { auto [hits, scanned] = score_memtable(active); total_scanned += scanned; - candidates.insert(candidates.end(), hits.begin(), hits.end()); + search_candidates_scratch_.insert(search_candidates_scratch_.end(), hits.begin(), hits.end()); } for (auto it = snap->frozen_memtables.rbegin(); it != snap->frozen_memtables.rend(); ++it) { auto [hits, scanned] = score_memtable(*it); total_scanned += scanned; - candidates.insert(candidates.end(), hits.begin(), hits.end()); + search_candidates_scratch_.insert(search_candidates_scratch_.end(), hits.begin(), hits.end()); } - std::vector> segment_hits(snap->segments.size()); + search_segment_hits_scratch_.resize(snap->segments.size()); for (std::size_t i = 0; i < snap->segments.size(); ++i) { - segment_hits[i] = score_segment(snap->segments[i]); + search_segment_hits_scratch_[i].clear(); + search_segment_hits_scratch_[i] = score_segment(snap->segments[i]); } last_query_candidates_scanned_ += total_scanned; - for (const auto& 
hits : segment_hits) { - candidates.insert(candidates.end(), hits.begin(), hits.end()); + for (const auto& hits : search_segment_hits_scratch_) { + search_candidates_scratch_.insert(search_candidates_scratch_.end(), hits.begin(), hits.end()); } - std::sort(candidates.begin(), candidates.end(), [](const auto& a, const auto& b) { + std::sort(search_candidates_scratch_.begin(), search_candidates_scratch_.end(), [](const auto& a, const auto& b) { if (a.score != b.score) { return a.score > b.score; } return a.id < b.id; }); - if (candidates.size() > topk) { - candidates.resize(topk); + if (search_candidates_scratch_.size() > topk) { + search_candidates_scratch_.resize(topk); } - out->assign(candidates.begin(), candidates.end()); + out->assign(search_candidates_scratch_.begin(), search_candidates_scratch_.end()); return pomai::Status::Ok(); } } // namespace pomai::core diff --git a/src/core/shard/runtime.h b/src/core/shard/runtime.h index 9958c54..0cb265e 100644 --- a/src/core/shard/runtime.h +++ b/src/core/shard/runtime.h @@ -229,6 +229,11 @@ namespace pomai::core std::unique_ptr background_job_; std::optional last_background_result_; // Set when background job completes (single-threaded) std::uint64_t wal_epoch_{0}; + + // Reusable scratch buffers for search hot path (single-threaded; avoids per-query allocations). + mutable std::vector search_candidates_scratch_; + mutable std::vector> search_segment_hits_scratch_; + mutable std::vector search_query_sums_scratch_; }; } // namespace pomai::core diff --git a/src/storage/wal/wal.cc b/src/storage/wal/wal.cc index 65f6729..cfa25fd 100644 --- a/src/storage/wal/wal.cc +++ b/src/storage/wal/wal.cc @@ -83,7 +83,12 @@ namespace pomai::storage pomai::Status Wal::Open() { - fs::create_directories(db_path_); + // Ensure WAL directory exists. 
+ std::error_code dir_ec; + fs::create_directories(db_path_, dir_ec); + if (dir_ec) { + return pomai::Status::IoError("WAL create_directories failed"); + } gen_ = 0; while (fs::exists(SegmentPath(gen_))) @@ -142,8 +147,21 @@ namespace pomai::storage if (impl_) { - (void)impl_->file.SyncData(); - (void)impl_->file.Close(); + // Best-effort durability on closing current segment; surface serious I/O errors. + auto st_sync = impl_->file.SyncData(); + auto st_close = impl_->file.Close(); + if (!st_sync.ok()) { + impl_->~Impl(); + palloc_free(impl_); + impl_ = nullptr; + return st_sync; + } + if (!st_close.ok()) { + impl_->~Impl(); + palloc_free(impl_); + impl_ = nullptr; + return st_close; + } impl_->~Impl(); palloc_free(impl_); impl_ = nullptr; diff --git a/src/util/logging.cc b/src/util/logging.cc index 7038715..b385016 100644 --- a/src/util/logging.cc +++ b/src/util/logging.cc @@ -2,9 +2,30 @@ #include #include #include +#include namespace pomai::util { + namespace + { + void DefaultFatalHandler(const std::string& /*message*/) + { + std::abort(); + } + + FatalHandler g_fatal_handler = DefaultFatalHandler; + } + + void SetFatalHandler(FatalHandler handler) + { + g_fatal_handler = handler ? handler : DefaultFatalHandler; + } + + FatalHandler GetFatalHandler() + { + return g_fatal_handler; + } + Logger::Logger() : min_level_(LogLevel::kWarn) { const char* env = std::getenv("POMAI_LOG_LEVEL"); @@ -36,13 +57,12 @@ namespace pomai::util void Logger::Write(LogLevel level, std::source_location loc, const std::string& message) { - // 1. Timestamp auto now = std::chrono::system_clock::now(); auto in_time_t = std::chrono::system_clock::to_time_t(now); auto ms = std::chrono::duration_cast(now.time_since_epoch()) % 1000; - std::cout << "[" << std::put_time(std::localtime(&in_time_t), "%Y-%m-%d %H:%M:%S") + std::cout << "[" << std::put_time(std::localtime(&in_time_t), "%Y-%m-%d %H:%M:%S") << "." << std::setfill('0') << std::setw(3) << ms.count() << "] "; // 2. 
Level with Color @@ -64,11 +84,8 @@ namespace pomai::util // 4. Message std::cout << message << std::endl; - - if (level == LogLevel::kFatal) std::abort(); - } - // Compat for older calls if any (manual Log function was in之前的 util/logging.h) - // We already updated the header, but if any .cc still calls the old Log(level, msg) - // we should bridge it or fix them. + if (level == LogLevel::kFatal) + GetFatalHandler()(message); + } } diff --git a/src/util/logging.h b/src/util/logging.h index c761793..dffbbd9 100644 --- a/src/util/logging.h +++ b/src/util/logging.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -17,6 +18,22 @@ namespace pomai::util kFatal }; + /** + * Fatal handler invoked when a message is logged at kFatal. + * Default behavior is to abort(); edge/embedded code can set a custom handler + * (e.g. log and return, or invoke a watchdog) instead of terminating. + */ + using FatalHandler = void (*)(const std::string& message); + + /** + * Set the fatal handler. Pass nullptr to restore the default (abort). + * Thread-safety: set before any fatal log; handler is read when fatal is logged. + */ + void SetFatalHandler(FatalHandler handler); + + /** Return the current fatal handler (never nullptr; default aborts). */ + FatalHandler GetFatalHandler(); + /** * @brief Premium Logger for PomaiDB. * Use POMAI_LOG_* macros for automatic file/line capture. 
diff --git a/src/util/posix_file.cc b/src/util/posix_file.cc index 70fd060..e900419 100644 --- a/src/util/posix_file.cc +++ b/src/util/posix_file.cc @@ -25,7 +25,7 @@ namespace pomai::util { if (this != &other) { - Close(); + (void)Close(); fd_ = other.fd_; other.fd_ = -1; } diff --git a/tests/integ/db_backpressure_test.cc b/tests/integ/db_backpressure_test.cc new file mode 100644 index 0000000..a3ebf8f --- /dev/null +++ b/tests/integ/db_backpressure_test.cc @@ -0,0 +1,85 @@ +// Integration tests for embedded Database backpressure: memtable limits, +// auto_freeze_on_pressure, TryFreezeIfPressured, and ResourceExhausted semantics. + +#include "tests/common/test_main.h" +#include "tests/common/test_tmpdir.h" + +#include +#include + +#include "pomai/database.h" +#include "pomai/status.h" +#include "pomai/types.h" + +namespace { + +std::vector MakeVec(std::uint32_t dim, float base) { + std::vector v(dim); + for (std::uint32_t i = 0; i < dim; ++i) + v[i] = base + static_cast(i) * 0.001f; + return v; +} + +POMAI_TEST(Embedded_Backpressure_AutoFreezeReducesPressure) { + pomai::EmbeddedOptions opt; + opt.path = pomai::test::TempDir("backpressure_auto"); + opt.dim = 32; + opt.metric = pomai::MetricType::kL2; + opt.fsync = pomai::FsyncPolicy::kNever; + opt.max_memtable_mb = 2; + opt.pressure_threshold_percent = 80; + opt.auto_freeze_on_pressure = true; + + pomai::Database db; + pomai::Status st = db.Open(opt); + POMAI_EXPECT_OK(st); + + for (pomai::VectorId id = 0; id < 500; ++id) { + auto v = MakeVec(opt.dim, static_cast(id) * 0.01f); + st = db.AddVector(id, v); + POMAI_EXPECT_TRUE(st.ok() || st.code() == pomai::ErrorCode::kResourceExhausted); + if (!st.ok()) break; + } + pomai::Status close_st = db.Close(); + POMAI_EXPECT_OK(close_st); +} + +POMAI_TEST(Embedded_Backpressure_TryFreezeIfPressured_UnderPressure) { + pomai::EmbeddedOptions opt; + opt.path = pomai::test::TempDir("backpressure_try"); + opt.dim = 64; + opt.fsync = pomai::FsyncPolicy::kNever; + 
opt.max_memtable_mb = 1; + opt.pressure_threshold_percent = 50; + opt.auto_freeze_on_pressure = true; + + pomai::Database db; + POMAI_EXPECT_OK(db.Open(opt)); + + for (pomai::VectorId id = 0; id < 200; ++id) { + auto v = MakeVec(opt.dim, static_cast(id)); + pomai::Status st = db.AddVector(id, v); + POMAI_EXPECT_TRUE(st.ok() || st.code() == pomai::ErrorCode::kResourceExhausted); + } + pomai::Status try_st = db.TryFreezeIfPressured(); + POMAI_EXPECT_OK(try_st); + POMAI_EXPECT_OK(db.Close()); +} + +POMAI_TEST(Embedded_Backpressure_GetMemTableBytesUsed) { + pomai::EmbeddedOptions opt; + opt.path = pomai::test::TempDir("backpressure_bytes"); + opt.dim = 16; + opt.fsync = pomai::FsyncPolicy::kNever; + + pomai::Database db; + POMAI_EXPECT_OK(db.Open(opt)); + std::size_t used0 = db.GetMemTableBytesUsed(); + auto v = MakeVec(opt.dim, 1.0f); + POMAI_EXPECT_OK(db.AddVector(1, v)); + std::size_t used1 = db.GetMemTableBytesUsed(); + POMAI_EXPECT_TRUE(used1 >= used0); + POMAI_EXPECT_OK(db.Close()); +} + +} // namespace diff --git a/tests/integ/db_edge_workload_test.cc b/tests/integ/db_edge_workload_test.cc new file mode 100644 index 0000000..2ac32d4 --- /dev/null +++ b/tests/integ/db_edge_workload_test.cc @@ -0,0 +1,107 @@ +// Integration tests for edge-like workloads: IndexParams::ForEdge(), repeated +// freeze/compact cycles, and small resource limits. 
+ +#include "tests/common/test_main.h" +#include "tests/common/test_tmpdir.h" + +#include +#include + +#include "pomai/database.h" +#include "pomai/options.h" +#include "pomai/search.h" +#include "pomai/status.h" +#include "pomai/types.h" + +namespace { + +std::vector MakeVec(std::uint32_t dim, float base) { + std::vector v(dim); + for (std::uint32_t i = 0; i < dim; ++i) + v[i] = base + static_cast(i) * 0.001f; + return v; +} + +POMAI_TEST(Embedded_Edge_ForEdgeParams_SearchCorrect) { + pomai::EmbeddedOptions opt; + opt.path = pomai::test::TempDir("edge_for_edge"); + opt.dim = 24; + opt.metric = pomai::MetricType::kL2; + opt.fsync = pomai::FsyncPolicy::kNever; + opt.index_params = pomai::IndexParams::ForEdge(); + + pomai::Database db; + POMAI_EXPECT_OK(db.Open(opt)); + + for (pomai::VectorId id = 0; id < 100; ++id) { + auto v = MakeVec(opt.dim, static_cast(id) * 0.1f); + POMAI_EXPECT_OK(db.AddVector(id, v)); + } + POMAI_EXPECT_OK(db.Freeze()); + + pomai::SearchResult out; + std::vector q = MakeVec(opt.dim, 5.0f); + POMAI_EXPECT_OK(db.Search(q, 10, &out)); + POMAI_EXPECT_TRUE(!out.hits.empty()); + POMAI_EXPECT_TRUE(out.hits.size() <= 10); + + POMAI_EXPECT_OK(db.Close()); +} + +POMAI_TEST(Embedded_Edge_RepeatedFreezeCycles) { + pomai::EmbeddedOptions opt; + opt.path = pomai::test::TempDir("edge_freeze_cycles"); + opt.dim = 16; + opt.fsync = pomai::FsyncPolicy::kNever; + opt.index_params = pomai::IndexParams::ForEdge(); + opt.max_memtable_mb = 1; + opt.auto_freeze_on_pressure = true; + + pomai::Database db; + POMAI_EXPECT_OK(db.Open(opt)); + + for (int cycle = 0; cycle < 5; ++cycle) { + for (pomai::VectorId i = 0; i < 30; ++i) { + pomai::VectorId id = static_cast(cycle * 1000 + i); + auto v = MakeVec(opt.dim, static_cast(id)); + POMAI_EXPECT_OK(db.AddVector(id, v)); + } + POMAI_EXPECT_OK(db.Freeze()); + } + + pomai::SearchResult out; + std::vector q = MakeVec(opt.dim, 0.0f); + POMAI_EXPECT_OK(db.Search(q, 20, &out)); + POMAI_EXPECT_TRUE(out.hits.size() <= 20); + 
POMAI_EXPECT_OK(db.Close()); +} + +POMAI_TEST(Embedded_Edge_SmallDimSmallBatch) { + pomai::EmbeddedOptions opt; + opt.path = pomai::test::TempDir("edge_small"); + opt.dim = 8; + opt.fsync = pomai::FsyncPolicy::kNever; + opt.index_params = pomai::IndexParams::ForEdge(); + + pomai::Database db; + POMAI_EXPECT_OK(db.Open(opt)); + + std::vector ids; + std::vector> vecs; + for (int i = 0; i < 20; ++i) { + ids.push_back(static_cast(i)); + vecs.push_back(MakeVec(opt.dim, static_cast(i))); + } + std::vector> spans; + for (const auto& v : vecs) spans.push_back(v); + POMAI_EXPECT_OK(db.AddVectorBatch(ids, spans)); + POMAI_EXPECT_OK(db.Freeze()); + + pomai::SearchResult out; + std::vector q = MakeVec(opt.dim, 10.0f); + POMAI_EXPECT_OK(db.Search(q, 5, &out)); + POMAI_EXPECT_TRUE(out.hits.size() <= 5); + POMAI_EXPECT_OK(db.Close()); +} + +} // namespace diff --git a/tests/integ/db_error_paths_test.cc b/tests/integ/db_error_paths_test.cc new file mode 100644 index 0000000..9a4c869 --- /dev/null +++ b/tests/integ/db_error_paths_test.cc @@ -0,0 +1,100 @@ +// Integration tests for embedded Database error paths: invalid arguments, +// null outputs, and not-opened semantics. 
+ +#include "tests/common/test_main.h" +#include "tests/common/test_tmpdir.h" + +#include +#include +#include + +#include "pomai/database.h" +#include "pomai/status.h" +#include "pomai/types.h" + +namespace { + +std::vector MakeVec(std::uint32_t dim, float base) { + std::vector v(dim); + for (std::uint32_t i = 0; i < dim; ++i) + v[i] = base + static_cast(i) * 0.001f; + return v; +} + +POMAI_TEST(Embedded_Error_OpenDimZero) { + pomai::EmbeddedOptions opt; + opt.path = pomai::test::TempDir("err_dim0"); + opt.dim = 0; + pomai::Database db; + pomai::Status st = db.Open(opt); + POMAI_EXPECT_TRUE(!st.ok()); + POMAI_EXPECT_TRUE(st.code() == pomai::ErrorCode::kInvalidArgument); +} + +POMAI_TEST(Embedded_Error_OpenPathEmpty) { + pomai::EmbeddedOptions opt; + opt.path = ""; + opt.dim = 8; + pomai::Database db; + pomai::Status st = db.Open(opt); + POMAI_EXPECT_TRUE(!st.ok()); + POMAI_EXPECT_TRUE(st.code() == pomai::ErrorCode::kInvalidArgument); +} + +POMAI_TEST(Embedded_Error_AddVectorNotOpened) { + pomai::Database db; + std::vector v(8, 1.0f); + pomai::Status st = db.AddVector(1, v); + POMAI_EXPECT_TRUE(!st.ok()); + POMAI_EXPECT_TRUE(st.code() == pomai::ErrorCode::kInvalidArgument); +} + +POMAI_TEST(Embedded_Error_SearchNotOpened) { + pomai::Database db; + std::vector q(8, 0.0f); + pomai::SearchResult out; + pomai::Status st = db.Search(q, 10, &out); + POMAI_EXPECT_TRUE(!st.ok()); + POMAI_EXPECT_TRUE(st.code() == pomai::ErrorCode::kInvalidArgument); +} + +POMAI_TEST(Embedded_Error_SearchNullOut) { + pomai::EmbeddedOptions opt; + opt.path = pomai::test::TempDir("err_search_null"); + opt.dim = 8; + pomai::Database db; + POMAI_EXPECT_OK(db.Open(opt)); + std::vector q(8, 0.0f); + pomai::Status st = db.Search(q, 10, nullptr); + POMAI_EXPECT_TRUE(!st.ok()); + POMAI_EXPECT_TRUE(st.code() == pomai::ErrorCode::kInvalidArgument); + POMAI_EXPECT_OK(db.Close()); +} + +POMAI_TEST(Embedded_Error_GetSnapshotNullOut) { + pomai::EmbeddedOptions opt; + opt.path = 
pomai::test::TempDir("err_snap_null"); + opt.dim = 8; + pomai::Database db; + POMAI_EXPECT_OK(db.Open(opt)); + pomai::Status st = db.GetSnapshot(nullptr); + POMAI_EXPECT_TRUE(!st.ok()); + POMAI_EXPECT_TRUE(st.code() == pomai::ErrorCode::kInvalidArgument); + POMAI_EXPECT_OK(db.Close()); +} + +POMAI_TEST(Embedded_Error_TryFreezeNotOpened) { + pomai::Database db; + pomai::Status st = db.TryFreezeIfPressured(); + POMAI_EXPECT_TRUE(!st.ok()); + POMAI_EXPECT_TRUE(st.code() == pomai::ErrorCode::kInvalidArgument); +} + +POMAI_TEST(Embedded_Error_FlushNotOpened) { + pomai::Database db; + pomai::Status st = db.Flush(); + POMAI_EXPECT_TRUE(!st.ok()); + POMAI_EXPECT_TRUE(st.code() == pomai::ErrorCode::kInvalidArgument); +} + +} // namespace diff --git a/tests/tsan/backpressure_tsan_test.cc b/tests/tsan/backpressure_tsan_test.cc new file mode 100644 index 0000000..addedc4 --- /dev/null +++ b/tests/tsan/backpressure_tsan_test.cc @@ -0,0 +1,55 @@ +// TSAN test: stress embedded Database with backpressure (low memtable limit) +// to ensure no data races when auto-freeze and TryFreezeIfPressured run. 
+ +#include "tests/common/test_main.h" +#include "tests/common/test_tmpdir.h" + +#include +#include + +#include "pomai/database.h" +#include "pomai/search.h" +#include "pomai/status.h" +#include "pomai/types.h" + +namespace { + +std::vector MakeVec(std::uint32_t dim, float base) { + std::vector v(dim); + for (std::uint32_t i = 0; i < dim; ++i) + v[i] = base + static_cast(i) * 0.001f; + return v; +} + +POMAI_TEST(Tsan_EmbeddedBackpressure_WriteAndFreeze) { + pomai::EmbeddedOptions opt; + opt.path = pomai::test::TempDir("tsan_backpressure"); + opt.dim = 32; + opt.fsync = pomai::FsyncPolicy::kNever; + opt.max_memtable_mb = 1; + opt.pressure_threshold_percent = 70; + opt.auto_freeze_on_pressure = true; + opt.index_params = pomai::IndexParams::ForEdge(); + + pomai::Database db; + POMAI_EXPECT_OK(db.Open(opt)); + + for (int round = 0; round < 3; ++round) { + for (pomai::VectorId id = 0; id < 80; ++id) { + pomai::VectorId gid = static_cast(round * 1000 + id); + auto v = MakeVec(opt.dim, static_cast(gid)); + pomai::Status st = db.AddVector(gid, v); + POMAI_EXPECT_TRUE(st.ok() || st.code() == pomai::ErrorCode::kResourceExhausted); + } + POMAI_EXPECT_OK(db.TryFreezeIfPressured()); + POMAI_EXPECT_OK(db.Freeze()); + } + + pomai::SearchResult out; + std::vector q = MakeVec(opt.dim, 0.0f); + POMAI_EXPECT_OK(db.Search(q, 10, &out)); + POMAI_EXPECT_TRUE(out.hits.size() <= 10); + POMAI_EXPECT_OK(db.Close()); +} + +} // namespace diff --git a/tests/unit/logging_test.cc b/tests/unit/logging_test.cc index 04ed3fd..65d7ec0 100644 --- a/tests/unit/logging_test.cc +++ b/tests/unit/logging_test.cc @@ -1,5 +1,12 @@ #include "tests/common/test_main.h" #include "util/logging.h" +#include + +static std::string g_fatal_message; + +static void NoAbortFatalHandler(const std::string& message) { + g_fatal_message = message; +} POMAI_TEST(Logging_Basic) { // This test primarily verifies that the logging macros compile and run. 
@@ -17,3 +24,15 @@ POMAI_TEST(Logging_Basic) { POMAI_LOG_INFO("This should NOT be visible"); POMAI_LOG_WARN("This SHOULD be visible"); } + +POMAI_TEST(Logging_FatalHandler) { + // Verify configurable fatal handler: use a handler that does not abort. + pomai::util::FatalHandler prev = pomai::util::GetFatalHandler(); + pomai::util::SetFatalHandler(NoAbortFatalHandler); + g_fatal_message.clear(); + pomai::util::Logger::Instance().SetLevel(pomai::util::LogLevel::kFatal); + POMAI_LOG_FATAL("Fatal test message {}", 123); + POMAI_EXPECT_TRUE(!g_fatal_message.empty()); + POMAI_EXPECT_TRUE(g_fatal_message.find("123") != std::string::npos); + pomai::util::SetFatalHandler(prev); +}