@@ -581,7 +581,7 @@ private Quantizer quantizer(@Nullable final AccessInfo accessInfo) {
581581 return onReadListener .onAsyncRead (
582582 storageAdapter .fetchNode (readTransaction , storageTransform , layer ,
583583 nodeReference .getPrimaryKey ()))
584- .thenApply (node -> biMapFunction .apply (nodeReference , node ));
584+ .thenApply (node -> biMapFunction .apply (nodeReference , Objects . requireNonNull ( node ) ));
585585 }
586586
587587 /**
@@ -754,13 +754,28 @@ public CompletableFuture<Void> insert(@Nonnull final Transaction transaction, @N
754754 }
755755
756756 return StorageAdapter .fetchAccessInfo (getConfig (), transaction , getSubspace (), getOnReadListener ())
757- .thenCompose (accessInfo -> {
758- final AccessInfo currentAccessInfo ;
757+ .thenCombine (exists (transaction , newPrimaryKey ),
758+ (accessInfo , nodeAlreadyExists ) -> {
759+ if (nodeAlreadyExists ) {
760+ if (logger .isDebugEnabled ()) {
761+ logger .debug ("new record already exists in HNSW with key={} on layer={}" ,
762+ newPrimaryKey , insertionLayer );
763+ }
764+ }
765+ return new AccessInfoAndNodeExistence (accessInfo , nodeAlreadyExists );
766+ })
767+ .thenCompose (accessInfoAndNodeExistence -> {
768+ if (accessInfoAndNodeExistence .isNodeExists ()) {
769+ return AsyncUtil .DONE ;
770+ }
771+
772+ final AccessInfo accessInfo = accessInfoAndNodeExistence .getAccessInfo ();
759773 final AffineOperator storageTransform = storageTransform (accessInfo );
760774 final Transformed <RealVector > transformedNewVector = storageTransform .transform (newVector );
761775 final Quantizer quantizer = quantizer (accessInfo );
762776 final Estimator estimator = quantizer .estimator ();
763777
778+ final AccessInfo currentAccessInfo ;
764779 if (accessInfo == null ) {
765780 // this is the first node
766781 writeLonelyNodes (quantizer , transaction , newPrimaryKey , transformedNewVector ,
@@ -821,6 +836,15 @@ public CompletableFuture<Void> insert(@Nonnull final Transaction transaction, @N
821836 }).thenCompose (ignored -> AsyncUtil .DONE );
822837 }
823838
839+ @ Nonnull
840+ @ VisibleForTesting
841+ CompletableFuture <Boolean > exists (@ Nonnull final ReadTransaction readTransaction ,
842+ @ Nonnull final Tuple primaryKey ) {
843+ final StorageAdapter <? extends NodeReference > storageAdapter = getStorageAdapterForLayer (0 );
844+ return storageAdapter .fetchNode (readTransaction , AffineOperator .identity (), 0 , primaryKey )
845+ .thenApply (Objects ::nonNull );
846+ }
847+
824848 /**
825849 * Method to keep stats if necessary. Stats need to be kept and maintained when the client would like to use
826850 * e.g. RaBitQ as RaBitQ needs a stable somewhat correct centroid in order to function properly.
@@ -1546,4 +1570,24 @@ private static <T> List<T> drain(@Nonnull Queue<T> queue) {
15461570 }
15471571 return resultBuilder .build ();
15481572 }
1573+
1574+ private static class AccessInfoAndNodeExistence {
1575+ @ Nullable
1576+ private final AccessInfo accessInfo ;
1577+ private final boolean nodeExists ;
1578+
1579+ public AccessInfoAndNodeExistence (@ Nullable final AccessInfo accessInfo , final boolean nodeExists ) {
1580+ this .accessInfo = accessInfo ;
1581+ this .nodeExists = nodeExists ;
1582+ }
1583+
1584+ @ Nullable
1585+ public AccessInfo getAccessInfo () {
1586+ return accessInfo ;
1587+ }
1588+
1589+ public boolean isNodeExists () {
1590+ return nodeExists ;
1591+ }
1592+ }
15491593}
0 commit comments