From 9d450716939344c5040af7b87dfd73682c804648 Mon Sep 17 00:00:00 2001 From: Patrick Zhai Date: Sun, 21 Sep 2025 11:11:18 -0700 Subject: [PATCH 1/2] Pure refactoring --- .../util/hnsw/ConcurrentHnswMerger.java | 6 +- .../util/hnsw/IncrementalHnswGraphMerger.java | 91 +++++++++++++------ .../util/hnsw/MergingHnswGraphBuilder.java | 73 +-------------- .../lucene/util/hnsw/UpdateGraphsUtils.java | 65 +++++++++++++ 4 files changed, 135 insertions(+), 100 deletions(-) diff --git a/lucene/core/src/java/org/apache/lucene/util/hnsw/ConcurrentHnswMerger.java b/lucene/core/src/java/org/apache/lucene/util/hnsw/ConcurrentHnswMerger.java index f6b88424a76e..a99b0325ad7a 100644 --- a/lucene/core/src/java/org/apache/lucene/util/hnsw/ConcurrentHnswMerger.java +++ b/lucene/core/src/java/org/apache/lucene/util/hnsw/ConcurrentHnswMerger.java @@ -57,11 +57,11 @@ protected HnswBuilder createBuilder(KnnVectorValues mergedVectorValues, int maxO OnHeapHnswGraph graph; BitSet initializedNodes = null; - if (graphReaders.size() == 0) { + if (initGraphReaders.size() == 0) { graph = new OnHeapHnswGraph(M, maxOrd); } else { - graphReaders.sort(Comparator.comparingInt(GraphReader::graphSize).reversed()); - GraphReader initGraphReader = graphReaders.get(0); + initGraphReaders.sort(Comparator.comparingInt(GraphReader::graphSize).reversed()); + GraphReader initGraphReader = initGraphReaders.get(0); KnnVectorsReader initReader = initGraphReader.reader(); MergeState.DocMap initDocMap = initGraphReader.initDocMap(); int initGraphSize = initGraphReader.graphSize(); diff --git a/lucene/core/src/java/org/apache/lucene/util/hnsw/IncrementalHnswGraphMerger.java b/lucene/core/src/java/org/apache/lucene/util/hnsw/IncrementalHnswGraphMerger.java index a29afeb615f2..ddfe52ee4aec 100644 --- a/lucene/core/src/java/org/apache/lucene/util/hnsw/IncrementalHnswGraphMerger.java +++ b/lucene/core/src/java/org/apache/lucene/util/hnsw/IncrementalHnswGraphMerger.java @@ -47,7 +47,7 @@ public class 
IncrementalHnswGraphMerger implements HnswGraphMerger { protected final int M; protected final int beamWidth; - protected List graphReaders = new ArrayList<>(); + protected List initGraphReaders = new ArrayList<>(); private int numReaders = 0; /** Represents a vector reader that contains graph info. */ @@ -98,7 +98,7 @@ public IncrementalHnswGraphMerger addReader( candidateVectorCount = vectorValues.size(); } } - graphReaders.add(new GraphReader(reader, docMap, candidateVectorCount)); + initGraphReaders.add(new GraphReader(reader, docMap, candidateVectorCount)); return this; } @@ -112,57 +112,49 @@ public IncrementalHnswGraphMerger addReader( */ protected HnswBuilder createBuilder(KnnVectorValues mergedVectorValues, int maxOrd) throws IOException { - if (graphReaders.size() == 0) { + GraphMergeContext mergeContext = prepareGraphMerge(mergedVectorValues, maxOrd); + if (mergeContext.initGraphs == null || mergeContext.initGraphs.length == 0) { return HnswGraphBuilder.create( - scorerSupplier, M, beamWidth, HnswGraphBuilder.randSeed, maxOrd); - } - graphReaders.sort(Comparator.comparingInt(GraphReader::graphSize).reversed()); - - final BitSet initializedNodes = - graphReaders.size() == numReaders ? 
null : new FixedBitSet(maxOrd); - int[][] ordMaps = getNewOrdMapping(mergedVectorValues, initializedNodes); - HnswGraph[] graphs = new HnswGraph[graphReaders.size()]; - for (int i = 0; i < graphReaders.size(); i++) { - HnswGraph graph = ((HnswGraphProvider) graphReaders.get(i).reader).getGraph(fieldInfo.name); - if (graph.size() == 0) { - throw new IllegalStateException("Graph should not be empty"); - } - graphs[i] = graph; + scorerSupplier, M, beamWidth, HnswGraphBuilder.randSeed, mergeContext.maxOrd); } return MergingHnswGraphBuilder.fromGraphs( scorerSupplier, beamWidth, HnswGraphBuilder.randSeed, - graphs, - ordMaps, - maxOrd, - initializedNodes); + mergeContext.initGraphs, + mergeContext.oldToNewOrdinalMaps, + mergeContext.maxOrd, + mergeContext.initializedNodes); } - protected final int[][] getNewOrdMapping( + /** + * Get old -> new ordinal mapping for all graphs that are participated in initialization. + * A.k.a for all graphs has been added to {@link #initGraphReaders} + */ + protected final int[][] getOldToNewOrdMapping( KnnVectorValues mergedVectorValues, BitSet initializedNodes) throws IOException { - final int numGraphs = graphReaders.size(); + final int numGraphs = initGraphReaders.size(); IntIntHashMap[] newDocIdToOldOrdinals = new IntIntHashMap[numGraphs]; final int[][] oldToNewOrdinalMap = new int[numGraphs][]; for (int i = 0; i < numGraphs; i++) { KnnVectorValues.DocIndexIterator vectorsIter = null; switch (fieldInfo.getVectorEncoding()) { case BYTE -> - vectorsIter = graphReaders.get(i).reader.getByteVectorValues(fieldInfo.name).iterator(); + vectorsIter = initGraphReaders.get(i).reader.getByteVectorValues(fieldInfo.name).iterator(); case FLOAT32 -> vectorsIter = - graphReaders.get(i).reader.getFloatVectorValues(fieldInfo.name).iterator(); + initGraphReaders.get(i).reader.getFloatVectorValues(fieldInfo.name).iterator(); } - newDocIdToOldOrdinals[i] = new IntIntHashMap(graphReaders.get(i).graphSize); - MergeState.DocMap docMap = 
graphReaders.get(i).initDocMap(); + newDocIdToOldOrdinals[i] = new IntIntHashMap(initGraphReaders.get(i).graphSize); + MergeState.DocMap docMap = initGraphReaders.get(i).initDocMap(); for (int docId = vectorsIter.nextDoc(); docId != NO_MORE_DOCS; docId = vectorsIter.nextDoc()) { int newDocId = docMap.get(docId); newDocIdToOldOrdinals[i].put(newDocId, vectorsIter.index()); } - oldToNewOrdinalMap[i] = new int[graphReaders.get(i).graphSize]; + oldToNewOrdinalMap[i] = new int[initGraphReaders.get(i).graphSize]; } KnnVectorValues.DocIndexIterator mergedVectorIterator = mergedVectorValues.iterator(); @@ -184,6 +176,31 @@ protected final int[][] getNewOrdMapping( return oldToNewOrdinalMap; } + /** + * Prepare the context for merging graphs. + * It sorts on {@link #initGraphReaders} by reverse size such that we will use the first one as the base graph, then + * prepare everything we need into {@link GraphMergeContext} + */ + protected GraphMergeContext prepareGraphMerge(KnnVectorValues mergedVectorValues, int maxOrd) throws IOException { + if (initGraphReaders.size() == 0) { + return new GraphMergeContext(null, null, maxOrd, null); + } + initGraphReaders.sort(Comparator.comparingInt(GraphReader::graphSize).reversed()); + + final BitSet initializedNodes = + initGraphReaders.size() == numReaders ? 
null : new FixedBitSet(maxOrd); + int[][] ordMaps = getOldToNewOrdMapping(mergedVectorValues, initializedNodes); + HnswGraph[] graphs = new HnswGraph[initGraphReaders.size()]; + for (int i = 0; i < initGraphReaders.size(); i++) { + HnswGraph graph = ((HnswGraphProvider) initGraphReaders.get(i).reader).getGraph(fieldInfo.name); + if (graph.size() == 0) { + throw new IllegalStateException("Graph should not be empty"); + } + graphs[i] = graph; + } + return new GraphMergeContext(graphs, ordMaps, maxOrd, initializedNodes); + } + @Override public OnHeapHnswGraph merge( KnnVectorValues mergedVectorValues, InfoStream infoStream, int maxOrd) throws IOException { @@ -204,4 +221,22 @@ private static boolean hasDeletes(Bits liveDocs) { } return false; } + + /** + * A helper class to hold the context of merging graphs. + * @param initGraphs graphs that will be participated in initialization. For now, it's all graphs that does not have any deletion. + * If there are no such graphs, it will be null. + * @param oldToNewOrdinalMaps for each graph in {@code initGraphs}, it's the old to new ordinal mapping. + * @param maxOrd max ordinal of the new (to be created) graph + * @param initializedNodes all new ordinals that are included in the {@code initGraphs}, they might have already been initialized, + * as part of the very first graph, or will be initialized in a later process, e.g. see + * {@link UpdateGraphsUtils#joinSetGraphMerge(HnswGraph, HnswGraph, int[], HnswBuilder)} + * Note: in case of {@code initGraphs} is non-null but this field is null, it means all ordinals are/will be initialized. 
+ */ + protected record GraphMergeContext(HnswGraph[] initGraphs, int[][] oldToNewOrdinalMaps, int maxOrd, BitSet initializedNodes) { + + public boolean allInitialized() { + return initGraphs != null && initializedNodes == null; + } + } } diff --git a/lucene/core/src/java/org/apache/lucene/util/hnsw/MergingHnswGraphBuilder.java b/lucene/core/src/java/org/apache/lucene/util/hnsw/MergingHnswGraphBuilder.java index 08366927b247..05d3a297bfea 100644 --- a/lucene/core/src/java/org/apache/lucene/util/hnsw/MergingHnswGraphBuilder.java +++ b/lucene/core/src/java/org/apache/lucene/util/hnsw/MergingHnswGraphBuilder.java @@ -17,40 +17,12 @@ package org.apache.lucene.util.hnsw; -import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; - import java.io.IOException; -import org.apache.lucene.internal.hppc.IntCursor; -import org.apache.lucene.internal.hppc.IntHashSet; import org.apache.lucene.util.BitSet; /** - * A graph builder that is used during segments' merging. - * - *

This builder uses a smart algorithm to merge multiple graphs into a single graph. The - * algorithm is based on the idea that if we know where we want to insert a node, we have a good - * idea of where we want to insert its neighbors. - * - *

The algorithm is based on the following steps: - * - *

    - *
  • Get all graphs that don't have deletions and sort them by size desc. - *
  • Copy the largest graph to the new graph (gL). - *
  • For each remaining small graph (gS): - *
      - *
    • Find the nodes that best cover gS: join set `j`. These nodes will be inserted into gL - * as usual: by searching gL to find the best candidates `w` to which connect the nodes. - *
    • For each remaining node in gS: - *
        - *
      • We provide eps to search in gL. We form `eps` by the union of the node's - * neighbors in gS and the node's neighbors' neighbors in gL. We also limit - * beamWidth (efConstruction to M*3) - *
      - *
    - *
- * - *

We expect the size of join set `j` to be small, around 1/5 to 1/2 of the size of gS. For the - * rest of the nodes in gS, we expect savings by performing lighter searches in gL. + * A graph builder that is used during segments' merging. It enables efficient merging of graphs + * using {@link UpdateGraphsUtils#joinSetGraphMerge(HnswGraph, HnswGraph, int[], HnswBuilder)}.} method. * * @lucene.experimental */ @@ -85,7 +57,7 @@ private MergingHnswGraphBuilder( * @param totalNumberOfVectors the total number of vectors in the new graph, this should include * all vectors expected to be added to the graph in the future * @param initializedNodes the nodes will be initialized through the merging, if null, all nodes - * should be already initialized after {@link #updateGraph(HnswGraph, int[])} being called + * should be already initialized after {@link UpdateGraphsUtils#joinSetGraphMerge(HnswGraph, HnswGraph, int[], HnswBuilder)} being called * @return a new HnswGraphBuilder that is initialized with the provided HnswGraph * @throws IOException when reading the graph fails */ @@ -124,7 +96,7 @@ public OnHeapHnswGraph build(int maxOrd) throws IOException { + graphSizes); } for (int i = 1; i < graphs.length; i++) { - updateGraph(graphs[i], ordMaps[i]); + UpdateGraphsUtils.joinSetGraphMerge(graphs[i], hnsw, ordMaps[i], this); } // TODO: optimize to iterate only over unset bits in initializedNodes @@ -139,41 +111,4 @@ public OnHeapHnswGraph build(int maxOrd) throws IOException { return getCompletedGraph(); } - /** Merge the smaller graph into the current larger graph. 
*/ - private void updateGraph(HnswGraph gS, int[] ordMapS) throws IOException { - int size = gS.size(); - IntHashSet j = UpdateGraphsUtils.computeJoinSet(gS); - - // for nodes that in the join set, add them directly to the graph - for (IntCursor node : j) { - addGraphNode(ordMapS[node.value]); - } - - // for each node outside of j set: - // form the entry points set for the node - // by joining the node's neighbours in gS with - // the node's neighbours' neighbours in gL - for (int u = 0; u < size; u++) { - if (j.contains(u)) { - continue; - } - IntHashSet eps = new IntHashSet(); - gS.seek(0, u); - for (int v = gS.nextNeighbor(); v != NO_MORE_DOCS; v = gS.nextNeighbor()) { - // if u's neighbour v is in the join set, or already added to gL (v < u), - // then we add v's neighbours from gL to the candidate list - if (v < u || j.contains(v)) { - int newv = ordMapS[v]; - eps.add(newv); - - hnsw.seek(0, newv); - int friendOrd; - while ((friendOrd = hnsw.nextNeighbor()) != NO_MORE_DOCS) { - eps.add(friendOrd); - } - } - } - addGraphNode(ordMapS[u], eps); - } - } } diff --git a/lucene/core/src/java/org/apache/lucene/util/hnsw/UpdateGraphsUtils.java b/lucene/core/src/java/org/apache/lucene/util/hnsw/UpdateGraphsUtils.java index 667ba37b7e34..46fc5c2ebba6 100644 --- a/lucene/core/src/java/org/apache/lucene/util/hnsw/UpdateGraphsUtils.java +++ b/lucene/core/src/java/org/apache/lucene/util/hnsw/UpdateGraphsUtils.java @@ -20,6 +20,8 @@ import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; import java.io.IOException; + +import org.apache.lucene.internal.hppc.IntCursor; import org.apache.lucene.internal.hppc.IntHashSet; import org.apache.lucene.util.LongHeap; @@ -112,4 +114,67 @@ private static int decodeValue1(long encoded) { private static int decodeValue2(long encoded) { return (int) (encoded & 0xFFFFFFFFL); } + + /** + *

This method uses a smart algorithm to merge multiple graphs into a single graph. The + * algorithm is based on the idea that if we know where we want to insert a node, we have a good + * idea of where we want to insert its neighbors. + * + *

The algorithm is based on the following steps: + * + *

    + *
  • Get all graphs that don't have deletions and sort them by size desc. + *
  • Copy the largest graph to the new graph (gL). + *
  • For each remaining small graph (gS): + *
      + *
    • Find the nodes that best cover gS: join set `j`. These nodes will be inserted into gL + * as usual: by searching gL to find the best candidates `w` to which to connect the nodes. + *
    • For each remaining node in gS: + *
        + *
      • We provide eps to search in gL. We form `eps` by the union of the node's + * neighbors in gS and the node's neighbors' neighbors in gL. We also limit + * beamWidth (efConstruction) to M*3. + *
      + *
    + *
+ * + *

We expect the size of join set `j` to be small, around 1/5 to 1/2 of the size of gS. For the + * rest of the nodes in gS, we expect savings by performing lighter searches in gL. + */ + public static void joinSetGraphMerge(HnswGraph sourceGraph, HnswGraph destGraph, int[] oldToNewOrd, HnswBuilder graphBuilder) throws IOException { + int size = sourceGraph.size(); + IntHashSet j = computeJoinSet(sourceGraph); + + // for nodes that in the join set, add them directly to the graph + for (IntCursor node : j) { + graphBuilder.addGraphNode(oldToNewOrd[node.value]); + } + + // for each node outside of j set: + // form the entry points set for the node + // by joining the node's neighbours in gS with + // the node's neighbours' neighbours in gL + for (int u = 0; u < size; u++) { + if (j.contains(u)) { + continue; + } + IntHashSet eps = new IntHashSet(); + sourceGraph.seek(0, u); + for (int v = sourceGraph.nextNeighbor(); v != NO_MORE_DOCS; v = sourceGraph.nextNeighbor()) { + // if u's neighbour v is in the join set, or already added to gL (v < u), + // then we add v's neighbours from gL to the candidate list + if (v < u || j.contains(v)) { + int newv = oldToNewOrd[v]; + eps.add(newv); + + destGraph.seek(0, newv); + int friendOrd; + while ((friendOrd = destGraph.nextNeighbor()) != NO_MORE_DOCS) { + eps.add(friendOrd); + } + } + } + graphBuilder.addGraphNode(oldToNewOrd[u], eps); + } + } } From c33dabd2f1cef546cce0640f8c5c81308559f7a9 Mon Sep 17 00:00:00 2001 From: Patrick Zhai Date: Sun, 21 Sep 2025 16:32:15 -0700 Subject: [PATCH 2/2] Finish concurrent change --- .../util/hnsw/ConcurrentHnswMerger.java | 99 ++----------------- .../lucene/util/hnsw/GraphMergeContext.java | 22 +++++ .../util/hnsw/HnswConcurrentMergeBuilder.java | 49 +++++++-- .../lucene/util/hnsw/HnswGraphBuilder.java | 5 + .../util/hnsw/IncrementalHnswGraphMerger.java | 29 ++---- .../util/hnsw/MergingHnswGraphBuilder.java | 8 +- .../lucene/util/hnsw/UpdateGraphsUtils.java | 11 ++- 
.../lucene/util/hnsw/HnswGraphTestCase.java | 4 +- 8 files changed, 96 insertions(+), 131 deletions(-) create mode 100644 lucene/core/src/java/org/apache/lucene/util/hnsw/GraphMergeContext.java diff --git a/lucene/core/src/java/org/apache/lucene/util/hnsw/ConcurrentHnswMerger.java b/lucene/core/src/java/org/apache/lucene/util/hnsw/ConcurrentHnswMerger.java index a99b0325ad7a..6e95da1549d5 100644 --- a/lucene/core/src/java/org/apache/lucene/util/hnsw/ConcurrentHnswMerger.java +++ b/lucene/core/src/java/org/apache/lucene/util/hnsw/ConcurrentHnswMerger.java @@ -16,19 +16,10 @@ */ package org.apache.lucene.util.hnsw; -import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; - import java.io.IOException; -import java.util.Comparator; -import org.apache.lucene.codecs.KnnVectorsReader; -import org.apache.lucene.codecs.hnsw.HnswGraphProvider; import org.apache.lucene.index.FieldInfo; import org.apache.lucene.index.KnnVectorValues; -import org.apache.lucene.index.MergeState; -import org.apache.lucene.internal.hppc.IntIntHashMap; import org.apache.lucene.search.TaskExecutor; -import org.apache.lucene.util.BitSet; -import org.apache.lucene.util.FixedBitSet; /** This merger merges graph in a concurrent manner, by using {@link HnswConcurrentMergeBuilder} */ public class ConcurrentHnswMerger extends IncrementalHnswGraphMerger { @@ -54,89 +45,13 @@ public ConcurrentHnswMerger( @Override protected HnswBuilder createBuilder(KnnVectorValues mergedVectorValues, int maxOrd) throws IOException { - OnHeapHnswGraph graph; - BitSet initializedNodes = null; - - if (initGraphReaders.size() == 0) { - graph = new OnHeapHnswGraph(M, maxOrd); - } else { - initGraphReaders.sort(Comparator.comparingInt(GraphReader::graphSize).reversed()); - GraphReader initGraphReader = initGraphReaders.get(0); - KnnVectorsReader initReader = initGraphReader.reader(); - MergeState.DocMap initDocMap = initGraphReader.initDocMap(); - int initGraphSize = initGraphReader.graphSize(); - HnswGraph 
initializerGraph = ((HnswGraphProvider) initReader).getGraph(fieldInfo.name); - - if (initializerGraph.size() == 0) { - graph = new OnHeapHnswGraph(M, maxOrd); - } else { - initializedNodes = new FixedBitSet(maxOrd); - int[] oldToNewOrdinalMap = - getNewOrdMapping( - fieldInfo, - initReader, - initDocMap, - initGraphSize, - mergedVectorValues, - initializedNodes); - graph = InitializedHnswGraphBuilder.initGraph(initializerGraph, oldToNewOrdinalMap, maxOrd); - } - } + GraphMergeContext mergeContext = prepareGraphMerge(mergedVectorValues, maxOrd); return new HnswConcurrentMergeBuilder( - taskExecutor, numWorker, scorerSupplier, beamWidth, graph, initializedNodes); - } - - /** - * Creates a new mapping from old ordinals to new ordinals and returns the total number of vectors - * in the newly merged segment. - * - * @param mergedVectorValues vector values in the merged segment - * @param initializedNodes track what nodes have been initialized - * @return the mapping from old ordinals to new ordinals - * @throws IOException If an error occurs while reading from the merge state - */ - private static int[] getNewOrdMapping( - FieldInfo fieldInfo, - KnnVectorsReader initReader, - MergeState.DocMap initDocMap, - int initGraphSize, - KnnVectorValues mergedVectorValues, - BitSet initializedNodes) - throws IOException { - KnnVectorValues.DocIndexIterator initializerIterator = null; - - switch (fieldInfo.getVectorEncoding()) { - case BYTE -> initializerIterator = initReader.getByteVectorValues(fieldInfo.name).iterator(); - case FLOAT32 -> - initializerIterator = initReader.getFloatVectorValues(fieldInfo.name).iterator(); - } - - IntIntHashMap newIdToOldOrdinal = new IntIntHashMap(initGraphSize); - int maxNewDocID = -1; - for (int docId = initializerIterator.nextDoc(); - docId != NO_MORE_DOCS; - docId = initializerIterator.nextDoc()) { - int newId = initDocMap.get(docId); - maxNewDocID = Math.max(newId, maxNewDocID); - assert newIdToOldOrdinal.containsKey(newId) == false; - 
newIdToOldOrdinal.put(newId, initializerIterator.index()); - } - - if (maxNewDocID == -1) { - return new int[0]; - } - final int[] oldToNewOrdinalMap = new int[initGraphSize]; - KnnVectorValues.DocIndexIterator mergedVectorIterator = mergedVectorValues.iterator(); - for (int newDocId = mergedVectorIterator.nextDoc(); - newDocId <= maxNewDocID; - newDocId = mergedVectorIterator.nextDoc()) { - int oldOrd = newIdToOldOrdinal.getOrDefault(newDocId, -1); - if (oldOrd != -1) { - int newOrd = mergedVectorIterator.index(); - initializedNodes.set(newOrd); - oldToNewOrdinalMap[oldOrd] = newOrd; - } - } - return oldToNewOrdinalMap; + taskExecutor, + numWorker, + scorerSupplier, + beamWidth, + M, + mergeContext); } } diff --git a/lucene/core/src/java/org/apache/lucene/util/hnsw/GraphMergeContext.java b/lucene/core/src/java/org/apache/lucene/util/hnsw/GraphMergeContext.java new file mode 100644 index 000000000000..6f80eb289fc8 --- /dev/null +++ b/lucene/core/src/java/org/apache/lucene/util/hnsw/GraphMergeContext.java @@ -0,0 +1,22 @@ +package org.apache.lucene.util.hnsw; + +import org.apache.lucene.util.BitSet; + +/** + * A helper class to hold the context of merging graphs. + * + * @param initGraphs graphs that will participate in initialization. For now, it's all graphs that do not have + * any deletions. If there are no such graphs, it will be null. + * @param oldToNewOrdinalMaps for each graph in {@code initGraphs}, it's the old to new ordinal mapping. + * @param maxOrd max ordinal of the new (to be created) graph + * @param initializedNodes all new ordinals that are included in the {@code initGraphs}, they might have already + * been initialized, as part of the very first graph, or will be initialized in a later process, e.g. see + * {@link UpdateGraphsUtils#joinSetGraphMerge(HnswGraph, HnswGraph, HnswGraphSearcher, int[], HnswBuilder)}. Note: if + * {@code initGraphs} is non-null but this field is null, it means all ordinals are/will be initialized.
+ */ +record GraphMergeContext(HnswGraph[] initGraphs, int[][] oldToNewOrdinalMaps, int maxOrd, BitSet initializedNodes) { + + public boolean allInitialized() { + return initGraphs != null && initializedNodes == null; + } +} diff --git a/lucene/core/src/java/org/apache/lucene/util/hnsw/HnswConcurrentMergeBuilder.java b/lucene/core/src/java/org/apache/lucene/util/hnsw/HnswConcurrentMergeBuilder.java index 9fbf69558e5a..bc1194677366 100644 --- a/lucene/core/src/java/org/apache/lucene/util/hnsw/HnswConcurrentMergeBuilder.java +++ b/lucene/core/src/java/org/apache/lucene/util/hnsw/HnswConcurrentMergeBuilder.java @@ -51,11 +51,13 @@ public HnswConcurrentMergeBuilder( int numWorker, RandomVectorScorerSupplier scorerSupplier, int beamWidth, - OnHeapHnswGraph hnsw, - BitSet initializedNodes) + int M, + GraphMergeContext graphMergeContext) throws IOException { + OnHeapHnswGraph hnsw = initGraph(M, graphMergeContext); this.taskExecutor = taskExecutor; AtomicInteger workProgress = new AtomicInteger(0); + AtomicInteger nextGraphToMerge = new AtomicInteger(1); workers = new ConcurrentMergeWorker[numWorker]; hnswLock = new HnswLock(); for (int i = 0; i < numWorker; i++) { @@ -66,11 +68,20 @@ public HnswConcurrentMergeBuilder( HnswGraphBuilder.randSeed, hnsw, hnswLock, - initializedNodes, - workProgress); + workProgress, + nextGraphToMerge, + graphMergeContext + ); } } + private static OnHeapHnswGraph initGraph(int M, GraphMergeContext graphMergeContext) throws IOException { + if (graphMergeContext.initGraphs() == null || graphMergeContext.initGraphs().length == 0) { + return new OnHeapHnswGraph(M, graphMergeContext.maxOrd()); + } + return InitializedHnswGraphBuilder.initGraph(graphMergeContext.initGraphs()[0], graphMergeContext.oldToNewOrdinalMaps()[0], graphMergeContext.maxOrd()); + } + @Override public OnHeapHnswGraph build(int maxOrd) throws IOException { if (frozen) { @@ -146,6 +157,14 @@ private static final class ConcurrentMergeWorker extends HnswGraphBuilder { */ private 
final AtomicInteger workProgress; + /** + * A common AtomicInteger shared among all workers, tracking which graph to merge next, if initGraphs is null, then + * this field will be ignored + */ + private final AtomicInteger nextGraphToMerge; + + private final GraphMergeContext graphMergeContext; + private final BitSet initializedNodes; private int batchSize = DEFAULT_BATCH_SIZE; @@ -155,8 +174,10 @@ private ConcurrentMergeWorker( long seed, OnHeapHnswGraph hnsw, HnswLock hnswLock, - BitSet initializedNodes, - AtomicInteger workProgress) + AtomicInteger workProgress, + AtomicInteger nextGraphToMerge, + GraphMergeContext graphMergeContext + ) throws IOException { super( scorerSupplier, @@ -167,7 +188,9 @@ private ConcurrentMergeWorker( new MergeSearcher( new NeighborQueue(beamWidth, true), hnswLock, new FixedBitSet(hnsw.maxNodeId() + 1))); this.workProgress = workProgress; - this.initializedNodes = initializedNodes; + this.nextGraphToMerge = nextGraphToMerge; + this.graphMergeContext = graphMergeContext; + this.initializedNodes = graphMergeContext.initializedNodes(); } /** @@ -177,6 +200,18 @@ private ConcurrentMergeWorker( * finishing around the same time. 
*/ private void run(int maxOrd) throws IOException { + while (graphMergeContext.initGraphs() != null && nextGraphToMerge.get() < graphMergeContext.initGraphs().length) { + int graphToWork = nextGraphToMerge.getAndIncrement(); + if (graphToWork >= graphMergeContext.initGraphs().length) { + break; + } + UpdateGraphsUtils.joinSetGraphMerge( + graphMergeContext.initGraphs()[graphToWork], hnsw, getGraphSearcher(), graphMergeContext.oldToNewOrdinalMaps()[graphToWork], this); + } + if (graphMergeContext.allInitialized()) { + // all the work has been done above since all the graphs are in the initGraphs set + return; + } int start = getStartPos(maxOrd); int end; while (start != -1) { diff --git a/lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java b/lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java index cd54443ab760..fc664a41f645 100644 --- a/lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java +++ b/lucene/core/src/java/org/apache/lucene/util/hnsw/HnswGraphBuilder.java @@ -191,6 +191,10 @@ public OnHeapHnswGraph getGraph() { return hnsw; } + public HnswGraphSearcher getGraphSearcher() { + return graphSearcher; + } + /** add vectors in range [minOrd, maxOrd) */ protected void addVectors(int minOrd, int maxOrd) throws IOException { if (frozen) { @@ -646,5 +650,6 @@ public TopDocs topDocs() { public KnnSearchStrategy getSearchStrategy() { return null; } + } } diff --git a/lucene/core/src/java/org/apache/lucene/util/hnsw/IncrementalHnswGraphMerger.java b/lucene/core/src/java/org/apache/lucene/util/hnsw/IncrementalHnswGraphMerger.java index ddfe52ee4aec..e820f202d5e0 100644 --- a/lucene/core/src/java/org/apache/lucene/util/hnsw/IncrementalHnswGraphMerger.java +++ b/lucene/core/src/java/org/apache/lucene/util/hnsw/IncrementalHnswGraphMerger.java @@ -113,19 +113,19 @@ public IncrementalHnswGraphMerger addReader( protected HnswBuilder createBuilder(KnnVectorValues mergedVectorValues, int maxOrd) throws IOException { 
GraphMergeContext mergeContext = prepareGraphMerge(mergedVectorValues, maxOrd); - if (mergeContext.initGraphs == null || mergeContext.initGraphs.length == 0) { + if (mergeContext.initGraphs() == null || mergeContext.initGraphs().length == 0) { return HnswGraphBuilder.create( - scorerSupplier, M, beamWidth, HnswGraphBuilder.randSeed, mergeContext.maxOrd); + scorerSupplier, M, beamWidth, HnswGraphBuilder.randSeed, mergeContext.maxOrd()); } return MergingHnswGraphBuilder.fromGraphs( scorerSupplier, beamWidth, HnswGraphBuilder.randSeed, - mergeContext.initGraphs, - mergeContext.oldToNewOrdinalMaps, - mergeContext.maxOrd, - mergeContext.initializedNodes); + mergeContext.initGraphs(), + mergeContext.oldToNewOrdinalMaps(), + mergeContext.maxOrd(), + mergeContext.initializedNodes()); } /** @@ -222,21 +222,4 @@ private static boolean hasDeletes(Bits liveDocs) { return false; } - /** - * A helper class to hold the context of merging graphs. - * @param initGraphs graphs that will be participated in initialization. For now, it's all graphs that does not have any deletion. - * If there are no such graphs, it will be null. - * @param oldToNewOrdinalMaps for each graph in {@code initGraphs}, it's the old to new ordinal mapping. - * @param maxOrd max ordinal of the new (to be created) graph - * @param initializedNodes all new ordinals that are included in the {@code initGraphs}, they might have already been initialized, - * as part of the very first graph, or will be initialized in a later process, e.g. see - * {@link UpdateGraphsUtils#joinSetGraphMerge(HnswGraph, HnswGraph, int[], HnswBuilder)} - * Note: in case of {@code initGraphs} is non-null but this field is null, it means all ordinals are/will be initialized. 
- */ - protected record GraphMergeContext(HnswGraph[] initGraphs, int[][] oldToNewOrdinalMaps, int maxOrd, BitSet initializedNodes) { - - public boolean allInitialized() { - return initGraphs != null && initializedNodes == null; - } - } } diff --git a/lucene/core/src/java/org/apache/lucene/util/hnsw/MergingHnswGraphBuilder.java b/lucene/core/src/java/org/apache/lucene/util/hnsw/MergingHnswGraphBuilder.java index 05d3a297bfea..68e33c20b197 100644 --- a/lucene/core/src/java/org/apache/lucene/util/hnsw/MergingHnswGraphBuilder.java +++ b/lucene/core/src/java/org/apache/lucene/util/hnsw/MergingHnswGraphBuilder.java @@ -22,7 +22,7 @@ /** * A graph builder that is used during segments' merging. It enables efficient merging of graphs - * using {@link UpdateGraphsUtils#joinSetGraphMerge(HnswGraph, HnswGraph, int[], HnswBuilder)}.} method. + * using {@link UpdateGraphsUtils#joinSetGraphMerge(HnswGraph, HnswGraph, HnswGraphSearcher, int[], HnswBuilder)} method. * * @lucene.experimental */ @@ -57,7 +57,7 @@ private MergingHnswGraphBuilder( * @param totalNumberOfVectors the total number of vectors in the new graph, this should include * all vectors expected to be added to the graph in the future * @param initializedNodes the nodes will be initialized through the merging, if null, all nodes - * should be already initialized after {@link UpdateGraphsUtils#joinSetGraphMerge(HnswGraph, HnswGraph, int[], HnswBuilder)} being called + * should be already initialized after {@link UpdateGraphsUtils#joinSetGraphMerge(HnswGraph, HnswGraph, HnswGraphSearcher, int[], HnswBuilder)} being called * @return a new HnswGraphBuilder that is initialized with the provided HnswGraph * @throws IOException when reading the graph fails */ @@ -95,8 +95,10 @@ public OnHeapHnswGraph build(int maxOrd) throws IOException { + " vectors, graph sizes:" + graphSizes); } + // this searcher is created just to satisfy the call signature, shouldn't be used for any actual search functionality + HnswGraphSearcher
graphSearcher = new HnswGraphSearcher(null, null); for (int i = 1; i < graphs.length; i++) { - UpdateGraphsUtils.joinSetGraphMerge(graphs[i], hnsw, ordMaps[i], this); + UpdateGraphsUtils.joinSetGraphMerge(graphs[i], hnsw, graphSearcher, ordMaps[i], this); } // TODO: optimize to iterate only over unset bits in initializedNodes diff --git a/lucene/core/src/java/org/apache/lucene/util/hnsw/UpdateGraphsUtils.java b/lucene/core/src/java/org/apache/lucene/util/hnsw/UpdateGraphsUtils.java index 46fc5c2ebba6..0a532fc21369 100644 --- a/lucene/core/src/java/org/apache/lucene/util/hnsw/UpdateGraphsUtils.java +++ b/lucene/core/src/java/org/apache/lucene/util/hnsw/UpdateGraphsUtils.java @@ -140,8 +140,12 @@ private static int decodeValue2(long encoded) { * *

We expect the size of join set `j` to be small, around 1/5 to 1/2 of the size of gS. For the * rest of the nodes in gS, we expect savings by performing lighter searches in gL. + * + * Thread-safety: thread safety is provided by {@code graphSearcher} and {@code graphBuilder}. If both + * are thread-safe, then the whole operation is thread-safe, provided no other thread is working on the same + * segment. */ - public static void joinSetGraphMerge(HnswGraph sourceGraph, HnswGraph destGraph, int[] oldToNewOrd, HnswBuilder graphBuilder) throws IOException { + public static void joinSetGraphMerge(HnswGraph sourceGraph, HnswGraph destGraph, HnswGraphSearcher graphSearcher ,int[] oldToNewOrd, HnswBuilder graphBuilder) throws IOException { int size = sourceGraph.size(); IntHashSet j = computeJoinSet(sourceGraph); @@ -166,10 +170,9 @@ public static void joinSetGraphMerge(HnswGraph sourceGraph, HnswGraph destGraph, if (v < u || j.contains(v)) { int newv = oldToNewOrd[v]; eps.add(newv); - - destGraph.seek(0, newv); + graphSearcher.graphSeek(destGraph, 0, newv); int friendOrd; - while ((friendOrd = destGraph.nextNeighbor()) != NO_MORE_DOCS) { + while ((friendOrd = graphSearcher.graphNextNeighbor(destGraph)) != NO_MORE_DOCS) { eps.add(friendOrd); } } diff --git a/lucene/core/src/test/org/apache/lucene/util/hnsw/HnswGraphTestCase.java b/lucene/core/src/test/org/apache/lucene/util/hnsw/HnswGraphTestCase.java index be099398ba6a..317895ee7b16 100644 --- a/lucene/core/src/test/org/apache/lucene/util/hnsw/HnswGraphTestCase.java +++ b/lucene/core/src/test/org/apache/lucene/util/hnsw/HnswGraphTestCase.java @@ -1042,7 +1042,7 @@ public void testOnHeapHnswGraphSearch() } /* - * A very basic test ensure the concurrent merge does not throw exceptions, it by no means guarantees the + * A very basic test ensuring the concurrent merge does not throw exceptions, it by no means guarantees the * true correctness of the concurrent merge and that must be checked manually by running a KNN
benchmark * and comparing the recall */ @@ -1056,7 +1056,7 @@ public void testConcurrentMergeBuilder() throws IOException { HnswGraphBuilder.randSeed = random().nextLong(); HnswConcurrentMergeBuilder builder = new HnswConcurrentMergeBuilder( - taskExecutor, 4, scorerSupplier, 30, new OnHeapHnswGraph(10, size), null); + taskExecutor, 4, scorerSupplier, 30, 10, new GraphMergeContext(null, null, size, null)); builder.setBatchSize(100); builder.build(size); exec.shutdownNow();