Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions log/src/main/java/dev/opendata/LogDb.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,20 @@ public static LogDb open(LogDbConfig config) {
long sealIntervalMs = config.segmentation().sealIntervalMs() != null
? config.segmentation().sealIntervalMs()
: -1;
ReadVisibility readVisibility = config.readVisibility();

switch (storage) {
case StorageConfig.InMemory() -> {
try (NativeInterop.ObjectStoreHandle objectStore = NativeInterop.objectStoreInMemory()) {
NativeInterop.LogHandle logHandle = NativeInterop.logOpen(
0, null, objectStore.segment(), null, sealIntervalMs);
0, null, objectStore.segment(), null, sealIntervalMs, readVisibility);
return new LogDb(logHandle);
}
}
case StorageConfig.SlateDb slateDb -> {
try (NativeInterop.ObjectStoreHandle objectStore = NativeInterop.resolveObjectStore(slateDb.objectStore())) {
NativeInterop.LogHandle logHandle = NativeInterop.logOpen(
1, slateDb.path(), objectStore.segment(), slateDb.settingsPath(), sealIntervalMs);
1, slateDb.path(), objectStore.segment(), slateDb.settingsPath(), sealIntervalMs, readVisibility);
return new LogDb(logHandle);
}
}
Expand Down
27 changes: 21 additions & 6 deletions log/src/main/java/dev/opendata/LogDbConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,35 @@
* Configuration for opening a {@link LogDb}.
*
* <p>This record holds all the settings needed to initialize a log instance,
* including storage backend configuration and segmentation settings.
* including storage backend configuration, segmentation settings, and read visibility.
*
* @param storage storage backend configuration
* @param segmentation segmentation configuration
* @param storage storage backend configuration
* @param segmentation segmentation configuration
* @param readVisibility controls which data is visible to reads
*/
public record LogDbConfig(
StorageConfig storage,
SegmentConfig segmentation
SegmentConfig segmentation,
ReadVisibility readVisibility
) {

/**
* Creates a config with the specified storage and default segmentation.
* Creates a config with the specified storage, default segmentation, and default read visibility.
*
* @param storage storage backend configuration
*/
public LogDbConfig(StorageConfig storage) {
this(storage, SegmentConfig.DEFAULT);
this(storage, SegmentConfig.DEFAULT, ReadVisibility.MEMORY);
}

/**
* Creates a config with the specified storage and segmentation, and default read visibility.
*
* @param storage storage backend configuration
* @param segmentation segmentation configuration
*/
public LogDbConfig(StorageConfig storage, SegmentConfig segmentation) {
this(storage, segmentation, ReadVisibility.MEMORY);
}

public LogDbConfig {
Expand All @@ -32,6 +44,9 @@ public LogDbConfig(StorageConfig storage) {
if (segmentation == null) {
throw new IllegalArgumentException("segmentation must not be null");
}
if (readVisibility == null) {
throw new IllegalArgumentException("readVisibility must not be null");
}
}

/**
Expand Down
10 changes: 9 additions & 1 deletion log/src/main/java/dev/opendata/NativeInterop.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,14 +202,15 @@ static ObjectStoreHandle resolveObjectStore(ObjectStoreConfig config) {

static LogHandle logOpen(int storageType, String slatedbPath,
MemorySegment objectStore, String settingsPath,
long sealIntervalMs) {
long sealIntervalMs, ReadVisibility readVisibility) {
try (Arena arena = Arena.ofConfined()) {
MemorySegment config = opendata_log_config_t.allocate(arena);
opendata_log_config_t.storage_type(config, (byte) storageType);
opendata_log_config_t.slatedb_path(config, marshalNullableCString(arena, slatedbPath));
opendata_log_config_t.object_store(config, objectStore);
opendata_log_config_t.settings_path(config, marshalNullableCString(arena, settingsPath));
opendata_log_config_t.seal_interval_ms(config, sealIntervalMs);
opendata_log_config_t.read_visibility(config, marshalReadVisibility(readVisibility));

MemorySegment outLog = arena.allocate(Native.C_POINTER);
checkResult(Native.opendata_log_open(arena, config, outLog));
Expand Down Expand Up @@ -458,6 +459,13 @@ private static void marshalRecords(Arena arena, Record[] records,
}
}

private static byte marshalReadVisibility(ReadVisibility readVisibility) {
return switch (readVisibility) {
case MEMORY -> (byte) Native.OPENDATA_LOG_READ_VISIBILITY_MEMORY();
case REMOTE -> (byte) Native.OPENDATA_LOG_READ_VISIBILITY_REMOTE();
};
}

private static MemorySegment marshalSeqRange(Arena arena, long startSequence) {
MemorySegment seqRange = opendata_log_seq_range_t.allocate(arena);

Expand Down
14 changes: 14 additions & 0 deletions log/src/main/java/dev/opendata/ReadVisibility.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package dev.opendata;

/**
* Controls which data is visible to reads on a {@link LogDb}.
*
* <ul>
* <li>{@link #MEMORY} — reads include in-memory (uncommitted) data.
* <li>{@link #REMOTE} — reads only see data confirmed durable by the storage engine.
* </ul>
*/
public enum ReadVisibility {
MEMORY,
REMOTE
}
25 changes: 24 additions & 1 deletion log/src/test/java/dev/opendata/LogDbConfigTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,30 @@ void shouldCreateWithStorageAndSegmentation() {

assertThat(config.storage()).isEqualTo(storage);
assertThat(config.segmentation()).isEqualTo(segmentation);
assertThat(config.readVisibility()).isEqualTo(ReadVisibility.MEMORY);
}

@Test
void shouldCreateWithStorageOnlyUsingDefaultSegmentation() {
void shouldCreateWithAllParameters() {
var storage = new StorageConfig.InMemory();
var segmentation = SegmentConfig.withSealInterval(3600_000);

var config = new LogDbConfig(storage, segmentation, ReadVisibility.REMOTE);

assertThat(config.storage()).isEqualTo(storage);
assertThat(config.segmentation()).isEqualTo(segmentation);
assertThat(config.readVisibility()).isEqualTo(ReadVisibility.REMOTE);
}

@Test
void shouldCreateWithStorageOnlyUsingDefaults() {
var storage = new StorageConfig.InMemory();

var config = new LogDbConfig(storage);

assertThat(config.storage()).isEqualTo(storage);
assertThat(config.segmentation()).isEqualTo(SegmentConfig.DEFAULT);
assertThat(config.readVisibility()).isEqualTo(ReadVisibility.MEMORY);
}

@Test
Expand All @@ -36,6 +50,7 @@ void shouldCreateInMemoryConfig() {

assertThat(config.storage()).isInstanceOf(StorageConfig.InMemory.class);
assertThat(config.segmentation()).isEqualTo(SegmentConfig.DEFAULT);
assertThat(config.readVisibility()).isEqualTo(ReadVisibility.MEMORY);
}

@Test
Expand All @@ -53,6 +68,14 @@ void shouldRejectNullSegmentation() {
.hasMessageContaining("segmentation");
}

@Test
void shouldRejectNullReadVisibility() {
var storage = new StorageConfig.InMemory();
assertThatThrownBy(() -> new LogDbConfig(storage, SegmentConfig.DEFAULT, null))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("readVisibility");
}

@Test
void shouldCreateSlateDbConfig() {
var config = new LogDbConfig(
Expand Down
48 changes: 48 additions & 0 deletions log/src/test/java/dev/opendata/LogDbIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,54 @@ void shouldAssignContiguousSequencesAcrossBatches() {
}
}

@Test
void shouldSeeDataAfterFlushWithRemoteVisibility(@TempDir Path tempDir) {
var storage = new StorageConfig.SlateDb(
"visibility-remote-test",
new ObjectStoreConfig.Local(tempDir.toString())
);
var config = new LogDbConfig(storage, SegmentConfig.DEFAULT, ReadVisibility.REMOTE);

byte[] key = "remote-vis-key".getBytes(StandardCharsets.UTF_8);

try (LogDb log = LogDb.open(config)) {
log.tryAppend(key, "value-0".getBytes(StandardCharsets.UTF_8));
log.tryAppend(key, "value-1".getBytes(StandardCharsets.UTF_8));

// Flush ensures data is durable — should be visible with REMOTE visibility
log.flush();

try (LogScanIterator iter = log.scan(key, 0)) {
List<LogEntry> entries = collect(iter);
assertThat(entries).hasSize(2);
assertThat(new String(entries.get(0).value(), StandardCharsets.UTF_8)).isEqualTo("value-0");
assertThat(new String(entries.get(1).value(), StandardCharsets.UTF_8)).isEqualTo("value-1");
}
}
}

@Test
void shouldSeeUnflushedWritesWithMemoryVisibility(@TempDir Path tempDir) {
var storage = new StorageConfig.SlateDb(
"visibility-memory-test",
new ObjectStoreConfig.Local(tempDir.toString())
);
var config = new LogDbConfig(storage, SegmentConfig.DEFAULT, ReadVisibility.MEMORY);

byte[] key = "memory-vis-key".getBytes(StandardCharsets.UTF_8);

try (LogDb log = LogDb.open(config)) {
log.tryAppend(key, "value-0".getBytes(StandardCharsets.UTF_8));

// Data is written to memory — should be visible even without flush
try (LogScanIterator iter = log.scan(key, 0)) {
List<LogEntry> entries = collect(iter);
assertThat(entries).hasSize(1);
assertThat(new String(entries.get(0).value(), StandardCharsets.UTF_8)).isEqualTo("value-0");
}
}
}

@Test
void shouldReturnSameResultForRepeatedHasNext() {
try (LogDb log = LogDb.openInMemory()) {
Expand Down