Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public abstract class AbstractEdgeEncoder<T> implements EdgeEncoder<T> {
public static final byte INDEXED_EDGE_TYPE = EncodedEdgeType.INDEXED_EDGE_TYPE.getCode();
public static final byte IMMUTABLE_INDEXED_EDGE_TYPE =
EncodedEdgeType.IMMUTABLE_INDEXED_EDGE_TYPE.getCode();
public static final byte EDGE_CACHE_TYPE = EncodedEdgeType.EDGE_CACHE_TYPE.getCode();

public static final String INSERT_TS_KEY = "__InsertTs__";
public static final String DELETE_TS_KEY = "__DeleteTs__";
Expand Down Expand Up @@ -170,6 +171,59 @@ public static void encodeIndexedEdgeValueToBuffer(
}
}

// --- EdgeCache encoding
// Row key : xxhash32 | directedSrc | labelId | EDGE_CACHE | direction | cacheCode
// Qualifier : cacheValues...(ordered) | directedTgt
// Value : ts | (propertyHash, propertyValue)...

// Writes the EdgeCache row key described in the layout comment above:
// hash prefix + directedSrc + labelId + EDGE_CACHE_TYPE, then the direction code
// and the cache's name hash. Write order is the wire format; do not reorder.
public static void encodeCacheEdgeKeyToBuffer(
Object directedSrc, Direction direction, int labelId, int cacheCode, EdgeBuffer buffer) {
// encodeWithHash prepends the xxhash32 prefix of directedSrc -- presumably for
// key distribution, mirroring the indexed-edge key scheme; confirm in EdgeBuffer.
buffer.encodeWithHash(directedSrc, labelId, EDGE_CACHE_TYPE);
buffer.encodeInt8(direction.getCode());
// cacheCode is Cache.getCode(), i.e. xxhash32 of the cache name.
buffer.encodeInt32(cacheCode);
}

/**
 * Writes the EdgeCache qualifier: each of the cache's fields, resolved to its value and encoded
 * with that field's sort order, followed by the directed target vertex. The trailing target keeps
 * qualifiers distinct when several targets share identical cache field values.
 */
public static void encodeCacheEdgeQualifierToBuffer(
    Cache cache,
    long ts,
    Object directedSrc,
    Object directedTgt,
    Map<String, Object> properties,
    EdgeBuffer buffer) {
  List<Index.Field> orderedFields = cache.getFields();
  for (int i = 0; i < orderedFields.size(); i++) {
    Index.Field cacheField = orderedFields.get(i);
    buffer.encodeAny(
        resolveFieldValue(cacheField.getName(), ts, directedSrc, directedTgt, properties),
        cacheField.getOrder());
  }
  buffer.encodeAny(directedTgt);
}

// Writes the EdgeCache value: the edge timestamp, then a flat sequence of
// (xxhash32(propertyName), propertyValue) pairs, one per property.
public static void encodeCacheEdgeValueToBuffer(
long ts, Map<String, Object> properties, EdgeBuffer buffer) {
buffer.encodeInt64(ts);
// NOTE(review): pair order follows the map's iteration order, so with a HashMap
// the encoded bytes are not canonical across runs. Confirm that readers locate
// pairs by property hash rather than comparing raw value bytes.
for (Map.Entry<String, Object> e : properties.entrySet()) {
buffer.encodeInt32(ValueUtils.stringHash(e.getKey()));
buffer.encodeAny(e.getValue());
}
}

/**
 * Resolves the value a cache field encodes. The reserved field names "version", "source" and
 * "target" map to the edge timestamp, directed source and directed target respectively; any other
 * name is looked up in the edge properties (null when absent).
 */
private static Object resolveFieldValue(
    String name,
    long ts,
    Object directedSrc,
    Object directedTgt,
    Map<String, Object> properties) {
  if (name.equals("version")) {
    return ts;
  }
  if (name.equals("source")) {
    return directedSrc;
  }
  if (name.equals("target")) {
    return directedTgt;
  }
  return properties.get(name);
}

@Override
public List<KeyFieldValue<T>> encodeAllIndexedEdges(
long ts,
Expand All @@ -187,6 +241,23 @@ public List<KeyFieldValue<T>> encodeAllIndexedEdges(
.collect(Collectors.toList());
}

/**
 * Fans one logical edge out into its EdgeCache records: one record per cache definition and per
 * physical direction implied by {@code dirType}.
 *
 * <p>Output order is cache-major (every direction of one cache before the next cache), mirroring
 * the indexed-edge fan-out in {@code encodeAllIndexedEdges}.
 */
@Override
public List<KeyFieldValue<T>> encodeAllCacheEdges(
    long ts,
    Object src,
    Object tgt,
    Map<String, Object> props,
    DirectionType dirType,
    int labelId,
    List<Cache> caches) {
  return caches.stream()
      .flatMap(
          cacheSpec ->
              dirType.getDirs().stream()
                  .map(
                      direction ->
                          encodeCacheEdge(ts, src, tgt, props, direction, labelId, cacheSpec)))
      .collect(Collectors.toList());
}

// --- internal

String useAsBase64String(Consumer<EdgeBuffer> block) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

import com.kakao.actionbase.v2.core.edge.BulkLoadEdge;
import com.kakao.actionbase.v2.core.edge.Edge;
import com.kakao.actionbase.v2.core.metadata.*;
import com.kakao.actionbase.v2.core.metadata.Active;
import com.kakao.actionbase.v2.core.metadata.Direction;
import com.kakao.actionbase.v2.core.metadata.DirectionType;
import com.kakao.actionbase.v2.core.metadata.LabelDTO;
import com.kakao.actionbase.v2.core.metadata.LabelType;

import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -84,16 +88,23 @@ public static <T> List<KeyFieldValue<T>> bulkEncodeAll(
edges.add(new KeyFieldValue<>(key.key, key.field, value));
}

List<Cache> caches = label.getCaches();

if (active == Active.ACTIVE) {
// encode indexed edges
if (labelType == LabelType.INDEXED || labelType == LabelType.IMMUTABLE_INDEXED) {
// Keep existing code
edges.addAll(
encoder.encodeAllIndexedEdges(
castedEdge, label.getDirType(), labelId, label.getIndices()));

// Cache records are only supported on INDEXED labels (V3 EDGE type). V3 multi-hop queries
// rely on the wide-row EdgeCacheRecord written here to stay in sync with EdgeIndexRecord.
if (caches != null && !caches.isEmpty()) {
edges.addAll(
encoder.encodeAllCacheEdges(castedEdge, label.getDirType(), labelId, caches));
}
} else if (labelType == LabelType.MULTI_EDGE) {
// For MultiEdge, create separate OUT/IN edges based on direction and reuse existing encoder
// BOTH: Split into two edges: src->edgeId (OUT), edgeId->tgt (IN)
multiEdgeProps.put(SOURCE_FIELD_ON_STATE, castedEdge.getSrc());
multiEdgeProps.put(TARGET_FIELD_ON_STATE, castedEdge.getTgt());

Expand All @@ -106,17 +117,30 @@ public static <T> List<KeyFieldValue<T>> bulkEncodeAll(

edges.addAll(
encoder.encodeAllIndexedEdges(inEdge, DirectionType.IN, labelId, label.getIndices()));

if (caches != null && !caches.isEmpty()) {
edges.addAll(encoder.encodeAllCacheEdges(outEdge, DirectionType.OUT, labelId, caches));
edges.addAll(encoder.encodeAllCacheEdges(inEdge, DirectionType.IN, labelId, caches));
}
} else if (label.getDirType() == DirectionType.OUT) {
// OUT: Create src->edgeId edge
Edge outEdge = new Edge(castedEdge.getTs(), castedEdge.getSrc(), edgeId, multiEdgeProps);
edges.addAll(
encoder.encodeAllIndexedEdges(
outEdge, DirectionType.OUT, labelId, label.getIndices()));

if (caches != null && !caches.isEmpty()) {
edges.addAll(encoder.encodeAllCacheEdges(outEdge, DirectionType.OUT, labelId, caches));
}
} else if (label.getDirType() == DirectionType.IN) {
// IN: Create edgeId->tgt edge
Edge inEdge = new Edge(castedEdge.getTs(), edgeId, castedEdge.getTgt(), multiEdgeProps);
edges.addAll(
encoder.encodeAllIndexedEdges(inEdge, DirectionType.IN, labelId, label.getIndices()));

if (caches != null && !caches.isEmpty()) {
edges.addAll(encoder.encodeAllCacheEdges(inEdge, DirectionType.IN, labelId, caches));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,38 @@ public KeyFieldValue<byte[]> encodeIndexedEdge(
return new KeyFieldValue<>(key, value);
}

/**
 * Encodes one EdgeCache record as raw byte arrays. For {@code Direction.OUT} the row is keyed by
 * the source vertex with the target in the qualifier tail; for any other direction the endpoints
 * are swapped.
 */
@Override
public KeyFieldValue<byte[]> encodeCacheEdge(
    long ts,
    Object src,
    Object tgt,
    Map<String, Object> props,
    Direction dir,
    int labelId,
    Cache cache) {
  boolean outgoing = dir == Direction.OUT;
  Object rowVertex = outgoing ? src : tgt;
  Object tailVertex = outgoing ? tgt : src;

  byte[] key =
      useAsByteArray(
          b -> encodeCacheEdgeKeyToBuffer(rowVertex, dir, labelId, cache.getCode(), b));
  byte[] field =
      useAsByteArray(
          b -> encodeCacheEdgeQualifierToBuffer(cache, ts, rowVertex, tailVertex, props, b));
  byte[] value = useAsByteArray(b -> encodeCacheEdgeValueToBuffer(ts, props, b));
  return new KeyFieldValue<>(key, field, value);
}

@Override
public EncodedKey<byte[]> encodeIndexedEdgeKeyPrefix(
Object directedSrc, Direction dir, int labelId, Index index, Consumer<EdgeBuffer> block) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package com.kakao.actionbase.v2.core.code;

import com.kakao.actionbase.v2.core.code.hbase.ValueUtils;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

/**
 * Defines a cache specification used to build {@code EdgeCacheRecord} (wide row) for multi-hop
 * queries. The {@link #code} is derived at runtime by {@code XXHash32(cache)} with seed 0, which
 * matches {@code XXHash32Wrapper.default.stringHash(cache)} in V3.
 *
 * <p>JSON shape (compatible with V3 {@code Cache.kt}):
 *
 * <pre>
 * {
 *   "cache": "top_created_at",
 *   "fields": [{"name": "created_at", "order": "DESC"}],
 *   "limit": 100,
 *   "comment": "..."
 * }
 * </pre>
 *
 * <p>Instances are immutable: {@link #getFields()} returns an unmodifiable snapshot of the list
 * given to the constructor.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class Cache implements Serializable {

  // Pinned explicitly so the serialized form does not depend on compiler-generated UIDs.
  private static final long serialVersionUID = 1L;

  /** Limit applied when the JSON omits "limit". */
  public static final int DEFAULT_LIMIT = 100;

  /** Comment applied when the JSON omits "comment". */
  public static final String DEFAULT_COMMENT = "";

  // Logical cache name; also the sole input of the derived hash below.
  @JsonProperty("cache")
  private final String cache;

  // Sort fields in significance order. NOTE(review): assumed Index.Field implements
  // Serializable; confirm, otherwise serializing a Cache will fail at runtime.
  @JsonProperty("fields")
  private final List<Index.Field> fields;

  @JsonProperty("limit")
  private final int limit;

  @JsonProperty("comment")
  private final String comment;

  // Derived from the name; excluded from JSON and from equals/hashCode.
  @JsonIgnore private final int code;

  /**
   * Creates a cache spec.
   *
   * @param cache non-empty logical cache name
   * @param fields sort fields; {@code null} is treated as empty; defensively copied
   * @param limit positive per-row entry limit; {@code null} defaults to {@link #DEFAULT_LIMIT}
   * @param comment free-form description; {@code null} defaults to {@link #DEFAULT_COMMENT}
   * @throws IllegalArgumentException if {@code cache} is empty or {@code limit} is not positive
   */
  @JsonCreator
  public Cache(
      @JsonProperty("cache") String cache,
      @JsonProperty("fields") List<Index.Field> fields,
      @JsonProperty("limit") Integer limit,
      @JsonProperty("comment") String comment) {
    if (cache == null || cache.isEmpty()) {
      throw new IllegalArgumentException("Cache name must not be empty");
    }
    int resolvedLimit = limit == null ? DEFAULT_LIMIT : limit;
    if (resolvedLimit <= 0) {
      throw new IllegalArgumentException("Cache limit must be positive, got: " + resolvedLimit);
    }
    this.cache = cache;
    // Defensive copy: later mutation of the caller's list must not change this spec.
    this.fields =
        fields == null
            ? Collections.emptyList()
            : Collections.unmodifiableList(new ArrayList<>(fields));
    this.limit = resolvedLimit;
    this.comment = comment == null ? DEFAULT_COMMENT : comment;
    this.code = ValueUtils.stringHash(cache);
  }

  /** Convenience constructor using the default limit and an empty comment. */
  public Cache(String cache, List<Index.Field> fields) {
    this(cache, fields, null, null);
  }

  public String getCache() {
    return cache;
  }

  /** @return unmodifiable list of sort fields, in significance order */
  public List<Index.Field> getFields() {
    return fields;
  }

  public int getLimit() {
    return limit;
  }

  public String getComment() {
    return comment;
  }

  /** @return {@code XXHash32(cache)} with seed 0; stable for a given name */
  public int getCode() {
    return code;
  }

  @Override
  public boolean equals(Object obj) {
    if (this == obj) return true;
    if (!(obj instanceof Cache)) return false;
    Cache other = (Cache) obj;
    // code is intentionally excluded: it is a pure function of cache.
    return limit == other.limit
        && cache.equals(other.cache)
        && fields.equals(other.fields)
        && comment.equals(other.comment);
  }

  @Override
  public int hashCode() {
    return Objects.hash(cache, fields, limit, comment);
  }

  @Override
  public String toString() {
    return "Cache{cache='"
        + cache
        + "', fields="
        + fields
        + ", limit="
        + limit
        + ", comment='"
        + comment
        + "'}";
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,28 @@ default List<KeyFieldValue<T>> encodeAllIndexedEdges(
return encodeAllIndexedEdges(
edge.getTs(), edge.getSrc(), edge.getTgt(), edge.getProps(), dirType, labelId, indices);
}

/**
 * Encodes a single EdgeCache record for one cache definition in one physical direction.
 * Implementations choose the concrete key/field/value representation {@code T}.
 */
KeyFieldValue<T> encodeCacheEdge(
long ts,
Object src,
Object tgt,
Map<String, Object> props,
Direction dir,
int labelId,
Cache cache);

/**
 * Encodes every EdgeCache record for one logical edge: one record per cache in {@code caches}
 * and per direction implied by {@code dirType}.
 */
List<KeyFieldValue<T>> encodeAllCacheEdges(
long ts,
Object src,
Object tgt,
Map<String, Object> props,
DirectionType dirType,
int labelId,
List<Cache> caches);

/** Convenience overload that unpacks the {@link Edge} fields and delegates to the form above. */
default List<KeyFieldValue<T>> encodeAllCacheEdges(
Edge edge, DirectionType dirType, int labelId, List<Cache> caches) {
return encodeAllCacheEdges(
edge.getTs(), edge.getSrc(), edge.getTgt(), edge.getProps(), dirType, labelId, caches);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,37 @@ public KeyFieldValue<String> encodeIndexedEdge(
return new KeyFieldValue<>(key, field, value);
}

/**
 * Encodes one EdgeCache record in the string representation: Base64 for key and value, hex for
 * the qualifier. The vertex keying the row is the source for {@code Direction.OUT} and the target
 * otherwise; the opposite endpoint goes into the qualifier tail.
 */
@Override
public KeyFieldValue<String> encodeCacheEdge(
    long ts,
    Object src,
    Object tgt,
    Map<String, Object> props,
    Direction dir,
    int labelId,
    Cache cache) {
  boolean outgoing = dir == Direction.OUT;
  Object rowVertex = outgoing ? src : tgt;
  Object tailVertex = outgoing ? tgt : src;

  String key =
      useAsBase64String(
          b -> encodeCacheEdgeKeyToBuffer(rowVertex, dir, labelId, cache.getCode(), b));
  String field =
      useAsHexString(
          b -> encodeCacheEdgeQualifierToBuffer(cache, ts, rowVertex, tailVertex, props, b));
  String value = useAsBase64String(b -> encodeCacheEdgeValueToBuffer(ts, props, b));
  return new KeyFieldValue<>(key, field, value);
}

@Override
public EncodedKey<String> encodeIndexedEdgeKeyPrefix(
Object directedSrc, Direction dir, int labelId, Index index, Consumer<EdgeBuffer> block) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ public enum EncodedEdgeType {
HASH_EDGE_TYPE((byte) -3),
INDEXED_EDGE_TYPE((byte) -4),
IMMUTABLE_INDEXED_EDGE_TYPE((byte) -5),
EDGE_CACHE_TYPE((byte) -6),
;

private static final HashMap<Byte, EncodedEdgeType> CODE_TO_VALUE_MAP = new HashMap<>();
Expand Down
Loading
Loading