diff --git a/include/neug/storages/csr/csr_base.h b/include/neug/storages/csr/csr_base.h index 3589527c..45077575 100644 --- a/include/neug/storages/csr/csr_base.h +++ b/include/neug/storages/csr/csr_base.h @@ -34,6 +34,8 @@ enum class CsrType { class CsrBase { public: + static constexpr size_t INFINITE_CAPACITY = + std::numeric_limits::max(); CsrBase() = default; virtual ~CsrBase() = default; @@ -66,6 +68,8 @@ class CsrBase { virtual void resize(vid_t vnum) = 0; + virtual size_t capacity() const = 0; + virtual void close() = 0; virtual void batch_sort_by_edge_data(timestamp_t ts) { diff --git a/include/neug/storages/csr/immutable_csr.h b/include/neug/storages/csr/immutable_csr.h index 329508ae..13ab7ecf 100644 --- a/include/neug/storages/csr/immutable_csr.h +++ b/include/neug/storages/csr/immutable_csr.h @@ -83,6 +83,8 @@ class ImmutableCsr : public TypedCsrBase { void resize(vid_t vnum) override; + size_t capacity() const override; + void close() override; void batch_sort_by_edge_data(timestamp_t ts) override; @@ -176,6 +178,8 @@ class SingleImmutableCsr : public TypedCsrBase { void resize(vid_t vnum) override; + size_t capacity() const override; + void close() override; void batch_sort_by_edge_data(timestamp_t ts) override; diff --git a/include/neug/storages/csr/mutable_csr.h b/include/neug/storages/csr/mutable_csr.h index fc246cb1..acbfd272 100644 --- a/include/neug/storages/csr/mutable_csr.h +++ b/include/neug/storages/csr/mutable_csr.h @@ -104,6 +104,8 @@ class MutableCsr : public TypedCsrBase { void resize(vid_t vnum) override; + size_t capacity() const override; + void close() override; void batch_sort_by_edge_data(timestamp_t ts) override; @@ -252,6 +254,8 @@ class SingleMutableCsr : public TypedCsrBase { void resize(vid_t vnum) override; + size_t capacity() const override; + void close() override; void batch_sort_by_edge_data(timestamp_t ts) override; @@ -336,6 +340,8 @@ class EmptyCsr : public TypedCsrBase { void resize(vid_t vnum) override {} + size_t 
capacity() const override { return 0; } + void close() override {} void batch_sort_by_edge_data(timestamp_t ts) override {} diff --git a/include/neug/storages/file_names.h b/include/neug/storages/file_names.h index 6cfe6613..d7048866 100644 --- a/include/neug/storages/file_names.h +++ b/include/neug/storages/file_names.h @@ -246,4 +246,14 @@ inline std::string wal_ingest_allocator_prefix(const std::string& work_dir, std::to_string(thread_id) + "_"; } +inline std::string statistics_file_prefix(const std::string& v_label) { + return "statistics_" + v_label; +} + +inline std::string statistics_file_prefix(const std::string& src_label, + const std::string& dst_label, + const std::string& edge_label) { + return "statistics_" + src_label + "_" + edge_label + "_" + dst_label; +} + } // namespace neug diff --git a/include/neug/storages/graph/edge_table.h b/include/neug/storages/graph/edge_table.h index a5a68cab..6ab753c7 100644 --- a/include/neug/storages/graph/edge_table.h +++ b/include/neug/storages/graph/edge_table.h @@ -72,6 +72,8 @@ class EdgeTable { void Resize(vid_t src_vertex_num, vid_t dst_vertex_num); + void EnsureCapacity(size_t capacity); + size_t EdgeNum() const; size_t PropertyNum() const; @@ -128,6 +130,34 @@ class EdgeTable { void Compact(bool compact_csr, bool sort_on_compaction, timestamp_t ts); + inline size_t Size() const { + if (meta_->is_bundled()) { + if (out_csr_) { + return out_csr_->edge_num(); + } else if (in_csr_) { + return in_csr_->edge_num(); + } else { + THROW_RUNTIME_ERROR("both csr are null"); + } + } + // TODO(zhanglei): the size may be inaccurate if some edges are deleted but + // not compacted yet. 
+ return table_idx_.load(); + } + + inline size_t Capacity() const { + if (meta_->is_bundled()) { + if (out_csr_) { + return out_csr_->capacity(); + } else if (in_csr_) { + return in_csr_->capacity(); + } else { + THROW_RUNTIME_ERROR("both csr are null"); + } + } + return capacity_.load(); + } + private: void dropAndCreateNewBundledCSR(); void dropAndCreateNewUnbundledCSR(bool delete_property); @@ -141,6 +171,7 @@ class EdgeTable { std::unique_ptr in_csr_; std::unique_ptr table_; std::atomic table_idx_{0}; + std::atomic capacity_{0}; friend class PropertyGraph; }; diff --git a/include/neug/storages/graph/property_graph.h b/include/neug/storages/graph/property_graph.h index a8c4c5b7..518bea5e 100644 --- a/include/neug/storages/graph/property_graph.h +++ b/include/neug/storages/graph/property_graph.h @@ -137,6 +137,10 @@ class PropertyGraph { void Compact(bool compact_csr, float reserve_ratio, timestamp_t ts); + /** + * @brief Dump the current graph state to persistent storage. + * @param reopen If true, reopens the graph after dumping (default: true) + */ void Dump(bool reopen = true); /** @@ -298,6 +302,11 @@ class PropertyGraph { Status Reserve(label_t v_label, vid_t vertex_reserve_size); + Status EnsureCapacity(label_t v_label, size_t capacity = 0); + + Status EnsureCapacity(label_t src_label, label_t dst_label, + label_t edge_label, size_t capacity = 0); + Status BatchAddVertices(label_t v_label_id, std::shared_ptr supplier); diff --git a/include/neug/storages/graph/vertex_table.h b/include/neug/storages/graph/vertex_table.h index 6af52bb2..391a69f4 100644 --- a/include/neug/storages/graph/vertex_table.h +++ b/include/neug/storages/graph/vertex_table.h @@ -18,6 +18,7 @@ #include "neug/storages/graph/vertex_timestamp.h" #include "neug/storages/loader/loader_utils.h" #include "neug/utils/arrow_utils.h" +#include "neug/utils/growth.h" #include "neug/utils/indexers.h" #include "neug/utils/property/table.h" @@ -111,8 +112,7 @@ class VertexTable { std::swap(work_dir_, 
other.work_dir_); } - void Open(const std::string& work_dir, int memory_level, - bool build_empty_graph = false); + void Open(const std::string& work_dir, int memory_level); void Dump(const std::string& target_dir); @@ -122,6 +122,8 @@ class VertexTable { void Reserve(size_t cap); + size_t EnsureCapacity(size_t capacity); + bool is_dropped() const { return table_ == nullptr; } bool get_index(const Property& oid, vid_t& lid, @@ -144,6 +146,8 @@ class VertexTable { // Capacity of the vertex table inline size_t Capacity() const { return indexer_.capacity(); } + inline size_t Size() const { return indexer_.size(); } + bool IsValidLid(vid_t lid, timestamp_t ts = MAX_TIMESTAMP) const; IndexerType& get_indexer() { return indexer_; } @@ -289,11 +293,10 @@ class VertexTable { auto ind = std::get<2>(vertex_schema_->primary_keys[0]); auto pk_array = columns[ind]; columns.erase(columns.begin() + ind); - auto cur_size = indexer_.capacity(); - while (cur_size < indexer_.size() + pk_array->length()) { - cur_size = std::max(16, 2 * static_cast(cur_size)); + size_t new_size = indexer_.size() + pk_array->length(); + while (new_size >= Capacity()) { + EnsureCapacity(calculate_new_capacity(new_size, true)); } - Reserve(cur_size); auto vids = insert_primary_keys(pk_array); diff --git a/include/neug/utils/file_utils.h b/include/neug/utils/file_utils.h index 2815a4e7..c12ad779 100644 --- a/include/neug/utils/file_utils.h +++ b/include/neug/utils/file_utils.h @@ -38,4 +38,16 @@ void copy_directory(const std::string& src, const std::string& dst, void remove_directory(const std::string& dir_path); +void write_file(const std::string& filename, const void* buffer, size_t size, + size_t num); + +void read_file(const std::string& filename, void* buffer, size_t size, + size_t num); + +void write_statistic_file(const std::string& file_path, size_t capacity, + size_t size); + +void read_statistic_file(const std::string& file_path, size_t& capacity, + size_t& size); + } // namespace neug diff --git 
a/include/neug/utils/growth.h b/include/neug/utils/growth.h new file mode 100644 index 00000000..4064c6b1 --- /dev/null +++ b/include/neug/utils/growth.h @@ -0,0 +1,39 @@ +/** + * Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include +#include +#include +namespace neug { +inline size_t calculate_new_capacity(size_t current_size, + bool is_vertex_table) { + if (current_size < 4096) { + return 4096; // Start with a reasonable default capacity. + } + static constexpr size_t MAX_CAPACITY = std::numeric_limits::max(); + if (is_vertex_table) { + return current_size <= MAX_CAPACITY - current_size / 4 + ? current_size + current_size / 4 + : MAX_CAPACITY; + } else { + return current_size <= MAX_CAPACITY - (current_size + 4) / 5 + ? 
current_size + (current_size + 4) / 5 + : MAX_CAPACITY; + } +} + +} // namespace neug diff --git a/include/neug/utils/id_indexer.h b/include/neug/utils/id_indexer.h index 8f9b5a5f..028f0cbe 100644 --- a/include/neug/utils/id_indexer.h +++ b/include/neug/utils/id_indexer.h @@ -443,7 +443,6 @@ class LFIndexer { keys_->open(name + ".keys", "", data_dir); indices_.open(data_dir + "/" + name + ".indices", false); size_t num_elements = num_elements_.load(); - keys_->resize(num_elements + (num_elements >> 2)); indices_size_ = indices_.size(); } @@ -462,9 +461,6 @@ class LFIndexer { LOG(INFO) << "Open indices file in " << tmp_dir(work_dir) + "/" + name + ".indices"; indices_.open(tmp_dir(work_dir) + "/" + name + ".indices", true); - size_t num_elements = num_elements_.load(); - - keys_->resize(num_elements + (num_elements >> 2)); indices_size_ = indices_.size(); } @@ -478,8 +474,6 @@ class LFIndexer { keys_->open_in_memory(name + ".keys"); indices_.open(name + ".indices", false); indices_size_ = indices_.size(); - size_t num_elements = num_elements_.load(); - keys_->resize(num_elements + (num_elements >> 2)); } void open_with_hugepages(const std::string& name, bool hugepage_table) { @@ -495,12 +489,9 @@ class LFIndexer { indices_.open(name + ".indices", false); } indices_size_ = indices_.size(); - size_t num_elements = num_elements_.load(); - keys_->resize(num_elements + (num_elements >> 2)); } void dump(const std::string& name, const std::string& snapshot_dir) { - keys_->resize(num_elements_.load()); keys_->dump(snapshot_dir + "/" + name + ".keys"); indices_.dump(snapshot_dir + "/" + name + ".indices"); dump_meta(snapshot_dir + "/" + name + ".meta"); diff --git a/include/neug/utils/mmap_array.h b/include/neug/utils/mmap_array.h index c1d799a3..db28b456 100644 --- a/include/neug/utils/mmap_array.h +++ b/include/neug/utils/mmap_array.h @@ -16,6 +16,7 @@ #include #include +#include #include #include @@ -523,10 +524,29 @@ class mmap_array { } void resize(size_t size, size_t 
data_size) { + LOG(INFO) << "Resizing mmap_array " << this << " to size: " << size + << ", data_size: " << data_size; items_.resize(size); data_.resize(data_size); } + size_t avg_size() const { + if (items_.size() == 0) { + return 0; + } + size_t total_length = 0; + size_t non_zero_count = 0; + for (size_t i = 0; i < items_.size(); ++i) { + if (items_.get(i).length > 0) { + ++non_zero_count; + total_length += items_.get(i).length; + } + } + return non_zero_count > 0 + ? (total_length + non_zero_count - 1) / non_zero_count + : 0; + } + void set(size_t idx, size_t offset, const std::string_view& val) { items_.set(idx, {offset, static_cast(val.size())}); assert(data_.data() + offset + val.size() <= data_.data() + data_.size()); @@ -564,7 +584,9 @@ class mmap_array { // Compact the data buffer by removing unused space and updating offsets // This is an in-place operation that shifts valid string data forward - // Returns the compacted data size + // Returns the compacted data size. Note that the reserved size of data buffer + // is not changed, and new strings can still be appended after the compacted + // data. size_t compact() { auto plan = prepare_compaction_plan(); if (items_.size() == 0) { @@ -577,9 +599,11 @@ class mmap_array { std::vector temp_buf(plan.total_size); size_t write_offset = 0; + size_t limit_offset = 0; for (const auto& entry : plan.entries) { const char* src = data_.data() + entry.offset; char* dst = temp_buf.data() + write_offset; + limit_offset = std::max(limit_offset, entry.offset + entry.length); memcpy(dst, src, entry.length); items_.set(entry.index, {static_cast(write_offset), entry.length}); @@ -588,9 +612,8 @@ class mmap_array { assert(write_offset == plan.total_size); memcpy(data_.data(), temp_buf.data(), plan.total_size); - data_.resize(plan.total_size); VLOG(1) << "Compaction completed. 
New data size: " << plan.total_size - << ", old data size: " << size_before_compact; + << ", old data size: " << limit_offset; return plan.total_size; } @@ -654,6 +677,21 @@ class mmap_array { LOG(ERROR) << ss.str(); THROW_RUNTIME_ERROR(ss.str()); } + int fd = fileno(fout); + if (fd == -1) { + std::stringstream ss; + ss << "Failed to get file descriptor for [ " << data_filename << " ], " + << strerror(errno); + LOG(ERROR) << ss.str(); + THROW_RUNTIME_ERROR(ss.str()); + } + if (ftruncate(fd, size_before_compact) != 0) { + std::stringstream ss; + ss << "Failed to ftruncate file [ " << data_filename << " ], " + << strerror(errno); + LOG(ERROR) << ss.str(); + THROW_RUNTIME_ERROR(ss.str()); + } if (fclose(fout) != 0) { std::stringstream ss; ss << "Failed to fclose file [ " << data_filename << " ], " diff --git a/include/neug/utils/property/column.h b/include/neug/utils/property/column.h index a48314f0..0b85bb16 100644 --- a/include/neug/utils/property/column.h +++ b/include/neug/utils/property/column.h @@ -30,6 +30,7 @@ #include "neug/storages/file_names.h" #include "neug/utils/exception/exception.h" +#include "neug/utils/file_utils.h" #include "neug/utils/likely.h" #include "neug/utils/mmap_array.h" #include "neug/utils/property/property.h" @@ -301,7 +302,7 @@ class TypedColumn : public ColumnBase { if (std::filesystem::exists(basic_path + ".items")) { buffer_.open(basic_path, false, false); size_ = buffer_.size(); - pos_ = buffer_.data_size(); + init_pos(basic_path + ".pos"); } else { if (work_dir == "") { size_ = 0; @@ -309,7 +310,7 @@ class TypedColumn : public ColumnBase { } else { buffer_.open(work_dir + "/" + name, true); size_ = buffer_.size(); - pos_ = buffer_.data_size(); + init_pos(work_dir + "/" + name + ".pos"); } } } @@ -317,14 +318,14 @@ class TypedColumn : public ColumnBase { void open_in_memory(const std::string& prefix) override { buffer_.open(prefix, false); size_ = buffer_.size(); - pos_ = buffer_.data_size(); + init_pos(prefix + ".pos"); } void 
open_with_hugepages(const std::string& prefix, bool force) override { if (strategy_ == StorageStrategy::kMem || force) { buffer_.open_with_hugepages(prefix); size_ = buffer_.size(); - pos_ = buffer_.data_size(); + init_pos(prefix + ".pos"); } else if (strategy_ == StorageStrategy::kDisk) { LOG(INFO) << "Open " << prefix << " with normal mmap pages"; @@ -342,16 +343,17 @@ class TypedColumn : public ColumnBase { } copy_file(cur_path + ".data", tmp_path + ".data"); copy_file(cur_path + ".items", tmp_path + ".items"); + copy_file(cur_path + ".pos", tmp_path + ".pos"); buffer_.reset(); tmp.open(tmp_path, true); buffer_.swap(tmp); tmp.reset(); - pos_.store(buffer_.data_size()); + init_pos(tmp_path + ".pos"); } void dump(const std::string& filename) override { - buffer_.resize(size_, pos_.load()); + write_file(filename + ".pos", &pos_, sizeof(pos_), 1); buffer_.dump(filename); } @@ -362,7 +364,7 @@ class TypedColumn : public ColumnBase { size_ = size; if (buffer_.size() != 0) { size_t avg_width = - (buffer_.data_size() + buffer_.size() - 1) / buffer_.size(); + buffer_.avg_size(); // calculate average width of existing strings buffer_.resize( size_, std::max(size_ * (avg_width > 0 ? 
avg_width : STRING_DEFAULT_MAX_LENGTH), @@ -431,6 +433,13 @@ class TypedColumn : public ColumnBase { } private: + inline void init_pos(const std::string& file_path) { + if (std::filesystem::exists(file_path)) { + read_file(file_path, &pos_, sizeof(pos_), 1); + } else { + pos_.store(0); + } + } mmap_array buffer_; size_t size_; std::atomic pos_; diff --git a/include/neug/utils/property/table.h b/include/neug/utils/property/table.h index 3d7d1e74..ed5fa119 100644 --- a/include/neug/utils/property/table.h +++ b/include/neug/utils/property/table.h @@ -94,6 +94,13 @@ class Table { void delete_column(const std::string& col_name); size_t col_num() const; + inline size_t size() const { + if (columns_.empty()) { + return 0; + } else { + return columns_[0]->size(); + } + } std::vector>& columns(); std::vector& column_ptrs(); diff --git a/src/main/neug_db.cc b/src/main/neug_db.cc index 3d71389a..79c89c39 100644 --- a/src/main/neug_db.cc +++ b/src/main/neug_db.cc @@ -289,6 +289,7 @@ void NeugDB::initPlannerAndQueryProcessor() { } planner_->update_meta(schema().to_yaml().value()); planner_->update_statistics(graph().get_statistics_json()); + LOG(INFO) << "Finish initializing planner with schema and statistics"; global_query_cache_ = std::make_shared(planner_); diff --git a/src/storages/csr/immutable_csr.cc b/src/storages/csr/immutable_csr.cc index 5a6cf1f1..86fdd01a 100644 --- a/src/storages/csr/immutable_csr.cc +++ b/src/storages/csr/immutable_csr.cc @@ -189,6 +189,12 @@ void ImmutableCsr::resize(vid_t vnum) { } } +template +size_t ImmutableCsr::capacity() const { + // We assume the capacity of each csr is INFINITE. 
+ return CsrBase::INFINITE_CAPACITY; +} + template void ImmutableCsr::close() { adj_lists_.reset(); @@ -463,6 +469,11 @@ void SingleImmutableCsr::resize(vid_t vnum) { } } +template +size_t SingleImmutableCsr::capacity() const { + return nbr_list_.size(); +} + template void SingleImmutableCsr::close() { nbr_list_.reset(); diff --git a/src/storages/csr/mutable_csr.cc b/src/storages/csr/mutable_csr.cc index b20a0adc..366cb49e 100644 --- a/src/storages/csr/mutable_csr.cc +++ b/src/storages/csr/mutable_csr.cc @@ -28,64 +28,12 @@ #include "neug/storages/file_names.h" #include "neug/utils/exception/exception.h" +#include "neug/utils/file_utils.h" #include "neug/utils/property/types.h" #include "neug/utils/spinlock.h" namespace neug { -void read_file(const std::string& filename, void* buffer, size_t size, - size_t num) { - FILE* fin = fopen(filename.c_str(), "r"); - if (fin == nullptr) { - std::stringstream ss; - ss << "Failed to open file " << filename << ", " << strerror(errno); - LOG(ERROR) << ss.str(); - THROW_RUNTIME_ERROR(ss.str()); - } - size_t ret_len = 0; - if ((ret_len = fread(buffer, size, num, fin)) != num) { - std::stringstream ss; - ss << "Failed to read file " << filename << ", expected " << num << ", got " - << ret_len << ", " << strerror(errno); - LOG(ERROR) << ss.str(); - THROW_RUNTIME_ERROR(ss.str()); - } - int ret = 0; - if ((ret = fclose(fin)) != 0) { - std::stringstream ss; - ss << "Failed to close file " << filename << ", error code: " << ret << " " - << strerror(errno); - LOG(ERROR) << ss.str(); - THROW_RUNTIME_ERROR(ss.str()); - } -} - -void write_file(const std::string& filename, const void* buffer, size_t size, - size_t num) { - FILE* fout = fopen(filename.c_str(), "wb"); - if (fout == nullptr) { - std::stringstream ss; - ss << "Failed to open file " << filename << ", " << strerror(errno); - LOG(ERROR) << ss.str(); - THROW_RUNTIME_ERROR(ss.str()); - } - size_t ret_len = 0; - if ((ret_len = fwrite(buffer, size, num, fout)) != num) { - 
std::stringstream ss; - ss << "Failed to write file " << filename << ", expected " << num - << ", got " << ret_len << ", " << strerror(errno); - LOG(ERROR) << ss.str(); - } - int ret = 0; - if ((ret = fclose(fout)) != 0) { - std::stringstream ss; - ss << "Failed to close file " << filename << ", error code: " << ret << " " - << strerror(errno); - LOG(ERROR) << ss.str(); - THROW_RUNTIME_ERROR(ss.str()); - } -} - template void MutableCsr::open(const std::string& name, const std::string& snapshot_dir, @@ -343,6 +291,12 @@ void MutableCsr::resize(vid_t vnum) { } } +template +size_t MutableCsr::capacity() const { + // We assume the capacity of each csr is INFINITE. + return CsrBase::INFINITE_CAPACITY; +} + template void MutableCsr::close() { if (locks_ != nullptr) { @@ -645,6 +599,11 @@ void SingleMutableCsr::resize(vid_t vnum) { } } +template +size_t SingleMutableCsr::capacity() const { + return nbr_list_.size(); +} + template void SingleMutableCsr::close() { nbr_list_.reset(); diff --git a/src/storages/graph/edge_table.cc b/src/storages/graph/edge_table.cc index be1b41b1..998d8586 100644 --- a/src/storages/graph/edge_table.cc +++ b/src/storages/graph/edge_table.cc @@ -33,6 +33,8 @@ #include "neug/storages/file_names.h" #include "neug/storages/loader/loader_utils.h" #include "neug/utils/arrow_utils.h" +#include "neug/utils/file_utils.h" +#include "neug/utils/growth.h" #include "neug/utils/property/types.h" namespace neug { @@ -371,13 +373,13 @@ void batch_add_unbundled_edges_impl( const std::vector& src_lid_list, const std::vector& dst_lid_list, TypedCsrBase* out_csr, TypedCsrBase* in_csr, Table* table_, - std::atomic& table_idx_, const std::vector& prop_types, + std::atomic& table_idx_, std::atomic& capacity_, + const std::vector& prop_types, const std::vector>& data_batches, const std::vector& valid_flags) { size_t offset = table_idx_.fetch_add(src_lid_list.size()); insert_edges_separated_impl(out_csr, in_csr, src_lid_list, dst_lid_list, offset); - 
table_->resize(table_idx_.load()); std::vector> expected_types; for (auto pt : prop_types) { expected_types.emplace_back(PropertyTypeToArrowType(pt)); @@ -454,6 +456,7 @@ EdgeTable::EdgeTable(EdgeTable&& edge_table) in_csr_ = std::move(edge_table.in_csr_); table_ = std::move(edge_table.table_); table_idx_ = edge_table.table_idx_.load(); + capacity_ = edge_table.capacity_.load(); } void EdgeTable::Swap(EdgeTable& edge_table) { @@ -469,12 +472,35 @@ void EdgeTable::Swap(EdgeTable& edge_table) { auto t_idx = table_idx_.load(); table_idx_.store(edge_table.table_idx_.load()); edge_table.table_idx_.store(t_idx); + auto cap = capacity_.load(); + capacity_.store(edge_table.capacity_.load()); + edge_table.capacity_.store(cap); } void EdgeTable::SetEdgeSchema(std::shared_ptr meta) { meta_ = meta; } +void load_statistic_file(const std::string& work_dir, + const std::string& src_label_name, + const std::string& dst_label_name, + const std::string& edge_label_name, + std::atomic& cap_atomic, + std::atomic& table_idx_atomic) { + size_t cap = 0, size = 0; + auto statistic_file_path = + checkpoint_dir(work_dir) + "/" + + statistics_file_prefix(src_label_name, dst_label_name, edge_label_name); + if (!std::filesystem::exists(statistic_file_path)) { + cap_atomic.store(0); + table_idx_atomic.store(0); + return; + } + read_statistic_file(statistic_file_path, cap, size); + cap_atomic.store(cap); + table_idx_atomic.store(size); +} + void EdgeTable::Open(const std::string& work_dir) { work_dir_ = work_dir; memory_level_ = 0; @@ -491,10 +517,14 @@ void EdgeTable::Open(const std::string& work_dir) { work_dir, meta_->property_names, meta_->properties, meta_->default_property_values, meta_->strategies); assert(table_->col_num() > 0); - size_t property_capacity = table_->get_column_by_id(0)->size(); - table_idx_.store(property_capacity); - table_->resize(std::max(property_capacity + (property_capacity + 4) / 5, - static_cast(4096))); + size_t table_cap = table_->get_column_by_id(0)->size(); + 
load_statistic_file(work_dir, meta_->src_label_name, meta_->dst_label_name, + meta_->edge_label_name, capacity_, table_idx_); + if (table_cap != capacity_.load()) { + THROW_INVALID_ARGUMENT_EXCEPTION( + "capacity in statistic file does not match actual table capacity, maybe " + "the graph is not dumped properly"); + } } } @@ -520,10 +550,14 @@ void EdgeTable::OpenInMemory(const std::string& work_dir, size_t src_v_cap, work_dir_, meta_->property_names, meta_->properties, meta_->default_property_values, meta_->strategies); assert(table_->col_num() > 0); - size_t property_capacity = table_->get_column_by_id(0)->size(); - table_idx_.store(property_capacity); - table_->resize(std::max(property_capacity + (property_capacity + 4) / 5, - static_cast(4096))); + size_t table_cap = table_->get_column_by_id(0)->size(); + load_statistic_file(work_dir, meta_->src_label_name, meta_->dst_label_name, + meta_->edge_label_name, capacity_, table_idx_); + if (table_cap != capacity_.load()) { + THROW_INVALID_ARGUMENT_EXCEPTION( + "capacity in statistic file does not match actual table capacity, maybe " + "the graph is not dumped properly"); + } } } @@ -549,10 +583,14 @@ void EdgeTable::OpenWithHugepages(const std::string& work_dir, size_t src_v_cap, checkpoint_dir_path, meta_->property_names, meta_->properties, meta_->default_property_values, meta_->strategies, (memory_level_ > 2)); assert(table_->col_num() > 0); - size_t property_capacity = table_->get_column_by_id(0)->size(); - table_idx_.store(property_capacity); - table_->resize(std::max(property_capacity + (property_capacity + 4) / 5, - static_cast(4096))); + size_t table_cap = table_->get_column_by_id(0)->size(); + load_statistic_file(work_dir, meta_->src_label_name, meta_->dst_label_name, + meta_->edge_label_name, capacity_, table_idx_); + if (table_cap != capacity_.load()) { + THROW_INVALID_ARGUMENT_EXCEPTION( + "capacity in statistic file does not match actual table capacity, maybe " + "the graph is not dumped properly"); + } } } @@ -567,6 
+605,11 @@ void EdgeTable::Dump(const std::string& checkpoint_dir_path) { table_->dump(edata_prefix(meta_->src_label_name, meta_->dst_label_name, meta_->edge_label_name), checkpoint_dir_path); + auto statistic_file_path = + checkpoint_dir_path + "/" + + statistics_file_prefix(meta_->src_label_name, meta_->dst_label_name, + meta_->edge_label_name); + write_statistic_file(statistic_file_path, Capacity(), Size()); } } @@ -674,6 +717,18 @@ void EdgeTable::Resize(vid_t src_vertex_num, vid_t dst_vertex_num) { in_csr_->resize(dst_vertex_num); } +void EdgeTable::EnsureCapacity(size_t capacity) { + if (!meta_->is_bundled()) { + if (capacity <= capacity_.load()) { + return; + } + LOG(INFO) << "resize edge table from capacity " << capacity_.load() + << " to " << capacity; + table_->resize(capacity); + capacity_.store(capacity); + } +} + size_t EdgeTable::EdgeNum() const { if (out_csr_) { return out_csr_->edge_num(); @@ -824,6 +879,10 @@ void EdgeTable::BatchAddEdges(const IndexerType& src_indexer, std::vector valid_flags; // true for valid edges std::tie(src_lid, dst_lid, valid_flags) = filterInvalidEdges(src_lid, dst_lid); + size_t new_size = table_idx_.load() + src_lid.size(); + if (new_size >= capacity_.load()) { + EnsureCapacity(calculate_new_capacity(new_size, false)); + } if (meta_->is_bundled()) { auto edges = extract_bundled_edge_data_from_batches(meta_, data_batches, valid_flags); @@ -833,9 +892,9 @@ auto oe_csr = dynamic_cast*>(out_csr_.get()); auto ie_csr = dynamic_cast*>(in_csr_.get()); assert(oe_csr != nullptr && ie_csr != nullptr); - batch_add_unbundled_edges_impl(src_lid, dst_lid, oe_csr, ie_csr, - table_.get(), table_idx_, meta_->properties, - data_batches, valid_flags); + batch_add_unbundled_edges_impl( + src_lid, dst_lid, oe_csr, ie_csr, table_.get(), table_idx_, capacity_, + meta_->properties, data_batches, valid_flags); } } @@ -843,6 +902,10 @@ void EdgeTable::BatchAddEdges( const std::vector&
src_lid_list, const std::vector& dst_lid_list, const std::vector>& edge_data_list) { + size_t new_size = table_idx_.load() + src_lid_list.size(); + if (new_size >= capacity_.load()) { + EnsureCapacity(calculate_new_capacity(new_size, false)); + } if (meta_->is_bundled()) { std::vector flat_edge_data; assert(meta_->properties.size() == 1); @@ -864,7 +927,6 @@ void EdgeTable::BatchAddEdges( size_t offset = table_idx_.fetch_add(src_lid_list.size()); insert_edges_separated_impl(oe_csr, ie_csr, src_lid_list, dst_lid_list, offset); - table_->resize(offset + src_lid_list.size()); for (size_t i = 0; i < edge_data_list.size(); ++i) { table_->insert(offset + i, edge_data_list[i], true); } @@ -924,6 +986,7 @@ void EdgeTable::dropAndCreateNewBundledCSR() { in_csr_ = std::move(new_in_csr); } +// TODO(zhanglei): Keep table_idx_ and capacity_ right. void EdgeTable::dropAndCreateNewUnbundledCSR(bool delete_property) { auto suffix = get_next_csr_path_suffix(); std::string next_oe_csr_path = @@ -955,9 +1018,19 @@ void EdgeTable::dropAndCreateNewUnbundledCSR(bool delete_property) { if (table_->col_num() >= 1) { prev_data_col = table_->get_column_by_id(0); } + } else { + // delete_property == true, which means the EdgeTable will become use csr of + // empty type. 
we need to reset capacity and table_idx to 0 + table_idx_.store(0); + capacity_.store(0); } auto edges = out_csr_->batch_export(prev_data_col); + if (prev_data_col && prev_data_col->size() > 0) { + table_->resize(prev_data_col->size()); + table_idx_.store(prev_data_col->size()); + EnsureCapacity(calculate_new_capacity(prev_data_col->size(), false)); + } // Set default value for other columns for (size_t col_id = 1; col_id < table_->col_num(); ++col_id) { auto col = table_->get_column_by_id(col_id); @@ -978,11 +1051,7 @@ void EdgeTable::dropAndCreateNewUnbundledCSR(bool delete_property) { for (size_t i = 0; i < std::get<0>(edges).size(); ++i) { row_ids.push_back(i); } - if (!delete_property) { - if (prev_data_col->size() > 0) { - table_->resize(prev_data_col->size()); - } - } + std::unique_ptr new_out_csr, new_in_csr; if (delete_property) { new_out_csr = diff --git a/src/storages/graph/graph_interface.cc b/src/storages/graph/graph_interface.cc index 1999a479..8b9ba2ae 100644 --- a/src/storages/graph/graph_interface.cc +++ b/src/storages/graph/graph_interface.cc @@ -35,12 +35,14 @@ void StorageAPUpdateInterface::UpdateEdgeProperty( bool StorageAPUpdateInterface::AddVertex(label_t label, const Property& id, const std::vector& props, vid_t& vid) { - const auto& table = graph_.get_vertex_table(label); - if (table.LidNum() >= table.Capacity()) { - graph_.Reserve(label, table.Capacity() * 2); + auto status = graph_.EnsureCapacity(label); + if (!status.ok()) { + LOG(ERROR) << "Failed to ensure space for vertex of label " + << graph_.schema().get_vertex_label_name(label) << ": " + << status.ToString(); + return false; } - auto status = - graph_.AddVertex(label, id, props, vid, neug::timestamp_t(0), true); + status = graph_.AddVertex(label, id, props, vid, neug::timestamp_t(0)); if (!status.ok()) { LOG(ERROR) << "AddVertex failed: " << status.ToString(); } @@ -50,6 +52,11 @@ bool StorageAPUpdateInterface::AddVertex(label_t label, const Property& id, bool 
StorageAPUpdateInterface::AddEdge( label_t src_label, vid_t src, label_t dst_label, vid_t dst, label_t edge_label, const std::vector& properties) { + auto status = graph_.EnsureCapacity(src_label, dst_label, edge_label); + if (!status.ok()) { + LOG(ERROR) << "Failed to ensure space for edge of: " << status.ToString(); + return false; + } graph_.AddEdge(src_label, src, dst_label, dst, edge_label, properties, neug::timestamp_t(0), alloc_, true); return true; diff --git a/src/storages/graph/property_graph.cc b/src/storages/graph/property_graph.cc index a7ee6639..356b997c 100644 --- a/src/storages/graph/property_graph.cc +++ b/src/storages/graph/property_graph.cc @@ -30,6 +30,7 @@ #include "neug/storages/file_names.h" #include "neug/utils/exception/exception.h" #include "neug/utils/file_utils.h" +#include "neug/utils/growth.h" #include "neug/utils/indexers.h" #include "neug/utils/property/types.h" #include "neug/utils/yaml_utils.h" @@ -98,10 +99,52 @@ Status PropertyGraph::Reserve(label_t v_label, vid_t vertex_reserve_size) { edge_tables_.at(index).Resize(vertex_tables_[v_label].Capacity(), vertex_tables_[dst_label].Capacity()); } - index = schema_.generate_edge_label(dst_label, v_label, e_label); + if (v_label != dst_label) { + index = schema_.generate_edge_label(dst_label, v_label, e_label); + if (edge_tables_.count(index) > 0) { + edge_tables_.at(index).Resize(vertex_tables_[dst_label].Capacity(), + vertex_tables_[v_label].Capacity()); + } + } + } + } + return neug::Status::OK(); + } else { + return Status(StatusCode::ERR_INVALID_ARGUMENT, + "Vertex label does not exist."); + } +} + +Status PropertyGraph::EnsureCapacity(label_t v_label, size_t capacity) { + if (schema_.vertex_label_valid(v_label)) { + auto old_cap = vertex_tables_[v_label].Capacity(); + if (capacity == 0) { + auto old_size = vertex_tables_[v_label].Size(); + if (old_size >= old_cap) { + capacity = neug::calculate_new_capacity(old_size, true); + } + } + if (capacity <= old_cap) { + return 
neug::Status::OK(); + } + auto v_new_cap = vertex_tables_[v_label].EnsureCapacity(capacity); + for (label_t dst_label = 0; dst_label < vertex_label_total_count_; + ++dst_label) { + if (!schema_.vertex_label_valid(dst_label)) { + continue; + } + for (label_t e_label = 0; e_label < edge_label_total_count_; ++e_label) { + size_t index = schema_.generate_edge_label(v_label, dst_label, e_label); if (edge_tables_.count(index) > 0) { - edge_tables_.at(index).Resize(vertex_tables_[dst_label].Capacity(), - vertex_tables_[v_label].Capacity()); + edge_tables_.at(index).Resize(v_new_cap, + vertex_tables_[dst_label].Capacity()); + } + if (v_label != dst_label) { + index = schema_.generate_edge_label(dst_label, v_label, e_label); + if (edge_tables_.count(index) > 0) { + edge_tables_.at(index).Resize(vertex_tables_[dst_label].Capacity(), + v_new_cap); + } } } } @@ -112,6 +155,33 @@ Status PropertyGraph::Reserve(label_t v_label, vid_t vertex_reserve_size) { } } +Status PropertyGraph::EnsureCapacity(label_t src_label, label_t dst_label, + label_t edge_label, size_t capacity) { + if (!schema_.exist(src_label, dst_label, edge_label)) { + return Status(StatusCode::ERR_INVALID_ARGUMENT, + "Edge label does not exist for the given source and " + "destination vertex labels."); + } + size_t index = schema_.generate_edge_label(src_label, dst_label, edge_label); + if (edge_tables_.count(index) == 0) { + return Status( + StatusCode::ERR_INVALID_ARGUMENT, + "Edge table for the given edge label triplet does not exist."); + } + size_t old_cap = edge_tables_.at(index).Capacity(); + if (capacity == 0) { + size_t old_size = edge_tables_.at(index).Size(); + if (old_size >= old_cap) { + capacity = neug::calculate_new_capacity(old_size, false); + } + } + if (capacity <= old_cap) { + return neug::Status::OK(); + } + edge_tables_.at(index).EnsureCapacity(capacity); + return neug::Status::OK(); +} + Status PropertyGraph::BatchAddVertices( label_t v_label, std::shared_ptr supplier) { assert(v_label < 
vertex_tables_.size()); @@ -222,7 +292,7 @@ Status PropertyGraph::CreateVertexType( } auto& vtable = vertex_tables_.back(); - vtable.Open(work_dir_, memory_level_, true); + vtable.Open(work_dir_, memory_level_); vtable.Reserve(4096); vertex_label_total_count_ = schema_.vertex_label_frontier(); assert(vertex_tables_.size() == vertex_label_total_count_); @@ -789,46 +859,26 @@ void PropertyGraph::Open(const std::string& work_dir, int memory_level) { memory_level_ = memory_level; work_dir_.assign(work_dir); std::string schema_file = schema_path(work_dir_); - std::string checkpoint_dir_path{}; - bool build_empty_graph = false; + std::string checkpoint_dir_path = checkpoint_dir(work_dir_); if (std::filesystem::exists(schema_file)) { loadSchema(schema_file); - vertex_label_total_count_ = schema_.vertex_label_frontier(); - edge_label_total_count_ = schema_.edge_label_frontier(); - for (size_t i = 0; i < vertex_label_total_count_; i++) { - if (!schema_.vertex_label_valid(i)) { - THROW_INTERNAL_EXCEPTION("Invalid vertex label id: " + - std::to_string(i)); - } - std::string v_label_name = schema_.get_vertex_label_name(i); - auto properties = schema_.get_vertex_properties(i); - auto property_names = schema_.get_vertex_property_names(i); - auto property_strategies = - schema_.get_vertex_storage_strategies(v_label_name); - vertex_tables_.emplace_back(schema_.get_vertex_schema(i)); - } - checkpoint_dir_path = checkpoint_dir(work_dir_); } else { - vertex_label_total_count_ = schema_.vertex_label_frontier(); - edge_label_total_count_ = schema_.edge_label_frontier(); - for (size_t i = 0; i < vertex_label_total_count_; i++) { - if (!schema_.vertex_label_valid(i)) { - THROW_INTERNAL_EXCEPTION("Invalid vertex label id: " + - std::to_string(i)); - } - std::string v_label_name = schema_.get_vertex_label_name(i); - auto properties = schema_.get_vertex_properties(i); - auto property_names = schema_.get_vertex_property_names(i); - auto property_strategies = - 
schema_.get_vertex_storage_strategies(v_label_name); - vertex_tables_.emplace_back(schema_.get_vertex_schema(i)); - } - build_empty_graph = true; LOG(INFO) << "Schema file not found, build empty graph"; - - checkpoint_dir_path = checkpoint_dir(work_dir_); std::filesystem::create_directories(checkpoint_dir_path); } + vertex_label_total_count_ = schema_.vertex_label_frontier(); + edge_label_total_count_ = schema_.edge_label_frontier(); + for (size_t i = 0; i < vertex_label_total_count_; i++) { + if (!schema_.vertex_label_valid(i)) { + THROW_INTERNAL_EXCEPTION("Invalid vertex label id: " + std::to_string(i)); + } + std::string v_label_name = schema_.get_vertex_label_name(i); + auto properties = schema_.get_vertex_properties(i); + auto property_names = schema_.get_vertex_property_names(i); + auto property_strategies = + schema_.get_vertex_storage_strategies(v_label_name); + vertex_tables_.emplace_back(schema_.get_vertex_schema(i)); + } std::string tmp_dir_path = tmp_dir(work_dir_); @@ -845,14 +895,14 @@ void PropertyGraph::Open(const std::string& work_dir, int memory_level) { } std::string v_label_name = schema_.get_vertex_label_name(i); - vertex_tables_[i].Open(work_dir_, memory_level, build_empty_graph); - - // We will reserve the at least 4096 slots for each vertex label - size_t vertex_capacity = - std::max(vertex_tables_[i].get_indexer().capacity(), (size_t) 4096); - vertex_tables_[i].Reserve(vertex_capacity); - - vertex_capacities[i] = vertex_capacity; + vertex_tables_[i].Open(work_dir_, memory_level); + // Case 1: Open from checkpoint, the capacity should be already reserved and + // satisfied. 
+ // Case 2: Open from empty, Capacity should be the default minimum + // capacity(4096) + vertex_tables_[i].EnsureCapacity( + calculate_new_capacity(vertex_tables_[i].Size(), true)); + vertex_capacities[i] = vertex_tables_[i].Capacity(); } for (size_t src_label_i = 0; src_label_i != vertex_label_total_count_; @@ -894,9 +944,12 @@ void PropertyGraph::Open(const std::string& work_dir, int memory_level) { edge_table.OpenInMemory(work_dir_, vertex_capacities[src_label_i], vertex_capacities[dst_label_i]); } - - edge_table.Resize(vertex_capacities[src_label_i], - vertex_capacities[dst_label_i]); + edge_table.EnsureCapacity( + calculate_new_capacity(edge_table.Size(), false)); + if (memory_level_ == 0) { + edge_table.Resize(vertex_capacities[src_label_i], + vertex_capacities[dst_label_i]); + } edge_tables_.emplace(index, std::move(edge_table)); } } @@ -1014,6 +1067,7 @@ void PropertyGraph::Compact(bool compact_csr, float reserve_ratio, } } } + LOG(INFO) << "Compaction completed."; } void PropertyGraph::Dump(bool reopen) { @@ -1035,9 +1089,17 @@ void PropertyGraph::Dump(bool reopen) { THROW_RUNTIME_ERROR(ss.str()); } std::vector vertex_num(vertex_label_total_count_, 0); + std::vector vertex_capacity(vertex_label_total_count_, 0); for (size_t i = 0; i < vertex_label_total_count_; ++i) { if (!vertex_tables_[i].is_dropped()) { vertex_num[i] = vertex_tables_[i].LidNum(); + LOG(INFO) << "Dump vertex table for label " << i + << ", vertex num: " << vertex_num[i] + << ", capacity: " << vertex_tables_[i].Capacity() + << ", new capacity: " + << calculate_new_capacity(vertex_num[i], true); + EnsureCapacity(i, calculate_new_capacity(vertex_num[i], true)); + vertex_capacity[i] = vertex_tables_[i].Capacity(); vertex_tables_[i].Dump(target_dir); } } @@ -1071,7 +1133,16 @@ void PropertyGraph::Dump(bool reopen) { schema_.generate_edge_label(src_label_i, dst_label_i, e_label_i); if (edge_tables_.count(index) > 0) { auto& edge_table = edge_tables_.at(index); - 
edge_table.Resize(vertex_num[src_label_i], vertex_num[dst_label_i]); + edge_table.Resize(vertex_capacity[src_label_i], + vertex_capacity[dst_label_i]); + LOG(INFO) << "Dump edge table for edge label " << edge_label + << " from " << src_label << " to " << dst_label + << ", edge num: " << edge_table.EdgeNum() + << ", capacity: " << edge_table.Capacity() + << ", new capacity: " + << calculate_new_capacity(edge_table.Size(), false); + EnsureCapacity(src_label_i, dst_label_i, e_label_i, + calculate_new_capacity(edge_table.Size(), false)); edge_table.Dump(target_dir); } } diff --git a/src/storages/graph/vertex_table.cc b/src/storages/graph/vertex_table.cc index 6fd36761..9823e456 100644 --- a/src/storages/graph/vertex_table.cc +++ b/src/storages/graph/vertex_table.cc @@ -14,12 +14,12 @@ */ #include "neug/storages/graph/vertex_table.h" +#include "neug/utils/file_utils.h" #include "neug/utils/likely.h" namespace neug { -void VertexTable::Open(const std::string& work_dir, int memory_level, - bool build_empty_graph) { +void VertexTable::Open(const std::string& work_dir, int memory_level) { memory_level_ = memory_level; work_dir_ = work_dir; std::string tmp_dir_path = tmp_dir(work_dir_); @@ -84,13 +84,8 @@ void VertexTable::Dump(const std::string& target_dir) { const auto& label_name = vertex_schema_->label_name; indexer_.dump(IndexerType::prefix() + "_" + vertex_map_prefix(label_name), target_dir); - table_->resize(indexer_.size()); table_->dump(vertex_table_prefix(label_name), target_dir); - // Shrink v_ts_ to fit the indexer size - v_ts_.Reserve(indexer_.size()); v_ts_.Dump(target_dir + "/" + vertex_tracker_file(label_name)); - VLOG(1) << "Dump vertex table " << label_name << " done, size " - << indexer_.size(); } void VertexTable::Close() { @@ -186,12 +181,24 @@ bool VertexTable::IsValidLid(vid_t lid, timestamp_t ts) const { void VertexTable::Reserve(size_t cap) { if (cap > indexer_.capacity()) { + LOG(INFO) << "Reserving vertex table capacity from " << 
indexer_.capacity() + << " to " << cap; indexer_.reserve(cap); } - if (table_) { + if (table_ && table_->size() < cap) { + LOG(INFO) << "Resizing vertex table from size " << table_->size() << " to " + << cap; table_->resize(cap); - v_ts_.Reserve(cap); } + v_ts_.Reserve(cap); +} + +size_t VertexTable::EnsureCapacity(size_t capacity) { + if (capacity <= indexer_.capacity()) { + return indexer_.capacity(); + } + Reserve(capacity); + return indexer_.capacity(); } void VertexTable::BatchDeleteVertices(const std::vector& vids) { diff --git a/src/storages/loader/abstract_property_graph_loader.cc b/src/storages/loader/abstract_property_graph_loader.cc index 9c1ca2c5..fac11e99 100644 --- a/src/storages/loader/abstract_property_graph_loader.cc +++ b/src/storages/loader/abstract_property_graph_loader.cc @@ -178,8 +178,8 @@ void AbstractPropertyGraphLoader::loadEdges() { for (auto& thread : threads) { thread.join(); } - LOG(INFO) << "Finished loading edges"; } + LOG(INFO) << "Finished loading edges"; } result AbstractPropertyGraphLoader::LoadFragment() { diff --git a/src/transaction/update_transaction.cc b/src/transaction/update_transaction.cc index 8848865c..6e0408c4 100644 --- a/src/transaction/update_transaction.cc +++ b/src/transaction/update_transaction.cc @@ -680,14 +680,16 @@ bool UpdateTransaction::AddVertex(label_t label, const Property& oid, } } - InsertVertexRedo::Serialize(arc_, label, oid, props); - op_num_ += 1; - auto status = graph_.AddVertex(label, oid, props, vid, timestamp_, true); + auto status = graph_.EnsureCapacity(label); if (!status.ok()) { - // The most possible reason is that the space is not enough. 
- graph_.Reserve(label, std::max((vid_t) 1024, graph_.LidNum(label) * 2)); - status = graph_.AddVertex(label, oid, props, vid, timestamp_, true); + LOG(ERROR) << "Failed to ensure space for vertex of label " + << graph_.schema().get_vertex_label_name(label) << ": " + << status.ToString(); + return false; } + InsertVertexRedo::Serialize(arc_, label, oid, props); + op_num_ += 1; + status = graph_.AddVertex(label, oid, props, vid, timestamp_, true); if (!status.ok()) { LOG(ERROR) << "Failed to add vertex of label " << graph_.schema().get_vertex_label_name(label) << ": " @@ -737,6 +739,12 @@ bool UpdateTransaction::AddEdge(label_t src_label, vid_t src_lid, dst_label, GetVertexId(dst_label, dst_lid), edge_label, properties); op_num_ += 1; + auto status = graph_.EnsureCapacity(src_label, dst_label, edge_label); + if (!status.ok()) { + LOG(ERROR) << "Failed to ensure space before insert edge: " + << status.ToString(); + return false; + } auto oe_offset = graph_.AddEdge(src_label, src_lid, dst_label, dst_lid, edge_label, properties, timestamp_, alloc_, true); diff --git a/src/utils/file_utils.cc b/src/utils/file_utils.cc index 3bfd6cec..584cf5c5 100644 --- a/src/utils/file_utils.cc +++ b/src/utils/file_utils.cc @@ -112,4 +112,72 @@ void remove_directory(const std::string& dir_path) { } } +void read_file(const std::string& filename, void* buffer, size_t size, + size_t num) { + FILE* fin = fopen(filename.c_str(), "rb"); + if (fin == nullptr) { + std::stringstream ss; + ss << "Failed to open file " << filename << ", " << strerror(errno); + LOG(ERROR) << ss.str(); + THROW_RUNTIME_ERROR(ss.str()); + } + size_t ret_len = 0; + if ((ret_len = fread(buffer, size, num, fin)) != num) { + std::stringstream ss; + ss << "Failed to read file " << filename << ", expected " << num << ", got " + << ret_len << ", " << strerror(errno); + LOG(ERROR) << ss.str(); + THROW_RUNTIME_ERROR(ss.str()); + } + int ret = 0; + if ((ret = fclose(fin)) != 0) { + std::stringstream ss; + ss << "Failed to 
close file " << filename << ", error code: " << ret << " " + << strerror(errno); + LOG(ERROR) << ss.str(); + THROW_RUNTIME_ERROR(ss.str()); + } +} + +void write_file(const std::string& filename, const void* buffer, size_t size, + size_t num) { + FILE* fout = fopen(filename.c_str(), "wb"); + if (fout == nullptr) { + std::stringstream ss; + ss << "Failed to open file " << filename << ", " << strerror(errno); + LOG(ERROR) << ss.str(); + THROW_RUNTIME_ERROR(ss.str()); + } + size_t ret_len = 0; + if ((ret_len = fwrite(buffer, size, num, fout)) != num) { + std::stringstream ss; + ss << "Failed to write file " << filename << ", expected " << num + << ", got " << ret_len << ", " << strerror(errno); + LOG(ERROR) << ss.str(); + THROW_RUNTIME_ERROR(ss.str()); + } + int ret = 0; + if ((ret = fclose(fout)) != 0) { + std::stringstream ss; + ss << "Failed to close file " << filename << ", error code: " << ret << " " + << strerror(errno); + LOG(ERROR) << ss.str(); + THROW_RUNTIME_ERROR(ss.str()); + } +} + +void write_statistic_file(const std::string& filename, size_t capacity, + size_t size) { + size_t buffer[2] = {capacity, size}; + write_file(filename, buffer, sizeof(size_t), 2); +} + +void read_statistic_file(const std::string& filename, size_t& capacity, + size_t& size) { + size_t buffer[2]; + read_file(filename, buffer, sizeof(size_t), 2); + capacity = buffer[0]; + size = buffer[1]; +} + } // namespace neug diff --git a/src/utils/property/column.cc b/src/utils/property/column.cc index 47cc1513..01b3401c 100644 --- a/src/utils/property/column.cc +++ b/src/utils/property/column.cc @@ -76,7 +76,7 @@ class TypedEmptyColumn : public ColumnBase { void set_any(size_t index, const Property& value, bool insert_safe) override { } - T get_view(size_t index) const { T{}; } + T get_view(size_t index) const { return T{}; } Property get_prop(size_t index) const override { return Property(); } diff --git a/tests/storage/test_edge_table.cc b/tests/storage/test_edge_table.cc index 
42b385fb..4d834d47 100644 --- a/tests/storage/test_edge_table.cc +++ b/tests/storage/test_edge_table.cc @@ -895,6 +895,7 @@ TEST_F(EdgeTableTest, TestAddEdgeDeleteUnbundled) { neug::Allocator allocator(neug::MemoryStrategy::kMemoryOnly, allocator_dir_); size_t edge_count = 0; + this->edge_table->EnsureCapacity(edge_data.size()); for (size_t i = 0; i < src_lids.size(); ++i) { this->edge_table->AddEdge(src_lids[i], dst_lids[i], edge_data[i], 0, allocator); @@ -1043,6 +1044,7 @@ TEST_F(EdgeTableTest, TestUpdateEdgeData) { neug::Property::from_int32(static_cast(0))}); } + this->edge_table->EnsureCapacity(edge_data.size()); neug::Allocator allocator(neug::MemoryStrategy::kMemoryOnly, allocator_dir_); for (size_t i = 0; i < src_lids.size(); ++i) { this->edge_table->AddEdge(src_lids[i], dst_lids[i], edge_data[i], 0, diff --git a/tests/storage/test_vertex_table.cc b/tests/storage/test_vertex_table.cc index 5bee40fd..403f051e 100644 --- a/tests/storage/test_vertex_table.cc +++ b/tests/storage/test_vertex_table.cc @@ -113,7 +113,7 @@ class VertexTableTest : public ::testing::Test { TEST_F(VertexTableTest, VertexTableBasicOps) { neug::VertexTable table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dir_, memory_level_, true); + table.Open(dir_, memory_level_); table.Reserve(vertex_count_); neug::vid_t lid1, lid2, lid3; @@ -166,7 +166,7 @@ TEST_F(VertexTableTest, VertexTableDumpAndReload) { std::filesystem::create_directories(neug::temp_checkpoint_dir(dump_dir)); { neug::VertexTable table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dump_dir, memory_level_, true); + table.Open(dump_dir, memory_level_); table.Reserve(vertex_count_); neug::vid_t lid1, lid2, lid3; @@ -209,7 +209,7 @@ TEST_F(VertexTableTest, VertexTableAddAndDeleteAndReload) { neug::Property oid1, oid2, oid3; { neug::VertexTable table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dump_dir, memory_level_, true); + table.Open(dump_dir, memory_level_); table.Reserve(vertex_count_); 
oid1.set_int64(1); @@ -270,7 +270,7 @@ TEST_F(VertexTableTest, VertexTableAddAndDeleteAndReload) { TEST_F(VertexTableTest, AddVertexBasic) { neug::VertexTable table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dir_, memory_level_, true); + table.Open(dir_, memory_level_); table.Reserve(100); neug::Property oid1, oid2, oid3; @@ -309,7 +309,7 @@ TEST_F(VertexTableTest, AddVertexBasic) { // Test AddVertex for concurrent scenarios TEST_F(VertexTableTest, AddVertex) { neug::VertexTable table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dir_, memory_level_, true); + table.Open(dir_, memory_level_); neug::vid_t tmp_vid; EXPECT_FALSE(table.AddVertex(neug::Property::from_int64(1), property_values_, tmp_vid)); @@ -340,7 +340,7 @@ TEST_F(VertexTableTest, AddVertex) { // Test DeleteVertex basic functionality TEST_F(VertexTableTest, DeleteVertexBasic) { neug::VertexTable table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dir_, memory_level_, true); + table.Open(dir_, memory_level_); table.Reserve(100); neug::Property oid1, oid2, oid3; @@ -373,7 +373,7 @@ TEST_F(VertexTableTest, DeleteVertexBasic) { // Test RevertDeleteVertex basic functionality TEST_F(VertexTableTest, RevertDeleteVertexBasic) { neug::VertexTable table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dir_, memory_level_, true); + table.Open(dir_, memory_level_); table.Reserve(100); neug::Property oid1; @@ -403,7 +403,7 @@ TEST_F(VertexTableTest, RevertDeleteVertexBasic) { // Test complex combination: Add -> Delete -> Revert TEST_F(VertexTableTest, AddDeleteRevertCombination) { neug::VertexTable table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dir_, memory_level_, true); + table.Open(dir_, memory_level_); table.Reserve(100); std::vector oids; @@ -448,7 +448,7 @@ TEST_F(VertexTableTest, AddDeleteRevertCombination) { // Test complex combination: Multiple deletes and reverts TEST_F(VertexTableTest, MultipleDeletesAndReverts) { neug::VertexTable 
table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dir_, memory_level_, true); + table.Open(dir_, memory_level_); table.Reserve(100); neug::Property oid; @@ -483,7 +483,7 @@ TEST_F(VertexTableTest, MultipleDeletesAndReverts) { // Test AddVertex and AddVertexSafe mixed usage TEST_F(VertexTableTest, MixedAddVertexAndAddVertexSafe) { neug::VertexTable table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dir_, memory_level_, true); + table.Open(dir_, memory_level_); table.Reserve(50); std::vector lids; @@ -507,7 +507,7 @@ TEST_F(VertexTableTest, MixedAddVertexAndAddVertexSafe) { // Test temporal visibility with add/delete/revert TEST_F(VertexTableTest, TemporalVisibilityComplex) { neug::VertexTable table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dir_, memory_level_, true); + table.Open(dir_, memory_level_); table.Reserve(100); neug::Property oid1, oid2, oid3; @@ -550,7 +550,7 @@ TEST_F(VertexTableTest, TemporalVisibilityComplex) { // Test edge cases: Delete already deleted vertex TEST_F(VertexTableTest, DeleteAlreadyDeletedVertex) { neug::VertexTable table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dir_, memory_level_, true); + table.Open(dir_, memory_level_); table.Reserve(100); neug::Property oid; @@ -573,7 +573,7 @@ TEST_F(VertexTableTest, DeleteAlreadyDeletedVertex) { // Test edge cases: Revert non-deleted vertex TEST_F(VertexTableTest, RevertNonDeletedVertex) { neug::VertexTable table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dir_, memory_level_, true); + table.Open(dir_, memory_level_); table.Reserve(100); neug::Property oid; @@ -606,7 +606,7 @@ TEST_F(VertexTableTest, ComplexAddDeleteRevertDumpReload) { // Create complex state { neug::VertexTable table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dump_dir, memory_level_, true); + table.Open(dump_dir, memory_level_); table.Reserve(100); for (int64_t i = 0; i < 20; ++i) { @@ -667,7 +667,7 @@ TEST_F(VertexTableTest, ComplexAddDeleteRevertDumpReload) { // 
Test stress: Many add/delete/revert operations TEST_F(VertexTableTest, StressAddDeleteRevert) { neug::VertexTable table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dir_, memory_level_, true); + table.Open(dir_, memory_level_); table.Reserve(1000); std::vector oids; @@ -719,7 +719,7 @@ TEST_F(VertexTableTest, StressAddDeleteRevert) { TEST_F(VertexTableTest, VertexTableResizeTest) { neug::VertexTable table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dir_, memory_level_, true); + table.Open(dir_, memory_level_); auto record_batches = generate_record_batches(10000); std::shared_ptr batch_supplier = std::make_shared(std::move(record_batches)); @@ -789,7 +789,7 @@ TEST_F(VertexTableTest, VertexTimestampValidVertexNum) { TEST_F(VertexTableTest, VertexSetForeachVertex) { neug::VertexTable table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dir_, memory_level_, true); + table.Open(dir_, memory_level_); table.Reserve(100); std::vector oids; diff --git a/tests/storage/vertex_table_benchmark.cc b/tests/storage/vertex_table_benchmark.cc index e11fdd2f..32ce5e94 100644 --- a/tests/storage/vertex_table_benchmark.cc +++ b/tests/storage/vertex_table_benchmark.cc @@ -70,7 +70,7 @@ class VertexTableBenchmark : public ::testing::Test { void CreateAndOpenVertexTable(neug::VertexTable& table) { // Open the vertex table - table.Open(test_dir_, 1, true); // memory_level=1 for in-memory + table.Open(test_dir_, 1); // memory_level=1 for in-memory } void AddVerticesWithProperties(neug::VertexTable& table, size_t count) { diff --git a/tests/transaction/test_acid.cc b/tests/transaction/test_acid.cc index ef3ae806..7c6e2dbe 100644 --- a/tests/transaction/test_acid.cc +++ b/tests/transaction/test_acid.cc @@ -343,6 +343,10 @@ std::shared_ptr G0Init(NeugDB& db, auto person_label_id = schema.get_vertex_label_id("PERSON"); auto knows_label_id = schema.get_edge_label_id("KNOWS"); + db.graph().EnsureCapacity(person_label_id, 1000); + 
db.graph().EnsureCapacity(person_label_id, knows_label_id, person_label_id, + 1000); + auto sess = svc->AcquireSession(); auto txn = sess->GetInsertTransaction(); StorageTPInsertInterface gii(txn); diff --git a/tests/unittest/test_connection.cc b/tests/unittest/test_connection.cc index 706f4f78..026d8835 100644 --- a/tests/unittest/test_connection.cc +++ b/tests/unittest/test_connection.cc @@ -132,9 +132,8 @@ TEST_F(ConnectionTest, TestReadWriteConnection) { auto conn1 = db.Connect(); EXPECT_NE(conn1, nullptr); - EXPECT_THROW( - { auto conn2 = db.Connect(); }, - neug::exception::TxStateConflictException); + EXPECT_THROW({ auto conn2 = db.Connect(); }, + neug::exception::TxStateConflictException); } TEST_F(ConnectionTest, TestReadOnlyConnections) { diff --git a/tools/python_bind/tests/test_db_query.py b/tools/python_bind/tests/test_db_query.py index d0979f22..2c324484 100644 --- a/tools/python_bind/tests/test_db_query.py +++ b/tools/python_bind/tests/test_db_query.py @@ -2556,3 +2556,24 @@ def test_insert_many_vertices(): assert records == [[10000]], f"Expected value [[10000]], got {records}" conn.close() db.close() + + +def test_insert_many_edges(): + db_dir = "/tmp/test_insert_many_edges" + shutil.rmtree(db_dir, ignore_errors=True) + db = Database(db_path=db_dir, mode="w") + conn = db.connect() + conn.execute("CREATE NODE TABLE Person(id INT64, PRIMARY KEY(id));") + conn.execute("CREATE REL TABLE Knows(FROM Person TO Person);") + for i in range(100): + conn.execute(f"CREATE (p: Person {{id: {i}}});") + for i in range(100): + for j in range(i + 1, 100): + conn.execute( + f"MATCH (p1: Person {{id: {i}}}), (p2: Person {{id: {j}}}) CREATE (p1)-[:Knows]->(p2);" + ) + res = conn.execute("MATCH ()-[e: Knows]->() RETURN count(e);") + records = list(res) + assert records == [[4950]], f"Expected value [[4950]], got {records}" + conn.close() + db.close()