From 99d9d7a5a8cfecf498a307d553e2201a688b6ee2 Mon Sep 17 00:00:00 2001 From: Stefan Rehm Date: Tue, 11 Jun 2019 15:35:43 +0200 Subject: [PATCH 1/7] Added marshalling wrappers needed for TransactionDb support. --- RocksDbSharp/Native.Marshaled.cs | 199 +++++++++++++++++++++++++++++++ 1 file changed, 199 insertions(+) diff --git a/RocksDbSharp/Native.Marshaled.cs b/RocksDbSharp/Native.Marshaled.cs index 532d19c..ee64754 100644 --- a/RocksDbSharp/Native.Marshaled.cs +++ b/RocksDbSharp/Native.Marshaled.cs @@ -123,6 +123,165 @@ public string rocksdb_get( } } + public string rocksdb_transaction_get( + IntPtr txn, + IntPtr read_options, + string key, + out IntPtr errptr, + Encoding encoding = null) + { + if (encoding == null) + encoding = Encoding.UTF8; + unsafe + { + fixed (char* k = key) + { + var klength = key.Length; + var bklength = encoding.GetByteCount(k, klength); + var buffer = Marshal.AllocHGlobal(bklength); + var bk = (byte*)buffer.ToPointer(); + encoding.GetBytes(k, klength, bk, bklength); + + var resultPtr = rocksdb_transaction_get(txn, read_options, bk, new UIntPtr((ulong)bklength), out UIntPtr bvlength, out errptr); +#if DEBUG + Zero(bk, bklength); +#endif + Marshal.FreeHGlobal(buffer); + + if (errptr != IntPtr.Zero) + return null; + if (resultPtr == IntPtr.Zero) + return null; + + return MarshalAndFreeRocksDbString(resultPtr, (long)bvlength.ToUInt64(), encoding); + } + } + } + + public void rocksdb_transaction_put( + IntPtr txn, + string key, + string val, + out IntPtr errptr, + Encoding encoding = null) + { + unsafe + { + if (encoding == null) + encoding = Encoding.UTF8; + fixed (char* k = key, v = val) + { + var klength = key.Length; + var vlength = val.Length; + var bklength = encoding.GetByteCount(k, klength); + var bvlength = encoding.GetByteCount(v, vlength); + var buffer = Marshal.AllocHGlobal(bklength + bvlength); + var bk = (byte*)buffer.ToPointer(); + encoding.GetBytes(k, klength, bk, bklength); + var bv = bk + bklength; + encoding.GetBytes(v, vlength, bv, bvlength); + + rocksdb_transaction_put(txn, bk, new UIntPtr((ulong)bklength), bv, new UIntPtr((ulong)bvlength), out errptr); +#if DEBUG + Zero(bk, bklength); + Zero(bv, bvlength); +#endif + Marshal.FreeHGlobal(buffer); + } + } + } + + public void rocksdb_transaction_delete( + IntPtr txn, + string key, + out IntPtr errptr, + Encoding encoding = null) + { + var bkey = (encoding ?? Encoding.UTF8).GetBytes(key); + rocksdb_transaction_delete(txn, bkey, new UIntPtr((ulong)bkey.Length), out errptr); + } + + public string rocksdb_transactiondb_get( + IntPtr txn_db, + IntPtr read_options, + string key, + out IntPtr errptr, + Encoding encoding = null) + { + if (encoding == null) + encoding = Encoding.UTF8; + unsafe + { + fixed (char* k = key) + { + var klength = key.Length; + var bklength = encoding.GetByteCount(k, klength); + var buffer = Marshal.AllocHGlobal(bklength); + var bk = (byte*)buffer.ToPointer(); + encoding.GetBytes(k, klength, bk, bklength); + + var resultPtr = rocksdb_transactiondb_get(txn_db, read_options, bk, new UIntPtr((ulong)bklength), out UIntPtr bvlength, out errptr); +#if DEBUG + Zero(bk, bklength); +#endif + Marshal.FreeHGlobal(buffer); + + if (errptr != IntPtr.Zero) + return null; + if (resultPtr == IntPtr.Zero) + return null; + + return MarshalAndFreeRocksDbString(resultPtr, (long)bvlength.ToUInt64(), encoding); + } + } + } + + public void rocksdb_transactiondb_put( + IntPtr txn_db, + IntPtr write_options, + string key, + string val, + out IntPtr errptr, + Encoding encoding = null) + { + unsafe + { + if (encoding == null) + encoding = Encoding.UTF8; + fixed (char* k = key, v = val) + { + var klength = key.Length; + var vlength = val.Length; + var bklength = encoding.GetByteCount(k, klength); + var bvlength = encoding.GetByteCount(v, vlength); + var buffer = Marshal.AllocHGlobal(bklength + bvlength); + var bk = (byte*)buffer.ToPointer(); + encoding.GetBytes(k, klength, bk, bklength); + var bv = bk + bklength; + encoding.GetBytes(v, vlength, bv, bvlength); + + rocksdb_transactiondb_put(txn_db, write_options, bk, new UIntPtr((ulong)bklength), bv, new UIntPtr((ulong)bvlength), out errptr); +#if DEBUG + Zero(bk, bklength); + Zero(bv, bvlength); +#endif + Marshal.FreeHGlobal(buffer); + } + } + } + + public void rocksdb_transactiondb_delete( + IntPtr txn_db, + IntPtr write_options, + string key, + out IntPtr errptr, + Encoding encoding = null) + { + var bkey = (encoding ?? Encoding.UTF8).GetBytes(key); + rocksdb_transactiondb_delete(txn_db, write_options, bkey, new UIntPtr((ulong)bkey.Length), out errptr); + } + + private unsafe string MarshalAndFreeRocksDbString(IntPtr resultPtr, long resultLength, Encoding encoding) { var result = CurrentFramework.CreateString((sbyte*)resultPtr.ToPointer(), 0, (int)resultLength, encoding); @@ -156,6 +315,46 @@ public byte[] rocksdb_get( return result; } + public byte[] rocksdb_transaction_get( + IntPtr txn, + IntPtr read_options, + byte[] key, + ulong keyLength, + out IntPtr errptr) + { + var resultPtr = rocksdb_transaction_get(txn, read_options, key, new UIntPtr(keyLength), out UIntPtr valueLength, out errptr); + + if (errptr != IntPtr.Zero) + return null; + if (resultPtr == IntPtr.Zero) + return null; + + var result = new byte[valueLength.ToUInt64()]; + Marshal.Copy(resultPtr, result, 0, (int)valueLength.ToUInt64()); + rocksdb_free(resultPtr); + return result; + } + + public byte[] rocksdb_transactiondb_get( + IntPtr txn_db, + IntPtr read_options, + byte[] key, + ulong keyLength, + out IntPtr errptr) + { + var resultPtr = rocksdb_transactiondb_get(txn_db, read_options, key, new UIntPtr(keyLength), out UIntPtr valueLength, out errptr); + + if (errptr != IntPtr.Zero) + return null; + if (resultPtr == IntPtr.Zero) + return null; + + var result = new byte[valueLength.ToUInt64()]; + Marshal.Copy(resultPtr, result, 0, (int)valueLength.ToUInt64()); + rocksdb_free(resultPtr); + return result; + } + /// /// Executes a multi_get with automatic marshalling /// From 3904dcd8d6f8d9c406d72da581034c04b953771f Mon Sep 17 00:00:00 2001 From: Stefan Rehm Date: Tue, 11 Jun 2019 16:04:10 +0200 Subject: [PATCH 2/7] Decouple class Snapshot from class RocksDb. --- RocksDbSharp/RocksDb.cs | 2 +- RocksDbSharp/Snapshot.cs | 9 ++++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/RocksDbSharp/RocksDb.cs b/RocksDbSharp/RocksDb.cs index 9468188..63529e7 100644 --- a/RocksDbSharp/RocksDb.cs +++ b/RocksDbSharp/RocksDb.cs @@ -239,7 +239,7 @@ public Iterator[] NewIterators(ColumnFamilyHandle[] cfs, ReadOptions[] readOptio public Snapshot CreateSnapshot() { IntPtr snapshotHandle = Native.Instance.rocksdb_create_snapshot(Handle); - return new Snapshot(Handle, snapshotHandle); + return new Snapshot(Handle, snapshotHandle, () => Native.Instance.rocksdb_release_snapshot(Handle, snapshotHandle)); } public static IEnumerable ListColumnFamilies(DbOptions options, string name) diff --git a/RocksDbSharp/Snapshot.cs b/RocksDbSharp/Snapshot.cs index 85af5ff..18f036e 100644 --- a/RocksDbSharp/Snapshot.cs +++ b/RocksDbSharp/Snapshot.cs @@ -13,12 +13,15 @@ namespace RocksDbSharp public class Snapshot : IDisposable { private IntPtr dbHandle; - public IntPtr Handle { get; private set; } + private Action releaseAction; - internal Snapshot(IntPtr dbHandle, IntPtr snapshotHandle) + public IntPtr Handle { get; private set; } + + internal Snapshot(IntPtr dbHandle, IntPtr snapshotHandle, Action doRelease) { this.dbHandle = dbHandle; Handle = snapshotHandle; + releaseAction = doRelease; } public void Dispose() @@ -26,7 +29,7 @@ public void Dispose() if (Handle != IntPtr.Zero) { #if !NODESTROY - Native.Instance.rocksdb_release_snapshot(dbHandle, Handle); + releaseAction?.Invoke(); #endif Handle = IntPtr.Zero; } From 0498b6ba687ae3ded0c724f42a940733ef83216e Mon Sep 17 00:00:00 2001 From: Stefan Rehm Date: Tue, 11 Jun 2019 16:05:13 +0200 Subject: [PATCH 3/7] Added classes for TransactionDb. --- RocksDbSharp/Transaction.cs | 103 ++++++++++++++++++++++++ RocksDbSharp/TransactionDb.cs | 112 +++++++++++++++++++++++++++ RocksDbSharp/TransactionDbOptions.cs | 46 +++++++++++ RocksDbSharp/TransactionOptions.cs | 57 ++++++++++++++ 4 files changed, 318 insertions(+) create mode 100644 RocksDbSharp/Transaction.cs create mode 100644 RocksDbSharp/TransactionDb.cs create mode 100644 RocksDbSharp/TransactionDbOptions.cs create mode 100644 RocksDbSharp/TransactionOptions.cs diff --git a/RocksDbSharp/Transaction.cs b/RocksDbSharp/Transaction.cs new file mode 100644 index 0000000..e393ef2 --- /dev/null +++ b/RocksDbSharp/Transaction.cs @@ -0,0 +1,103 @@ +using System; +using System.Text; + +namespace RocksDbSharp +{ + public class Transaction : IDisposable + { + ReadOptions DefaultReadOptions { get; set; } + WriteOptions DefaultWriteOptions { get; set; } + TransactionOptions Options { get; set; } + + public IntPtr Handle { get; private set; } + + internal Transaction(IntPtr h, WriteOptions wo, TransactionOptions to) + { + Handle = h; + DefaultReadOptions = new ReadOptions(); + DefaultWriteOptions = wo; + Options = to; + } + + public void Dispose() + { + if (Handle != IntPtr.Zero) + Native.Instance.rocksdb_transaction_destroy(Handle); + Handle = IntPtr.Zero; + } + + public void Commit() + { + IntPtr err; + Native.Instance.rocksdb_transaction_commit(Handle, out err); + if (err != IntPtr.Zero) + throw new RocksDbException(err); + } + + public void Rollback() + { + IntPtr err; + Native.Instance.rocksdb_transaction_rollback(Handle, out err); + if (err != IntPtr.Zero) + throw new RocksDbException(err); + } + + public void Put(string key, string val, Encoding enc = null) + { + IntPtr err; + Native.Instance.rocksdb_transaction_put(Handle, key, val, out err, enc); + if (err != IntPtr.Zero) + throw new RocksDbException(err); + } + + public void Put(byte[] key, byte[] val) + { + IntPtr err; + Native.Instance.rocksdb_transaction_put(Handle, key, new UIntPtr((ulong)key.Length), val, new UIntPtr((ulong)val.Length), out err); + if (err != IntPtr.Zero) + throw new RocksDbException(err); + } + + public string Get(string key, ReadOptions o = null, Encoding enc = null) + { + IntPtr err; + var result = Native.Instance.rocksdb_transaction_get(Handle, (o ?? DefaultReadOptions).Handle, key, out err, enc); + if (err != IntPtr.Zero) + throw new RocksDbException(err); + + return result; + } + + public byte[] Get(byte[] key, ReadOptions o = null) + { + IntPtr err; + var result = Native.Instance.rocksdb_transaction_get(Handle, (o ?? DefaultReadOptions).Handle, key, (ulong)key.Length, out err); + if (err != IntPtr.Zero) + throw new RocksDbException(err); + + return result; + } + + public void Remove(string key, Encoding enc = null) + { + IntPtr err; + Native.Instance.rocksdb_transaction_delete(Handle, key, out err, enc); + if (err != IntPtr.Zero) + throw new RocksDbException(err); + } + + public void Remove(byte[] key) + { + IntPtr err; + Native.Instance.rocksdb_transaction_delete(Handle, key, new UIntPtr((ulong)key.Length), out err); + if (err != IntPtr.Zero) + throw new RocksDbException(err); + } + + public Iterator NewIterator(ReadOptions readOptions = null) + { + IntPtr iteratorHandle = Native.Instance.rocksdb_transaction_create_iterator(Handle, (readOptions ?? DefaultReadOptions).Handle); + return new Iterator(iteratorHandle, readOptions); + } + } +} diff --git a/RocksDbSharp/TransactionDb.cs b/RocksDbSharp/TransactionDb.cs new file mode 100644 index 0000000..2cf031e --- /dev/null +++ b/RocksDbSharp/TransactionDb.cs @@ -0,0 +1,112 @@ +using System; +using System.Text; + +namespace RocksDbSharp +{ + public class TransactionDb : IDisposable + { + public static TransactionDb Open(DbOptions dbo, TransactionDbOptions tdbo, string name) + { + IntPtr handle, err; + handle = Native.Instance.rocksdb_transactiondb_open(dbo.Handle, tdbo.Handle, name, out err); + if (err != IntPtr.Zero) + throw new RocksDbException(err); + + return new TransactionDb(handle, dbo, tdbo, name); + } + + public IntPtr Handle { get; private set; } + internal DbOptions DbOptions { get; private set; } + internal TransactionDbOptions TDbOptions { get; private set; } + public string Name { get; private set; } + ReadOptions DefaultReadOptions { get; set; } + WriteOptions DefaultWriteOptions { get; set; } + + internal TransactionDb(IntPtr h, DbOptions db_options, TransactionDbOptions tdb_options, string name) + { + Handle = h; + DbOptions = db_options; + TDbOptions = tdb_options; + Name = name; + DefaultReadOptions = new ReadOptions(); + DefaultWriteOptions = new WriteOptions(); + } + + public void Dispose() + { + Close(); + } + + public void Close() + { + if (Handle != IntPtr.Zero) + Native.Instance.rocksdb_transactiondb_close(Handle); + Handle = IntPtr.Zero; + } + + public Transaction BeginTransaction(WriteOptions wo, TransactionOptions to, Transaction prev = null) + { + IntPtr handle = Native.Instance.rocksdb_transaction_begin(Handle, wo.Handle, to.Handle, (prev != null) ? prev.Handle : IntPtr.Zero); + + return new Transaction(handle, wo, to); + } + + public Snapshot CreateSnapshot() + { + IntPtr snapshotHandle = Native.Instance.rocksdb_transactiondb_create_snapshot(Handle); + return new Snapshot(Handle, snapshotHandle, () => Native.Instance.rocksdb_transactiondb_release_snapshot(Handle, snapshotHandle)); + } + + public void Put(string key, string val, WriteOptions wo = null, Encoding enc = null) + { + IntPtr err; + Native.Instance.rocksdb_transactiondb_put(Handle, (wo ?? DefaultWriteOptions).Handle, key, val, out err, enc); + if (err != IntPtr.Zero) + throw new RocksDbException(err); + } + + public void Put(byte[] key, byte[] val, WriteOptions wo = null) + { + IntPtr err; + Native.Instance.rocksdb_transactiondb_put(Handle, (wo ?? DefaultWriteOptions).Handle, key, new UIntPtr((ulong)key.Length), val, new UIntPtr((ulong)val.Length), out err); + if (err != IntPtr.Zero) + throw new RocksDbException(err); + } + + public string Get(string key, ReadOptions o = null, Encoding enc = null) + { + IntPtr err; + var result = Native.Instance.rocksdb_transactiondb_get(Handle, (o ?? DefaultReadOptions).Handle, key, out err, enc); + if (err != IntPtr.Zero) + throw new RocksDbException(err); + + return result; + } + + public byte[] Get(byte[] key, ReadOptions o = null) + { + IntPtr err; + var result = Native.Instance.rocksdb_transactiondb_get(Handle, (o ?? DefaultReadOptions).Handle, key, (ulong)key.Length, out err); + if (err != IntPtr.Zero) + throw new RocksDbException(err); + + return result; + } + + public void Remove(string key, WriteOptions wo = null, Encoding enc = null) + { + IntPtr err; + Native.Instance.rocksdb_transactiondb_delete(Handle, (wo ?? DefaultWriteOptions).Handle, key, out err, enc); + if (err != IntPtr.Zero) + throw new RocksDbException(err); + } + + public void Remove(byte[] key, WriteOptions wo = null) + { + IntPtr err; + Native.Instance.rocksdb_transactiondb_delete(Handle, (wo ?? DefaultWriteOptions).Handle, key, new UIntPtr((ulong)key.Length), out err); + if (err != IntPtr.Zero) + throw new RocksDbException(err); + } + } +} diff --git a/RocksDbSharp/TransactionDbOptions.cs b/RocksDbSharp/TransactionDbOptions.cs new file mode 100644 index 0000000..f371c61 --- /dev/null +++ b/RocksDbSharp/TransactionDbOptions.cs @@ -0,0 +1,46 @@ +using System; + +namespace RocksDbSharp +{ + public class TransactionDbOptions + { + public IntPtr Handle { get; private set; } + + public TransactionDbOptions() + { + Handle = Native.Instance.rocksdb_transactiondb_options_create(); + } + + ~TransactionDbOptions() + { + if (Handle != IntPtr.Zero) + Native.Instance.rocksdb_transactiondb_options_destroy(Handle); + Handle = IntPtr.Zero; + } + + public TransactionDbOptions SetMaxNumLocks(long max_num_locks) + { + Native.Instance.rocksdb_transactiondb_options_set_max_num_locks(Handle, max_num_locks); + return this; + } + + public TransactionDbOptions SetNumStripes(ulong num_stripes) + { + Native.Instance.rocksdb_transactiondb_options_set_num_stripes(Handle, new UIntPtr(num_stripes)); + return this; + } + + public TransactionDbOptions SetTransactionLockTimeout(long txn_lock_timeout) + { + Native.Instance.rocksdb_transactiondb_options_set_transaction_lock_timeout(Handle, txn_lock_timeout); + return this; + } + + public TransactionDbOptions SetDefaultLockTimeout(long default_lock_timeout) + { + Native.Instance.rocksdb_transactiondb_options_set_default_lock_timeout(Handle, default_lock_timeout); + return this; + } + + } +} diff --git a/RocksDbSharp/TransactionOptions.cs b/RocksDbSharp/TransactionOptions.cs new file mode 100644 index 0000000..e2e5bfd --- /dev/null +++ b/RocksDbSharp/TransactionOptions.cs @@ -0,0 +1,57 @@ +using System; + +namespace RocksDbSharp +{ + public class TransactionOptions + { + public IntPtr Handle { get; private set; } + + public TransactionOptions() + { + Handle = Native.Instance.rocksdb_transaction_options_create(); + } + + ~TransactionOptions() + { + if (Handle != IntPtr.Zero) + Native.Instance.rocksdb_transaction_options_destroy(Handle); + Handle = IntPtr.Zero; + } + + public TransactionOptions SetSetSnapshot(bool v) + { + Native.Instance.rocksdb_transaction_options_set_set_snapshot(Handle, v); + return this; + } + + public TransactionOptions SetDeadlockDetect(bool v) + { + Native.Instance.rocksdb_transaction_options_set_deadlock_detect(Handle, v); + return this; + } + + public TransactionOptions SetLockTimeout(long timeout) + { + Native.Instance.rocksdb_transaction_options_set_lock_timeout(Handle, timeout); + return this; + } + + public TransactionOptions SetExpiration(long expiration) + { + Native.Instance.rocksdb_transaction_options_set_expiration(Handle, expiration); + return this; + } + + public TransactionOptions SetDeadlockDetectDepth(long depth) + { + Native.Instance.rocksdb_transaction_options_set_deadlock_detect_depth(Handle, depth); + return this; + } + + public TransactionOptions SetMaxWriteBatchSize(ulong size) + { + Native.Instance.rocksdb_transaction_options_set_max_write_batch_size(Handle, new UIntPtr(size)); + return this; + } + } +} From a1c53c9dd685959f3b03cdc8f9060b3f964cb7a0 Mon Sep 17 00:00:00 2001 From: Stefan Rehm Date: Tue, 11 Jun 2019 16:37:22 +0200 Subject: [PATCH 4/7] Implement checkpoints for TransactionDb. --- RocksDbSharp/Native.Wrap.cs | 9 +++++++++ RocksDbSharp/TransactionDb.cs | 16 ++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/RocksDbSharp/Native.Wrap.cs b/RocksDbSharp/Native.Wrap.cs index f79390a..aa7fb97 100644 --- a/RocksDbSharp/Native.Wrap.cs +++ b/RocksDbSharp/Native.Wrap.cs @@ -82,6 +82,15 @@ public IntPtr rocksdb_open_for_read_only_column_families( return result; } + public /*(rocksdb_checkpoint_t*)*/ IntPtr rocksdb_transactiondb_checkpoint_object_create( + /*(rocksdb_transactiondb_t*)*/ IntPtr txdb) + { + var result = rocksdb_transactiondb_checkpoint_object_create(txdb, out IntPtr errptr); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + return result; + } + public void rocksdb_checkpoint_create( /*(rocksdb_checkpoint_t*)*/ IntPtr checkpoint, /*(const char*)*/ string checkpoint_dir, diff --git a/RocksDbSharp/TransactionDb.cs b/RocksDbSharp/TransactionDb.cs index 2cf031e..e1f613a 100644 --- a/RocksDbSharp/TransactionDb.cs +++ b/RocksDbSharp/TransactionDb.cs @@ -51,6 +51,22 @@ public Transaction BeginTransaction(WriteOptions wo, TransactionOptions to, Tran return new Transaction(handle, wo, to); } + /// + /// Usage: + /// + /// + /// + public Checkpoint Checkpoint() + { + var checkpoint = Native.Instance.rocksdb_transactiondb_checkpoint_object_create(Handle); + return new Checkpoint(checkpoint); + } + public Snapshot CreateSnapshot() { IntPtr snapshotHandle = Native.Instance.rocksdb_transactiondb_create_snapshot(Handle); From ac7f123e8a6c1419bb17319ae50436ce3086b10f Mon Sep 17 00:00:00 2001 From: Stefan Rehm Date: Wed, 12 Jun 2019 10:45:17 +0200 Subject: [PATCH 5/7] Add writebatch support for TransactionDb. --- RocksDbSharp/Native.Wrap.cs | 10 ++++++++++ RocksDbSharp/TransactionDb.cs | 5 +++++ 2 files changed, 15 insertions(+) diff --git a/RocksDbSharp/Native.Wrap.cs b/RocksDbSharp/Native.Wrap.cs index aa7fb97..4bfb5a8 100644 --- a/RocksDbSharp/Native.Wrap.cs +++ b/RocksDbSharp/Native.Wrap.cs @@ -270,6 +270,16 @@ public void rocksdb_write( throw new RocksDbException(errptr); } + public void rocksdb_transactiondb_write( + /*rocksdb_transactiondb_t**/ IntPtr db, + /*const rocksdb_writeoptions_t**/ IntPtr writeOptions, + /*(rocksdb_writebatch_t*)*/ IntPtr writeBatch) + { + rocksdb_transactiondb_write(db, writeOptions, writeBatch, out IntPtr errptr); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + } + public void rocksdb_write_writebatch_wi( /*rocksdb_t**/ IntPtr db, /*const rocksdb_writeoptions_t**/ IntPtr writeOptions, diff --git a/RocksDbSharp/TransactionDb.cs b/RocksDbSharp/TransactionDb.cs index e1f613a..49364eb 100644 --- a/RocksDbSharp/TransactionDb.cs +++ b/RocksDbSharp/TransactionDb.cs @@ -107,6 +107,11 @@ public byte[] Get(byte[] key, ReadOptions o = null) throw new RocksDbException(err); return result; + } + + public void Write(WriteBatch writeBatch, WriteOptions writeOptions = null) + { + Native.Instance.rocksdb_transactiondb_write(Handle, (writeOptions ?? DefaultWriteOptions).Handle, writeBatch.Handle); } public void Remove(string key, WriteOptions wo = null, Encoding enc = null) From 469d891bc2e820781b0badae55980758443e5704 Mon Sep 17 00:00:00 2001 From: Stefan Rehm Date: Wed, 12 Jun 2019 14:51:44 +0200 Subject: [PATCH 6/7] Add ColumnFamily support to TransactionDb. --- RocksDbSharp/Native.Marshaled.cs | 223 +++++++++++++++------------ RocksDbSharp/Native.Wrap.cs | 252 ++++++++++++++++++++++++++++++- RocksDbSharp/Transaction.cs | 125 ++++++++++----- RocksDbSharp/TransactionDb.cs | 182 +++++++++++++++------- 4 files changed, 584 insertions(+), 198 deletions(-) diff --git a/RocksDbSharp/Native.Marshaled.cs b/RocksDbSharp/Native.Marshaled.cs index ee64754..3d02e80 100644 --- a/RocksDbSharp/Native.Marshaled.cs +++ b/RocksDbSharp/Native.Marshaled.cs @@ -84,85 +84,51 @@ public void rocksdb_put( } } - public string rocksdb_get( - /*rocksdb_t**/ IntPtr db, - /*const rocksdb_readoptions_t**/ IntPtr read_options, - string key, - out IntPtr errptr, - ColumnFamilyHandle cf = null, - Encoding encoding = null) - { - if (encoding == null) - encoding = Encoding.UTF8; - unsafe - { - fixed (char* k = key) - { - int klength = key.Length; - int bklength = encoding.GetByteCount(k, klength); - var buffer = Marshal.AllocHGlobal(bklength); - byte* bk = (byte*)buffer.ToPointer(); - encoding.GetBytes(k, klength, bk, bklength); - UIntPtr sklength = (UIntPtr)bklength; - - var resultPtr = cf == null - ? rocksdb_get(db, read_options, bk, sklength, out UIntPtr bvlength, out errptr) - : rocksdb_get_cf(db, read_options, cf.Handle, bk, sklength, out bvlength, out errptr); -#if DEBUG - Zero(bk, bklength); -#endif - Marshal.FreeHGlobal(buffer); - - if (errptr != IntPtr.Zero) - return null; - if (resultPtr == IntPtr.Zero) - return null; - - return MarshalAndFreeRocksDbString(resultPtr, (long)bvlength, encoding); - } - } - } - - public string rocksdb_transaction_get( + public void rocksdb_transaction_put( IntPtr txn, - IntPtr read_options, string key, + string val, out IntPtr errptr, + ColumnFamilyHandle cf = null, Encoding encoding = null) { - if (encoding == null) - encoding = Encoding.UTF8; unsafe { - fixed (char* k = key) + if (encoding == null) + encoding = Encoding.UTF8; + fixed (char* k = key, v = val) { var klength = key.Length; + var vlength = val.Length; var bklength = encoding.GetByteCount(k, klength); - var buffer = Marshal.AllocHGlobal(bklength); + var bvlength = encoding.GetByteCount(v, vlength); + var buffer = Marshal.AllocHGlobal(bklength + bvlength); var bk = (byte*)buffer.ToPointer(); encoding.GetBytes(k, klength, bk, bklength); + var bv = bk + bklength; + encoding.GetBytes(v, vlength, bv, bvlength); + + if (cf == null) + rocksdb_transaction_put(txn, bk, new UIntPtr((ulong)bklength), bv, new UIntPtr((ulong)bvlength), out errptr); + else + rocksdb_transaction_put_cf(txn, cf.Handle, bk, new UIntPtr((ulong)bklength), bv, new UIntPtr((ulong)bvlength), out errptr); - var resultPtr = rocksdb_transaction_get(txn, read_options, bk, new UIntPtr((ulong)bklength), out UIntPtr bvlength, out errptr); #if DEBUG Zero(bk, bklength); + Zero(bv, bvlength); #endif Marshal.FreeHGlobal(buffer); - - if (errptr != IntPtr.Zero) - return null; - if (resultPtr == IntPtr.Zero) - return null; - - return MarshalAndFreeRocksDbString(resultPtr, (long)bvlength.ToUInt64(), encoding); } } } - public void rocksdb_transaction_put( - IntPtr txn, + public void rocksdb_transactiondb_put( + IntPtr txn_db, + IntPtr write_options, string key, string val, out IntPtr errptr, + ColumnFamilyHandle cf = null, Encoding encoding = null) { unsafe @@ -181,7 +147,11 @@ public void rocksdb_transaction_put( var bv = bk + bklength; encoding.GetBytes(v, vlength, bv, bvlength); - rocksdb_transaction_put(txn, bk, new UIntPtr((ulong)bklength), bv, new UIntPtr((ulong)bvlength), out errptr); + if (cf == null) + rocksdb_transactiondb_put(txn_db, write_options, bk, new UIntPtr((ulong)bklength), bv, new UIntPtr((ulong)bvlength), out errptr); + else + rocksdb_transactiondb_put_cf(txn_db, write_options, cf.Handle, bk, new UIntPtr((ulong)bklength), bv, new UIntPtr((ulong)bvlength), out errptr); + #if DEBUG Zero(bk, bklength); Zero(bv, bvlength); @@ -191,21 +161,51 @@ public void rocksdb_transaction_put( } } - public void rocksdb_transaction_delete( - IntPtr txn, + public string rocksdb_get( + /*rocksdb_t**/ IntPtr db, + /*const rocksdb_readoptions_t**/ IntPtr read_options, string key, out IntPtr errptr, + ColumnFamilyHandle cf = null, Encoding encoding = null) { - var bkey = (encoding ?? Encoding.UTF8).GetBytes(key); - rocksdb_transaction_delete(txn, bkey, new UIntPtr((ulong)bkey.Length), out errptr); + if (encoding == null) + encoding = Encoding.UTF8; + unsafe + { + fixed (char* k = key) + { + int klength = key.Length; + int bklength = encoding.GetByteCount(k, klength); + var buffer = Marshal.AllocHGlobal(bklength); + byte* bk = (byte*)buffer.ToPointer(); + encoding.GetBytes(k, klength, bk, bklength); + UIntPtr sklength = (UIntPtr)bklength; + + var resultPtr = cf == null + ? rocksdb_get(db, read_options, bk, sklength, out UIntPtr bvlength, out errptr) + : rocksdb_get_cf(db, read_options, cf.Handle, bk, sklength, out bvlength, out errptr); +#if DEBUG + Zero(bk, bklength); +#endif + Marshal.FreeHGlobal(buffer); + + if (errptr != IntPtr.Zero) + return null; + if (resultPtr == IntPtr.Zero) + return null; + + return MarshalAndFreeRocksDbString(resultPtr, (long)bvlength, encoding); + } + } } - public string rocksdb_transactiondb_get( - IntPtr txn_db, + public string rocksdb_transaction_get( + IntPtr txn, IntPtr read_options, string key, out IntPtr errptr, + ColumnFamilyHandle cf = null, Encoding encoding = null) { if (encoding == null) @@ -220,7 +220,10 @@ public string rocksdb_transactiondb_get( var bk = (byte*)buffer.ToPointer(); encoding.GetBytes(k, klength, bk, bklength); - var resultPtr = rocksdb_transactiondb_get(txn_db, read_options, bk, new UIntPtr((ulong)bklength), out UIntPtr bvlength, out errptr); + var resultPtr = cf == null + ? rocksdb_transaction_get(txn, read_options, bk, new UIntPtr((ulong)bklength), out UIntPtr bvlength, out errptr) + : rocksdb_transaction_get_cf(txn, read_options, cf.Handle, bk, new UIntPtr((ulong)bklength), out bvlength, out errptr); + #if DEBUG Zero(bk, bklength); #endif @@ -235,53 +238,46 @@ public string rocksdb_transactiondb_get( } } } - - public void rocksdb_transactiondb_put( + + public string rocksdb_transactiondb_get( IntPtr txn_db, - IntPtr write_options, + IntPtr read_options, string key, - string val, out IntPtr errptr, + ColumnFamilyHandle cf = null, Encoding encoding = null) { + if (encoding == null) + encoding = Encoding.UTF8; unsafe { - if (encoding == null) - encoding = Encoding.UTF8; - fixed (char* k = key, v = val) + fixed (char* k = key) { var klength = key.Length; - var vlength = val.Length; var bklength = encoding.GetByteCount(k, klength); - var bvlength = encoding.GetByteCount(v, vlength); - var buffer = Marshal.AllocHGlobal(bklength + bvlength); + var buffer = Marshal.AllocHGlobal(bklength); var bk = (byte*)buffer.ToPointer(); encoding.GetBytes(k, klength, bk, bklength); - var bv = bk + bklength; - encoding.GetBytes(v, vlength, bv, bvlength); - rocksdb_transactiondb_put(txn_db, write_options, bk, new UIntPtr((ulong)bklength), bv, new UIntPtr((ulong)bvlength), out errptr); + var resultPtr = cf == null + ? rocksdb_transactiondb_get(txn_db, read_options, bk, new UIntPtr((ulong)bklength), out UIntPtr bvlength, out errptr) + : rocksdb_transactiondb_get_cf(txn_db, read_options, cf.Handle, bk, new UIntPtr((ulong)bklength), out bvlength, out errptr); + #if DEBUG Zero(bk, bklength); - Zero(bv, bvlength); #endif Marshal.FreeHGlobal(buffer); + + if (errptr != IntPtr.Zero) + return null; + if (resultPtr == IntPtr.Zero) + return null; + + return MarshalAndFreeRocksDbString(resultPtr, (long)bvlength.ToUInt64(), encoding); } } } - public void rocksdb_transactiondb_delete( - IntPtr txn_db, - IntPtr write_options, - string key, - out IntPtr errptr, - Encoding encoding = null) - { - var bkey = (encoding ?? Encoding.UTF8).GetBytes(key); - rocksdb_transactiondb_delete(txn_db, write_options, bkey, new UIntPtr((ulong)bkey.Length), out errptr); - } - - private unsafe string MarshalAndFreeRocksDbString(IntPtr resultPtr, long resultLength, Encoding encoding) { var result = CurrentFramework.CreateString((sbyte*)resultPtr.ToPointer(), 0, (int)resultLength, encoding); @@ -319,10 +315,13 @@ public byte[] rocksdb_transaction_get( IntPtr txn, IntPtr read_options, byte[] key, - ulong keyLength, - out IntPtr errptr) + long keyLength, + out IntPtr errptr, + ColumnFamilyHandle cf = null) { - var resultPtr = rocksdb_transaction_get(txn, read_options, key, new UIntPtr(keyLength), out UIntPtr valueLength, out errptr); + var resultPtr = cf == null + ? rocksdb_transaction_get(txn, read_options, key, new UIntPtr((ulong)keyLength), out UIntPtr valueLength, out errptr) + : rocksdb_transaction_get_cf(txn, read_options, cf.Handle, key, new UIntPtr((ulong)keyLength), out valueLength, out errptr); if (errptr != IntPtr.Zero) return null; @@ -339,10 +338,13 @@ public byte[] rocksdb_transactiondb_get( IntPtr txn_db, IntPtr read_options, byte[] key, - ulong keyLength, - out IntPtr errptr) + long keyLength, + out IntPtr errptr, + ColumnFamilyHandle cf = null) { - var resultPtr = rocksdb_transactiondb_get(txn_db, read_options, key, new UIntPtr(keyLength), out UIntPtr valueLength, out errptr); + var resultPtr = cf == null + ? rocksdb_transactiondb_get(txn_db, read_options, key, new UIntPtr((ulong)keyLength), out UIntPtr valueLength, out errptr) + : rocksdb_transactiondb_get_cf(txn_db, read_options, cf.Handle, key, new UIntPtr((ulong)keyLength), out valueLength, out errptr); if (errptr != IntPtr.Zero) return null; @@ -541,6 +543,37 @@ public void rocksdb_delete( rocksdb_delete_cf(db, writeOptions, cf.Handle, bkey, kLength, out errptr); } + public void rocksdb_transaction_delete( + IntPtr txn, + string key, + out IntPtr errptr, + ColumnFamilyHandle cf = null, + Encoding encoding = null) + { + var bkey = (encoding ?? Encoding.UTF8).GetBytes(key); + if (cf == null) + rocksdb_transaction_delete(txn, bkey, new UIntPtr((ulong)bkey.Length), out errptr); + else + rocksdb_transaction_delete_cf(txn, cf.Handle, bkey, new UIntPtr((ulong)bkey.Length), out errptr); + } + + + + public void rocksdb_transactiondb_delete( + IntPtr txn_db, + IntPtr write_options, + string key, + out IntPtr errptr, + ColumnFamilyHandle cf = null, + Encoding encoding = null) + { + var bkey = (encoding ?? Encoding.UTF8).GetBytes(key); + if (cf == null) + rocksdb_transactiondb_delete(txn_db, write_options, bkey, new UIntPtr((ulong)bkey.Length), out errptr); + else + rocksdb_transactiondb_delete_cf(txn_db, cf.Handle, write_options, bkey, new UIntPtr((ulong)bkey.Length), out errptr); + } + public string rocksdb_options_statistics_get_string_marshaled(IntPtr opts) { return MarshalNullTermAsciiStr(rocksdb_options_statistics_get_string(opts)); diff --git a/RocksDbSharp/Native.Wrap.cs b/RocksDbSharp/Native.Wrap.cs index 4bfb5a8..1836c2e 100644 --- a/RocksDbSharp/Native.Wrap.cs +++ b/RocksDbSharp/Native.Wrap.cs @@ -22,6 +22,17 @@ public IntPtr rocksdb_open( return result; } + public IntPtr rocksdb_transactiondb_open( + IntPtr options, + IntPtr txn_db_options, + string name) + { + var result = rocksdb_transactiondb_open(options, txn_db_options, name, out IntPtr errptr); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + return result; + } + public IntPtr rocksdb_open_for_read_only( /* const rocksdb_options_t* */ IntPtr options, string name, @@ -83,9 +94,9 @@ public IntPtr rocksdb_open_for_read_only_column_families( } public /*(rocksdb_checkpoint_t*)*/ IntPtr rocksdb_transactiondb_checkpoint_object_create( - /*(rocksdb_transactiondb_t*)*/ IntPtr txdb) + /*(rocksdb_transactiondb_t*)*/ IntPtr txn_db) { - var result = rocksdb_transactiondb_checkpoint_object_create(txdb, out IntPtr errptr); + var result = rocksdb_transactiondb_checkpoint_object_create(txn_db, out IntPtr errptr); if (errptr != IntPtr.Zero) throw new RocksDbException(errptr); return result; @@ -147,6 +158,69 @@ public void rocksdb_put( throw new RocksDbException(errptr); } + public void rocksdb_transaction_put( + IntPtr txn, + string key, + string val, + ColumnFamilyHandle cf = null, + Encoding encoding = null) + { + rocksdb_transaction_put(txn, key, val, out IntPtr errptr, cf, encoding); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + } + + public void rocksdb_transaction_put( + IntPtr txn, + byte[] key, + long keyLength, + byte[] value, + long valueLength, + ColumnFamilyHandle cf) + { + IntPtr errptr; + UIntPtr sklength = (UIntPtr)keyLength; + UIntPtr svlength = (UIntPtr)valueLength; + if (cf == null) + rocksdb_transaction_put(txn, key, sklength, value, svlength, out errptr); + else + rocksdb_transaction_put_cf(txn, cf.Handle, key, sklength, value, svlength, out errptr); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + } + + public void rocksdb_transactiondb_put( + IntPtr txn_db, + IntPtr writeOptions, + string key, + string val, + ColumnFamilyHandle cf = null, + Encoding encoding = null) + { + rocksdb_transactiondb_put(txn_db, writeOptions, key, val, out IntPtr errptr, cf, encoding); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + } + + public void rocksdb_transactiondb_put( + IntPtr txn_db, + IntPtr writeOptions, + byte[] key, + long keyLength, + byte[] value, + long valueLength, + ColumnFamilyHandle cf) + { + IntPtr errptr; + UIntPtr sklength = (UIntPtr)keyLength; + UIntPtr svlength = (UIntPtr)valueLength; + if (cf == null) + rocksdb_transactiondb_put(txn_db, writeOptions, key, sklength, value, svlength, out errptr); + else + rocksdb_transactiondb_put_cf(txn_db, writeOptions, cf.Handle, key, sklength, value, svlength, out errptr); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + } public string rocksdb_get( /*rocksdb_t**/ IntPtr db, @@ -191,7 +265,95 @@ public byte[] rocksdb_get( throw new RocksDbException(errptr); return result; } + + public string rocksdb_transaction_get( + /*rocksdb_t**/ IntPtr txn, + /*const rocksdb_readoptions_t**/ IntPtr read_options, + string key, + ColumnFamilyHandle cf, + Encoding encoding = null) + { + var result = rocksdb_transaction_get(txn, read_options, key, out IntPtr errptr, cf, encoding); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + return result; + } + + public IntPtr rocksdb_transaction_get( + IntPtr txn, + IntPtr read_options, + byte[] key, + long keyLength, + out long vallen, + ColumnFamilyHandle cf) + { + UIntPtr sklength = (UIntPtr)keyLength; + var result = cf == null + ? rocksdb_transaction_get(txn, read_options, key, sklength, out UIntPtr valLength, out IntPtr errptr) + : rocksdb_transaction_get_cf(txn, read_options, cf.Handle, key, sklength, out valLength, out errptr); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + vallen = (long)valLength; + return result; + } + + public byte[] rocksdb_transaction_get( + IntPtr txn, + IntPtr read_options, + byte[] key, + long keyLength = 0, + ColumnFamilyHandle cf = null) + { + var result = rocksdb_transaction_get(txn, read_options, key, keyLength == 0 ? key.Length : keyLength, out IntPtr errptr, cf); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + return result; + } + + public string rocksdb_transactiondb_get( + /*rocksdb_t**/ IntPtr txn_db, + /*const rocksdb_readoptions_t**/ IntPtr read_options, + string key, + ColumnFamilyHandle cf, + Encoding encoding = null) + { + var result = rocksdb_transactiondb_get(txn_db, read_options, key, out IntPtr errptr, cf, encoding); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + return result; + } + + public IntPtr rocksdb_transactiondb_get( + IntPtr txn_db, + IntPtr read_options, + byte[] key, + long keyLength, + out long vallen, + ColumnFamilyHandle cf) + { + UIntPtr sklength = (UIntPtr)keyLength; + var result = cf == null + ? rocksdb_transactiondb_get(txn_db, read_options, key, sklength, out UIntPtr valLength, out IntPtr errptr) + : rocksdb_transactiondb_get_cf(txn_db, read_options, cf.Handle, key, sklength, out valLength, out errptr); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + vallen = (long)valLength; + return result; + } + public byte[] rocksdb_transactiondb_get( + IntPtr txn_db, + IntPtr read_options, + byte[] key, + long keyLength = 0, + ColumnFamilyHandle cf = null) + { + var result = rocksdb_transactiondb_get(txn_db, read_options, key, keyLength == 0 ? key.Length : keyLength, out IntPtr errptr, cf); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + return result; + } + public KeyValuePair[] rocksdb_multi_get( IntPtr db, IntPtr read_options, @@ -236,6 +398,27 @@ public void rocksdb_delete( throw new RocksDbException(errptr); } + public void rocksdb_transaction_delete( + IntPtr txn, + string key, + ColumnFamilyHandle cf) + { + rocksdb_transaction_delete(txn, key, out IntPtr errptr, cf); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + } + + public void rocksdb_transactiondb_delete( + IntPtr txn_db, + IntPtr writeOptions, + string key, + ColumnFamilyHandle cf) + { + rocksdb_transactiondb_delete(txn_db, writeOptions, key, out IntPtr errptr, cf); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + } + public void rocksdb_delete( /*rocksdb_t**/ IntPtr db, /*const rocksdb_writeoptions_t**/ IntPtr writeOptions, @@ -248,6 +431,29 @@ public void rocksdb_delete( throw new RocksDbException(errptr); } + public void rocksdb_transaction_delete( + IntPtr txn, + byte[] key, + long keylen) + { + UIntPtr sklength = (UIntPtr)keylen; + rocksdb_transaction_delete(txn, key, sklength, out IntPtr errptr); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + } + + public void rocksdb_transactiondb_delete( + IntPtr txn_db, + IntPtr writeOptions, + byte[] key, + long keylen) + { + UIntPtr sklength = (UIntPtr)keylen; + rocksdb_transactiondb_delete(txn_db, writeOptions, key, sklength, out IntPtr errptr); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + } + public void rocksdb_delete_cf( /*rocksdb_t**/ IntPtr db, /*const rocksdb_writeoptions_t**/ IntPtr writeOptions, @@ -260,6 +466,29 @@ public void rocksdb_delete_cf( throw new RocksDbException(errptr); } + public void rocksdb_transaction_delete_cf( + IntPtr txn, + byte[] key, + long keylen, + ColumnFamilyHandle cf) + { + rocksdb_transaction_delete_cf(txn, cf.Handle, key, (UIntPtr)keylen, out IntPtr errptr); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + } + + public void rocksdb_transactiondb_delete_cf( + IntPtr txn_db, + IntPtr writeOptions, + byte[] key, + long keylen, + ColumnFamilyHandle cf) + { + rocksdb_transactiondb_delete_cf(txn_db, writeOptions, cf.Handle, key, (UIntPtr)keylen, out IntPtr errptr); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + } + public void rocksdb_write( /*rocksdb_t**/ IntPtr db, /*const rocksdb_writeoptions_t**/ IntPtr writeOptions, @@ -270,12 +499,12 @@ public void rocksdb_write( throw new RocksDbException(errptr); } - public void rocksdb_transactiondb_write( - /*rocksdb_transactiondb_t**/ IntPtr db, - /*const rocksdb_writeoptions_t**/ IntPtr writeOptions, + public void rocksdb_transactiondb_write( + /*rocksdb_transactiondb_t**/ IntPtr txn_db, + /*const rocksdb_writeoptions_t**/ IntPtr writeOptions, /*(rocksdb_writebatch_t*)*/ IntPtr writeBatch) { - rocksdb_transactiondb_write(db, writeOptions, writeBatch, out IntPtr errptr); + rocksdb_transactiondb_write(txn_db, writeOptions, writeBatch, out IntPtr errptr); if (errptr != IntPtr.Zero) throw new RocksDbException(errptr); } @@ -321,6 +550,17 @@ public IntPtr rocksdb_create_column_family( return result; } + public IntPtr rocksdb_transactiondb_create_column_family( + IntPtr txn_db, + IntPtr column_family_options, + string column_family_name) + { + var result = rocksdb_transactiondb_create_column_family(txn_db, column_family_options, column_family_name, out IntPtr errptr); + if (errptr != IntPtr.Zero) + throw new RocksDbException(errptr); + return result; + } + public void rocksdb_drop_column_family( /*rocksdb_t**/ IntPtr db, /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family_handle diff --git a/RocksDbSharp/Transaction.cs b/RocksDbSharp/Transaction.cs index e393ef2..f4bc502 100644 --- a/RocksDbSharp/Transaction.cs +++ b/RocksDbSharp/Transaction.cs @@ -1,13 +1,16 @@ using System; +using System.Runtime.InteropServices; using System.Text; +using Transitional; namespace RocksDbSharp { public class Transaction : IDisposable { - ReadOptions DefaultReadOptions { get; set; } - WriteOptions DefaultWriteOptions { get; set; } - TransactionOptions Options { get; set; } + internal ReadOptions DefaultReadOptions { get; set; } + internal WriteOptions WriteOptions { get; set; } + internal TransactionOptions Options { get; set; } + internal static Encoding DefaultEncoding => Encoding.UTF8; public IntPtr Handle { get; private set; } @@ -15,7 +18,7 @@ internal Transaction(IntPtr h, WriteOptions wo, TransactionOptions to) { Handle = h; DefaultReadOptions = new ReadOptions(); - DefaultWriteOptions = wo; + WriteOptions = wo; Options = to; } @@ -42,61 +45,103 @@ public void Rollback() throw new RocksDbException(err); } - public void Put(string key, string val, Encoding enc = null) + public void Put(string key, string value, ColumnFamilyHandle cf = null, Encoding encoding = null) { - IntPtr err; - Native.Instance.rocksdb_transaction_put(Handle, key, val, out err, enc); - if (err != IntPtr.Zero) - throw new RocksDbException(err); + Native.Instance.rocksdb_transaction_put(Handle, key, value, cf, encoding ?? DefaultEncoding); } - public void Put(byte[] key, byte[] val) + public void Put(byte[] key, byte[] value, ColumnFamilyHandle cf = null) { - IntPtr err; - Native.Instance.rocksdb_transaction_put(Handle, key, new UIntPtr((ulong)key.Length), val, new UIntPtr((ulong)val.Length), out err); - if (err != IntPtr.Zero) - throw new RocksDbException(err); + Put(key, key.GetLongLength(0), value, value.GetLongLength(0), cf); } - public string Get(string key, ReadOptions o = null, Encoding enc = null) + public void Put(byte[] key, long keyLength, byte[] value, long valueLength, ColumnFamilyHandle cf = null) { - IntPtr err; - var result = Native.Instance.rocksdb_transaction_get(Handle, (o ?? DefaultReadOptions).Handle, key, out err, enc); - if (err != IntPtr.Zero) - throw new RocksDbException(err); - - return result; + Native.Instance.rocksdb_transaction_put(Handle, key, keyLength, value, valueLength, cf); } - public byte[] Get(byte[] key, ReadOptions o = null) + public string Get(string key, ColumnFamilyHandle cf = null, ReadOptions readOptions = null, Encoding encoding = null) { - IntPtr err; - var result = Native.Instance.rocksdb_transaction_get(Handle, (o ?? DefaultReadOptions).Handle, key, (ulong)key.Length, out err); - if (err != IntPtr.Zero) - throw new RocksDbException(err); - - return result; + return Native.Instance.rocksdb_transaction_get(Handle, (readOptions ?? DefaultReadOptions).Handle, key, cf, encoding ?? DefaultEncoding); } - public void Remove(string key, Encoding enc = null) + public byte[] Get(byte[] key, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) { - IntPtr err; - Native.Instance.rocksdb_transaction_delete(Handle, key, out err, enc); - if (err != IntPtr.Zero) - throw new RocksDbException(err); + return Get(key, key.GetLongLength(0), cf, readOptions); } - public void Remove(byte[] key) + public byte[] Get(byte[] key, long keyLength, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) { - IntPtr err; - Native.Instance.rocksdb_transaction_delete(Handle, key, new UIntPtr((ulong)key.Length), out err); - if (err != IntPtr.Zero) - throw new RocksDbException(err); + return Native.Instance.rocksdb_transaction_get(Handle, (readOptions ?? DefaultReadOptions).Handle, key, keyLength, cf); + } + + /// + /// Reads the contents of the database value associated with , if present, into the supplied + /// at up to bytes, returning the + /// length of the value in the database, or -1 if the key is not present. + /// + /// + /// + /// + /// + /// + /// + /// The actual length of the database field if it exists, otherwise -1 + public long Get(byte[] key, byte[] buffer, long offset, long length, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + return Get(key, key.GetLongLength(0), buffer, offset, length, cf, readOptions); + } + + /// + /// Reads the contents of the database value associated with , if present, into the supplied + /// at up to bytes, returning the + /// length of the value in the database, or -1 if the key is not present. + /// + /// + /// + /// + /// + /// + /// + /// The actual length of the database field if it exists, otherwise -1 + public long Get(byte[] key, long keyLength, byte[] buffer, long offset, long length, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + unsafe + { + var ptr = Native.Instance.rocksdb_transaction_get(Handle, (readOptions ?? DefaultReadOptions).Handle, key, keyLength, out long valLength, cf); + if (ptr == IntPtr.Zero) + return -1; + var copyLength = Math.Min(length, valLength); + Marshal.Copy(ptr, buffer, (int)offset, (int)copyLength); + Native.Instance.rocksdb_free(ptr); + return valLength; + } + } + + public void Remove(string key, ColumnFamilyHandle cf = null) + { + Native.Instance.rocksdb_transaction_delete(Handle, key, cf); + } + + public void Remove(byte[] key, ColumnFamilyHandle cf = null) + { + Remove(key, key.Length, cf); + } + + public void Remove(byte[] key, long keyLength, ColumnFamilyHandle cf = null) + { + if (cf == null) + Native.Instance.rocksdb_transaction_delete(Handle, key, keyLength); + else + Native.Instance.rocksdb_transaction_delete_cf(Handle, key, keyLength, cf); } - public Iterator NewIterator(ReadOptions readOptions = null) + public Iterator NewIterator(ColumnFamilyHandle cf = null, ReadOptions readOptions = null) { - IntPtr iteratorHandle = Native.Instance.rocksdb_transaction_create_iterator(Handle, (readOptions ?? DefaultReadOptions).Handle); + IntPtr iteratorHandle = cf == null + ? Native.Instance.rocksdb_transaction_create_iterator(Handle, (readOptions ?? DefaultReadOptions).Handle) + : Native.Instance.rocksdb_transaction_create_iterator_cf(Handle, (readOptions ?? DefaultReadOptions).Handle, cf.Handle); + // Note: passing in read options here only to ensure that it is not collected before the iterator return new Iterator(iteratorHandle, readOptions); } } diff --git a/RocksDbSharp/TransactionDb.cs b/RocksDbSharp/TransactionDb.cs index 49364eb..438d4a5 100644 --- a/RocksDbSharp/TransactionDb.cs +++ b/RocksDbSharp/TransactionDb.cs @@ -1,44 +1,40 @@ using System; +using System.Collections.Generic; +using System.Dynamic; +using System.Runtime.InteropServices; using System.Text; +using Transitional; namespace RocksDbSharp { public class TransactionDb : IDisposable { - public static TransactionDb Open(DbOptions dbo, TransactionDbOptions tdbo, string name) - { - IntPtr handle, err; - handle = Native.Instance.rocksdb_transactiondb_open(dbo.Handle, tdbo.Handle, name, out err); - if (err != IntPtr.Zero) - throw new RocksDbException(err); - - return new TransactionDb(handle, dbo, tdbo, name); - } - public IntPtr Handle { get; private set; } internal DbOptions DbOptions { get; private set; } internal TransactionDbOptions TDbOptions { get; private set; } public string Name { get; private set; } - ReadOptions DefaultReadOptions { get; set; } - WriteOptions DefaultWriteOptions { get; set; } + internal ReadOptions DefaultReadOptions { get; set; } = new ReadOptions(); + internal WriteOptions DefaultWriteOptions { get; set; } = new WriteOptions(); + internal static Encoding DefaultEncoding => Encoding.UTF8; + private Dictionary columnFamilies; + + // Managed references to unmanaged resources that need to live at least as long as the db + internal dynamic References { get; } = new ExpandoObject(); - internal TransactionDb(IntPtr h, DbOptions db_options, TransactionDbOptions tdb_options, string name) + private TransactionDb(IntPtr h, DbOptions db_options, TransactionDbOptions txn_db_options, dynamic cfOptionsRefs, Dictionary columnFamilies = null) { Handle = h; DbOptions = db_options; - TDbOptions = tdb_options; - Name = name; - DefaultReadOptions = new ReadOptions(); - DefaultWriteOptions = new WriteOptions(); + TDbOptions = txn_db_options; + References.CfOptions = cfOptionsRefs; + this.columnFamilies = columnFamilies ?? new Dictionary(); } public void Dispose() { - Close(); - } + foreach (var cfh in columnFamilies.Values) + cfh.Dispose(); - public void Close() - { if (Handle != IntPtr.Zero) Native.Instance.rocksdb_transactiondb_close(Handle); Handle = IntPtr.Zero; @@ -51,6 +47,12 @@ public Transaction BeginTransaction(WriteOptions wo, TransactionOptions to, Tran return new Transaction(handle, wo, to); } + public static TransactionDb Open(DbOptions options, TransactionDbOptions txn_db_options, string path) + { + IntPtr db = Native.Instance.rocksdb_transactiondb_open(options.Handle, txn_db_options.Handle, path); + return new TransactionDb(db, options, txn_db_options, null); + } + /// /// Usage: /// Native.Instance.rocksdb_transactiondb_release_snapshot(Handle, snapshotHandle)); } - public void Put(string key, string val, WriteOptions wo = null, Encoding enc = null) + public ColumnFamilyHandle CreateColumnFamily(ColumnFamilyOptions cfOptions, string name) + { + var cfh = Native.Instance.rocksdb_transactiondb_create_column_family(Handle, cfOptions.Handle, name); + var cfhw = new ColumnFamilyHandleInternal(cfh); + columnFamilies.Add(name, cfhw); + return cfhw; + } + + public ColumnFamilyHandle GetDefaultColumnFamily() + { + return GetColumnFamily(ColumnFamilies.DefaultName); + } + + public ColumnFamilyHandle GetColumnFamily(string name) + { + if (columnFamilies == null) + throw new RocksDbSharpException("Database not opened for column families"); + return columnFamilies[name]; + } + + public void Put(string key, string value, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null, Encoding encoding = null) + { + Native.Instance.rocksdb_transactiondb_put(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, value, cf, encoding ?? DefaultEncoding); + } + + public void Put(byte[] key, byte[] value, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) { - IntPtr err; - Native.Instance.rocksdb_transactiondb_put(Handle, (wo ?? DefaultWriteOptions).Handle, key, val, out err, enc); - if (err != IntPtr.Zero) - throw new RocksDbException(err); + Put(key, key.GetLongLength(0), value, value.GetLongLength(0), cf, writeOptions); } - public void Put(byte[] key, byte[] val, WriteOptions wo = null) + public void Put(byte[] key, long keyLength, byte[] value, long valueLength, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) { - IntPtr err; - Native.Instance.rocksdb_transactiondb_put(Handle, (wo ?? DefaultWriteOptions).Handle, key, new UIntPtr((ulong)key.Length), val, new UIntPtr((ulong)val.Length), out err); - if (err != IntPtr.Zero) - throw new RocksDbException(err); + Native.Instance.rocksdb_transactiondb_put(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, keyLength, value, valueLength, cf); } - public string Get(string key, ReadOptions o = null, Encoding enc = null) + public string Get(string key, ColumnFamilyHandle cf = null, ReadOptions readOptions = null, Encoding encoding = null) { - IntPtr err; - var result = Native.Instance.rocksdb_transactiondb_get(Handle, (o ?? DefaultReadOptions).Handle, key, out err, enc); - if (err != IntPtr.Zero) - throw new RocksDbException(err); - - return result; + return Native.Instance.rocksdb_transactiondb_get(Handle, (readOptions ?? DefaultReadOptions).Handle, key, cf, encoding ?? DefaultEncoding); } - public byte[] Get(byte[] key, ReadOptions o = null) + public byte[] Get(byte[] key, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) { - IntPtr err; - var result = Native.Instance.rocksdb_transactiondb_get(Handle, (o ?? DefaultReadOptions).Handle, key, (ulong)key.Length, out err); - if (err != IntPtr.Zero) - throw new RocksDbException(err); - - return result; - } - + return Get(key, key.GetLongLength(0), cf, readOptions); + } + + public byte[] Get(byte[] key, long keyLength, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + return Native.Instance.rocksdb_transactiondb_get(Handle, (readOptions ?? DefaultReadOptions).Handle, key, keyLength, cf); + } + + /// + /// Reads the contents of the database value associated with , if present, into the supplied + /// at up to bytes, returning the + /// length of the value in the database, or -1 if the key is not present. + /// + /// + /// + /// + /// + /// + /// + /// The actual length of the database field if it exists, otherwise -1 + public long Get(byte[] key, byte[] buffer, long offset, long length, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + return Get(key, key.GetLongLength(0), buffer, offset, length, cf, readOptions); + } + + /// + /// Reads the contents of the database value associated with , if present, into the supplied + /// at up to bytes, returning the + /// length of the value in the database, or -1 if the key is not present. + /// + /// + /// + /// + /// + /// + /// + /// The actual length of the database field if it exists, otherwise -1 + public long Get(byte[] key, long keyLength, byte[] buffer, long offset, long length, ColumnFamilyHandle cf = null, ReadOptions readOptions = null) + { + unsafe + { + var ptr = Native.Instance.rocksdb_transactiondb_get(Handle, (readOptions ?? DefaultReadOptions).Handle, key, keyLength, out long valLength, cf); + if (ptr == IntPtr.Zero) + return -1; + var copyLength = Math.Min(length, valLength); + Marshal.Copy(ptr, buffer, (int)offset, (int)copyLength); + Native.Instance.rocksdb_free(ptr); + return valLength; + } + } + public void Write(WriteBatch writeBatch, WriteOptions writeOptions = null) { Native.Instance.rocksdb_transactiondb_write(Handle, (writeOptions ?? DefaultWriteOptions).Handle, writeBatch.Handle); } - public void Remove(string key, WriteOptions wo = null, Encoding enc = null) + public void Remove(string key, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) + { + Native.Instance.rocksdb_transactiondb_delete(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, cf); + } + + public void Remove(byte[] key, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) + { + Remove(key, key.Length, cf, writeOptions); + } + + public void Remove(byte[] key, long keyLength, ColumnFamilyHandle cf = null, WriteOptions writeOptions = null) { - IntPtr err; - Native.Instance.rocksdb_transactiondb_delete(Handle, (wo ?? DefaultWriteOptions).Handle, key, out err, enc); - if (err != IntPtr.Zero) - throw new RocksDbException(err); + if (cf == null) + Native.Instance.rocksdb_transactiondb_delete(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, keyLength); + else + Native.Instance.rocksdb_transactiondb_delete_cf(Handle, (writeOptions ?? DefaultWriteOptions).Handle, key, keyLength, cf); } - public void Remove(byte[] key, WriteOptions wo = null) + public Iterator NewIterator(ReadOptions readOptions = null) { - IntPtr err; - Native.Instance.rocksdb_transactiondb_delete(Handle, (wo ?? DefaultWriteOptions).Handle, key, new UIntPtr((ulong)key.Length), out err); - if (err != IntPtr.Zero) - throw new RocksDbException(err); + IntPtr iteratorHandle = Native.Instance.rocksdb_transactiondb_create_iterator(Handle, (readOptions ?? DefaultReadOptions).Handle); + // Note: passing in read options here only to ensure that it is not collected before the iterator + return new Iterator(iteratorHandle, readOptions); } } } From 02e09b7b4d7f4de2ee0ab28472215d2dca2816d6 Mon Sep 17 00:00:00 2001 From: Stefan Rehm Date: Wed, 12 Jun 2019 15:01:26 +0200 Subject: [PATCH 7/7] Added tests for TransactionDbAdded tests for TransactionDb.. --- tests/RocksDbSharpTest/FunctionalTests.cs | 144 ++++++++++++++++++++++ 1 file changed, 144 insertions(+) diff --git a/tests/RocksDbSharpTest/FunctionalTests.cs b/tests/RocksDbSharpTest/FunctionalTests.cs index 2b42779..5f63103 100644 --- a/tests/RocksDbSharpTest/FunctionalTests.cs +++ b/tests/RocksDbSharpTest/FunctionalTests.cs @@ -435,6 +435,150 @@ public void FunctionalTest() Directory.Delete(dbname, true); } + // TransactionDb tests + var testtxdb = Path.Combine(testdir, "txdb"); + var testtxcp = Path.Combine(testdir, "txcp"); + var txdbpath = Environment.ExpandEnvironmentVariables(testtxdb); + var txcppath = Environment.ExpandEnvironmentVariables(testtxcp); + var txdboptions = new TransactionDbOptions(); + + using (var db = TransactionDb.Open(options, txdboptions, txdbpath)) + { + // With strings + string value = db.Get("key"); + db.Put("key", "value"); + Assert.Equal("value", db.Get("key")); + Assert.Null(db.Get("non-existent-key")); + db.Remove("key"); + Assert.Null(db.Get("value")); + + // With bytes + db.Put(Encoding.UTF8.GetBytes("key"), Encoding.UTF8.GetBytes("value")); + Assert.True(BinaryComparer.Default.Equals(Encoding.UTF8.GetBytes("value"), db.Get(Encoding.UTF8.GetBytes("key")))); + // non-existent kiey + Assert.Null(db.Get(new byte[] { 0, 1, 2 })); + db.Remove(Encoding.UTF8.GetBytes("key")); + Assert.Null(db.Get(Encoding.UTF8.GetBytes("key"))); + + db.Put(Encoding.UTF8.GetBytes("key"), new byte[] { 0, 1, 2, 3, 4, 5, 6, 7 }); + + // With buffers + var buffer = new byte[100]; + long length = db.Get(Encoding.UTF8.GetBytes("key"), buffer, 0, buffer.Length); + Assert.Equal(8, length); + Assert.Equal(new byte[] { 0, 1, 2, 3, 4, 5, 6, 7 }, buffer.Take((int)length).ToList()); + + buffer = new byte[5]; + length = db.Get(Encoding.UTF8.GetBytes("key"), buffer, 0, buffer.Length); + Assert.Equal(8, length); + Assert.Equal(new byte[] { 0, 1, 2, 3, 4 }, buffer.Take((int)Math.Min(buffer.Length, length))); + + length = db.Get(Encoding.UTF8.GetBytes("bogus"), buffer, 0, buffer.Length); + Assert.Equal(-1, length); + + // Write batches + // With strings + using (WriteBatch batch = new WriteBatch() + .Put("one", "uno") + .Put("two", "deuce") + .Put("two", "dos") + .Put("three", "tres")) + { + db.Write(batch); + } + Assert.Equal("uno", db.Get("one")); + + // With save point + using (WriteBatch batch = new WriteBatch()) + { + batch + .Put("hearts", "red") + .Put("diamonds", "red"); + batch.SetSavePoint(); + batch + .Put("clubs", "black"); + batch.SetSavePoint(); + batch + .Put("spades", "black"); + batch.RollbackToSavePoint(); + db.Write(batch); + } + Assert.Equal("red", db.Get("diamonds")); + Assert.Equal("black", db.Get("clubs")); + Assert.Null(db.Get("spades")); + + // Save a checkpoint + using (var cp = db.Checkpoint()) + { + cp.Save(txcppath); + } + + // With bytes + var utf8 = Encoding.UTF8; + using (WriteBatch batch = new WriteBatch() + .Put(utf8.GetBytes("four"), new byte[] { 4, 4, 4 }) + .Put(utf8.GetBytes("five"), new byte[] { 5, 5, 5 })) + { + db.Write(batch); + } + Assert.True(BinaryComparer.Default.Equals(new byte[] { 4, 4, 4 }, db.Get(utf8.GetBytes("four")))); + + // Snapshots + using (var snapshot = db.CreateSnapshot()) + { + var before = db.Get("one"); + db.Put("one", "1"); + + var useSnapshot = new ReadOptions() + .SetSnapshot(snapshot); + + // the database value was written + Assert.Equal("1", db.Get("one")); + // but the snapshot still sees the old version + var after = db.Get("one", readOptions: useSnapshot); + Assert.Equal(before, after); + } + + var two = db.Get("two"); + Assert.Equal("dos", two); + + // Iterators + using (var iterator = db.NewIterator( + readOptions: new ReadOptions() + .SetIterateUpperBound("t") + )) + { + iterator.Seek("k"); + Assert.True(iterator.Valid()); + Assert.Equal("key", iterator.StringKey()); + iterator.Next(); + Assert.True(iterator.Valid()); + Assert.Equal("one", iterator.StringKey()); + Assert.Equal("1", iterator.StringValue()); + iterator.Next(); + Assert.False(iterator.Valid()); + } + + // Transaction + using (var transaction = db.BeginTransaction(new WriteOptions(), new TransactionOptions())) + { + Assert.Equal("dos", transaction.Get("two")); + transaction.Put("two", "2"); + Assert.Equal("dos", db.Get("two")); + transaction.Commit(); + } + Assert.Equal("2", db.Get("two")); + } + + // Test reading checkpointed db + using (var cpdb = TransactionDb.Open(options, txdboptions, txcppath)) + { + Assert.Equal("red", cpdb.Get("diamonds")); + Assert.Equal("black", cpdb.Get("clubs")); + Assert.Null(cpdb.Get("spades")); + // Checkpoint occurred before these changes: + Assert.Null(cpdb.Get("four")); + } } class IntegerStringComparator : StringComparatorBase