Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ endif()

# Select the asynchronous I/O libraries linked into the tools on non-MSVC
# builds. DISKANN_ASYNC_LIB is always set to aio; DISKANN_ASYNC_LIB_URING is
# only set when the io_uring reader is requested (-DIOURING=ON), so a default
# build does not require liburing to be installed. When it is left unset,
# ${DISKANN_ASYNC_LIB_URING} expands to nothing in target_link_libraries().
if (NOT MSVC)
    set(DISKANN_ASYNC_LIB aio)
    if (IOURING)
        set(DISKANN_ASYNC_LIB_URING uring)
    endif()
endif()

#Main compiler/linker settings
Expand Down
8 changes: 4 additions & 4 deletions apps/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ add_executable(build_stitched_index build_stitched_index.cpp)
target_link_libraries(build_stitched_index ${PROJECT_NAME} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS} Boost::program_options)

add_executable(search_memory_index search_memory_index.cpp)
target_link_libraries(search_memory_index ${PROJECT_NAME} ${DISKANN_ASYNC_LIB} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS} Boost::program_options)
target_link_libraries(search_memory_index ${PROJECT_NAME} ${DISKANN_ASYNC_LIB} ${DISKANN_ASYNC_LIB_URING} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS} Boost::program_options)

add_executable(build_disk_index build_disk_index.cpp)
target_link_libraries(build_disk_index ${PROJECT_NAME} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS} ${DISKANN_ASYNC_LIB} Boost::program_options)
target_link_libraries(build_disk_index ${PROJECT_NAME} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS} ${DISKANN_ASYNC_LIB} ${DISKANN_ASYNC_LIB_URING} Boost::program_options)

add_executable(search_disk_index search_disk_index.cpp)
target_link_libraries(search_disk_index ${PROJECT_NAME} ${DISKANN_ASYNC_LIB} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS} Boost::program_options)
target_link_libraries(search_disk_index ${PROJECT_NAME} ${DISKANN_ASYNC_LIB} ${DISKANN_ASYNC_LIB_URING} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS} Boost::program_options)

add_executable(range_search_disk_index range_search_disk_index.cpp)
target_link_libraries(range_search_disk_index ${PROJECT_NAME} ${DISKANN_ASYNC_LIB} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS} Boost::program_options)
target_link_libraries(range_search_disk_index ${PROJECT_NAME} ${DISKANN_ASYNC_LIB} ${DISKANN_ASYNC_LIB_URING} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS} Boost::program_options)

add_executable(test_streaming_scenario test_streaming_scenario.cpp)
target_link_libraries(test_streaming_scenario ${PROJECT_NAME} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS} Boost::program_options)
Expand Down
10 changes: 5 additions & 5 deletions apps/utils/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,16 @@ target_link_libraries(gen_random_slice ${PROJECT_NAME} ${DISKANN_TOOLS_TCMALLOC_
add_executable(simulate_aggregate_recall simulate_aggregate_recall.cpp)

add_executable(calculate_recall calculate_recall.cpp)
target_link_libraries(calculate_recall ${PROJECT_NAME} ${DISKANN_ASYNC_LIB} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS})
target_link_libraries(calculate_recall ${PROJECT_NAME} ${DISKANN_ASYNC_LIB} ${DISKANN_ASYNC_LIB_URING} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS})

# Compute ground truth thing outside of DiskANN main source that depends on MKL.
add_executable(compute_groundtruth compute_groundtruth.cpp)
target_include_directories(compute_groundtruth PRIVATE ${DISKANN_MKL_INCLUDE_DIRECTORIES})
target_link_libraries(compute_groundtruth ${PROJECT_NAME} ${DISKANN_MKL_LINK_LIBRARIES} ${DISKANN_ASYNC_LIB} Boost::program_options)
target_link_libraries(compute_groundtruth ${PROJECT_NAME} ${DISKANN_MKL_LINK_LIBRARIES} ${DISKANN_ASYNC_LIB} ${DISKANN_ASYNC_LIB_URING} Boost::program_options)

add_executable(compute_groundtruth_for_filters compute_groundtruth_for_filters.cpp)
target_include_directories(compute_groundtruth_for_filters PRIVATE ${DISKANN_MKL_INCLUDE_DIRECTORIES})
target_link_libraries(compute_groundtruth_for_filters ${PROJECT_NAME} ${DISKANN_MKL_LINK_LIBRARIES} ${DISKANN_ASYNC_LIB} Boost::program_options)
target_link_libraries(compute_groundtruth_for_filters ${PROJECT_NAME} ${DISKANN_MKL_LINK_LIBRARIES} ${DISKANN_ASYNC_LIB} ${DISKANN_ASYNC_LIB_URING} Boost::program_options)


add_executable(generate_pq generate_pq.cpp)
Expand All @@ -67,10 +67,10 @@ add_executable(partition_with_ram_budget partition_with_ram_budget.cpp)
target_link_libraries(partition_with_ram_budget ${PROJECT_NAME} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS})

add_executable(merge_shards merge_shards.cpp)
target_link_libraries(merge_shards ${PROJECT_NAME} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS} ${DISKANN_ASYNC_LIB})
target_link_libraries(merge_shards ${PROJECT_NAME} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS} ${DISKANN_ASYNC_LIB} ${DISKANN_ASYNC_LIB_URING})

add_executable(create_disk_layout create_disk_layout.cpp)
target_link_libraries(create_disk_layout ${PROJECT_NAME} ${DISKANN_ASYNC_LIB} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS})
target_link_libraries(create_disk_layout ${PROJECT_NAME} ${DISKANN_ASYNC_LIB} ${DISKANN_ASYNC_LIB_URING} ${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS})

add_executable(generate_synthetic_labels generate_synthetic_labels.cpp)
target_link_libraries(generate_synthetic_labels ${PROJECT_NAME} Boost::program_options)
Expand Down
8 changes: 8 additions & 0 deletions include/aligned_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,18 @@

#ifndef _WINDOWS
#include <fcntl.h>
#ifndef IOURING
#include <libaio.h>
#else
#include <liburing.h>
#endif
#include <unistd.h>
#ifndef IOURING
typedef io_context_t IOContext;
#else
typedef struct io_uring *IOContext;
#endif
#else
#include <Windows.h>
#include <minwinbase.h>

Expand Down
40 changes: 40 additions & 0 deletions include/linux_aligned_file_reader_uring.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) KIOXIA Corporation. All rights reserved.
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

#pragma once
#ifndef _WINDOWS

#include "aligned_file_reader.h"

// io_uring-backed AlignedFileReader for Linux. Each thread that wants to issue
// reads registers itself to obtain its own io_uring instance (IOContext) and
// passes that context to read().
class LinuxAlignedFileReader : public AlignedFileReader
{
private:
// Size of the opened file. NOTE(review): not assigned by the implementation
// that accompanies this header -- confirm whether it is still needed.
uint64_t file_sz;
// Descriptor of the opened file; -1 while no file is open.
FileHandle file_desc;
// Sentinel handed back by get_ctx() when the calling thread has no
// registered context.
IOContext bad_ctx = (IOContext)-1;

public:
LinuxAlignedFileReader();
~LinuxAlignedFileReader();

// Returns the io_uring context registered for the calling thread, or a
// reference to the bad_ctx sentinel if the thread never registered.
IOContext &get_ctx();

// Allocate and initialise an io_uring instance for the calling thread and
// record it under the thread's id.
void register_thread();

// Tear down and free the calling thread's io_uring instance.
void deregister_thread();
// Tear down and free the io_uring instances of every registered thread.
void deregister_all_threads();

// Open & close ops
// Blocking calls
// open() uses O_DIRECT | O_RDONLY | O_LARGEFILE.
void open(const std::string &fname);
void close();

// Process a batch of aligned requests in parallel through `ctx`.
// NOTE :: blocking call. `async == true` only prints a notice; the reads
// are still performed synchronously.
void read(std::vector<AlignedRead> &read_reqs, IOContext &ctx, bool async = false);
};

#endif
1 change: 1 addition & 0 deletions python/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ target_link_libraries(
${PROJECT_NAME}
${DISKANN_TOOLS_TCMALLOC_LINK_OPTIONS}
${DISKANN_ASYNC_LIB}
${DISKANN_ASYNC_LIB_URING}
)

pybind11_extension(_diskannpy)
Expand Down
8 changes: 7 additions & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,19 @@ else()
#file(GLOB CPP_SOURCES *.cpp)
set(CPP_SOURCES abstract_data_store.cpp ann_exception.cpp disk_utils.cpp
distance.cpp index.cpp in_mem_graph_store.cpp in_mem_data_store.cpp
linux_aligned_file_reader.cpp math_utils.cpp natural_number_map.cpp
math_utils.cpp natural_number_map.cpp
in_mem_data_store.cpp in_mem_graph_store.cpp
natural_number_set.cpp memory_mapper.cpp partition.cpp pq.cpp
pq_flash_index.cpp scratch.cpp logger.cpp utils.cpp filter_utils.cpp index_factory.cpp abstract_index.cpp pq_l2_distance.cpp pq_data_store.cpp)
if (RESTAPI)
list(APPEND CPP_SOURCES restapi/search_wrapper.cpp restapi/server.cpp)
endif()
# Pick the Linux aligned-file-reader backend: the libaio implementation by
# default, the io_uring implementation when configured with -DIOURING=ON.
if (IOURING)
    list(APPEND CPP_SOURCES linux_aligned_file_reader_uring.cpp)
else()
    list(APPEND CPP_SOURCES linux_aligned_file_reader.cpp)
endif()
add_library(${PROJECT_NAME} ${CPP_SOURCES})
add_library(${PROJECT_NAME}_s STATIC ${CPP_SOURCES})
if (IOURING)
    # aligned_file_reader.h selects its IOContext typedef and its async-I/O
    # include based on IOURING. A directory-scoped add_definitions() would only
    # reach sources in this directory, so attach the definition to the library
    # targets as PUBLIC: it then also applies to every target that links them
    # (apps, utilities, Python bindings) and all translation units agree.
    target_compile_definitions(${PROJECT_NAME} PUBLIC IOURING)
    target_compile_definitions(${PROJECT_NAME}_s PUBLIC IOURING)
endif()
endif()
Expand Down
218 changes: 218 additions & 0 deletions src/linux_aligned_file_reader_uring.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
// Copyright (c) KIOXIA Corporation. All rights reserved.
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

#include "linux_aligned_file_reader_uring.h"

#include <cassert>
#include <cstdio>
#include <iostream>
#include "tsl/robin_map.h"
#include "utils.h"
#define QUEUE_DEPTH 1024

// Issues every request in `read_reqs` against `fd` through the io_uring
// instance `ctx` and blocks until all of them have completed.
//
// Requests are processed in batches of at most QUEUE_DEPTH: each batch is
// queued, submitted with a single io_uring_submit() call, waited for, and its
// completions are checked before the next batch starts.
//
// Parameters:
//   ctx        io_uring instance to submit to.
//   fd         descriptor of the file to read from.
//   read_reqs  requests to issue; each supplies `buf`, `len` and `offset`.
//   n_retries  kept for interface compatibility only. Every failure path
//              terminates the process, so no retry is ever attempted.
//
// Any failure (no free submission entry, rejected or partial submission,
// failed wait, or a read that completed with a negative result) is reported on
// stderr and terminates the process with exit(-1). Short reads (a non-negative
// result smaller than the requested length) are not treated as errors.
void execute_io(IOContext ctx, int fd, std::vector<AlignedRead> &read_reqs, uint64_t n_retries = 0)
{
    (void)n_retries;

#ifdef DEBUG
    for (auto &req : read_reqs)
    {
        assert(IS_ALIGNED(req.len, 512));
        assert(IS_ALIGNED(req.offset, 512));
        assert(IS_ALIGNED(req.buf, 512));
    }
#endif

    const uint64_t n_reqs = read_reqs.size();

    // break-up requests into chunks of size QUEUE_DEPTH each
    for (uint64_t base = 0; base < n_reqs; base += QUEUE_DEPTH)
    {
        const uint64_t n_ops = std::min(n_reqs - base, (uint64_t)QUEUE_DEPTH);

        // queue n_ops read requests
        for (uint64_t j = 0; j < n_ops; j++)
        {
            struct io_uring_sqe *sqe = io_uring_get_sqe(ctx);
            if (!sqe)
            {
                std::cerr << "io_uring_get_sqe() failed: no free submission queue entry; ctx: " << ctx << std::endl;
                exit(-1);
            }
            AlignedRead &req = read_reqs[base + j];
            io_uring_prep_read(sqe, fd, req.buf, req.len, req.offset);
            // Tag the entry after prep so the completion can be traced back to
            // its request when reporting an error.
            io_uring_sqe_set_data(sqe, &req);
        }

        // hand the whole batch to the kernel
        int ret = io_uring_submit(ctx);
        if (ret != (int)n_ops)
        {
            std::cerr << "io_uring_submit() failed; returned " << ret << ", expected=" << n_ops;
            if (ret < 0)
            {
                // liburing reports failures as a negated errno value
                std::cerr << ": " << ::strerror(-ret);
            }
            std::cerr << "; ctx: " << ctx << std::endl;
            exit(-1);
        }

        // block until every request of this batch has completed
        struct io_uring_cqe *cqes[QUEUE_DEPTH];
        ret = io_uring_wait_cqes(ctx, cqes, (unsigned)n_ops, nullptr, nullptr);
        if (ret != 0)
        {
            std::cerr << "io_uring_wait_cqes() failed; returned " << ret << ", expected=" << n_ops << ": "
                      << ::strerror(-ret) << std::endl;
            exit(-1);
        }

        // collect the completions so that each result can be inspected
        const unsigned n_done = io_uring_peek_batch_cqe(ctx, cqes, (unsigned)n_ops);
        if (n_done != n_ops)
        {
            std::cerr << "io_uring_peek_batch_cqe() returned " << n_done << " completions, expected=" << n_ops
                      << std::endl;
            exit(-1);
        }
        for (unsigned i = 0; i < n_done; i++)
        {
            if (cqes[i]->res < 0)
            {
                const AlignedRead *failed = static_cast<const AlignedRead *>(io_uring_cqe_get_data(cqes[i]));
                std::cerr << "io_uring read failed; res=" << cqes[i]->res << ": " << ::strerror(-cqes[i]->res);
                if (failed != nullptr)
                {
                    std::cerr << ", offset=" << failed->offset << ", len=" << failed->len;
                }
                std::cerr << std::endl;
                exit(-1);
            }
        }

        // mark the inspected completions as consumed
        io_uring_cq_advance(ctx, n_done);
    }
}

// Creates a reader with no file attached; a descriptor of -1 marks it as
// not yet opened.
LinuxAlignedFileReader::LinuxAlignedFileReader() : file_desc(-1)
{
}

// Probes the file descriptor on destruction and, in one specific case, closes
// it on behalf of the caller.
//
// NOTE(review): the descriptor is only closed when fcntl(F_GETFD) fails with an
// error other than EBADF. A successful probe (descriptor still open) takes no
// action, so a reader whose close() was never called is not cleaned up here.
// The condition looks inverted -- confirm the intent before relying on it.
LinuxAlignedFileReader::~LinuxAlignedFileReader()
{
    // A successful probe means nothing further is done.
    if (::fcntl(this->file_desc, F_GETFD) != -1)
    {
        return;
    }
    // EBADF: the descriptor is not open (never opened or already closed).
    if (errno == EBADF)
    {
        return;
    }

    std::cerr << "close() not called" << std::endl;
    const int rc = ::close(this->file_desc);
    if (rc == -1)
    {
        std::cerr << "close() failed; returned " << rc << ", errno=" << errno << ":" << ::strerror(errno)
                  << std::endl;
    }
}

// Returns the io_uring context registered for the calling thread.
// If the thread never called register_thread(), a message is written to stderr
// and a reference to the `bad_ctx` sentinel is returned instead.
// The lookup is performed while holding `ctx_mut`.
IOContext &LinuxAlignedFileReader::get_ctx()
{
    std::unique_lock<std::mutex> guard(ctx_mut);
    const auto caller = std::this_thread::get_id();

    const bool registered = ctx_map.find(caller) != ctx_map.end();
    if (registered)
    {
        return ctx_map[caller];
    }

    std::cerr << "bad thread access; returning -1 as io_uring" << std::endl;
    return this->bad_ctx;
}

// Creates an io_uring instance with QUEUE_DEPTH entries for the calling thread
// and records it in `ctx_map` under the thread's id.
//
// A second call from the same thread is reported on stderr and ignored.
// If the ring cannot be allocated or initialised, the failure is reported on
// stderr, the allocation is released and nothing is registered; a later
// get_ctx() from this thread then yields the `bad_ctx` sentinel.
//
// `ctx_mut` is held for the whole call and released by the lock's destructor
// on every return path.
void LinuxAlignedFileReader::register_thread()
{
    auto my_id = std::this_thread::get_id();
    std::unique_lock<std::mutex> lk(ctx_mut);
    if (ctx_map.find(my_id) != ctx_map.end())
    {
        std::cerr << "multiple calls to register_thread from the same thread" << std::endl;
        return;
    }

    IOContext ctx = (struct io_uring *)malloc(sizeof(struct io_uring));
    if (ctx == nullptr)
    {
        std::cerr << "register_thread: failed to allocate io_uring instance" << std::endl;
        return;
    }

    // io_uring_queue_init() returns 0 on success and a negated errno otherwise
    int ret = io_uring_queue_init(QUEUE_DEPTH, ctx, 0);
    if (ret != 0)
    {
        // the ring was never initialised, so only the allocation is released
        free(ctx);
        std::cerr << "io_uring_queue_init() failed; returned " << ret << ": " << ::strerror(-ret) << std::endl;
        return;
    }

    diskann::cout << "allocating ctx: " << ctx << " to thread-id:" << my_id << std::endl;
    ctx_map[my_id] = ctx;
}

// Releases the io_uring instance that register_thread() created for the
// calling thread.
//
// The entry is removed from `ctx_map` while `ctx_mut` is held, and only then is
// the ring torn down and freed, so no other caller can obtain the context once
// its destruction has started. Calling this from a thread that was never
// registered is reported on stderr and otherwise ignored, so the `bad_ctx`
// sentinel is never passed to io_uring_queue_exit() or free().
void LinuxAlignedFileReader::deregister_thread()
{
    auto my_id = std::this_thread::get_id();
    IOContext ctx = nullptr;
    {
        std::unique_lock<std::mutex> lk(ctx_mut);
        if (ctx_map.find(my_id) == ctx_map.end())
        {
            std::cerr << "deregister_thread called from a thread that was never registered" << std::endl;
            return;
        }
        ctx = ctx_map[my_id];
        ctx_map.erase(my_id);
    }

    // the ring is no longer reachable through ctx_map; destroy it without the lock
    io_uring_queue_exit(ctx);
    free(ctx);
    std::cerr << "returned ctx from thread-id:" << my_id << std::endl;
}

void LinuxAlignedFileReader::deregister_all_threads()
{
std::unique_lock<std::mutex> lk(ctx_mut);
for (auto x = ctx_map.begin(); x != ctx_map.end(); x++)
{
IOContext ctx = x.value();
io_uring_queue_exit(ctx);
free(ctx);
// assert(ret == 0);
// lk.lock();
// ctx_map.erase(my_id);
// std::cerr << "returned ctx from thread-id:" << my_id << std::endl;
}
ctx_map.clear();
// lk.unlock();
}

// Opens `fname` read-only with O_DIRECT | O_LARGEFILE and stores the descriptor
// in `file_desc`.
//
// On failure the error is reported on stderr together with errno, `file_desc`
// is left at -1, and debug builds abort on the assertion as before. The
// "Opened file" message is only printed when the open succeeded.
void LinuxAlignedFileReader::open(const std::string &fname)
{
    int flags = O_DIRECT | O_RDONLY | O_LARGEFILE;
    this->file_desc = ::open(fname.c_str(), flags);
    if (this->file_desc == -1)
    {
        // capture errno before any stream operation can overwrite it
        const int err = errno;
        std::cerr << "Failed to open file : " << fname << "; errno=" << err << ":" << ::strerror(err) << std::endl;
        assert(this->file_desc != -1);
        return;
    }
    std::cerr << "Opened file : " << fname << std::endl;
}

// Closes the file opened by open(), if any.
//
// Does nothing when no file is open. A failing ::close() is reported on
// stderr. In every case `file_desc` is reset to -1 afterwards, so the stale
// descriptor number is never probed or closed again once the kernel may have
// handed it to another file.
void LinuxAlignedFileReader::close()
{
    if (this->file_desc == -1)
    {
        return;
    }

    if (::close(this->file_desc) == -1)
    {
        // capture errno before any stream operation can overwrite it
        const int err = errno;
        std::cerr << "close() failed; errno=" << err << ":" << ::strerror(err) << std::endl;
    }
    this->file_desc = -1;
}

// Executes all requests in `read_reqs` on the opened file through `ctx` and
// returns once execute_io() has finished with them.
// Asynchronous operation is not implemented: when `async` is set, a notice is
// printed and the reads are still carried out synchronously.
void LinuxAlignedFileReader::read(std::vector<AlignedRead> &read_reqs, IOContext &ctx, bool async)
{
    if (async)
    {
        diskann::cout << "Async currently not supported in linux." << std::endl;
    }

    const int fd = this->file_desc;
    assert(fd != -1);
    execute_io(ctx, fd, read_reqs);
}
Loading