diff --git a/codec-java/src/main/java/com/kakao/actionbase/v2/core/code/AbstractEdgeEncoder.java b/codec-java/src/main/java/com/kakao/actionbase/v2/core/code/AbstractEdgeEncoder.java index 3de238ea..96d0b05b 100644 --- a/codec-java/src/main/java/com/kakao/actionbase/v2/core/code/AbstractEdgeEncoder.java +++ b/codec-java/src/main/java/com/kakao/actionbase/v2/core/code/AbstractEdgeEncoder.java @@ -24,6 +24,7 @@ public abstract class AbstractEdgeEncoder implements EdgeEncoder { public static final byte INDEXED_EDGE_TYPE = EncodedEdgeType.INDEXED_EDGE_TYPE.getCode(); public static final byte IMMUTABLE_INDEXED_EDGE_TYPE = EncodedEdgeType.IMMUTABLE_INDEXED_EDGE_TYPE.getCode(); + public static final byte EDGE_CACHE_TYPE = EncodedEdgeType.EDGE_CACHE_TYPE.getCode(); public static final String INSERT_TS_KEY = "__InsertTs__"; public static final String DELETE_TS_KEY = "__DeleteTs__"; @@ -170,6 +171,59 @@ public static void encodeIndexedEdgeValueToBuffer( } } + // --- EdgeCache encoding + // Row key : xxhash32 | directedSrc | labelId | EDGE_CACHE | direction | cacheCode + // Qualifier : cacheValues...(ordered) | directedTgt + // Value : ts | (propertyHash, propertyValue)... + + public static void encodeCacheEdgeKeyToBuffer( + Object directedSrc, Direction direction, int labelId, int cacheCode, EdgeBuffer buffer) { + buffer.encodeWithHash(directedSrc, labelId, EDGE_CACHE_TYPE); + buffer.encodeInt8(direction.getCode()); + buffer.encodeInt32(cacheCode); + } + + public static void encodeCacheEdgeQualifierToBuffer( + Cache cache, + long ts, + Object directedSrc, + Object directedTgt, + Map properties, + EdgeBuffer buffer) { + for (Index.Field field : cache.getFields()) { + Object value = resolveFieldValue(field.getName(), ts, directedSrc, directedTgt, properties); + buffer.encodeAny(value, field.getOrder()); + } + buffer.encodeAny(directedTgt); + } + + public static void encodeCacheEdgeValueToBuffer( + long ts, Map properties, EdgeBuffer buffer) { + buffer.encodeInt64(ts); + for (Map.Entry e : properties.entrySet()) { + buffer.encodeInt32(ValueUtils.stringHash(e.getKey())); + buffer.encodeAny(e.getValue()); + } + } + + private static Object resolveFieldValue( + String name, + long ts, + Object directedSrc, + Object directedTgt, + Map properties) { + switch (name) { + case "version": + return ts; + case "source": + return directedSrc; + case "target": + return directedTgt; + default: + return properties.get(name); + } + } + @Override public List> encodeAllIndexedEdges( long ts, @@ -187,6 +241,23 @@ public List> encodeAllIndexedEdges( .collect(Collectors.toList()); } + @Override + public List> encodeAllCacheEdges( + long ts, + Object src, + Object tgt, + Map props, + DirectionType dirType, + int labelId, + List caches) { + return caches.stream() + .flatMap( + cache -> + dirType.getDirs().stream() + .map(dir -> encodeCacheEdge(ts, src, tgt, props, dir, labelId, cache))) + .collect(Collectors.toList()); + } + // --- internal String useAsBase64String(Consumer block) { diff --git a/codec-java/src/main/java/com/kakao/actionbase/v2/core/code/BulkEdgeEncoder.java b/codec-java/src/main/java/com/kakao/actionbase/v2/core/code/BulkEdgeEncoder.java index b51b3d6f..34c54bdb 100644 --- a/codec-java/src/main/java/com/kakao/actionbase/v2/core/code/BulkEdgeEncoder.java +++ b/codec-java/src/main/java/com/kakao/actionbase/v2/core/code/BulkEdgeEncoder.java @@ -2,7 +2,11 @@ import com.kakao.actionbase.v2.core.edge.BulkLoadEdge; import com.kakao.actionbase.v2.core.edge.Edge; -import com.kakao.actionbase.v2.core.metadata.*; +import com.kakao.actionbase.v2.core.metadata.Active; +import com.kakao.actionbase.v2.core.metadata.Direction; +import com.kakao.actionbase.v2.core.metadata.DirectionType; +import com.kakao.actionbase.v2.core.metadata.LabelDTO; +import com.kakao.actionbase.v2.core.metadata.LabelType; import java.util.ArrayList; import java.util.HashMap; @@ -84,6 +88,8 @@ public static List> bulkEncodeAll( edges.add(new KeyFieldValue<>(key.key, key.field, value)); } + List caches = label.getCaches(); + if (active == Active.ACTIVE) { // encode indexed edges if (labelType == LabelType.INDEXED || labelType == LabelType.IMMUTABLE_INDEXED) { @@ -91,9 +97,14 @@ public static List> bulkEncodeAll( edges.addAll( encoder.encodeAllIndexedEdges( castedEdge, label.getDirType(), labelId, label.getIndices())); + + // Cache records are only supported on INDEXED labels (V3 EDGE type). V3 multi-hop queries + // rely on the wide-row EdgeCacheRecord written here to stay in sync with EdgeIndexRecord. + if (caches != null && !caches.isEmpty()) { + edges.addAll( + encoder.encodeAllCacheEdges(castedEdge, label.getDirType(), labelId, caches)); + } } else if (labelType == LabelType.MULTI_EDGE) { - // For MultiEdge, create separate OUT/IN edges based on direction and reuse existing encoder - // BOTH: Split into two edges: src->edgeId (OUT), edgeId->tgt (IN) multiEdgeProps.put(SOURCE_FIELD_ON_STATE, castedEdge.getSrc()); multiEdgeProps.put(TARGET_FIELD_ON_STATE, castedEdge.getTgt()); @@ -106,17 +117,30 @@ public static List> bulkEncodeAll( edges.addAll( encoder.encodeAllIndexedEdges(inEdge, DirectionType.IN, labelId, label.getIndices())); + + if (caches != null && !caches.isEmpty()) { + edges.addAll(encoder.encodeAllCacheEdges(outEdge, DirectionType.OUT, labelId, caches)); + edges.addAll(encoder.encodeAllCacheEdges(inEdge, DirectionType.IN, labelId, caches)); + } } else if (label.getDirType() == DirectionType.OUT) { // OUT: Create src->edgeId edge Edge outEdge = new Edge(castedEdge.getTs(), castedEdge.getSrc(), edgeId, multiEdgeProps); edges.addAll( encoder.encodeAllIndexedEdges( outEdge, DirectionType.OUT, labelId, label.getIndices())); + + if (caches != null && !caches.isEmpty()) { + edges.addAll(encoder.encodeAllCacheEdges(outEdge, DirectionType.OUT, labelId, caches)); + } } else if (label.getDirType() == DirectionType.IN) { // IN: Create edgeId->tgt edge Edge inEdge = new Edge(castedEdge.getTs(), edgeId, castedEdge.getTgt(), multiEdgeProps); edges.addAll( encoder.encodeAllIndexedEdges(inEdge, DirectionType.IN, labelId, label.getIndices())); + + if (caches != null && !caches.isEmpty()) { + edges.addAll(encoder.encodeAllCacheEdges(inEdge, DirectionType.IN, labelId, caches)); + } } } diff --git a/codec-java/src/main/java/com/kakao/actionbase/v2/core/code/BytesKeyValueEdgeEncoder.java b/codec-java/src/main/java/com/kakao/actionbase/v2/core/code/BytesKeyValueEdgeEncoder.java index 8f47d89c..8a742402 100644 --- a/codec-java/src/main/java/com/kakao/actionbase/v2/core/code/BytesKeyValueEdgeEncoder.java +++ b/codec-java/src/main/java/com/kakao/actionbase/v2/core/code/BytesKeyValueEdgeEncoder.java @@ -115,6 +115,38 @@ public KeyFieldValue encodeIndexedEdge( return new KeyFieldValue<>(key, value); } + @Override + public KeyFieldValue encodeCacheEdge( + long ts, + Object src, + Object tgt, + Map props, + Direction dir, + int labelId, + Cache cache) { + Object directedSrc; + Object directedTgt; + if (dir == Direction.OUT) { + directedSrc = src; + directedTgt = tgt; + } else { + directedSrc = tgt; + directedTgt = src; + } + + byte[] key = + useAsByteArray( + buffer -> + encodeCacheEdgeKeyToBuffer(directedSrc, dir, labelId, cache.getCode(), buffer)); + byte[] field = + useAsByteArray( + buffer -> + encodeCacheEdgeQualifierToBuffer( + cache, ts, directedSrc, directedTgt, props, buffer)); + byte[] value = useAsByteArray(buffer -> encodeCacheEdgeValueToBuffer(ts, props, buffer)); + return new KeyFieldValue<>(key, field, value); + } + @Override public EncodedKey encodeIndexedEdgeKeyPrefix( Object directedSrc, Direction dir, int labelId, Index index, Consumer block) { diff --git a/codec-java/src/main/java/com/kakao/actionbase/v2/core/code/Cache.java b/codec-java/src/main/java/com/kakao/actionbase/v2/core/code/Cache.java new file mode 100644 index 00000000..956f6966 --- /dev/null +++ b/codec-java/src/main/java/com/kakao/actionbase/v2/core/code/Cache.java @@ -0,0 +1,122 @@ +package com.kakao.actionbase.v2.core.code; + +import com.kakao.actionbase.v2.core.code.hbase.ValueUtils; + +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Defines a cache specification used to build {@code EdgeCacheRecord} (wide row) for multi-hop + * queries. The {@link #code} is derived at runtime by {@code XXHash32(cache)} with seed 0, which + * matches {@code XXHash32Wrapper.default.stringHash(cache)} in V3. + * + *

JSON shape (compatible with V3 {@code Cache.kt}): + * + *

+ * {
+ *   "cache": "top_created_at",
+ *   "fields": [{"name": "created_at", "order": "DESC"}],
+ *   "limit": 100,
+ *   "comment": "..."
+ * }
+ * 
+ */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class Cache implements Serializable { + + public static final int DEFAULT_LIMIT = 100; + public static final String DEFAULT_COMMENT = ""; + + @JsonProperty("cache") + private final String cache; + + @JsonProperty("fields") + private final List fields; + + @JsonProperty("limit") + private final int limit; + + @JsonProperty("comment") + private final String comment; + + @JsonIgnore private final int code; + + @JsonCreator + public Cache( + @JsonProperty("cache") String cache, + @JsonProperty("fields") List fields, + @JsonProperty("limit") Integer limit, + @JsonProperty("comment") String comment) { + if (cache == null || cache.isEmpty()) { + throw new IllegalArgumentException("Cache name must not be empty"); + } + int resolvedLimit = limit == null ? DEFAULT_LIMIT : limit; + if (resolvedLimit <= 0) { + throw new IllegalArgumentException("Cache limit must be positive, got: " + resolvedLimit); + } + this.cache = cache; + this.fields = fields == null ? Collections.emptyList() : fields; + this.limit = resolvedLimit; + this.comment = comment == null ? DEFAULT_COMMENT : comment; + this.code = ValueUtils.stringHash(cache); + } + + public Cache(String cache, List fields) { + this(cache, fields, null, null); + } + + public String getCache() { + return cache; + } + + public List getFields() { + return fields; + } + + public int getLimit() { + return limit; + } + + public String getComment() { + return comment; + } + + public int getCode() { + return code; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof Cache)) return false; + Cache other = (Cache) obj; + return limit == other.limit + && cache.equals(other.cache) + && fields.equals(other.fields) + && comment.equals(other.comment); + } + + @Override + public int hashCode() { + return Objects.hash(cache, fields, limit, comment); + } + + @Override + public String toString() { + return "Cache{cache='" + + cache + + "', fields=" + + fields + + ", limit=" + + limit + + ", comment='" + + comment + + "'}"; + } +} diff --git a/codec-java/src/main/java/com/kakao/actionbase/v2/core/code/EdgeEncoder.java b/codec-java/src/main/java/com/kakao/actionbase/v2/core/code/EdgeEncoder.java index 9ae0bfc4..c1a8b39b 100644 --- a/codec-java/src/main/java/com/kakao/actionbase/v2/core/code/EdgeEncoder.java +++ b/codec-java/src/main/java/com/kakao/actionbase/v2/core/code/EdgeEncoder.java @@ -76,4 +76,28 @@ default List> encodeAllIndexedEdges( return encodeAllIndexedEdges( edge.getTs(), edge.getSrc(), edge.getTgt(), edge.getProps(), dirType, labelId, indices); } + + KeyFieldValue encodeCacheEdge( + long ts, + Object src, + Object tgt, + Map props, + Direction dir, + int labelId, + Cache cache); + + List> encodeAllCacheEdges( + long ts, + Object src, + Object tgt, + Map props, + DirectionType dirType, + int labelId, + List caches); + + default List> encodeAllCacheEdges( + Edge edge, DirectionType dirType, int labelId, List caches) { + return encodeAllCacheEdges( + edge.getTs(), edge.getSrc(), edge.getTgt(), edge.getProps(), dirType, labelId, caches); + } } diff --git a/codec-java/src/main/java/com/kakao/actionbase/v2/core/code/StringKeyFieldValueEdgeEncoder.java b/codec-java/src/main/java/com/kakao/actionbase/v2/core/code/StringKeyFieldValueEdgeEncoder.java index ec6a3630..10e4d0d8 100644 --- a/codec-java/src/main/java/com/kakao/actionbase/v2/core/code/StringKeyFieldValueEdgeEncoder.java +++ b/codec-java/src/main/java/com/kakao/actionbase/v2/core/code/StringKeyFieldValueEdgeEncoder.java @@ -110,6 +110,37 @@ public KeyFieldValue encodeIndexedEdge( return new KeyFieldValue<>(key, field, value); } + @Override + public KeyFieldValue encodeCacheEdge( + long ts, + Object src, + Object tgt, + Map props, + Direction dir, + int labelId, + Cache cache) { + Object directedSrc; + Object directedTgt; + if (dir == Direction.OUT) { + directedSrc = src; + directedTgt = tgt; + } else { + directedSrc = tgt; + directedTgt = src; + } + String key = + useAsBase64String( + buffer -> + encodeCacheEdgeKeyToBuffer(directedSrc, dir, labelId, cache.getCode(), buffer)); + String field = + useAsHexString( + buffer -> + encodeCacheEdgeQualifierToBuffer( + cache, ts, directedSrc, directedTgt, props, buffer)); + String value = useAsBase64String(buffer -> encodeCacheEdgeValueToBuffer(ts, props, buffer)); + return new KeyFieldValue<>(key, field, value); + } + @Override public EncodedKey encodeIndexedEdgeKeyPrefix( Object directedSrc, Direction dir, int labelId, Index index, Consumer block) { diff --git a/codec-java/src/main/java/com/kakao/actionbase/v2/core/metadata/EncodedEdgeType.java b/codec-java/src/main/java/com/kakao/actionbase/v2/core/metadata/EncodedEdgeType.java index b3a24731..2562e090 100644 --- a/codec-java/src/main/java/com/kakao/actionbase/v2/core/metadata/EncodedEdgeType.java +++ b/codec-java/src/main/java/com/kakao/actionbase/v2/core/metadata/EncodedEdgeType.java @@ -8,6 +8,7 @@ public enum EncodedEdgeType { HASH_EDGE_TYPE((byte) -3), INDEXED_EDGE_TYPE((byte) -4), IMMUTABLE_INDEXED_EDGE_TYPE((byte) -5), + EDGE_CACHE_TYPE((byte) -6), ; private static final HashMap CODE_TO_VALUE_MAP = new HashMap<>(); diff --git a/codec-java/src/main/java/com/kakao/actionbase/v2/core/metadata/LabelDTO.java b/codec-java/src/main/java/com/kakao/actionbase/v2/core/metadata/LabelDTO.java index 0a447054..b4a132d5 100644 --- a/codec-java/src/main/java/com/kakao/actionbase/v2/core/metadata/LabelDTO.java +++ b/codec-java/src/main/java/com/kakao/actionbase/v2/core/metadata/LabelDTO.java @@ -1,5 +1,6 @@ package com.kakao.actionbase.v2.core.metadata; +import com.kakao.actionbase.v2.core.code.Cache; import com.kakao.actionbase.v2.core.code.Index; import com.kakao.actionbase.v2.core.code.hbase.ValueUtils; import com.kakao.actionbase.v2.core.types.EdgeSchema; @@ -36,6 +37,9 @@ public class LabelDTO implements Serializable { @JsonProperty("indices") final List indices; + @JsonProperty("caches") + final List caches; + @JsonProperty("groups") final List groups; @@ -60,6 +64,7 @@ public LabelDTO( @JsonProperty("storage") String storage, @JsonProperty("indices") List indices, @JsonProperty("groups") List groups, + @JsonProperty("caches") List caches, @JsonProperty("event") boolean event, @JsonProperty("readOnly") boolean readOnly, @JsonProperty("mode") MutationMode mode) { @@ -70,6 +75,7 @@ public LabelDTO( this.dirType = dirType; this.storage = storage; this.indices = indices; + this.caches = caches; this.groups = groups; this.event = event; this.readOnly = readOnly; @@ -80,7 +86,7 @@ public LabelDTO( public LabelDTO copy(String name, String storage) { return new LabelDTO( - name, desc, type, schema, dirType, storage, indices, groups, event, readOnly, mode); + name, desc, type, schema, dirType, storage, indices, groups, caches, event, readOnly, mode); } public String getName() { @@ -111,6 +117,10 @@ public List getIndices() { return indices; } + public List getCaches() { + return caches; + } + public List getGroups() { return groups; } diff --git a/codec-java/src/test/java/com/kakao/actionbase/v2/core/code/BulkEdgeEncoderTests.java b/codec-java/src/test/java/com/kakao/actionbase/v2/core/code/BulkEdgeEncoderTests.java index 3fe650fb..8bd88301 100644 --- a/codec-java/src/test/java/com/kakao/actionbase/v2/core/code/BulkEdgeEncoderTests.java +++ b/codec-java/src/test/java/com/kakao/actionbase/v2/core/code/BulkEdgeEncoderTests.java @@ -22,7 +22,7 @@ public class BulkEdgeEncoderTests { ObjectMapper objectMapper = new ObjectMapper(); static final String labelJsonString = - "{\"name\":\"gift.like_product_v1\",\"desc\":\"Gift Wish\",\"type\":\"INDEXED\",\"schema\":{\"src\":{\"type\":\"LONG\"},\"tgt\":{\"type\":\"STRING\"},\"fields\":[{\"name\":\"created_at\",\"type\":\"LONG\",\"nullable\":false},{\"name\":\"permission\",\"type\":\"STRING\",\"nullable\":true},{\"name\":\"memo\",\"type\":\"STRING\",\"nullable\":true}]},\"dirType\":\"BOTH\",\"storage\":\"hbase_sandbox\",\"indices\":[{\"id\":0,\"name\":\"created_at_desc\",\"fields\":[{\"name\":\"created_at\",\"order\":\"DESC\"}]}],\"event\":false,\"readOnly\":false}"; + "{\"name\":\"gift.like_product_v1\",\"desc\":\"Gift Wish\",\"type\":\"INDEXED\",\"schema\":{\"src\":{\"type\":\"LONG\"},\"tgt\":{\"type\":\"STRING\"},\"fields\":[{\"name\":\"created_at\",\"type\":\"LONG\",\"nullable\":false},{\"name\":\"permission\",\"type\":\"STRING\",\"nullable\":true},{\"name\":\"memo\",\"type\":\"STRING\",\"nullable\":true}]},\"dirType\":\"BOTH\",\"storage\":\"hbase_sandbox\",\"indices\":[{\"id\":0,\"name\":\"created_at_desc\",\"fields\":[{\"name\":\"created_at\",\"order\":\"DESC\"}]}],\"caches\":[{\"cache\":\"top_created_at\",\"fields\":[{\"name\":\"created_at\",\"order\":\"DESC\"}],\"limit\":100}],\"event\":false,\"readOnly\":false}"; static final String edgeJsonString = "{\"active\":true,\"ts\":1,\"src\":123,\"tgt\":\"Coffee10\",\"props\":{\"created_at\":1, \"permission\":\"public\", \"memo\":\"for good morning\"}}"; @@ -32,6 +32,9 @@ void testFetchIndexedLabelAndEncodeEdges() throws JsonProcessingException { LabelDTO newLabel = label.copy("gift.like_product_v1_20240402_132500", "gift.like_product_v1_20240402_132500"); + assertEquals(1, newLabel.getCaches().size()); + assertEquals("top_created_at", newLabel.getCaches().get(0).getCache()); + BulkLoadEdge edge = objectMapper.readValue(edgeJsonString, BulkLoadEdge.class); Edge expectedEdge = edge.ensureType(newLabel.getSchema()); @@ -41,59 +44,60 @@ void testFetchIndexedLabelAndEncodeEdges() throws JsonProcessingException { List> encodedEdges = BulkEdgeEncoder.bulkEncodeAll(encoder, edge, newLabel); - // 1 item for the hash edge. - // 2 items for indexed edges within a single index, covering both OUTBOUND and INBOUND - // directions. - // 2 items for counter edges, one each for OUTBOUND and INBOUND directions. - assertEquals(5, encodedEdges.size()); - - // bytes encoded edges should not have field. - encodedEdges.forEach(kv -> assertNull(kv.field)); + // 1 hash edge + 2 indexed (OUT/IN) + 2 cache (OUT/IN) + 2 counter (OUT/IN) + assertEquals(7, encodedEdges.size()); List expectedCounterKeys = Arrays.asList( encoder.encodeCounterEdgeKey(expectedEdge, Direction.OUT, newLabel.getId()), encoder.encodeCounterEdgeKey(expectedEdge, Direction.IN, newLabel.getId())); - encodedEdges.forEach( - kv -> { - if (kv.key.length != 0) { - DecodedEdge decodedEdge = DecodedEdge.from(kv, Collections.emptyMap()); - assertEquals(newLabel.getId(), decodedEdge.getLabelId()); - assertEquals(expectedEdge.getTs(), decodedEdge.getTs()); - if (decodedEdge.getType() == EncodedEdgeType.HASH_EDGE_TYPE) { - // hash edge - assertEquals(expectedEdge.getSrc(), decodedEdge.getSrc()); - assertEquals(expectedEdge.getTgt(), decodedEdge.getTgt()); - decodedEdge - .getPropertyAsMap() - .forEach((k, v) -> assertEquals(expectedEdge.getTs(), v.version)); - } else if (decodedEdge.getType() == EncodedEdgeType.INDEXED_EDGE_TYPE - && decodedEdge.getDirection() == Direction.OUT) { - // OUTBOUND of indexed edge - assertEquals(expectedEdge.getSrc(), decodedEdge.getSrc()); - assertEquals(expectedEdge.getTgt(), decodedEdge.getTgt()); - decodedEdge - .getPropertyAsMap() - .forEach((k, v) -> assertEquals(VersionValue.NO_VERSION, v.version)); - } else if (decodedEdge.getType() == EncodedEdgeType.INDEXED_EDGE_TYPE - && decodedEdge.getDirection() == Direction.IN) { - // INBOUND of indexed edge - assertEquals(expectedEdge.getSrc(), decodedEdge.getTgt()); - assertEquals(expectedEdge.getTgt(), decodedEdge.getSrc()); - decodedEdge - .getPropertyAsMap() - .forEach((k, v) -> assertEquals(VersionValue.NO_VERSION, v.version)); - } else { - fail(); - } - System.out.println(decodedEdge); - } else { - long matchCount = - expectedCounterKeys.stream().filter(k -> Arrays.equals(k, kv.value)).count(); - assertEquals(1, matchCount); - } - }); + int cacheRowCount = 0; + + for (KeyFieldValue kv : encodedEdges) { + // Cache rows are the only ones carrying a non-null qualifier in the bytes encoder; + // round-trip via V3 decoder is covered by V2MultiEdgeBulkLoadTest.testEdgeCache*. + if (kv.field != null) { + cacheRowCount++; + continue; + } + assertNull(kv.field); + + if (kv.key.length != 0) { + DecodedEdge decodedEdge = DecodedEdge.from(kv, Collections.emptyMap()); + assertEquals(newLabel.getId(), decodedEdge.getLabelId()); + assertEquals(expectedEdge.getTs(), decodedEdge.getTs()); + if (decodedEdge.getType() == EncodedEdgeType.HASH_EDGE_TYPE) { + assertEquals(expectedEdge.getSrc(), decodedEdge.getSrc()); + assertEquals(expectedEdge.getTgt(), decodedEdge.getTgt()); + decodedEdge + .getPropertyAsMap() + .forEach((k, v) -> assertEquals(expectedEdge.getTs(), v.version)); + } else if (decodedEdge.getType() == EncodedEdgeType.INDEXED_EDGE_TYPE + && decodedEdge.getDirection() == Direction.OUT) { + assertEquals(expectedEdge.getSrc(), decodedEdge.getSrc()); + assertEquals(expectedEdge.getTgt(), decodedEdge.getTgt()); + decodedEdge + .getPropertyAsMap() + .forEach((k, v) -> assertEquals(VersionValue.NO_VERSION, v.version)); + } else if (decodedEdge.getType() == EncodedEdgeType.INDEXED_EDGE_TYPE + && decodedEdge.getDirection() == Direction.IN) { + assertEquals(expectedEdge.getSrc(), decodedEdge.getTgt()); + assertEquals(expectedEdge.getTgt(), decodedEdge.getSrc()); + decodedEdge + .getPropertyAsMap() + .forEach((k, v) -> assertEquals(VersionValue.NO_VERSION, v.version)); + } else { + fail(); + } + } else { + long matchCount = + expectedCounterKeys.stream().filter(k -> Arrays.equals(k, kv.value)).count(); + assertEquals(1, matchCount); + } + } + + assertEquals(2, cacheRowCount, "expected 2 cache rows (OUT/IN)"); } @Test @@ -112,10 +116,8 @@ void testFetchIndexedLabelAndEncodeEdgesOutboundOnly() throws JsonProcessingExce List> encodedEdges = BulkEdgeEncoder.bulkEncodeAll(encoder, edge, newLabel); - // 1 item for the hash edge. - // 1 item for indexed edges within a single index, covering only OUTBOUND direction. - // 1 item for counter edges, only for OUTBOUND direction. - assertEquals(3, encodedEdges.size()); + // 1 hash + 1 indexed(OUT) + 1 cache(OUT) + 1 counter(OUT) + assertEquals(4, encodedEdges.size()); } @Test @@ -133,14 +135,13 @@ void testFetchIndexedLabelAndEncodeEdgesInboundOnly() throws JsonProcessingExcep List> encodedEdges = BulkEdgeEncoder.bulkEncodeAll(encoder, edge, newLabel); - // 1 item for the hash edge. - // 1 item for indexed edges within a single index, covering only INBOUND direction. - // 1 item for counter edges, only for INBOUND direction. - assertEquals(3, encodedEdges.size()); + // 1 hash + 1 indexed(IN) + 1 cache(IN) + 1 counter(IN) + assertEquals(4, encodedEdges.size()); } @Test void testFetchHashLabelAndEncodeEdges() throws JsonProcessingException { + // HASH labels never emit cache rows even when the `caches` field is set on the label. String labelJsonString1 = labelJsonString.replace("\"type\":\"INDEXED\"", "\"type\":\"HASH\""); LabelDTO label = objectMapper.readValue(labelJsonString1, LabelDTO.class); LabelDTO newLabel = @@ -154,8 +155,7 @@ void testFetchHashLabelAndEncodeEdges() throws JsonProcessingException { List> encodedEdges = BulkEdgeEncoder.bulkEncodeAll(encoder, edge, newLabel); - // 1 item for the hash edge. - // 2 items for counter edges, one each for OUTBOUND and INBOUND directions. + // 1 hash + 2 counter (no indexed, no cache) assertEquals(3, encodedEdges.size()); } @@ -168,15 +168,13 @@ void testInactiveEdgeOnIndexedLabel() throws JsonProcessingException { String edgeJsonString1 = edgeJsonString.replace("\"active\":true", "\"active\":false"); BulkLoadEdge edge = objectMapper.readValue(edgeJsonString1, BulkLoadEdge.class); - System.out.println(edge); - EdgeEncoderFactory factory = new EdgeEncoderFactory(1); EdgeEncoder encoder = factory.bytesKeyValueEdgeEncoder; List> encodedEdges = BulkEdgeEncoder.bulkEncodeAll(encoder, edge, newLabel); - // 1 item for the hash edge. + // Inactive edge on an INDEXED label with caches: only the hash tombstone is emitted. assertEquals(1, encodedEdges.size()); } @@ -190,8 +188,6 @@ void testInactiveEdgeOnHashLabel() throws JsonProcessingException { String edgeJsonString1 = edgeJsonString.replace("\"active\":true", "\"active\":false"); BulkLoadEdge edge = objectMapper.readValue(edgeJsonString1, BulkLoadEdge.class); - System.out.println(edge); - EdgeEncoderFactory factory = new EdgeEncoderFactory(1); EdgeEncoder encoder = factory.bytesKeyValueEdgeEncoder; @@ -201,4 +197,31 @@ void testInactiveEdgeOnHashLabel() throws JsonProcessingException { // 1 item for the hash edge. assertEquals(1, encodedEdges.size()); } + + /** + * Backward-compatibility: a label JSON without a `caches` entry must still deserialize, and the + * bulk encoder must skip cache-row generation without error. + */ + @Test + void testIndexedLabelWithoutCachesProducesNoCacheRows() throws JsonProcessingException { + String labelJsonWithoutCaches = + labelJsonString.replaceAll( + ",\"caches\":\\[\\{\"cache\":\"top_created_at\",\"fields\":\\[\\{\"name\":\"created_at\",\"order\":\"DESC\"}],\"limit\":100}]", + ""); + assertFalse(labelJsonWithoutCaches.contains("caches")); + + LabelDTO label = objectMapper.readValue(labelJsonWithoutCaches, LabelDTO.class); + assertNull(label.getCaches()); + + BulkLoadEdge edge = objectMapper.readValue(edgeJsonString, BulkLoadEdge.class); + + EdgeEncoderFactory factory = new EdgeEncoderFactory(1); + EdgeEncoder encoder = factory.bytesKeyValueEdgeEncoder; + + List> encodedEdges = BulkEdgeEncoder.bulkEncodeAll(encoder, edge, label); + + // 1 hash + 2 indexed(OUT/IN) + 2 counter(OUT/IN) — no cache rows. + assertEquals(5, encodedEdges.size()); + encodedEdges.forEach(kv -> assertNull(kv.field, "no cache rows should be produced")); + } } diff --git a/codec-java/src/test/java/com/kakao/actionbase/v2/core/code/MultiEdgeBulkEdgeEncoderTests.java b/codec-java/src/test/java/com/kakao/actionbase/v2/core/code/MultiEdgeBulkEdgeEncoderTests.java index 1bdb6756..713891e6 100644 --- a/codec-java/src/test/java/com/kakao/actionbase/v2/core/code/MultiEdgeBulkEdgeEncoderTests.java +++ b/codec-java/src/test/java/com/kakao/actionbase/v2/core/code/MultiEdgeBulkEdgeEncoderTests.java @@ -1,11 +1,12 @@ package com.kakao.actionbase.v2.core.code; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import com.kakao.actionbase.v2.core.edge.BulkLoadEdge; import com.kakao.actionbase.v2.core.metadata.LabelDTO; -import java.util.Base64; import java.util.List; import org.junit.jupiter.api.Test; @@ -66,6 +67,13 @@ public class MultiEdgeBulkEdgeEncoderTests { + " ]\n" + " }\n" + " ],\n" + + " \"caches\": [\n" + + " {\n" + + " \"cache\": \"top_created_at\",\n" + + " \"fields\": [{\"name\": \"created_at\", \"order\": \"DESC\"}],\n" + + " \"limit\": 100\n" + + " }\n" + + " ],\n" + " \"event\": false,\n" + " \"readOnly\": false\n" + "}"; @@ -89,31 +97,42 @@ void testMultiEdge() throws JsonProcessingException { LabelDTO label = objectMapper.readValue(labelJsonString, LabelDTO.class); BulkLoadEdge edge = objectMapper.readValue(edgeJsonString, BulkLoadEdge.class); + assertEquals(1, label.getCaches().size()); + EdgeEncoderFactory factory = new EdgeEncoderFactory(1); EdgeEncoder encoder = factory.bytesKeyValueEdgeEncoder; List> encodedEdges = BulkEdgeEncoder.bulkEncodeAll(encoder, edge, label); - // 1 EdgeState - // 2 EdgeIndex (OUT, IN) - // 2 EdgeCount (OUT, IN) - assertEquals(5, encodedEdges.size()); + // 1 EdgeState + 2 EdgeIndex (OUT/IN) + 2 EdgeCache (OUT/IN) + 2 EdgeCount (OUT/IN) + assertEquals(7, encodedEdges.size()); + + long cacheRowCount = encodedEdges.stream().filter(kv -> kv.field != null).count(); + assertEquals(2, cacheRowCount, "expected 2 cache rows (OUT/IN)"); + } + + /** + * Backward-compatibility: a MULTI_EDGE label JSON without a `caches` entry must still deserialize + * and the bulk encoder must skip cache-row generation. + */ + @Test + void testMultiEdgeWithoutCaches() throws JsonProcessingException { + String labelJsonString1 = + labelJsonString.replaceAll("(?s),\\s*\"caches\":\\s*\\[[^\\]]+\\]", ""); + assertTrue(!labelJsonString1.contains("\"caches\"")); - encodedEdges.forEach( - kv -> { - String key = Base64.getEncoder().encodeToString(kv.getKey()); - String value = Base64.getEncoder().encodeToString(kv.getValue()); - System.out.println(key + ", " + value); - }); - - // see [com.kakao.actionbase.core.bulkload.V2MultiEdgeBulkLoadTest] - // jb3NsSyAAAAAAAAAASuiY3G2KX0sgAAAAAAAAAE=, - // KYEsgAAAAAAAAAEr1Wc4JSyAAAAAAAAAASyAAAAAAAAAASsLXozXNGZvciBnb29kIG1vcm5pbmcALIAAAAAAAAABKyqUN440cHVibGljACyAAAAAAAAAASvEPlKJLIAAAAAAAAB7LIAAAAAAAAABK0noVpM0Q29mZmVlMTAALIAAAAAAAAABK5JB3jEsgAAAAAAAAAEsgAAAAAAAAAE= - // XskydSyAAAAAAAAAeyuiY3G2KXwpgitptzPH03/////////+LIAAAAAAAAAB, - // LIAAAAAAAAABK9VnOCUsgAAAAAAAAAErC16M1zRmb3IgZ29vZCBtb3JuaW5nACsqlDeONHB1YmxpYwArxD5SiSyAAAAAAAAAeytJ6FaTNENvZmZlZTEwAA== - // 4IU4UDRDb2ZmZWUxMAAromNxtil8KYMrabczx9N//////////iyAAAAAAAAAAQ==, - // LIAAAAAAAAABK9VnOCUsgAAAAAAAAAErC16M1zRmb3IgZ29vZCBtb3JuaW5nACsqlDeONHB1YmxpYwArxD5SiSyAAAAAAAAAeytJ6FaTNENvZmZlZTEwAA== - // , dc/qIyyAAAAAAAAAeyuiY3G2KX4pgg== - // , 7s9/xDRDb2ZmZWUxMAAromNxtil+KYM= + LabelDTO label = objectMapper.readValue(labelJsonString1, LabelDTO.class); + assertNull(label.getCaches()); + + BulkLoadEdge edge = objectMapper.readValue(edgeJsonString, BulkLoadEdge.class); + + EdgeEncoderFactory factory = new EdgeEncoderFactory(1); + EdgeEncoder encoder = factory.bytesKeyValueEdgeEncoder; + + List> encodedEdges = BulkEdgeEncoder.bulkEncodeAll(encoder, edge, label); + + // 1 EdgeState + 2 EdgeIndex (OUT/IN) + 2 EdgeCount (OUT/IN) — no cache rows. + assertEquals(5, encodedEdges.size()); + assertEquals(0, encodedEdges.stream().filter(kv -> kv.field != null).count()); } } diff --git a/codec-java/src/test/java/com/kakao/actionbase/v2/core/metadata/LabelDTOTests.java b/codec-java/src/test/java/com/kakao/actionbase/v2/core/metadata/LabelDTOTests.java index 8d033c51..513278a0 100644 --- a/codec-java/src/test/java/com/kakao/actionbase/v2/core/metadata/LabelDTOTests.java +++ b/codec-java/src/test/java/com/kakao/actionbase/v2/core/metadata/LabelDTOTests.java @@ -35,6 +35,7 @@ void testLabelSerialization() throws JsonProcessingException { "created_at", Collections.singletonList(new Index.Field("created_at", Order.DESC)))), Collections.emptyList(), + Collections.emptyList(), false, false, MutationMode.ASYNC); @@ -42,7 +43,7 @@ void testLabelSerialization() throws JsonProcessingException { String jsonString = objectMapper.writeValueAsString(label); assertEquals( - "{\"name\":\"test.test\",\"desc\":\"desc\",\"type\":\"INDEXED\",\"schema\":{\"src\":{\"type\":\"LONG\",\"desc\":\"\"},\"tgt\":{\"type\":\"STRING\",\"desc\":\"\"},\"fields\":[{\"name\":\"created_at\",\"type\":\"LONG\",\"nullable\":false,\"desc\":\"\"}]},\"dirType\":\"BOTH\",\"storage\":\"test_storage\",\"indices\":[{\"name\":\"created_at\",\"fields\":[{\"name\":\"created_at\",\"order\":\"DESC\"}],\"desc\":\"\"}],\"groups\":[],\"event\":false,\"readOnly\":false,\"mode\":\"ASYNC\"}", + "{\"name\":\"test.test\",\"desc\":\"desc\",\"type\":\"INDEXED\",\"schema\":{\"src\":{\"type\":\"LONG\",\"desc\":\"\"},\"tgt\":{\"type\":\"STRING\",\"desc\":\"\"},\"fields\":[{\"name\":\"created_at\",\"type\":\"LONG\",\"nullable\":false,\"desc\":\"\"}]},\"dirType\":\"BOTH\",\"storage\":\"test_storage\",\"indices\":[{\"name\":\"created_at\",\"fields\":[{\"name\":\"created_at\",\"order\":\"DESC\"}],\"desc\":\"\"}],\"groups\":[],\"caches\":[],\"event\":false,\"readOnly\":false,\"mode\":\"ASYNC\"}", jsonString); } diff --git a/core/src/test/kotlin/com/kakao/actionbase/core/bulkload/V2MultiEdgeBulkLoadTest.kt b/core/src/test/kotlin/com/kakao/actionbase/core/bulkload/V2MultiEdgeBulkLoadTest.kt index 87e41baa..67e82ce7 100644 --- a/core/src/test/kotlin/com/kakao/actionbase/core/bulkload/V2MultiEdgeBulkLoadTest.kt +++ b/core/src/test/kotlin/com/kakao/actionbase/core/bulkload/V2MultiEdgeBulkLoadTest.kt @@ -1,9 +1,11 @@ package com.kakao.actionbase.core.bulkload import com.kakao.actionbase.core.codec.XXHash32Wrapper +import com.kakao.actionbase.core.edge.mapper.EdgeCacheRecordMapper import com.kakao.actionbase.core.edge.mapper.EdgeCountRecordMapper import com.kakao.actionbase.core.edge.mapper.EdgeIndexRecordMapper import com.kakao.actionbase.core.edge.mapper.EdgeStateRecordMapper +import com.kakao.actionbase.core.edge.record.EdgeCacheRecord import com.kakao.actionbase.core.edge.record.EdgeCountRecord import com.kakao.actionbase.core.edge.record.EdgeIndexRecord import com.kakao.actionbase.core.edge.record.EdgeStateRecord @@ -43,6 +45,7 @@ class V2MultiEdgeBulkLoadTest { private val stateDecoder: EdgeStateRecordMapper.Decoder = EdgeStateRecordMapper.create().decoder private val indexDecoder: EdgeIndexRecordMapper.Decoder = EdgeIndexRecordMapper.create().decoder private val countDecoder: EdgeCountRecordMapper.Decoder = EdgeCountRecordMapper.create().decoder + private val cacheDecoder: EdgeCacheRecordMapper.Decoder = EdgeCacheRecordMapper.create().decoder @Test fun testEdgeState() { @@ -210,4 +213,103 @@ class V2MultiEdgeBulkLoadTest { assertEquals(expected, edgeCountRecord) } + + /** + * Round-trip verification of bulk-encoded MULTI_EDGE cache row (OUT direction) via V3 decoder. + * + * Bytes captured from `MultiEdgeBulkEdgeEncoderTests.testMultiEdgeWithCaches` with + * `caches: [{"cache":"top_created_at","fields":[{"name":"created_at","order":"DESC"}],"limit":100}]`. + * + * For MULTI_EDGE OUT: directedSource = properties._source (original src = 123L), + * directedTarget = record.key.source (edge id = 1L). + */ + @Test + fun testEdgeCacheOut() { + val key0 = "GrcIXiyAAAAAAAAAeyuiY3G2KXopgivvyJjC" + val qualifier0 = "03/////////+LIAAAAAAAAAB" + val value0 = + "LIAAAAAAAAABK9VnOCUsgAAAAAAAAAErC16M1zRmb3IgZ29vZCBtb3JuaW5nACsqlDeONHB1YmxpYwArxD5SiSyAAAAAAAAAeytJ6FaTNENvZmZlZTEwAA==" + val key = Base64.getDecoder().decode(key0) + val qualifier = Base64.getDecoder().decode(qualifier0) + val value = Base64.getDecoder().decode(value0) + + val edgeCacheRecord = cacheDecoder.decode(key, qualifier, value) + + val expected = + EdgeCacheRecord( + key = + EdgeCacheRecord.Key.of( + directedSource = 123L, + tableCode = xxHash32Wrapper.stringHash("gift.like_product_v1_20240402_132500"), + direction = Direction.OUT, + cacheCode = xxHash32Wrapper.stringHash("top_created_at"), + ), + qualifier = + EdgeCacheRecord.Qualifier( + cacheValues = listOf(EdgeCacheRecord.Qualifier.CacheValue(value = 1L, order = Order.DESC)), + directedTarget = 1L, // edge id + ), + value = + EdgeCacheRecord.Value( + version = 1L, + properties = + mapOf( + xxHash32Wrapper.stringHash("_source") to 123L, + xxHash32Wrapper.stringHash("_target") to "Coffee10", + xxHash32Wrapper.stringHash("created_at") to 1L, + xxHash32Wrapper.stringHash("permission") to "public", + xxHash32Wrapper.stringHash("memo") to "for good morning", + ), + ), + ) + + assertEquals(expected, edgeCacheRecord) + } + + /** + * For MULTI_EDGE IN: directedSource = properties._target (original tgt = "Coffee10"), + * directedTarget = record.key.source (edge id = 1L). + */ + @Test + fun testEdgeCacheIn() { + val key0 = "FK89BDRDb2ZmZWUxMAAromNxtil6KYMr78iYwg==" + val qualifier0 = "03/////////+LIAAAAAAAAAB" + val value0 = + "LIAAAAAAAAABK9VnOCUsgAAAAAAAAAErC16M1zRmb3IgZ29vZCBtb3JuaW5nACsqlDeONHB1YmxpYwArxD5SiSyAAAAAAAAAeytJ6FaTNENvZmZlZTEwAA==" + val key = Base64.getDecoder().decode(key0) + val qualifier = Base64.getDecoder().decode(qualifier0) + val value = Base64.getDecoder().decode(value0) + + val edgeCacheRecord = cacheDecoder.decode(key, qualifier, value) + + val expected = + EdgeCacheRecord( + key = + EdgeCacheRecord.Key.of( + directedSource = "Coffee10", + tableCode = xxHash32Wrapper.stringHash("gift.like_product_v1_20240402_132500"), + direction = Direction.IN, + cacheCode = xxHash32Wrapper.stringHash("top_created_at"), + ), + qualifier = + EdgeCacheRecord.Qualifier( + cacheValues = listOf(EdgeCacheRecord.Qualifier.CacheValue(value = 1L, order = Order.DESC)), + directedTarget = 1L, // edge id + ), + value = + EdgeCacheRecord.Value( + version = 1L, + properties = + mapOf( + xxHash32Wrapper.stringHash("_source") to 123L, + xxHash32Wrapper.stringHash("_target") to "Coffee10", + xxHash32Wrapper.stringHash("created_at") to 1L, + xxHash32Wrapper.stringHash("permission") to "public", + xxHash32Wrapper.stringHash("memo") to "for good morning", + ), + ), + ) + + assertEquals(expected, edgeCacheRecord) + } }