CompactStorageAdapter.java
@@ -86,7 +86,8 @@ public CompactStorageAdapter(@Nonnull final Config config,
* @param layer the layer of the node to fetch
* @param primaryKey the primary key of the node to fetch
*
* @return a future that will complete with the fetched {@link AbstractNode}
* @return a future that will complete with the fetched {@link AbstractNode} or {@code null} if the node cannot
* be fetched
*
*/
@@ -101,7 +102,7 @@ protected CompletableFuture<AbstractNode<NodeReference>> fetchNodeInternal(@Nonn
return readTransaction.get(keyBytes)
.thenApply(valueBytes -> {
if (valueBytes == null) {
throw new IllegalStateException("cannot fetch node");
return null;
}
return nodeFromRaw(storageTransform, layer, primaryKey, keyBytes, valueBytes);
});
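Note that fetchNodeInternal() now completes with null instead of throwing when the key is absent, so call sites that still require the node to exist must re-establish that invariant themselves. A minimal sketch of such a call site (the surrounding variables are assumed to be in scope):

    // Hypothetical caller: restore the old fail-fast behavior by rejecting
    // a null result explicitly.
    fetchNodeInternal(readTransaction, storageTransform, layer, primaryKey)
            .thenApply(node -> {
                if (node == null) {
                    throw new IllegalStateException("cannot fetch node");
                }
                return node;
            });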

Large diffs are not rendered by default.

HNSW.java
@@ -55,8 +55,8 @@
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.SplittableRandom;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
@@ -89,8 +89,6 @@ public class HNSW {
@Nonnull
private static final Logger logger = LoggerFactory.getLogger(HNSW.class);

@Nonnull
private final Random random;
@Nonnull
private final Subspace subspace;
@Nonnull
@@ -141,7 +139,6 @@ public HNSW(@Nonnull final Subspace subspace,
@Nonnull final Config config,
@Nonnull final OnWriteListener onWriteListener,
@Nonnull final OnReadListener onReadListener) {
this.random = new Random(config.getRandomSeed());
this.subspace = subspace;
this.executor = executor;
this.config = config;
@@ -581,7 +578,7 @@ private Quantizer quantizer(@Nullable final AccessInfo accessInfo) {
return onReadListener.onAsyncRead(
storageAdapter.fetchNode(readTransaction, storageTransform, layer,
nodeReference.getPrimaryKey()))
.thenApply(node -> biMapFunction.apply(nodeReference, node));
.thenApply(node -> biMapFunction.apply(nodeReference, Objects.requireNonNull(node)));
}

/**
@@ -748,19 +745,35 @@ private Quantizer quantizer(@Nullable final AccessInfo accessInfo) {
@Nonnull
public CompletableFuture<Void> insert(@Nonnull final Transaction transaction, @Nonnull final Tuple newPrimaryKey,
@Nonnull final RealVector newVector) {
final int insertionLayer = insertionLayer();
final SplittableRandom random = random(newPrimaryKey);
final int insertionLayer = insertionLayer(random);
if (logger.isTraceEnabled()) {
logger.trace("new node with key={} selected to be inserted into layer={}", newPrimaryKey, insertionLayer);
}

return StorageAdapter.fetchAccessInfo(getConfig(), transaction, getSubspace(), getOnReadListener())
.thenCompose(accessInfo -> {
final AccessInfo currentAccessInfo;
.thenCombine(exists(transaction, newPrimaryKey),
(accessInfo, nodeAlreadyExists) -> {
if (nodeAlreadyExists) {
if (logger.isDebugEnabled()) {
logger.debug("new record already exists in HNSW with key={} on layer={}",
newPrimaryKey, insertionLayer);
}
}
return new AccessInfoAndNodeExistence(accessInfo, nodeAlreadyExists);
})
.thenCompose(accessInfoAndNodeExistence -> {
if (accessInfoAndNodeExistence.isNodeExists()) {
return AsyncUtil.DONE;
}

final AccessInfo accessInfo = accessInfoAndNodeExistence.getAccessInfo();
final AffineOperator storageTransform = storageTransform(accessInfo);
final Transformed<RealVector> transformedNewVector = storageTransform.transform(newVector);
final Quantizer quantizer = quantizer(accessInfo);
final Estimator estimator = quantizer.estimator();

final AccessInfo currentAccessInfo;
if (accessInfo == null) {
// this is the first node
writeLonelyNodes(quantizer, transaction, newPrimaryKey, transformedNewVector,
@@ -817,10 +830,24 @@ public CompletableFuture<Void> insert(@Nonnull final Transaction transaction, @N
insertIntoLayers(transaction, storageTransform, quantizer, newPrimaryKey,
transformedNewVector, nodeReference, lMax, insertionLayer))
.thenCompose(ignored ->
addToStatsIfNecessary(transaction, currentAccessInfo, transformedNewVector));
addToStatsIfNecessary(random.split(), transaction, currentAccessInfo, transformedNewVector));
}).thenCompose(ignored -> AsyncUtil.DONE);
}
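The existence check above follows a combine-then-compose shape: two independent reads are started, their results are paired into a small value object, and the pipeline short-circuits when the node is already present. Reduced to its skeleton (readA(), readB(), and continueWith() are hypothetical helpers, not part of this class):

    // Sketch of the combine-then-compose pattern used by insert():
    CompletableFuture<Void> sketch =
            readA(transaction)                            // e.g. fetchAccessInfo(...)
                    .thenCombine(readB(transaction),      // e.g. exists(...); both reads run concurrently
                            AccessInfoAndNodeExistence::new)
                    .thenCompose(pair -> pair.isNodeExists()
                            ? AsyncUtil.DONE              // node already there; nothing to do
                            : continueWith(pair.getAccessInfo()));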

@Nonnull
@VisibleForTesting
CompletableFuture<Boolean> exists(@Nonnull final ReadTransaction readTransaction,
@Nonnull final Tuple primaryKey) {
final StorageAdapter<? extends NodeReference> storageAdapter = getStorageAdapterForLayer(0);

//
// Call fetchNode() to check for the node's existence; we pass in the identity operator, since we do not
// care about the vector itself at all.
//
return storageAdapter.fetchNode(readTransaction, AffineOperator.identity(), 0, primaryKey)
.thenApply(Objects::nonNull);
}
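A sketch of how exists() could gate a write path (a Transaction is also a ReadTransaction, so it can be handed in directly; doInsertWork() is an assumed helper):

    // Hypothetical guard: only perform the expensive work when the key is absent.
    exists(transaction, primaryKey)
            .thenCompose(alreadyThere -> alreadyThere
                    ? AsyncUtil.DONE
                    : doInsertWork(transaction, primaryKey));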

/**
* Method to keep stats if necessary. Stats need to be kept and maintained when the client would like to use
* e.g. RaBitQ, as RaBitQ needs a stable, reasonably accurate centroid in order to function properly.
@@ -832,21 +859,23 @@ public CompletableFuture<Void> insert(@Nonnull final Transaction transaction, @N
* in order to finally compute the centroid once {@link Config#getStatsThreshold()} vectors have been
* sampled and aggregated. That centroid is then used to update the access info.
*
* @param random the source of randomness to use
* @param transaction the transaction
* @param currentAccessInfo this current access info that was fetched as part of an insert
* @param transformedNewVector the new vector (in the transformed coordinate system) that may be added
* @return a future that completes with {@code null}
*/
@Nonnull
private CompletableFuture<Void> addToStatsIfNecessary(@Nonnull final Transaction transaction,
private CompletableFuture<Void> addToStatsIfNecessary(@Nonnull final SplittableRandom random,
@Nonnull final Transaction transaction,
@Nonnull final AccessInfo currentAccessInfo,
@Nonnull final Transformed<RealVector> transformedNewVector) {
if (getConfig().isUseRaBitQ() && !currentAccessInfo.canUseRaBitQ()) {
if (shouldSampleVector()) {
if (shouldSampleVector(random)) {
StorageAdapter.appendSampledVector(transaction, getSubspace(),
1, transformedNewVector, onWriteListener);
}
if (shouldMaintainStats()) {
if (shouldMaintainStats(random)) {
return StorageAdapter.consumeSampledVectors(transaction, getSubspace(),
50, onReadListener)
.thenApply(sampledVectors -> {
@@ -1512,6 +1541,15 @@ private StorageAdapter<? extends NodeReference> getStorageAdapterForLayer(final
getOnReadListener());
}

@Nonnull
private SplittableRandom random(@Nonnull final Tuple primaryKey) {
if (config.isDeterministicSeeding()) {
return new SplittableRandom(primaryKey.hashCode());
} else {
return new SplittableRandom(System.nanoTime());
}
}
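With deterministic seeding, the random stream is a pure function of the primary key, so a retried or replayed insert draws the same values, and therefore selects the same insertion layer, as the original attempt. A small sketch of that property (the key value is illustrative):

    // Same key => same seed => identical draws; layer selection is
    // reproducible across transaction retries.
    Tuple key = Tuple.from("some-record-id");
    SplittableRandom r1 = new SplittableRandom(key.hashCode());
    SplittableRandom r2 = new SplittableRandom(key.hashCode());
    assert r1.nextDouble() == r2.nextDouble();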

/**
* Calculates a random layer for a new element to be inserted.
* <p>
@@ -1521,20 +1559,20 @@ private StorageAdapter<? extends NodeReference> getStorageAdapterForLayer(final
* is {@code floor(-ln(u) * lambda)}, where {@code u} is a uniform random
* number and {@code lambda} is a normalization factor derived from a system
* configuration parameter {@code M}.
*
* @param random the source of randomness to use
* @return a non-negative integer representing the randomly selected layer.
*/
private int insertionLayer() {
private int insertionLayer(@Nonnull final SplittableRandom random) {
double lambda = 1.0 / Math.log(getConfig().getM());
double u = 1.0 - random.nextDouble(); // Avoid log(0)
return (int) Math.floor(-Math.log(u) * lambda);
}
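For intuition: with lambda = 1 / ln(M), the draw floor(-ln(u) * lambda) satisfies P(layer >= L) = M^(-L), so each further layer is M times less likely. A numeric sketch (M = 16 is an example value, not a default):

    // With M = 16: lambda = 1 / ln(16) ≈ 0.36, and the tail probabilities are
    // P(layer >= 1) = 1/16 = 0.0625, P(layer >= 2) = 1/256 ≈ 0.0039, ...
    double lambda = 1.0 / Math.log(16);
    double pAtLeast1 = Math.exp(-1.0 / lambda); // 0.0625
    double pAtLeast2 = Math.exp(-2.0 / lambda); // 0.00390625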

private boolean shouldSampleVector() {
private boolean shouldSampleVector(@Nonnull final SplittableRandom random) {
return random.nextDouble() < getConfig().getSampleVectorStatsProbability();
}

private boolean shouldMaintainStats() {
private boolean shouldMaintainStats(@Nonnull final SplittableRandom random) {
return random.nextDouble() < getConfig().getMaintainStatsProbability();
}
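Both draws happen independently on every insert, so the ratio of the two probabilities controls how many sampled vectors accumulate, on average, between stats-maintenance runs (each run consumes up to 50 sampled vectors, see addToStatsIfNecessary() above). A back-of-the-envelope sketch with assumed, not default, values:

    // Assumed probabilities, for illustration only.
    double pSample = 0.1;    // getSampleVectorStatsProbability()
    double pMaintain = 0.01; // getMaintainStatsProbability()
    // Expected number of vectors appended between two maintenance runs:
    double expectedBacklog = pSample / pMaintain; // = 10.0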

@@ -1546,4 +1584,24 @@ private static <T> List<T> drain(@Nonnull Queue<T> queue) {
}
return resultBuilder.build();
}

private static class AccessInfoAndNodeExistence {
@Nullable
private final AccessInfo accessInfo;
private final boolean nodeExists;

public AccessInfoAndNodeExistence(@Nullable final AccessInfo accessInfo, final boolean nodeExists) {
this.accessInfo = accessInfo;
this.nodeExists = nodeExists;
}

@Nullable
public AccessInfo getAccessInfo() {
return accessInfo;
}

public boolean isNodeExists() {
return nodeExists;
}
}
}
StorageAdapter.java
@@ -29,8 +29,6 @@
import com.apple.foundationdb.async.AsyncUtil;
import com.apple.foundationdb.linear.AffineOperator;
import com.apple.foundationdb.linear.DoubleRealVector;
import com.apple.foundationdb.linear.FloatRealVector;
import com.apple.foundationdb.linear.HalfRealVector;
import com.apple.foundationdb.linear.Quantizer;
import com.apple.foundationdb.linear.RealVector;
import com.apple.foundationdb.linear.Transformed;
@@ -59,7 +57,6 @@
* @param <N> the type of {@link NodeReference} this storage adapter manages
*/
interface StorageAdapter<N extends NodeReference> {
ImmutableList<VectorType> VECTOR_TYPES = ImmutableList.copyOf(VectorType.values());

/**
* Subspace for data.
@@ -199,29 +196,24 @@ static RealVector vectorFromTuple(@Nonnull final Config config, @Nonnull final T
/**
* Creates a {@link RealVector} from a byte array.
* <p>
* This method interprets the input byte array by interpreting the first byte of the array as the precision shift.
* The byte array must have the proper size, i.e. the invariant {@code (bytesLength - 1) % precision == 0} must
* hold.
* This method inspects the first byte of the array to determine the vector type.
* It then delegates to {@link RealVector#fromBytes(VectorType, byte[])}.
* @param config an HNSW config
* @param vectorBytes the non-null byte array to convert.
* @return a new {@link RealVector} instance created from the byte array.
* @throws com.google.common.base.VerifyException if the length of {@code vectorBytes} does not meet the invariant
* {@code (bytesLength - 1) % precision == 0}
*/
@Nonnull
static RealVector vectorFromBytes(@Nonnull final Config config, @Nonnull final byte[] vectorBytes) {
final byte vectorTypeOrdinal = vectorBytes[0];
switch (fromVectorTypeOrdinal(vectorTypeOrdinal)) {
case HALF:
return HalfRealVector.fromBytes(vectorBytes);
case SINGLE:
return FloatRealVector.fromBytes(vectorBytes);
case DOUBLE:
return DoubleRealVector.fromBytes(vectorBytes);
switch (RealVector.fromVectorTypeOrdinal(vectorTypeOrdinal)) {
case RABITQ:
Verify.verify(config.isUseRaBitQ());
return EncodedRealVector.fromBytes(vectorBytes, config.getNumDimensions(),
config.getRaBitQNumExBits());
case HALF:
case SINGLE:
case DOUBLE:
return RealVector.fromBytes(vectorBytes);
default:
throw new RuntimeException("unable to serialize vector");
}
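The leading byte acts as a self-describing type tag: HALF, SINGLE and DOUBLE payloads can be decoded by RealVector.fromBytes(byte[]) alone, while a RaBitQ payload additionally needs the dimension count and ex-bit count from the Config, which is why this variant lives in the storage adapter. A sketch of the assumed layout:

    // Assumed wire layout, derived from the dispatch above:
    //   vectorBytes[0]   -> VectorType ordinal (HALF, SINGLE, DOUBLE, RABITQ)
    //   vectorBytes[1..] -> type-specific payload
    final VectorType type = RealVector.fromVectorTypeOrdinal(vectorBytes[0]);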
@@ -251,11 +243,6 @@ static Tuple tupleFromVector(@Nonnull final RealVector vector) {
return Tuple.from(vector.getRawData());
}

@Nonnull
static VectorType fromVectorTypeOrdinal(final int ordinal) {
return VECTOR_TYPES.get(ordinal);
}

@Nonnull
static CompletableFuture<AccessInfo> fetchAccessInfo(@Nonnull final Config config,
@Nonnull final ReadTransaction readTransaction,
fdb-extensions/src/main/java/com/apple/foundationdb/linear/RealVector.java
@@ -20,8 +20,9 @@

package com.apple.foundationdb.linear;

import com.google.common.base.Preconditions;
import com.apple.foundationdb.half.Half;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;

import javax.annotation.Nonnull;

@@ -34,6 +35,8 @@
* data type conversions and raw data representation.
*/
public interface RealVector {
ImmutableList<VectorType> VECTOR_TYPES = ImmutableList.copyOf(VectorType.values());

/**
* Returns the number of elements in the vector, i.e. the number of dimensions.
* @return the number of dimensions
@@ -189,4 +192,47 @@
}
return withData(result);
}

@Nonnull
static VectorType fromVectorTypeOrdinal(final int ordinal) {
return VECTOR_TYPES.get(ordinal);
}

/**
* Creates a {@link RealVector} from a byte array.
* <p>
* This method interprets the first byte of the array as the vector type.
* It then delegates to {@link #fromBytes(VectorType, byte[])} to do the actual deserialization.
*
* @param vectorBytes the non-null byte array to convert.
* @return a new {@link RealVector} instance created from the byte array.
*/
@Nonnull
static RealVector fromBytes(@Nonnull final byte[] vectorBytes) {
final byte vectorTypeOrdinal = vectorBytes[0];
return fromBytes(fromVectorTypeOrdinal(vectorTypeOrdinal), vectorBytes);
}
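The one-argument overload is thus just the tag lookup plus a delegation; a caller that has already read the type byte can dispatch through the two-argument overload below. A minimal equivalence sketch (vectorBytes is assumed to hold a well-formed serialized vector):

    // These two decodes are equivalent:
    RealVector a = RealVector.fromBytes(vectorBytes);
    RealVector b = RealVector.fromBytes(
            RealVector.fromVectorTypeOrdinal(vectorBytes[0]), vectorBytes);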

/**
* Creates a {@link RealVector} from a byte array.
* <p>
* This implementation dispatches to the actual deserialization logic, which is located in the respective
* implementations of {@link RealVector}.
* @param vectorType the vector type of the serialized vector
* @param vectorBytes the non-null byte array to convert.
* @return a new {@link RealVector} instance created from the byte array.
*/
@Nonnull
static RealVector fromBytes(@Nonnull final VectorType vectorType, @Nonnull final byte[] vectorBytes) {
switch (vectorType) {
case HALF:
return HalfRealVector.fromBytes(vectorBytes);
case SINGLE:
return FloatRealVector.fromBytes(vectorBytes);
case DOUBLE:
return DoubleRealVector.fromBytes(vectorBytes);
default:
throw new RuntimeException("unable to deserialize vector");

}
}
}