From a2f627c2055df9c099c10e303f3560f91443e110 Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Fri, 6 Mar 2026 18:56:54 +0800 Subject: [PATCH 01/19] fix overflow for vertex/edge insert Committed-by: xiaolei.zl from Dev container Committed-by: xiaolei.zl from Dev container add capacity api Committed-by: xiaolei.zl from Dev container fix CI Committed-by: xiaolei.zl from Dev container remove tests for TP Committed-by: xiaolei.zl from Dev container Committed-by: xiaolei.zl from Dev container fixing Committed-by: xiaolei.zl from Dev container Committed-by: xiaolei.zl from Dev container Committed-by: xiaolei.zl from Dev container fix Update src/storages/graph/edge_table.cc Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> fixes fix use explicit capacity calculation for EnsureCapacity --- include/neug/storages/csr/csr_base.h | 4 ++ include/neug/storages/csr/immutable_csr.h | 4 ++ include/neug/storages/csr/mutable_csr.h | 6 ++ include/neug/storages/graph/edge_table.h | 18 +++++ include/neug/storages/graph/property_graph.h | 5 ++ include/neug/storages/graph/vertex_table.h | 4 ++ include/neug/utils/growth.h | 35 +++++++++ src/storages/csr/immutable_csr.cc | 11 +++ src/storages/csr/mutable_csr.cc | 11 +++ src/storages/graph/edge_table.cc | 49 +++++++++---- src/storages/graph/graph_interface.cc | 17 +++-- src/storages/graph/property_graph.cc | 76 +++++++++++++++++++- src/storages/graph/vertex_table.cc | 8 +++ src/transaction/update_transaction.cc | 20 ++++-- src/utils/property/column.cc | 2 +- tools/python_bind/tests/test_db_query.py | 21 ++++++ 16 files changed, 264 insertions(+), 27 deletions(-) create mode 100644 include/neug/utils/growth.h diff --git a/include/neug/storages/csr/csr_base.h b/include/neug/storages/csr/csr_base.h index 3589527c7..450775755 100644 --- a/include/neug/storages/csr/csr_base.h +++ b/include/neug/storages/csr/csr_base.h @@ -34,6 +34,8 @@ enum class CsrType { class CsrBase { public: + static constexpr size_t 
INFINITE_CAPACITY = + std::numeric_limits::max(); CsrBase() = default; virtual ~CsrBase() = default; @@ -66,6 +68,8 @@ class CsrBase { virtual void resize(vid_t vnum) = 0; + virtual size_t capacity() const = 0; + virtual void close() = 0; virtual void batch_sort_by_edge_data(timestamp_t ts) { diff --git a/include/neug/storages/csr/immutable_csr.h b/include/neug/storages/csr/immutable_csr.h index 329508aec..13ab7ecf8 100644 --- a/include/neug/storages/csr/immutable_csr.h +++ b/include/neug/storages/csr/immutable_csr.h @@ -83,6 +83,8 @@ class ImmutableCsr : public TypedCsrBase { void resize(vid_t vnum) override; + size_t capacity() const override; + void close() override; void batch_sort_by_edge_data(timestamp_t ts) override; @@ -176,6 +178,8 @@ class SingleImmutableCsr : public TypedCsrBase { void resize(vid_t vnum) override; + size_t capacity() const override; + void close() override; void batch_sort_by_edge_data(timestamp_t ts) override; diff --git a/include/neug/storages/csr/mutable_csr.h b/include/neug/storages/csr/mutable_csr.h index fc246cb10..acbfd272e 100644 --- a/include/neug/storages/csr/mutable_csr.h +++ b/include/neug/storages/csr/mutable_csr.h @@ -104,6 +104,8 @@ class MutableCsr : public TypedCsrBase { void resize(vid_t vnum) override; + size_t capacity() const override; + void close() override; void batch_sort_by_edge_data(timestamp_t ts) override; @@ -252,6 +254,8 @@ class SingleMutableCsr : public TypedCsrBase { void resize(vid_t vnum) override; + size_t capacity() const override; + void close() override; void batch_sort_by_edge_data(timestamp_t ts) override; @@ -336,6 +340,8 @@ class EmptyCsr : public TypedCsrBase { void resize(vid_t vnum) override {} + size_t capacity() const override { return 0; } + void close() override {} void batch_sort_by_edge_data(timestamp_t ts) override {} diff --git a/include/neug/storages/graph/edge_table.h b/include/neug/storages/graph/edge_table.h index a5a68cab7..083d26f31 100644 --- 
a/include/neug/storages/graph/edge_table.h +++ b/include/neug/storages/graph/edge_table.h @@ -72,6 +72,8 @@ class EdgeTable { void Resize(vid_t src_vertex_num, vid_t dst_vertex_num); + void EnsureCapacity(size_t capacity); + size_t EdgeNum() const; size_t PropertyNum() const; @@ -128,6 +130,21 @@ class EdgeTable { void Compact(bool compact_csr, bool sort_on_compaction, timestamp_t ts); + inline size_t Size() const { return table_idx_.load(); } + + inline size_t Capacity() const { + if (meta_->is_bundled()) { + if (out_csr_) { + return out_csr_->capacity(); + } else if (in_csr_) { + return in_csr_->capacity(); + } else { + THROW_RUNTIME_ERROR("both csr are null"); + } + } + return capacity_.load(); + } + private: void dropAndCreateNewBundledCSR(); void dropAndCreateNewUnbundledCSR(bool delete_property); @@ -141,6 +158,7 @@ class EdgeTable { std::unique_ptr in_csr_; std::unique_ptr table_; std::atomic table_idx_{0}; + std::atomic capacity_{0}; friend class PropertyGraph; }; diff --git a/include/neug/storages/graph/property_graph.h b/include/neug/storages/graph/property_graph.h index a8c4c5b77..33bbc5ee5 100644 --- a/include/neug/storages/graph/property_graph.h +++ b/include/neug/storages/graph/property_graph.h @@ -298,6 +298,11 @@ class PropertyGraph { Status Reserve(label_t v_label, vid_t vertex_reserve_size); + Status EnsureCapacity(label_t v_label, size_t capacity = 0); + + Status EnsureCapacity(label_t src_label, label_t dst_label, + label_t edge_label, size_t capacity = 0); + Status BatchAddVertices(label_t v_label_id, std::shared_ptr supplier); diff --git a/include/neug/storages/graph/vertex_table.h b/include/neug/storages/graph/vertex_table.h index 6af52bb29..cf4520a71 100644 --- a/include/neug/storages/graph/vertex_table.h +++ b/include/neug/storages/graph/vertex_table.h @@ -122,6 +122,8 @@ class VertexTable { void Reserve(size_t cap); + size_t EnsureCapacity(size_t capacity); + bool is_dropped() const { return table_ == nullptr; } bool get_index(const 
Property& oid, vid_t& lid, @@ -144,6 +146,8 @@ class VertexTable { // Capacity of the vertex table inline size_t Capacity() const { return indexer_.capacity(); } + inline size_t Size() const { return indexer_.size(); } + bool IsValidLid(vid_t lid, timestamp_t ts = MAX_TIMESTAMP) const; IndexerType& get_indexer() { return indexer_; } diff --git a/include/neug/utils/growth.h b/include/neug/utils/growth.h new file mode 100644 index 000000000..beb0f4c1d --- /dev/null +++ b/include/neug/utils/growth.h @@ -0,0 +1,35 @@ +/** + * Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +namespace neug { +inline size_t calculate_new_capacity(size_t current_capacity, + bool is_vertex_table) { + if (is_vertex_table) { + // For vertex tables, we grow exponentially: double the current capacity, + // with a 4K floor. + return current_capacity == 0 ? 4096 : current_capacity * 2; + } else { + // For edge tables, we grow linearly: new capacity = current capacity + + // (current capacity + 4) / 5. 
+ return current_capacity + (current_capacity + 4) / 5; + } +} +} // namespace neug diff --git a/src/storages/csr/immutable_csr.cc b/src/storages/csr/immutable_csr.cc index 5a6cf1f12..86fdd01ad 100644 --- a/src/storages/csr/immutable_csr.cc +++ b/src/storages/csr/immutable_csr.cc @@ -189,6 +189,12 @@ void ImmutableCsr::resize(vid_t vnum) { } } +template +size_t ImmutableCsr::capacity() const { + // We assume the capacity of each csr is INFINITE. + return CsrBase::INFINITE_CAPACITY; +} + template void ImmutableCsr::close() { adj_lists_.reset(); @@ -463,6 +469,11 @@ void SingleImmutableCsr::resize(vid_t vnum) { } } +template +size_t SingleImmutableCsr::capacity() const { + return nbr_list_.size(); +} + template void SingleImmutableCsr::close() { nbr_list_.reset(); diff --git a/src/storages/csr/mutable_csr.cc b/src/storages/csr/mutable_csr.cc index b20a0adcf..569312a6c 100644 --- a/src/storages/csr/mutable_csr.cc +++ b/src/storages/csr/mutable_csr.cc @@ -343,6 +343,12 @@ void MutableCsr::resize(vid_t vnum) { } } +template +size_t MutableCsr::capacity() const { + // We assume the capacity of each csr is INFINITE. 
+ return CsrBase::INFINITE_CAPACITY; +} + template void MutableCsr::close() { if (locks_ != nullptr) { @@ -645,6 +651,11 @@ void SingleMutableCsr::resize(vid_t vnum) { } } +template +size_t SingleMutableCsr::capacity() const { + return nbr_list_.size(); +} + template void SingleMutableCsr::close() { nbr_list_.reset(); diff --git a/src/storages/graph/edge_table.cc b/src/storages/graph/edge_table.cc index be1b41b11..3daa1b4fe 100644 --- a/src/storages/graph/edge_table.cc +++ b/src/storages/graph/edge_table.cc @@ -371,13 +371,16 @@ void batch_add_unbundled_edges_impl( const std::vector& src_lid_list, const std::vector& dst_lid_list, TypedCsrBase* out_csr, TypedCsrBase* in_csr, Table* table_, - std::atomic& table_idx_, const std::vector& prop_types, + std::atomic& table_idx_, std::atomic& capacity_, + const std::vector& prop_types, const std::vector>& data_batches, const std::vector& valid_flags) { size_t offset = table_idx_.fetch_add(src_lid_list.size()); insert_edges_separated_impl(out_csr, in_csr, src_lid_list, dst_lid_list, offset); - table_->resize(table_idx_.load()); + size_t cur_idx = table_idx_.load(); + table_->resize(cur_idx); + capacity_.store(cur_idx); std::vector> expected_types; for (auto pt : prop_types) { expected_types.emplace_back(PropertyTypeToArrowType(pt)); @@ -454,6 +457,7 @@ EdgeTable::EdgeTable(EdgeTable&& edge_table) in_csr_ = std::move(edge_table.in_csr_); table_ = std::move(edge_table.table_); table_idx_ = edge_table.table_idx_.load(); + capacity_ = edge_table.capacity_.load(); } void EdgeTable::Swap(EdgeTable& edge_table) { @@ -469,6 +473,9 @@ void EdgeTable::Swap(EdgeTable& edge_table) { auto t_idx = table_idx_.load(); table_idx_.store(edge_table.table_idx_.load()); edge_table.table_idx_.store(t_idx); + auto cap = capacity_.load(); + capacity_.store(edge_table.capacity_.load()); + edge_table.capacity_.store(cap); } void EdgeTable::SetEdgeSchema(std::shared_ptr meta) { @@ -493,8 +500,9 @@ void EdgeTable::Open(const std::string& work_dir) { 
assert(table_->col_num() > 0); size_t property_capacity = table_->get_column_by_id(0)->size(); table_idx_.store(property_capacity); - table_->resize(std::max(property_capacity + (property_capacity + 4) / 5, - static_cast(4096))); + capacity_.store(std::max(property_capacity + (property_capacity + 4) / 5, + static_cast(4096))); + table_->resize(capacity_.load()); } } @@ -522,8 +530,9 @@ void EdgeTable::OpenInMemory(const std::string& work_dir, size_t src_v_cap, assert(table_->col_num() > 0); size_t property_capacity = table_->get_column_by_id(0)->size(); table_idx_.store(property_capacity); - table_->resize(std::max(property_capacity + (property_capacity + 4) / 5, - static_cast(4096))); + capacity_.store(std::max(property_capacity + (property_capacity + 4) / 5, + static_cast(4096))); + table_->resize(capacity_.load()); } } @@ -551,8 +560,9 @@ void EdgeTable::OpenWithHugepages(const std::string& work_dir, size_t src_v_cap, assert(table_->col_num() > 0); size_t property_capacity = table_->get_column_by_id(0)->size(); table_idx_.store(property_capacity); - table_->resize(std::max(property_capacity + (property_capacity + 4) / 5, - static_cast(4096))); + capacity_.store(std::max(property_capacity + (property_capacity + 4) / 5, + static_cast(4096))); + table_->resize(capacity_.load()); } } @@ -674,6 +684,16 @@ void EdgeTable::Resize(vid_t src_vertex_num, vid_t dst_vertex_num) { in_csr_->resize(dst_vertex_num); } +void EdgeTable::EnsureCapacity(size_t capacity) { + if (!meta_->is_bundled()) { + if (capacity <= capacity_.load()) { + return; + } + table_->resize(capacity); + capacity_.store(capacity); + } +} + size_t EdgeTable::EdgeNum() const { if (out_csr_) { return out_csr_->edge_num(); @@ -833,9 +853,9 @@ void EdgeTable::BatchAddEdges(const IndexerType& src_indexer, auto oe_csr = dynamic_cast*>(out_csr_.get()); auto ie_csr = dynamic_cast*>(in_csr_.get()); assert(oe_csr != nullptr && ie_csr != nullptr); - batch_add_unbundled_edges_impl(src_lid, dst_lid, oe_csr, ie_csr, - 
table_.get(), table_idx_, meta_->properties, - data_batches, valid_flags); + batch_add_unbundled_edges_impl( + src_lid, dst_lid, oe_csr, ie_csr, table_.get(), table_idx_, capacity_, + meta_->properties, data_batches, valid_flags); } } @@ -864,7 +884,12 @@ void EdgeTable::BatchAddEdges( size_t offset = table_idx_.fetch_add(src_lid_list.size()); insert_edges_separated_impl(oe_csr, ie_csr, src_lid_list, dst_lid_list, offset); - table_->resize(offset + src_lid_list.size()); + while (capacity_.load() <= table_idx_.load()) { + size_t cur_idx = table_idx_.load(); + capacity_.store( + std::max(cur_idx + (cur_idx + 4) / 5, static_cast(4096))); + } + table_->resize(capacity_.load()); for (size_t i = 0; i < edge_data_list.size(); ++i) { table_->insert(offset + i, edge_data_list[i], true); } diff --git a/src/storages/graph/graph_interface.cc b/src/storages/graph/graph_interface.cc index 1999a4795..8b9ba2ae6 100644 --- a/src/storages/graph/graph_interface.cc +++ b/src/storages/graph/graph_interface.cc @@ -35,12 +35,14 @@ void StorageAPUpdateInterface::UpdateEdgeProperty( bool StorageAPUpdateInterface::AddVertex(label_t label, const Property& id, const std::vector& props, vid_t& vid) { - const auto& table = graph_.get_vertex_table(label); - if (table.LidNum() >= table.Capacity()) { - graph_.Reserve(label, table.Capacity() * 2); + auto status = graph_.EnsureCapacity(label); + if (!status.ok()) { + LOG(ERROR) << "Failed to ensure space for vertex of label " + << graph_.schema().get_vertex_label_name(label) << ": " + << status.ToString(); + return false; } - auto status = - graph_.AddVertex(label, id, props, vid, neug::timestamp_t(0), true); + status = graph_.AddVertex(label, id, props, vid, neug::timestamp_t(0)); if (!status.ok()) { LOG(ERROR) << "AddVertex failed: " << status.ToString(); } @@ -50,6 +52,11 @@ bool StorageAPUpdateInterface::AddVertex(label_t label, const Property& id, bool StorageAPUpdateInterface::AddEdge( label_t src_label, vid_t src, label_t dst_label, vid_t 
dst, label_t edge_label, const std::vector& properties) { + auto status = graph_.EnsureCapacity(src_label, dst_label, edge_label); + if (!status.ok()) { + LOG(ERROR) << "Failed to ensure space for edge of: " << status.ToString(); + return false; + } graph_.AddEdge(src_label, src, dst_label, dst, edge_label, properties, neug::timestamp_t(0), alloc_, true); return true; diff --git a/src/storages/graph/property_graph.cc b/src/storages/graph/property_graph.cc index a7ee6639e..ebc043814 100644 --- a/src/storages/graph/property_graph.cc +++ b/src/storages/graph/property_graph.cc @@ -30,6 +30,7 @@ #include "neug/storages/file_names.h" #include "neug/utils/exception/exception.h" #include "neug/utils/file_utils.h" +#include "neug/utils/growth.h" #include "neug/utils/indexers.h" #include "neug/utils/property/types.h" #include "neug/utils/yaml_utils.h" @@ -98,10 +99,52 @@ Status PropertyGraph::Reserve(label_t v_label, vid_t vertex_reserve_size) { edge_tables_.at(index).Resize(vertex_tables_[v_label].Capacity(), vertex_tables_[dst_label].Capacity()); } - index = schema_.generate_edge_label(dst_label, v_label, e_label); + if (v_label != dst_label) { + index = schema_.generate_edge_label(dst_label, v_label, e_label); + if (edge_tables_.count(index) > 0) { + edge_tables_.at(index).Resize(vertex_tables_[dst_label].Capacity(), + vertex_tables_[v_label].Capacity()); + } + } + } + } + return neug::Status::OK(); + } else { + return Status(StatusCode::ERR_INVALID_ARGUMENT, + "Vertex label does not exist."); + } +} + +Status PropertyGraph::EnsureCapacity(label_t v_label, size_t capacity) { + if (schema_.vertex_label_valid(v_label)) { + auto old_cap = vertex_tables_[v_label].Capacity(); + if (capacity == 0) { + auto old_size = vertex_tables_[v_label].Size(); + if (old_size >= old_cap) { + capacity = neug::calculate_new_capacity(old_cap, true); + } + } + if (capacity <= old_cap) { + return neug::Status::OK(); + } + auto v_new_cap = vertex_tables_[v_label].EnsureCapacity(capacity); + for 
(label_t dst_label = 0; dst_label < vertex_label_total_count_; + ++dst_label) { + if (!schema_.vertex_label_valid(dst_label)) { + continue; + } + for (label_t e_label = 0; e_label < edge_label_total_count_; ++e_label) { + size_t index = schema_.generate_edge_label(v_label, dst_label, e_label); if (edge_tables_.count(index) > 0) { - edge_tables_.at(index).Resize(vertex_tables_[dst_label].Capacity(), - vertex_tables_[v_label].Capacity()); + edge_tables_.at(index).Resize(v_new_cap, + vertex_tables_[dst_label].Capacity()); + } + if (v_label != dst_label) { + index = schema_.generate_edge_label(dst_label, v_label, e_label); + if (edge_tables_.count(index) > 0) { + edge_tables_.at(index).Resize(vertex_tables_[dst_label].Capacity(), + v_new_cap); + } } } } @@ -112,6 +155,33 @@ Status PropertyGraph::Reserve(label_t v_label, vid_t vertex_reserve_size) { } } +Status PropertyGraph::EnsureCapacity(label_t src_label, label_t dst_label, + label_t edge_label, size_t capacity) { + if (!schema_.exist(src_label, dst_label, edge_label)) { + return Status(StatusCode::ERR_INVALID_ARGUMENT, + "Edge label does not exist for the given source and " + "destination vertex labels."); + } + size_t index = schema_.generate_edge_label(src_label, dst_label, edge_label); + if (edge_tables_.count(index) == 0) { + return Status( + StatusCode::ERR_INVALID_ARGUMENT, + "Edge table for the given edge label triplet does not exist."); + } + size_t old_cap = edge_tables_.at(index).Capacity(); + if (capacity == 0) { + size_t old_size = edge_tables_.at(index).Size(); + if (old_size >= old_cap) { + capacity = neug::calculate_new_capacity(old_cap, false); + } + } + if (capacity <= old_cap) { + return neug::Status::OK(); + } + edge_tables_.at(index).EnsureCapacity(capacity); + return neug::Status::OK(); +} + Status PropertyGraph::BatchAddVertices( label_t v_label, std::shared_ptr supplier) { assert(v_label < vertex_tables_.size()); diff --git a/src/storages/graph/vertex_table.cc 
b/src/storages/graph/vertex_table.cc index 6fd367619..93987916c 100644 --- a/src/storages/graph/vertex_table.cc +++ b/src/storages/graph/vertex_table.cc @@ -194,6 +194,14 @@ void VertexTable::Reserve(size_t cap) { } } +size_t VertexTable::EnsureCapacity(size_t capacity) { + if (capacity <= indexer_.capacity()) { + return indexer_.capacity(); + } + Reserve(capacity); + return indexer_.capacity(); +} + void VertexTable::BatchDeleteVertices(const std::vector& vids) { indexer_.ensure_writable(work_dir_); size_t delete_cnt = 0; diff --git a/src/transaction/update_transaction.cc b/src/transaction/update_transaction.cc index 8848865c7..6e0408c43 100644 --- a/src/transaction/update_transaction.cc +++ b/src/transaction/update_transaction.cc @@ -680,14 +680,16 @@ bool UpdateTransaction::AddVertex(label_t label, const Property& oid, } } - InsertVertexRedo::Serialize(arc_, label, oid, props); - op_num_ += 1; - auto status = graph_.AddVertex(label, oid, props, vid, timestamp_, true); + auto status = graph_.EnsureCapacity(label); if (!status.ok()) { - // The most possible reason is that the space is not enough. 
- graph_.Reserve(label, std::max((vid_t) 1024, graph_.LidNum(label) * 2)); - status = graph_.AddVertex(label, oid, props, vid, timestamp_, true); + LOG(ERROR) << "Failed to ensure space for vertex of label " + << graph_.schema().get_vertex_label_name(label) << ": " + << status.ToString(); + return false; } + InsertVertexRedo::Serialize(arc_, label, oid, props); + op_num_ += 1; + status = graph_.AddVertex(label, oid, props, vid, timestamp_, true); if (!status.ok()) { LOG(ERROR) << "Failed to add vertex of label " << graph_.schema().get_vertex_label_name(label) << ": " @@ -737,6 +739,12 @@ bool UpdateTransaction::AddEdge(label_t src_label, vid_t src_lid, dst_label, GetVertexId(dst_label, dst_lid), edge_label, properties); op_num_ += 1; + auto status = graph_.EnsureCapacity(src_label, dst_label, edge_label); + if (!status.ok()) { + LOG(ERROR) << "Failed to ensure space before insert edge: " + << status.ToString(); + return false; + } auto oe_offset = graph_.AddEdge(src_label, src_lid, dst_label, dst_lid, edge_label, properties, timestamp_, alloc_, true); diff --git a/src/utils/property/column.cc b/src/utils/property/column.cc index 47cc1513d..01b3401c9 100644 --- a/src/utils/property/column.cc +++ b/src/utils/property/column.cc @@ -76,7 +76,7 @@ class TypedEmptyColumn : public ColumnBase { void set_any(size_t index, const Property& value, bool insert_safe) override { } - T get_view(size_t index) const { T{}; } + T get_view(size_t index) const { return T{}; } Property get_prop(size_t index) const override { return Property(); } diff --git a/tools/python_bind/tests/test_db_query.py b/tools/python_bind/tests/test_db_query.py index d0979f229..2c3244846 100644 --- a/tools/python_bind/tests/test_db_query.py +++ b/tools/python_bind/tests/test_db_query.py @@ -2556,3 +2556,24 @@ def test_insert_many_vertices(): assert records == [[10000]], f"Expected value [[10000]], got {records}" conn.close() db.close() + + +def test_insert_many_edges(): + db_dir = 
"/tmp/test_insert_many_edges" + shutil.rmtree(db_dir, ignore_errors=True) + db = Database(db_path=db_dir, mode="w") + conn = db.connect() + conn.execute("CREATE NODE TABLE Person(id INT64, PRIMARY KEY(id));") + conn.execute("CREATE REL TABLE Knows(FROM Person TO Person);") + for i in range(100): + conn.execute(f"CREATE (p: Person {{id: {i}}});") + for i in range(100): + for j in range(i + 1, 100): + conn.execute( + f"MATCH (p1: Person {{id: {i}}}), (p2: Person {{id: {j}}}) CREATE (p1)-[:Knows]->(p2);" + ) + res = conn.execute("MATCH ()-[e: Knows]->() RETURN count(e);") + records = list(res) + assert records == [[4950]], f"Expected value [[4950]], got {records}" + conn.close() + db.close() From 4b84450b781102b5420d4d819959f1b100c75a1c Mon Sep 17 00:00:00 2001 From: zhanglei1949 Date: Tue, 10 Mar 2026 15:42:54 +0800 Subject: [PATCH 02/19] minor --- include/neug/utils/growth.h | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/include/neug/utils/growth.h b/include/neug/utils/growth.h index beb0f4c1d..5903a90f3 100644 --- a/include/neug/utils/growth.h +++ b/include/neug/utils/growth.h @@ -29,7 +29,9 @@ inline size_t calculate_new_capacity(size_t current_capacity, } else { // For edge tables, we grow linearly: new capacity = current capacity + // (current capacity + 4) / 5. - return current_capacity + (current_capacity + 4) / 5; + return current_capacity == 0 + ? 
4096 + : current_capacity + (current_capacity + 4) / 5; } } } // namespace neug From bc112845336223c5db31326dfb4c20664891567b Mon Sep 17 00:00:00 2001 From: Zhang Lei Date: Tue, 10 Mar 2026 15:50:57 +0800 Subject: [PATCH 03/19] Update include/neug/utils/growth.h Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> --- include/neug/utils/growth.h | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/include/neug/utils/growth.h b/include/neug/utils/growth.h index 5903a90f3..a1c721c65 100644 --- a/include/neug/utils/growth.h +++ b/include/neug/utils/growth.h @@ -25,7 +25,10 @@ inline size_t calculate_new_capacity(size_t current_capacity, if (is_vertex_table) { // For vertex tables, we grow exponentially: double the current capacity, // with a 4K floor. - return current_capacity == 0 ? 4096 : current_capacity * 2; + return current_capacity == 0 ? 4096 + : (current_capacity <= std::numeric_limits::max() / 2 + ? current_capacity * 2 + : std::numeric_limits::max()); } else { // For edge tables, we grow linearly: new capacity = current capacity + // (current capacity + 4) / 5. From 3a96008f15ec38c02f4f3ff2faa1a1fa07ba4660 Mon Sep 17 00:00:00 2001 From: zhanglei1949 Date: Tue, 10 Mar 2026 15:57:29 +0800 Subject: [PATCH 04/19] fix format --- include/neug/utils/growth.h | 9 +++++---- tests/unittest/test_connection.cc | 5 ++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/include/neug/utils/growth.h b/include/neug/utils/growth.h index a1c721c65..aeb14c978 100644 --- a/include/neug/utils/growth.h +++ b/include/neug/utils/growth.h @@ -25,10 +25,11 @@ inline size_t calculate_new_capacity(size_t current_capacity, if (is_vertex_table) { // For vertex tables, we grow exponentially: double the current capacity, // with a 4K floor. - return current_capacity == 0 ? 4096 - : (current_capacity <= std::numeric_limits::max() / 2 - ? current_capacity * 2 - : std::numeric_limits::max()); + return current_capacity == 0 + ? 
4096 + : (current_capacity <= std::numeric_limits::max() / 2 + ? current_capacity * 2 + : std::numeric_limits::max()); } else { // For edge tables, we grow linearly: new capacity = current capacity + // (current capacity + 4) / 5. diff --git a/tests/unittest/test_connection.cc b/tests/unittest/test_connection.cc index 706f4f785..026d8835a 100644 --- a/tests/unittest/test_connection.cc +++ b/tests/unittest/test_connection.cc @@ -132,9 +132,8 @@ TEST_F(ConnectionTest, TestReadWriteConnection) { auto conn1 = db.Connect(); EXPECT_NE(conn1, nullptr); - EXPECT_THROW( - { auto conn2 = db.Connect(); }, - neug::exception::TxStateConflictException); + EXPECT_THROW({ auto conn2 = db.Connect(); }, + neug::exception::TxStateConflictException); } TEST_F(ConnectionTest, TestReadOnlyConnections) { From 0bf60263daf069bbbdb02f44afc8d3cc6b51fd2d Mon Sep 17 00:00:00 2001 From: zhanglei1949 Date: Wed, 11 Mar 2026 11:25:39 +0800 Subject: [PATCH 05/19] reserve and dump --- CMakeLists.txt | 6 ++ include/neug/storages/file_names.h | 10 ++ include/neug/storages/graph/property_graph.h | 8 +- include/neug/storages/graph/vertex_table.h | 9 +- include/neug/utils/file_utils.h | 12 +++ include/neug/utils/growth.h | 17 +-- include/neug/utils/id_indexer.h | 9 -- src/storages/csr/mutable_csr.cc | 54 +--------- src/storages/graph/edge_table.cc | 102 ++++++++++++------ src/storages/graph/property_graph.cc | 104 ++++++++++--------- src/storages/graph/vertex_table.cc | 26 +++-- src/utils/file_utils.cc | 67 ++++++++++++ tools/python_bind/tests/test_db_query.py | 10 ++ 13 files changed, 271 insertions(+), 163 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index af90ce55c..19e347033 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -118,6 +118,12 @@ if (NOT CMAKE_BUILD_TYPE AND NOT CMAKE_CONFIGURATION_TYPES) "Debug" "Release" "MinSizeRel" "RelWithDebInfo") endif () +#if BUILD_TYPE is DEBUG, add compile option DEBUG +if (CMAKE_BUILD_TYPE STREQUAL "Debug") + add_definitions(-DDEBUG) +endif() 
+add_definitions(-DDEBUG) + add_compile_definitions(NEUG_VERSION="${NEUG_VERSION}") # reference: https://gitlab.kitware.com/cmake/community/-/wikis/doc/cmake/RPATH-handling#always-full-rpath diff --git a/include/neug/storages/file_names.h b/include/neug/storages/file_names.h index 6cfe66137..d7048866f 100644 --- a/include/neug/storages/file_names.h +++ b/include/neug/storages/file_names.h @@ -246,4 +246,14 @@ inline std::string wal_ingest_allocator_prefix(const std::string& work_dir, std::to_string(thread_id) + "_"; } +inline std::string statistics_file_prefix(const std::string& v_label) { + return "statistics_" + v_label; +} + +inline std::string statistics_file_prefix(const std::string& src_label, + const std::string& dst_label, + const std::string& edge_label) { + return "statistics_" + src_label + "_" + edge_label + "_" + dst_label; +} + } // namespace neug diff --git a/include/neug/storages/graph/property_graph.h b/include/neug/storages/graph/property_graph.h index 33bbc5ee5..88c09e942 100644 --- a/include/neug/storages/graph/property_graph.h +++ b/include/neug/storages/graph/property_graph.h @@ -137,7 +137,13 @@ class PropertyGraph { void Compact(bool compact_csr, float reserve_ratio, timestamp_t ts); - void Dump(bool reopen = true); + /** + * @brief Dump the current graph state to persistent storage. + * @param reopen If true, reopens the graph after dumping (default: true) + * @param ensure_capacity If true, reserves space for all vertex and edge + * tables. + */ + void Dump(bool reopen = true, bool ensure_capacity = true); /** * @brief Dump schema information to a file. 
diff --git a/include/neug/storages/graph/vertex_table.h b/include/neug/storages/graph/vertex_table.h index cf4520a71..479b6981f 100644 --- a/include/neug/storages/graph/vertex_table.h +++ b/include/neug/storages/graph/vertex_table.h @@ -111,8 +111,7 @@ class VertexTable { std::swap(work_dir_, other.work_dir_); } - void Open(const std::string& work_dir, int memory_level, - bool build_empty_graph = false); + void Open(const std::string& work_dir, int memory_level); void Dump(const std::string& target_dir); @@ -293,11 +292,7 @@ class VertexTable { auto ind = std::get<2>(vertex_schema_->primary_keys[0]); auto pk_array = columns[ind]; columns.erase(columns.begin() + ind); - auto cur_size = indexer_.capacity(); - while (cur_size < indexer_.size() + pk_array->length()) { - cur_size = std::max(16, 2 * static_cast(cur_size)); - } - Reserve(cur_size); + EnsureCapacity(indexer_.size() + pk_array->length()); auto vids = insert_primary_keys(pk_array); diff --git a/include/neug/utils/file_utils.h b/include/neug/utils/file_utils.h index 2815a4e75..c12ad7796 100644 --- a/include/neug/utils/file_utils.h +++ b/include/neug/utils/file_utils.h @@ -38,4 +38,16 @@ void copy_directory(const std::string& src, const std::string& dst, void remove_directory(const std::string& dir_path); +void write_file(const std::string& filename, const void* buffer, size_t size, + size_t num); + +void read_file(const std::string& filename, void* buffer, size_t size, + size_t num); + +void write_statistic_file(const std::string& file_path, size_t capacity, + size_t size); + +void read_statistic_file(const std::string& file_path, size_t& capacity, + size_t& size); + } // namespace neug diff --git a/include/neug/utils/growth.h b/include/neug/utils/growth.h index aeb14c978..69be2f53f 100644 --- a/include/neug/utils/growth.h +++ b/include/neug/utils/growth.h @@ -22,20 +22,21 @@ namespace neug { inline size_t calculate_new_capacity(size_t current_capacity, bool is_vertex_table) { + if (current_capacity < 4096) { 
+ return 4096; // Start with a reasonable default capacity. + } + static constexpr size_t MAX_CAPACITY = std::numeric_limits::max(); if (is_vertex_table) { // For vertex tables, we grow exponentially: double the current capacity, // with a 4K floor. - return current_capacity == 0 - ? 4096 - : (current_capacity <= std::numeric_limits::max() / 2 - ? current_capacity * 2 - : std::numeric_limits::max()); + return current_capacity <= MAX_CAPACITY / 2 ? current_capacity * 2 + : MAX_CAPACITY; } else { // For edge tables, we grow linearly: new capacity = current capacity + // (current capacity + 4) / 5. - return current_capacity == 0 - ? 4096 - : current_capacity + (current_capacity + 4) / 5; + return current_capacity <= MAX_CAPACITY - (current_capacity + 4) / 5 + ? current_capacity + (current_capacity + 4) / 5 + : MAX_CAPACITY; } } } // namespace neug diff --git a/include/neug/utils/id_indexer.h b/include/neug/utils/id_indexer.h index 8f9b5a5fd..028f0cbe1 100644 --- a/include/neug/utils/id_indexer.h +++ b/include/neug/utils/id_indexer.h @@ -443,7 +443,6 @@ class LFIndexer { keys_->open(name + ".keys", "", data_dir); indices_.open(data_dir + "/" + name + ".indices", false); size_t num_elements = num_elements_.load(); - keys_->resize(num_elements + (num_elements >> 2)); indices_size_ = indices_.size(); } @@ -462,9 +461,6 @@ class LFIndexer { LOG(INFO) << "Open indices file in " << tmp_dir(work_dir) + "/" + name + ".indices"; indices_.open(tmp_dir(work_dir) + "/" + name + ".indices", true); - size_t num_elements = num_elements_.load(); - - keys_->resize(num_elements + (num_elements >> 2)); indices_size_ = indices_.size(); } @@ -478,8 +474,6 @@ class LFIndexer { keys_->open_in_memory(name + ".keys"); indices_.open(name + ".indices", false); indices_size_ = indices_.size(); - size_t num_elements = num_elements_.load(); - keys_->resize(num_elements + (num_elements >> 2)); } void open_with_hugepages(const std::string& name, bool hugepage_table) { @@ -495,12 +489,9 @@ class 
LFIndexer { indices_.open(name + ".indices", false); } indices_size_ = indices_.size(); - size_t num_elements = num_elements_.load(); - keys_->resize(num_elements + (num_elements >> 2)); } void dump(const std::string& name, const std::string& snapshot_dir) { - keys_->resize(num_elements_.load()); keys_->dump(snapshot_dir + "/" + name + ".keys"); indices_.dump(snapshot_dir + "/" + name + ".indices"); dump_meta(snapshot_dir + "/" + name + ".meta"); diff --git a/src/storages/csr/mutable_csr.cc b/src/storages/csr/mutable_csr.cc index 569312a6c..366cb49e1 100644 --- a/src/storages/csr/mutable_csr.cc +++ b/src/storages/csr/mutable_csr.cc @@ -28,64 +28,12 @@ #include "neug/storages/file_names.h" #include "neug/utils/exception/exception.h" +#include "neug/utils/file_utils.h" #include "neug/utils/property/types.h" #include "neug/utils/spinlock.h" namespace neug { -void read_file(const std::string& filename, void* buffer, size_t size, - size_t num) { - FILE* fin = fopen(filename.c_str(), "r"); - if (fin == nullptr) { - std::stringstream ss; - ss << "Failed to open file " << filename << ", " << strerror(errno); - LOG(ERROR) << ss.str(); - THROW_RUNTIME_ERROR(ss.str()); - } - size_t ret_len = 0; - if ((ret_len = fread(buffer, size, num, fin)) != num) { - std::stringstream ss; - ss << "Failed to read file " << filename << ", expected " << num << ", got " - << ret_len << ", " << strerror(errno); - LOG(ERROR) << ss.str(); - THROW_RUNTIME_ERROR(ss.str()); - } - int ret = 0; - if ((ret = fclose(fin)) != 0) { - std::stringstream ss; - ss << "Failed to close file " << filename << ", error code: " << ret << " " - << strerror(errno); - LOG(ERROR) << ss.str(); - THROW_RUNTIME_ERROR(ss.str()); - } -} - -void write_file(const std::string& filename, const void* buffer, size_t size, - size_t num) { - FILE* fout = fopen(filename.c_str(), "wb"); - if (fout == nullptr) { - std::stringstream ss; - ss << "Failed to open file " << filename << ", " << strerror(errno); - LOG(ERROR) << ss.str(); - 
THROW_RUNTIME_ERROR(ss.str()); - } - size_t ret_len = 0; - if ((ret_len = fwrite(buffer, size, num, fout)) != num) { - std::stringstream ss; - ss << "Failed to write file " << filename << ", expected " << num - << ", got " << ret_len << ", " << strerror(errno); - LOG(ERROR) << ss.str(); - } - int ret = 0; - if ((ret = fclose(fout)) != 0) { - std::stringstream ss; - ss << "Failed to close file " << filename << ", error code: " << ret << " " - << strerror(errno); - LOG(ERROR) << ss.str(); - THROW_RUNTIME_ERROR(ss.str()); - } -} - template void MutableCsr::open(const std::string& name, const std::string& snapshot_dir, diff --git a/src/storages/graph/edge_table.cc b/src/storages/graph/edge_table.cc index 3daa1b4fe..539052082 100644 --- a/src/storages/graph/edge_table.cc +++ b/src/storages/graph/edge_table.cc @@ -33,6 +33,8 @@ #include "neug/storages/file_names.h" #include "neug/storages/loader/loader_utils.h" #include "neug/utils/arrow_utils.h" +#include "neug/utils/file_utils.h" +#include "neug/utils/growth.h" #include "neug/utils/property/types.h" namespace neug { @@ -378,9 +380,6 @@ void batch_add_unbundled_edges_impl( size_t offset = table_idx_.fetch_add(src_lid_list.size()); insert_edges_separated_impl(out_csr, in_csr, src_lid_list, dst_lid_list, offset); - size_t cur_idx = table_idx_.load(); - table_->resize(cur_idx); - capacity_.store(cur_idx); std::vector> expected_types; for (auto pt : prop_types) { expected_types.emplace_back(PropertyTypeToArrowType(pt)); @@ -482,6 +481,26 @@ void EdgeTable::SetEdgeSchema(std::shared_ptr meta) { meta_ = meta; } +void load_statistic_file(const std::string& work_dir, + const std::string& src_label_name, + const std::string& dst_label_name, + const std::string& edge_label_name, + std::atomic& cap_atomic, + std::atomic& table_idx_atomic) { + size_t cap = 0, size = 0; + auto statistic_file_path = + checkpoint_dir(work_dir) + "/" + + statistics_file_prefix(src_label_name, dst_label_name, edge_label_name); + if 
(!std::filesystem::exists(statistic_file_path)) { + cap_atomic.store(0); + table_idx_atomic.store(0); + return; + } + read_statistic_file(statistic_file_path, cap, size); + cap_atomic.store(cap); + table_idx_atomic.store(size); +} + void EdgeTable::Open(const std::string& work_dir) { work_dir_ = work_dir; memory_level_ = 0; @@ -498,11 +517,14 @@ void EdgeTable::Open(const std::string& work_dir) { work_dir, meta_->property_names, meta_->properties, meta_->default_property_values, meta_->strategies); assert(table_->col_num() > 0); - size_t property_capacity = table_->get_column_by_id(0)->size(); - table_idx_.store(property_capacity); - capacity_.store(std::max(property_capacity + (property_capacity + 4) / 5, - static_cast(4096))); - table_->resize(capacity_.load()); + size_t table_cap = table_->get_column_by_id(0)->size(); + load_statistic_file(work_dir, meta_->src_label_name, meta_->dst_label_name, + meta_->edge_label_name, capacity_, table_idx_); + if (table_cap != capacity_.load()) { + THROW_INVALID_ARGUMENT_EXCEPTION( + "capacity in statistic file not match actual table capacity, maybe " + "the graph is not dumped properly"); + } } } @@ -528,11 +550,14 @@ void EdgeTable::OpenInMemory(const std::string& work_dir, size_t src_v_cap, work_dir_, meta_->property_names, meta_->properties, meta_->default_property_values, meta_->strategies); assert(table_->col_num() > 0); - size_t property_capacity = table_->get_column_by_id(0)->size(); - table_idx_.store(property_capacity); - capacity_.store(std::max(property_capacity + (property_capacity + 4) / 5, - static_cast(4096))); - table_->resize(capacity_.load()); + size_t table_cap = table_->get_column_by_id(0)->size(); + load_statistic_file(work_dir, meta_->src_label_name, meta_->dst_label_name, + meta_->edge_label_name, capacity_, table_idx_); + if (table_cap != capacity_.load()) { + THROW_INVALID_ARGUMENT_EXCEPTION( + "capacity in statistic file not match actual table capacity, maybe " + "the graph is not dumped properly"); 
+ } } } @@ -558,15 +583,22 @@ void EdgeTable::OpenWithHugepages(const std::string& work_dir, size_t src_v_cap, checkpoint_dir_path, meta_->property_names, meta_->properties, meta_->default_property_values, meta_->strategies, (memory_level_ > 2)); assert(table_->col_num() > 0); - size_t property_capacity = table_->get_column_by_id(0)->size(); - table_idx_.store(property_capacity); - capacity_.store(std::max(property_capacity + (property_capacity + 4) / 5, - static_cast(4096))); - table_->resize(capacity_.load()); + size_t table_cap = table_->get_column_by_id(0)->size(); + load_statistic_file(work_dir, meta_->src_label_name, meta_->dst_label_name, + meta_->edge_label_name, capacity_, table_idx_); + if (table_cap != capacity_.load()) { + THROW_INVALID_ARGUMENT_EXCEPTION( + "capacity in statistic file not match actual table capacity, maybe " + "the graph is not dumped properly"); + } } } void EdgeTable::Dump(const std::string& checkpoint_dir_path) { + LOG(INFO) << "dump edge table " << meta_->src_label_name << "-" + << meta_->edge_label_name << "-" << meta_->dst_label_name << " to " + << checkpoint_dir_path << ", capacity: " << Capacity() + << ", size: " << Size(); in_csr_->dump(ie_prefix(meta_->src_label_name, meta_->dst_label_name, meta_->edge_label_name), checkpoint_dir_path); @@ -577,6 +609,11 @@ void EdgeTable::Dump(const std::string& checkpoint_dir_path) { table_->dump(edata_prefix(meta_->src_label_name, meta_->dst_label_name, meta_->edge_label_name), checkpoint_dir_path); + auto statistc_file_path = + checkpoint_dir_path + "/" + + statistics_file_prefix(meta_->src_label_name, meta_->dst_label_name, + meta_->edge_label_name); + write_statistic_file(statistc_file_path, Capacity(), Size()); } } @@ -686,6 +723,8 @@ void EdgeTable::Resize(vid_t src_vertex_num, vid_t dst_vertex_num) { void EdgeTable::EnsureCapacity(size_t capacity) { if (!meta_->is_bundled()) { + LOG(INFO) << "ensure edge table capacity " << capacity + << ", current capacity " << capacity_.load(); if 
(capacity <= capacity_.load()) { return; } @@ -844,6 +883,7 @@ void EdgeTable::BatchAddEdges(const IndexerType& src_indexer, std::vector valid_flags; // true for valid edges std::tie(src_lid, dst_lid, valid_flags) = filterInvalidEdges(src_lid, dst_lid); + EnsureCapacity(table_idx_.load() + src_lid.size()); if (meta_->is_bundled()) { auto edges = extract_bundled_edge_data_from_batches(meta_, data_batches, valid_flags); @@ -863,6 +903,7 @@ void EdgeTable::BatchAddEdges( const std::vector& src_lid_list, const std::vector& dst_lid_list, const std::vector>& edge_data_list) { + EnsureCapacity(table_idx_.load() + src_lid_list.size()); if (meta_->is_bundled()) { std::vector flat_edge_data; assert(meta_->properties.size() == 1); @@ -884,12 +925,6 @@ void EdgeTable::BatchAddEdges( size_t offset = table_idx_.fetch_add(src_lid_list.size()); insert_edges_separated_impl(oe_csr, ie_csr, src_lid_list, dst_lid_list, offset); - while (capacity_.load() <= table_idx_.load()) { - size_t cur_idx = table_idx_.load(); - capacity_.store( - std::max(cur_idx + (cur_idx + 4) / 5, static_cast(4096))); - } - table_->resize(capacity_.load()); for (size_t i = 0; i < edge_data_list.size(); ++i) { table_->insert(offset + i, edge_data_list[i], true); } @@ -949,6 +984,7 @@ void EdgeTable::dropAndCreateNewBundledCSR() { in_csr_ = std::move(new_in_csr); } +// TODO(zhanglei): Keep table_idx_ and capacity_ right. 
void EdgeTable::dropAndCreateNewUnbundledCSR(bool delete_property) { auto suffix = get_next_csr_path_suffix(); std::string next_oe_csr_path = @@ -980,6 +1016,16 @@ void EdgeTable::dropAndCreateNewUnbundledCSR(bool delete_property) { if (table_->col_num() >= 1) { prev_data_col = table_->get_column_by_id(0); } + if (prev_data_col && prev_data_col->size() > 0) { + table_->resize(prev_data_col->size()); + table_idx_.store(prev_data_col->size()); + EnsureCapacity(calculate_new_capacity(prev_data_col->size(), false)); + } + } else { + // delete_property == true, which means the EdgeTable will become use csr of + // empty type. we need to reset capacity and table_idx to 0 + table_idx_.store(0); + capacity_.store(0); } auto edges = out_csr_->batch_export(prev_data_col); @@ -995,7 +1041,7 @@ void EdgeTable::dropAndCreateNewUnbundledCSR(bool delete_property) { VLOG(10) << "Set default value for column " << col_id << ": " << default_value.to_string() << ", type: " << std::to_string(default_value.type()); - for (size_t row = 0; row < col->size(); ++row) { + for (size_t row = 0; row < Size(); ++row) { col->set_any(row, default_value); } } @@ -1003,11 +1049,7 @@ void EdgeTable::dropAndCreateNewUnbundledCSR(bool delete_property) { for (size_t i = 0; i < std::get<0>(edges).size(); ++i) { row_ids.push_back(i); } - if (!delete_property) { - if (prev_data_col->size() > 0) { - table_->resize(prev_data_col->size()); - } - } + std::unique_ptr new_out_csr, new_in_csr; if (delete_property) { new_out_csr = diff --git a/src/storages/graph/property_graph.cc b/src/storages/graph/property_graph.cc index ebc043814..418b8d221 100644 --- a/src/storages/graph/property_graph.cc +++ b/src/storages/graph/property_graph.cc @@ -121,7 +121,7 @@ Status PropertyGraph::EnsureCapacity(label_t v_label, size_t capacity) { if (capacity == 0) { auto old_size = vertex_tables_[v_label].Size(); if (old_size >= old_cap) { - capacity = neug::calculate_new_capacity(old_cap, true); + capacity = 
neug::calculate_new_capacity(old_size, true); } } if (capacity <= old_cap) { @@ -172,7 +172,7 @@ Status PropertyGraph::EnsureCapacity(label_t src_label, label_t dst_label, if (capacity == 0) { size_t old_size = edge_tables_.at(index).Size(); if (old_size >= old_cap) { - capacity = neug::calculate_new_capacity(old_cap, false); + capacity = neug::calculate_new_capacity(old_size, false); } } if (capacity <= old_cap) { @@ -292,7 +292,7 @@ Status PropertyGraph::CreateVertexType( } auto& vtable = vertex_tables_.back(); - vtable.Open(work_dir_, memory_level_, true); + vtable.Open(work_dir_, memory_level_); vtable.Reserve(4096); vertex_label_total_count_ = schema_.vertex_label_frontier(); assert(vertex_tables_.size() == vertex_label_total_count_); @@ -859,46 +859,26 @@ void PropertyGraph::Open(const std::string& work_dir, int memory_level) { memory_level_ = memory_level; work_dir_.assign(work_dir); std::string schema_file = schema_path(work_dir_); - std::string checkpoint_dir_path{}; - bool build_empty_graph = false; + std::string checkpoint_dir_path = checkpoint_dir(work_dir_); if (std::filesystem::exists(schema_file)) { loadSchema(schema_file); - vertex_label_total_count_ = schema_.vertex_label_frontier(); - edge_label_total_count_ = schema_.edge_label_frontier(); - for (size_t i = 0; i < vertex_label_total_count_; i++) { - if (!schema_.vertex_label_valid(i)) { - THROW_INTERNAL_EXCEPTION("Invalid vertex label id: " + - std::to_string(i)); - } - std::string v_label_name = schema_.get_vertex_label_name(i); - auto properties = schema_.get_vertex_properties(i); - auto property_names = schema_.get_vertex_property_names(i); - auto property_strategies = - schema_.get_vertex_storage_strategies(v_label_name); - vertex_tables_.emplace_back(schema_.get_vertex_schema(i)); - } - checkpoint_dir_path = checkpoint_dir(work_dir_); } else { - vertex_label_total_count_ = schema_.vertex_label_frontier(); - edge_label_total_count_ = schema_.edge_label_frontier(); - for (size_t i = 0; i < 
vertex_label_total_count_; i++) { - if (!schema_.vertex_label_valid(i)) { - THROW_INTERNAL_EXCEPTION("Invalid vertex label id: " + - std::to_string(i)); - } - std::string v_label_name = schema_.get_vertex_label_name(i); - auto properties = schema_.get_vertex_properties(i); - auto property_names = schema_.get_vertex_property_names(i); - auto property_strategies = - schema_.get_vertex_storage_strategies(v_label_name); - vertex_tables_.emplace_back(schema_.get_vertex_schema(i)); - } - build_empty_graph = true; LOG(INFO) << "Schema file not found, build empty graph"; - - checkpoint_dir_path = checkpoint_dir(work_dir_); std::filesystem::create_directories(checkpoint_dir_path); } + vertex_label_total_count_ = schema_.vertex_label_frontier(); + edge_label_total_count_ = schema_.edge_label_frontier(); + for (size_t i = 0; i < vertex_label_total_count_; i++) { + if (!schema_.vertex_label_valid(i)) { + THROW_INTERNAL_EXCEPTION("Invalid vertex label id: " + std::to_string(i)); + } + std::string v_label_name = schema_.get_vertex_label_name(i); + auto properties = schema_.get_vertex_properties(i); + auto property_names = schema_.get_vertex_property_names(i); + auto property_strategies = + schema_.get_vertex_storage_strategies(v_label_name); + vertex_tables_.emplace_back(schema_.get_vertex_schema(i)); + } std::string tmp_dir_path = tmp_dir(work_dir_); @@ -915,14 +895,25 @@ void PropertyGraph::Open(const std::string& work_dir, int memory_level) { } std::string v_label_name = schema_.get_vertex_label_name(i); - vertex_tables_[i].Open(work_dir_, memory_level, build_empty_graph); - - // We will reserve the at least 4096 slots for each vertex label - size_t vertex_capacity = - std::max(vertex_tables_[i].get_indexer().capacity(), (size_t) 4096); - vertex_tables_[i].Reserve(vertex_capacity); - - vertex_capacities[i] = vertex_capacity; + vertex_tables_[i].Open(work_dir_, memory_level); +#ifdef DEBUG + size_t old_cap = vertex_tables_[i].Capacity(); + LOG(INFO) << "Open vertex table for 
label [" << v_label_name + << "], capacity: " << vertex_tables_[i].Capacity() + << ", size: " << vertex_tables_[i].Size() << ", new capacity: " + << calculate_new_capacity(vertex_tables_[i].Size(), true); +#endif + // Case 1: Open from checkpoint, the capacity should be already reserved and + // satisfied Case 2: Open from empty, Capacity should be the default minimum + // capacity(4096) + vertex_tables_[i].EnsureCapacity( + calculate_new_capacity(vertex_tables_[i].Size(), true)); + vertex_capacities[i] = vertex_tables_[i].Capacity(); +#ifdef DEBUG + if (vertex_tables_[i].Size() > 0) { + assert(old_cap == vertex_tables_[i].Capacity()); + } +#endif } for (size_t src_label_i = 0; src_label_i != vertex_label_total_count_; @@ -964,9 +955,21 @@ void PropertyGraph::Open(const std::string& work_dir, int memory_level) { edge_table.OpenInMemory(work_dir_, vertex_capacities[src_label_i], vertex_capacities[dst_label_i]); } - - edge_table.Resize(vertex_capacities[src_label_i], - vertex_capacities[dst_label_i]); +#ifdef DEBUG + size_t old_cap = edge_table.Capacity(); + LOG(INFO) << "Open edge table for edge label [" << edge_label + << "] from [" << src_label << "] to [" << dst_label + << "], capacity: " << edge_table.Capacity() + << ", size: " << edge_table.Size() << ", new capacity: " + << calculate_new_capacity(edge_table.Size(), false); +#endif + edge_table.EnsureCapacity( + calculate_new_capacity(edge_table.Size(), false)); +#ifdef DEBUG + if (edge_table.Size() > 0) { + assert(old_cap == edge_table.Capacity()); + } +#endif edge_tables_.emplace(index, std::move(edge_table)); } } @@ -1086,7 +1089,7 @@ void PropertyGraph::Compact(bool compact_csr, float reserve_ratio, } } -void PropertyGraph::Dump(bool reopen) { +void PropertyGraph::Dump(bool reopen, bool ensure_capacity) { // First dump to the temp dir, then move to the checkpoint dir std::string target_dir = temp_checkpoint_dir(work_dir_); if (std::filesystem::exists(target_dir)) { @@ -1108,6 +1111,7 @@ void 
PropertyGraph::Dump(bool reopen) { for (size_t i = 0; i < vertex_label_total_count_; ++i) { if (!vertex_tables_[i].is_dropped()) { vertex_num[i] = vertex_tables_[i].LidNum(); + EnsureCapacity(i, calculate_new_capacity(vertex_num[i], true)); vertex_tables_[i].Dump(target_dir); } } @@ -1142,6 +1146,8 @@ void PropertyGraph::Dump(bool reopen) { if (edge_tables_.count(index) > 0) { auto& edge_table = edge_tables_.at(index); edge_table.Resize(vertex_num[src_label_i], vertex_num[dst_label_i]); + EnsureCapacity(src_label_i, dst_label_i, e_label_i, + calculate_new_capacity(edge_table.Capacity(), false)); edge_table.Dump(target_dir); } } diff --git a/src/storages/graph/vertex_table.cc b/src/storages/graph/vertex_table.cc index 93987916c..6f1948d15 100644 --- a/src/storages/graph/vertex_table.cc +++ b/src/storages/graph/vertex_table.cc @@ -14,12 +14,12 @@ */ #include "neug/storages/graph/vertex_table.h" +#include "neug/utils/file_utils.h" #include "neug/utils/likely.h" namespace neug { -void VertexTable::Open(const std::string& work_dir, int memory_level, - bool build_empty_graph) { +void VertexTable::Open(const std::string& work_dir, int memory_level) { memory_level_ = memory_level; work_dir_ = work_dir; std::string tmp_dir_path = tmp_dir(work_dir_); @@ -57,6 +57,12 @@ void VertexTable::Open(const std::string& work_dir, int memory_level, THROW_INTERNAL_EXCEPTION("Invalid memory level: " + std::to_string(memory_level_)); } + LOG(INFO) << "Open vertex table for label [" << label_name + << "], capacity: " << indexer_.capacity() + << ", size: " << indexer_.size(); + if (table_ && table_->col_num() > 0) { + LOG(INFO) << ", table size: " << table_->get_column_by_id(0)->size(); + } v_ts_.Open(vertex_tracker_filename); } @@ -82,15 +88,15 @@ void VertexTable::insert_vertices( void VertexTable::Dump(const std::string& target_dir) { const auto& label_name = vertex_schema_->label_name; + VLOG(1) << "Dump vertex table " << label_name << " done, size " + << indexer_.size() << ", capacity 
" << indexer_.capacity(); indexer_.dump(IndexerType::prefix() + "_" + vertex_map_prefix(label_name), target_dir); - table_->resize(indexer_.size()); + // table_->resize(indexer_.size()); table_->dump(vertex_table_prefix(label_name), target_dir); // Shrink v_ts_ to fit the indexer size - v_ts_.Reserve(indexer_.size()); + // v_ts_.Reserve(indexer_.size()); v_ts_.Dump(target_dir + "/" + vertex_tracker_file(label_name)); - VLOG(1) << "Dump vertex table " << label_name << " done, size " - << indexer_.size(); } void VertexTable::Close() { @@ -185,6 +191,10 @@ bool VertexTable::IsValidLid(vid_t lid, timestamp_t ts) const { } void VertexTable::Reserve(size_t cap) { + LOG(INFO) << "Reserving capacity for vertex table " + << vertex_schema_->label_name + << ", current capacity: " << indexer_.capacity() + << ", requested capacity: " << cap; if (cap > indexer_.capacity()) { indexer_.reserve(cap); } @@ -195,6 +205,10 @@ void VertexTable::Reserve(size_t cap) { } size_t VertexTable::EnsureCapacity(size_t capacity) { + LOG(INFO) << "Ensuring capacity for vertex table " + << vertex_schema_->label_name + << ", current capacity: " << indexer_.capacity() + << ", requested capacity: " << capacity; if (capacity <= indexer_.capacity()) { return indexer_.capacity(); } diff --git a/src/utils/file_utils.cc b/src/utils/file_utils.cc index 3bfd6cecb..b063df501 100644 --- a/src/utils/file_utils.cc +++ b/src/utils/file_utils.cc @@ -112,4 +112,71 @@ void remove_directory(const std::string& dir_path) { } } +void read_file(const std::string& filename, void* buffer, size_t size, + size_t num) { + FILE* fin = fopen(filename.c_str(), "r"); + if (fin == nullptr) { + std::stringstream ss; + ss << "Failed to open file " << filename << ", " << strerror(errno); + LOG(ERROR) << ss.str(); + THROW_RUNTIME_ERROR(ss.str()); + } + size_t ret_len = 0; + if ((ret_len = fread(buffer, size, num, fin)) != num) { + std::stringstream ss; + ss << "Failed to read file " << filename << ", expected " << num << ", got " + 
<< ret_len << ", " << strerror(errno); + LOG(ERROR) << ss.str(); + THROW_RUNTIME_ERROR(ss.str()); + } + int ret = 0; + if ((ret = fclose(fin)) != 0) { + std::stringstream ss; + ss << "Failed to close file " << filename << ", error code: " << ret << " " + << strerror(errno); + LOG(ERROR) << ss.str(); + THROW_RUNTIME_ERROR(ss.str()); + } +} + +void write_file(const std::string& filename, const void* buffer, size_t size, + size_t num) { + FILE* fout = fopen(filename.c_str(), "wb"); + if (fout == nullptr) { + std::stringstream ss; + ss << "Failed to open file " << filename << ", " << strerror(errno); + LOG(ERROR) << ss.str(); + THROW_RUNTIME_ERROR(ss.str()); + } + size_t ret_len = 0; + if ((ret_len = fwrite(buffer, size, num, fout)) != num) { + std::stringstream ss; + ss << "Failed to write file " << filename << ", expected " << num + << ", got " << ret_len << ", " << strerror(errno); + LOG(ERROR) << ss.str(); + } + int ret = 0; + if ((ret = fclose(fout)) != 0) { + std::stringstream ss; + ss << "Failed to close file " << filename << ", error code: " << ret << " " + << strerror(errno); + LOG(ERROR) << ss.str(); + THROW_RUNTIME_ERROR(ss.str()); + } +} + +void write_statistic_file(const std::string& filename, size_t capacity, + size_t size) { + size_t buffer[2] = {capacity, size}; + write_file(filename, buffer, sizeof(size_t), 2); +} + +void read_statistic_file(const std::string& filename, size_t& capacity, + size_t& size) { + size_t buffer[2]; + read_file(filename, buffer, sizeof(size_t), 2); + capacity = buffer[0]; + size = buffer[1]; +} + } // namespace neug diff --git a/tools/python_bind/tests/test_db_query.py b/tools/python_bind/tests/test_db_query.py index 2c3244846..f8bd20f15 100644 --- a/tools/python_bind/tests/test_db_query.py +++ b/tools/python_bind/tests/test_db_query.py @@ -2577,3 +2577,13 @@ def test_insert_many_edges(): assert records == [[4950]], f"Expected value [[4950]], got {records}" conn.close() db.close() + +def test_open_modern_graph(): + db_dir = 
"/tmp/tinysnb" + db = Database(db_path=db_dir, mode="r") + conn = db.connect() + res = conn.execute("MATCH (n) RETURN n LIMIT 1;") + records = list(res) + assert len(records) == 1 + conn.close() + db.close() \ No newline at end of file From f4e93f4af33ed9e5c6d97a1362a83e3fc66c8fc3 Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Wed, 11 Mar 2026 13:49:24 +0800 Subject: [PATCH 06/19] remove logs --- CMakeLists.txt | 6 ------ include/neug/storages/graph/edge_table.h | 15 ++++++++++++- include/neug/storages/graph/vertex_table.h | 6 +++++- src/main/neug_db.cc | 1 + src/storages/graph/edge_table.cc | 16 +++++++------- src/storages/graph/property_graph.cc | 25 ---------------------- src/storages/graph/vertex_table.cc | 14 ------------ 7 files changed, 28 insertions(+), 55 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 19e347033..af90ce55c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -118,12 +118,6 @@ if (NOT CMAKE_BUILD_TYPE AND NOT CMAKE_CONFIGURATION_TYPES) "Debug" "Release" "MinSizeRel" "RelWithDebInfo") endif () -#if BUILD_TYPE is DEBUG, add compile option DEBUG -if (CMAKE_BUILD_TYPE STREQUAL "Debug") - add_definitions(-DDEBUG) -endif() -add_definitions(-DDEBUG) - add_compile_definitions(NEUG_VERSION="${NEUG_VERSION}") # reference: https://gitlab.kitware.com/cmake/community/-/wikis/doc/cmake/RPATH-handling#always-full-rpath diff --git a/include/neug/storages/graph/edge_table.h b/include/neug/storages/graph/edge_table.h index 083d26f31..6ab753c70 100644 --- a/include/neug/storages/graph/edge_table.h +++ b/include/neug/storages/graph/edge_table.h @@ -130,7 +130,20 @@ class EdgeTable { void Compact(bool compact_csr, bool sort_on_compaction, timestamp_t ts); - inline size_t Size() const { return table_idx_.load(); } + inline size_t Size() const { + if (meta_->is_bundled()) { + if (out_csr_) { + return out_csr_->edge_num(); + } else if (in_csr_) { + return in_csr_->edge_num(); + } else { + THROW_RUNTIME_ERROR("both csr are null"); + } + } + // 
TODO(zhanglei): the size may be inaccurate if some edges are deleted but + // not compacted yet. + return table_idx_.load(); + } inline size_t Capacity() const { if (meta_->is_bundled()) { diff --git a/include/neug/storages/graph/vertex_table.h b/include/neug/storages/graph/vertex_table.h index 479b6981f..2c74b4340 100644 --- a/include/neug/storages/graph/vertex_table.h +++ b/include/neug/storages/graph/vertex_table.h @@ -18,6 +18,7 @@ #include "neug/storages/graph/vertex_timestamp.h" #include "neug/storages/loader/loader_utils.h" #include "neug/utils/arrow_utils.h" +#include "neug/utils/growth.h" #include "neug/utils/indexers.h" #include "neug/utils/property/table.h" @@ -292,7 +293,10 @@ class VertexTable { auto ind = std::get<2>(vertex_schema_->primary_keys[0]); auto pk_array = columns[ind]; columns.erase(columns.begin() + ind); - EnsureCapacity(indexer_.size() + pk_array->length()); + size_t new_size = indexer_.size() + pk_array->length(); + if (new_size >= indexer_.capacity()) { + EnsureCapacity(calculate_new_capacity(new_size, true)); + } auto vids = insert_primary_keys(pk_array); diff --git a/src/main/neug_db.cc b/src/main/neug_db.cc index 3d71389ae..79c89c393 100644 --- a/src/main/neug_db.cc +++ b/src/main/neug_db.cc @@ -289,6 +289,7 @@ void NeugDB::initPlannerAndQueryProcessor() { } planner_->update_meta(schema().to_yaml().value()); planner_->update_statistics(graph().get_statistics_json()); + LOG(INFO) << "Finish initializing planner with schema and statistics"; global_query_cache_ = std::make_shared(planner_); diff --git a/src/storages/graph/edge_table.cc b/src/storages/graph/edge_table.cc index 539052082..9e2935fed 100644 --- a/src/storages/graph/edge_table.cc +++ b/src/storages/graph/edge_table.cc @@ -595,10 +595,6 @@ void EdgeTable::OpenWithHugepages(const std::string& work_dir, size_t src_v_cap, } void EdgeTable::Dump(const std::string& checkpoint_dir_path) { - LOG(INFO) << "dump edge table " << meta_->src_label_name << "-" - << meta_->edge_label_name 
<< "-" << meta_->dst_label_name << " to " - << checkpoint_dir_path << ", capacity: " << Capacity() - << ", size: " << Size(); in_csr_->dump(ie_prefix(meta_->src_label_name, meta_->dst_label_name, meta_->edge_label_name), checkpoint_dir_path); @@ -723,8 +719,6 @@ void EdgeTable::Resize(vid_t src_vertex_num, vid_t dst_vertex_num) { void EdgeTable::EnsureCapacity(size_t capacity) { if (!meta_->is_bundled()) { - LOG(INFO) << "ensure edge table capacity " << capacity - << ", current capacity " << capacity_.load(); if (capacity <= capacity_.load()) { return; } @@ -883,7 +877,10 @@ void EdgeTable::BatchAddEdges(const IndexerType& src_indexer, std::vector valid_flags; // true for valid edges std::tie(src_lid, dst_lid, valid_flags) = filterInvalidEdges(src_lid, dst_lid); - EnsureCapacity(table_idx_.load() + src_lid.size()); + size_t new_size = table_idx_.load() + src_lid.size(); + if (new_size >= capacity_.load()) { + EnsureCapacity(calculate_new_capacity(new_size, false)); + } if (meta_->is_bundled()) { auto edges = extract_bundled_edge_data_from_batches(meta_, data_batches, valid_flags); @@ -903,7 +900,10 @@ void EdgeTable::BatchAddEdges( const std::vector& src_lid_list, const std::vector& dst_lid_list, const std::vector>& edge_data_list) { - EnsureCapacity(table_idx_.load() + src_lid_list.size()); + size_t new_size = table_idx_.load() + src_lid_list.size(); + if (new_size >= capacity_.load()) { + EnsureCapacity(calculate_new_capacity(new_size, false)); + } if (meta_->is_bundled()) { std::vector flat_edge_data; assert(meta_->properties.size() == 1); diff --git a/src/storages/graph/property_graph.cc b/src/storages/graph/property_graph.cc index 418b8d221..d1750edb2 100644 --- a/src/storages/graph/property_graph.cc +++ b/src/storages/graph/property_graph.cc @@ -896,24 +896,12 @@ void PropertyGraph::Open(const std::string& work_dir, int memory_level) { std::string v_label_name = schema_.get_vertex_label_name(i); vertex_tables_[i].Open(work_dir_, memory_level); -#ifdef DEBUG - 
size_t old_cap = vertex_tables_[i].Capacity(); - LOG(INFO) << "Open vertex table for label [" << v_label_name - << "], capacity: " << vertex_tables_[i].Capacity() - << ", size: " << vertex_tables_[i].Size() << ", new capacity: " - << calculate_new_capacity(vertex_tables_[i].Size(), true); -#endif // Case 1: Open from checkpoint, the capacity should be already reserved and // satisfied Case 2: Open from empty, Capacity should be the default minimum // capacity(4096) vertex_tables_[i].EnsureCapacity( calculate_new_capacity(vertex_tables_[i].Size(), true)); vertex_capacities[i] = vertex_tables_[i].Capacity(); -#ifdef DEBUG - if (vertex_tables_[i].Size() > 0) { - assert(old_cap == vertex_tables_[i].Capacity()); - } -#endif } for (size_t src_label_i = 0; src_label_i != vertex_label_total_count_; @@ -955,21 +943,8 @@ void PropertyGraph::Open(const std::string& work_dir, int memory_level) { edge_table.OpenInMemory(work_dir_, vertex_capacities[src_label_i], vertex_capacities[dst_label_i]); } -#ifdef DEBUG - size_t old_cap = edge_table.Capacity(); - LOG(INFO) << "Open edge table for edge label [" << edge_label - << "] from [" << src_label << "] to [" << dst_label - << "], capacity: " << edge_table.Capacity() - << ", size: " << edge_table.Size() << ", new capacity: " - << calculate_new_capacity(edge_table.Size(), false); -#endif edge_table.EnsureCapacity( calculate_new_capacity(edge_table.Size(), false)); -#ifdef DEBUG - if (edge_table.Size() > 0) { - assert(old_cap == edge_table.Capacity()); - } -#endif edge_tables_.emplace(index, std::move(edge_table)); } } diff --git a/src/storages/graph/vertex_table.cc b/src/storages/graph/vertex_table.cc index 6f1948d15..7850fbf36 100644 --- a/src/storages/graph/vertex_table.cc +++ b/src/storages/graph/vertex_table.cc @@ -57,12 +57,6 @@ void VertexTable::Open(const std::string& work_dir, int memory_level) { THROW_INTERNAL_EXCEPTION("Invalid memory level: " + std::to_string(memory_level_)); } - LOG(INFO) << "Open vertex table for label 
[" << label_name - << "], capacity: " << indexer_.capacity() - << ", size: " << indexer_.size(); - if (table_ && table_->col_num() > 0) { - LOG(INFO) << ", table size: " << table_->get_column_by_id(0)->size(); - } v_ts_.Open(vertex_tracker_filename); } @@ -191,10 +185,6 @@ bool VertexTable::IsValidLid(vid_t lid, timestamp_t ts) const { } void VertexTable::Reserve(size_t cap) { - LOG(INFO) << "Reserving capacity for vertex table " - << vertex_schema_->label_name - << ", current capacity: " << indexer_.capacity() - << ", requested capacity: " << cap; if (cap > indexer_.capacity()) { indexer_.reserve(cap); } @@ -205,10 +195,6 @@ void VertexTable::Reserve(size_t cap) { } size_t VertexTable::EnsureCapacity(size_t capacity) { - LOG(INFO) << "Ensuring capacity for vertex table " - << vertex_schema_->label_name - << ", current capacity: " << indexer_.capacity() - << ", requested capacity: " << capacity; if (capacity <= indexer_.capacity()) { return indexer_.capacity(); } From dae1473f54e1c38b696f49305595d49780fdf610 Mon Sep 17 00:00:00 2001 From: zhanglei1949 Date: Wed, 11 Mar 2026 13:52:11 +0800 Subject: [PATCH 07/19] fix format --- tools/python_bind/tests/test_db_query.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/tools/python_bind/tests/test_db_query.py b/tools/python_bind/tests/test_db_query.py index f8bd20f15..2c3244846 100644 --- a/tools/python_bind/tests/test_db_query.py +++ b/tools/python_bind/tests/test_db_query.py @@ -2577,13 +2577,3 @@ def test_insert_many_edges(): assert records == [[4950]], f"Expected value [[4950]], got {records}" conn.close() db.close() - -def test_open_modern_graph(): - db_dir = "/tmp/tinysnb" - db = Database(db_path=db_dir, mode="r") - conn = db.connect() - res = conn.execute("MATCH (n) RETURN n LIMIT 1;") - records = list(res) - assert len(records) == 1 - conn.close() - db.close() \ No newline at end of file From 6a1623b8e81880f79b0fc41dbce6e2ccdf970452 Mon Sep 17 00:00:00 2001 From: zhanglei1949 Date: Wed, 11 Mar 2026 
14:20:01 +0800 Subject: [PATCH 08/19] fix test --- tests/storage/test_vertex_table.cc | 34 ++++++++++++------------- tests/storage/vertex_table_benchmark.cc | 2 +- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/tests/storage/test_vertex_table.cc b/tests/storage/test_vertex_table.cc index 5bee40fdd..403f051e7 100644 --- a/tests/storage/test_vertex_table.cc +++ b/tests/storage/test_vertex_table.cc @@ -113,7 +113,7 @@ class VertexTableTest : public ::testing::Test { TEST_F(VertexTableTest, VertexTableBasicOps) { neug::VertexTable table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dir_, memory_level_, true); + table.Open(dir_, memory_level_); table.Reserve(vertex_count_); neug::vid_t lid1, lid2, lid3; @@ -166,7 +166,7 @@ TEST_F(VertexTableTest, VertexTableDumpAndReload) { std::filesystem::create_directories(neug::temp_checkpoint_dir(dump_dir)); { neug::VertexTable table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dump_dir, memory_level_, true); + table.Open(dump_dir, memory_level_); table.Reserve(vertex_count_); neug::vid_t lid1, lid2, lid3; @@ -209,7 +209,7 @@ TEST_F(VertexTableTest, VertexTableAddAndDeleteAndReload) { neug::Property oid1, oid2, oid3; { neug::VertexTable table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dump_dir, memory_level_, true); + table.Open(dump_dir, memory_level_); table.Reserve(vertex_count_); oid1.set_int64(1); @@ -270,7 +270,7 @@ TEST_F(VertexTableTest, VertexTableAddAndDeleteAndReload) { TEST_F(VertexTableTest, AddVertexBasic) { neug::VertexTable table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dir_, memory_level_, true); + table.Open(dir_, memory_level_); table.Reserve(100); neug::Property oid1, oid2, oid3; @@ -309,7 +309,7 @@ TEST_F(VertexTableTest, AddVertexBasic) { // Test AddVertex for concurrent scenarios TEST_F(VertexTableTest, AddVertex) { neug::VertexTable table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dir_, memory_level_, true); + table.Open(dir_, 
memory_level_); neug::vid_t tmp_vid; EXPECT_FALSE(table.AddVertex(neug::Property::from_int64(1), property_values_, tmp_vid)); @@ -340,7 +340,7 @@ TEST_F(VertexTableTest, AddVertex) { // Test DeleteVertex basic functionality TEST_F(VertexTableTest, DeleteVertexBasic) { neug::VertexTable table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dir_, memory_level_, true); + table.Open(dir_, memory_level_); table.Reserve(100); neug::Property oid1, oid2, oid3; @@ -373,7 +373,7 @@ TEST_F(VertexTableTest, DeleteVertexBasic) { // Test RevertDeleteVertex basic functionality TEST_F(VertexTableTest, RevertDeleteVertexBasic) { neug::VertexTable table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dir_, memory_level_, true); + table.Open(dir_, memory_level_); table.Reserve(100); neug::Property oid1; @@ -403,7 +403,7 @@ TEST_F(VertexTableTest, RevertDeleteVertexBasic) { // Test complex combination: Add -> Delete -> Revert TEST_F(VertexTableTest, AddDeleteRevertCombination) { neug::VertexTable table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dir_, memory_level_, true); + table.Open(dir_, memory_level_); table.Reserve(100); std::vector oids; @@ -448,7 +448,7 @@ TEST_F(VertexTableTest, AddDeleteRevertCombination) { // Test complex combination: Multiple deletes and reverts TEST_F(VertexTableTest, MultipleDeletesAndReverts) { neug::VertexTable table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dir_, memory_level_, true); + table.Open(dir_, memory_level_); table.Reserve(100); neug::Property oid; @@ -483,7 +483,7 @@ TEST_F(VertexTableTest, MultipleDeletesAndReverts) { // Test AddVertex and AddVertexSafe mixed usage TEST_F(VertexTableTest, MixedAddVertexAndAddVertexSafe) { neug::VertexTable table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dir_, memory_level_, true); + table.Open(dir_, memory_level_); table.Reserve(50); std::vector lids; @@ -507,7 +507,7 @@ TEST_F(VertexTableTest, MixedAddVertexAndAddVertexSafe) { // Test temporal visibility with 
add/delete/revert TEST_F(VertexTableTest, TemporalVisibilityComplex) { neug::VertexTable table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dir_, memory_level_, true); + table.Open(dir_, memory_level_); table.Reserve(100); neug::Property oid1, oid2, oid3; @@ -550,7 +550,7 @@ TEST_F(VertexTableTest, TemporalVisibilityComplex) { // Test edge cases: Delete already deleted vertex TEST_F(VertexTableTest, DeleteAlreadyDeletedVertex) { neug::VertexTable table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dir_, memory_level_, true); + table.Open(dir_, memory_level_); table.Reserve(100); neug::Property oid; @@ -573,7 +573,7 @@ TEST_F(VertexTableTest, DeleteAlreadyDeletedVertex) { // Test edge cases: Revert non-deleted vertex TEST_F(VertexTableTest, RevertNonDeletedVertex) { neug::VertexTable table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dir_, memory_level_, true); + table.Open(dir_, memory_level_); table.Reserve(100); neug::Property oid; @@ -606,7 +606,7 @@ TEST_F(VertexTableTest, ComplexAddDeleteRevertDumpReload) { // Create complex state { neug::VertexTable table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dump_dir, memory_level_, true); + table.Open(dump_dir, memory_level_); table.Reserve(100); for (int64_t i = 0; i < 20; ++i) { @@ -667,7 +667,7 @@ TEST_F(VertexTableTest, ComplexAddDeleteRevertDumpReload) { // Test stress: Many add/delete/revert operations TEST_F(VertexTableTest, StressAddDeleteRevert) { neug::VertexTable table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dir_, memory_level_, true); + table.Open(dir_, memory_level_); table.Reserve(1000); std::vector oids; @@ -719,7 +719,7 @@ TEST_F(VertexTableTest, StressAddDeleteRevert) { TEST_F(VertexTableTest, VertexTableResizeTest) { neug::VertexTable table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dir_, memory_level_, true); + table.Open(dir_, memory_level_); auto record_batches = generate_record_batches(10000); std::shared_ptr batch_supplier = 
std::make_shared(std::move(record_batches)); @@ -789,7 +789,7 @@ TEST_F(VertexTableTest, VertexTimestampValidVertexNum) { TEST_F(VertexTableTest, VertexSetForeachVertex) { neug::VertexTable table(schema_.get_vertex_schema(v_label_id_)); - table.Open(dir_, memory_level_, true); + table.Open(dir_, memory_level_); table.Reserve(100); std::vector oids; diff --git a/tests/storage/vertex_table_benchmark.cc b/tests/storage/vertex_table_benchmark.cc index e11fdd2fc..32ce5e949 100644 --- a/tests/storage/vertex_table_benchmark.cc +++ b/tests/storage/vertex_table_benchmark.cc @@ -70,7 +70,7 @@ class VertexTableBenchmark : public ::testing::Test { void CreateAndOpenVertexTable(neug::VertexTable& table) { // Open the vertex table - table.Open(test_dir_, 1, true); // memory_level=1 for in-memory + table.Open(test_dir_, 1); // memory_level=1 for in-memory } void AddVerticesWithProperties(neug::VertexTable& table, size_t count) { From 645584ca2b02fea86c15ee3b83bf585dab24dd4a Mon Sep 17 00:00:00 2001 From: Zhang Lei Date: Wed, 11 Mar 2026 15:03:07 +0800 Subject: [PATCH 09/19] Update src/utils/file_utils.cc Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> --- src/utils/file_utils.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/src/utils/file_utils.cc b/src/utils/file_utils.cc index b063df501..ee07b514e 100644 --- a/src/utils/file_utils.cc +++ b/src/utils/file_utils.cc @@ -154,6 +154,7 @@ void write_file(const std::string& filename, const void* buffer, size_t size, ss << "Failed to write file " << filename << ", expected " << num << ", got " << ret_len << ", " << strerror(errno); LOG(ERROR) << ss.str(); + THROW_RUNTIME_ERROR(ss.str()); } int ret = 0; if ((ret = fclose(fout)) != 0) { From eb61a06594ad20fbd5643f13966b44a19908c22e Mon Sep 17 00:00:00 2001 From: zhanglei1949 Date: Wed, 11 Mar 2026 15:36:37 +0800 Subject: [PATCH 10/19] minor fix --- include/neug/storages/graph/vertex_table.h | 2 +- src/storages/graph/property_graph.cc | 2 +- 
2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/include/neug/storages/graph/vertex_table.h b/include/neug/storages/graph/vertex_table.h index 2c74b4340..391a69f42 100644 --- a/include/neug/storages/graph/vertex_table.h +++ b/include/neug/storages/graph/vertex_table.h @@ -294,7 +294,7 @@ class VertexTable { auto pk_array = columns[ind]; columns.erase(columns.begin() + ind); size_t new_size = indexer_.size() + pk_array->length(); - if (new_size >= indexer_.capacity()) { + while (new_size >= Capacity()) { EnsureCapacity(calculate_new_capacity(new_size, true)); } diff --git a/src/storages/graph/property_graph.cc b/src/storages/graph/property_graph.cc index d1750edb2..4c3c21e6f 100644 --- a/src/storages/graph/property_graph.cc +++ b/src/storages/graph/property_graph.cc @@ -1122,7 +1122,7 @@ void PropertyGraph::Dump(bool reopen, bool ensure_capacity) { auto& edge_table = edge_tables_.at(index); edge_table.Resize(vertex_num[src_label_i], vertex_num[dst_label_i]); EnsureCapacity(src_label_i, dst_label_i, e_label_i, - calculate_new_capacity(edge_table.Capacity(), false)); + calculate_new_capacity(edge_table.Size(), false)); edge_table.Dump(target_dir); } } From f6501a8c1bc30d87330ad1692f8b5476ad53580f Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Wed, 11 Mar 2026 17:02:48 +0800 Subject: [PATCH 11/19] fix string column reserve --- include/neug/storages/graph/property_graph.h | 2 +- include/neug/utils/mmap_array.h | 9 ++++--- include/neug/utils/property/column.h | 21 +++++++++++----- src/storages/graph/property_graph.cc | 26 -------------------- src/storages/graph/vertex_table.cc | 6 ++--- 5 files changed, 25 insertions(+), 39 deletions(-) diff --git a/include/neug/storages/graph/property_graph.h b/include/neug/storages/graph/property_graph.h index 88c09e942..398691adf 100644 --- a/include/neug/storages/graph/property_graph.h +++ b/include/neug/storages/graph/property_graph.h @@ -116,7 +116,7 @@ class PropertyGraph { * * @since v0.1.0 */ - ~PropertyGraph(); 
+ ~PropertyGraph() = default; /** * @brief Open the property graph from persistent storage. diff --git a/include/neug/utils/mmap_array.h b/include/neug/utils/mmap_array.h index c1d799a35..bdd9cb6d2 100644 --- a/include/neug/utils/mmap_array.h +++ b/include/neug/utils/mmap_array.h @@ -564,7 +564,9 @@ class mmap_array { // Compact the data buffer by removing unused space and updating offsets // This is an in-place operation that shifts valid string data forward - // Returns the compacted data size + // Returns the compacted data size. Note that the reserved size of data buffer + // is not changed, and new strings can still be appended after the compacted + // data. size_t compact() { auto plan = prepare_compaction_plan(); if (items_.size() == 0) { @@ -577,9 +579,11 @@ class mmap_array { std::vector temp_buf(plan.total_size); size_t write_offset = 0; + size_t limit_offset = 0; for (const auto& entry : plan.entries) { const char* src = data_.data() + entry.offset; char* dst = temp_buf.data() + write_offset; + limit_offset = std::max(limit_offset, entry.offset + entry.length); memcpy(dst, src, entry.length); items_.set(entry.index, {static_cast(write_offset), entry.length}); @@ -588,9 +592,8 @@ class mmap_array { assert(write_offset == plan.total_size); memcpy(data_.data(), temp_buf.data(), plan.total_size); - data_.resize(plan.total_size); VLOG(1) << "Compaction completed. 
New data size: " << plan.total_size - << ", old data size: " << size_before_compact; + << ", old data size: " << limit_offset; return plan.total_size; } diff --git a/include/neug/utils/property/column.h b/include/neug/utils/property/column.h index a48314f0f..6dad64959 100644 --- a/include/neug/utils/property/column.h +++ b/include/neug/utils/property/column.h @@ -30,6 +30,7 @@ #include "neug/storages/file_names.h" #include "neug/utils/exception/exception.h" +#include "neug/utils/file_utils.h" #include "neug/utils/likely.h" #include "neug/utils/mmap_array.h" #include "neug/utils/property/property.h" @@ -301,7 +302,7 @@ class TypedColumn : public ColumnBase { if (std::filesystem::exists(basic_path + ".items")) { buffer_.open(basic_path, false, false); size_ = buffer_.size(); - pos_ = buffer_.data_size(); + init_pos(basic_path + ".pos"); } else { if (work_dir == "") { size_ = 0; @@ -309,7 +310,7 @@ class TypedColumn : public ColumnBase { } else { buffer_.open(work_dir + "/" + name, true); size_ = buffer_.size(); - pos_ = buffer_.data_size(); + init_pos(work_dir + "/" + name + ".pos"); } } } @@ -317,14 +318,14 @@ class TypedColumn : public ColumnBase { void open_in_memory(const std::string& prefix) override { buffer_.open(prefix, false); size_ = buffer_.size(); - pos_ = buffer_.data_size(); + init_pos(prefix + ".pos"); } void open_with_hugepages(const std::string& prefix, bool force) override { if (strategy_ == StorageStrategy::kMem || force) { buffer_.open_with_hugepages(prefix); size_ = buffer_.size(); - pos_ = buffer_.data_size(); + init_pos(prefix + ".pos"); } else if (strategy_ == StorageStrategy::kDisk) { LOG(INFO) << "Open " << prefix << " with normal mmap pages"; @@ -342,16 +343,17 @@ class TypedColumn : public ColumnBase { } copy_file(cur_path + ".data", tmp_path + ".data"); copy_file(cur_path + ".items", tmp_path + ".items"); + copy_file(cur_path + ".pos", tmp_path + ".pos"); buffer_.reset(); tmp.open(tmp_path, true); buffer_.swap(tmp); tmp.reset(); - 
pos_.store(buffer_.data_size()); + init_pos(tmp_path + ".pos"); } void dump(const std::string& filename) override { - buffer_.resize(size_, pos_.load()); + write_file(filename + ".pos", &pos_, sizeof(pos_), 1); buffer_.dump(filename); } @@ -431,6 +433,13 @@ class TypedColumn : public ColumnBase { } private: + inline void init_pos(const std::string& file_path) { + if (std::filesystem::exists(file_path)) { + read_file(file_path, &pos_, sizeof(pos_), 1); + } else { + pos_.store(0); + } + } mmap_array buffer_; size_t size_; std::atomic pos_; diff --git a/src/storages/graph/property_graph.cc b/src/storages/graph/property_graph.cc index 4c3c21e6f..b60672261 100644 --- a/src/storages/graph/property_graph.cc +++ b/src/storages/graph/property_graph.cc @@ -42,32 +42,6 @@ PropertyGraph::PropertyGraph() edge_label_total_count_(0), memory_level_(1) {} -PropertyGraph::~PropertyGraph() { - std::vector degree_list(vertex_label_total_count_, 0); - for (size_t i = 0; i < vertex_label_total_count_; ++i) { - if (!vertex_tables_[i].is_dropped()) { - degree_list[i] = vertex_tables_[i].LidNum(); - vertex_tables_[i].Reserve(degree_list[i]); - } - } - for (size_t src_label = 0; src_label != vertex_label_total_count_; - ++src_label) { - for (size_t dst_label = 0; dst_label != vertex_label_total_count_; - ++dst_label) { - for (size_t e_label = 0; e_label != edge_label_total_count_; ++e_label) { - size_t index = - schema_.generate_edge_label(src_label, dst_label, e_label); - auto pair = edge_tables_.find(index); - if (pair != edge_tables_.end()) { - auto& edge_table = pair->second; - edge_table.Resize(degree_list[src_label], degree_list[dst_label]); - edge_tables_.erase(pair); - } - } - } - } -} - void PropertyGraph::loadSchema(const std::string& schema_path) { std::ifstream in(schema_path); schema_.Deserialize(in); diff --git a/src/storages/graph/vertex_table.cc b/src/storages/graph/vertex_table.cc index 7850fbf36..bde75bc9d 100644 --- a/src/storages/graph/vertex_table.cc +++ 
b/src/storages/graph/vertex_table.cc @@ -187,9 +187,9 @@ bool VertexTable::IsValidLid(vid_t lid, timestamp_t ts) const { void VertexTable::Reserve(size_t cap) { if (cap > indexer_.capacity()) { indexer_.reserve(cap); - } - if (table_) { - table_->resize(cap); + if (table_) { + table_->resize(cap); + } v_ts_.Reserve(cap); } } From 40000f79e33f84ffc0b1f3305cebac33ab48d1f4 Mon Sep 17 00:00:00 2001 From: Zhang Lei Date: Wed, 11 Mar 2026 18:03:20 +0800 Subject: [PATCH 12/19] Update include/neug/utils/growth.h Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> --- include/neug/utils/growth.h | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/include/neug/utils/growth.h b/include/neug/utils/growth.h index 69be2f53f..cd126e7bf 100644 --- a/include/neug/utils/growth.h +++ b/include/neug/utils/growth.h @@ -15,10 +15,9 @@ */ #pragma once - #include #include - +#include namespace neug { inline size_t calculate_new_capacity(size_t current_capacity, bool is_vertex_table) { From 69f43da7231d16b0034e4c42b7d856c464bbc816 Mon Sep 17 00:00:00 2001 From: Zhang Lei Date: Wed, 11 Mar 2026 18:03:34 +0800 Subject: [PATCH 13/19] Update src/utils/file_utils.cc Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> --- src/utils/file_utils.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils/file_utils.cc b/src/utils/file_utils.cc index ee07b514e..584cf5c52 100644 --- a/src/utils/file_utils.cc +++ b/src/utils/file_utils.cc @@ -114,7 +114,7 @@ void remove_directory(const std::string& dir_path) { void read_file(const std::string& filename, void* buffer, size_t size, size_t num) { - FILE* fin = fopen(filename.c_str(), "r"); + FILE* fin = fopen(filename.c_str(), "rb"); if (fin == nullptr) { std::stringstream ss; ss << "Failed to open file " << filename << ", " << strerror(errno); From 9fd1bbcdda2da302679206325580c11ef9dfe97a Mon Sep 17 00:00:00 2001 From: zhanglei1949 Date: Wed, 
11 Mar 2026 18:52:54 +0800 Subject: [PATCH 14/19] fix string column stream compact --- include/neug/storages/graph/property_graph.h | 6 ++-- include/neug/utils/mmap_array.h | 16 ++++++++++ include/neug/utils/property/table.h | 7 +++++ src/storages/graph/property_graph.cc | 33 +++++++++++++++++++- src/storages/graph/vertex_table.cc | 8 ++--- tests/storage/test_edge_table.cc | 2 ++ 6 files changed, 63 insertions(+), 9 deletions(-) diff --git a/include/neug/storages/graph/property_graph.h b/include/neug/storages/graph/property_graph.h index 398691adf..518bea5e8 100644 --- a/include/neug/storages/graph/property_graph.h +++ b/include/neug/storages/graph/property_graph.h @@ -116,7 +116,7 @@ class PropertyGraph { * * @since v0.1.0 */ - ~PropertyGraph() = default; + ~PropertyGraph(); /** * @brief Open the property graph from persistent storage. @@ -140,10 +140,8 @@ class PropertyGraph { /** * @brief Dump the current graph state to persistent storage. * @param reopen If true, reopens the graph after dumping (default: true) - * @param reserve_space If true, reserves space for all vertex and edge - * tables. */ - void Dump(bool reopen = true, bool ensure_capacity = true); + void Dump(bool reopen = true); /** * @brief Dump schema information to a file. 
diff --git a/include/neug/utils/mmap_array.h b/include/neug/utils/mmap_array.h index bdd9cb6d2..6206ffe8d 100644 --- a/include/neug/utils/mmap_array.h +++ b/include/neug/utils/mmap_array.h @@ -16,6 +16,7 @@ #include #include +#include #include #include @@ -657,6 +658,21 @@ class mmap_array { LOG(ERROR) << ss.str(); THROW_RUNTIME_ERROR(ss.str()); } + int fd = fileno(fout); + if (fd == -1) { + std::stringstream ss; + ss << "Failed to get file descriptor for [ " << data_filename << " ], " + << strerror(errno); + LOG(ERROR) << ss.str(); + THROW_RUNTIME_ERROR(ss.str()); + } + if (ftruncate(fd, size_before_compact) != 0) { + std::stringstream ss; + ss << "Failed to ftruncate file [ " << data_filename << " ], " + << strerror(errno); + LOG(ERROR) << ss.str(); + THROW_RUNTIME_ERROR(ss.str()); + } if (fclose(fout) != 0) { std::stringstream ss; ss << "Failed to fclose file [ " << data_filename << " ], " diff --git a/include/neug/utils/property/table.h b/include/neug/utils/property/table.h index 3d7d1e74d..ed5fa1199 100644 --- a/include/neug/utils/property/table.h +++ b/include/neug/utils/property/table.h @@ -94,6 +94,13 @@ class Table { void delete_column(const std::string& col_name); size_t col_num() const; + inline size_t size() const { + if (columns_.empty()) { + return 0; + } else { + return columns_[0]->size(); + } + } std::vector>& columns(); std::vector& column_ptrs(); diff --git a/src/storages/graph/property_graph.cc b/src/storages/graph/property_graph.cc index b60672261..1244394cf 100644 --- a/src/storages/graph/property_graph.cc +++ b/src/storages/graph/property_graph.cc @@ -42,6 +42,32 @@ PropertyGraph::PropertyGraph() edge_label_total_count_(0), memory_level_(1) {} +PropertyGraph::~PropertyGraph() { + std::vector degree_list(vertex_label_total_count_, 0); + for (size_t i = 0; i < vertex_label_total_count_; ++i) { + if (!vertex_tables_[i].is_dropped()) { + degree_list[i] = vertex_tables_[i].LidNum(); + vertex_tables_[i].Reserve(degree_list[i]); + } + } + for 
(size_t src_label = 0; src_label != vertex_label_total_count_; + ++src_label) { + for (size_t dst_label = 0; dst_label != vertex_label_total_count_; + ++dst_label) { + for (size_t e_label = 0; e_label != edge_label_total_count_; ++e_label) { + size_t index = + schema_.generate_edge_label(src_label, dst_label, e_label); + auto pair = edge_tables_.find(index); + if (pair != edge_tables_.end()) { + auto& edge_table = pair->second; + edge_table.Resize(degree_list[src_label], degree_list[dst_label]); + edge_tables_.erase(pair); + } + } + } + } +} + void PropertyGraph::loadSchema(const std::string& schema_path) { std::ifstream in(schema_path); schema_.Deserialize(in); @@ -919,6 +945,11 @@ void PropertyGraph::Open(const std::string& work_dir, int memory_level) { } edge_table.EnsureCapacity( calculate_new_capacity(edge_table.Size(), false)); + // TODO(zhanglei): Any better way to resize for memory level 0? + if (memory_level_ == 0) { + edge_table.Resize(vertex_capacities[src_label_i], + vertex_capacities[dst_label_i]); + } edge_tables_.emplace(index, std::move(edge_table)); } } @@ -1038,7 +1069,7 @@ void PropertyGraph::Compact(bool compact_csr, float reserve_ratio, } } -void PropertyGraph::Dump(bool reopen, bool ensure_capacity) { +void PropertyGraph::Dump(bool reopen) { // First dump to the temp dir, then move to the checkpoint dir std::string target_dir = temp_checkpoint_dir(work_dir_); if (std::filesystem::exists(target_dir)) { diff --git a/src/storages/graph/vertex_table.cc b/src/storages/graph/vertex_table.cc index bde75bc9d..c81fee07c 100644 --- a/src/storages/graph/vertex_table.cc +++ b/src/storages/graph/vertex_table.cc @@ -187,11 +187,11 @@ bool VertexTable::IsValidLid(vid_t lid, timestamp_t ts) const { void VertexTable::Reserve(size_t cap) { if (cap > indexer_.capacity()) { indexer_.reserve(cap); - if (table_) { - table_->resize(cap); - } - v_ts_.Reserve(cap); } + if (table_ && table_->size() < cap) { + table_->resize(cap); + } + v_ts_.Reserve(cap); } size_t 
VertexTable::EnsureCapacity(size_t capacity) { diff --git a/tests/storage/test_edge_table.cc b/tests/storage/test_edge_table.cc index 42b385fba..4d834d471 100644 --- a/tests/storage/test_edge_table.cc +++ b/tests/storage/test_edge_table.cc @@ -895,6 +895,7 @@ TEST_F(EdgeTableTest, TestAddEdgeDeleteUnbundled) { neug::Allocator allocator(neug::MemoryStrategy::kMemoryOnly, allocator_dir_); size_t edge_count = 0; + this->edge_table->EnsureCapacity(edge_data.size()); for (size_t i = 0; i < src_lids.size(); ++i) { this->edge_table->AddEdge(src_lids[i], dst_lids[i], edge_data[i], 0, allocator); @@ -1043,6 +1044,7 @@ TEST_F(EdgeTableTest, TestUpdateEdgeData) { neug::Property::from_int32(static_cast(0))}); } + this->edge_table->EnsureCapacity(edge_data.size()); neug::Allocator allocator(neug::MemoryStrategy::kMemoryOnly, allocator_dir_); for (size_t i = 0; i < src_lids.size(); ++i) { this->edge_table->AddEdge(src_lids[i], dst_lids[i], edge_data[i], 0, From 190ca504deed2fc5f1628b971487124f1f8fcf1d Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Thu, 12 Mar 2026 12:03:49 +0800 Subject: [PATCH 15/19] diff capacity reserve for open/dump and inserting --- include/neug/storages/graph/vertex_table.h | 2 +- include/neug/utils/growth.h | 33 ++++++++++++------- include/neug/utils/mmap_array.h | 2 ++ src/storages/graph/edge_table.cc | 9 +++-- src/storages/graph/property_graph.cc | 32 +++++++++++++----- src/storages/graph/vertex_table.cc | 4 +++ .../loader/abstract_property_graph_loader.cc | 2 +- 7 files changed, 59 insertions(+), 25 deletions(-) diff --git a/include/neug/storages/graph/vertex_table.h b/include/neug/storages/graph/vertex_table.h index 391a69f42..b76b4d786 100644 --- a/include/neug/storages/graph/vertex_table.h +++ b/include/neug/storages/graph/vertex_table.h @@ -295,7 +295,7 @@ class VertexTable { columns.erase(columns.begin() + ind); size_t new_size = indexer_.size() + pk_array->length(); while (new_size >= Capacity()) { - 
EnsureCapacity(calculate_new_capacity(new_size, true)); + EnsureCapacity(calculate_new_capacity(new_size, true, false)); } auto vids = insert_primary_keys(pk_array); diff --git a/include/neug/utils/growth.h b/include/neug/utils/growth.h index cd126e7bf..e9c888ec1 100644 --- a/include/neug/utils/growth.h +++ b/include/neug/utils/growth.h @@ -19,23 +19,32 @@ #include #include namespace neug { -inline size_t calculate_new_capacity(size_t current_capacity, - bool is_vertex_table) { - if (current_capacity < 4096) { +inline size_t calculate_new_capacity(size_t current_size, bool is_vertex_table, + bool on_dump_or_open) { + if (current_size < 4096) { return 4096; // Start with a reasonable default capacity. } static constexpr size_t MAX_CAPACITY = std::numeric_limits::max(); if (is_vertex_table) { - // For vertex tables, we grow exponentially: double the current capacity, - // with a 4K floor. - return current_capacity <= MAX_CAPACITY / 2 ? current_capacity * 2 - : MAX_CAPACITY; + if (on_dump_or_open) { + // plus 1/4 + return current_size <= MAX_CAPACITY - current_size / 4 + ? current_size + current_size / 4 + : MAX_CAPACITY; + } else { + return current_size <= MAX_CAPACITY / 2 ? current_size * 2 : MAX_CAPACITY; + } } else { - // For edge tables, we grow linearly: new capacity = current capacity + - // (current capacity + 4) / 5. - return current_capacity <= MAX_CAPACITY - (current_capacity + 4) / 5 - ? current_capacity + (current_capacity + 4) / 5 - : MAX_CAPACITY; + if (on_dump_or_open) { + // For edge tables, we grow linearly: new capacity = current capacity + + // (current capacity + 4) / 5. + return current_size <= MAX_CAPACITY - (current_size + 4) / 5 + ? current_size + (current_size + 4) / 5 + : MAX_CAPACITY; + } else { + return current_size <= MAX_CAPACITY / 2 ? 
current_size * 2 : MAX_CAPACITY; + } } } + } // namespace neug diff --git a/include/neug/utils/mmap_array.h b/include/neug/utils/mmap_array.h index 6206ffe8d..0c041c16c 100644 --- a/include/neug/utils/mmap_array.h +++ b/include/neug/utils/mmap_array.h @@ -524,6 +524,8 @@ class mmap_array { } void resize(size_t size, size_t data_size) { + LOG(INFO) << "Resizing mmap_array " << this << " to size: " << size + << ", data_size: " << data_size; items_.resize(size); data_.resize(data_size); } diff --git a/src/storages/graph/edge_table.cc b/src/storages/graph/edge_table.cc index 9e2935fed..f8bb72dd2 100644 --- a/src/storages/graph/edge_table.cc +++ b/src/storages/graph/edge_table.cc @@ -722,6 +722,8 @@ void EdgeTable::EnsureCapacity(size_t capacity) { if (capacity <= capacity_.load()) { return; } + LOG(INFO) << "resize edge table from capacity " << capacity_.load() + << " to " << capacity; table_->resize(capacity); capacity_.store(capacity); } @@ -879,7 +881,7 @@ void EdgeTable::BatchAddEdges(const IndexerType& src_indexer, filterInvalidEdges(src_lid, dst_lid); size_t new_size = table_idx_.load() + src_lid.size(); if (new_size >= capacity_.load()) { - EnsureCapacity(calculate_new_capacity(new_size, false)); + EnsureCapacity(calculate_new_capacity(new_size, false, false)); } if (meta_->is_bundled()) { auto edges = extract_bundled_edge_data_from_batches(meta_, data_batches, @@ -902,7 +904,7 @@ void EdgeTable::BatchAddEdges( const std::vector>& edge_data_list) { size_t new_size = table_idx_.load() + src_lid_list.size(); if (new_size >= capacity_.load()) { - EnsureCapacity(calculate_new_capacity(new_size, false)); + EnsureCapacity(calculate_new_capacity(new_size, false, false)); } if (meta_->is_bundled()) { std::vector flat_edge_data; @@ -1019,7 +1021,8 @@ void EdgeTable::dropAndCreateNewUnbundledCSR(bool delete_property) { if (prev_data_col && prev_data_col->size() > 0) { table_->resize(prev_data_col->size()); table_idx_.store(prev_data_col->size()); - 
EnsureCapacity(calculate_new_capacity(prev_data_col->size(), false)); + EnsureCapacity( + calculate_new_capacity(prev_data_col->size(), false, false)); } } else { // delete_property == true, which means the EdgeTable will become use csr of diff --git a/src/storages/graph/property_graph.cc b/src/storages/graph/property_graph.cc index 1244394cf..c0d498aed 100644 --- a/src/storages/graph/property_graph.cc +++ b/src/storages/graph/property_graph.cc @@ -121,7 +121,7 @@ Status PropertyGraph::EnsureCapacity(label_t v_label, size_t capacity) { if (capacity == 0) { auto old_size = vertex_tables_[v_label].Size(); if (old_size >= old_cap) { - capacity = neug::calculate_new_capacity(old_size, true); + capacity = neug::calculate_new_capacity(old_size, true, false); } } if (capacity <= old_cap) { @@ -172,7 +172,7 @@ Status PropertyGraph::EnsureCapacity(label_t src_label, label_t dst_label, if (capacity == 0) { size_t old_size = edge_tables_.at(index).Size(); if (old_size >= old_cap) { - capacity = neug::calculate_new_capacity(old_size, false); + capacity = neug::calculate_new_capacity(old_size, false, false); } } if (capacity <= old_cap) { @@ -900,7 +900,7 @@ void PropertyGraph::Open(const std::string& work_dir, int memory_level) { // satisfied Case 2: Open from empty, Capacity should be the default minimum // capacity(4096) vertex_tables_[i].EnsureCapacity( - calculate_new_capacity(vertex_tables_[i].Size(), true)); + calculate_new_capacity(vertex_tables_[i].Size(), true, true)); vertex_capacities[i] = vertex_tables_[i].Capacity(); } @@ -944,7 +944,7 @@ void PropertyGraph::Open(const std::string& work_dir, int memory_level) { vertex_capacities[dst_label_i]); } edge_table.EnsureCapacity( - calculate_new_capacity(edge_table.Size(), false)); + calculate_new_capacity(edge_table.Size(), false, true)); // TODO(zhanglei): Any better way to resize for memory level 0? 
if (memory_level_ == 0) { edge_table.Resize(vertex_capacities[src_label_i], @@ -1067,6 +1067,7 @@ void PropertyGraph::Compact(bool compact_csr, float reserve_ratio, } } } + LOG(INFO) << "Compaction completed."; } void PropertyGraph::Dump(bool reopen) { @@ -1088,10 +1089,17 @@ void PropertyGraph::Dump(bool reopen) { THROW_RUNTIME_ERROR(ss.str()); } std::vector vertex_num(vertex_label_total_count_, 0); + std::vector vertex_capacity(vertex_label_total_count_, 0); for (size_t i = 0; i < vertex_label_total_count_; ++i) { if (!vertex_tables_[i].is_dropped()) { vertex_num[i] = vertex_tables_[i].LidNum(); - EnsureCapacity(i, calculate_new_capacity(vertex_num[i], true)); + LOG(INFO) << "Dump vertex table for label " << i + << ", vertex num: " << vertex_num[i] + << ", capacity: " << vertex_tables_[i].Capacity() + << ", new capacity: " + << calculate_new_capacity(vertex_num[i], true, true); + EnsureCapacity(i, calculate_new_capacity(vertex_num[i], true, true)); + vertex_capacity[i] = vertex_tables_[i].Capacity(); vertex_tables_[i].Dump(target_dir); } } @@ -1125,9 +1133,17 @@ void PropertyGraph::Dump(bool reopen) { schema_.generate_edge_label(src_label_i, dst_label_i, e_label_i); if (edge_tables_.count(index) > 0) { auto& edge_table = edge_tables_.at(index); - edge_table.Resize(vertex_num[src_label_i], vertex_num[dst_label_i]); - EnsureCapacity(src_label_i, dst_label_i, e_label_i, - calculate_new_capacity(edge_table.Size(), false)); + edge_table.Resize(vertex_capacity[src_label_i], + vertex_capacity[dst_label_i]); + LOG(INFO) << "Dump edge table for edge label " << edge_label + << " from " << src_label << " to " << dst_label + << ", edge num: " << edge_table.EdgeNum() + << ", capacity: " << edge_table.Capacity() + << ", new capacity: " + << calculate_new_capacity(edge_table.Size(), false, true); + EnsureCapacity( + src_label_i, dst_label_i, e_label_i, + calculate_new_capacity(edge_table.Size(), false, true)); edge_table.Dump(target_dir); } } diff --git 
a/src/storages/graph/vertex_table.cc b/src/storages/graph/vertex_table.cc index c81fee07c..53a92ee52 100644 --- a/src/storages/graph/vertex_table.cc +++ b/src/storages/graph/vertex_table.cc @@ -186,9 +186,13 @@ bool VertexTable::IsValidLid(vid_t lid, timestamp_t ts) const { void VertexTable::Reserve(size_t cap) { if (cap > indexer_.capacity()) { + LOG(INFO) << "Reserving vertex table capacity from " << indexer_.capacity() + << " to " << cap; indexer_.reserve(cap); } if (table_ && table_->size() < cap) { + LOG(INFO) << "Resizing vertex table from size " << table_->size() << " to " + << cap; table_->resize(cap); } v_ts_.Reserve(cap); diff --git a/src/storages/loader/abstract_property_graph_loader.cc b/src/storages/loader/abstract_property_graph_loader.cc index 9c1ca2c52..fac11e993 100644 --- a/src/storages/loader/abstract_property_graph_loader.cc +++ b/src/storages/loader/abstract_property_graph_loader.cc @@ -178,8 +178,8 @@ void AbstractPropertyGraphLoader::loadEdges() { for (auto& thread : threads) { thread.join(); } - LOG(INFO) << "Finished loading edges"; } + LOG(INFO) << "Finished loading edges"; } result AbstractPropertyGraphLoader::LoadFragment() { From b56eccc03c4d486ebe1750e1ebf7a70a2705141b Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Sat, 7 Mar 2026 20:21:57 +0800 Subject: [PATCH 16/19] refine ccache and try to test whether upload codecov could succeed --- .github/workflows/neug-test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/neug-test.yml b/.github/workflows/neug-test.yml index ad6353d20..5b77f83a2 100644 --- a/.github/workflows/neug-test.yml +++ b/.github/workflows/neug-test.yml @@ -409,7 +409,7 @@ jobs: mv coverage_filtered.info coverage.info - name: Upload coverage to Codecov - if: steps.scope.outputs.extension_only != 'true' && (github.ref == 'refs/heads/main' && github.repository == 'alibaba/neug' && github.event_name == 'push') + # if: steps.scope.outputs.extension_only != 'true' && (github.ref == 
'refs/heads/main' && github.repository == 'alibaba/neug' && github.event_name == 'push') uses: codecov/codecov-action@v4 with: file: ${{ github.workspace }}/tools/python_bind/build/neug_py_bind/coverage.info From 8e04f41d14584f77c94ac682555259b94f06cdc8 Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Tue, 10 Mar 2026 10:55:56 +0800 Subject: [PATCH 17/19] revert comment Committed-by: xiaolei.zl from Dev container --- .github/workflows/neug-test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/neug-test.yml b/.github/workflows/neug-test.yml index 5b77f83a2..ad6353d20 100644 --- a/.github/workflows/neug-test.yml +++ b/.github/workflows/neug-test.yml @@ -409,7 +409,7 @@ jobs: mv coverage_filtered.info coverage.info - name: Upload coverage to Codecov - # if: steps.scope.outputs.extension_only != 'true' && (github.ref == 'refs/heads/main' && github.repository == 'alibaba/neug' && github.event_name == 'push') + if: steps.scope.outputs.extension_only != 'true' && (github.ref == 'refs/heads/main' && github.repository == 'alibaba/neug' && github.event_name == 'push') uses: codecov/codecov-action@v4 with: file: ${{ github.workspace }}/tools/python_bind/build/neug_py_bind/coverage.info From 4f900a619edee8aad769688a30647dfbc18bbeef Mon Sep 17 00:00:00 2001 From: zhanglei1949 Date: Thu, 12 Mar 2026 12:06:12 +0800 Subject: [PATCH 18/19] fix cpp test --- src/storages/graph/edge_table.cc | 13 ++++++------- src/storages/graph/property_graph.cc | 4 ++-- src/storages/graph/vertex_table.cc | 5 ----- tests/transaction/test_acid.cc | 4 ++++ 4 files changed, 12 insertions(+), 14 deletions(-) diff --git a/src/storages/graph/edge_table.cc b/src/storages/graph/edge_table.cc index f8bb72dd2..c3e5d8c47 100644 --- a/src/storages/graph/edge_table.cc +++ b/src/storages/graph/edge_table.cc @@ -1018,12 +1018,6 @@ void EdgeTable::dropAndCreateNewUnbundledCSR(bool delete_property) { if (table_->col_num() >= 1) { prev_data_col = table_->get_column_by_id(0); } - 
if (prev_data_col && prev_data_col->size() > 0) { - table_->resize(prev_data_col->size()); - table_idx_.store(prev_data_col->size()); - EnsureCapacity( - calculate_new_capacity(prev_data_col->size(), false, false)); - } } else { // delete_property == true, which means the EdgeTable will become use csr of // empty type. we need to reset capacity and table_idx to 0 @@ -1032,6 +1026,11 @@ void EdgeTable::dropAndCreateNewUnbundledCSR(bool delete_property) { } auto edges = out_csr_->batch_export(prev_data_col); + if (prev_data_col && prev_data_col->size() > 0) { + table_->resize(prev_data_col->size()); + table_idx_.store(prev_data_col->size()); + EnsureCapacity(calculate_new_capacity(prev_data_col->size(), false)); + } // Set default value for other columns for (size_t col_id = 1; col_id < table_->col_num(); ++col_id) { auto col = table_->get_column_by_id(col_id); @@ -1044,7 +1043,7 @@ void EdgeTable::dropAndCreateNewUnbundledCSR(bool delete_property) { VLOG(10) << "Set default value for column " << col_id << ": " << default_value.to_string() << ", type: " << std::to_string(default_value.type()); - for (size_t row = 0; row < Size(); ++row) { + for (size_t row = 0; row < col->size(); ++row) { col->set_any(row, default_value); } } diff --git a/src/storages/graph/property_graph.cc b/src/storages/graph/property_graph.cc index c0d498aed..eefc21ead 100644 --- a/src/storages/graph/property_graph.cc +++ b/src/storages/graph/property_graph.cc @@ -897,7 +897,8 @@ void PropertyGraph::Open(const std::string& work_dir, int memory_level) { vertex_tables_[i].Open(work_dir_, memory_level); // Case 1: Open from checkpoint, the capacity should be already reserved and - // satisfied Case 2: Open from empty, Capacity should be the default minimum + // satisfied. 
+ // Case 2: Open from empty, Capacity should be the default minimum // capacity(4096) vertex_tables_[i].EnsureCapacity( calculate_new_capacity(vertex_tables_[i].Size(), true, true)); @@ -945,7 +946,6 @@ void PropertyGraph::Open(const std::string& work_dir, int memory_level) { } edge_table.EnsureCapacity( calculate_new_capacity(edge_table.Size(), false, true)); - // TODO(zhanglei): Any better way to resize for memory level 0? if (memory_level_ == 0) { edge_table.Resize(vertex_capacities[src_label_i], vertex_capacities[dst_label_i]); diff --git a/src/storages/graph/vertex_table.cc b/src/storages/graph/vertex_table.cc index 53a92ee52..9823e4563 100644 --- a/src/storages/graph/vertex_table.cc +++ b/src/storages/graph/vertex_table.cc @@ -82,14 +82,9 @@ void VertexTable::insert_vertices( void VertexTable::Dump(const std::string& target_dir) { const auto& label_name = vertex_schema_->label_name; - VLOG(1) << "Dump vertex table " << label_name << " done, size " - << indexer_.size() << ", capacity " << indexer_.capacity(); indexer_.dump(IndexerType::prefix() + "_" + vertex_map_prefix(label_name), target_dir); - // table_->resize(indexer_.size()); table_->dump(vertex_table_prefix(label_name), target_dir); - // Shrink v_ts_ to fit the indexer size - // v_ts_.Reserve(indexer_.size()); v_ts_.Dump(target_dir + "/" + vertex_tracker_file(label_name)); } diff --git a/tests/transaction/test_acid.cc b/tests/transaction/test_acid.cc index ef3ae806b..7c6e2dbe1 100644 --- a/tests/transaction/test_acid.cc +++ b/tests/transaction/test_acid.cc @@ -343,6 +343,10 @@ std::shared_ptr G0Init(NeugDB& db, auto person_label_id = schema.get_vertex_label_id("PERSON"); auto knows_label_id = schema.get_edge_label_id("KNOWS"); + db.graph().EnsureCapacity(person_label_id, 1000); + db.graph().EnsureCapacity(person_label_id, knows_label_id, person_label_id, + 1000); + auto sess = svc->AcquireSession(); auto txn = sess->GetInsertTransaction(); StorageTPInsertInterface gii(txn); From 
ebe5e12dc7d7beaf62dc883abb1eedbb3633b7a1 Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Thu, 12 Mar 2026 15:32:10 +0800 Subject: [PATCH 19/19] fix avg width calculation --- include/neug/storages/graph/vertex_table.h | 2 +- include/neug/utils/growth.h | 27 +++++++--------------- include/neug/utils/mmap_array.h | 17 ++++++++++++++ include/neug/utils/property/column.h | 2 +- src/storages/graph/edge_table.cc | 7 +++--- src/storages/graph/property_graph.cc | 19 ++++++++------- 6 files changed, 39 insertions(+), 35 deletions(-) diff --git a/include/neug/storages/graph/vertex_table.h b/include/neug/storages/graph/vertex_table.h index b76b4d786..391a69f42 100644 --- a/include/neug/storages/graph/vertex_table.h +++ b/include/neug/storages/graph/vertex_table.h @@ -295,7 +295,7 @@ class VertexTable { columns.erase(columns.begin() + ind); size_t new_size = indexer_.size() + pk_array->length(); while (new_size >= Capacity()) { - EnsureCapacity(calculate_new_capacity(new_size, true, false)); + EnsureCapacity(calculate_new_capacity(new_size, true)); } auto vids = insert_primary_keys(pk_array); diff --git a/include/neug/utils/growth.h b/include/neug/utils/growth.h index e9c888ec1..4064c6b11 100644 --- a/include/neug/utils/growth.h +++ b/include/neug/utils/growth.h @@ -19,31 +19,20 @@ #include #include namespace neug { -inline size_t calculate_new_capacity(size_t current_size, bool is_vertex_table, - bool on_dump_or_open) { +inline size_t calculate_new_capacity(size_t current_size, + bool is_vertex_table) { if (current_size < 4096) { return 4096; // Start with a reasonable default capacity. } static constexpr size_t MAX_CAPACITY = std::numeric_limits::max(); if (is_vertex_table) { - if (on_dump_or_open) { - // plus 1/4 - return current_size <= MAX_CAPACITY - current_size / 4 - ? current_size + current_size / 4 - : MAX_CAPACITY; - } else { - return current_size <= MAX_CAPACITY / 2 ? current_size * 2 : MAX_CAPACITY; - } + return current_size <= MAX_CAPACITY - current_size / 4 + ? 
current_size + current_size / 4 + : MAX_CAPACITY; } else { - if (on_dump_or_open) { - // For edge tables, we grow linearly: new capacity = current capacity + - // (current capacity + 4) / 5. - return current_size <= MAX_CAPACITY - (current_size + 4) / 5 - ? current_size + (current_size + 4) / 5 - : MAX_CAPACITY; - } else { - return current_size <= MAX_CAPACITY / 2 ? current_size * 2 : MAX_CAPACITY; - } + return current_size <= MAX_CAPACITY - (current_size + 4) / 5 + ? current_size + (current_size + 4) / 5 + : MAX_CAPACITY; } } diff --git a/include/neug/utils/mmap_array.h b/include/neug/utils/mmap_array.h index 0c041c16c..db28b456c 100644 --- a/include/neug/utils/mmap_array.h +++ b/include/neug/utils/mmap_array.h @@ -530,6 +530,23 @@ class mmap_array { data_.resize(data_size); } + size_t avg_size() const { + if (items_.size() == 0) { + return 0; + } + size_t total_length = 0; + size_t non_zero_count = 0; + for (size_t i = 0; i < items_.size(); ++i) { + if (items_.get(i).length > 0) { + ++non_zero_count; + total_length += items_.get(i).length; + } + } + return non_zero_count > 0 + ? (total_length + non_zero_count - 1) / non_zero_count + : 0; + } + void set(size_t idx, size_t offset, const std::string_view& val) { items_.set(idx, {offset, static_cast(val.size())}); assert(data_.data() + offset + val.size() <= data_.data() + data_.size()); diff --git a/include/neug/utils/property/column.h b/include/neug/utils/property/column.h index 6dad64959..0b85bb162 100644 --- a/include/neug/utils/property/column.h +++ b/include/neug/utils/property/column.h @@ -364,7 +364,7 @@ class TypedColumn : public ColumnBase { size_ = size; if (buffer_.size() != 0) { size_t avg_width = - (buffer_.data_size() + buffer_.size() - 1) / buffer_.size(); + buffer_.avg_size(); // calculate average width of existing strings buffer_.resize( size_, std::max(size_ * (avg_width > 0 ? 
avg_width : STRING_DEFAULT_MAX_LENGTH), diff --git a/src/storages/graph/edge_table.cc b/src/storages/graph/edge_table.cc index f8bb72dd2..ee2aed0e8 100644 --- a/src/storages/graph/edge_table.cc +++ b/src/storages/graph/edge_table.cc @@ -881,7 +881,7 @@ void EdgeTable::BatchAddEdges(const IndexerType& src_indexer, filterInvalidEdges(src_lid, dst_lid); size_t new_size = table_idx_.load() + src_lid.size(); if (new_size >= capacity_.load()) { - EnsureCapacity(calculate_new_capacity(new_size, false, false)); + EnsureCapacity(calculate_new_capacity(new_size, false)); } if (meta_->is_bundled()) { auto edges = extract_bundled_edge_data_from_batches(meta_, data_batches, @@ -904,7 +904,7 @@ void EdgeTable::BatchAddEdges( const std::vector>& edge_data_list) { size_t new_size = table_idx_.load() + src_lid_list.size(); if (new_size >= capacity_.load()) { - EnsureCapacity(calculate_new_capacity(new_size, false, false)); + EnsureCapacity(calculate_new_capacity(new_size, false)); } if (meta_->is_bundled()) { std::vector flat_edge_data; @@ -1021,8 +1021,7 @@ void EdgeTable::dropAndCreateNewUnbundledCSR(bool delete_property) { if (prev_data_col && prev_data_col->size() > 0) { table_->resize(prev_data_col->size()); table_idx_.store(prev_data_col->size()); - EnsureCapacity( - calculate_new_capacity(prev_data_col->size(), false, false)); + EnsureCapacity(calculate_new_capacity(prev_data_col->size(), false)); } } else { // delete_property == true, which means the EdgeTable will become use csr of diff --git a/src/storages/graph/property_graph.cc b/src/storages/graph/property_graph.cc index c0d498aed..28667b2e7 100644 --- a/src/storages/graph/property_graph.cc +++ b/src/storages/graph/property_graph.cc @@ -121,7 +121,7 @@ Status PropertyGraph::EnsureCapacity(label_t v_label, size_t capacity) { if (capacity == 0) { auto old_size = vertex_tables_[v_label].Size(); if (old_size >= old_cap) { - capacity = neug::calculate_new_capacity(old_size, true, false); + capacity = 
neug::calculate_new_capacity(old_size, true); } } if (capacity <= old_cap) { @@ -172,7 +172,7 @@ Status PropertyGraph::EnsureCapacity(label_t src_label, label_t dst_label, if (capacity == 0) { size_t old_size = edge_tables_.at(index).Size(); if (old_size >= old_cap) { - capacity = neug::calculate_new_capacity(old_size, false, false); + capacity = neug::calculate_new_capacity(old_size, false); } } if (capacity <= old_cap) { @@ -900,7 +900,7 @@ void PropertyGraph::Open(const std::string& work_dir, int memory_level) { // satisfied Case 2: Open from empty, Capacity should be the default minimum // capacity(4096) vertex_tables_[i].EnsureCapacity( - calculate_new_capacity(vertex_tables_[i].Size(), true, true)); + calculate_new_capacity(vertex_tables_[i].Size(), true)); vertex_capacities[i] = vertex_tables_[i].Capacity(); } @@ -944,7 +944,7 @@ void PropertyGraph::Open(const std::string& work_dir, int memory_level) { vertex_capacities[dst_label_i]); } edge_table.EnsureCapacity( - calculate_new_capacity(edge_table.Size(), false, true)); + calculate_new_capacity(edge_table.Size(), false)); // TODO(zhanglei): Any better way to resize for memory level 0? 
if (memory_level_ == 0) { edge_table.Resize(vertex_capacities[src_label_i], @@ -1097,8 +1097,8 @@ void PropertyGraph::Dump(bool reopen) { << ", vertex num: " << vertex_num[i] << ", capacity: " << vertex_tables_[i].Capacity() << ", new capacity: " - << calculate_new_capacity(vertex_num[i], true, true); - EnsureCapacity(i, calculate_new_capacity(vertex_num[i], true, true)); + << calculate_new_capacity(vertex_num[i], true); + EnsureCapacity(i, calculate_new_capacity(vertex_num[i], true)); vertex_capacity[i] = vertex_tables_[i].Capacity(); vertex_tables_[i].Dump(target_dir); } @@ -1140,10 +1140,9 @@ void PropertyGraph::Dump(bool reopen) { << ", edge num: " << edge_table.EdgeNum() << ", capacity: " << edge_table.Capacity() << ", new capacity: " - << calculate_new_capacity(edge_table.Size(), false, true); - EnsureCapacity( - src_label_i, dst_label_i, e_label_i, - calculate_new_capacity(edge_table.Size(), false, true)); + << calculate_new_capacity(edge_table.Size(), false); + EnsureCapacity(src_label_i, dst_label_i, e_label_i, + calculate_new_capacity(edge_table.Size(), false)); edge_table.Dump(target_dir); } }