diff --git a/include/neug/storages/graph/edge_table.h b/include/neug/storages/graph/edge_table.h index a5a68cab..3464d7a2 100644 --- a/include/neug/storages/graph/edge_table.h +++ b/include/neug/storages/graph/edge_table.h @@ -129,7 +129,7 @@ class EdgeTable { void Compact(bool compact_csr, bool sort_on_compaction, timestamp_t ts); private: - void dropAndCreateNewBundledCSR(); + void dropAndCreateNewBundledCSR(std::shared_ptr prev_data_col); void dropAndCreateNewUnbundledCSR(bool delete_property); std::string get_next_csr_path_suffix(); diff --git a/src/storages/graph/edge_table.cc b/src/storages/graph/edge_table.cc index be1b41b1..f8c993c8 100644 --- a/src/storages/graph/edge_table.cc +++ b/src/storages/graph/edge_table.cc @@ -151,12 +151,52 @@ void batch_put_edges_with_default_edata(const std::vector& src_lid, case DataTypeId::kEmpty: batch_put_edges_with_default_edata_impl(src_lid, dst_lid, EmptyType(), out_csr); + break; default: THROW_NOT_SUPPORTED_EXCEPTION("not support edge data type: " + std::to_string(property_type)); } } +void batch_put_edges_with_edata(const std::vector& src_lid, + const std::vector& dst_lid, + DataTypeId property_type, + const std::vector& edge_data, + CsrBase* out_csr) { + switch (property_type) { +#define TYPE_DISPATCHER(enum_val, type) \ + case DataTypeId::enum_val: { \ + std::vector typed_data; \ + typed_data.reserve(edge_data.size()); \ + for (const auto& prop : edge_data) { \ + typed_data.emplace_back(PropUtils::to_typed(prop)); \ + } \ + dynamic_cast*>(out_csr)->batch_put_edges( \ + src_lid, dst_lid, typed_data); \ + break; \ + } + TYPE_DISPATCHER(kBoolean, bool); + TYPE_DISPATCHER(kInt32, int32_t); + TYPE_DISPATCHER(kUInt32, uint32_t); + TYPE_DISPATCHER(kInt64, int64_t); + TYPE_DISPATCHER(kUInt64, uint64_t); + TYPE_DISPATCHER(kFloat, float); + TYPE_DISPATCHER(kDouble, double); + TYPE_DISPATCHER(kDate, Date); + TYPE_DISPATCHER(kTimestampMs, DateTime); + TYPE_DISPATCHER(kInterval, Interval); +#undef TYPE_DISPATCHER + case DataTypeId::kEmpty: { + dynamic_cast*>(out_csr)->batch_put_edges( + src_lid, dst_lid, {}); + break; + } + default: + LOG(FATAL) << "Unsupported edge property type " + << static_cast(property_type); + } +} + template std::unique_ptr create_csr_impl(bool is_mutable, EdgeStrategy strategy) { @@ -729,7 +769,7 @@ void EdgeTable::AddProperties(const std::vector& prop_names, // NOTE: Rather than check meta_->is_bundled(),we check whether the table // is empty. if (meta_->properties.size() == 1) { - dropAndCreateNewBundledCSR(); + dropAndCreateNewBundledCSR(nullptr); } else { dropAndCreateNewUnbundledCSR(false); } @@ -770,6 +810,12 @@ void EdgeTable::DeleteProperties(const std::vector& col_names) { table_->delete_column(col); VLOG(1) << "delete column " << col; } + if (table_->col_num() == 0) { + dropAndCreateNewUnbundledCSR(true); + } else if (table_->col_num() == 1) { + auto remaining_col = table_->get_column_by_id(0); + dropAndCreateNewBundledCSR(remaining_col); + } } } @@ -782,9 +828,10 @@ int32_t EdgeTable::AddEdge(vid_t src_lid, vid_t dst_lid, (edge_data.size() == 0 && (meta_->properties.empty() || meta_->properties[0] == DataTypeId::kEmpty))); - in_csr_->put_generic_edge(dst_lid, src_lid, edge_data[0], ts, alloc); + Property bundled_data = edge_data.empty() ? Property() : edge_data[0]; + in_csr_->put_generic_edge(dst_lid, src_lid, bundled_data, ts, alloc); oe_offset = - out_csr_->put_generic_edge(src_lid, dst_lid, edge_data[0], ts, alloc); + out_csr_->put_generic_edge(src_lid, dst_lid, bundled_data, ts, alloc); } else { if (meta_->properties.size() != edge_data.size()) { THROW_INVALID_ARGUMENT_EXCEPTION( @@ -885,7 +932,9 @@ void EdgeTable::Compact(bool compact_csr, bool sort_on_compaction, in_csr_->reset_timestamp(); } -void EdgeTable::dropAndCreateNewBundledCSR() { +void EdgeTable::dropAndCreateNewBundledCSR( + std::shared_ptr remaining_col) { + CHECK(meta_->properties.size() == 1); auto suffix = get_next_csr_path_suffix(); std::string next_oe_csr_path = tmp_dir(work_dir_) + "/" + @@ -898,9 +947,7 @@ void EdgeTable::dropAndCreateNewBundledCSR() { meta_->edge_label_name) + suffix; - auto edges = out_csr_->batch_export(nullptr); std::unique_ptr new_out_csr, new_in_csr; - assert(meta_->properties.size() == 1); new_out_csr = create_csr(meta_->oe_mutable, meta_->oe_strategy, meta_->properties[0].id()); new_in_csr = create_csr(meta_->ie_mutable, meta_->ie_strategy, @@ -911,13 +958,36 @@ void EdgeTable::dropAndCreateNewBundledCSR() { new_out_csr->resize(out_csr_->size()); new_in_csr->resize(in_csr_->size()); - batch_put_edges_with_default_edata( - std::get<0>(edges), std::get<1>(edges), meta_->properties[0].id(), - meta_->default_property_values[0], new_out_csr.get()); - batch_put_edges_with_default_edata( - std::get<1>(edges), std::get<0>(edges), meta_->properties[0].id(), - meta_->default_property_values[0], new_in_csr.get()); + if (remaining_col == nullptr) { + auto edges = out_csr_->batch_export(nullptr); + batch_put_edges_with_default_edata( + std::get<0>(edges), std::get<1>(edges), meta_->properties[0].id(), + meta_->default_property_values[0], new_out_csr.get()); + batch_put_edges_with_default_edata( + std::get<1>(edges), std::get<0>(edges), meta_->properties[0].id(), + meta_->default_property_values[0], new_in_csr.get()); + } else { + auto row_id_col = std::make_shared(0, StorageStrategy::kMem); + auto edges = out_csr_->batch_export(row_id_col); + std::vector remaining_data; + remaining_data.reserve(row_id_col->size()); + for (size_t i = 0; i < row_id_col->size(); ++i) { + auto row_id = row_id_col->get_view(i); + CHECK_LT(row_id, remaining_col->size()); + remaining_data.emplace_back(remaining_col->get_prop(row_id)); + } + batch_put_edges_with_edata(std::get<0>(edges), std::get<1>(edges), + meta_->properties[0].id(), remaining_data, + new_out_csr.get()); + batch_put_edges_with_edata(std::get<1>(edges), std::get<0>(edges), + meta_->properties[0].id(), remaining_data, + new_in_csr.get()); + } + table_->drop(); + table_ = std::make_unique(); + table_idx_.store(0); + capacity_.store(0); out_csr_->close(); in_csr_->close(); out_csr_ = std::move(new_out_csr); @@ -955,9 +1025,20 @@ void EdgeTable::dropAndCreateNewUnbundledCSR(bool delete_property) { if (table_->col_num() >= 1) { prev_data_col = table_->get_column_by_id(0); } + } else { + // delete_property == true, which means the EdgeTable will become use csr of + // empty type. we need to reset capacity and table_idx to 0 + table_idx_.store(0); + capacity_.store(0); } auto edges = out_csr_->batch_export(prev_data_col); + if (!delete_property) { + size_t edge_num = std::get<0>(edges).size(); + table_idx_.store(edge_num); + capacity_.store(calculate_new_capacity(edge_num, false)); + table_->resize(capacity_.load()); + } // Set default value for other columns for (size_t col_id = 1; col_id < table_->col_num(); ++col_id) { auto col = table_->get_column_by_id(col_id); @@ -970,7 +1051,7 @@ void EdgeTable::dropAndCreateNewUnbundledCSR(bool delete_property) { VLOG(10) << "Set default value for column " << col_id << ": " << default_value.to_string() << ", type: " << std::to_string(default_value.type()); - for (size_t row = 0; row < col->size(); ++row) { + for (size_t row = 0; row < std::get<0>(edges).size(); ++row) { col->set_any(row, default_value); } } diff --git a/tests/storage/test_edge_table.cc b/tests/storage/test_edge_table.cc index 42b385fb..da6a2047 100644 --- a/tests/storage/test_edge_table.cc +++ b/tests/storage/test_edge_table.cc @@ -21,11 +21,31 @@ #include "neug/storages/graph/edge_table.h" #include "neug/storages/loader/loader_utils.h" #include "neug/utils/allocators.h" +#include "neug/utils/growth.h" #include "unittest/utils.h" namespace neug { namespace test { +class LocalGeneratedRecordBatchSupplier : public neug::IRecordBatchSupplier { + public: + explicit LocalGeneratedRecordBatchSupplier( + std::vector>&& batches) + : batches_(std::move(batches)) {} + + std::shared_ptr GetNextBatch() override { + if (batches_.empty()) { + return nullptr; + } + auto batch = batches_.back(); + batches_.pop_back(); + return batch; + } + + private: + std::vector> batches_; +}; + class EdgeTableTest : public ::testing::Test { protected: void SetUp() override { @@ -51,6 +71,10 @@ class EdgeTableTest : public ::testing::Test { "comment", {}, {}, {std::make_tuple(neug::DataTypeId::kInt64, "id", 0)}, {neug::StorageStrategy::kMem}, static_cast(1) << 32, "comment vertex label"); + schema_.AddEdgeLabel("person", "comment", "create0", {}, {}, {}, + neug::EdgeStrategy::kMultiple, + neug::EdgeStrategy::kMultiple, true, true, false, + "person creates comment edge without properties"); schema_.AddEdgeLabel( "person", "comment", "create1", {neug::DataTypeId::kInt32}, {"data"}, {neug::StorageStrategy::kMem}, neug::EdgeStrategy::kMultiple, @@ -70,6 +94,7 @@ class EdgeTableTest : public ::testing::Test { true, false, "person creates comment edge with two properties"); src_label_ = schema_.get_vertex_label_id("person"); dst_label_ = schema_.get_vertex_label_id("comment"); + edge_label_empty_ = schema_.get_edge_label_id("create0"); edge_label_int_ = schema_.get_edge_label_id("create1"); edge_label_str_ = schema_.get_edge_label_id("create2"); edge_label_str_int_ = schema_.get_edge_label_id("create3"); @@ -123,7 +148,7 @@ class EdgeTableTest : public ::testing::Test { void BatchInsert(std::vector>&& batches) { auto supplier = - std::make_shared(std::move(batches)); + std::make_shared(std::move(batches)); edge_table->BatchAddEdges(src_indexer, dst_indexer, supplier); } @@ -207,8 +232,8 @@ class EdgeTableTest : public ::testing::Test { neug::LFIndexer src_indexer; neug::LFIndexer dst_indexer; neug::Schema schema_; - neug::label_t src_label_, dst_label_, edge_label_int_, edge_label_str_, - edge_label_str_int_; + neug::label_t src_label_, dst_label_, edge_label_empty_, edge_label_int_, + edge_label_str_, edge_label_str_int_; std::string allocator_dir_; private: @@ -1082,6 +1107,232 @@ TEST_F(EdgeTableTest, TestUpdateEdgeData) { } } +TEST_F(EdgeTableTest, TestAddPropertiesTransitionFromEmptyAndBundled) { + this->InitIndexers(4, 4); + this->ConstructEdgeTable(src_label_, dst_label_, edge_label_empty_); + this->OpenEdgeTableInMemory(4, 4); + + std::vector> endpoints = {{0, 1}, {1, 2}, {2, 3}}; + std::vector src_list, dst_list; + for (const auto& [src_oid, dst_oid] : endpoints) { + src_list.emplace_back(src_oid); + dst_list.emplace_back(dst_oid); + } + auto src_arrs = convert_to_arrow_arrays(src_list, src_list.size()); + auto dst_arrs = convert_to_arrow_arrays(dst_list, dst_list.size()); + auto batches = + convert_to_record_batches({"src", "dst"}, {src_arrs, dst_arrs}); + this->BatchInsert(std::move(batches)); + + EXPECT_EQ(this->edge_table->Size(), endpoints.size()); + EXPECT_EQ(this->edge_table->Capacity(), this->src_indexer.size()); + + schema_.AddEdgeProperties("person", "comment", "create0", {"weight"}, + {neug::DataTypeId::kInt32}, + {neug::Property::from_int32(7)}); + this->edge_table->SetEdgeSchema( + schema_.get_edge_schema(src_label_, dst_label_, edge_label_empty_)); + this->edge_table->AddProperties({"weight"}, {neug::DataTypeId::kInt32}, + {neug::Property::from_int32(7)}); + + std::vector srcs, dsts; + this->OutputOutgoingEndpoints(srcs, dsts, neug::MAX_TIMESTAMP); + ASSERT_EQ(srcs.size(), endpoints.size()); + std::vector weights; + this->OutputOutgoingEdgeData(weights, neug::MAX_TIMESTAMP, 0); + ASSERT_EQ(weights.size(), endpoints.size()); + for (auto weight : weights) { + EXPECT_EQ(weight, 7); + } + EXPECT_EQ(this->edge_table->Size(), endpoints.size()); + EXPECT_EQ(this->edge_table->Capacity(), this->src_indexer.size()); + + schema_.AddEdgeProperties("person", "comment", "create0", {"tag"}, + {neug::DataTypeId::kVarchar}, + {neug::Property::from_string_view("new-tag")}); + this->edge_table->SetEdgeSchema( + schema_.get_edge_schema(src_label_, dst_label_, edge_label_empty_)); + this->edge_table->AddProperties( + {"tag"}, {neug::DataTypeId::kVarchar}, + {neug::Property::from_string_view("new-tag")}); + + std::vector weights_after; + std::vector tags; + this->OutputOutgoingEdgeData(weights_after, neug::MAX_TIMESTAMP, 0); + this->OutputOutgoingEdgeData(tags, neug::MAX_TIMESTAMP, 1); + ASSERT_EQ(weights_after.size(), endpoints.size()); + ASSERT_EQ(tags.size(), endpoints.size()); + for (size_t i = 0; i < endpoints.size(); ++i) { + EXPECT_EQ(weights_after[i], 7); + EXPECT_EQ(tags[i], "new-tag"); + } + EXPECT_EQ(this->edge_table->Size(), endpoints.size()); + EXPECT_EQ(this->edge_table->Capacity(), + neug::calculate_new_capacity(endpoints.size(), false)); +} + +TEST_F(EdgeTableTest, TestAddStringPropertyTransitionFromEmptyToBundled) { + this->InitIndexers(4, 4); + this->ConstructEdgeTable(src_label_, dst_label_, edge_label_empty_); + this->OpenEdgeTableInMemory(4, 4); + + std::vector src_list = {0, 1, 2}; + std::vector dst_list = {1, 2, 3}; + auto src_arrs = convert_to_arrow_arrays(src_list, src_list.size()); + auto dst_arrs = convert_to_arrow_arrays(dst_list, dst_list.size()); + auto batches = + convert_to_record_batches({"src", "dst"}, {src_arrs, dst_arrs}); + this->BatchInsert(std::move(batches)); + + schema_.AddEdgeProperties("person", "comment", "create0", {"tag"}, + {neug::DataTypeId::kVarchar}, + {neug::Property::from_string_view("seed")}); + this->edge_table->SetEdgeSchema( + schema_.get_edge_schema(src_label_, dst_label_, edge_label_empty_)); + this->edge_table->AddProperties({"tag"}, {neug::DataTypeId::kVarchar}, + {neug::Property::from_string_view("seed")}); + + std::vector tags; + this->OutputOutgoingEdgeData(tags, neug::MAX_TIMESTAMP, 0); + ASSERT_EQ(tags.size(), src_list.size()); + for (auto tag : tags) { + EXPECT_EQ(tag, "seed"); + } + EXPECT_EQ(this->edge_table->Size(), src_list.size()); + EXPECT_EQ(this->edge_table->Capacity(), this->src_indexer.size()); +} + +TEST_F(EdgeTableTest, TestDeletePropertiesTransitionFromUnbundledToBundled) { + this->InitIndexers(4, 4); + this->ConstructEdgeTable(src_label_, dst_label_, edge_label_str_int_); + this->OpenEdgeTableInMemory(4, 4); + this->edge_table->Resize(this->src_indexer.size(), this->dst_indexer.size()); + + std::vector> input = { + {0, 1, "a", 11}, {1, 2, "b", 22}, {2, 3, "c", 33}}; + neug::Allocator allocator(neug::MemoryStrategy::kMemoryOnly, allocator_dir_); + for (const auto& [src_oid, dst_oid, data0, data1] : input) { + this->edge_table->AddEdge( + this->GetSrcLid(neug::Property::from_int64(src_oid)), + this->GetDstLid(neug::Property::from_int64(dst_oid)), + {neug::Property::from_string_view(data0), + neug::Property::from_int32(data1)}, + 0, allocator); + } + + EXPECT_EQ(this->edge_table->Size(), input.size()); + EXPECT_EQ(this->edge_table->Capacity(), 4096); + + this->edge_table->DeleteProperties({"data1"}); + schema_.DeleteEdgeProperties("person", "comment", "create3", {"data1"}); + this->edge_table->SetEdgeSchema( + schema_.get_edge_schema(src_label_, dst_label_, edge_label_str_int_)); + + std::vector srcs, dsts; + this->OutputOutgoingEndpoints(srcs, dsts, neug::MAX_TIMESTAMP); + std::vector remaining_prop; + this->OutputOutgoingEdgeData(remaining_prop, + neug::MAX_TIMESTAMP, 0); + ASSERT_EQ(srcs.size(), input.size()); + ASSERT_EQ(remaining_prop.size(), input.size()); + std::vector> output; + for (size_t i = 0; i < srcs.size(); ++i) { + output.emplace_back(srcs[i], dsts[i], std::string(remaining_prop[i])); + } + std::sort(output.begin(), output.end()); + std::vector> expected; + for (const auto& [src_oid, dst_oid, data0, data1] : input) { + expected.emplace_back(src_oid, dst_oid, data0); + } + std::sort(expected.begin(), expected.end()); + EXPECT_EQ(output, expected); + EXPECT_EQ(this->edge_table->Size(), input.size()); + EXPECT_EQ(this->edge_table->Capacity(), this->src_indexer.size()); + + this->edge_table->DeleteProperties({"data0"}); + schema_.DeleteEdgeProperties("person", "comment", "create3", {"data0"}); + this->edge_table->SetEdgeSchema( + schema_.get_edge_schema(src_label_, dst_label_, edge_label_str_int_)); + + srcs.clear(); + dsts.clear(); + this->OutputOutgoingEndpoints(srcs, dsts, neug::MAX_TIMESTAMP); + ASSERT_EQ(srcs.size(), input.size()); + ASSERT_EQ(dsts.size(), input.size()); + EXPECT_EQ(this->edge_table->Size(), input.size()); + EXPECT_EQ(this->edge_table->Capacity(), this->src_indexer.size()); + + this->edge_table->AddEdge(this->GetSrcLid(neug::Property::from_int64(3)), + this->GetDstLid(neug::Property::from_int64(0)), {}, + 0, allocator); + srcs.clear(); + dsts.clear(); + this->OutputOutgoingEndpoints(srcs, dsts, neug::MAX_TIMESTAMP); + ASSERT_EQ(srcs.size(), input.size() + 1); + ASSERT_EQ(dsts.size(), input.size() + 1); +} + +TEST_F(EdgeTableTest, TestAddAndDeletePropertiesStayUnbundled) { + this->InitIndexers(4, 4); + this->ConstructEdgeTable(src_label_, dst_label_, edge_label_str_int_); + this->OpenEdgeTableInMemory(4, 4); + this->edge_table->Resize(this->src_indexer.size(), this->dst_indexer.size()); + + std::vector> input = { + {0, 1, "a", 11}, {1, 2, "b", 22}, {2, 3, "c", 33}}; + neug::Allocator allocator(neug::MemoryStrategy::kMemoryOnly, allocator_dir_); + for (const auto& [src_oid, dst_oid, data0, data1] : input) { + this->edge_table->AddEdge( + this->GetSrcLid(neug::Property::from_int64(src_oid)), + this->GetDstLid(neug::Property::from_int64(dst_oid)), + {neug::Property::from_string_view(data0), + neug::Property::from_int32(data1)}, + 0, allocator); + } + + schema_.AddEdgeProperties("person", "comment", "create3", {"score"}, + {neug::DataTypeId::kInt32}, + {neug::Property::from_int32(99)}); + this->edge_table->SetEdgeSchema( + schema_.get_edge_schema(src_label_, dst_label_, edge_label_str_int_)); + this->edge_table->AddProperties({"score"}, {neug::DataTypeId::kInt32}, + {neug::Property::from_int32(99)}); + + std::vector score; + this->OutputOutgoingEdgeData(score, neug::MAX_TIMESTAMP, 2); + ASSERT_EQ(score.size(), input.size()); + for (auto value : score) { + EXPECT_EQ(value, 99); + } + EXPECT_EQ(this->edge_table->Size(), input.size()); + EXPECT_EQ(this->edge_table->Capacity(), 4096); + + this->edge_table->DeleteProperties({"score"}); + schema_.DeleteEdgeProperties("person", "comment", "create3", {"score"}); + this->edge_table->SetEdgeSchema( + schema_.get_edge_schema(src_label_, dst_label_, edge_label_str_int_)); + + std::vector data0_after; + std::vector data1_after; + this->OutputOutgoingEdgeData(data0_after, + neug::MAX_TIMESTAMP, 0); + this->OutputOutgoingEdgeData(data1_after, neug::MAX_TIMESTAMP, 1); + ASSERT_EQ(data0_after.size(), input.size()); + ASSERT_EQ(data1_after.size(), input.size()); + std::vector> output; + std::vector srcs, dsts; + this->OutputOutgoingEndpoints(srcs, dsts, neug::MAX_TIMESTAMP); + for (size_t i = 0; i < srcs.size(); ++i) { + output.emplace_back(srcs[i], dsts[i], std::string(data0_after[i]), + data1_after[i]); + } + std::sort(output.begin(), output.end()); + std::sort(input.begin(), input.end()); + EXPECT_EQ(output, input); + EXPECT_EQ(this->edge_table->Size(), input.size()); + EXPECT_EQ(this->edge_table->Capacity(), 4096); +} + template struct TypePair { using EdType = EDATA_T; diff --git a/tests/storage/test_vertex_table.cc b/tests/storage/test_vertex_table.cc index 5bee40fd..d3a63d7d 100644 --- a/tests/storage/test_vertex_table.cc +++ b/tests/storage/test_vertex_table.cc @@ -811,4 +811,4 @@ TEST_F(VertexTableTest, VertexSetForeachVertex) { size_t count = 0; vset.foreach_vertex([&](neug::vid_t vid) { count++; }); EXPECT_EQ(count, 5); // Only odd lids are valid at ts=20 -} \ No newline at end of file +}