diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3d3d2b860..dcc39decb 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -261,6 +261,10 @@ endif()

 if (NOT MSVC)
     set(DISKANN_ASYNC_LIB aio)
+    # Link liburing only when the io_uring reader is selected (-DIOURING=ON);
+    # otherwise the variable stays empty and default libaio builds do not need liburing.
+    if (IOURING)
+        set(DISKANN_ASYNC_LIB_URING uring)
+    endif()
 endif()

 #Main compiler/linker settings
diff --git a/apps/CMakeLists.txt b/apps/CMakeLists.txt
index e42c0b6cb..425fcbffc 100644
--- a/apps/CMakeLists.txt
+++ b/apps/CMakeLists.txt
@@ -11,16 +11,16 @@ add_executable(build_stitched_index build_stitched_index.cpp)
 target_link_libraries(build_stitched_index ${PROJECT_NAME} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS} Boost::program_options)

 add_executable(search_memory_index search_memory_index.cpp)
-target_link_libraries(search_memory_index ${PROJECT_NAME} ${DISKANN_ASYNC_LIB} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS} Boost::program_options)
+target_link_libraries(search_memory_index ${PROJECT_NAME} ${DISKANN_ASYNC_LIB} ${DISKANN_ASYNC_LIB_URING} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS} Boost::program_options)

 add_executable(build_disk_index build_disk_index.cpp)
-target_link_libraries(build_disk_index ${PROJECT_NAME} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS} ${DISKANN_ASYNC_LIB} Boost::program_options)
+target_link_libraries(build_disk_index ${PROJECT_NAME} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS} ${DISKANN_ASYNC_LIB} ${DISKANN_ASYNC_LIB_URING} Boost::program_options)

 add_executable(search_disk_index search_disk_index.cpp)
-target_link_libraries(search_disk_index ${PROJECT_NAME} ${DISKANN_ASYNC_LIB} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS} Boost::program_options)
+target_link_libraries(search_disk_index ${PROJECT_NAME} ${DISKANN_ASYNC_LIB} ${DISKANN_ASYNC_LIB_URING} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS} Boost::program_options)

 add_executable(range_search_disk_index range_search_disk_index.cpp)
-target_link_libraries(range_search_disk_index ${PROJECT_NAME} ${DISKANN_ASYNC_LIB} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS} Boost::program_options)
+target_link_libraries(range_search_disk_index ${PROJECT_NAME} ${DISKANN_ASYNC_LIB} ${DISKANN_ASYNC_LIB_URING} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS} Boost::program_options)

 add_executable(test_streaming_scenario test_streaming_scenario.cpp)
 target_link_libraries(test_streaming_scenario ${PROJECT_NAME} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS} Boost::program_options)
diff --git a/apps/utils/CMakeLists.txt b/apps/utils/CMakeLists.txt
index 3b8cf223c..4a7143dcc 100644
--- a/apps/utils/CMakeLists.txt
+++ b/apps/utils/CMakeLists.txt
@@ -44,16 +44,16 @@ target_link_libraries(gen_random_slice ${PROJECT_NAME} ${DISKANN_TOOLS_TCMALLOC_
 add_executable(simulate_aggregate_recall simulate_aggregate_recall.cpp)

 add_executable(calculate_recall calculate_recall.cpp)
-target_link_libraries(calculate_recall ${PROJECT_NAME} ${DISKANN_ASYNC_LIB} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS})
+target_link_libraries(calculate_recall ${PROJECT_NAME} ${DISKANN_ASYNC_LIB} ${DISKANN_ASYNC_LIB_URING} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS})

 # Compute ground truth thing outside of DiskANN main source that depends on MKL.
add_executable(compute_groundtruth compute_groundtruth.cpp)
 target_include_directories(compute_groundtruth PRIVATE ${DISKANN_MKL_INCLUDE_DIRECTORIES})
-target_link_libraries(compute_groundtruth ${PROJECT_NAME} ${DISKANN_MKL_LINK_LIBRARIES} ${DISKANN_ASYNC_LIB} Boost::program_options)
+target_link_libraries(compute_groundtruth ${PROJECT_NAME} ${DISKANN_MKL_LINK_LIBRARIES} ${DISKANN_ASYNC_LIB} ${DISKANN_ASYNC_LIB_URING} Boost::program_options)

 add_executable(compute_groundtruth_for_filters compute_groundtruth_for_filters.cpp)
 target_include_directories(compute_groundtruth_for_filters PRIVATE ${DISKANN_MKL_INCLUDE_DIRECTORIES})
-target_link_libraries(compute_groundtruth_for_filters ${PROJECT_NAME} ${DISKANN_MKL_LINK_LIBRARIES} ${DISKANN_ASYNC_LIB} Boost::program_options)
+target_link_libraries(compute_groundtruth_for_filters ${PROJECT_NAME} ${DISKANN_MKL_LINK_LIBRARIES} ${DISKANN_ASYNC_LIB} ${DISKANN_ASYNC_LIB_URING} Boost::program_options)


 add_executable(generate_pq generate_pq.cpp)
@@ -67,10 +67,10 @@ add_executable(partition_with_ram_budget partition_with_ram_budget.cpp)
 target_link_libraries(partition_with_ram_budget ${PROJECT_NAME} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS})

 add_executable(merge_shards merge_shards.cpp)
-target_link_libraries(merge_shards ${PROJECT_NAME} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS} ${DISKANN_ASYNC_LIB})
+target_link_libraries(merge_shards ${PROJECT_NAME} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS} ${DISKANN_ASYNC_LIB} ${DISKANN_ASYNC_LIB_URING})

 add_executable(create_disk_layout create_disk_layout.cpp)
-target_link_libraries(create_disk_layout ${PROJECT_NAME} ${DISKANN_ASYNC_LIB} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS})
+target_link_libraries(create_disk_layout ${PROJECT_NAME} ${DISKANN_ASYNC_LIB} ${DISKANN_ASYNC_LIB_URING} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS})

 add_executable(generate_synthetic_labels generate_synthetic_labels.cpp)
 target_link_libraries(generate_synthetic_labels ${PROJECT_NAME} Boost::program_options)
diff --git a/include/aligned_file_reader.h b/include/aligned_file_reader.h
index f39d5da39..ee801a3f8 100644
--- a/include/aligned_file_reader.h
+++ b/include/aligned_file_reader.h
@@ -10,10 +10,18 @@

 #ifndef _WINDOWS
 #include <fcntl.h>
+#ifndef IOURING
 #include <libaio.h>
+#else
+#include <liburing.h>
+#endif
 #include <unistd.h>
+#ifndef IOURING
 typedef io_context_t IOContext;
 #else
+typedef struct io_uring *IOContext;
+#endif
+#else
 #include <Windows.h>
 #include <minwinbase.h>

diff --git a/include/linux_aligned_file_reader_uring.h b/include/linux_aligned_file_reader_uring.h
new file mode 100644
index 000000000..bae0eafbb
--- /dev/null
+++ b/include/linux_aligned_file_reader_uring.h
@@ -0,0 +1,42 @@
+// Copyright (c) KIOXIA Corporation. All rights reserved.
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT license.
+
+#pragma once
+#ifndef _WINDOWS
+
+#include "aligned_file_reader.h"
+
+// io_uring-backed AlignedFileReader: register_thread() gives the calling thread its own
+// io_uring instance (IOContext), and read() issues requests against the file opened by open().
+class LinuxAlignedFileReader : public AlignedFileReader
+{
+  private:
+    uint64_t file_sz;
+    FileHandle file_desc;
+    IOContext bad_ctx = (IOContext)-1;
+
+  public:
+    LinuxAlignedFileReader();
+    ~LinuxAlignedFileReader();
+
+    IOContext &get_ctx();
+
+    // register thread-id for a context
+    void register_thread();
+
+    // de-register thread-id for a context
+    void deregister_thread();
+    void deregister_all_threads();
+
+    // Open & close ops
+    // Blocking calls
+    void open(const std::string &fname);
+    void close();
+
+    // process batch of aligned requests in parallel
+    // NOTE :: blocking call
+    void read(std::vector<AlignedRead> &read_reqs, IOContext &ctx, bool async = false);
+};
+
+#endif
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index d4faebf9b..7a8ca307f 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -49,6 +49,7 @@ target_link_libraries(
     ${PROJECT_NAME}
     ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS}
     ${DISKANN_ASYNC_LIB}
+    ${DISKANN_ASYNC_LIB_URING}
 )

 pybind11_extension(_diskannpy)
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index cbca26440..667f9f2c4 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -10,13 +10,19 @@ else()
     #file(GLOB CPP_SOURCES *.cpp)
     set(CPP_SOURCES abstract_data_store.cpp ann_exception.cpp disk_utils.cpp
         distance.cpp index.cpp in_mem_graph_store.cpp in_mem_data_store.cpp
-        linux_aligned_file_reader.cpp math_utils.cpp natural_number_map.cpp
+        math_utils.cpp natural_number_map.cpp
         in_mem_data_store.cpp in_mem_graph_store.cpp
         natural_number_set.cpp memory_mapper.cpp partition.cpp pq.cpp
         pq_flash_index.cpp scratch.cpp logger.cpp utils.cpp filter_utils.cpp index_factory.cpp abstract_index.cpp pq_l2_distance.cpp pq_data_store.cpp)
     if (RESTAPI)
         list(APPEND CPP_SOURCES restapi/search_wrapper.cpp restapi/server.cpp)
     endif()
+    if (NOT IOURING)
+        list(APPEND CPP_SOURCES linux_aligned_file_reader.cpp)
+    else()
+        list(APPEND CPP_SOURCES linux_aligned_file_reader_uring.cpp)
+        add_definitions(-DIOURING)
+    endif()
     add_library(${PROJECT_NAME} ${CPP_SOURCES})
     add_library(${PROJECT_NAME}_s STATIC ${CPP_SOURCES})
 endif()
diff --git a/src/linux_aligned_file_reader_uring.cpp b/src/linux_aligned_file_reader_uring.cpp
new file mode 100644
index 000000000..a68fe1c8e
--- /dev/null
+++ b/src/linux_aligned_file_reader_uring.cpp
@@ -0,0 +1,242 @@
+// Copyright (c) KIOXIA Corporation. All rights reserved.
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT license.
+
+#include "linux_aligned_file_reader_uring.h"
+
+#include <algorithm>
+#include <cassert>
+#include <cerrno>
+#include <cstdio>
+#include <cstdlib>
+#include <cstring>
+#include <iostream>
+#include "tsl/robin_map.h"
+#include "utils.h"
+#define QUEUE_DEPTH 1024
+
+// Issues every request in `read_reqs` against `fd` through the io_uring instance `ctx`
+// and blocks until all of them have completed. Requests are submitted in batches of at
+// most QUEUE_DEPTH so a batch always fits in the ring created by register_thread().
+// Any submission, wait or per-request read error is fatal (the process exits).
+// `n_retries` is accepted for signature compatibility but unused: a batch is submitted
+// exactly once, as in the original code where every path left the retry loop at once.
+void execute_io(IOContext ctx, int fd, std::vector<AlignedRead> &read_reqs, uint64_t n_retries = 0)
+{
+    (void)n_retries;
+#ifdef DEBUG
+    // the file is opened with O_DIRECT: lengths, offsets and buffers must be 512-aligned
+    for (auto &req : read_reqs)
+    {
+        assert(IS_ALIGNED(req.len, 512));
+        assert(IS_ALIGNED(req.offset, 512));
+        assert(IS_ALIGNED(req.buf, 512));
+    }
+#endif
+
+    const uint64_t n_reqs = read_reqs.size();
+    // break-up requests into chunks of size QUEUE_DEPTH each
+    for (uint64_t base = 0; base < n_reqs; base += QUEUE_DEPTH)
+    {
+        const uint64_t n_ops = std::min(n_reqs - base, (uint64_t)QUEUE_DEPTH);
+
+        // queue n_ops read requests, tagging each SQE with its request so the
+        // completion can be matched back to it
+        for (uint64_t j = 0; j < n_ops; j++)
+        {
+            AlignedRead &req = read_reqs[base + j];
+            struct io_uring_sqe *sqe = io_uring_get_sqe(ctx);
+            if (!sqe)
+            {
+                std::cerr << "io_uring_get_sqe() failed: submission queue is full; ctx: " << ctx << std::endl;
+                exit(-1);
+            }
+            io_uring_prep_read(sqe, fd, req.buf, req.len, req.offset);
+            // must follow the prep call: older liburing versions reset user_data there
+            io_uring_sqe_set_data(sqe, &req);
+        }
+
+        // liburing reports failures as a negative errno return value, not via errno
+        const int n_submitted = io_uring_submit(ctx);
+        if (n_submitted != (int)n_ops)
+        {
+            std::cerr << "io_uring_submit() failed; returned " << n_submitted << ", expected=" << n_ops;
+            if (n_submitted < 0)
+            {
+                std::cerr << ", error=" << ::strerror(-n_submitted);
+            }
+            std::cerr << "; ctx: " << ctx << std::endl;
+            exit(-1);
+        }
+
+        // block until the whole batch has completed
+        struct io_uring_cqe *cqe = nullptr;
+        const int wait_ret = io_uring_wait_cqes(ctx, &cqe, (unsigned)n_ops, nullptr, nullptr);
+        if (wait_ret != 0)
+        {
+            std::cerr << "io_uring_wait_cqes() failed; returned " << wait_ret << ", expected=" << n_ops
+                      << ", error=" << ::strerror(-wait_ret) << std::endl;
+            exit(-1);
+        }
+
+        // a completed request can still have failed: inspect every result
+        unsigned head = 0;
+        unsigned n_seen = 0;
+        io_uring_for_each_cqe(ctx, head, cqe)
+        {
+            const AlignedRead *done = static_cast<const AlignedRead *>(io_uring_cqe_get_data(cqe));
+            if (cqe->res < 0)
+            {
+                std::cerr << "io_uring read failed at offset " << done->offset << ", len=" << done->len << ": "
+                          << ::strerror(-cqe->res) << std::endl;
+                exit(-1);
+            }
+            if ((uint64_t)cqe->res != done->len)
+            {
+                std::cerr << "io_uring short read at offset " << done->offset << ": got " << cqe->res
+                          << " of " << done->len << " bytes" << std::endl;
+            }
+            n_seen++;
+        }
+        io_uring_cq_advance(ctx, n_seen);
+    }
+}
+
+LinuxAlignedFileReader::LinuxAlignedFileReader()
+{
+    this->file_desc = -1;
+}
+
+LinuxAlignedFileReader::~LinuxAlignedFileReader()
+{
+    // close() resets file_desc to -1, so any other value is a descriptor left open
+    if (this->file_desc == -1)
+    {
+        return;
+    }
+    std::cerr << "close() not called" << std::endl;
+    if (::close(this->file_desc) == -1)
+    {
+        std::cerr << "close() failed; returned -1, errno=" << errno << ":" << ::strerror(errno) << std::endl;
+    }
+    this->file_desc = -1;
+}
+
+// Returns the calling thread's io_uring context, or `bad_ctx` (-1) when the
+// thread never called register_thread().
+IOContext &LinuxAlignedFileReader::get_ctx()
+{
+    std::unique_lock<std::mutex> lk(ctx_mut);
+    auto it = ctx_map.find(std::this_thread::get_id());
+    if (it == ctx_map.end())
+    {
+        std::cerr << "bad thread access; returning -1 as io_uring" << std::endl;
+        return this->bad_ctx;
+    }
+    return it.value();
+}
+
+// Creates an io_uring instance of depth QUEUE_DEPTH for the calling thread.
+void LinuxAlignedFileReader::register_thread()
+{
+    auto my_id = std::this_thread::get_id();
+    // held until return; released exactly once by the unique_lock destructor
+    std::unique_lock<std::mutex> lk(ctx_mut);
+    if (ctx_map.find(my_id) != ctx_map.end())
+    {
+        std::cerr << "multiple calls to register_thread from the same thread" << std::endl;
+        return;
+    }
+    IOContext ctx = static_cast<struct io_uring *>(malloc(sizeof(struct io_uring)));
+    if (ctx == nullptr)
+    {
+        std::cerr << "failed to allocate io_uring context" << std::endl;
+        return;
+    }
+    int ret = io_uring_queue_init(QUEUE_DEPTH, ctx, 0);
+    if (ret != 0)
+    {
+        std::cerr << "io_uring_queue_init() failed; returned " << ret << ": " << ::strerror(-ret) << std::endl;
+        free(ctx); // never published in ctx_map, so release it here
+        return;
+    }
+    diskann::cout << "allocating ctx: " << ctx << " to thread-id:" << my_id << std::endl;
+    ctx_map[my_id] = ctx;
+}
+
+// Tears down and forgets the calling thread's io_uring instance.
+void LinuxAlignedFileReader::deregister_thread()
+{
+    auto my_id = std::this_thread::get_id();
+    IOContext ctx = nullptr;
+    {
+        std::unique_lock<std::mutex> lk(ctx_mut);
+        auto it = ctx_map.find(my_id);
+        if (it == ctx_map.end())
+        {
+            // never hand the -1 sentinel from get_ctx() to io_uring_queue_exit()
+            std::cerr << "deregister_thread called from an unregistered thread" << std::endl;
+            return;
+        }
+        ctx = it.value();
+        ctx_map.erase(my_id);
+    }
+    // the ring is no longer reachable through ctx_map, so tear it down outside the lock
+    io_uring_queue_exit(ctx);
+    free(ctx);
+    std::cerr << "returned ctx from thread-id:" << my_id << std::endl;
+}
+
+// Tears down every registered io_uring instance.
+void LinuxAlignedFileReader::deregister_all_threads()
+{
+    std::unique_lock<std::mutex> lk(ctx_mut);
+    for (auto x = ctx_map.begin(); x != ctx_map.end(); x++)
+    {
+        IOContext ctx = x.value();
+        io_uring_queue_exit(ctx);
+        free(ctx);
+    }
+    ctx_map.clear();
+}
+
+// Opens `fname` read-only with O_DIRECT; on failure file_desc stays -1.
+void LinuxAlignedFileReader::open(const std::string &fname)
+{
+    int flags = O_DIRECT | O_RDONLY | O_LARGEFILE;
+    this->file_desc = ::open(fname.c_str(), flags);
+    // error checks (an assert alone vanishes in release builds)
+    if (this->file_desc == -1)
+    {
+        std::cerr << "Failed to open file : " << fname << "; errno=" << errno << ":" << ::strerror(errno)
+                  << std::endl;
+        return;
+    }
+    std::cerr << "Opened file : " << fname << std::endl;
+}
+
+// Closes the file if it is open; safe to call more than once.
+void LinuxAlignedFileReader::close()
+{
+    if (this->file_desc == -1)
+    {
+        return;
+    }
+    if (::close(this->file_desc) == -1)
+    {
+        std::cerr << "close() failed; returned -1, errno=" << errno << ":" << ::strerror(errno) << std::endl;
+    }
+    // lets the destructor tell that the descriptor is no longer owned
+    this->file_desc = -1;
+}
+
+// Blocking batch read of `read_reqs` through `ctx`; `async` is not supported.
+void LinuxAlignedFileReader::read(std::vector<AlignedRead> &read_reqs, IOContext &ctx, bool async)
+{
+    if (async == true)
+    {
+        diskann::cout << "Async currently not supported in linux." << std::endl;
+    }
+    assert(this->file_desc != -1);
+    execute_io(ctx, this->file_desc, read_reqs);
+}
diff --git a/src/partition.cpp b/src/partition.cpp
index d0061708a..5cb54d7c3 100644
--- a/src/partition.cpp
+++ b/src/partition.cpp
@@ -26,7 +26,7 @@
 #endif

 // block size for reading/ processing large files and matrices in blocks
-#define BLOCK_SIZE 5000000
+#define POINT_BLOCK_SIZE 5000000

 // #define SAVE_INFLATED_PQ true

@@ -192,7 +192,7 @@ int estimate_cluster_sizes(float *test_data_float, size_t num_test, float *pivot
         shard_counts[i] = 0;
     }

-    size_t block_size = num_test <= BLOCK_SIZE ? num_test : BLOCK_SIZE;
+    size_t block_size = num_test <= POINT_BLOCK_SIZE ?
num_test : POINT_BLOCK_SIZE; uint32_t *block_closest_centers = new uint32_t[block_size * k_base]; float *block_data_float; @@ -270,7 +270,7 @@ int shard_data_into_clusters(const std::string data_file, float *pivots, const s shard_counts[i] = 0; } - size_t block_size = num_points <= BLOCK_SIZE ? num_points : BLOCK_SIZE; + size_t block_size = num_points <= POINT_BLOCK_SIZE ? num_points : POINT_BLOCK_SIZE; std::unique_ptr block_closest_centers = std::make_unique(block_size * k_base); std::unique_ptr block_data_T = std::make_unique(block_size * dim); std::unique_ptr block_data_float = std::make_unique(block_size * dim); @@ -358,7 +358,7 @@ int shard_data_into_clusters_only_ids(const std::string data_file, float *pivots shard_counts[i] = 0; } - size_t block_size = num_points <= BLOCK_SIZE ? num_points : BLOCK_SIZE; + size_t block_size = num_points <= POINT_BLOCK_SIZE ? num_points : POINT_BLOCK_SIZE; std::unique_ptr block_closest_centers = std::make_unique(block_size * k_base); std::unique_ptr block_data_T = std::make_unique(block_size * dim); std::unique_ptr block_data_float = std::make_unique(block_size * dim); @@ -434,7 +434,7 @@ int retrieve_shard_data_from_ids(const std::string data_file, std::string idmap_ uint32_t num_written = 0; std::cout << "Shard has " << shard_size << " points" << std::endl; - size_t block_size = num_points <= BLOCK_SIZE ? num_points : BLOCK_SIZE; + size_t block_size = num_points <= POINT_BLOCK_SIZE ? 
num_points : POINT_BLOCK_SIZE; std::unique_ptr block_data_T = std::make_unique(block_size * dim); size_t num_blocks = DIV_ROUND_UP(num_points, block_size); diff --git a/src/pq_flash_index.cpp b/src/pq_flash_index.cpp index d9ad50617..9f0f14263 100644 --- a/src/pq_flash_index.cpp +++ b/src/pq_flash_index.cpp @@ -12,7 +12,11 @@ #ifdef _WINDOWS #include "windows_aligned_file_reader.h" #else +#ifndef IOURING #include "linux_aligned_file_reader.h" +#else +#include "linux_aligned_file_reader_uring.h" +#endif #endif #define READ_U64(stream, val) stream.read((char *)&val, sizeof(uint64_t)) @@ -218,12 +222,12 @@ template void PQFlashIndex::load_cache_ diskann::alloc_aligned((void **)&_coord_cache_buf, coord_cache_buf_len * sizeof(T), 8 * sizeof(T)); memset(_coord_cache_buf, 0, coord_cache_buf_len * sizeof(T)); - size_t BLOCK_SIZE = 8; - size_t num_blocks = DIV_ROUND_UP(num_cached_nodes, BLOCK_SIZE); + size_t NODE_BLOCK_SIZE = 8; + size_t num_blocks = DIV_ROUND_UP(num_cached_nodes, NODE_BLOCK_SIZE); for (size_t block = 0; block < num_blocks; block++) { - size_t start_idx = block * BLOCK_SIZE; - size_t end_idx = (std::min)(num_cached_nodes, (block + 1) * BLOCK_SIZE); + size_t start_idx = block * NODE_BLOCK_SIZE; + size_t end_idx = (std::min)(num_cached_nodes, (block + 1) * NODE_BLOCK_SIZE); // Copy offset into buffers to read into std::vector nodes_to_read; @@ -418,13 +422,13 @@ void PQFlashIndex::cache_bfs_levels(uint64_t num_nodes_to_cache, std: diskann::cout << "Level: " << lvl << std::flush; bool finish_flag = false; - uint64_t BLOCK_SIZE = 1024; - uint64_t nblocks = DIV_ROUND_UP(nodes_to_expand.size(), BLOCK_SIZE); + uint64_t NODE_BLOCK_SIZE = 1024; + uint64_t nblocks = DIV_ROUND_UP(nodes_to_expand.size(), NODE_BLOCK_SIZE); for (size_t block = 0; block < nblocks && !finish_flag; block++) { diskann::cout << "." 
<< std::flush; - size_t start = block * BLOCK_SIZE; - size_t end = (std::min)((block + 1) * BLOCK_SIZE, nodes_to_expand.size()); + size_t start = block * NODE_BLOCK_SIZE; + size_t end = (std::min)((block + 1) * NODE_BLOCK_SIZE, nodes_to_expand.size()); std::vector nodes_to_read; std::vector coord_buffers(end - start, nullptr);