diff --git a/CMakeLists.txt b/CMakeLists.txt index 0d6fc30fa..6363e05f0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -239,7 +239,7 @@ message(STATUS "Binary name: ${NDD_BINARY_NAME}") # Create the target -add_executable(${NDD_BINARY_NAME} src/main.cpp ${LMDB_SOURCES} third_party/roaring_bitmap/roaring.c) +add_executable(${NDD_BINARY_NAME} src/main.cpp src/core/ndd.cpp src/storage/id_mapper.cpp ${LMDB_SOURCES} third_party/roaring_bitmap/roaring.c) # Set MDBX-specific compile flags set_source_files_properties(${LMDB_SOURCES} PROPERTIES diff --git a/src/core/ndd.cpp b/src/core/ndd.cpp new file mode 100644 index 000000000..ccf29e866 --- /dev/null +++ b/src/core/ndd.cpp @@ -0,0 +1,525 @@ +#include +#include "ndd.hpp" + +template +void insert_or_throw(Map& map, Key&& key, Value&& value) { + auto [it, inserted] = map.try_emplace( + std::forward(key), std::forward(value)); + if (!inserted) { + throw std::runtime_error("Duplicate key: " + it->first); + } +} + +/*TODO: Critical - placeholder: always throws std::runtime_error until loading of named indexes is implemented*/ +void IndexManager::newloadIndex(const std::string& index_id){ + throw std::runtime_error("IndexManager::newloadIndex is not implemented"); +} + +std::pair IndexManager::newcreateIndex(std::string& username, + UserType user_type, std::string& index_name, + std::vector dense_indexes, + std::vector sparse_indexes) +{ + std::pair ret; + ret.first = true; + ret.second = ""; + + std::filesystem::space_info space_info; + std::error_code ec; + bool committed = false; + std::string index_path = ""; + std::unordered_map> dense_cache_map; + std::unordered_map> sparse_cache_map; + std::shared_ptr dense_sub_index_cache; + std::shared_ptr sparse_sub_index_cache; + + // Cleanup guard — removes partial artifacts if we don't reach commit + // TODO: This function is not complete + auto cleanup = [&]() { + if (committed) + return; + printf("%s: cleanup triggered\n", __func__); + // { + // std::unique_lock lock(indices_mutex_); + // indices_.erase(index_id); + // indices_list_.remove(index_id); + // } + // // Remove 
partial directories + std::error_code ec; + // std::filesystem::remove_all(index_path, ec); + // // Log ec if needed, but don't throw + }; + + /** + * Check if index name already exists + */ + std::string index_id = username + "/" + index_name; + auto existing_indices = metadata_manager_->listUserIndexes(username); + for(const auto& existing : existing_indices) { + if(existing.first == index_name) { + ret.first = false; + ret.second = "index_name: " + index_name + " already exists."; + goto exit_newcreateIndex; + } + } + // check if it exists in the filesystem + index_path = data_dir_ + "/" + index_id; + if(std::filesystem::exists(index_path)) { + throw std::runtime_error("index_name: " + index_name + " already exists."); + } + + // Check if there is enough space on the disk + space_info = std::filesystem::space(data_dir_, ec); + // std::cout << "space available: " << space_info.available/GB << "GB \n"; + if (!ec && space_info.available < settings::MINIMUM_REQUIRED_FS_BYTES) { + throw std::runtime_error("Insufficient disk space to create index"); + } + + // Check if there exist any sub indexes + if(dense_indexes.size() == 0 && sparse_indexes.size() == 0){ + throw std::runtime_error("No dense or sparse indexes passed"); + } + + // LOG_INFO("Creating IDMapper for index " + // << index_id << " with user type: " << userTypeToString(user_type)); + + try{ + std::string lmdb_dir = index_path + "/ids"; + std::string vec_data_dir = index_path + "/vectors"; + + /** + * TODO: add error handing while creating directories here. + * check duplicate creates for the same name. 
+ */ + + std::filesystem::create_directory(index_path); + std::filesystem::create_directory(vec_data_dir); + + auto id_mapper = std::make_shared(lmdb_dir, true, user_type); + + for(int i=0; i< dense_indexes.size(); i++){ + auto& dense_sub_index = dense_indexes[i]; + dense_sub_index_cache = std::make_shared(); + + /** + * Check limits for this user's type + */ + if(dense_sub_index.size_in_millions > getMaxVectorsPerIndex(user_type)){ + ret.first = false; + ret.second = "Size in millions is greater than max allowed : " + std::to_string(dense_sub_index.size_in_millions); + goto exit_newcreateIndex_cleanup; + } + + std::cout << "space type: " << dense_sub_index.space_type_str << "\n"; + hnswlib::SpaceType space_type = hnswlib::getSpaceType(dense_sub_index.space_type_str); + + dense_sub_index_cache->vector_store = std::make_shared( + vec_data_dir + "/vectors_" + dense_sub_index.sub_index_name, + dense_sub_index.dim, dense_sub_index.quant_level); + + + dense_sub_index_cache->alg = std::make_unique>( + dense_sub_index.max_elements, + space_type, + dense_sub_index.dim, + dense_sub_index.M, + dense_sub_index.ef_construction, + settings::RANDOM_SEED, + dense_sub_index.quant_level, + dense_sub_index.checksum); + + dense_sub_index_cache->alg->setVectorFetcher([vs = dense_sub_index_cache->vector_store] + (ndd::idInt label, uint8_t* buffer) { + return vs->get_vector_bytes(label, buffer); + } + ); + + /* add this dense_sub_index_cache entry to dense_map*/ + auto[it, inserted] = dense_cache_map.insert({dense_sub_index.sub_index_name, std::move(dense_sub_index_cache)}); + if(!inserted){ + LOG_ERROR("Duplicate sub index name: " + dense_sub_index.sub_index_name); + ret.first = false; + ret.second = "duplicate sub index_name: " + dense_sub_index.sub_index_name; + goto exit_newcreateIndex_cleanup; + } + } + + /** + * TODO: Do a for loop for all sparse vectors + */ + // for(int i=0; i< sparse_indexes.size(); i++){ + // } + + + //add NewCacheEntry against index name to 
IndexManager.newindices_ + { + auto cache_entry = NewCacheEntry::create(index_path, index_id, id_mapper, + std::move(dense_cache_map), + std::move(sparse_cache_map), + std::chrono::system_clock::now()); + if(!cache_entry){ + ret.first = false; + ret.second = "unable to allocate NewCacheEntry"; + goto exit_newcreateIndex_cleanup; + } + + std::unique_lock lock(indices_mutex_); + + auto[it, inserted] = newindices_.emplace(index_id, std::move(cache_entry)); + if(!inserted){ + ret.first = false; + ret.second = "found a duplicate entry in newindices_"; + goto exit_newcreateIndex_cleanup; + } + it->second->markUpdated(); + indices_list_.push_front(index_id); + } + + /* + // TESTING CODE ONLY: Print dense sub-indexes + for (const auto& [idx_id, entry] : newindices_) { + LOG_INFO("Index: " << entry->index_id); + + // Print dense sub-indexes + LOG_INFO(" Dense sub-indexes (" << entry->dense_vectors.size() << "):"); + for (const auto& [sub_name, dense_entry] : entry->dense_vectors) { + LOG_INFO(" subvec_name: [" << sub_name << "]" + << " dim=" << dense_entry->alg->getDimension() + << " max_elements=" << dense_entry->alg->getMaxElements() + << " elements=" << dense_entry->alg->getElementsCount() + << " M=" << dense_entry->alg->getM() + << " ef_construction=" << dense_entry->alg->getEfConstruction() + << " space=" << dense_entry->alg->getSpaceTypeStr() + << " quant=" << static_cast(dense_entry->alg->getQuantLevel()) + << " checksum=" << dense_entry->alg->getChecksum() + << " remaining_capacity=" << dense_entry->alg->getRemainingCapacity() + ); + } + } + */ + + goto exit_newcreateIndex; + } catch (...){ + cleanup(); + throw; + } + +exit_newcreateIndex_cleanup: +cleanup(); + +exit_newcreateIndex: + return ret; +} + + +/** + * new impl. of getIndexEntry. Copies the logic as is + * XXX: Logic is incomplete. Check the function impl. 
+ */ +std::shared_ptr IndexManager::newgetIndexEntry(std::string &index_id){ + + /*First try with reader's lock: newcreateIndex inserts into newindices_ under the writer's lock, so the lookup must hold the shared lock*/ + { + std::shared_lock<std::shared_mutex> read_lock(indices_mutex_); + auto it = newindices_.find(index_id); + if(it != newindices_.end()) { + return it->second; + } + } + + /*Try with writer's lock*/ + { + /** + * XXX: incomplete IMPL. phase. This code snippet should not be called right now. + * This is because all the required functions are not implemented. + */ + return nullptr; + + std::unique_lock write_lock(indices_mutex_); + auto it = newindices_.find(index_id); + if(it == newindices_.end()) { + newloadIndex(index_id); // modifies indices_ [NOT IMPLEMENTED] + evictIfNeeded(); // Clean eviction only + } + it = newindices_.find(index_id); + if(it == newindices_.end()) { + return nullptr; + // throw std::runtime_error("[ERROR] Failed to load index"); + } + return it->second; + } +} + +/** + * Adds list of named vectors. + * XXX: Things that are omitted for now: + * 1. WAL + * 2. Sparse vector support + */ +std::pair IndexManager::addNamedVectors(std::string& index_id, + std::vector& vectors) +{ + std::pair ret; + std::shared_ptr index_cache_entry = nullptr; + std::vector> meta_batch; + std::vector> filter_batch; + ret.first = true; + ret.second = ""; + + if(vectors.empty()) { + ret.first = false; + ret.second = "no vectors to add"; + LOG_ERROR(ret.second); + goto exit_addNamedVectors; + } + +#if 0 + std::cout << "index_id: " << index_id << std::endl; + + // Debug: print all inserted vectors + for (const auto& gvo : vectors) { + std::cout << "=== Vector ID: " << gvo.id << " ===" << std::endl; + std::cout << " Filter: " << gvo.filter << std::endl; + std::cout << " Meta: " << std::string(gvo.meta.begin(), gvo.meta.end()) << std::endl; + + for (const auto& [name, dvo] : gvo.dense_vectors) { + std::cout << " Dense [" << name << "] norm=" << dvo.norm << " vector=["; + for (size_t i = 0; i < dvo.vector.size(); ++i) { + if (i > 0) std::cout << ", "; + std::cout << 
dvo.vector[i]; + } + std::cout << "]" << std::endl; + } + + for (const auto& [name, svo] : gvo.sparse_vectors) { + std::cout << " Sparse [" << name << "] indices=["; + for (size_t i = 0; i < svo.sparse_ids.size(); ++i) { + if (i > 0) std::cout << ", "; + std::cout << svo.sparse_ids[i] << ":" << svo.sparse_values[i]; + } + std::cout << "]" << std::endl; + } + } + std::cout << "Total vectors inserted: " << vectors.size() << std::endl; + +#endif //if 0 + + /* Get index from index_id*/ + index_cache_entry = newgetIndexEntry(index_id); + if(!index_cache_entry){ + ret.first = false; + ret.second = "Could not find index: " + index_id; + + /*For now*/ + ret.second += " XXXX: THIS IS BECAUSE the IMPLEMENTATION OF newgetIndexEntry is incomplete"; + LOG_INFO(ret.second); + goto exit_addNamedVectors; + } + + std::cout << "index_cache_entry.index_id - " << index_cache_entry->index_id << std::endl; + + /** + * TODO: Critical + * Skipping usage of WAL right now. + * We have to decide if WAL needs to be a per sub-index concept or not + */ + + + /* Create intID for each StringID using IDMapper*/ + /** + * DELETES NOT SUPPORTED + * XXX: Here we check if there have been deletes before calling + * create_ids_batch appropriately. Right now it is not clear how + * deletes will be done - hence we aren't checking deletes, just creating ids. 
+ */ + if(!index_cache_entry->id_mapper->newcreate_ids_batch(vectors, nullptr)){ + ret.first = false; + ret.second = "Could not create IDs for: " + index_id; + goto exit_addNamedVectors; + } + + /*DEBUGGING ONLY */ +#if 0 + for (size_t i = 0; i < vectors.size(); ++i) { + const auto& obj = vectors[i]; + + std::cout << "vector[" << i << "] " + << "id: " << obj.id + << " numeric_id: (" + << obj.numeric_id.first << ", " + << std::boolalpha << obj.numeric_id.second << ")\n"; + } +#endif //if 0 + + for (size_t i = 0; i < vectors.size(); ++i) { + const auto& obj = vectors[i]; + ndd::VectorMeta meta; + + meta.id = obj.id; //string id + meta.filter = obj.filter; + meta.meta = obj.meta; + + meta_batch.emplace_back(obj.numeric_id.first, std::move(meta)); + + /*populate the filter*/ + if(!obj.filter.empty()){ + filter_batch.emplace_back(obj.numeric_id.first, obj.filter); + } + + // Print vector object + std::cout << "=== vector[" << i << "] ===" << std::endl; + std::cout << " id: " << obj.id + << " numeric_id: (" << obj.numeric_id.first + << ", " << std::boolalpha << obj.numeric_id.second << ")" + << std::endl; + std::cout << " filter: " << obj.filter << std::endl; + + // Print corresponding meta_batch entry + auto& mb = meta_batch.back(); + std::cout << " meta_batch -> numeric_id: " << mb.first + << ", meta.id: " << mb.second.id + << ", meta.filter: " << mb.second.filter + << std::endl; + + // Print filter_batch entry (if added this iteration) + if (!obj.filter.empty()) { + auto& fb = filter_batch.back(); + std::cout << " filter_batch -> numeric_id: " << fb.first + << ", filter: " << fb.second + << std::endl; + } + } + + // /** + // * Add filter and metadata [it is a per index property] + // */ + index_cache_entry->meta_store_->store_meta_batch(meta_batch); + + if(!filter_batch.empty()) { + index_cache_entry->filter_store_->add_filters_from_json_batch(filter_batch); + } + + /*RESTART FROM HERE*/ + + /** + * create a per-subindex list and then do for every. 
+ */ + + /** + * Now iterate over each named sub index and save them individually + */ + + /** + * for each subindex: + * 1. quantize the vector based on its individual quantization level + * 2. TODO ... + */ + + /*TODO: Sparse Vectors support*/ + +exit_addNamedVectors: + return ret; +} + + +/** + * returns if index config is sane + */ +std::pair check_index_config_sanity(struct NewIndexConfig index_config){ + std::pair ret; + ret.first = true; + ret.second = ""; + + if(index_config.dim < settings::MIN_DIMENSION || index_config.dim > settings::MAX_DIMENSION) { + ret.first = false; + ret.second += "Invalid dimension: " + std::to_string(index_config.dim) + + ". Should be between " + std::to_string(settings::MIN_DIMENSION) + + " and " + std::to_string(settings::MAX_DIMENSION); + LOG_ERROR(ret.second); + return ret; + } + + if(index_config.M < settings::MIN_M || index_config.M > settings::MAX_M) { + ret.first = false; + ret.second += "Invalid M: " + std::to_string(index_config.M) + + ". Should be between " + std::to_string(settings::MIN_M) + + " and " + std::to_string(settings::MAX_M); + LOG_ERROR(ret.second); + return ret; + } + + if(index_config.ef_construction < settings::MIN_EF_CONSTRUCT || + index_config.ef_construction > settings::MAX_EF_CONSTRUCT) + { + ret.first = false; + ret.second += "Invalid ef_con: " + std::to_string(index_config.ef_construction) + + ". 
Should be between " + std::to_string(settings::MIN_EF_CONSTRUCT) + + " and " + std::to_string(settings::MAX_EF_CONSTRUCT); + LOG_ERROR(ret.second); + return ret; + } + + if(index_config.quant_level == ndd::quant::QuantizationLevel::UNKNOWN){ + ret.first = false; + ret.second += "Invalid precision"; + LOG_ERROR(ret.second); + return ret; + } + + /** + * TODO: Check its need and update this as required + */ + // if(index_config.size_in_millions == 0 || + // index_config.size_in_millions > settings::MAX_SIZE_IN_MILLIONS) + // { + // ret.first = false; + // ret.second += "Invalid size_in_millions: " + std::to_string(index_config.size_in_millions) + // + ". Should be > 0 and < " + std::to_string(settings::MAX_SIZE_IN_MILLIONS); + // LOG_ERROR(ret.second); + // return ret; + // } + + /** + * TODO: Check the following: + * sub_index_name needs to be sane. + * sparse_dim needs to be of a certain max dimension. + * space type needs to be checked to a certain strings only + * what is the difference max_elements and size_in_millions ? + */ + + return ret; +} + +/** + * Check if this is okay for validating index name +std::pair validate_index_name(const std::string& name) { + // Not empty + if (name.empty()) { + return {false, "Index name cannot be empty"}; + } + + // Length limit + if (name.size() > 128) { + return {false, "Index name too long (max 128 characters)"}; + } + + // Only allow alphanumeric, hyphens, underscores + for (char c : name) { + if (!std::isalnum(c) && c != '-' && c != '_') { + return {false, "Index name contains invalid character: '" + std::string(1, c) + "'. 
Only alphanumeric, hyphens, and underscores allowed"}; + } + } + + // Don't allow starting with a hyphen or underscore + if (name[0] == '-' || name[0] == '_') { + return {false, "Index name must start with an alphanumeric character"}; + } + + // Block path traversal attempts + if (name.find("..") != std::string::npos || name.find('/') != std::string::npos || name.find('\\') != std::string::npos) { + return {false, "Index name contains illegal sequence"}; + } + + return {true, ""}; +} +*/ diff --git a/src/core/ndd.hpp b/src/core/ndd.hpp index 9231416c2..b8d64ef5e 100644 --- a/src/core/ndd.hpp +++ b/src/core/ndd.hpp @@ -12,6 +12,7 @@ #include "msgpack_ndd.hpp" #include "quant_vector.hpp" #include "wal.hpp" +#include "log.hpp" #include "../quant/dispatch.hpp" #include "../utils/archive_utils.hpp" #include @@ -31,6 +32,24 @@ #define MAX_BACKUP_NAME_LENGTH 200 +struct NewIndexConfig { + std::string sub_index_name; + size_t dim; + size_t max_elements; + std::string space_type_str; + size_t M; + size_t ef_construction; + ndd::quant::QuantizationLevel quant_level; + int32_t checksum; + size_t size_in_millions; +}; + +struct SparseIndexConfig { + std::string sub_index_name; + size_t sparse_dim; + int32_t checksum; +}; + struct IndexConfig { size_t dim; size_t sparse_dim = 0; // 0 means dense-only @@ -55,6 +74,138 @@ struct IndexInfo { size_t ef_con; }; + +struct DenseCacheSubEntry{ + // struct CacheEntry* cache_entry; //back connection if required + std::shared_ptr vector_store; + std::unique_ptr> alg; + // Number of searches performed on this sub index. 
For a search with k=10 it will be 10 + size_t searchCount{0}; + // Per-sub-index operation mutex for coordinating addVectors, saveIndex, deleteVectors + std::mutex operation_mutex; + + VectorStore::Cursor getCursor(){ + return vector_store->getCursor(); + } + + ndd::quant::QuantizationLevel getQuantLevel() const { + return vector_store->getQuantLevel(); + } + + size_t dimension() const { + return vector_store->dimension(); + } + size_t get_vector_size() const { + return vector_store->get_vector_size(); + } +}; + +struct SubSparseCacheEntry{ + std::unique_ptr sparse_storage; + // Number of searches performed on this sub index. For a search with k=10 it will be 10 + size_t searchCount{0}; + // Per-sub-index operation mutex for coordinating addVectors, saveIndex, deleteVectors + std::mutex operation_mutex; +}; + +struct NewCacheEntry { + std::string index_id; //of the form "username/indexname" + std::shared_ptr id_mapper; + std::unordered_map> dense_vectors; + std::unordered_map> sparse_vectors; + std::chrono::system_clock::time_point last_access; + std::chrono::system_clock::time_point last_saved_at; + std::chrono::system_clock::time_point updated_at; + + std::unique_ptr filter_store_; + std::unique_ptr meta_store_; + + // Flag to indicate if the index has been updated + bool updated{false}; + + // Number of searches performed on this index. 
For a search with k=10 it will be 10 + size_t searchCount{0}; + + // Per-index operation mutex for coordinating + std::mutex operation_mutex; + + // Delete copy and move (mutex is non-movable) + NewCacheEntry(const NewCacheEntry&) = delete; + NewCacheEntry& operator=(const NewCacheEntry&) = delete; + NewCacheEntry(NewCacheEntry&&) = delete; + NewCacheEntry& operator=(NewCacheEntry&&) = delete; + + // Factory method — returns nullptr on validation failure + [[nodiscard]] static std::shared_ptr create( + std::string base_path, + std::string index_id_, + std::shared_ptr id_mapper_, + std::unordered_map> dense_, + std::unordered_map> sparse_, + std::chrono::system_clock::time_point access_time_) + { + if (!id_mapper_) { + LOG_ERROR("ID Mapper is null for index: " << index_id_); + return nullptr; + } + if (dense_.empty()) { + LOG_ERROR("Must have at least one dense sub-index for index: " << index_id_); + return nullptr; + } + // Private constructor — only accessible via this factory + return std::shared_ptr( + new NewCacheEntry(std::move(base_path), std::move(index_id_), + std::move(id_mapper_), + std::move(dense_), + std::move(sparse_), + access_time_)); + } + + void markUpdated() { + updated = true; + updated_at = std::chrono::system_clock::now(); + } + + void resetSearchCount() { searchCount = 0; } + + void updateFilter(ndd::idInt numeric_id, const std::string& new_filter_json) { + // Get existing meta + auto meta = meta_store_->get_meta(numeric_id); + + // Remove old filters + if(!meta.filter.empty()) { + filter_store_->remove_filters_from_json(numeric_id, meta.filter); + } + + // Update meta + meta.filter = new_filter_json; + meta_store_->store_meta(numeric_id, meta); + + // Add new filters + if(!new_filter_json.empty()) { + filter_store_->add_filters_from_json(numeric_id, new_filter_json); + } + } + +private: + NewCacheEntry(std::string base_path, std::string index_id_, + std::shared_ptr id_mapper_, + std::unordered_map> dense_, + std::unordered_map> sparse_, + 
std::chrono::system_clock::time_point access_time_) + : index_id(std::move(index_id_)) + , id_mapper(std::move(id_mapper_)) + , dense_vectors(std::move(dense_)) + , sparse_vectors(std::move(sparse_)) + , last_access(access_time_) + , last_saved_at(std::chrono::system_clock::now()) + { + meta_store_ = std::make_unique(base_path + "/meta"); + filter_store_ = std::make_unique(base_path + "/filters"); + } +}; + + struct CacheEntry { std::string index_id; size_t sparse_dim = 0; @@ -139,10 +290,14 @@ struct PersistenceConfig { bool save_on_shutdown{true}; }; + +std::pair check_index_config_sanity(struct NewIndexConfig index_config); + class IndexManager { private: std::deque indices_list_; std::unordered_map indices_; + std::unordered_map> newindices_; //index name -> its cache entry std::shared_mutex indices_mutex_; std::string data_dir_; // This is for locking the LRU @@ -291,6 +446,8 @@ class IndexManager { } } + std::shared_ptr newgetIndexEntry(std::string& index_id); + // Get index entry with proper lock management - does NOT hold locks after return CacheEntry& getIndexEntry(const std::string& index_id) { // First try to find the index without write lock @@ -332,7 +489,6 @@ class IndexManager { saveIndexInternal(entry); } -private: // Internal saveIndex implementation that doesn't call getIndexEntry // Used by functions that already have the entry and mutex void saveIndexInternal(CacheEntry& entry) { @@ -463,7 +619,6 @@ class IndexManager { return getUserPath(username) + "/" + index_name; } -public: IndexManager(size_t max_indices, const std::string& data_dir, const PersistenceConfig& persistence_config = PersistenceConfig{}) : @@ -793,6 +948,12 @@ class IndexManager { } } + std::pair newcreateIndex(std::string& username, + UserType user_type, std::string& index_name, + std::vector dense_indexes, + std::vector sparse_indexes); + + bool createIndex(const std::string& index_id, const IndexConfig& config, UserType user_type = UserType::Admin, @@ -925,11 +1086,14 @@ 
class IndexManager { // Use the metadata manager directly to get the list of indexes return metadata_manager_->listUserIndexes(username); } + std::vector> listAllIndexes() { // Use the metadata manager directly to get the list of indexes return metadata_manager_->listAllIndexes(); } + void newloadIndex(const std::string& index_id); + void loadIndex(const std::string& index_id) { std::string index_path = data_dir_ + "/" + index_id + "/main.idx"; std::string lmdb_dir = data_dir_ + "/" + index_id + "/ids"; @@ -1074,6 +1238,8 @@ class IndexManager { entry.alg = std::move(new_alg); } + std::pair addNamedVectors(std::string& index_id, std::vector& vectors); + template bool addVectors(const std::string& index_id, const std::vector& vectors) { try { @@ -1184,8 +1350,8 @@ class IndexManager { // Add to HNSW index in parallel using pre-quantized data from QuantVectorObject size_t available_threads = settings::NUM_PARALLEL_INSERTS; const size_t num_threads = (available_threads < quantized_vectors.size()) - ? available_threads - : quantized_vectors.size(); + ? available_threads + : quantized_vectors.size(); std::vector threads; const size_t chunk_size = (quantized_vectors.size() + num_threads - 1) / num_threads; // Ceiling division @@ -1196,8 +1362,8 @@ class IndexManager { // Calculate start and end indices for this thread size_t start_idx = t * chunk_size; size_t end_idx = (start_idx + chunk_size < quantized_vectors.size()) - ? (start_idx + chunk_size) - : quantized_vectors.size(); + ? 
(start_idx + chunk_size) + : quantized_vectors.size(); // Process assigned chunk of vectors for(size_t i = start_idx; i < end_idx; i++) { diff --git a/src/filter/filter.hpp b/src/filter/filter.hpp index 35bc1b5bc..34d7d365d 100644 --- a/src/filter/filter.hpp +++ b/src/filter/filter.hpp @@ -375,7 +375,7 @@ class Filter { // Batch add operation for filters void add_to_filter_batch(const std::string& filter_key, - const std::vector& numeric_ids) { + const std::vector& numeric_ids) { if(numeric_ids.empty()) { return; } @@ -518,6 +518,16 @@ class Filter { } } + /** + * Deletes filter only. + * This duplicate function is added here from its misplaced implementation in + * class VectorStorage. + * XXX: Should be removed later for code readability + */ + void deleteFilter(ndd::idInt numeric_id, std::string filter) { + remove_filters_from_json(numeric_id, filter); + } + // Combine multiple filters using AND operation ndd::RoaringBitmap combine_filters_and(const std::vector>& filters) const { @@ -546,9 +556,9 @@ class Filter { // Check if ID satisfies a numeric condition using Forward Index bool check_numeric(const std::string& field, - ndd::idInt id, - const std::string& op, - const nlohmann::json& val) const { + ndd::idInt id, + const std::string& op, + const nlohmann::json& val) const { if(op == "$eq") { uint32_t sortable_val; if(val.is_number_integer()) { diff --git a/src/main.cpp b/src/main.cpp index 40ae26b86..adfeff26f 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -286,6 +286,212 @@ int main(int argc, char** argv) { return crow::response(200, response.dump()); }); + // new create index + CROW_ROUTE(app, "/api/v1/index/newcreate") + .CROW_MIDDLEWARES(app, AuthMiddleware) + .methods("POST"_method)([&index_manager, &app](const crow::request& req) { + + AuthMiddleware::context& ctx = app.get_context(req);; + try{ + auto body = crow::json::load(req.body); + if(!body) { + return json_error(400, "Invalid JSON"); + } + + if(!body.has("index_name") || body["index_name"].t() 
!= crow::json::type::String){ + return json_error(400, "Parameters error: 'index_name'"); + } + + /** + * TODO:CRITICAL add an index name restriction check here + * validate_index_name in ndd.cpp + */ + std::string index_name = std::string(body["index_name"].s()); + + /** + * TODO: add a simple case where no named vectors are present. + * Only one dense vector. + * In that case, the user will not provide a name to the vector during creating + * and searching. So, there should be a way to store that this is an unnamed vector. + * + * This can be implemented once create index is written well and tested. + */ + + std::vector dense_indexes; + + if(body.has("dense_vectors")){ + auto& dense_blocks = body["dense_vectors"]; + + if (dense_blocks.t() != crow::json::type::Object) { + return json_error(400, "'dense_vectors' must be an object"); + } + + + for(auto& key: dense_blocks.keys()){ + struct NewIndexConfig index_config; + auto& config = dense_blocks[key]; + + // dim is mandatory + size_t dim; + long raw_dim; + if(!config.has("dim") || config["dim"].t() != crow::json::type::Number){ + return json_error(400, "Parameters error: 'dim'"); + } + else{ + raw_dim = (long)config["dim"].i(); + if(raw_dim <= 0){ + return json_error(400, "Parameters error: negative 'dim'"); + } + } + dim = raw_dim; + // std::cout << "dim: " << dim << "\n"; + + // Space_type is mandatory + std::string space_type; + if(!config.has("space_type") || config["space_type"].t() != crow::json::type::String){ + return json_error(400, "Parameters error: 'space_type'"); + } + space_type = (std::string)config["space_type"].s(); + // std::cout << "space_type: " << space_type << "\n"; + + size_t m = settings::DEFAULT_M; + if(config.has("M")){ + if(config["M"].t() != crow::json::type::Number){ + return json_error(400, "Parameters error: 'M'"); + } + m = (size_t)config["M"].i(); + } + // std::cout << "m: " << m << "\n"; + + size_t ef_con = settings::DEFAULT_EF_CONSTRUCT; + if(config.has("ef_con")){ + 
if(config["ef_con"].t() != crow::json::type::Number){ + return json_error(400, "Parameters error: 'ef_con'"); + } + ef_con = (size_t)config["ef_con"].i(); + } + // std::cout << "ef_con: " << ef_con << "\n"; + + ndd::quant::QuantizationLevel quant_level = ndd::quant::QuantizationLevel::INT8; + if(config.has("precision")){ + if(config["precision"].t() != crow::json::type::String){ + return json_error(400, "Parameters error: 'precision'"); + } + quant_level = stringToQuantLevel(config["precision"].s()); + } + // std::cout << "quant level: " << quantLevelToString(quant_level) << "\n"; + + int32_t checksum = -1; + if(config.has("checksum")){ + if(config["checksum"].t() != crow::json::type::Number){ + return json_error(400, "Parameters error: 'checksum'"); + } + checksum = config["checksum"].i(); + } + + size_t size_in_millions = 0; + if(config.has("size_in_millions")){ + if(config["size_in_millions"].t() != crow::json::type::Number){ + return json_error(400, "Parameters error: 'size_in_millions'"); + } + size_in_millions = config["size_in_millions"].i(); + } + + index_config = NewIndexConfig { + key, + dim, //dense_dim + settings::MAX_ELEMENTS, // max elements + space_type, + m, + ef_con, + quant_level, + checksum, + size_in_millions + }; + + std::pair sanity_ret = check_index_config_sanity(index_config); + if(!sanity_ret.first){ + return json_error(400, sanity_ret.second); + } + dense_indexes.push_back(index_config); + } + } + + // for(int i=0; i sparse_indexes; + + if(body.has("sparse_vectors")){ + auto& sparse_blocks = body["sparse_vectors"]; + + if (sparse_blocks.t() != crow::json::type::Object) { + return json_error(400, "'sparse_vectors' must be an object"); + } + + + for(auto& key: sparse_blocks.keys()){ + struct SparseIndexConfig sparse_index_config; + + auto& sparse_config = sparse_blocks[key]; + + // sparse_dim is mandatory + size_t sparse_dim; + if(!sparse_config.has("sparse_dim") || sparse_config["sparse_dim"].t() != crow::json::type::Number){ + return 
json_error(400, "Parameters error: 'sparse_dim'"); + } + sparse_dim = (size_t)sparse_config["sparse_dim"].i(); + + int32_t checksum = -1; + if(sparse_config.has("checksum")){ + if(sparse_config["checksum"].t() != crow::json::type::Number){ + return json_error(400, "Parameters error: 'checksum'"); + } + checksum = sparse_config["checksum"].i(); + } + + sparse_index_config = SparseIndexConfig{ + key, + sparse_dim, + checksum + }; + + //TODO: Add a sanity check for sparse vectors here + + sparse_indexes.push_back(sparse_index_config); + } + } + + /** + * TODO:CRITICAL Add a sparse sanity test here. + */ + + // for(int i=0; i create_index_ret = + index_manager.newcreateIndex(ctx.username, + UserType::Admin, + index_name, + dense_indexes, + sparse_indexes); + if(!create_index_ret.first){ + return json_error(400, "failed createNewIndex: " + create_index_ret.second); + } + return crow::response(200, "Index created successfully"); + + } catch(const std::runtime_error& e){ + return json_error(409, e.what()); + } catch(const std::exception& e){ + return json_error_500(ctx.username, req.url, std::string("Error: ") + e.what()); + } catch (...){ + return json_error(500, "Unknown internal error"); + } + }); + // Create index CROW_ROUTE(app, "/api/v1/index/create") .CROW_MIDDLEWARES(app, AuthMiddleware) @@ -641,7 +847,7 @@ int main(int argc, char** argv) { CROW_ROUTE(app, "/api/v1/index//search") .CROW_MIDDLEWARES(app, AuthMiddleware) .methods("POST"_method)([&index_manager, &app](const crow::request& req, - std::string index_name) { + std::string index_name) { auto& ctx = app.get_context(req); // Format full index_id std::string index_id = ctx.username + "/" + index_name; @@ -751,11 +957,149 @@ int main(int argc, char** argv) { } }); + // newinsert a list of vectors + CROW_ROUTE(app, "/api/v1/index//vector/newinsert") + .CROW_MIDDLEWARES(app, AuthMiddleware) + .methods("POST"_method)([&index_manager, &app](const crow::request& req, + std::string index_name) { + auto& ctx = 
app.get_context(req); + std::string index_id = ctx.username + "/" + index_name; + auto content_type = req.get_header_value("Content-Type"); + + std::vector vectors; + + if(content_type == "application/json"){ + auto body = crow::json::load(req.body); + if(!body) { + return json_error(400, "Invalid JSON"); + } + + if(body.t() == crow::json::type::List) { + for(const auto& item : body) { + ndd::GenericVectorObject gvo; + + for(const auto& kv : item){ + std::string key = kv.key(); + if(key == "id"){ + if(kv.t() != crow::json::type::String) { + return json_error(400, "Parameter error: 'id' must be a string"); + } + gvo.id = kv.s(); + continue; + } + + if(key == "meta") { + if(kv.t() != crow::json::type::String) { + return json_error(400, "Parameter error: 'meta' must be a string"); + } + std::string meta_str = kv.s(); + gvo.meta.assign(meta_str.begin(), meta_str.end()); + continue; + } + + if(key == "filter") { + if(kv.t() != crow::json::type::String) { + return json_error(400, "Parameter error: 'filter' must be a string"); + } + gvo.filter = kv.s(); + continue; + } + + /*This should be a named vector*/ + + if(kv.t() != crow::json::type::Object) { + return json_error(400, "Field '" + key + "' must be an object"); + } + + /*Dense vector*/ + if(kv.has("vector") && kv.has("norm")) { + if(kv["vector"].t() != crow::json::type::List) { + return json_error(400, "Dense vector '" + key + "': 'vector' must be an array"); + } + ndd::DenseVectorObject dvo; + dvo.norm = static_cast(kv["norm"].d()); + for(const auto& val : kv["vector"]) { + dvo.vector.push_back(static_cast(val.d())); + } + /** + * TODO: Add a check for duplicate keys here + */ + gvo.dense_vectors[key] = std::move(dvo); + continue; + } + + /*Sparse vector*/ + if(kv.has("sparse_indices") && kv.has("sparse_values")) { + if(kv["sparse_indices"].t() != crow::json::type::List || + kv["sparse_values"].t() != crow::json::type::List) { + return json_error(400, + "Sparse vector '" + key + "': 'sparse_indices' and 
'sparse_values' must be arrays"); + } + if(kv["sparse_indices"].size() != kv["sparse_values"].size()) { + return json_error(400, + "Sparse vector '" + key + "': sparse_indices and sparse_values must have same length"); + } + + ndd::SparseVectorObject svo; + for(const auto& idx : kv["sparse_indices"]) { + svo.sparse_ids.push_back(static_cast(idx.i())); + } + for(const auto& val : kv["sparse_values"]) { + svo.sparse_values.push_back(static_cast(val.d())); + } + /** + * TODO: Add a check for duplicate keys here + */ + gvo.sparse_vectors[key] = std::move(svo); + continue; + } + + /** + * At this point the request hasn't + * fallen in any of the categories. + */ + return json_error(400, "Format error"); + } + vectors.push_back(std::move(gvo)); + } + }else{ + return crow::response(400, "Body should be a list."); + } + // return crow::response(400, + // "Content-Type application/json not implemented yet"); + } + else if(content_type == "application/msgpack"){ + return crow::response(400, + "Content-Type application/msgpack not implemented yet"); + } + else{ + return crow::response(400, + "Content-Type must be application/msgpack or application/json"); + } + + /* Insert the batch of vectors to the index */ + try{ + auto ret = index_manager.addNamedVectors(index_id, vectors); + + if(!ret.first){ + return json_error(400, ret.second); + } + return crow::response(200); + + } catch(const std::runtime_error& e) { + return json_error(400, e.what()); + } catch(const std::exception& e) { + LOG_DEBUG("Batch insertion failed: " << e.what()); + return json_error_500(ctx.username, req.url, e.what()); + } + + }); + // Insert a list of vectors CROW_ROUTE(app, "/api/v1/index//vector/insert") .CROW_MIDDLEWARES(app, AuthMiddleware) .methods("POST"_method)([&index_manager, &app](const crow::request& req, - std::string index_name) { + std::string index_name) { auto& ctx = app.get_context(req); std::string index_id = ctx.username + "/" + index_name; diff --git a/src/server/auth.hpp 
b/src/server/auth.hpp index 0d6cdd4f5..758354abf 100644 --- a/src/server/auth.hpp +++ b/src/server/auth.hpp @@ -27,7 +27,7 @@ inline int getMaxAllowedIndices(UserType type) { // Get max vectors per index - No limits in open-source mode inline size_t getMaxVectorsPerIndex(UserType type) { - return settings::MAX_VECTORS_ADMIN; // 1 billion vectors + return settings::MAX_VECTORS_MILLION; // 1 billion vectors } struct User { diff --git a/src/storage/id_mapper.cpp b/src/storage/id_mapper.cpp new file mode 100644 index 000000000..bab90f0ab --- /dev/null +++ b/src/storage/id_mapper.cpp @@ -0,0 +1,257 @@ +#include +#include "id_mapper.hpp" + +template +bool IDMapper::newcreate_ids_batch(std::vector& vectors, void* wal_ptr) +{ + bool ret = false; + constexpr idInt INVALID_LABEL = static_cast(-1); + + if(vectors.empty()){ + return ret; + } + + LOG_DEBUG("=== create_ids_batch START ==="); + + std::vector> id_tuples; + + id_tuples.reserve(vectors.size()); + for(const auto& vec : vectors) { + // true means that the ID is new and false means that the ID already exists + // is_reused defaults to false + id_tuples.emplace_back(vec.id, INVALID_LABEL, true, false); + } + + LOG_DEBUG("--- STEP 2: LMDB database check ---"); + { + MDBX_txn* txn; + int rc = mdbx_txn_begin(env_, nullptr, MDBX_TXN_RDONLY, &txn); + if(rc != MDBX_SUCCESS) { + LOG_DEBUG("ERROR: Failed to begin read-only transaction: " << mdbx_strerror(rc)); + throw std::runtime_error("Failed to begin read-only transaction: " + + std::string(mdbx_strerror(rc))); + } + LOG_DEBUG("LMDB read-only transaction started successfully"); + + try { + int keys_checked = 0; + for(auto& tup : id_tuples) { + if(std::get<1>(tup) == INVALID_LABEL) { + const std::string& str_id = std::get<0>(tup); + MDBX_val key{(void*)str_id.c_str(), str_id.size()}; + MDBX_val data; + + // Add debug logging + LOG_DEBUG("LMDB: Checking key[" << keys_checked << "]: [" << str_id + << "] size: " << str_id.size()); + keys_checked++; + + rc = mdbx_get(txn, dbi_, 
&key, &data); + if(rc == MDBX_SUCCESS) { + idInt existing_id = *(idInt*)data.iov_base; + LOG_DEBUG("LMDB: ✓ FOUND existing ID: " << existing_id << " for key: [" + << str_id << "]"); + std::get<1>(tup) = existing_id; + std::get<2>(tup) = false; // ID already exists + } else if(rc == MDBX_NOTFOUND) { + LOG_DEBUG("LMDB: ✗ NOT FOUND: [" << str_id << "]"); + std::get<1>(tup) = 0; + } else { + LOG_DEBUG("LMDB: ERROR for key: [" << str_id + << "] error: " << mdbx_strerror(rc)); + mdbx_txn_abort(txn); + throw std::runtime_error("Database error checking ID: " + + std::string(mdbx_strerror(rc))); + } + } + } + LOG_DEBUG("LMDB: Checked " << keys_checked << " keys in database"); + mdbx_txn_abort(txn); + LOG_DEBUG("LMDB check done"); + } catch(...) { + mdbx_txn_abort(txn); + throw; + } + } + + //Count and generate new IDs + LOG_DEBUG("--- STEP 3: Count and generate new IDs ---"); + size_t total_new_ids_needed = + std::count_if(id_tuples.begin(), id_tuples.end(), [](const auto& t) { + return std::get<1>(t) == 0; + }); + LOG_DEBUG("Total new IDs needed: " << total_new_ids_needed); + + size_t fresh_ids_count = total_new_ids_needed; + size_t deleted_index = 0; + + if(use_deleted_ids) { + // Use deleted IDs first, but ONLY for entries that are actually new (not found in DB) + std::vector deletedIds = getDeletedIds(fresh_ids_count); + + for(auto& tup : id_tuples) { + // Only assign deleted IDs to entries that are new (id=0 and is_new=true) + if(std::get<1>(tup) == 0 && std::get<2>(tup) == true + && deleted_index < deletedIds.size()) { + std::get<1>(tup) = deletedIds[deleted_index++]; + std::get<3>(tup) = true; // Mark as reused + // Keep std::get<2>(tup) as true because this still needs to be written to DB + } + } + fresh_ids_count -= deleted_index; // Reduce by actual number of deleted IDs used + } + + if(total_new_ids_needed > 0) { + LOG_DEBUG("Generating " << fresh_ids_count << " fresh IDs"); + + std::vector new_ids; + if(fresh_ids_count > 0) { + new_ids = 
get_next_ids(fresh_ids_count); + } + + // CRITICAL FIX: Log to WAL AFTER generating IDs (minimal risk window) + if(wal_ptr) { + WriteAheadLog* wal = static_cast(wal_ptr); + std::vector wal_entries; + + // Log reused IDs + for(const auto& tup : id_tuples) { + if(std::get<2>(tup) && std::get<1>(tup) != 0) { + wal_entries.push_back({WALOperationType::VECTOR_ADD, std::get<1>(tup)}); + } + } + + // Log fresh IDs + for(idInt id : new_ids) { + wal_entries.push_back({WALOperationType::VECTOR_ADD, id}); + } + + if(!wal_entries.empty()) { + wal->log(wal_entries); + } + } + + if(fresh_ids_count > 0 && new_ids.size() != fresh_ids_count) { + throw std::runtime_error("Mismatch: get_next_ids returned " + + std::to_string(new_ids.size()) + " but expected " + + std::to_string(fresh_ids_count)); + } + + size_t new_id_index = 0; + + // Step 4: Write txn with auto-resize retry + LOG_DEBUG("--- STEP 4: Writing to database ---"); + auto try_write = [&](MDBX_txn* txn) -> int { + int writes_attempted = 0; + for(auto& tup : id_tuples) { + // Write entries that need to be written to DB (is_new=true) but don't have ID=0 + if(std::get<2>(tup) == true && std::get<1>(tup) != 0) { + const std::string& str_id = std::get<0>(tup); + idInt id = std::get<1>(tup); + + MDBX_val key{(void*)str_id.c_str(), str_id.size()}; + MDBX_val data{&id, sizeof(idInt)}; + + // Add debug logging for write operations + LOG_DEBUG("WRITE[" << writes_attempted << "]: key=[" << str_id + << "] size=" << str_id.size() << " ID=" << id); + writes_attempted++; + + int rc = mdbx_put(txn, dbi_, &key, &data, MDBX_UPSERT); + if(rc == MDBX_MAP_FULL) { + LOG_DEBUG("WRITE ERROR: MDBX_MAP_FULL for key=[" << str_id << "]"); + return MDBX_MAP_FULL; + } + if(rc != MDBX_SUCCESS) { + LOG_DEBUG("WRITE ERROR: [" << str_id + << "] error: " << mdbx_strerror(rc)); + return rc; + } + + LOG_DEBUG("WRITE SUCCESS: [" << str_id << "] with ID: " << id); + + } else if(std::get<1>(tup) == 0) { + // Handle remaining entries that still need new IDs + 
if(new_id_index >= new_ids.size()) { + LOG_DEBUG("ERROR: new_id_index (" + << new_id_index << ") >= new_ids.size() (" << new_ids.size() + << ")"); + return MDBX_PROBLEM; // Internal error + } + idInt new_id = new_ids[new_id_index++]; + const std::string& str_id = std::get<0>(tup); + + MDBX_val key{(void*)str_id.c_str(), str_id.size()}; + MDBX_val data{&new_id, sizeof(idInt)}; + + writes_attempted++; + + int rc = mdbx_put(txn, dbi_, &key, &data, MDBX_UPSERT); + if(rc == MDBX_MAP_FULL) { + LOG_DEBUG("WRITE_NEW ERROR: MDBX_MAP_FULL for key=[" << str_id << "]"); + return MDBX_MAP_FULL; + } + if(rc != MDBX_SUCCESS) { + LOG_DEBUG("WRITE_NEW ERROR: [" << str_id + << "] error: " << mdbx_strerror(rc)); + return rc; + } + + std::get<1>(tup) = new_id; + } + } + return MDBX_SUCCESS; + }; + + MDBX_txn* txn; + int rc = mdbx_txn_begin(env_, nullptr, MDBX_TXN_READWRITE, &txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error("Failed to begin write transaction: " + + std::string(mdbx_strerror(rc))); + } + + rc = try_write(txn); + // MDBX auto-grows, no manual resize needed + if(rc != MDBX_SUCCESS) { + mdbx_txn_abort(txn); + throw std::runtime_error("Failed to insert new IDs: " + + std::string(mdbx_strerror(rc))); + } + + rc = mdbx_txn_commit(txn); + if(rc != MDBX_SUCCESS) { + throw std::runtime_error("Failed to commit transaction: " + + std::string(mdbx_strerror(rc))); + } + LOG_DEBUG("Write transaction committed successfully"); + } else { + LOG_DEBUG("No new IDs needed, skipping write transaction"); + } + + // Final state logging + LOG_DEBUG("--- FINAL RESULTS ---"); + std::vector> result; + result.reserve(id_tuples.size()); + for(size_t i = 0; i < id_tuples.size(); i++) { + const auto& tup = id_tuples[i]; + bool is_new_to_hnsw = std::get<2>(tup); + // If the ID was reused from deleted list, treat it as an update (not new to HNSW) + if(std::get<3>(tup)) { + is_new_to_hnsw = false; + } + vectors[i].numeric_id.first = std::get<1>(tup); + vectors[i].numeric_id.second = 
is_new_to_hnsw; + } + ret = true; + + LOG_DEBUG("=== create_ids_batch END ==="); + +exit_newcreate_ids_batch: + return ret; +} + +template bool IDMapper::newcreate_ids_batch( + std::vector&, void*); + +template bool IDMapper::newcreate_ids_batch( + std::vector&, void*); \ No newline at end of file diff --git a/src/storage/id_mapper.hpp b/src/storage/id_mapper.hpp index eff79b30e..376673dcc 100644 --- a/src/storage/id_mapper.hpp +++ b/src/storage/id_mapper.hpp @@ -13,6 +13,7 @@ #include #include #include "../core/types.hpp" +#include "../utils/msgpack_ndd.hpp" #include "../utils/settings.hpp" using ndd::idInt; @@ -79,11 +80,18 @@ class IDMapper { mdbx_env_close(env_); } + /** + * is same as create_ids_batch + * Takes a vector of ndd::GenericVectorObject and adds the IDs to GenericVectorObject's pair + */ + template bool newcreate_ids_batch(std::vector& vectors, void* wal_ptr = nullptr); + + // Create string ID to numeric ID mapping. If string ids exists in the database, it will return // the existing numeric ID along with flag It will also use old numeric IDs of deleted points template std::vector> create_ids_batch(const std::vector& str_ids, - void* wal_ptr = nullptr) { + void* wal_ptr = nullptr) { if(str_ids.empty()) { return {}; } diff --git a/src/storage/vector_storage.hpp b/src/storage/vector_storage.hpp index 993c63bd7..1455635ef 100644 --- a/src/storage/vector_storage.hpp +++ b/src/storage/vector_storage.hpp @@ -396,7 +396,7 @@ class MetaStore { int rc = mdbx_txn_commit(txn); if(rc != MDBX_SUCCESS) { throw std::runtime_error("Failed to commit transaction: " - + std::string(mdbx_strerror(rc))); + + std::string(mdbx_strerror(rc))); } }; @@ -494,136 +494,14 @@ class VectorStorage { std::unique_ptr filter_store_; VectorStorage(const std::string& base_path, - size_t vector_dim, - ndd::quant::QuantizationLevel quant_level) { + size_t vector_dim, + ndd::quant::QuantizationLevel quant_level) + { vector_store_ = std::make_unique(base_path + "/vectors", vector_dim, 
quant_level); meta_store_ = std::make_unique(base_path + "/meta"); filter_store_ = std::make_unique(base_path + "/filters"); } - VectorStore::Cursor getCursor() { return vector_store_->getCursor(); } - // Get numeric ids of matching filters - std::vector getIdsMatchingFilters( - const std::vector>& filter_pairs) const { - auto bitmap = filter_store_->combine_filters_and(filter_pairs); - std::vector numeric_ids; - bitmap.iterate( - [](ndd::idInt value, void* ptr) -> bool { - auto* ids = static_cast*>(ptr); - ids->push_back(value); - return true; - }, - &numeric_ids); - return numeric_ids; - } - - bool matches_filter(ndd::idInt numeric_id, - const ndd::VectorMeta& meta, - const nlohmann::json& filter_query) { - if(filter_query.empty()) { - return true; - } - - // 1. Fast Pass: Check Numeric Filters using Index - bool has_non_numeric = false; - - for(const auto& condition : filter_query) { - if(!condition.is_object() || condition.size() != 1) { - continue; - } - const auto& field = condition.begin().key(); - const auto& expr = condition.begin().value(); - if(!expr.is_object() || expr.size() != 1) { - continue; - } - - const std::string op = expr.begin().key(); - const auto& val = expr.begin().value(); - - bool is_numeric_query = false; - if(op == "$range") { - is_numeric_query = true; - } else if(op == "$eq" && (val.is_number())) { - is_numeric_query = true; - } else if(op == "$in" && val.is_array() && !val.empty() && val[0].is_number()) { - is_numeric_query = true; - } - - if(is_numeric_query) { - if(!filter_store_->check_numeric(field, numeric_id, op, val)) { - return false; - } - } else { - has_non_numeric = true; - } - } - - if(!has_non_numeric) { - return true; - } - - try { - // Parse the metadata associated with the vector - nlohmann::json meta_filter = nlohmann::json::parse(meta.filter); - - // Each filter clause is ANDed - for(const auto& condition : filter_query) { - if(!condition.is_object() || condition.size() != 1) { - continue; // Skip malformed 
conditions - } - - const auto& field = condition.begin().key(); - const auto& expr = condition.begin().value(); - - if(!expr.is_object() || expr.size() != 1) { - continue; - } - - const std::string op = expr.begin().key(); - const auto& val = expr.begin().value(); - - // Skip numeric queries as they are already checked - bool is_numeric_query = false; - if(op == "$range") { - is_numeric_query = true; - } else if(op == "$eq" && (val.is_number())) { - is_numeric_query = true; - } else if(op == "$in" && val.is_array() && !val.empty() && val[0].is_number()) { - is_numeric_query = true; - } - - if(is_numeric_query) { - continue; - } - - // If field is not present in the vector's metadata - if(!meta_filter.contains(field)) { - return false; - } - - const auto& actual_value = meta_filter[field]; - - if(op == "$eq") { - if(actual_value != val) { - return false; - } - } else if(op == "$in") { - if(!val.is_array() - || std::find(val.begin(), val.end(), actual_value) == val.end()) { - return false; - } - } else { - continue; - } - } - - return true; - - } catch(const std::exception& e) { - // std::cerr << "Error matching filter: " << e.what() << std::endl; - return false; - } - } // Optimized batch operation using pre-quantized QuantVectorObject // This avoids double quantization by using already quantized data @@ -687,24 +565,6 @@ class VectorStorage { return meta_store_->get_meta(numeric_id); } - // NOT used anymore. Deletes filter, meta and vector data. 
- void deletePoint(ndd::idInt numeric_id) { - try { - // Get metadata first to get filter info - auto meta = meta_store_->get_meta(numeric_id); - - // Remove filter entries if they exist - if(!meta.filter.empty()) { - filter_store_->remove_filters_from_json(numeric_id, meta.filter); - } - // Try to remove both vector and meta data - vector_store_->remove(numeric_id); - meta_store_->remove(numeric_id); - } catch(const std::exception& e) { - throw std::runtime_error(std::string("Failed to remove vector and metadata: ") - + e.what()); - } - } // Deletes filter only. void deleteFilter(ndd::idInt numeric_id, std::string filter) { filter_store_->remove_filters_from_json(numeric_id, filter); @@ -730,7 +590,9 @@ class VectorStorage { } } + VectorStore::Cursor getCursor(){ return vector_store_->getCursor();} ndd::quant::QuantizationLevel getQuantLevel() const { return vector_store_->getQuantLevel(); } size_t dimension() const { return vector_store_->dimension(); } size_t get_vector_size() const { return vector_store_->get_vector_size(); } -}; \ No newline at end of file + +}; diff --git a/src/storage/wal.hpp b/src/storage/wal.hpp index 08cebe0c3..88463244f 100644 --- a/src/storage/wal.hpp +++ b/src/storage/wal.hpp @@ -35,7 +35,7 @@ class WriteAheadLog { if(!log_file_) { std::string err_string; err_string = "Failed to open WAL file: " + log_path_ - + " errno: " + std::to_string(errno) + " errcode: " + std::strerror(errno); + + " errno: " + std::to_string(errno) + " errcode: " + std::strerror(errno); LOG_ERROR(err_string); throw std::runtime_error(err_string); diff --git a/src/utils/msgpack_ndd.hpp b/src/utils/msgpack_ndd.hpp index d0891daa7..9107ac65d 100644 --- a/src/utils/msgpack_ndd.hpp +++ b/src/utils/msgpack_ndd.hpp @@ -54,6 +54,37 @@ namespace ndd { MSGPACK_DEFINE(id, meta, filter, norm, vector, sparse_ids, sparse_values) }; + struct DenseVectorObject{ + float norm; // Vector norm (only for cosine distance) + std::vector vector; // Vector data + + MSGPACK_DEFINE(norm, 
vector); + }; + + struct SparseVectorObject{ + std::vector sparse_ids; // Sparse vector indices + std::vector sparse_values; // Sparse vector values + + MSGPACK_DEFINE(sparse_ids, sparse_values); + }; + + struct GenericVectorObject { + std::string id; // String identifier + std::vector meta; // Binary metadata (zipped) + std::string filter; // Filter as JSON string + + std::unordered_map dense_vectors; + std::unordered_map sparse_vectors; + + /** + * Ignored by msgpack. Dont include in MSGPACK_DEFINE. + * this is populated later by the IDMapper. + */ + std::pair numeric_id {static_cast(-1), false}; //setting it to default + + // MSGPACK_DEFINE(id, meta, filter, norm, vector, sparse_ids, sparse_values) + }; + // Search result structure struct VectorResult { float similarity; // Similarity from query (1-distance) diff --git a/src/utils/settings.hpp b/src/utils/settings.hpp index c84ad89a1..9512ab51f 100644 --- a/src/utils/settings.hpp +++ b/src/utils/settings.hpp @@ -31,6 +31,7 @@ namespace settings { constexpr size_t BACKFILL_BUFFER = 2; // Keep 2 slots free for high quality neighbors constexpr size_t MAX_EF_CONSTRUCT = 4096; constexpr size_t DEFAULT_EF_SEARCH = 128; + constexpr size_t MAX_SIZE_IN_MILLIONS = 10'000; constexpr size_t MIN_K = 1; constexpr size_t MAX_K = 4096; constexpr size_t RANDOM_SEED = 100; @@ -68,8 +69,11 @@ namespace settings { constexpr uint16_t BLOCK_SPLIT_THRESHOLD = 160; // Bloc will be split if more than this many elements (including tombstones) - // Maximum number of elements in the index - constexpr size_t MAX_VECTORS_ADMIN = 1'000'000'000; + // Maximum number of elements in an index in millions + constexpr size_t MAX_VECTORS_MILLION = 10'000; //1 billion + + //minimum bytes in filesystem before triggering out of storage sequence + constexpr size_t MINIMUM_REQUIRED_FS_BYTES = (1 * GB); // Buffer for early exit in search base layer constexpr int EARLY_EXIT_BUFFER_INSERT = 16;