Skip to content

Commit 983be80

Browse files
authored
Require plain json object as metadata (#348)
* Require plain json object as metadata
1 parent 1bcc300 commit 983be80

File tree

5 files changed

+73
-110
lines changed

5 files changed

+73
-110
lines changed

docs/api/appending-events.md

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -256,18 +256,17 @@ This feature is only available in KurrentDB 25.1 and later.
256256
You can append events to multiple streams in a single atomic operation. Either all streams are updated, or the entire operation fails.
257257

258258
::: warning
259-
Currently, metadata must be valid JSON. Binary metadata will not be supported in
260-
this version. This limitation ensures compatibility with KurrentDB's metadata
261-
handling and will be removed in the next major release.
259+
Metadata must be a valid JSON object, using string keys and string values only.
260+
Binary metadata is not supported in this version to maintain compatibility with
261+
KurrentDB's metadata handling. This restriction will be lifted in the next major
262+
release.
262263
:::
263264

264265
```java
265266
JsonMapper mapper = new JsonMapper();
266267

267268
Map<String, Object> metadata = new HashMap<>();
268-
metadata.put("timestamp", Instant.now().toString());
269-
metadata.put("source", "OrderProcessingSystem");
270-
metadata.put("version", 1.0);
269+
metadata.put("source", "OrderProcessingSystem");
271270

272271
byte[] metadataBytes = mapper.writeValueAsBytes(metadata);
273272

src/main/java/io/kurrent/dbclient/DynamicValueMapper.java

Lines changed: 14 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,8 @@
22

33
import com.fasterxml.jackson.core.type.TypeReference;
44
import com.fasterxml.jackson.databind.json.JsonMapper;
5-
import com.google.protobuf.ByteString;
6-
import com.google.protobuf.Timestamp;
7-
import com.google.protobuf.Duration;
85
import com.google.protobuf.Value;
96

10-
import java.time.Instant;
11-
import java.time.LocalDateTime;
12-
import java.time.ZonedDateTime;
137
import java.util.Collections;
148
import java.util.Map;
159
import java.util.stream.Collectors;
@@ -25,6 +19,7 @@ public class DynamicValueMapper {
2519
*
2620
* @param jsonMetadata the source metadata as JSON bytes
2721
* @return a map with DynamicValue objects
22+
* @throws IllegalArgumentException if any metadata value is not a string
2823
*/
2924
public static Map<String, Value> mapJsonToValueMap(byte[] jsonMetadata) {
3025
if (jsonMetadata == null || jsonMetadata.length == 0)
@@ -35,7 +30,7 @@ public static Map<String, Value> mapJsonToValueMap(byte[] jsonMetadata) {
3530
});
3631
return mapToValueMap(metadata);
3732
} catch (Exception e) {
38-
return Collections.emptyMap();
33+
throw new IllegalArgumentException(e);
3934
}
4035
}
4136

@@ -44,85 +39,26 @@ public static Map<String, Value> mapJsonToValueMap(byte[] jsonMetadata) {
4439
*
4540
* @param metadata the source metadata map
4641
* @return a map with DynamicValue objects
42+
* @throws IllegalArgumentException if any metadata value is not a string
4743
*/
48-
public static Map<String, Value> mapToValueMap(Map<String, Object> metadata) {
44+
public static Map<String, Value> mapToValueMap(Map<String, ?> metadata) {
4945
if (metadata == null) {
5046
return Collections.emptyMap();
5147
}
5248

53-
return metadata.entrySet().stream()
54-
.collect(Collectors.toMap(
55-
Map.Entry::getKey,
56-
entry -> mapToValue(entry.getValue())
57-
));
58-
}
59-
60-
/**
61-
* Converts a Java object to a DynamicValue protobuf message.
62-
*
63-
* @param source the source object
64-
* @return the corresponding DynamicValue
65-
*/
66-
public static Value mapToValue(Object source) {
67-
if (source == null) {
68-
return Value.newBuilder()
69-
.setNullValue(com.google.protobuf.NullValue.NULL_VALUE)
70-
.build();
49+
for (Map.Entry<String, ?> entry : metadata.entrySet()) {
50+
if (entry.getValue() != null && !(entry.getValue() instanceof String)) {
51+
throw new IllegalArgumentException(
52+
String.format("Metadata value for key '%s' must be a string, but was %s",
53+
entry.getKey(),
54+
entry.getValue().getClass().getSimpleName())
55+
);
56+
}
7157
}
7258

7359
Value.Builder builder = Value.newBuilder();
7460

75-
if (source instanceof String) {
76-
return builder.setStringValue((String) source).build();
77-
} else if (source instanceof Boolean) {
78-
return builder.setBoolValue((Boolean) source).build();
79-
} else if (source instanceof Integer) {
80-
return builder.setNumberValue((Integer) source).build();
81-
} else if (source instanceof Long) {
82-
return builder.setNumberValue((Long) source).build();
83-
} else if (source instanceof Float) {
84-
return builder.setNumberValue((Float) source).build();
85-
} else if (source instanceof Double) {
86-
return builder.setNumberValue((Double) source).build();
87-
} else if (source instanceof Instant) {
88-
Instant instant = (Instant) source;
89-
return builder.setStringValue(
90-
Timestamp.newBuilder()
91-
.setSeconds(instant.getEpochSecond())
92-
.setNanos(instant.getNano())
93-
.build().toString()
94-
).build();
95-
} else if (source instanceof LocalDateTime) {
96-
LocalDateTime localDateTime = (LocalDateTime) source;
97-
Instant instant = localDateTime.atZone(java.time.ZoneOffset.UTC).toInstant();
98-
return builder.setStringValue(
99-
Timestamp.newBuilder()
100-
.setSeconds(instant.getEpochSecond())
101-
.setNanos(instant.getNano())
102-
.build().toString()
103-
).build();
104-
} else if (source instanceof ZonedDateTime) {
105-
ZonedDateTime zonedDateTime = (ZonedDateTime) source;
106-
Instant instant = zonedDateTime.toInstant();
107-
return builder.setStringValue(
108-
Timestamp.newBuilder()
109-
.setSeconds(instant.getEpochSecond())
110-
.setNanos(instant.getNano())
111-
.build().toString()
112-
).build();
113-
} else if (source instanceof java.time.Duration) {
114-
java.time.Duration duration = (java.time.Duration) source;
115-
return builder.setStringValue(
116-
Duration.newBuilder()
117-
.setSeconds(duration.getSeconds())
118-
.setNanos(duration.getNano())
119-
.build().toString()
120-
).build();
121-
} else if (source instanceof byte[]) {
122-
return builder.setStringValue(ByteString.copyFrom((byte[]) source).toStringUtf8()).build();
123-
} else {
124-
// For any other type, convert to string
125-
return builder.setStringValue(source.toString()).build();
126-
}
61+
return metadata.entrySet().stream()
62+
.collect(Collectors.toMap(Map.Entry::getKey, entry -> builder.setStringValue((String) entry.getValue()).build()));
12763
}
12864
}

src/main/java/io/kurrent/dbclient/MultiStreamAppend.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import com.google.protobuf.ByteString;
44
import com.google.protobuf.Value;
5-
import com.google.rpc.ErrorInfo;
65
import io.grpc.StatusRuntimeException;
76
import io.grpc.stub.StreamObserver;
87
import io.grpc.stub.ClientResponseObserver;

src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import org.slf4j.LoggerFactory;
66

77
import java.io.IOException;
8-
import java.time.Instant;
98
import java.util.*;
109
import java.util.concurrent.ExecutionException;
1110

@@ -36,7 +35,7 @@ public static void cleanup() {
3635
}
3736

3837
@Test
39-
public void testMultiStreamAppend() throws ExecutionException, InterruptedException, IOException {
38+
public void testBadEventMetadata() throws ExecutionException, InterruptedException, IOException {
4039
KurrentDBClient client = getDefaultClient();
4140

4241
Optional<ServerVersion> version = client.getServerVersion().get();
@@ -48,16 +47,50 @@ public void testMultiStreamAppend() throws ExecutionException, InterruptedExcept
4847

4948
// Arrange
5049
String streamName1 = generateName();
51-
String streamName2 = generateName();
5250

5351
Map<String, Object> metadata = new HashMap<>();
5452
metadata.put("stringProperty", "hello world");
5553
metadata.put("intProperty", 42);
56-
metadata.put("longProperty", 9876543210L);
57-
metadata.put("booleanProperty", true);
58-
metadata.put("doubleProperty", 3.14159);
59-
metadata.put("nullProperty", null);
60-
metadata.put("timestampProperty", Instant.now().toString());
54+
55+
byte[] metadataBytes = mapper.writeValueAsBytes(metadata);
56+
57+
EventData event1 = EventData.builderAsJson("event-a", "{\"data\":\"test1\"}".getBytes())
58+
.metadataAsBytes(metadataBytes)
59+
.build();
60+
61+
List<EventData> events1 = Collections.singletonList(event1);
62+
63+
List<AppendStreamRequest> requests = Collections.singletonList(
64+
new AppendStreamRequest(streamName1, events1.iterator(), StreamState.noStream())
65+
);
66+
67+
// Act & Assert
68+
Assertions.assertThrows(IllegalArgumentException.class, () -> {
69+
try {
70+
client.multiStreamAppend(requests.iterator()).get();
71+
} catch (ExecutionException e) {
72+
throw e.getCause();
73+
}
74+
});
75+
}
76+
77+
@Test
78+
public void testMultiStreamAppend() throws ExecutionException, InterruptedException, IOException {
79+
KurrentDBClient client = getDefaultClient();
80+
81+
Optional<ServerVersion> version = client.getServerVersion().get();
82+
83+
Assumptions.assumeTrue(
84+
version.isPresent() && version.get().isGreaterOrEqualThan(25, 0),
85+
"Multi-stream append is not supported server versions below 25.0.0"
86+
);
87+
88+
// Arrange
89+
String streamName1 = generateName();
90+
String streamName2 = generateName();
91+
92+
Map<String, String> metadata = new HashMap<>();
93+
metadata.put("stringProperty", "hello world");
6194

6295
byte[] metadataBytes = mapper.writeValueAsBytes(metadata);
6396

@@ -94,12 +127,6 @@ public void testMultiStreamAppend() throws ExecutionException, InterruptedExcept
94127

95128
Map deserializedMetadata = mapper.readValue(readMetadata, Map.class);
96129
Assertions.assertEquals(metadata.get("stringProperty"), deserializedMetadata.get("stringProperty"));
97-
Assertions.assertEquals(metadata.get("intProperty"), deserializedMetadata.get("intProperty"));
98-
Assertions.assertEquals(metadata.get("longProperty"), ((Number) deserializedMetadata.get("longProperty")).longValue());
99-
Assertions.assertEquals(metadata.get("booleanProperty"), deserializedMetadata.get("booleanProperty"));
100-
Assertions.assertEquals((Double) metadata.get("doubleProperty"), ((Number) deserializedMetadata.get("doubleProperty")).doubleValue(), 0.00001);
101-
Assertions.assertEquals(metadata.get("timestampProperty"), deserializedMetadata.get("timestampProperty"));
102-
Assertions.assertNull(deserializedMetadata.get("nullProperty"));
103130

104131
List<ResolvedEvent> readEvents2 = client.readStream(streamName2, ReadStreamOptions.get()).get().getEvents();
105132
Assertions.assertEquals(1, readEvents2.size());

src/test/java/io/kurrent/dbclient/telemetry/StreamsTracingInstrumentationTests.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,7 @@ default void testMultiStreamAppendIsInstrumentedWithTracingAsExpected() throws T
344344
}
345345

346346
@Test
347-
default void testMultiStreamAppendIsInstrumentedWithFailures() throws Throwable {
347+
default void testMultiStreamAppendIsInstrumentedWithErrors() throws Throwable {
348348
KurrentDBClient client = getDefaultClient();
349349

350350
Optional<ServerVersion> version = client.getServerVersion().get();
@@ -377,20 +377,22 @@ default void testMultiStreamAppendIsInstrumentedWithFailures() throws Throwable
377377
StreamState.streamExists()
378378
);
379379

380-
MultiStreamAppendResponse result = client.multiStreamAppend(
381-
Arrays.asList(request1, request2).iterator()
382-
).get();
380+
WrongExpectedVersionException actualException = null;
381+
try {
382+
MultiStreamAppendResponse result = client.multiStreamAppend(
383+
Arrays.asList(request1, request2).iterator()
384+
).get();
385+
} catch (ExecutionException e) {
386+
if (e.getCause() instanceof WrongExpectedVersionException)
387+
actualException = (WrongExpectedVersionException) e.getCause();
388+
}
383389

384-
Assertions.assertNotNull(result);
385-
Assertions.assertFalse(result.getResults().isEmpty());
386-
Assertions.assertTrue(result.getPosition() > 0);
390+
// Ensure WrongExpectedVersionException was thrown.
391+
Assertions.assertNotNull(actualException);
387392

388393
List<ReadableSpan> spans = getSpansForOperation(ClientTelemetryConstants.Operations.MULTI_APPEND);
389394
Assertions.assertEquals(1, spans.size());
390395

391-
ReadableSpan span = spans.get(0);
392-
393-
Assertions.assertEquals(StatusCode.ERROR, span.toSpanData().getStatus().getStatusCode());
394-
Assertions.assertEquals(SpanKind.CLIENT, span.getKind());
396+
assertErroneousSpanHasExpectedAttributes(spans.get(0), actualException);
395397
}
396398
}

0 commit comments

Comments
 (0)