Skip to content

Consider true parallelisation when processing individual db_to_db steps #274

@skinkie

Description

@skinkie
import lmdb
import pickle
import multiprocessing as mp
from typing import Callable

def lmdb_reader(env_path, task_queue, stop_signal):
    """Read every (key, value) pair from the source LMDB and feed *task_queue*.

    Parameters
    ----------
    env_path : str
        Path of the read-only source LMDB environment.
    task_queue : multiprocessing.Queue
        Queue the raw (key, pickled-value) pairs are pushed onto.
    stop_signal : int
        Number of worker processes; one ``None`` sentinel per worker is
        queued after the full cursor scan so each worker can shut down.
    """
    env = lmdb.open(env_path, readonly=True, lock=False)
    try:
        with env.begin() as txn:
            for key, value in txn.cursor():
                task_queue.put((key, value))
    finally:
        env.close()  # release the read handle even if the scan fails
    for _ in range(stop_signal):  # one stop sentinel per worker
        task_queue.put(None)

def worker(task_queue, write_queue, func):
    """Consume tasks, apply *func* to each deserialized value, forward results.

    Runs until a ``None`` sentinel is received, then forwards exactly one
    ``None`` to *write_queue* so the writer can count finished workers.
    """
    while True:
        item = task_queue.get()
        if item is None:
            break
        key, value = item
        # NOTE(review): pickle.loads is unsafe on untrusted input; the source
        # DB is assumed to be trusted, locally produced data.
        deserialized = pickle.loads(value)  # deserialization (CPU/GIL-bound)
        result = func(deserialized)
        write_queue.put((key, pickle.dumps(result)))
    write_queue.put(None)  # this worker is done

def lmdb_writer(target_db, write_queue, num_workers=1):
    """Write processed (key, value) pairs back into the target LMDB.

    BUG FIX: the original exited on the *first* ``None`` sentinel, but every
    worker sends one — with more than one worker the writer stopped early and
    silently dropped results. It now keeps draining the queue until
    *num_workers* sentinels have arrived. The default of 1 preserves the old
    single-worker behavior for existing callers.
    """
    # map_size must be an integer number of bytes (1e9 is a float).
    env = lmdb.open(target_db, map_size=10**9)
    try:
        remaining = num_workers
        with env.begin(write=True) as txn:
            while remaining > 0:
                item = write_queue.get()
                if item is None:
                    remaining -= 1  # one more worker finished
                    continue
                key, value = item
                txn.put(key, value)
        env.sync()
    finally:
        env.close()

def run_function_in_parallel(source_db: str, target_db: str, func: Callable, num_workers: "int | None" = None):
    """Apply *func* to every value of *source_db* in parallel, writing to *target_db*.

    *func* must be picklable (a module-level function), since it is shipped to
    the worker processes via ``multiprocessing``. When *num_workers* is None,
    all cores but one are used — clamped to at least 1 so single-core machines
    still get a worker.
    """
    if num_workers is None:
        num_workers = max(1, mp.cpu_count() - 1)

    # Bounded queues provide back-pressure so the reader cannot outrun the
    # workers (and the workers cannot outrun the writer) unboundedly.
    task_queue = mp.Queue(maxsize=100)
    write_queue = mp.Queue(maxsize=50)

    reader_proc = mp.Process(target=lmdb_reader, args=(source_db, task_queue, num_workers))
    # Tell the writer how many worker sentinels to expect before it stops.
    writer_proc = mp.Process(target=lmdb_writer, args=(target_db, write_queue, num_workers))

    worker_pool = [mp.Process(target=worker, args=(task_queue, write_queue, func)) for _ in range(num_workers)]

    # Start everything.
    reader_proc.start()
    writer_proc.start()
    for p in worker_pool:
        p.start()

    # Wait for completion: reader first, then workers, then the writer.
    reader_proc.join()
    for p in worker_pool:
        p.join()
    writer_proc.join()

Metadata

Metadata

Assignees

Projects

No projects

Relationships

None yet

Development

No branches or pull requests

Issue actions