46 changes: 35 additions & 11 deletions fdb-extensions/fdb-extensions.gradle
@@ -44,21 +44,44 @@ dependencies {
testFixturesAnnotationProcessor(libs.autoService)
}

def siftSmallFile = layout.buildDirectory.file('downloads/siftsmall.tar.gz')
def extractDir = layout.buildDirectory.dir("extracted")
// Datasets to download: each entry maps a short name to its source URL and destination file
def siftDownloads = [
siftsmall: [
url : 'https://huggingface.co/datasets/vecdata/siftsmall/resolve/3106e1b83049c44713b1ce06942d0ab474bbdfb6/siftsmall.tar.gz',
dest: layout.buildDirectory.file("downloads/siftsmall.tar.gz")
],
sift1mbase: [
url : 'https://huggingface.co/datasets/qbo-odp/sift1m/resolve/main/sift_base.fvecs',
dest: layout.buildDirectory.file("downloads/sift_base.fvecs")
],
sift1mgroundtruth: [
url : 'https://huggingface.co/datasets/qbo-odp/sift1m/resolve/main/sift_groundtruth.ivecs',
dest: layout.buildDirectory.file("downloads/sift_groundtruth.ivecs")
],
sift1mquery: [
url : 'https://huggingface.co/datasets/qbo-odp/sift1m/resolve/main/sift_query.fvecs',
dest: layout.buildDirectory.file("downloads/sift_query.fvecs")
],
]

// Task that downloads the CSV exactly once unless it changed
tasks.register('downloadSiftSmall', de.undercouch.gradle.tasks.download.Download) {
src 'https://huggingface.co/datasets/vecdata/siftsmall/resolve/3106e1b83049c44713b1ce06942d0ab474bbdfb6/siftsmall.tar.gz'
dest siftSmallFile.get().asFile
onlyIfModified true
tempAndMove true
retries 3
// Register one Download task per entry
def downloadTasks = siftDownloads.collect { name, cfg ->
tasks.register("download${name.capitalize()}", Download) {
src cfg.url
dest cfg.dest.get().asFile
onlyIfModified true
tempAndMove true
retries 3
overwrite false
outputs.file(dest)
}
}

def extractDir = layout.buildDirectory.dir("extracted")

tasks.register('extractSiftSmall', Copy) {
dependsOn 'downloadSiftSmall'
from(tarTree(resources.gzip(siftSmallFile)))
dependsOn downloadTasks
from(tarTree(resources.gzip(siftDownloads.siftsmall.dest)))
into extractDir

doLast {
@@ -72,6 +95,7 @@ tasks.register('extractSiftSmall', Copy) {
}

test {
dependsOn downloadTasks
dependsOn tasks.named('extractSiftSmall')
inputs.dir extractDir
}
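
The downloaded .fvecs and .ivecs files feed the vector-index tests. For reference, the SIFT file format stores each vector as a little-endian 4-byte integer dimension followed by that many 4-byte little-endian components (floats in .fvecs, ints in .ivecs). Below is a minimal test-side reader sketch, not code from this PR; it assumes the file fits comfortably in memory, which holds for siftsmall but is borderline for sift_base.fvecs at roughly 500 MB, where a streaming reader would be preferable.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

final class FvecsReader {
    // Reads every vector from an .fvecs file: [int32 dimension][float32 x dimension] repeated.
    static List<float[]> readAll(final Path path) throws IOException {
        final ByteBuffer buffer = ByteBuffer.wrap(Files.readAllBytes(path)).order(ByteOrder.LITTLE_ENDIAN);
        final List<float[]> vectors = new ArrayList<>();
        while (buffer.hasRemaining()) {
            final int dimension = buffer.getInt(); // 128 for the SIFT datasets
            final float[] vector = new float[dimension];
            for (int i = 0; i < dimension; i++) {
                vector[i] = buffer.getFloat();
            }
            vectors.add(vector);
        }
        return vectors;
    }
}

An .ivecs reader for sift_groundtruth.ivecs is the same loop with getInt() in place of getFloat().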
@@ -86,7 +86,8 @@ public CompactStorageAdapter(@Nonnull final Config config,
* @param layer the layer of the node to fetch
* @param primaryKey the primary key of the node to fetch
*
* @return a future that will complete with the fetched {@link AbstractNode}
* @return a future that will complete with the fetched {@link AbstractNode} or {@code null} if the node cannot
* be fetched
*
* @throws IllegalStateException if the node cannot be found in the database for the given key
*/
@@ -101,7 +102,7 @@ protected CompletableFuture<AbstractNode<NodeReference>> fetchNodeInternal(@Nonn
return readTransaction.get(keyBytes)
.thenApply(valueBytes -> {
if (valueBytes == null) {
throw new IllegalStateException("cannot fetch node");
return null;
}
return nodeFromRaw(storageTransform, layer, primaryKey, keyBytes, valueBytes);
});
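
With this change, a fetch for a missing key resolves to null instead of failing the future, so the decision that a missing node is an error moves to the caller. A hedged caller-side sketch follows; the StorageAdapter, AbstractNode, NodeReference, AffineOperator, and Tuple types and the fetchNode signature are approximated from this diff, and fetchRequiredNode is a hypothetical helper rather than an existing API.

// Hedged sketch, not part of this PR: restores the old fail-fast behavior at the call site.
@Nonnull
static CompletableFuture<AbstractNode<NodeReference>> fetchRequiredNode(
        @Nonnull final StorageAdapter<NodeReference> storageAdapter,
        @Nonnull final ReadTransaction readTransaction,
        @Nonnull final AffineOperator storageTransform,
        final int layer,
        @Nonnull final Tuple primaryKey) {
    return storageAdapter.fetchNode(readTransaction, storageTransform, layer, primaryKey)
            .thenApply(node -> {
                if (node == null) {
                    // fetchNodeInternal used to throw here; the caller now decides
                    throw new IllegalStateException("cannot fetch node");
                }
                return node;
            });
}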
@@ -581,7 +581,7 @@ private Quantizer quantizer(@Nullable final AccessInfo accessInfo) {
return onReadListener.onAsyncRead(
storageAdapter.fetchNode(readTransaction, storageTransform, layer,
nodeReference.getPrimaryKey()))
.thenApply(node -> biMapFunction.apply(nodeReference, node));
.thenApply(node -> biMapFunction.apply(nodeReference, Objects.requireNonNull(node)));
}

/**
@@ -754,6 +754,18 @@ public CompletableFuture<Void> insert(@Nonnull final Transaction transaction, @N
}

return StorageAdapter.fetchAccessInfo(getConfig(), transaction, getSubspace(), getOnReadListener())
.thenCombine(exists(transaction, newPrimaryKey),
(accessInfo, nodeAlreadyExists) -> {
if (nodeAlreadyExists) {
if (logger.isInfoEnabled()) {
logger.info("new record already exists in HNSW with key={} on layer={}", newPrimaryKey,
insertionLayer);
}

throw new IllegalStateException("key already exists");
}
return accessInfo;
})
.thenCompose(accessInfo -> {
final AccessInfo currentAccessInfo;
final AffineOperator storageTransform = storageTransform(accessInfo);
Expand Down Expand Up @@ -821,6 +833,15 @@ public CompletableFuture<Void> insert(@Nonnull final Transaction transaction, @N
}).thenCompose(ignored -> AsyncUtil.DONE);
}

@Nonnull
@VisibleForTesting
CompletableFuture<Boolean> exists(@Nonnull final ReadTransaction readTransaction,
@Nonnull final Tuple primaryKey) {
final StorageAdapter<? extends NodeReference> storageAdapter = getStorageAdapterForLayer(0);
return storageAdapter.fetchNode(readTransaction, AffineOperator.identity(), 0, primaryKey)
.thenApply(Objects::nonNull);
}
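
With the exists() check combined into insert(), attempting to insert a primary key that is already present on layer 0 now fails the returned future. The following self-contained sketch uses only the JDK (it is not code from this PR) to show how that failure surfaces: an exception thrown inside a thenCombine function completes the combined future exceptionally and is reported as a CompletionException when joined.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public final class DuplicateKeyDemo {
    public static void main(final String[] args) {
        // Stand-ins for the fetchAccessInfo(...) and exists(...) futures combined in insert().
        final CompletableFuture<String> accessInfoFuture = CompletableFuture.completedFuture("accessInfo");
        final CompletableFuture<Boolean> existsFuture = CompletableFuture.completedFuture(true);

        final CompletableFuture<String> combined = accessInfoFuture.thenCombine(existsFuture,
                (accessInfo, nodeAlreadyExists) -> {
                    if (nodeAlreadyExists) {
                        throw new IllegalStateException("key already exists");
                    }
                    return accessInfo;
                });

        try {
            combined.join();
        } catch (final CompletionException e) {
            // e.getCause() is the IllegalStateException thrown in the combiner above
            System.out.println("insert failed: " + e.getCause().getMessage());
        }
    }
}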

/**
* Method to keep stats if necessary. Stats need to be kept and maintained when the client would like to use,
* e.g., RaBitQ, since RaBitQ needs a stable, reasonably accurate centroid in order to function properly.
@@ -103,7 +103,12 @@ public InliningStorageAdapter(@Nonnull final Config config,

return AsyncUtil.collect(readTransaction.getRange(Range.startsWith(rangeKey),
ReadTransaction.ROW_LIMIT_UNLIMITED, false, StreamingMode.WANT_ALL), readTransaction.getExecutor())
.thenApply(keyValues -> nodeFromRaw(storageTransform, layer, primaryKey, keyValues));
.thenApply(keyValues -> {
if (keyValues.isEmpty()) {
return null;
}
return nodeFromRaw(storageTransform, layer, primaryKey, keyValues);
});
}

/**