diff --git a/log/src/main/java/dev/opendata/LogDb.java b/log/src/main/java/dev/opendata/LogDb.java index 74cf3ea..e92a767 100644 --- a/log/src/main/java/dev/opendata/LogDb.java +++ b/log/src/main/java/dev/opendata/LogDb.java @@ -39,19 +39,20 @@ public static LogDb open(LogDbConfig config) { long sealIntervalMs = config.segmentation().sealIntervalMs() != null ? config.segmentation().sealIntervalMs() : -1; + ReadVisibility readVisibility = config.readVisibility(); switch (storage) { case StorageConfig.InMemory() -> { try (NativeInterop.ObjectStoreHandle objectStore = NativeInterop.objectStoreInMemory()) { NativeInterop.LogHandle logHandle = NativeInterop.logOpen( - 0, null, objectStore.segment(), null, sealIntervalMs); + 0, null, objectStore.segment(), null, sealIntervalMs, readVisibility); return new LogDb(logHandle); } } case StorageConfig.SlateDb slateDb -> { try (NativeInterop.ObjectStoreHandle objectStore = NativeInterop.resolveObjectStore(slateDb.objectStore())) { NativeInterop.LogHandle logHandle = NativeInterop.logOpen( - 1, slateDb.path(), objectStore.segment(), slateDb.settingsPath(), sealIntervalMs); + 1, slateDb.path(), objectStore.segment(), slateDb.settingsPath(), sealIntervalMs, readVisibility); return new LogDb(logHandle); } } diff --git a/log/src/main/java/dev/opendata/LogDbConfig.java b/log/src/main/java/dev/opendata/LogDbConfig.java index 9fae90a..ea2ffe1 100644 --- a/log/src/main/java/dev/opendata/LogDbConfig.java +++ b/log/src/main/java/dev/opendata/LogDbConfig.java @@ -6,23 +6,35 @@ * Configuration for opening a {@link LogDb}. * *

This record holds all the settings needed to initialize a log instance, - * including storage backend configuration and segmentation settings. + * including storage backend configuration, segmentation settings, and read visibility. * - * @param storage storage backend configuration - * @param segmentation segmentation configuration + * @param storage storage backend configuration + * @param segmentation segmentation configuration + * @param readVisibility controls which data is visible to reads */ public record LogDbConfig( StorageConfig storage, - SegmentConfig segmentation + SegmentConfig segmentation, + ReadVisibility readVisibility ) { /** - * Creates a config with the specified storage and default segmentation. + * Creates a config with the specified storage, default segmentation, and default read visibility. * * @param storage storage backend configuration */ public LogDbConfig(StorageConfig storage) { - this(storage, SegmentConfig.DEFAULT); + this(storage, SegmentConfig.DEFAULT, ReadVisibility.MEMORY); + } + + /** + * Creates a config with the specified storage and segmentation, and default read visibility. + * + * @param storage storage backend configuration + * @param segmentation segmentation configuration + */ + public LogDbConfig(StorageConfig storage, SegmentConfig segmentation) { + this(storage, segmentation, ReadVisibility.MEMORY); } public LogDbConfig { @@ -32,6 +44,9 @@ public LogDbConfig(StorageConfig storage) { if (segmentation == null) { throw new IllegalArgumentException("segmentation must not be null"); } + if (readVisibility == null) { + throw new IllegalArgumentException("readVisibility must not be null"); + } } /** diff --git a/log/src/main/java/dev/opendata/NativeInterop.java b/log/src/main/java/dev/opendata/NativeInterop.java index 65cf488..3b72b8b 100644 --- a/log/src/main/java/dev/opendata/NativeInterop.java +++ b/log/src/main/java/dev/opendata/NativeInterop.java @@ -202,7 +202,7 @@ static ObjectStoreHandle resolveObjectStore(ObjectStoreConfig config) { static LogHandle logOpen(int storageType, String slatedbPath, MemorySegment objectStore, String settingsPath, - long sealIntervalMs) { + long sealIntervalMs, ReadVisibility readVisibility) { try (Arena arena = Arena.ofConfined()) { MemorySegment config = opendata_log_config_t.allocate(arena); opendata_log_config_t.storage_type(config, (byte) storageType); @@ -210,6 +210,7 @@ static LogHandle logOpen(int storageType, String slatedbPath, opendata_log_config_t.object_store(config, objectStore); opendata_log_config_t.settings_path(config, marshalNullableCString(arena, settingsPath)); opendata_log_config_t.seal_interval_ms(config, sealIntervalMs); + opendata_log_config_t.read_visibility(config, marshalReadVisibility(readVisibility)); MemorySegment outLog = arena.allocate(Native.C_POINTER); checkResult(Native.opendata_log_open(arena, config, outLog)); @@ -458,6 +459,13 @@ private static void marshalRecords(Arena arena, Record[] records, } } + private static byte marshalReadVisibility(ReadVisibility readVisibility) { + return switch (readVisibility) { + case MEMORY -> (byte) Native.OPENDATA_LOG_READ_VISIBILITY_MEMORY(); + case REMOTE -> (byte) Native.OPENDATA_LOG_READ_VISIBILITY_REMOTE(); + }; + } + private static MemorySegment marshalSeqRange(Arena arena, long startSequence) { MemorySegment seqRange = opendata_log_seq_range_t.allocate(arena); diff --git a/log/src/main/java/dev/opendata/ReadVisibility.java b/log/src/main/java/dev/opendata/ReadVisibility.java new file mode 100644 index 0000000..dead9e0 --- /dev/null +++ b/log/src/main/java/dev/opendata/ReadVisibility.java @@ -0,0 +1,14 @@ +package dev.opendata; + +/** + * Controls which data is visible to reads on a {@link LogDb}. + * + *

+ */ +public enum ReadVisibility { + MEMORY, + REMOTE +} diff --git a/log/src/test/java/dev/opendata/LogDbConfigTest.java b/log/src/test/java/dev/opendata/LogDbConfigTest.java index 94b8d92..26f0ecf 100644 --- a/log/src/test/java/dev/opendata/LogDbConfigTest.java +++ b/log/src/test/java/dev/opendata/LogDbConfigTest.java @@ -18,16 +18,30 @@ void shouldCreateWithStorageAndSegmentation() { assertThat(config.storage()).isEqualTo(storage); assertThat(config.segmentation()).isEqualTo(segmentation); + assertThat(config.readVisibility()).isEqualTo(ReadVisibility.MEMORY); } @Test - void shouldCreateWithStorageOnlyUsingDefaultSegmentation() { + void shouldCreateWithAllParameters() { + var storage = new StorageConfig.InMemory(); + var segmentation = SegmentConfig.withSealInterval(3600_000); + + var config = new LogDbConfig(storage, segmentation, ReadVisibility.REMOTE); + + assertThat(config.storage()).isEqualTo(storage); + assertThat(config.segmentation()).isEqualTo(segmentation); + assertThat(config.readVisibility()).isEqualTo(ReadVisibility.REMOTE); + } + + @Test + void shouldCreateWithStorageOnlyUsingDefaults() { var storage = new StorageConfig.InMemory(); var config = new LogDbConfig(storage); assertThat(config.storage()).isEqualTo(storage); assertThat(config.segmentation()).isEqualTo(SegmentConfig.DEFAULT); + assertThat(config.readVisibility()).isEqualTo(ReadVisibility.MEMORY); } @Test @@ -36,6 +50,7 @@ void shouldCreateInMemoryConfig() { assertThat(config.storage()).isInstanceOf(StorageConfig.InMemory.class); assertThat(config.segmentation()).isEqualTo(SegmentConfig.DEFAULT); + assertThat(config.readVisibility()).isEqualTo(ReadVisibility.MEMORY); } @Test @@ -53,6 +68,14 @@ void shouldRejectNullSegmentation() { .hasMessageContaining("segmentation"); } + @Test + void shouldRejectNullReadVisibility() { + var storage = new StorageConfig.InMemory(); + assertThatThrownBy(() -> new LogDbConfig(storage, SegmentConfig.DEFAULT, null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("readVisibility"); + } + @Test void shouldCreateSlateDbConfig() { var config = new LogDbConfig( diff --git a/log/src/test/java/dev/opendata/LogDbIntegrationTest.java b/log/src/test/java/dev/opendata/LogDbIntegrationTest.java index 0e72157..dd31cef 100644 --- a/log/src/test/java/dev/opendata/LogDbIntegrationTest.java +++ b/log/src/test/java/dev/opendata/LogDbIntegrationTest.java @@ -660,6 +660,54 @@ void shouldAssignContiguousSequencesAcrossBatches() { } } + @Test + void shouldSeeDataAfterFlushWithRemoteVisibility(@TempDir Path tempDir) { + var storage = new StorageConfig.SlateDb( + "visibility-remote-test", + new ObjectStoreConfig.Local(tempDir.toString()) + ); + var config = new LogDbConfig(storage, SegmentConfig.DEFAULT, ReadVisibility.REMOTE); + + byte[] key = "remote-vis-key".getBytes(StandardCharsets.UTF_8); + + try (LogDb log = LogDb.open(config)) { + log.tryAppend(key, "value-0".getBytes(StandardCharsets.UTF_8)); + log.tryAppend(key, "value-1".getBytes(StandardCharsets.UTF_8)); + + // Flush ensures data is durable — should be visible with REMOTE visibility + log.flush(); + + try (LogScanIterator iter = log.scan(key, 0)) { + List entries = collect(iter); + assertThat(entries).hasSize(2); + assertThat(new String(entries.get(0).value(), StandardCharsets.UTF_8)).isEqualTo("value-0"); + assertThat(new String(entries.get(1).value(), StandardCharsets.UTF_8)).isEqualTo("value-1"); + } + } + } + + @Test + void shouldSeeUnflushedWritesWithMemoryVisibility(@TempDir Path tempDir) { + var storage = new StorageConfig.SlateDb( + "visibility-memory-test", + new ObjectStoreConfig.Local(tempDir.toString()) + ); + var config = new LogDbConfig(storage, SegmentConfig.DEFAULT, ReadVisibility.MEMORY); + + byte[] key = "memory-vis-key".getBytes(StandardCharsets.UTF_8); + + try (LogDb log = LogDb.open(config)) { + log.tryAppend(key, "value-0".getBytes(StandardCharsets.UTF_8)); + + // Data is written to memory — should be visible even without flush + try (LogScanIterator iter = log.scan(key, 0)) { + List entries = collect(iter); + assertThat(entries).hasSize(1); + assertThat(new String(entries.get(0).value(), StandardCharsets.UTF_8)).isEqualTo("value-0"); + } + } + } + @Test void shouldReturnSameResultForRepeatedHasNext() { try (LogDb log = LogDb.openInMemory()) {