
A thread-safe, in-memory hash store supporting concurrent fetches and writes

License: MIT or Apache-2.0 (LICENSE, LICENSE-APACHE)

EthoIRL/extractdb


Extract DB


A thread-safe, in-memory hash store supporting concurrent fetches and writes.

This is not a traditional key-value store: it does not use keys of any kind.
Removing a specific item is not supported. Access is fetch-based instead, so the database can be thought of as a read-only deque.
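To make the key-less, fetch-based model concrete, here is a minimal std-only sketch. `ReadOnlyDeque` and its methods are hypothetical names, not extractdb's API, and unlike extractdb it hands items out in insertion order. It only illustrates the idea that a fetch returns another `Arc` to an item the store keeps, rather than removing it.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical model: items have no keys, the store keeps every `Arc` it
/// receives, and `fetch_next` hands out another reference instead of removing.
struct ReadOnlyDeque<T> {
    // (append-only items, index of the next item to fetch)
    inner: Mutex<(Vec<Arc<T>>, usize)>,
}

impl<T> ReadOnlyDeque<T> {
    fn new() -> Self {
        Self { inner: Mutex::new((Vec::new(), 0)) }
    }

    fn push(&self, item: Arc<T>) {
        self.inner.lock().unwrap().0.push(item);
    }

    /// Returns each stored item once; `None` once everything has been fetched.
    fn fetch_next(&self) -> Option<Arc<T>> {
        let mut guard = self.inner.lock().unwrap();
        let (items, next) = &mut *guard;
        let item = items.get(*next).cloned()?;
        *next += 1;
        Some(item)
    }

    /// Number of items held, fetched or not.
    fn stored(&self) -> usize {
        self.inner.lock().unwrap().0.len()
    }
}

fn main() {
    let store = ReadOnlyDeque::new();
    store.push(Arc::new(String::from("a")));
    store.push(Arc::new(String::from("b")));

    let first = store.fetch_next().unwrap();
    // One reference is held by the store and one by `first`.
    assert_eq!(Arc::strong_count(&first), 2);
    assert_eq!(store.fetch_next().unwrap().as_str(), "b");
    assert!(store.fetch_next().is_none());
    // Fetching removed nothing.
    assert_eq!(store.stored(), 2);
    println!("fetched {}, {} items still stored", first, store.stored());
}
```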


Guarantees

  • All items will eventually be fetched, with no duplicates, but the order is non-deterministic (neither FIFO nor FILO)
  • Items are never removed once inserted (append-only; fetches return Arc references)
  • All functions are thread-safe
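The first two guarantees (every item fetched once, nothing ever removed) can be met together by pairing an append-only list with an atomic fetch cursor. The sketch below assumes that design purely for illustration: `AppendOnly` is a hypothetical type, and extractdb's actual mechanism, and its non-deterministic ordering, may differ. Each fetch claims one index with a compare-and-swap, so no two threads can receive the same item.

```rust
use std::collections::HashSet;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::thread;

/// Hypothetical sketch: an append-only list plus an atomic cursor.
struct AppendOnly<T> {
    items: RwLock<Vec<Arc<T>>>,
    cursor: AtomicUsize,
}

impl<T> AppendOnly<T> {
    fn new() -> Self {
        Self { items: RwLock::new(Vec::new()), cursor: AtomicUsize::new(0) }
    }

    fn push(&self, item: T) {
        self.items.write().unwrap().push(Arc::new(item));
    }

    fn fetch_next(&self) -> Option<Arc<T>> {
        // Holding the read lock keeps `items.len()` stable during the claim.
        let items = self.items.read().unwrap();
        let mut current = self.cursor.load(Ordering::Acquire);
        loop {
            if current >= items.len() {
                return None; // nothing left to fetch right now
            }
            // Only one thread can move the cursor from `current` to `current + 1`.
            match self.cursor.compare_exchange_weak(
                current,
                current + 1,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return Some(Arc::clone(&items[current])),
                Err(actual) => current = actual,
            }
        }
    }
}

fn main() {
    let store = Arc::new(AppendOnly::new());
    for i in 0..1000 {
        store.push(i);
    }

    // Four readers drain the store concurrently.
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let store = Arc::clone(&store);
            thread::spawn(move || {
                let mut got = Vec::new();
                while let Some(item) = store.fetch_next() {
                    got.push(*item);
                }
                got
            })
        })
        .collect();

    let mut all = Vec::new();
    for handle in handles {
        all.extend(handle.join().unwrap());
    }
    let unique: HashSet<i32> = all.iter().copied().collect();
    assert_eq!(all.len(), 1000); // every item fetched
    assert_eq!(unique.len(), 1000); // none fetched twice
    println!("fetched {} items, {} unique", all.len(), unique.len());
}
```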

Trade-offs

  • No write-ahead log (WAL): data not yet saved to disk is lost on power loss or a crash
  • No item removal
  • Non-deterministic fetch order (it may appear deterministic, but this is not guaranteed)
  • Concurrent write throughput is prioritized over read performance
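A common way to favor concurrent writes over reads is lock sharding: the set is split into several independently locked shards, so writers only contend when their items hash to the same shard, while a full read has to visit every shard. The disk test `load_shard_mismatch_from_disk` suggests extractdb uses shards in some form, but `ShardedSet` below is a generic, hypothetical illustration of the technique, not its implementation.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical sharded set: writers only contend when they hit the same shard.
struct ShardedSet<T> {
    shards: Vec<Mutex<HashSet<T>>>,
}

impl<T: Hash + Eq> ShardedSet<T> {
    fn new(shard_count: usize) -> Self {
        assert!(shard_count > 0);
        Self { shards: (0..shard_count).map(|_| Mutex::new(HashSet::new())).collect() }
    }

    /// Picks a shard from the item's hash.
    fn shard_for(&self, item: &T) -> usize {
        let mut hasher = DefaultHasher::new();
        item.hash(&mut hasher);
        (hasher.finish() % self.shards.len() as u64) as usize
    }

    /// Returns `true` if the item was newly inserted; locks a single shard.
    fn insert(&self, item: T) -> bool {
        let idx = self.shard_for(&item);
        self.shards[idx].lock().unwrap().insert(item)
    }

    /// Counting must lock every shard: the price paid for cheaper writes.
    fn len(&self) -> usize {
        self.shards.iter().map(|s| s.lock().unwrap().len()).sum()
    }
}

fn main() {
    let set = Arc::new(ShardedSet::new(16));
    let handles: Vec<_> = (0..8u64)
        .map(|t| {
            let set = Arc::clone(&set);
            thread::spawn(move || {
                for i in 0..1000u64 {
                    set.insert(t * 1000 + i);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }

    assert_eq!(set.len(), 8000);
    assert!(!set.insert(42)); // already present
    println!("{}", set.len());
}
```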

Use cases

  • A concurrent queue of unique items (similar to a HashSet combined with a VecDeque)
  • Workloads where fast concurrent insertion matters more than concurrent reads
  • Fast reads on a single thread alongside multiple concurrent writers
  • A persistent in-memory hash store
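For comparison, the HashSet + VecDeque combination named in the first use case can be assembled from the standard library behind a single lock. `UniqueQueue` is a hypothetical sketch: it pops items in FIFO order and removes them from the queue, both of which differ from extractdb's behavior, and its single `Mutex` serializes every writer, which is exactly the contention a write-optimized store tries to avoid.

```rust
use std::collections::{HashSet, VecDeque};
use std::hash::Hash;
use std::sync::Mutex;

/// Hypothetical unique queue: `seen` remembers everything ever pushed, so a
/// duplicate is ignored even after the original has been popped.
struct UniqueQueue<T> {
    inner: Mutex<(HashSet<T>, VecDeque<T>)>,
}

impl<T: Hash + Eq + Clone> UniqueQueue<T> {
    fn new() -> Self {
        Self { inner: Mutex::new((HashSet::new(), VecDeque::new())) }
    }

    /// Returns `true` if the item was new and has been queued.
    fn push(&self, item: T) -> bool {
        let mut guard = self.inner.lock().unwrap();
        let (seen, queue) = &mut *guard;
        if seen.insert(item.clone()) {
            queue.push_back(item);
            true
        } else {
            false
        }
    }

    /// Removes and returns the oldest queued item (FIFO).
    fn pop(&self) -> Option<T> {
        self.inner.lock().unwrap().1.pop_front()
    }
}

fn main() {
    let links = UniqueQueue::new();
    assert!(links.push("https://example.com/a"));
    assert!(links.push("https://example.com/b"));
    assert!(!links.push("https://example.com/a")); // duplicate ignored

    assert_eq!(links.pop(), Some("https://example.com/a"));
    assert!(!links.push("https://example.com/a")); // still remembered after pop
    assert_eq!(links.pop(), Some("https://example.com/b"));
    assert_eq!(links.pop(), None);
    println!("ok");
}
```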

This was originally built for a web scraper that writes many links but reads comparatively few.

Installation

# Cargo.toml
[dependencies]
extractdb = "0.1.0"

Examples

Push, fetch, & count

use extractdb::ExtractDb;
use std::sync::Arc;

fn main() {
    let database: ExtractDb<i32> = ExtractDb::default();

    database.push(Arc::new(100));

    let total_items_in_db = database.internal_count();
    let mut items_in_quick_access_memory = 0;
    if total_items_in_db > 0 {
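        // Fetching hands out a shared reference; the item is not removed from the store.
        let item: Arc<i32> = database.fetch_next().unwrap();
        println!("Fetched: {}", item);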

        items_in_quick_access_memory = database.fetch_count();
    }
    
    println!("Total items: {} | Quick Access item count: {}", total_items_in_db, items_in_quick_access_memory);
}

Multithreaded insert & fetch

use std::sync::Arc;
use extractdb::ExtractDb;
use std::thread;

fn main() {
    let database: Arc<ExtractDb<String>> = Arc::new(ExtractDb::default());

    for thread_id in 0..8 {
        let local_database = Arc::clone(&database);
        thread::spawn(move || {
            local_database.push(Arc::new(format!("Hello from thread {}", thread_id)))
        });
    }

    // May print only some of the items (or none), since the main thread does not wait for the writer threads to finish.
    for _ in 0..8 {
        if let Ok(item) = database.fetch_next() {
            println!("Item: {}", item);
        }
    }
}
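If every item should be printed, the writer threads have to be joined before reading. The sketch below shows that pattern using only the standard library: a `Mutex<VecDeque<Arc<String>>>` is assumed as a stand-in for `ExtractDb<String>`, and `write_then_read` is a hypothetical helper, so the same join-before-read structure would need adapting to the real API.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical helper: spawn `writer_count` writers, join them all, then read.
/// `Mutex<VecDeque<Arc<String>>>` stands in for `ExtractDb<String>`.
fn write_then_read(writer_count: usize) -> Vec<Arc<String>> {
    let database: Arc<Mutex<VecDeque<Arc<String>>>> = Arc::new(Mutex::new(VecDeque::new()));

    // Keep the JoinHandles instead of discarding them.
    let handles: Vec<thread::JoinHandle<()>> = (0..writer_count)
        .map(|thread_id| {
            let local_database = Arc::clone(&database);
            thread::spawn(move || {
                let item = Arc::new(format!("Hello from thread {}", thread_id));
                local_database.lock().unwrap().push_back(item);
            })
        })
        .collect();

    // Joining guarantees every push has completed before we read.
    for handle in handles {
        handle.join().unwrap();
    }

    // Bind to a local so the lock guard is released before `database` is dropped.
    let items: Vec<Arc<String>> = database.lock().unwrap().drain(..).collect();
    items
}

fn main() {
    for item in write_then_read(8) {
        println!("Item: {}", item);
    }
}
```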

Disk loading and saving

use std::path::PathBuf;
use std::sync::Arc;
use extractdb::{ExtractConfig, ExtractDb};

fn main() {
    let config = ExtractConfig::default()
        .database_directory(Some(PathBuf::from("./test_db")));

    let database: ExtractDb<String> = ExtractDb::new(config);

    // `true`: also load all saved items back into the `fetch_next` queue
    database.load_from_disk(true).unwrap();

    database.push(Arc::new("Hello world!".to_string()));

    database.save_to_disk().unwrap();
}
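The on-disk format behind `save_to_disk` and `load_from_disk` is not described here. As a hedged illustration of snapshot-style persistence in general, the sketch below stores one item per line, writes to a temporary file, syncs it, and renames it over the previous snapshot, so an interrupted save leaves the old snapshot in place on typical filesystems. The format and the names `save_snapshot` and `load_snapshot` are hypothetical, and the sketch assumes items contain no newline characters.

```rust
use std::collections::HashSet;
use std::fs::{self, File};
use std::io::{self, BufRead, BufReader, BufWriter, Write};
use std::path::Path;

/// Hypothetical snapshot format: one item per line (items must not contain '\n').
fn save_snapshot(path: &Path, items: &HashSet<String>) -> io::Result<()> {
    // Write to a sibling temp file first, never directly over the old snapshot.
    let tmp = path.with_extension("tmp");
    let mut writer = BufWriter::new(File::create(&tmp)?);
    for item in items {
        writeln!(writer, "{}", item)?;
    }
    writer.flush()?;
    // Ask the OS to persist the bytes before the rename makes them visible.
    writer.get_ref().sync_all()?;
    drop(writer); // close the file before renaming it
    fs::rename(&tmp, path)
}

/// Reads every line of the snapshot back into a set.
fn load_snapshot(path: &Path) -> io::Result<HashSet<String>> {
    BufReader::new(File::open(path)?).lines().collect()
}

fn main() -> io::Result<()> {
    let path = std::env::temp_dir().join("extractdb_sketch_snapshot.txt");
    let items: HashSet<String> = ["Hello world!", "second item"]
        .iter()
        .map(|s| s.to_string())
        .collect();

    save_snapshot(&path, &items)?;
    let loaded = load_snapshot(&path)?;
    assert_eq!(loaded, items);

    fs::remove_file(&path)?;
    println!("round-tripped {} items", loaded.len());
    Ok(())
}
```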

Auto saving

use std::sync::Arc;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use extractdb::{CheckpointSettings, ExtractConfig, ExtractDb};

fn main() {
    let config = ExtractConfig::default()
        .database_directory(Some(PathBuf::from("./test_db_2")));

    let database: Arc<ExtractDb<String>> = Arc::new(ExtractDb::new(config));

    // `true`: also load all saved items back into the `fetch_next` queue
    database.load_from_disk(true).unwrap();

    let shutdown_flag = Arc::new(AtomicBool::new(false));
    let mut save_settings = CheckpointSettings::new(shutdown_flag.clone());
    save_settings.minimum_changes = 1000;
    
    // Spawn a background watcher thread that checks every 30 seconds (the default)
    // whether at least 1000 changes have accumulated before saving a checkpoint.
    ExtractDb::background_checkpoints(save_settings, database.clone());
    
    // Perform single/multithreaded logic
    database.push(Arc::new("Hello world!".to_string()));

    // Signal the background saving thread to shut down gracefully
    shutdown_flag.store(true, Ordering::Relaxed);
}
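The checkpoint logic itself lives inside `background_checkpoints`. The sketch below models the general pattern the example describes (wake up on an interval, save once at least `minimum_changes` writes have accumulated, stop when a shared flag is set) using hypothetical names and a short interval. It also saves any remaining changes on shutdown; that final flush is an assumption of this sketch, not documented extractdb behavior.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical checkpoint loop: every `interval`, if at least `minimum_changes`
/// writes have been counted since the last save, call `save` with that count.
fn spawn_checkpointer<F>(
    changes: Arc<AtomicUsize>,
    shutdown: Arc<AtomicBool>,
    minimum_changes: usize,
    interval: Duration,
    mut save: F,
) -> thread::JoinHandle<()>
where
    F: FnMut(usize) + Send + 'static,
{
    thread::spawn(move || {
        // `Acquire` pairs with the caller's `Release` store of the flag, so once
        // the flag is seen, every change counted before it is visible too.
        while !shutdown.load(Ordering::Acquire) {
            thread::sleep(interval);
            if changes.load(Ordering::Relaxed) >= minimum_changes {
                // `swap` reads and resets the counter in one atomic step.
                save(changes.swap(0, Ordering::Relaxed));
            }
        }
        // Final flush (an assumption of this sketch).
        let pending = changes.swap(0, Ordering::Relaxed);
        if pending > 0 {
            save(pending);
        }
    })
}

fn main() {
    let changes = Arc::new(AtomicUsize::new(0));
    let shutdown = Arc::new(AtomicBool::new(false));
    let saved = Arc::new(AtomicUsize::new(0));

    let saved_in_thread = Arc::clone(&saved);
    let handle = spawn_checkpointer(
        Arc::clone(&changes),
        Arc::clone(&shutdown),
        1000,
        Duration::from_millis(10),
        move |count| {
            // A real store would write a snapshot here; this just tallies.
            saved_in_thread.fetch_add(count, Ordering::Relaxed);
        },
    );

    // Each simulated push records one change.
    for _ in 0..2500 {
        changes.fetch_add(1, Ordering::Relaxed);
    }

    shutdown.store(true, Ordering::Release);
    handle.join().unwrap();
    assert_eq!(saved.load(Ordering::Relaxed), 2500);
    println!("saved {} changes", saved.load(Ordering::Relaxed));
}
```

Joining the handle after setting the flag, as done here, is what lets the caller know the final save has finished before the program exits.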

Testing + More examples

This project includes some basic tests to maintain functionality; please run them:

cargo test

See the internal doc comments for more in-depth information about each test:

I/O

  • push
  • push_multiple
  • push_collided
  • push_structure
  • count_empty_store
  • count_loaded_store
  • fetch_data
  • fetch_data_multiple
  • fetch_data_empty
  • duplicate_fetch

Disk

  • save_state_to_disk
  • load_state_from_disk
  • load_corrupted_state_from_disk
  • load_shard_mismatch_from_disk
  • load_mismatch_type_from_disk

Multithreaded

  • push_multi_thread

Contributing

Pull requests and issues are very welcome. Please feel free to suggest changes in PRs or issues :)

License

This project is licensed under either MIT or Apache-2.0, at your option.
