From 8ef9ec82078105383d6334bb7041cc59e16103b8 Mon Sep 17 00:00:00 2001 From: Jannis Mohlin Tsiroyannis Date: Mon, 31 Mar 2025 10:30:20 +0200 Subject: [PATCH 01/17] Add and populate the change-log table. --- .../migrations/00000024-add-change-log.plsql | 42 +++++++ .../component/PostgreSQLComponent.groovy | 106 +++++++++++------- 2 files changed, 107 insertions(+), 41 deletions(-) create mode 100644 librisxl-tools/postgresql/migrations/00000024-add-change-log.plsql diff --git a/librisxl-tools/postgresql/migrations/00000024-add-change-log.plsql b/librisxl-tools/postgresql/migrations/00000024-add-change-log.plsql new file mode 100644 index 0000000000..57dc0939c5 --- /dev/null +++ b/librisxl-tools/postgresql/migrations/00000024-add-change-log.plsql @@ -0,0 +1,42 @@ +BEGIN; + +DO $$DECLARE + -- THESE MUST BE CHANGED WHEN YOU COPY THE SCRIPT! + + -- The version you expect the database to have _before_ the migration + old_version numeric := 23; + -- The version the database should have _after_ the migration + new_version numeric := 24; + + -- hands off + existing_version numeric; + +BEGIN + + -- Check existing version + SELECT version from lddb__schema INTO existing_version; + IF ( existing_version <> old_version) THEN + RAISE EXCEPTION 'ASKED TO MIGRATE FROM INCORRECT EXISTING VERSION!'; + ROLLBACK; + END IF; + UPDATE lddb__schema SET version = new_version; + + -- ACTUAL SCHEMA CHANGES HERE: + + CREATE TABLE IF NOT EXISTS lddb__change_log ( + changenumber BIGSERIAL PRIMARY KEY, + id text NOT NULL, + loud BOOLEAN NOT NULL, + time timestamp with time zone DEFAULT now() NOT NULL, + resulting_record_version INTEGER NOT NULL + ); + + CREATE INDEX IF NOT EXISTS idx_lddb__change_log_time ON lddb__change_log (time); + +END$$; + +COMMIT; + + + + diff --git a/whelk-core/src/main/groovy/whelk/component/PostgreSQLComponent.groovy b/whelk-core/src/main/groovy/whelk/component/PostgreSQLComponent.groovy index fad8bdaac6..6c9171ca40 100644 --- a/whelk-core/src/main/groovy/whelk/component/PostgreSQLComponent.groovy +++ b/whelk-core/src/main/groovy/whelk/component/PostgreSQLComponent.groovy @@ -472,7 +472,6 @@ class PostgreSQLComponent { private HikariDataSource connectionPool private HikariDataSource outerConnectionPool - boolean versioning = true boolean doVerifyDocumentIdRetention = true boolean sparqlQueueEnabled = false @@ -726,6 +725,37 @@ class PostgreSQLComponent { } } + void logUpdate(String id, boolean loud, Date time, Connection connection) { + + // Which version did we just create? + // This rests on the assumption that lddb-row being updated is locked for the transaction, so that a new + // version could not slip in until this is done. + + int resultingVersion = 0 + PreparedStatement countStatement = connection.prepareStatement("SELECT COUNT(id) FROM lddb__versions WHERE id = ?") + ResultSet resultSet = null + try { + countStatement.setString(1, id) + resultSet = countStatement.executeQuery() + if (resultSet.next()) { + resultingVersion = resultSet.getInt(1) - 1 + } + } finally { + close(resultSet, countStatement) + } + + PreparedStatement logStatement = connection.prepareStatement("INSERT INTO lddb__change_log (id, loud, time, resulting_record_version) VALUES (?,?,?,?)") + try { + logStatement.setString(1, id) + logStatement.setBoolean(2, loud) + logStatement.setTimestamp(3, new Timestamp(time.getTime())) + logStatement.setInt(4, resultingVersion) + logStatement.execute() + } finally { + close(logStatement) + } + } + boolean createDocument(Document doc, String changedIn, String changedBy, String collection, boolean deleted) { log.debug("Saving ${doc.getShortId()}, ${changedIn}, ${changedBy}, ${collection}") @@ -795,6 +825,7 @@ class PostgreSQLComponent { insert.executeUpdate() saveVersion(doc, connection, now, now, changedIn, changedBy, collection, deleted) + logUpdate(doc.getShortId(), true, now, connection) refreshDerivativeTables(doc, connection, deleted) connection.commit() @@ -1049,11 +1080,12 @@ class PostgreSQLComponent { if (doVerifyDocumentIdRetention) { verifyDocumentIdRetention(preUpdateDoc, doc, connection) } - + + Date actualUpdateTime = new Date() Date createdTime = new Date(resultSet.getTimestamp("created").getTime()) Date modTime = minorUpdate ? new Date(resultSet.getTimestamp("modified").getTime()) - : new Date() + : actualUpdateTime doc.setModified(modTime) if (!minorUpdate) { @@ -1065,6 +1097,7 @@ class PostgreSQLComponent { updateStatement.execute() saveVersion(doc, connection, createdTime, modTime, changedIn, changedBy, collection, deleted) + logUpdate(doc.getShortId(), !minorUpdate, actualUpdateTime, connection) // If the mainentity has changed URI (for example happens when new id.kb.se-uris are added to records) if ( preUpdateDoc.getThingIdentifiers()[0] && @@ -1624,24 +1657,21 @@ class PostgreSQLComponent { boolean saveVersion(Document doc, Connection connection, Date createdTime, Date modTime, String changedIn, String changedBy, String collection, boolean deleted) { - if (versioning) { - PreparedStatement insVersion = connection.prepareStatement(INSERT_DOCUMENT_VERSION) - try { - log.debug("Trying to save a version of ${doc.getShortId() ?: ""} with checksum ${doc.getChecksum(jsonld)}. Modified: $modTime") - insVersion = rigVersionStatement(insVersion, doc, createdTime, - modTime, changedIn, changedBy, - collection, deleted) - insVersion.executeUpdate() - return true - } catch (Exception e) { - log.error("Failed to save document version: ${e.message}") - throw e - } - finally { - close(insVersion) - } - } else { - return false + + PreparedStatement insVersion = connection.prepareStatement(INSERT_DOCUMENT_VERSION) + try { + log.debug("Trying to save a version of ${doc.getShortId() ?: ""} with checksum ${doc.getChecksum(jsonld)}. Modified: $modTime") + insVersion = rigVersionStatement(insVersion, doc, createdTime, + modTime, changedIn, changedBy, + collection, deleted) + insVersion.executeUpdate() + return true + } catch (Exception e) { + log.error("Failed to save document version: ${e.message}") + throw e + } + finally { + close(insVersion) } } @@ -1681,10 +1711,8 @@ class PostgreSQLComponent { doc.setCreated(now) doc.setModified(now) doc.setDeleted(false) - if (versioning) { - ver_batch = rigVersionStatement(ver_batch, doc, now, now, changedIn, changedBy, collection, false) - ver_batch.addBatch() - } + ver_batch = rigVersionStatement(ver_batch, doc, now, now, changedIn, changedBy, collection, false) + ver_batch.addBatch() batch = rigInsertStatement(batch, doc, now, changedIn, changedBy, collection, false) batch.addBatch() } @@ -1696,7 +1724,7 @@ class PostgreSQLComponent { } clearEmbellishedCache(connection) connection.commit() - log.debug("Stored ${docs.size()} documents in collection ${collection} (versioning: ${versioning})") + log.debug("Stored ${docs.size()} documents in collection ${collection}") return true } catch (Exception e) { log.error("Failed to save batch: ${e.message}. Rolling back..", e) @@ -2758,21 +2786,17 @@ class PostgreSQLComponent { } void remove(String identifier, String changedIn, String changedBy) { - if (versioning) { - log.debug("Marking document with ID ${identifier} as deleted.") - try { - storeUpdate(identifier, false, true, changedIn, changedBy, - { Document doc -> - doc.setDeleted(true) - // Add a tombstone marker (without removing anything) perhaps? - }) - } catch (Throwable e) { - log.warn("Could not mark document with ID ${identifier} as deleted: ${e}") - throw e - } - } else { - throw new WhelkException( - "Actually deleting data from lddb is currently not supported") + + log.debug("Marking document with ID ${identifier} as deleted.") + try { + storeUpdate(identifier, false, true, changedIn, changedBy, + { Document doc -> + doc.setDeleted(true) + // Add a tombstone marker (without removing anything) perhaps? + }) + } catch (Throwable e) { + log.warn("Could not mark document with ID ${identifier} as deleted: ${e}") + throw e } // Clear out dependencies From c9d1dde5414fcc61307e04715935a37905289a3a Mon Sep 17 00:00:00 2001 From: Jannis Mohlin Tsiroyannis Date: Mon, 31 Mar 2025 15:32:37 +0200 Subject: [PATCH 02/17] Untested progress --- .../src/main/groovy/whelk/Indexing.java | 122 ++++++++++++++++++ 1 file changed, 122 insertions(+) create mode 100644 whelk-core/src/main/groovy/whelk/Indexing.java diff --git a/whelk-core/src/main/groovy/whelk/Indexing.java b/whelk-core/src/main/groovy/whelk/Indexing.java new file mode 100644 index 0000000000..aec6540b9f --- /dev/null +++ b/whelk-core/src/main/groovy/whelk/Indexing.java @@ -0,0 +1,122 @@ +package whelk; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import whelk.component.ElasticSearch; +import whelk.component.PostgreSQLComponent; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Instant; +import java.util.Map; + +public class Indexing { + + // There is to be only one of these. Using more than one thread for indexing potentially breaks made assumptions + // with regard to indexing-order of records, _which matters_. + static Thread worker; + private final static String INDEXER_STATE_KEY = "ElasticIndexer"; + private final static Logger logger = LogManager.getLogger(Indexing.class); + + /** + * Index all changes since last invocation of this function + */ + private static void iterate(PostgreSQLComponent psql, ElasticSearch elastic) throws SQLException { + Map storedIndexerState = psql.getState(INDEXER_STATE_KEY); + if (storedIndexerState == null){ + resetStateToNow(psql); + storedIndexerState = psql.getState(INDEXER_STATE_KEY); + } + long lastIndexedChangeNumber = (Long) storedIndexerState.get("lastIndexed"); + + String sql = "SELECT * FROM lddb__change_log WHERE changenumber > ? ORDER BY changenumber ASC LIMIT 500"; + try (Connection connection = psql.getOuterConnection(); + PreparedStatement statement = connection.prepareStatement(sql)) { + + connection.setAutoCommit(false); + statement.setFetchSize(500); + + statement.setLong(1, lastIndexedChangeNumber); + ResultSet resultSet = statement.executeQuery(); + Long changeNumber = 0L; + while (resultSet.next()) { + changeNumber = resultSet.getLong("changenumber"); + String id = resultSet.getString("id"); + Instant modificationInstant = resultSet.getTimestamp("time").toInstant(); + int resultingVersion = resultSet.getInt("resulting_record_version"); + + System.err.println("Now want to reindex: " + id + " ch-nr: " + changeNumber + " recordv: " + resultingVersion); + } + + if (changeNumber > lastIndexedChangeNumber) { + psql.putState(INDEXER_STATE_KEY, Map.of("lastIndexed", changeNumber)); + } + } + + } + + /** + * This resets the state of the Indexing code to "now", as in: + * "forget whatever you thought you had left to, you are just indexing new changes from *now* and forward." + * + * This should be called whenever a full reindexing has been done. + */ + public synchronized static void resetStateToNow(PostgreSQLComponent psql) throws SQLException { + String sql = """ + INSERT INTO lddb__state (key, value) + SELECT 'ElasticIndexer', jsonb_build_object( 'lastIndexed', MAX(changenumber) ) FROM lddb__change_log + ON CONFLICT (key) + DO UPDATE SET value = EXCLUDED.value; + """.stripIndent(); + + try (Connection connection = psql.getOuterConnection(); + PreparedStatement statement = connection.prepareStatement(sql)) { + statement.executeUpdate(); + } + } + + /** + * Run in background, and index data continually as it changes + */ + public synchronized static void start(PostgreSQLComponent psql, ElasticSearch elastic) { + if (worker != null) + return; + + worker = Thread.ofPlatform().name("Whelk elastic indexing").unstarted( new IndexingRunnable(psql, elastic) ); + worker.start(); + } + + // The necessary machinery for invoking iterate() at a suitable cadence + private static class IndexingRunnable implements Runnable { + private PostgreSQLComponent psql; + ElasticSearch elastic; + + public IndexingRunnable(PostgreSQLComponent psql, ElasticSearch elastic) { + this.psql = psql; + this.elastic = elastic; + } + + public void run() { + while (true) { + + try { + iterate(psql, elastic); + + // Don't run hot, wait a little before the next pass. + try { + Thread.sleep(100); // 0.1 seconds + } catch (InterruptedException ie) { /* ignore */ } + } catch (Exception e) { + // Catch all, and wait a little before retrying. This thread should never be allowed to crash completely. + logger.error("Unexpected exception while indexing.", e); + try { + Thread.sleep(120 * 1000); // 2 minutes + } catch (InterruptedException ie) { /* ignore */ } + } + + } + } + } +} From ced5e51949f7a568e2b73a6093388a33b8a51367 Mon Sep 17 00:00:00 2001 From: Jannis Mohlin Tsiroyannis Date: Wed, 2 Apr 2025 12:27:39 +0200 Subject: [PATCH 03/17] Encode large numbers as json-text. --- .../src/main/groovy/whelk/housekeeping/WebInterface.groovy | 5 ++++- whelk-core/src/main/groovy/whelk/Indexing.java | 6 +++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/housekeeping/src/main/groovy/whelk/housekeeping/WebInterface.groovy b/housekeeping/src/main/groovy/whelk/housekeeping/WebInterface.groovy index 662f173b1c..8ed8bd9c2d 100755 --- a/housekeeping/src/main/groovy/whelk/housekeeping/WebInterface.groovy +++ b/housekeeping/src/main/groovy/whelk/housekeeping/WebInterface.groovy @@ -1,5 +1,6 @@ package whelk.housekeeping +import whelk.Indexing import whelk.Whelk import whelk.util.WhelkFactory; @@ -40,7 +41,9 @@ public class WebInterface extends HttpServlet { Scheduler cronScheduler = new Scheduler() public void init() { - Whelk whelk = WhelkFactory.getSingletonWhelk(); + Whelk whelk = WhelkFactory.getSingletonWhelk() + + Indexing.start(whelk.getStorage(), whelk.elastic) List houseKeepers = [ // Automatic generation is disabled for now, may need design changes approved before activation. diff --git a/whelk-core/src/main/groovy/whelk/Indexing.java b/whelk-core/src/main/groovy/whelk/Indexing.java index aec6540b9f..a71903bdea 100644 --- a/whelk-core/src/main/groovy/whelk/Indexing.java +++ b/whelk-core/src/main/groovy/whelk/Indexing.java @@ -29,7 +29,7 @@ private static void iterate(PostgreSQLComponent psql, ElasticSearch elastic) thr resetStateToNow(psql); storedIndexerState = psql.getState(INDEXER_STATE_KEY); } - long lastIndexedChangeNumber = (Long) storedIndexerState.get("lastIndexed"); + long lastIndexedChangeNumber = Long.parseLong( (String) storedIndexerState.get("lastIndexed") ); String sql = "SELECT * FROM lddb__change_log WHERE changenumber > ? ORDER BY changenumber ASC LIMIT 500"; try (Connection connection = psql.getOuterConnection(); @@ -51,7 +51,7 @@ private static void iterate(PostgreSQLComponent psql, ElasticSearch elastic) thr } if (changeNumber > lastIndexedChangeNumber) { - psql.putState(INDEXER_STATE_KEY, Map.of("lastIndexed", changeNumber)); + psql.putState(INDEXER_STATE_KEY, Map.of("lastIndexed", ""+changeNumber)); } } @@ -66,7 +66,7 @@ private static void iterate(PostgreSQLComponent psql, ElasticSearch elastic) thr public synchronized static void resetStateToNow(PostgreSQLComponent psql) throws SQLException { String sql = """ INSERT INTO lddb__state (key, value) - SELECT 'ElasticIndexer', jsonb_build_object( 'lastIndexed', MAX(changenumber) ) FROM lddb__change_log + SELECT 'ElasticIndexer', jsonb_build_object( 'lastIndexed', MAX(changenumber)::text ) FROM lddb__change_log ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value; """.stripIndent(); From a03e179a2a8de5299e80c7afdaea540be58108d5 Mon Sep 17 00:00:00 2001 From: Jannis Mohlin Tsiroyannis Date: Thu, 3 Apr 2025 10:02:37 +0200 Subject: [PATCH 04/17] Move indexing out of whelk. --- .../whelk/housekeeping/WebInterface.groovy | 2 +- .../migrations/00000024-add-change-log.plsql | 1 + .../src/main/groovy/whelk/Indexing.java | 139 +++++++++++++++-- whelk-core/src/main/groovy/whelk/Whelk.groovy | 147 +----------------- .../component/PostgreSQLComponent.groovy | 31 ++-- 5 files changed, 153 insertions(+), 167 deletions(-) diff --git a/housekeeping/src/main/groovy/whelk/housekeeping/WebInterface.groovy b/housekeeping/src/main/groovy/whelk/housekeeping/WebInterface.groovy index 8ed8bd9c2d..dc8c0076ac 100755 --- a/housekeeping/src/main/groovy/whelk/housekeeping/WebInterface.groovy +++ b/housekeeping/src/main/groovy/whelk/housekeeping/WebInterface.groovy @@ -43,7 +43,7 @@ public class WebInterface extends HttpServlet { public void init() { Whelk whelk = WhelkFactory.getSingletonWhelk() - Indexing.start(whelk.getStorage(), whelk.elastic) + Indexing.start(whelk) List houseKeepers = [ // Automatic generation is disabled for now, may need design changes approved before activation. diff --git a/librisxl-tools/postgresql/migrations/00000024-add-change-log.plsql b/librisxl-tools/postgresql/migrations/00000024-add-change-log.plsql index 57dc0939c5..c850e951e2 100644 --- a/librisxl-tools/postgresql/migrations/00000024-add-change-log.plsql +++ b/librisxl-tools/postgresql/migrations/00000024-add-change-log.plsql @@ -27,6 +27,7 @@ BEGIN changenumber BIGSERIAL PRIMARY KEY, id text NOT NULL, loud BOOLEAN NOT NULL, + skipindexdependers BOOLEAN NOT NULL, time timestamp with time zone DEFAULT now() NOT NULL, resulting_record_version INTEGER NOT NULL ); diff --git a/whelk-core/src/main/groovy/whelk/Indexing.java b/whelk-core/src/main/groovy/whelk/Indexing.java index a71903bdea..35f4f59df5 100644 --- a/whelk-core/src/main/groovy/whelk/Indexing.java +++ b/whelk-core/src/main/groovy/whelk/Indexing.java @@ -1,5 +1,6 @@ package whelk; +import com.google.common.collect.Iterables; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import whelk.component.ElasticSearch; @@ -10,7 +11,9 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.time.Instant; -import java.util.Map; +import java.util.*; + +import static whelk.FeatureFlags.Flag.INDEX_BLANK_WORKS; public class Indexing { @@ -23,7 +26,8 @@ public class Indexing { /** * Index all changes since last invocation of this function */ - private static void iterate(PostgreSQLComponent psql, ElasticSearch elastic) throws SQLException { + private static void iterate(Whelk whelk) throws SQLException { + PostgreSQLComponent psql = whelk.getStorage(); Map storedIndexerState = psql.getState(INDEXER_STATE_KEY); if (storedIndexerState == null){ resetStateToNow(psql); @@ -46,6 +50,7 @@ private static void iterate(PostgreSQLComponent psql, ElasticSearch elastic) thr String id = resultSet.getString("id"); Instant modificationInstant = resultSet.getTimestamp("time").toInstant(); int resultingVersion = resultSet.getInt("resulting_record_version"); + boolean skipIndexDependers = resultSet.getBoolean("skipindexdependers"); System.err.println("Now want to reindex: " + id + " ch-nr: " + changeNumber + " recordv: " + resultingVersion); } @@ -57,6 +62,115 @@ private static void iterate(PostgreSQLComponent psql, ElasticSearch elastic) thr } + private void reindexUpdated(Document updated, Document preUpdateDoc, boolean skipIndexDependers, Whelk whelk) { + whelk.elastic.index(updated, whelk); + if (whelk.getFeatures().isEnabled(INDEX_BLANK_WORKS)) { + Set removedIDs = preUpdateDoc.getVirtualRecordIds(); + removedIDs.removeAll(updated.getVirtualRecordIds()); + for (String removedID : removedIDs) { + whelk.elastic.remove(removedID); + } + + Set existingIDs = updated.getVirtualRecordIds(); + for (String existingID : existingIDs) { + whelk.elastic.index(updated.getVirtualRecord(existingID), whelk); + } + } + if (hasChangedMainEntityId(updated, preUpdateDoc)) { + reindexAllLinks(updated.getShortId(), whelk); + } else if (!skipIndexDependers) { + reindexAffected(updated, preUpdateDoc.getExternalRefs(), updated.getExternalRefs(), whelk); + } + } + + static boolean hasChangedMainEntityId(Document updated, Document preUpdateDoc) { + List preMainEntityIDs = preUpdateDoc.getThingIdentifiers(); + List postMainEntityIDs = updated.getThingIdentifiers(); + + if (!postMainEntityIDs.isEmpty() && !preMainEntityIDs.isEmpty()){ + return postMainEntityIDs.getFirst().equals(preMainEntityIDs.getFirst()); + } + return false; + } + + private void reindexAllLinks(String id, Whelk whelk) { + SortedSet links = whelk.getStorage().getDependencies(id); + links.addAll(whelk.getStorage().getDependers(id)); + bulkIndex(links, whelk); + } + + private void bulkIndex(Iterable ids, Whelk whelk) { + for (List a : Iterables.partition(ids, 100)) { + whelk.elastic.bulkIndex(a, whelk); + } + } + + private void reindexAffected(Document document, Set preUpdateLinks, Set postUpdateLinks, Whelk whelk) { + Set addedLinks = new HashSet<>(postUpdateLinks); + addedLinks.removeAll(preUpdateLinks); + Set removedLinks = new HashSet<>(preUpdateLinks); + removedLinks.removeAll(postUpdateLinks); + + for (Link link : removedLinks) { + String id = whelk.getStorage().getSystemIdByIri(link.getIri()); + if (id != null) { + whelk.elastic.decrementReverseLinks(id, link.getRelation()); + } + } + + for (Link link : addedLinks) { + String id = whelk.getStorage().getSystemIdByIri(link.getIri()); + if (id != null) { + Document doc = whelk.getStorage().load(id); + List lenses = Arrays.asList("chips", "cards", "full"); + List reverseRelations = new ArrayList<>(); + for (String lens : lenses) { + reverseRelations.addAll ( whelk.getJsonld().getInverseProperties(doc.data, lens) ); + } + + if (reverseRelations.contains(link.getRelation())) { + // we added a link to a document that includes us in its @reverse relations, reindex it + whelk.elastic.index(doc, whelk); + // that document may in turn have documents that include it, and by extension us in their + // @reverse relations. Reindex them. (For example item -> instance -> work) + // TODO this should be calculated in a more general fashion. We depend on the fact that indexed + // TODO docs are embellished one level (cards, chips) -> everything else must be integral relations + reindexAffectedReverseIntegral(doc, whelk); + } else { + // just update link counter + whelk.elastic.incrementReverseLinks(id, link.getRelation()); + } + } + } + + if (whelk.getStorage().isCardChangedOrNonexistent(document.getShortId())) { + List documentIDs = document.getThingIdentifiers(); + documentIDs.addAll(document.getRecordIdentifiers()); + bulkIndex(whelk.elastic.getAffectedIds(documentIDs), whelk); + } + } + + private void reindexAffectedReverseIntegral(Document reIndexedDoc, Whelk whelk) { + Set externalReferences = JsonLd.getExternalReferences(reIndexedDoc.data); + for (Link link : externalReferences) { + String p = link.property(); + if (whelk.getJsonld().isIntegral(whelk.getJsonld().getInverseProperty(p))) { + String id = whelk.getStorage().getSystemIdByIri(link.getIri()); + Document doc = whelk.getStorage().load(id); + + List lenses = Arrays.asList("chips", "cards", "full"); + List reverseRelations = new ArrayList<>(); + for (String lens : lenses) { + reverseRelations.addAll ( whelk.getJsonld().getInverseProperties(doc.data, lens) ); + } + + if (reverseRelations.contains(p)) { + whelk.elastic.index(doc, whelk); + } + } + } + } + /** * This resets the state of the Indexing code to "now", as in: * "forget whatever you thought you had left to, you are just indexing new changes from *now* and forward." @@ -75,34 +189,39 @@ ON CONFLICT (key) PreparedStatement statement = connection.prepareStatement(sql)) { statement.executeUpdate(); } + + // At the one first start of this, the above 'MAX(changenumber)' will be null, as there are no + // logged changenumbers yet. To cover this case, set an initial 0 explicitly. + Map storedIndexerState = psql.getState(INDEXER_STATE_KEY); + if (storedIndexerState != null && storedIndexerState.get("lastIndexed") == null) { + psql.putState(INDEXER_STATE_KEY, Map.of("lastIndexed", "0")); + } } /** * Run in background, and index data continually as it changes */ - public synchronized static void start(PostgreSQLComponent psql, ElasticSearch elastic) { + public synchronized static void start(Whelk whelk) { if (worker != null) return; - worker = Thread.ofPlatform().name("Whelk elastic indexing").unstarted( new IndexingRunnable(psql, elastic) ); + worker = Thread.ofPlatform().name("Whelk elastic indexing").unstarted( new IndexingRunnable(whelk) ); worker.start(); } // The necessary machinery for invoking iterate() at a suitable cadence private static class IndexingRunnable implements Runnable { - private PostgreSQLComponent psql; - ElasticSearch elastic; + Whelk whelk; - public IndexingRunnable(PostgreSQLComponent psql, ElasticSearch elastic) { - this.psql = psql; - this.elastic = elastic; + public IndexingRunnable(Whelk whelk) { + this.whelk = whelk; } public void run() { while (true) { try { - iterate(psql, elastic); + iterate(whelk); // Don't run hot, wait a little before the next pass. try { diff --git a/whelk-core/src/main/groovy/whelk/Whelk.groovy b/whelk-core/src/main/groovy/whelk/Whelk.groovy index f1c696fc55..79d0d0e365 100644 --- a/whelk-core/src/main/groovy/whelk/Whelk.groovy +++ b/whelk-core/src/main/groovy/whelk/Whelk.groovy @@ -302,117 +302,6 @@ class Whelk { .collectEntries { id, doc -> [(idMap.getOrDefault(id, id)): doc] } } - private void reindexUpdated(Document updated, Document preUpdateDoc) { - indexAsyncOrSync { - elastic.index(updated, this) - if (features.isEnabled(INDEX_BLANK_WORKS)) { - (preUpdateDoc.getVirtualRecordIds() - updated.getVirtualRecordIds()).each { elastic.remove(it) } - updated.getVirtualRecordIds().each {elastic.index(updated.getVirtualRecord(it), this) } - } - if (!skipIndexDependers) { - if (hasChangedMainEntityId(updated, preUpdateDoc)) { - reindexAllLinks(updated.shortId) - } else { - reindexAffected(updated, preUpdateDoc.getExternalRefs(), updated.getExternalRefs()) - } - } - } - } - - private void indexAsyncOrSync(Runnable runnable) { - if (skipIndex) { - return - } - - if (!elastic) { - log.warn("Elasticsearch not configured when trying to reindex") - return - } - - Runnable reindex = { - try { - runnable.run() - } - catch (Exception e) { - log.error("Error reindexing: $e", e) - } - } - - if (isBatchJobThread()) { - // Update them synchronously - reindex.run() - } else { - // else use a fire-and-forget thread - new Thread(indexers, reindex).start() - } - } - - private void reindexAllLinks(String id) { - SortedSet links = storage.getDependencies(id) - links.addAll(storage.getDependers(id)) - bulkIndex(links) - } - - private void reindexAffected(Document document, Set preUpdateLinks, Set postUpdateLinks) { - Set addedLinks = (postUpdateLinks - preUpdateLinks) - Set removedLinks = (preUpdateLinks - postUpdateLinks) - - removedLinks.each { link -> - String id = storage.getSystemIdByIri(link.iri) - if (id) { - elastic.decrementReverseLinks(id, link.relation) - } - } - - addedLinks.each { link -> - String id = storage.getSystemIdByIri(link.iri) - if (id) { - Document doc = storage.load(id) - def lenses = ['chips', 'cards', 'full'] - def reverseRelations = lenses.collect { jsonld.getInverseProperties(doc.data, it) }.flatten() - if (reverseRelations.contains(link.relation)) { - // we added a link to a document that includes us in its @reverse relations, reindex it - elastic.index(doc, this) - // that document may in turn have documents that include it, and by extension us in their - // @reverse relations. Reindex them. (For example item -> instance -> work) - // TODO this should be calculated in a more general fashion. We depend on the fact that indexed - // TODO docs are embellished one level (cards, chips) -> everything else must be integral relations - reindexAffectedReverseIntegral(doc) - } else { - // just update link counter - elastic.incrementReverseLinks(id, link.relation) - } - } - } - - if (storage.isCardChangedOrNonexistent(document.getShortId())) { - bulkIndex(elastic.getAffectedIds(document.getThingIdentifiers() + document.getRecordIdentifiers())) - } - } - - private void reindexAffectedReverseIntegral(Document reIndexedDoc) { - JsonLd.getExternalReferences(reIndexedDoc.data).forEach { link -> - String p = link.property() - if (jsonld.isIntegral(jsonld.getInverseProperty(p))) { - String id = storage.getSystemIdByIri(link.iri) - Document doc = storage.load(id) - def lenses = ['chips', 'cards', 'full'] - def reverseRelations = lenses - .collect { jsonld.getInverseProperties(doc.data, it) } - .flatten() - if (reverseRelations.contains(p)) { - elastic.index(doc, this) - } - } - } - } - - private void bulkIndex(Iterable ids) { - Iterables.partition(ids, 100).each { - elastic.bulkIndexWithRetry(it, this) - } - } - /** * Returns tuples for ID collisions, the first entry in the tuple is the system ID of the colliding record, * the second is a freetext description of the reason for the collision @@ -477,17 +366,8 @@ class Whelk { throw new StorageCreateFailedException(document.getShortId(), "Document considered a duplicate of : " + collidingIDs) } - boolean success = storage.createDocument(document, changedIn, changedBy, collection, deleted) + boolean success = storage.createDocument(document, changedIn, changedBy, collection, deleted, this.skipIndexDependers) if (success) { - indexAsyncOrSync { - elastic.index(document, this) - if (features.isEnabled(INDEX_BLANK_WORKS)) { - document.getVirtualRecordIds().each {elastic.index(document.getVirtualRecord(it), this) } - } - if (!skipIndexDependers) { - reindexAffected(document, new TreeSet<>(), document.getExternalRefs()) - } - } sparqlUpdater?.pollNow() } return success @@ -505,7 +385,7 @@ class Whelk { */ boolean storeAtomicUpdate(String id, boolean minorUpdate, boolean writeIdenticalVersions, String changedIn, String changedBy, UpdateAgent updateAgent) { Document preUpdateDoc = null - Document updated = storage.storeUpdate(id, minorUpdate, writeIdenticalVersions, changedIn, changedBy, { Document doc -> + Document updated = storage.storeUpdate(id, minorUpdate, writeIdenticalVersions, skipIndexDependers, changedIn, changedBy, { Document doc -> preUpdateDoc = doc.clone() updateAgent.update(doc) normalize(doc) @@ -515,7 +395,7 @@ class Whelk { return false } - reindexUpdated(updated, preUpdateDoc) + //reindexUpdated(updated, preUpdateDoc) sparqlUpdater?.pollNow() return true @@ -524,13 +404,13 @@ class Whelk { void storeAtomicUpdate(Document doc, boolean minorUpdate, boolean writeIdenticalVersions, String changedIn, String changedBy, String oldChecksum) { normalize(doc) Document preUpdateDoc = storage.load(doc.shortId) - Document updated = storage.storeAtomicUpdate(doc, minorUpdate, writeIdenticalVersions, changedIn, changedBy, oldChecksum) + Document updated = storage.storeAtomicUpdate(doc, minorUpdate, writeIdenticalVersions, skipIndexDependers, changedIn, changedBy, oldChecksum) if (updated == null) { return } - reindexUpdated(updated, preUpdateDoc) + //reindexUpdated(updated, preUpdateDoc) sparqlUpdater?.pollNow() } @@ -563,16 +443,7 @@ class Whelk { if (!force) { assertNoDependers(doc) } - storage.remove(id, changedIn, changedBy) - indexAsyncOrSync { - elastic.remove(id) - if (features.isEnabled(INDEX_BLANK_WORKS)) { - doc.getVirtualRecordIds().each { elastic.remove(it) } - } - if (!skipIndexDependers) { - reindexAffected(doc, doc.getExternalRefs(), Collections.emptySet()) - } - } + storage.remove(id, changedIn, changedBy, skipIndexDependers) } } @@ -584,12 +455,6 @@ class Whelk { } } - static boolean hasChangedMainEntityId(Document updated, Document preUpdateDoc) { - preUpdateDoc.getThingIdentifiers()[0] && - updated.getThingIdentifiers()[0] && - updated.getThingIdentifiers()[0] != preUpdateDoc.getThingIdentifiers()[0] - } - void embellish(Document document, List levels = null) { def docsByIris = { List iris -> bulkLoad(iris).values().collect { it.data } } Embellisher e = new Embellisher(jsonld, docsByIris, storage.&getCards, relations.&getByReverse) diff --git a/whelk-core/src/main/groovy/whelk/component/PostgreSQLComponent.groovy b/whelk-core/src/main/groovy/whelk/component/PostgreSQLComponent.groovy index 6c9171ca40..e65d138bab 100644 --- a/whelk-core/src/main/groovy/whelk/component/PostgreSQLComponent.groovy +++ b/whelk-core/src/main/groovy/whelk/component/PostgreSQLComponent.groovy @@ -725,7 +725,7 @@ class PostgreSQLComponent { } } - void logUpdate(String id, boolean loud, Date time, Connection connection) { + void logUpdate(String id, boolean loud, boolean skipIndexDependers, Date time, Connection connection) { // Which version did we just create? // This rests on the assumption that lddb-row being updated is locked for the transaction, so that a new @@ -744,19 +744,20 @@ class PostgreSQLComponent { close(resultSet, countStatement) } - PreparedStatement logStatement = connection.prepareStatement("INSERT INTO lddb__change_log (id, loud, time, resulting_record_version) VALUES (?,?,?,?)") + PreparedStatement logStatement = connection.prepareStatement("INSERT INTO lddb__change_log (id, loud, skipindexdependers, time, resulting_record_version) VALUES (?,?,?,?,?)") try { logStatement.setString(1, id) logStatement.setBoolean(2, loud) - logStatement.setTimestamp(3, new Timestamp(time.getTime())) - logStatement.setInt(4, resultingVersion) + logStatement.setBoolean(3, skipIndexDependers) + logStatement.setTimestamp(4, new Timestamp(time.getTime())) + logStatement.setInt(5, resultingVersion) logStatement.execute() } finally { close(logStatement) } } - boolean createDocument(Document doc, String changedIn, String changedBy, String collection, boolean deleted) { + boolean createDocument(Document doc, String changedIn, String changedBy, String collection, boolean deleted, boolean skipIndexDependers) { log.debug("Saving ${doc.getShortId()}, ${changedIn}, ${changedBy}, ${collection}") return withDbConnection { @@ -825,7 +826,7 @@ class PostgreSQLComponent { insert.executeUpdate() saveVersion(doc, connection, now, now, changedIn, changedBy, collection, deleted) - logUpdate(doc.getShortId(), true, now, connection) + logUpdate(doc.getShortId(), true, skipIndexDependers, now, connection) refreshDerivativeTables(doc, connection, deleted) connection.commit() @@ -958,12 +959,12 @@ class PostgreSQLComponent { } } - Document storeAtomicUpdate(Document doc, boolean minorUpdate, boolean writeIdenticalVersions, String changedIn, String changedBy, String oldChecksum) { + Document storeAtomicUpdate(Document doc, boolean minorUpdate, boolean writeIdenticalVersions, boolean skipIndexDependers, String changedIn, String changedBy, String oldChecksum) { return withDbConnection { Connection connection = getMyConnection() connection.setAutoCommit(false) List postCommitActions = [] - Document result = storeAtomicUpdate(doc, minorUpdate, writeIdenticalVersions, changedIn, changedBy, oldChecksum, connection, postCommitActions) + Document result = storeAtomicUpdate(doc, minorUpdate, writeIdenticalVersions, skipIndexDependers, changedIn, changedBy, oldChecksum, connection, postCommitActions) connection.commit() connection.setAutoCommit(true) postCommitActions.each { it.run() } @@ -971,14 +972,14 @@ class PostgreSQLComponent { } } - Document storeUpdate(String id, boolean minorUpdate, boolean writeIdenticalVersions, String changedIn, String changedBy, UpdateAgent updateAgent) { + Document storeUpdate(String id, boolean minorUpdate, boolean writeIdenticalVersions, boolean skipIndexDependers, String changedIn, String changedBy, UpdateAgent updateAgent) { int retriesLeft = STALE_UPDATE_RETRIES while (true) { try { Document doc = load(id) String checksum = doc.getChecksum(jsonld) updateAgent.update(doc) - Document updated = storeAtomicUpdate(doc, minorUpdate, writeIdenticalVersions, changedIn, changedBy, checksum) + Document updated = storeAtomicUpdate(doc, minorUpdate, writeIdenticalVersions, skipIndexDependers, changedIn, changedBy, checksum) return updated } catch (StaleUpdateException e) { @@ -993,7 +994,7 @@ class PostgreSQLComponent { } } - Document storeAtomicUpdate(Document doc, boolean minorUpdate, boolean writeIdenticalVersions, String changedIn, String changedBy, String oldChecksum, + Document storeAtomicUpdate(Document doc, boolean minorUpdate, boolean writeIdenticalVersions, boolean skipIndexDependers, String changedIn, String changedBy, String oldChecksum, Connection connection, List postCommitActions) { String id = doc.shortId log.debug("Saving (atomic update) ${id}") @@ -1097,7 +1098,7 @@ class PostgreSQLComponent { updateStatement.execute() saveVersion(doc, connection, createdTime, modTime, changedIn, changedBy, collection, deleted) - logUpdate(doc.getShortId(), !minorUpdate, actualUpdateTime, connection) + logUpdate(doc.getShortId(), !minorUpdate, skipIndexDependers, actualUpdateTime, connection) // If the mainentity has changed URI (for example happens when new id.kb.se-uris are added to records) if ( preUpdateDoc.getThingIdentifiers()[0] && @@ -1110,7 +1111,7 @@ class PostgreSQLComponent { SortedSet idsLinkingToOldId = getDependencyData(id, GET_DEPENDERS, connection) for (String dependerId : idsLinkingToOldId) { Document depender = load(dependerId) - storeAtomicUpdate(depender, true, false, changedIn, changedBy, depender.getChecksum(jsonld), connection, postCommitActions) + storeAtomicUpdate(depender, true, false, true, changedIn, changedBy, depender.getChecksum(jsonld), connection, postCommitActions) } } @@ -2785,11 +2786,11 @@ class PostgreSQLComponent { } } - void remove(String identifier, String changedIn, String changedBy) { + void remove(String identifier, String changedIn, String changedBy, boolean skipIndexDependers) { log.debug("Marking document with ID ${identifier} as deleted.") try { - storeUpdate(identifier, false, true, changedIn, changedBy, + storeUpdate(identifier, false, true, skipIndexDependers, changedIn, changedBy, { Document doc -> doc.setDeleted(true) // Add a tombstone marker (without removing anything) perhaps? From 5183dab544a558cf2d01ae7d76d37d8b3f16200d Mon Sep 17 00:00:00 2001 From: Jannis Mohlin Tsiroyannis Date: Thu, 3 Apr 2025 12:15:37 +0200 Subject: [PATCH 05/17] First actual indexing. Retry queue must go, however. --- .../src/main/groovy/whelk/Indexing.java | 28 +++++++++++++------ 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/whelk-core/src/main/groovy/whelk/Indexing.java b/whelk-core/src/main/groovy/whelk/Indexing.java index 35f4f59df5..bba1d33ecf 100644 --- a/whelk-core/src/main/groovy/whelk/Indexing.java +++ b/whelk-core/src/main/groovy/whelk/Indexing.java @@ -52,7 +52,18 @@ private static void iterate(Whelk whelk) throws SQLException { int resultingVersion = resultSet.getInt("resulting_record_version"); boolean skipIndexDependers = resultSet.getBoolean("skipindexdependers"); - System.err.println("Now want to reindex: " + id + " ch-nr: " + changeNumber + " recordv: " + resultingVersion); + List versions = whelk.getStorage().loadAllVersions(id); + if (resultingVersion == 0) + whelk.elastic.index(versions.getFirst(), whelk); + else { + Document updated = versions.get(resultingVersion); + Document preUpdateDoc = versions.get(resultingVersion-1); + + System.err.println("Now want to reindex: " + id + " ch-nr: " + changeNumber + " recordv: " + resultingVersion); + System.err.println("data to index:\n\t" + updated.getDataAsString()+"\n"); + System.err.println("previous version:\n\t" + preUpdateDoc.getDataAsString() + "\n\n"); + reindexUpdated(updated, preUpdateDoc, skipIndexDependers, whelk); + } } if (changeNumber > lastIndexedChangeNumber) { @@ -62,7 +73,7 @@ private static void iterate(Whelk whelk) throws SQLException { } - private void reindexUpdated(Document updated, Document preUpdateDoc, boolean skipIndexDependers, Whelk whelk) { + private static void reindexUpdated(Document updated, Document preUpdateDoc, boolean skipIndexDependers, Whelk whelk) { whelk.elastic.index(updated, whelk); if (whelk.getFeatures().isEnabled(INDEX_BLANK_WORKS)) { Set removedIDs = preUpdateDoc.getVirtualRecordIds(); @@ -83,7 +94,7 @@ private void reindexUpdated(Document updated, Document preUpdateDoc, boolean ski } } - static boolean hasChangedMainEntityId(Document updated, Document preUpdateDoc) { + private static boolean hasChangedMainEntityId(Document updated, Document preUpdateDoc) { List preMainEntityIDs = preUpdateDoc.getThingIdentifiers(); List postMainEntityIDs = updated.getThingIdentifiers(); @@ -93,19 +104,20 @@ static boolean hasChangedMainEntityId(Document updated, Document preUpdateDoc) { return false; } - private void reindexAllLinks(String id, Whelk whelk) { + private static void reindexAllLinks(String id, Whelk whelk) { SortedSet links = whelk.getStorage().getDependencies(id); links.addAll(whelk.getStorage().getDependers(id)); bulkIndex(links, whelk); } - private void bulkIndex(Iterable ids, Whelk whelk) { + private static void bulkIndex(Iterable ids, Whelk whelk) { for (List a : Iterables.partition(ids, 100)) { - whelk.elastic.bulkIndex(a, whelk); + Collection docs = whelk.bulkLoad(a).values(); + whelk.elastic.bulkIndex(docs, whelk); } } - private void reindexAffected(Document document, Set preUpdateLinks, Set postUpdateLinks, Whelk whelk) { + private static void reindexAffected(Document document, Set preUpdateLinks, Set postUpdateLinks, Whelk whelk) { Set addedLinks = new HashSet<>(postUpdateLinks); addedLinks.removeAll(preUpdateLinks); Set removedLinks = new HashSet<>(preUpdateLinks); @@ -150,7 +162,7 @@ private void reindexAffected(Document document, Set preUpdateLinks, Set
  • externalReferences = JsonLd.getExternalReferences(reIndexedDoc.data); for (Link link : externalReferences) { String p = link.property(); From 6372aed0cc6c23b0e793cc0dbead96d461c8304b Mon Sep 17 00:00:00 2001 From: Jannis Mohlin Tsiroyannis Date: Wed, 9 Apr 2025 12:11:26 +0200 Subject: [PATCH 06/17] Working robust indexing. No vacation-mode yet. --- .../whelk/reindexer/ElasticReindexer.groovy | 2 +- .../groovy/whelk/rest/api/RefreshAPI.groovy | 1 - .../src/main/groovy/whelk/Indexing.java | 77 ++++++++---- .../whelk/component/ElasticSearch.groovy | 113 ++++-------------- 4 files changed, 77 insertions(+), 116 deletions(-) diff --git a/importers/src/main/groovy/whelk/reindexer/ElasticReindexer.groovy b/importers/src/main/groovy/whelk/reindexer/ElasticReindexer.groovy index cb6fff4c52..f8faa97d89 100644 --- a/importers/src/main/groovy/whelk/reindexer/ElasticReindexer.groovy +++ b/importers/src/main/groovy/whelk/reindexer/ElasticReindexer.groovy @@ -121,7 +121,7 @@ class ElasticReindexer { private Exception tryBulkIndex(List docs, Whelk whelk) { try { - whelk.elastic.bulkIndex(docs, whelk) + whelk.elastic.bulkIndex(docs, whelk, true) return null } catch (Exception e) { diff --git a/rest/src/main/groovy/whelk/rest/api/RefreshAPI.groovy b/rest/src/main/groovy/whelk/rest/api/RefreshAPI.groovy index c2de182d23..12b62602df 100644 --- a/rest/src/main/groovy/whelk/rest/api/RefreshAPI.groovy +++ b/rest/src/main/groovy/whelk/rest/api/RefreshAPI.groovy @@ -108,6 +108,5 @@ class RefreshAPI extends HttpServlet void refreshQuietly(Document doc) { whelk.storage.refreshDerivativeTables(doc) - whelk.elastic.index(doc, whelk) } } diff --git a/whelk-core/src/main/groovy/whelk/Indexing.java b/whelk-core/src/main/groovy/whelk/Indexing.java index bba1d33ecf..142522930e 100644 --- a/whelk-core/src/main/groovy/whelk/Indexing.java +++ b/whelk-core/src/main/groovy/whelk/Indexing.java @@ -10,7 +10,10 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.time.Duration; import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.time.temporal.TemporalAmount; import java.util.*; import static whelk.FeatureFlags.Flag.INDEX_BLANK_WORKS; @@ -22,11 +25,14 @@ public class Indexing { static Thread worker; private final static String INDEXER_STATE_KEY = "ElasticIndexer"; private final static Logger logger = LogManager.getLogger(Indexing.class); + private static Instant lastBehindMessageAt = Instant.EPOCH; /** * Index all changes since last invocation of this function + * + * returns false if there was nothing to index, true otherwise */ - private static void iterate(Whelk whelk) throws SQLException { + private static boolean iterate(Whelk whelk) throws SQLException { PostgreSQLComponent psql = whelk.getStorage(); Map storedIndexerState = psql.getState(INDEXER_STATE_KEY); if (storedIndexerState == null){ @@ -44,33 +50,58 @@ private static void iterate(Whelk whelk) throws SQLException { statement.setLong(1, lastIndexedChangeNumber); ResultSet resultSet = statement.executeQuery(); - Long changeNumber = 0L; + Long indexedChangeNumber = 0L; + if (!resultSet.isBeforeFirst()) { + return false; + } while (resultSet.next()) { - changeNumber = resultSet.getLong("changenumber"); + Long changeNumber = resultSet.getLong("changenumber"); String id = resultSet.getString("id"); Instant modificationInstant = resultSet.getTimestamp("time").toInstant(); int resultingVersion = resultSet.getInt("resulting_record_version"); boolean skipIndexDependers = resultSet.getBoolean("skipindexdependers"); - List versions = whelk.getStorage().loadAllVersions(id); - if (resultingVersion == 0) - whelk.elastic.index(versions.getFirst(), whelk); - else { - Document updated = versions.get(resultingVersion); - Document preUpdateDoc = versions.get(resultingVersion-1); - - System.err.println("Now want to reindex: " + id + " ch-nr: " + changeNumber + " recordv: " + resultingVersion); - System.err.println("data to index:\n\t" + updated.getDataAsString()+"\n"); - System.err.println("previous version:\n\t" + preUpdateDoc.getDataAsString() + "\n\n"); - reindexUpdated(updated, preUpdateDoc, skipIndexDependers, whelk); + long minutesBehind = modificationInstant.until(Instant.now(), ChronoUnit.MINUTES); + if (minutesBehind >= 15 && lastBehindMessageAt.until(Instant.now(), ChronoUnit.MINUTES) >= 30) { + lastBehindMessageAt = Instant.now(); + logger.error("Elastic indexing is currently " + minutesBehind + " minutes behind. The next change to index is: " + changeNumber + + " (" + id + "). If this number is the same between two of these messages, it means that indexing is stuck on this change " + + "and cannot proceed until indexing it becomes possible. If you (in an emergency) need to proceed without indexing " + + "this change, do the following in the database: \"DELETE FROM lddb__change_log WHERE changenumber = " + changeNumber + ";\" " + + "No data will be lost (the log is temporary). But be aware: The inconsistency in the search index is now on YOU and will " + + "remain until the record is resaved or a full reindexing is done."); } + + try { + List versions = whelk.getStorage().loadAllVersions(id); + if (resultingVersion == 0) + whelk.elastic.index(versions.getFirst(), whelk); + else { + Document updated = versions.get(resultingVersion); + Document preUpdateDoc = versions.get(resultingVersion - 1); + + //System.err.println("Now want to reindex: " + id + " ch-nr: " + changeNumber + " recordv: " + resultingVersion); + //System.err.println("data to index:\n\t" + updated.getDataAsString() + "\n"); + //System.err.println("previous version:\n\t" + preUpdateDoc.getDataAsString() + "\n\n"); + reindexUpdated(updated, preUpdateDoc, skipIndexDependers, whelk); + } + } catch (Exception e) { + logger.warn("Failed to index " + id + ", will try again.", e); + // When we fail, wait a little before trying again. + try { + Thread.sleep(10 * 1000); + } catch (InterruptedException ie) { /* ignore */ } + break; // out of the while, without updating indexedChangeNumber + } + indexedChangeNumber = changeNumber; } - if (changeNumber > lastIndexedChangeNumber) { - psql.putState(INDEXER_STATE_KEY, Map.of("lastIndexed", ""+changeNumber)); + if (indexedChangeNumber > lastIndexedChangeNumber) { + psql.putState(INDEXER_STATE_KEY, Map.of("lastIndexed", ""+indexedChangeNumber)); } } + return true; } private static void reindexUpdated(Document updated, Document preUpdateDoc, boolean skipIndexDependers, Whelk whelk) { @@ -113,7 +144,7 @@ private static void reindexAllLinks(String id, Whelk whelk) { private static void bulkIndex(Iterable ids, Whelk whelk) { for (List a : Iterables.partition(ids, 100)) { Collection docs = whelk.bulkLoad(a).values(); - whelk.elastic.bulkIndex(docs, whelk); + whelk.elastic.bulkIndex(docs, whelk, false); } } @@ -233,12 +264,12 @@ public void run() { while (true) { try { - iterate(whelk); - - // Don't run hot, wait a little before the next pass. - try { - Thread.sleep(100); // 0.1 seconds - } catch (InterruptedException ie) { /* ignore */ } + if (!iterate(whelk)) { + // If there was nothing to index, don't run hot! Wait a little before the next pass! + try { + Thread.sleep(200); // 0.2 seconds + } catch (InterruptedException ie) { /* ignore */ } + } } catch (Exception e) { // Catch all, and wait a little before retrying. This thread should never be allowed to crash completely. logger.error("Unexpected exception while indexing.", e); diff --git a/whelk-core/src/main/groovy/whelk/component/ElasticSearch.groovy b/whelk-core/src/main/groovy/whelk/component/ElasticSearch.groovy index 3f9b10144d..93ca9b4787 100644 --- a/whelk-core/src/main/groovy/whelk/component/ElasticSearch.groovy +++ b/whelk-core/src/main/groovy/whelk/component/ElasticSearch.groovy @@ -53,8 +53,6 @@ class ElasticSearch { private boolean isPitApiAvailable = false private static final int ES_LOG_MIN_DURATION = 2000 // Only log queries taking at least this amount of milliseconds - private final Queue indexingRetryQueue = new LinkedBlockingQueue<>() - private static final class Lenses { public static final DerivedLens CARD_ONLY = new DerivedLens( FresnelUtil.LensGroupName.Card, @@ -99,16 +97,6 @@ class ElasticSearch { client = ElasticClient.withDefaultHttpClient(elasticHosts, elasticUser, elasticPassword) bulkClient = ElasticClient.withBulkHttpClient(elasticHosts, elasticUser, elasticPassword) - - new Timer("ElasticIndexingRetries", true).schedule(new TimerTask() { - void run() { - indexingRetryQueue.size().times { - Runnable entry = indexingRetryQueue.poll() - if (entry != null) - entry.run() - } - } - }, 60*1000, 10*1000) initSettings() } @@ -221,7 +209,7 @@ class ElasticSearch { } } - void bulkIndex(Collection docs, Whelk whelk) { + void bulkIndex(Collection docs, Whelk whelk, boolean tolerant) { if (docs) { String bulkString = docs.findResults{ doc -> try { @@ -280,9 +268,9 @@ class ElasticSearch { int docsCount = docs.count{it} int numFailed = numFailedDueToDocError + numFailedDueToESError if (numFailed) { - log.warn("Tried bulk indexing ${docsCount} docs: ${docsCount - numFailed} succeeded, ${numFailed} failed " + + log.error("Tried bulk indexing ${docsCount} docs: ${docsCount - numFailed} succeeded, ${numFailed} failed " + "(${numFailedDueToDocError} due to document error, ${numFailedDueToESError} due to ES error). Took ${responseMap.took} ms") - if (numFailedDueToESError) { + if (numFailedDueToESError || !tolerant) { throw new UnexpectedHttpStatusException("Failed indexing documents due to ES error", 500) } } else { @@ -294,21 +282,6 @@ class ElasticSearch { } } - void bulkIndexWithRetry(Collection ids, Whelk whelk) { - Collection docs = whelk.bulkLoad(ids).values() - try { - bulkIndex(docs, whelk) - } catch (Exception e) { - if (!isBadRequest(e)) { - log.info("Failed to index batch ${ids} in elastic, placing in retry queue: $e", e) - indexingRetryQueue.add({ -> bulkIndexWithRetry(ids, whelk) }) - } - else { - log.error("Failed to index ${ids} in elastic: $e", e) - } - } - } - String createActionRow(Document doc) { def action = ["index" : [ "_index" : indexName, "_id" : toElasticId(doc.getShortId()) ]] @@ -316,25 +289,13 @@ class ElasticSearch { } void index(Document doc, Whelk whelk) { - // The justification for this uncomfortable catch-all, is that an index-failure must raise an alert (log entry) - // _internally_ but be otherwise invisible to clients (If postgres writing was ok, the save is considered ok). - try { - String response = client.performRequest( - 'PUT', - "/${indexName}/_doc/${toElasticId(doc.getShortId())}", - getShapeForIndex(doc, whelk)) - if (log.isDebugEnabled()) { - Map responseMap = mapper.readValue(response, Map) - log.debug("Indexed the document ${doc.getShortId()} as ${indexName}/_doc/${responseMap['_id']} as version ${responseMap['_version']}") - } - } catch (Exception e) { - if (!isBadRequest(e)) { - log.info("Failed to index ${doc.getShortId()} in elastic, placing in retry queue: $e", e) - indexingRetryQueue.add({ -> index(doc, whelk) }) - } - else { - log.error("Failed to index ${doc.getShortId()} in elastic: $e", e) - } + String response = client.performRequest( + 'PUT', + "/${indexName}/_doc/${toElasticId(doc.getShortId())}", + getShapeForIndex(doc, whelk)) + if (log.isDebugEnabled()) { + Map responseMap = mapper.readValue(response, Map) + log.debug("Indexed the document ${doc.getShortId()} as ${indexName}/_doc/${responseMap['_id']} as version ${responseMap['_version']}") } } @@ -361,26 +322,10 @@ class ElasticSearch { } """.stripIndent() - try { - client.performRequest( - 'POST', - "/${indexName}/_update/${toElasticId(shortId)}", - body) - } - catch (Exception e) { - if (isBadRequest(e)) { - log.warn("Failed to update reverse link counter ($deltaCount) for $shortId: $e", e) - } - else if (isNotFound(e)) { - // OK. All dependers must be removed before the dependee in lddb. But the index update can happen - // in any order, so the dependee might already be gone when trying to decrement the counter. - log.info("Could not update reverse link counter ($deltaCount) for $shortId: $e, it does not exist", e) - } - else { - log.warn("Failed to update reverse link counter ($deltaCount) for $shortId: $e, placing in retry queue.", e) - indexingRetryQueue.add({ -> updateReverseLinkCounter(shortId, relation, deltaCount) }) - } - } + client.performRequest( + 'POST', + "/${indexName}/_update/${toElasticId(shortId)}", + body) } void remove(String identifier) { @@ -388,30 +333,16 @@ class ElasticSearch { log.debug("Deleting object with identifier ${toElasticId(identifier)}.") } def dsl = ["query":["term":["_id":toElasticId(identifier)]]] - try { - def response = client.performRequest('POST', - "/${indexName}/_delete_by_query", - JsonOutput.toJson(dsl)) + def response = client.performRequest('POST', + "/${indexName}/_delete_by_query", + JsonOutput.toJson(dsl)) - Map responseMap = mapper.readValue(response, Map) - if (log.isDebugEnabled()) { - log.debug("Response: ${responseMap.deleted} of ${responseMap.total} objects deleted") - } - if (responseMap.deleted == 0) { - log.warn("Record with id $identifier was not deleted from the Elasticsearch index.") - } + Map responseMap = mapper.readValue(response, Map) + if (log.isDebugEnabled()) { + log.debug("Response: ${responseMap.deleted} of ${responseMap.total} objects deleted") } - catch(Exception e) { - if (isBadRequest(e)) { - log.warn("Failed to delete $identifier from index: $e", e) - } - else if (isNotFound(e)) { - log.warn("Tried to delete $identifier from index, but it was not there: $e", e) - } - else { - log.warn("Failed to delete $identifier from index: $e, placing in retry queue.", e) - indexingRetryQueue.add({ -> remove(identifier) }) - } + if (responseMap.deleted == 0) { + log.error("Record with id $identifier was not deleted from the Elasticsearch index.") } } From da5048d44f28efa12e18f460eed9f31232eea5d4 Mon Sep 17 00:00:00 2001 From: Jannis Mohlin Tsiroyannis Date: Wed, 9 Apr 2025 12:55:37 +0200 Subject: [PATCH 07/17] Add catch-up after reindexing. --- .../src/main/groovy/whelk/reindexer/ElasticReindexer.groovy | 3 +++ whelk-core/src/main/groovy/whelk/Indexing.java | 2 ++ 2 files changed, 5 insertions(+) diff --git a/importers/src/main/groovy/whelk/reindexer/ElasticReindexer.groovy b/importers/src/main/groovy/whelk/reindexer/ElasticReindexer.groovy index f8faa97d89..6fcce4f0e0 100644 --- a/importers/src/main/groovy/whelk/reindexer/ElasticReindexer.groovy +++ b/importers/src/main/groovy/whelk/reindexer/ElasticReindexer.groovy @@ -2,6 +2,7 @@ package whelk.reindexer import groovy.util.logging.Log4j2 as Log import whelk.Document +import whelk.Indexing import whelk.Whelk import whelk.util.BlockingThreadPool @@ -102,6 +103,8 @@ class ElasticReindexer { } finally { threadPool.awaitAllAndShutdown() } + if (suppliedCollection == null) // If a "full" reindex. + Indexing.resetStateToNow(whelk.storage) } private void bulkIndexWithRetries(List docs, Whelk whelk) { diff --git a/whelk-core/src/main/groovy/whelk/Indexing.java b/whelk-core/src/main/groovy/whelk/Indexing.java index 142522930e..17d9925098 100644 --- a/whelk-core/src/main/groovy/whelk/Indexing.java +++ b/whelk-core/src/main/groovy/whelk/Indexing.java @@ -239,6 +239,8 @@ ON CONFLICT (key) if (storedIndexerState != null && storedIndexerState.get("lastIndexed") == null) { psql.putState(INDEXER_STATE_KEY, Map.of("lastIndexed", "0")); } + + logger.info("Elastic indexer state was reset (and is now considered \"caught up\")."); } /** From 8f2908577567da93e8bae52e5bbca94b572a1eb0 Mon Sep 17 00:00:00 2001 From: Jannis Mohlin Tsiroyannis Date: Thu, 10 Apr 2025 10:41:25 +0200 Subject: [PATCH 08/17] Restore handling of 400-class errors in ElasticSearch, these must (unfortunately) be tolerated. Other errors will still be retried in order. --- .../whelk/reindexer/ElasticReindexer.groovy | 2 +- .../src/main/groovy/whelk/Indexing.java | 2 +- .../whelk/component/ElasticSearch.groovy | 78 +++++++++++++------ 3 files changed, 58 insertions(+), 24 deletions(-) diff --git a/importers/src/main/groovy/whelk/reindexer/ElasticReindexer.groovy b/importers/src/main/groovy/whelk/reindexer/ElasticReindexer.groovy index 6fcce4f0e0..28a0920c69 100644 --- a/importers/src/main/groovy/whelk/reindexer/ElasticReindexer.groovy +++ b/importers/src/main/groovy/whelk/reindexer/ElasticReindexer.groovy @@ -124,7 +124,7 @@ class ElasticReindexer { private Exception tryBulkIndex(List docs, Whelk whelk) { try { - whelk.elastic.bulkIndex(docs, whelk, true) + whelk.elastic.bulkIndex(docs, whelk) return null } catch (Exception e) { diff --git a/whelk-core/src/main/groovy/whelk/Indexing.java b/whelk-core/src/main/groovy/whelk/Indexing.java index 17d9925098..d61e9db681 100644 --- a/whelk-core/src/main/groovy/whelk/Indexing.java +++ b/whelk-core/src/main/groovy/whelk/Indexing.java @@ -144,7 +144,7 @@ private static void reindexAllLinks(String id, Whelk whelk) { private static void bulkIndex(Iterable ids, Whelk whelk) { for (List a : Iterables.partition(ids, 100)) { Collection docs = whelk.bulkLoad(a).values(); - whelk.elastic.bulkIndex(docs, whelk, false); + whelk.elastic.bulkIndex(docs, whelk); } } diff --git a/whelk-core/src/main/groovy/whelk/component/ElasticSearch.groovy b/whelk-core/src/main/groovy/whelk/component/ElasticSearch.groovy index 93ca9b4787..1aa958a46b 100644 --- a/whelk-core/src/main/groovy/whelk/component/ElasticSearch.groovy +++ b/whelk-core/src/main/groovy/whelk/component/ElasticSearch.groovy @@ -209,7 +209,7 @@ class ElasticSearch { } } - void bulkIndex(Collection docs, Whelk whelk, boolean tolerant) { + void bulkIndex(Collection docs, Whelk whelk) { if (docs) { String bulkString = docs.findResults{ doc -> try { @@ -270,7 +270,7 @@ class ElasticSearch { if (numFailed) { log.error("Tried bulk indexing ${docsCount} docs: ${docsCount - numFailed} succeeded, ${numFailed} failed " + "(${numFailedDueToDocError} due to document error, ${numFailedDueToESError} due to ES error). Took ${responseMap.took} ms") - if (numFailedDueToESError || !tolerant) { + if (numFailedDueToESError) { throw new UnexpectedHttpStatusException("Failed indexing documents due to ES error", 500) } } else { @@ -289,13 +289,22 @@ class ElasticSearch { } void index(Document doc, Whelk whelk) { - String response = client.performRequest( - 'PUT', - "/${indexName}/_doc/${toElasticId(doc.getShortId())}", - getShapeForIndex(doc, whelk)) - if (log.isDebugEnabled()) { - Map responseMap = mapper.readValue(response, Map) - log.debug("Indexed the document ${doc.getShortId()} as ${indexName}/_doc/${responseMap['_id']} as version ${responseMap['_version']}") + try { + String response = client.performRequest( + 'PUT', + "/${indexName}/_doc/${toElasticId(doc.getShortId())}", + getShapeForIndex(doc, whelk)) + if (log.isDebugEnabled()) { + Map responseMap = mapper.readValue(response, Map) + log.debug("Indexed the document ${doc.getShortId()} as ${indexName}/_doc/${responseMap['_id']} as version ${responseMap['_version']}") + } + } catch (Exception e) { + if (isBadRequest(e)) { + // These errors are caught and ignored (but logged) to avoid blocking further indexing. + log.error("Failed to index ${doc.getShortId()} in elastic: $e ACTION REQUIRED TO FIX THIS (it shouldn't happen), or the index will remain incomplete.", e) + } else { + throw e + } } } @@ -322,10 +331,24 @@ class ElasticSearch { } """.stripIndent() - client.performRequest( - 'POST', - "/${indexName}/_update/${toElasticId(shortId)}", - body) + try { + client.performRequest( + 'POST', + "/${indexName}/_update/${toElasticId(shortId)}", + body) + } catch (Exception e) { + if (isBadRequest(e)) { + log.error("Failed to update reverse link counter ($deltaCount) for $shortId: $e ACTION REQUIRED or link counters will be WRONG. This shouldn't happen.", e) + } + else if (isNotFound(e)) { + // OK. All dependers must be removed before the dependee in lddb. But the index update can happen + // in any order, so the dependee might already be gone when trying to decrement the counter. + log.info("Could not update reverse link counter ($deltaCount) for $shortId: $e, it does not exist", e) + } + else { + throw e + } + } } void remove(String identifier) { @@ -333,16 +356,27 @@ class ElasticSearch { log.debug("Deleting object with identifier ${toElasticId(identifier)}.") } def dsl = ["query":["term":["_id":toElasticId(identifier)]]] - def response = client.performRequest('POST', - "/${indexName}/_delete_by_query", - JsonOutput.toJson(dsl)) + try { + def response = client.performRequest('POST', + "/${indexName}/_delete_by_query", + JsonOutput.toJson(dsl)) - Map responseMap = mapper.readValue(response, Map) - if (log.isDebugEnabled()) { - log.debug("Response: ${responseMap.deleted} of ${responseMap.total} objects deleted") - } - if (responseMap.deleted == 0) { - log.error("Record with id $identifier was not deleted from the Elasticsearch index.") + Map responseMap = mapper.readValue(response, Map) + if (log.isDebugEnabled()) { + log.debug("Response: ${responseMap.deleted} of ${responseMap.total} objects deleted") + } + if (responseMap.deleted == 0) { + log.error("Record with id $identifier was not deleted from the Elasticsearch index.") + } + } catch (Exception e) { + // warn and ignore + if (isBadRequest(e)) { + log.warn("Failed to delete $identifier from index: $e", e) + } + else if (isNotFound(e)) { + log.warn("Tried to delete $identifier from index, but it was not there: $e", e) + } + else throw e } } From 4814656e424a7c4cce9c0fa19319c31a5baf734d Mon Sep 17 00:00:00 2001 From: Jannis Mohlin Tsiroyannis Date: Thu, 10 Apr 2025 10:55:29 +0200 Subject: [PATCH 09/17] Clean up --- whelk-core/src/main/groovy/whelk/Indexing.java | 12 ++++-------- whelk-core/src/main/groovy/whelk/Whelk.groovy | 2 -- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/whelk-core/src/main/groovy/whelk/Indexing.java b/whelk-core/src/main/groovy/whelk/Indexing.java index d61e9db681..d6bb3647d1 100644 --- a/whelk-core/src/main/groovy/whelk/Indexing.java +++ b/whelk-core/src/main/groovy/whelk/Indexing.java @@ -62,11 +62,11 @@ private static boolean iterate(Whelk whelk) throws SQLException { boolean skipIndexDependers = resultSet.getBoolean("skipindexdependers"); long minutesBehind = modificationInstant.until(Instant.now(), ChronoUnit.MINUTES); - if (minutesBehind >= 15 && lastBehindMessageAt.until(Instant.now(), ChronoUnit.MINUTES) >= 30) { + if (minutesBehind >= 10 && lastBehindMessageAt.until(Instant.now(), ChronoUnit.MINUTES) >= 30) { lastBehindMessageAt = Instant.now(); logger.error("Elastic indexing is currently " + minutesBehind + " minutes behind. The next change to index is: " + changeNumber + - " (" + id + "). If this number is the same between two of these messages, it means that indexing is stuck on this change " + - "and cannot proceed until indexing it becomes possible. If you (in an emergency) need to proceed without indexing " + + " (" + id + "). It should not be possible, but IF this number is the same between two of these messages, it means that " + + "indexing is stuck on a particular change and cannot proceed. If you (in an emergency) need to proceed without indexing " + "this change, do the following in the database: \"DELETE FROM lddb__change_log WHERE changenumber = " + changeNumber + ";\" " + "No data will be lost (the log is temporary). But be aware: The inconsistency in the search index is now on YOU and will " + "remain until the record is resaved or a full reindexing is done."); @@ -79,17 +79,13 @@ private static boolean iterate(Whelk whelk) throws SQLException { else { Document updated = versions.get(resultingVersion); Document preUpdateDoc = versions.get(resultingVersion - 1); - - //System.err.println("Now want to reindex: " + id + " ch-nr: " + changeNumber + " recordv: " + resultingVersion); - //System.err.println("data to index:\n\t" + updated.getDataAsString() + "\n"); - //System.err.println("previous version:\n\t" + preUpdateDoc.getDataAsString() + "\n\n"); reindexUpdated(updated, preUpdateDoc, skipIndexDependers, whelk); } } catch (Exception e) { logger.warn("Failed to index " + id + ", will try again.", e); // When we fail, wait a little before trying again. try { - Thread.sleep(10 * 1000); + Thread.sleep(5000); } catch (InterruptedException ie) { /* ignore */ } break; // out of the while, without updating indexedChangeNumber } diff --git a/whelk-core/src/main/groovy/whelk/Whelk.groovy b/whelk-core/src/main/groovy/whelk/Whelk.groovy index 79d0d0e365..40e98c9dfc 100644 --- a/whelk-core/src/main/groovy/whelk/Whelk.groovy +++ b/whelk-core/src/main/groovy/whelk/Whelk.groovy @@ -395,7 +395,6 @@ class Whelk { return false } - //reindexUpdated(updated, preUpdateDoc) sparqlUpdater?.pollNow() return true @@ -410,7 +409,6 @@ class Whelk { return } - //reindexUpdated(updated, preUpdateDoc) sparqlUpdater?.pollNow() } From 4a84db4677bf97bb3ee119450e77efa284659364 Mon Sep 17 00:00:00 2001 From: Jannis Mohlin Tsiroyannis Date: Thu, 10 Apr 2025 12:38:10 +0200 Subject: [PATCH 10/17] Fix b0rken crud unit-tests. --- .../test/groovy/whelk/rest/api/CrudSpec.groovy | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/rest/src/test/groovy/whelk/rest/api/CrudSpec.groovy b/rest/src/test/groovy/whelk/rest/api/CrudSpec.groovy index b31f17d716..f3a7094e35 100644 --- a/rest/src/test/groovy/whelk/rest/api/CrudSpec.groovy +++ b/rest/src/test/groovy/whelk/rest/api/CrudSpec.groovy @@ -910,7 +910,7 @@ class CrudSpec extends Specification { request.getRequestURL() >> { return new StringBuffer(BASE_URI.toString()) } - storage.createDocument(_, _, _, _, _) >> { + storage.createDocument(_, _, _, _, _, _) >> { Document doc = it.first() doc.setModified(new Date()) return doc @@ -936,7 +936,7 @@ class CrudSpec extends Specification { request.getContentType() >> { "application/ld+json" } - storage.createDocument(_, _, _, _, _) >> { + storage.createDocument(_, _, _, _, _, _) >> { throw new Exception("This shouldn't happen") } when: @@ -959,7 +959,7 @@ class CrudSpec extends Specification { request.getContentType() >> { "application/x-www-form-urlencoded" } - storage.createDocument(_, _, _, _, _) >> { + storage.createDocument(_, _, _, _, _, _) >> { throw new Exception("This shouldn't happen") } @@ -1151,7 +1151,7 @@ class CrudSpec extends Specification { request.getRequestURL() >> { return new StringBuffer(BASE_URI.toString()) } - storage.createDocument(_, _, _, _, _) >> { + storage.createDocument(_, _, _, _, _, _) >> { return true } when: @@ -1208,7 +1208,7 @@ class CrudSpec extends Specification { storage.getMainId(_) >> { return null } - storage.createDocument(_, _, _, _, _) >> { + storage.createDocument(_, _, _, _, _, _) >> { return true } when: @@ -1265,7 +1265,7 @@ class CrudSpec extends Specification { storage.getMainId(_) >> { return null } - storage.createDocument(_, _, _, _, _) >> { + storage.createDocument(_, _, _, _, _, _) >> { return true } storage.followDependers(_) >> { @@ -1546,7 +1546,7 @@ class CrudSpec extends Specification { storage.getMainId(_) >> { return null } - storage.createDocument(_, _, _, _, _) >> { + storage.createDocument(_, _, _, _, _, _) >> { return true } when: @@ -1660,7 +1660,7 @@ class CrudSpec extends Specification { storage.getMainId(_) >> { return null } - storage.createDocument(_, _, _, _, _) >> { + storage.createDocument(_, _, _, _, _, _) >> { return true } when: @@ -1719,7 +1719,7 @@ class CrudSpec extends Specification { storage.getMainId(_) >> { return null } - storage.createDocument(_, _, _, _, _) >> { + storage.createDocument(_, _, _, _, _, _) >> { return true } when: From ca4a0065d700e4821184c1ad1642e73c76d8e3af Mon Sep 17 00:00:00 2001 From: Jannis Mohlin Tsiroyannis Date: Fri, 11 Apr 2025 10:32:23 +0200 Subject: [PATCH 11/17] Remove the 'skip index' options for whelktool and dataset loading. It is no longer relevant as indexing is no longer a direct effect of these processes, and so there is no speedup to be had. --- .../src/main/groovy/whelk/importer/ImporterMain.groovy | 10 ++-------- whelk-core/src/main/groovy/whelk/Whelk.groovy | 1 - whelktool/README.md | 2 -- .../src/main/groovy/whelk/datatool/WhelkTool.groovy | 5 ----- 4 files changed, 2 insertions(+), 16 deletions(-) diff --git a/importers/src/main/groovy/whelk/importer/ImporterMain.groovy b/importers/src/main/groovy/whelk/importer/ImporterMain.groovy index 7388e5ce60..698762e301 100644 --- a/importers/src/main/groovy/whelk/importer/ImporterMain.groovy +++ b/importers/src/main/groovy/whelk/importer/ImporterMain.groovy @@ -39,12 +39,9 @@ class ImporterMain { } @Command(args='SOURCE_URL DATASET_URI [DATASET_DESCRIPTION_FILE]', - flags='--skip-index --replace-main-ids --force-delete --skip-dependers') + flags='--replace-main-ids --force-delete --skip-dependers') void dataset(Map flags, String sourceUrl, String datasetUri, String datasetDescPath=null) { Whelk whelk = Whelk.createLoadedSearchWhelk(props) - if (flags['skip-index']) { - whelk.setSkipIndex(true) - } if (flags['skip-dependers']) { whelk.setSkipIndexDependers(true) } @@ -52,12 +49,9 @@ class ImporterMain { } @Command(args='DATASETS_DESCRIPTION_FILE [SOURCE_BASE_DIR] [DATASET_URI...]', - flags='--skip-index --replace-main-ids --force-delete --skip-dependers') + flags='--replace-main-ids --force-delete --skip-dependers') void datasets(Map flags, String datasetDescPath, String sourceBaseDir=null, String... onlyDatasets=null) { Whelk whelk = Whelk.createLoadedSearchWhelk(props) - if (flags['skip-index']) { - whelk.setSkipIndex(true) - } if (flags['skip-dependers']) { whelk.setSkipIndexDependers(true) } diff --git a/whelk-core/src/main/groovy/whelk/Whelk.groovy b/whelk-core/src/main/groovy/whelk/Whelk.groovy index 40e98c9dfc..a2f7fe4f2b 100644 --- a/whelk-core/src/main/groovy/whelk/Whelk.groovy +++ b/whelk-core/src/main/groovy/whelk/Whelk.groovy @@ -70,7 +70,6 @@ class Whelk { File logRoot URI baseUri = null - boolean skipIndex = false boolean skipIndexDependers = false // useCache may be set to true only when doing initial imports (temporary processes with the rest of Libris down). diff --git a/whelktool/README.md b/whelktool/README.md index 78a8b77fb6..8b166280e7 100644 --- a/whelktool/README.md +++ b/whelktool/README.md @@ -30,8 +30,6 @@ usage: whelktool [options]