Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions include/neug/storages/csr/csr_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ enum class CsrType {

class CsrBase {
public:
static constexpr size_t INFINITE_CAPACITY =
std::numeric_limits<size_t>::max();
CsrBase() = default;
virtual ~CsrBase() = default;

Expand Down Expand Up @@ -66,6 +68,8 @@ class CsrBase {

virtual void resize(vid_t vnum) = 0;

virtual size_t capacity() const = 0;

virtual void close() = 0;

virtual void batch_sort_by_edge_data(timestamp_t ts) {
Expand Down
4 changes: 4 additions & 0 deletions include/neug/storages/csr/immutable_csr.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ class ImmutableCsr : public TypedCsrBase<EDATA_T> {

void resize(vid_t vnum) override;

size_t capacity() const override;

void close() override;

void batch_sort_by_edge_data(timestamp_t ts) override;
Expand Down Expand Up @@ -176,6 +178,8 @@ class SingleImmutableCsr : public TypedCsrBase<EDATA_T> {

void resize(vid_t vnum) override;

size_t capacity() const override;

void close() override;

void batch_sort_by_edge_data(timestamp_t ts) override;
Expand Down
6 changes: 6 additions & 0 deletions include/neug/storages/csr/mutable_csr.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ class MutableCsr : public TypedCsrBase<EDATA_T> {

void resize(vid_t vnum) override;

size_t capacity() const override;

void close() override;

void batch_sort_by_edge_data(timestamp_t ts) override;
Expand Down Expand Up @@ -252,6 +254,8 @@ class SingleMutableCsr : public TypedCsrBase<EDATA_T> {

void resize(vid_t vnum) override;

size_t capacity() const override;

void close() override;

void batch_sort_by_edge_data(timestamp_t ts) override;
Expand Down Expand Up @@ -336,6 +340,8 @@ class EmptyCsr : public TypedCsrBase<EDATA_T> {

void resize(vid_t vnum) override {}

size_t capacity() const override { return 0; }

void close() override {}

void batch_sort_by_edge_data(timestamp_t ts) override {}
Expand Down
10 changes: 10 additions & 0 deletions include/neug/storages/file_names.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,4 +246,14 @@ inline std::string wal_ingest_allocator_prefix(const std::string& work_dir,
std::to_string(thread_id) + "_";
}

inline std::string statistics_file_prefix(const std::string& v_label) {
return "statistics_" + v_label;
}

inline std::string statistics_file_prefix(const std::string& src_label,
const std::string& dst_label,
const std::string& edge_label) {
return "statistics_" + src_label + "_" + edge_label + "_" + dst_label;
}

} // namespace neug
31 changes: 31 additions & 0 deletions include/neug/storages/graph/edge_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ class EdgeTable {

void Resize(vid_t src_vertex_num, vid_t dst_vertex_num);

void EnsureCapacity(size_t capacity);

size_t EdgeNum() const;

size_t PropertyNum() const;
Expand Down Expand Up @@ -128,6 +130,34 @@ class EdgeTable {

void Compact(bool compact_csr, bool sort_on_compaction, timestamp_t ts);

inline size_t Size() const {
if (meta_->is_bundled()) {
if (out_csr_) {
return out_csr_->edge_num();
} else if (in_csr_) {
return in_csr_->edge_num();
} else {
THROW_RUNTIME_ERROR("both csr are null");
}
}
// TODO(zhanglei): the size may be inaccurate if some edges are deleted but
// not compacted yet.
return table_idx_.load();
}

inline size_t Capacity() const {
if (meta_->is_bundled()) {
if (out_csr_) {
return out_csr_->capacity();
} else if (in_csr_) {
return in_csr_->capacity();
} else {
THROW_RUNTIME_ERROR("both csr are null");
}
}
return capacity_.load();
}

private:
void dropAndCreateNewBundledCSR();
void dropAndCreateNewUnbundledCSR(bool delete_property);
Expand All @@ -141,6 +171,7 @@ class EdgeTable {
std::unique_ptr<CsrBase> in_csr_;
std::unique_ptr<Table> table_;
std::atomic<uint64_t> table_idx_{0};
std::atomic<uint64_t> capacity_{0};

friend class PropertyGraph;
};
Expand Down
9 changes: 9 additions & 0 deletions include/neug/storages/graph/property_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ class PropertyGraph {

void Compact(bool compact_csr, float reserve_ratio, timestamp_t ts);

/**
* @brief Dump the current graph state to persistent storage.
* @param reopen If true, reopens the graph after dumping (default: true)
*/
void Dump(bool reopen = true);

/**
Expand Down Expand Up @@ -298,6 +302,11 @@ class PropertyGraph {

Status Reserve(label_t v_label, vid_t vertex_reserve_size);

Status EnsureCapacity(label_t v_label, size_t capacity = 0);

Status EnsureCapacity(label_t src_label, label_t dst_label,
label_t edge_label, size_t capacity = 0);

Status BatchAddVertices(label_t v_label_id,
std::shared_ptr<IRecordBatchSupplier> supplier);

Expand Down
15 changes: 9 additions & 6 deletions include/neug/storages/graph/vertex_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "neug/storages/graph/vertex_timestamp.h"
#include "neug/storages/loader/loader_utils.h"
#include "neug/utils/arrow_utils.h"
#include "neug/utils/growth.h"
#include "neug/utils/indexers.h"
#include "neug/utils/property/table.h"

Expand Down Expand Up @@ -111,8 +112,7 @@ class VertexTable {
std::swap(work_dir_, other.work_dir_);
}

void Open(const std::string& work_dir, int memory_level,
bool build_empty_graph = false);
void Open(const std::string& work_dir, int memory_level);

void Dump(const std::string& target_dir);

Expand All @@ -122,6 +122,8 @@ class VertexTable {

void Reserve(size_t cap);

size_t EnsureCapacity(size_t capacity);

bool is_dropped() const { return table_ == nullptr; }

bool get_index(const Property& oid, vid_t& lid,
Expand All @@ -144,6 +146,8 @@ class VertexTable {
// Capacity of the vertex table
inline size_t Capacity() const { return indexer_.capacity(); }

inline size_t Size() const { return indexer_.size(); }

bool IsValidLid(vid_t lid, timestamp_t ts = MAX_TIMESTAMP) const;

IndexerType& get_indexer() { return indexer_; }
Expand Down Expand Up @@ -289,11 +293,10 @@ class VertexTable {
auto ind = std::get<2>(vertex_schema_->primary_keys[0]);
auto pk_array = columns[ind];
columns.erase(columns.begin() + ind);
auto cur_size = indexer_.capacity();
while (cur_size < indexer_.size() + pk_array->length()) {
cur_size = std::max(16, 2 * static_cast<int>(cur_size));
size_t new_size = indexer_.size() + pk_array->length();
while (new_size >= Capacity()) {
EnsureCapacity(calculate_new_capacity(new_size, true));
}
Reserve(cur_size);

auto vids = insert_primary_keys<PK_T>(pk_array);

Expand Down
12 changes: 12 additions & 0 deletions include/neug/utils/file_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,16 @@ void copy_directory(const std::string& src, const std::string& dst,

void remove_directory(const std::string& dir_path);

void write_file(const std::string& filename, const void* buffer, size_t size,
size_t num);

void read_file(const std::string& filename, void* buffer, size_t size,
size_t num);

void write_statistic_file(const std::string& file_path, size_t capacity,
size_t size);

void read_statistic_file(const std::string& file_path, size_t& capacity,
size_t& size);

} // namespace neug
39 changes: 39 additions & 0 deletions include/neug/utils/growth.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once
#include <cstddef>
#include <cstdint>
#include <limits>
namespace neug {
inline size_t calculate_new_capacity(size_t current_size,
bool is_vertex_table) {
if (current_size < 4096) {
return 4096; // Start with a reasonable default capacity.
}
static constexpr size_t MAX_CAPACITY = std::numeric_limits<size_t>::max();
if (is_vertex_table) {
return current_size <= MAX_CAPACITY - current_size / 4
? current_size + current_size / 4
: MAX_CAPACITY;
} else {
return current_size <= MAX_CAPACITY - (current_size + 4) / 5
? current_size + (current_size + 4) / 5
: MAX_CAPACITY;
}
}

} // namespace neug
9 changes: 0 additions & 9 deletions include/neug/utils/id_indexer.h
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,6 @@ class LFIndexer {
keys_->open(name + ".keys", "", data_dir);
indices_.open(data_dir + "/" + name + ".indices", false);
size_t num_elements = num_elements_.load();
keys_->resize(num_elements + (num_elements >> 2));

indices_size_ = indices_.size();
}
Expand All @@ -462,9 +461,6 @@ class LFIndexer {
LOG(INFO) << "Open indices file in "
<< tmp_dir(work_dir) + "/" + name + ".indices";
indices_.open(tmp_dir(work_dir) + "/" + name + ".indices", true);
size_t num_elements = num_elements_.load();

keys_->resize(num_elements + (num_elements >> 2));

indices_size_ = indices_.size();
}
Expand All @@ -478,8 +474,6 @@ class LFIndexer {
keys_->open_in_memory(name + ".keys");
indices_.open(name + ".indices", false);
indices_size_ = indices_.size();
size_t num_elements = num_elements_.load();
keys_->resize(num_elements + (num_elements >> 2));
}

void open_with_hugepages(const std::string& name, bool hugepage_table) {
Expand All @@ -495,12 +489,9 @@ class LFIndexer {
indices_.open(name + ".indices", false);
}
indices_size_ = indices_.size();
size_t num_elements = num_elements_.load();
keys_->resize(num_elements + (num_elements >> 2));
}

void dump(const std::string& name, const std::string& snapshot_dir) {
keys_->resize(num_elements_.load());
keys_->dump(snapshot_dir + "/" + name + ".keys");
indices_.dump(snapshot_dir + "/" + name + ".indices");
dump_meta(snapshot_dir + "/" + name + ".meta");
Expand Down
44 changes: 41 additions & 3 deletions include/neug/utils/mmap_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <assert.h>
#include <sys/mman.h>
#include <unistd.h>

#include <algorithm>
#include <atomic>
Expand Down Expand Up @@ -523,10 +524,29 @@ class mmap_array<std::string_view> {
}

void resize(size_t size, size_t data_size) {
LOG(INFO) << "Resizing mmap_array " << this << " to size: " << size
<< ", data_size: " << data_size;
items_.resize(size);
data_.resize(data_size);
}

size_t avg_size() const {
if (items_.size() == 0) {
return 0;
}
size_t total_length = 0;
size_t non_zero_count = 0;
for (size_t i = 0; i < items_.size(); ++i) {
if (items_.get(i).length > 0) {
++non_zero_count;
total_length += items_.get(i).length;
}
}
return non_zero_count > 0
? (total_length + non_zero_count - 1) / non_zero_count
: 0;
}

void set(size_t idx, size_t offset, const std::string_view& val) {
items_.set(idx, {offset, static_cast<uint32_t>(val.size())});
assert(data_.data() + offset + val.size() <= data_.data() + data_.size());
Expand Down Expand Up @@ -564,7 +584,9 @@ class mmap_array<std::string_view> {

// Compact the data buffer by removing unused space and updating offsets
// This is an in-place operation that shifts valid string data forward
// Returns the compacted data size
// Returns the compacted data size. Note that the reserved size of data buffer
// is not changed, and new strings can still be appended after the compacted
// data.
size_t compact() {
auto plan = prepare_compaction_plan();
if (items_.size() == 0) {
Expand All @@ -577,9 +599,11 @@ class mmap_array<std::string_view> {

std::vector<char> temp_buf(plan.total_size);
size_t write_offset = 0;
size_t limit_offset = 0;
for (const auto& entry : plan.entries) {
const char* src = data_.data() + entry.offset;
char* dst = temp_buf.data() + write_offset;
limit_offset = std::max(limit_offset, entry.offset + entry.length);
memcpy(dst, src, entry.length);
items_.set(entry.index,
{static_cast<uint64_t>(write_offset), entry.length});
Expand All @@ -588,9 +612,8 @@ class mmap_array<std::string_view> {
assert(write_offset == plan.total_size);
memcpy(data_.data(), temp_buf.data(), plan.total_size);

data_.resize(plan.total_size);
VLOG(1) << "Compaction completed. New data size: " << plan.total_size
<< ", old data size: " << size_before_compact;
<< ", old data size: " << limit_offset;
return plan.total_size;
}

Expand Down Expand Up @@ -654,6 +677,21 @@ class mmap_array<std::string_view> {
LOG(ERROR) << ss.str();
THROW_RUNTIME_ERROR(ss.str());
}
int fd = fileno(fout);
if (fd == -1) {
std::stringstream ss;
ss << "Failed to get file descriptor for [ " << data_filename << " ], "
<< strerror(errno);
LOG(ERROR) << ss.str();
THROW_RUNTIME_ERROR(ss.str());
}
if (ftruncate(fd, size_before_compact) != 0) {
std::stringstream ss;
ss << "Failed to ftruncate file [ " << data_filename << " ], "
<< strerror(errno);
LOG(ERROR) << ss.str();
THROW_RUNTIME_ERROR(ss.str());
}
if (fclose(fout) != 0) {
std::stringstream ss;
ss << "Failed to fclose file [ " << data_filename << " ], "
Expand Down
Loading
Loading