From c6d285a32ab23f09639e7e37dd051a2a82e3386d Mon Sep 17 00:00:00 2001 From: Jason Gustafson <12502538+hachikuji@users.noreply.github.com> Date: Fri, 6 Mar 2026 10:51:01 -0800 Subject: [PATCH 1/4] Expose log read visibility --- log/src/main/java/dev/opendata/LogDb.java | 5 +- .../main/java/dev/opendata/LogDbConfig.java | 27 ++++++-- .../main/java/dev/opendata/NativeInterop.java | 10 ++- .../java/dev/opendata/ReadVisibility.java | 14 ++++ .../java/dev/opendata/LogDbConfigTest.java | 25 +++++++- .../dev/opendata/LogDbIntegrationTest.java | 64 +++++++++++++++++++ 6 files changed, 135 insertions(+), 10 deletions(-) create mode 100644 log/src/main/java/dev/opendata/ReadVisibility.java diff --git a/log/src/main/java/dev/opendata/LogDb.java b/log/src/main/java/dev/opendata/LogDb.java index 74cf3ea..e92a767 100644 --- a/log/src/main/java/dev/opendata/LogDb.java +++ b/log/src/main/java/dev/opendata/LogDb.java @@ -39,19 +39,20 @@ public static LogDb open(LogDbConfig config) { long sealIntervalMs = config.segmentation().sealIntervalMs() != null ? config.segmentation().sealIntervalMs() : -1; + ReadVisibility readVisibility = config.readVisibility(); switch (storage) { case StorageConfig.InMemory() -> { try (NativeInterop.ObjectStoreHandle objectStore = NativeInterop.objectStoreInMemory()) { NativeInterop.LogHandle logHandle = NativeInterop.logOpen( - 0, null, objectStore.segment(), null, sealIntervalMs); + 0, null, objectStore.segment(), null, sealIntervalMs, readVisibility); return new LogDb(logHandle); } } case StorageConfig.SlateDb slateDb -> { try (NativeInterop.ObjectStoreHandle objectStore = NativeInterop.resolveObjectStore(slateDb.objectStore())) { NativeInterop.LogHandle logHandle = NativeInterop.logOpen( - 1, slateDb.path(), objectStore.segment(), slateDb.settingsPath(), sealIntervalMs); + 1, slateDb.path(), objectStore.segment(), slateDb.settingsPath(), sealIntervalMs, readVisibility); return new LogDb(logHandle); } } diff --git a/log/src/main/java/dev/opendata/LogDbConfig.java b/log/src/main/java/dev/opendata/LogDbConfig.java index 9fae90a..ea2ffe1 100644 --- a/log/src/main/java/dev/opendata/LogDbConfig.java +++ b/log/src/main/java/dev/opendata/LogDbConfig.java @@ -6,23 +6,35 @@ * Configuration for opening a {@link LogDb}. * *

This record holds all the settings needed to initialize a log instance, - * including storage backend configuration and segmentation settings. + * including storage backend configuration, segmentation settings, and read visibility. * - * @param storage storage backend configuration - * @param segmentation segmentation configuration + * @param storage storage backend configuration + * @param segmentation segmentation configuration + * @param readVisibility controls which data is visible to reads */ public record LogDbConfig( StorageConfig storage, - SegmentConfig segmentation + SegmentConfig segmentation, + ReadVisibility readVisibility ) { /** - * Creates a config with the specified storage and default segmentation. + * Creates a config with the specified storage, default segmentation, and default read visibility. * * @param storage storage backend configuration */ public LogDbConfig(StorageConfig storage) { - this(storage, SegmentConfig.DEFAULT); + this(storage, SegmentConfig.DEFAULT, ReadVisibility.MEMORY); + } + + /** + * Creates a config with the specified storage and segmentation, and default read visibility. + * + * @param storage storage backend configuration + * @param segmentation segmentation configuration + */ + public LogDbConfig(StorageConfig storage, SegmentConfig segmentation) { + this(storage, segmentation, ReadVisibility.MEMORY); } public LogDbConfig { @@ -32,6 +44,9 @@ public LogDbConfig(StorageConfig storage) { if (segmentation == null) { throw new IllegalArgumentException("segmentation must not be null"); } + if (readVisibility == null) { + throw new IllegalArgumentException("readVisibility must not be null"); + } } /** diff --git a/log/src/main/java/dev/opendata/NativeInterop.java b/log/src/main/java/dev/opendata/NativeInterop.java index 65cf488..3b72b8b 100644 --- a/log/src/main/java/dev/opendata/NativeInterop.java +++ b/log/src/main/java/dev/opendata/NativeInterop.java @@ -202,7 +202,7 @@ static ObjectStoreHandle resolveObjectStore(ObjectStoreConfig config) { static LogHandle logOpen(int storageType, String slatedbPath, MemorySegment objectStore, String settingsPath, - long sealIntervalMs) { + long sealIntervalMs, ReadVisibility readVisibility) { try (Arena arena = Arena.ofConfined()) { MemorySegment config = opendata_log_config_t.allocate(arena); opendata_log_config_t.storage_type(config, (byte) storageType); @@ -210,6 +210,7 @@ static LogHandle logOpen(int storageType, String slatedbPath, opendata_log_config_t.object_store(config, objectStore); opendata_log_config_t.settings_path(config, marshalNullableCString(arena, settingsPath)); opendata_log_config_t.seal_interval_ms(config, sealIntervalMs); + opendata_log_config_t.read_visibility(config, marshalReadVisibility(readVisibility)); MemorySegment outLog = arena.allocate(Native.C_POINTER); checkResult(Native.opendata_log_open(arena, config, outLog)); @@ -458,6 +459,13 @@ private static void marshalRecords(Arena arena, Record[] records, } } + private static byte marshalReadVisibility(ReadVisibility readVisibility) { + return switch (readVisibility) { + case MEMORY -> (byte) Native.OPENDATA_LOG_READ_VISIBILITY_MEMORY(); + case REMOTE -> (byte) Native.OPENDATA_LOG_READ_VISIBILITY_REMOTE(); + }; + } + private static MemorySegment marshalSeqRange(Arena arena, long startSequence) { MemorySegment seqRange = opendata_log_seq_range_t.allocate(arena); diff --git a/log/src/main/java/dev/opendata/ReadVisibility.java b/log/src/main/java/dev/opendata/ReadVisibility.java new file mode 100644 index 0000000..dead9e0 --- /dev/null +++ b/log/src/main/java/dev/opendata/ReadVisibility.java @@ -0,0 +1,14 @@ +package dev.opendata; + +/** + * Controls which data is visible to reads on a {@link LogDb}. + * + *

+ */ +public enum ReadVisibility { + MEMORY, + REMOTE +} diff --git a/log/src/test/java/dev/opendata/LogDbConfigTest.java b/log/src/test/java/dev/opendata/LogDbConfigTest.java index 94b8d92..26f0ecf 100644 --- a/log/src/test/java/dev/opendata/LogDbConfigTest.java +++ b/log/src/test/java/dev/opendata/LogDbConfigTest.java @@ -18,16 +18,30 @@ void shouldCreateWithStorageAndSegmentation() { assertThat(config.storage()).isEqualTo(storage); assertThat(config.segmentation()).isEqualTo(segmentation); + assertThat(config.readVisibility()).isEqualTo(ReadVisibility.MEMORY); } @Test - void shouldCreateWithStorageOnlyUsingDefaultSegmentation() { + void shouldCreateWithAllParameters() { + var storage = new StorageConfig.InMemory(); + var segmentation = SegmentConfig.withSealInterval(3600_000); + + var config = new LogDbConfig(storage, segmentation, ReadVisibility.REMOTE); + + assertThat(config.storage()).isEqualTo(storage); + assertThat(config.segmentation()).isEqualTo(segmentation); + assertThat(config.readVisibility()).isEqualTo(ReadVisibility.REMOTE); + } + + @Test + void shouldCreateWithStorageOnlyUsingDefaults() { var storage = new StorageConfig.InMemory(); var config = new LogDbConfig(storage); assertThat(config.storage()).isEqualTo(storage); assertThat(config.segmentation()).isEqualTo(SegmentConfig.DEFAULT); + assertThat(config.readVisibility()).isEqualTo(ReadVisibility.MEMORY); } @Test @@ -36,6 +50,7 @@ void shouldCreateInMemoryConfig() { assertThat(config.storage()).isInstanceOf(StorageConfig.InMemory.class); assertThat(config.segmentation()).isEqualTo(SegmentConfig.DEFAULT); + assertThat(config.readVisibility()).isEqualTo(ReadVisibility.MEMORY); } @Test @@ -53,6 +68,14 @@ void shouldRejectNullSegmentation() { .hasMessageContaining("segmentation"); } + @Test + void shouldRejectNullReadVisibility() { + var storage = new StorageConfig.InMemory(); + assertThatThrownBy(() -> new LogDbConfig(storage, SegmentConfig.DEFAULT, null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("readVisibility"); + } + @Test void shouldCreateSlateDbConfig() { var config = new LogDbConfig( diff --git a/log/src/test/java/dev/opendata/LogDbIntegrationTest.java b/log/src/test/java/dev/opendata/LogDbIntegrationTest.java index 0e72157..369def2 100644 --- a/log/src/test/java/dev/opendata/LogDbIntegrationTest.java +++ b/log/src/test/java/dev/opendata/LogDbIntegrationTest.java @@ -6,7 +6,9 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; @@ -660,6 +662,68 @@ void shouldAssignContiguousSequencesAcrossBatches() { } } + @Test + void shouldNotSeeUnflushedWritesWithRemoteVisibility(@TempDir Path tempDir) throws IOException { + // Use a long flush_interval so SlateDB won't auto-flush during the test + Path settingsFile = tempDir.resolve("slatedb.toml"); + Files.writeString(settingsFile, "flush_interval = \"1h\"\n"); + + var storage = new StorageConfig.SlateDb( + "visibility-remote-test", + new ObjectStoreConfig.Local(tempDir.toString()), + settingsFile.toString() + ); + var config = new LogDbConfig(storage, SegmentConfig.DEFAULT, ReadVisibility.REMOTE); + + byte[] key = "remote-vis-key".getBytes(StandardCharsets.UTF_8); + + try (LogDb log = LogDb.open(config)) { + log.tryAppend(key, "value-0".getBytes(StandardCharsets.UTF_8)); + + // Data is written but not yet durable — should not be visible + try (LogScanIterator iter = log.scan(key, 0)) { + List entries = collect(iter); + assertThat(entries).isEmpty(); + } + + // Flush makes data durable — should now be visible + log.flush(); + + try (LogScanIterator iter = log.scan(key, 0)) { + List entries = collect(iter); + assertThat(entries).hasSize(1); + assertThat(new String(entries.get(0).value(), StandardCharsets.UTF_8)).isEqualTo("value-0"); + } + } + } + + @Test + void shouldSeeUnflushedWritesWithMemoryVisibility(@TempDir Path tempDir) throws IOException { + // Use a long flush_interval so SlateDB won't auto-flush during the test + Path settingsFile = tempDir.resolve("slatedb.toml"); + Files.writeString(settingsFile, "flush_interval = \"1h\"\n"); + + var storage = new StorageConfig.SlateDb( + "visibility-memory-test", + new ObjectStoreConfig.Local(tempDir.toString()), + settingsFile.toString() + ); + var config = new LogDbConfig(storage, SegmentConfig.DEFAULT, ReadVisibility.MEMORY); + + byte[] key = "memory-vis-key".getBytes(StandardCharsets.UTF_8); + + try (LogDb log = LogDb.open(config)) { + log.tryAppend(key, "value-0".getBytes(StandardCharsets.UTF_8)); + + // Data is written to memory — should be visible even without flush + try (LogScanIterator iter = log.scan(key, 0)) { + List entries = collect(iter); + assertThat(entries).hasSize(1); + assertThat(new String(entries.get(0).value(), StandardCharsets.UTF_8)).isEqualTo("value-0"); + } + } + } + @Test void shouldReturnSameResultForRepeatedHasNext() { try (LogDb log = LogDb.openInMemory()) { From f708c150e81684e10736bbfeff42d6adab6fe9d4 Mon Sep 17 00:00:00 2001 From: Jason Gustafson <12502538+hachikuji@users.noreply.github.com> Date: Fri, 6 Mar 2026 10:56:00 -0800 Subject: [PATCH 2/4] Change CI build to use the pr 298 branch --- .github/workflows/ci.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 21df7cb..f3c2216 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,7 +29,8 @@ jobs: uses: actions/checkout@v4 - name: Checkout opendata - run: git clone --depth 1 https://github.com/opendata-oss/opendata.git ../opendata + # TODO: revert to main after opendata-oss/opendata#298 merges + run: git clone --depth 1 --branch log-c-read-visibility-enum https://github.com/opendata-oss/opendata.git ../opendata - name: Install Rust toolchain uses: dtolnay/rust-toolchain@stable From 0d3e32c255b3942bb1250b9586550bcdc57fac79 Mon Sep 17 00:00:00 2001 From: Jason Gustafson <12502538+hachikuji@users.noreply.github.com> Date: Fri, 6 Mar 2026 11:15:10 -0800 Subject: [PATCH 3/4] Revert "Change CI build to use the pr 298 branch" This reverts commit f708c150e81684e10736bbfeff42d6adab6fe9d4. --- .github/workflows/ci.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f3c2216..21df7cb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,8 +29,7 @@ jobs: uses: actions/checkout@v4 - name: Checkout opendata - # TODO: revert to main after opendata-oss/opendata#298 merges - run: git clone --depth 1 --branch log-c-read-visibility-enum https://github.com/opendata-oss/opendata.git ../opendata + run: git clone --depth 1 https://github.com/opendata-oss/opendata.git ../opendata - name: Install Rust toolchain uses: dtolnay/rust-toolchain@stable From 2383901d464fe8c374af8d7799f3065c2f372878 Mon Sep 17 00:00:00 2001 From: Jason Gustafson <12502538+hachikuji@users.noreply.github.com> Date: Fri, 6 Mar 2026 11:39:54 -0800 Subject: [PATCH 4/4] Fix failing test --- .../dev/opendata/LogDbIntegrationTest.java | 32 +++++-------------- 1 file changed, 8 insertions(+), 24 deletions(-) diff --git a/log/src/test/java/dev/opendata/LogDbIntegrationTest.java b/log/src/test/java/dev/opendata/LogDbIntegrationTest.java index 369def2..dd31cef 100644 --- a/log/src/test/java/dev/opendata/LogDbIntegrationTest.java +++ b/log/src/test/java/dev/opendata/LogDbIntegrationTest.java @@ -6,9 +6,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; -import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; @@ -663,15 +661,10 @@ void shouldAssignContiguousSequencesAcrossBatches() { } @Test - void shouldNotSeeUnflushedWritesWithRemoteVisibility(@TempDir Path tempDir) throws IOException { - // Use a long flush_interval so SlateDB won't auto-flush during the test - Path settingsFile = tempDir.resolve("slatedb.toml"); - Files.writeString(settingsFile, "flush_interval = \"1h\"\n"); - + void shouldSeeDataAfterFlushWithRemoteVisibility(@TempDir Path tempDir) { var storage = new StorageConfig.SlateDb( "visibility-remote-test", - new ObjectStoreConfig.Local(tempDir.toString()), - settingsFile.toString() + new ObjectStoreConfig.Local(tempDir.toString()) ); var config = new LogDbConfig(storage, SegmentConfig.DEFAULT, ReadVisibility.REMOTE); @@ -679,34 +672,25 @@ void shouldNotSeeUnflushedWritesWithRemoteVisibility(@TempDir Path tempDir) thro try (LogDb log = LogDb.open(config)) { log.tryAppend(key, "value-0".getBytes(StandardCharsets.UTF_8)); + log.tryAppend(key, "value-1".getBytes(StandardCharsets.UTF_8)); - // Data is written but not yet durable — should not be visible - try (LogScanIterator iter = log.scan(key, 0)) { - List entries = collect(iter); - assertThat(entries).isEmpty(); - } - - // Flush makes data durable — should now be visible + // Flush ensures data is durable — should be visible with REMOTE visibility log.flush(); try (LogScanIterator iter = log.scan(key, 0)) { List entries = collect(iter); - assertThat(entries).hasSize(1); + assertThat(entries).hasSize(2); assertThat(new String(entries.get(0).value(), StandardCharsets.UTF_8)).isEqualTo("value-0"); + assertThat(new String(entries.get(1).value(), StandardCharsets.UTF_8)).isEqualTo("value-1"); } } } @Test - void shouldSeeUnflushedWritesWithMemoryVisibility(@TempDir Path tempDir) throws IOException { - // Use a long flush_interval so SlateDB won't auto-flush during the test - Path settingsFile = tempDir.resolve("slatedb.toml"); - Files.writeString(settingsFile, "flush_interval = \"1h\"\n"); - + void shouldSeeUnflushedWritesWithMemoryVisibility(@TempDir Path tempDir) { var storage = new StorageConfig.SlateDb( "visibility-memory-test", - new ObjectStoreConfig.Local(tempDir.toString()), - settingsFile.toString() + new ObjectStoreConfig.Local(tempDir.toString()) ); var config = new LogDbConfig(storage, SegmentConfig.DEFAULT, ReadVisibility.MEMORY);