2 changes: 2 additions & 0 deletions NEWS.md
@@ -13,6 +13,7 @@
* Provides `resource-ids-streaming v1.0`
* Provides `browse-inventory v1.0`
* Provides `browse-authorities v1.0`
* Provides `indices v1.2`
* Removed `search`
* Removed `browse`

@@ -36,6 +37,7 @@
* Omit sub-resource if main value is blank ([MSEARCH-1084](https://folio-org.atlassian.net/browse/MSEARCH-1084))
* Remove excessive escaping of backslash character in sub-resources ([MSEARCH-1094](https://folio-org.atlassian.net/browse/MSEARCH-1094))
* Implement two-stage Kafka processing with event aggregation for instance indexing ([MSEARCH-1157](https://folio-org.atlassian.net/browse/MSEARCH-1157))
* Implement member tenant reindex ([MSEARCH-1100](https://folio-org.atlassian.net/browse/MSEARCH-1100))
* **Instance Search**
* Add support for searching by instance/holdings/item electronic access relationship ID ([MSEARCH-816](https://folio-org.atlassian.net/browse/MSEARCH-816))
* Normalize ISSN search ([MSEARCH-658](https://folio-org.atlassian.net/browse/MSEARCH-658))
Expand Down
69 changes: 69 additions & 0 deletions README.md
@@ -335,6 +335,7 @@ and [Cross-cluster replication](https://docs.aws.amazon.com/opensearch-service/l
| INSTANCE_CHILDREN_INDEX_DELAY_MS | 60000 | Defines the delay for the scheduler that indexes subjects/contributors/classifications/call-numbers in the background |
| SUB_RESOURCE_BATCH_SIZE | 100 | Defines the number of sub-resources to process at a time during background indexing |
| STALE_LOCK_THRESHOLD_MS | 600000 | Threshold to consider a sub-resource lock as stale and eligible for release |
| REINDEX_MIGRATION_WORK_MEM | 64MB | PostgreSQL `work_mem` value applied to migration operations during staging table processing. Controls how much memory a query may use before PostgreSQL starts writing to temporary disk files. Must be a number followed by KB, MB, or GB (e.g., `64MB`, `1GB`). |
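Since `work_mem` is a per-query memory budget, a malformed value would only surface mid-reindex unless it is checked up front. A minimal sketch of validating such a value and building the session-scoped statement that applies it (the class and method names here are illustrative, not the module's actual code):

```java
// Illustrative sketch only: validate a work_mem value and build the statement
// that would apply it for the current transaction. Names are assumed.
public class WorkMemSketch {

  // Same format rule the module enforces: a number followed by KB, MB, or GB.
  private static final java.util.regex.Pattern WORK_MEM_PATTERN =
      java.util.regex.Pattern.compile("^\\d+\\s*(KB|MB|GB)$");

  public static String setLocalWorkMem(String workMem) {
    if (!WORK_MEM_PATTERN.matcher(workMem).matches()) {
      throw new IllegalArgumentException("Invalid work_mem value: " + workMem);
    }
    // SET LOCAL scopes the setting to the current transaction, so the larger
    // memory budget applies only to the migration statements, not the session.
    return "SET LOCAL work_mem = '" + workMem + "'";
  }
}
```

Failing fast on a bad value at startup is preferable to discovering it hours into a long-running reindex.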

The module uses a system user to communicate with other modules from Kafka consumers.
For production deployments you MUST specify the password for this system user via an env variable:
@@ -478,6 +479,7 @@ x-okapi-tenant: [tenant]
x-okapi-token: [JWT_TOKEN]

{
"tenantId": "optional_specific_tenant",
"indexSettings": {
"numberOfShards": 2,
"numberOfReplicas": 4,
@@ -486,6 +488,7 @@ x-okapi-token: [JWT_TOKEN]
}

```
* `tenantId` parameter is optional and allows reindexing a specific consortium member tenant. If not provided, all consortium members are reindexed (or the single tenant, in a non-consortium deployment)
* `indexSettings` parameter is optional and defines the following Elasticsearch/Opensearch index settings:
- `numberOfShards` - the number (between 1 and 100) of primary shards for the index
- `numberOfReplicas` - the number of replicas (between 0 and 100) each primary shard has
@@ -575,6 +578,72 @@ Only for entity type of ```instance``` we can have statuses of both Merge and Upload
The ```status``` response field can have values ```"MERGE_IN_PROGRESS"```, ```"MERGE_COMPLETED"```, or ```"MERGE_FAILED"``` for entity types
representing the Merge step, and ```"UPLOAD_IN_PROGRESS"```, ```"UPLOAD_COMPLETED"```, or ```"UPLOAD_FAILED"``` for entities of the Upload step.

### Tenant-Specific Reindexing in Consortia

For consortium deployments, it's often necessary to reindex data for a specific member tenant without affecting other tenants' data or shared consortium instances.
The tenant-specific reindex feature addresses this need by providing fine-grained control over which tenant's data gets reprocessed.

#### When to Use Tenant-Specific Reindex

- **Member tenant issues**: When a specific consortium member has data corruption or indexing problems
- **Selective updates**: When configuration changes only affect certain tenants
- **Maintenance operations**: When performing targeted maintenance without disrupting the entire consortium
- **Testing and troubleshooting**: When diagnosing issues specific to a single tenant

#### How It Works

1. **Data Preservation**: Shared consortium instances are preserved during tenant-specific operations
2. **Staging Process**: Data is processed through staging tables to ensure separation from other tenants
3. **Selective Cleanup**: Only documents belonging to the specified tenant are removed from OpenSearch indices
4. **Relationship Maintenance**: Instance-to-holdings/items relationships are properly maintained across the consortium

#### Example Usage

```http
# Reindex only tenant "university_library" in a consortium
POST /search/index/instance-records/reindex/full

x-okapi-tenant: consortium
x-okapi-token: [JWT_TOKEN]

{
"tenantId": "university_library",
"indexSettings": {
"numberOfReplicas": 2,
"refreshInterval": 30
}
}
```

This approach ensures that:
- Shared instances from other consortium members remain untouched
- The specified tenant's data is completely refreshed
- Index integrity is maintained throughout the process
- Other consortium members continue to have uninterrupted service

### Staging Tables

The reindexing process uses staging tables as a high-performance buffer to maximize throughput during large-scale data operations.
The staging tables are specifically designed for optimal write performance and minimal contention.

#### Performance-Optimized Design

- **Unlogged Tables**: Staging tables are unlogged for maximum write performance (no WAL overhead)
- **Partitioned Structure**: Data is partitioned to allow parallel processing across multiple workers
- **No Indexes**: Absence of indexes eliminates index maintenance overhead during bulk inserts
- **Minimal Contention**: Multiple processes can write concurrently without blocking each other
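The design properties above can be illustrated with the kind of DDL such a table might use. The table name, columns, and partitioning key below are invented for illustration; the module's actual schema differs:

```java
// Illustrative DDL only: a staging table with the properties listed above.
public class StagingTableSketch {

  // Note what is deliberately absent: no CREATE INDEX statements follow, so
  // bulk inserts pay no index-maintenance cost and concurrent writers do not
  // contend on shared index pages.
  public static final String CREATE_STAGING_TABLE = """
      CREATE UNLOGGED TABLE staging_instance (  -- UNLOGGED: skips WAL writes
        id        uuid  NOT NULL,
        tenant_id text  NOT NULL,
        payload   jsonb NOT NULL
      ) PARTITION BY HASH (id)                  -- partitions allow parallel writers
      """;
}
```

The trade-off of `UNLOGGED` is that the table's contents are not crash-safe, which is acceptable here because staging data can always be re-collected from the inventory services.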

#### Staging Process Flow

1. **High-Speed Data Collection**: Multiple parallel processes load raw data from inventory services into staging tables without contention
2. **Batch Migration**: Data is moved to operational tables in optimized batches

#### Performance Benefits

- **Maximum Throughput**: Unlogged, unindexed tables allow maximum write speed during data collection
- **Reduced Contention**: Main operational tables experience minimal locking during the reindex process
- **Parallel Processing**: Partitioned staging tables enable concurrent processing across multiple workers
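The two-stage flow can be sketched with in-memory lists standing in for the staging and operational tables. This is a simulation only; the real module performs the migration against PostgreSQL in batched SQL:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative simulation of stage 2 (batch migration): drain the staging
// buffer into the operational store batch by batch, so any locking on the
// operational side stays short-lived. The batch size plays the same role as
// the module's batching configuration.
public class StagingFlowSketch {

  public static int migrateInBatches(List<String> staging, List<String> operational, int batchSize) {
    int batches = 0;
    while (!staging.isEmpty()) {
      // Take up to batchSize records from the front of the staging buffer.
      var batch = new ArrayList<>(staging.subList(0, Math.min(batchSize, staging.size())));
      operational.addAll(batch);
      // Remove the migrated records from staging, mirroring a batched
      // INSERT ... / DELETE ... pair in the database.
      staging.subList(0, batch.size()).clear();
      batches++;
    }
    return batches;
  }
}
```

Keeping batches small bounds per-transaction memory and lock duration; making them larger amortizes per-batch overhead. The `REINDEX_MIGRATION_WORK_MEM` setting described earlier tunes the memory side of that same trade-off inside PostgreSQL.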

## API

### CQL support
2 changes: 1 addition & 1 deletion descriptors/ModuleDescriptor-template.json
@@ -4,7 +4,7 @@
"provides": [
{
"id": "indices",
"version": "1.1",
"version": "1.2",
"handlers": [
{
"methods": [
2 changes: 0 additions & 2 deletions src/main/java/org/folio/search/SearchApplication.java
@@ -2,14 +2,12 @@

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.resilience.annotation.EnableResilientMethods;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
* Folio search application.
*/
@EnableCaching
@EnableScheduling
@EnableResilientMethods
@SpringBootApplication
@@ -0,0 +1,45 @@
package org.folio.search.configuration;

import static org.folio.search.configuration.SearchCacheNames.REINDEX_TARGET_TENANT_CACHE;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.CaffeineSpec;
import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
import org.folio.search.configuration.properties.CacheConfigurationProperties;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cache.caffeine.CaffeineCacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableCaching
@RequiredArgsConstructor
public class CacheConfiguration {

private final CacheConfigurationProperties cacheProperties;

@Bean
public CacheManager cacheManager() {
var cacheManager = new CaffeineCacheManager();

// Set cache names from configuration
if (cacheProperties.getCacheNames() != null && !cacheProperties.getCacheNames().isEmpty()) {
cacheManager.setCacheNames(cacheProperties.getCacheNames());
}

// Set default spec from configuration for all caches
var defaultSpec = cacheProperties.getCaffeine().getSpec();
cacheManager.setCaffeineSpec(CaffeineSpec.parse(defaultSpec));

// Register custom cache with 10-second TTL for reindex-target-tenant
cacheManager.registerCustomCache(REINDEX_TARGET_TENANT_CACHE,
Caffeine.newBuilder()
.maximumSize(500)
.expireAfterWrite(10, TimeUnit.SECONDS)
.build());

return cacheManager;
}
}
@@ -13,4 +13,6 @@ public class SearchCacheNames {
public static final String SEARCH_PREFERENCE_CACHE = "search-preference";
public static final String USER_TENANTS_CACHE = "user-tenants";
public static final String CONSORTIUM_TENANTS_CACHE = "consortium-tenants-cache";
// custom cache names
public static final String REINDEX_TARGET_TENANT_CACHE = "reindex-target-tenant";
}
@@ -0,0 +1,36 @@
package org.folio.search.configuration.properties;

import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data
@Component
@NoArgsConstructor
@AllArgsConstructor(staticName = "of")
@ConfigurationProperties(prefix = "spring.cache")
public class CacheConfigurationProperties {

/**
* List of cache names to create.
*/
private List<String> cacheNames;

/**
* Caffeine cache specification for default caches.
*/
private Caffeine caffeine = new Caffeine();

@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Caffeine {
/**
* Caffeine spec string (e.g., "maximumSize=500,expireAfterWrite=3600s").
*/
private String spec = "maximumSize=500,expireAfterWrite=3600s";
}
}
@@ -0,0 +1,43 @@
package org.folio.search.configuration.properties;

import jakarta.validation.constraints.Min;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
* Configuration properties for index management operations.
*/
@Data
@Component
@NoArgsConstructor
@AllArgsConstructor(staticName = "of")
@ConfigurationProperties(prefix = "folio.index-management")
public class IndexManagementConfigurationProperties {

/**
* Batch size for delete-by-query operations to improve performance on large datasets.
*/
@Min(1)
private Integer deleteQueryBatchSize = 1_000;

/**
* Scroll timeout in minutes for delete-by-query operations to handle large result sets.
*/
@Min(1)
private Integer deleteQueryScrollTimeoutMinutes = 5;

/**
* Request timeout in minutes for delete-by-query operations to prevent failures on large operations.
*/
@Min(1)
private Integer deleteQueryRequestTimeoutMinutes = 30;

/**
* Whether to refresh the index after delete-by-query operation.
* Setting to false improves performance by deferring refresh.
*/
private Boolean deleteQueryRefresh = false;
}
@@ -1,9 +1,12 @@
package org.folio.search.configuration.properties;

import jakarta.annotation.PostConstruct;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.Pattern;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@@ -12,8 +15,12 @@
@NoArgsConstructor
@AllArgsConstructor(staticName = "of")
@ConfigurationProperties(prefix = "folio.reindex")
@Log4j2
public class ReindexConfigurationProperties {

private static final java.util.regex.Pattern WORK_MEM_VALIDATION_PATTERN =
java.util.regex.Pattern.compile("^\\d+\\s*(KB|MB|GB)$");

/**
* Defines number of locations to retrieve per inventory http request on locations reindex process.
*/
@@ -33,4 +40,33 @@ public class ReindexConfigurationProperties {
private long mergeRangePublisherRetryIntervalMs = 1000;

private int mergeRangePublisherRetryAttempts = 5;

/**
* Defines the PostgreSQL work_mem value to set for migration operations.
* This controls the amount of memory used by query operations before PostgreSQL
* starts writing data to temporary disk files. Default is '64MB'.
* Format must be a number followed by KB, MB, or GB (e.g., "64MB", "512KB", "1GB").
*/
@Pattern(regexp = "^\\d+\\s*(KB|MB|GB)$",
message = "work_mem must be a number followed by KB, MB, or GB (e.g., '64MB', '512KB', '1GB')")
private String migrationWorkMem = "64MB";

/**
* Validates the configuration properties at startup.
* This ensures that any invalid configuration fails fast during application startup.
*/
@PostConstruct
public void validateConfiguration() {
log.info("Validating reindex configuration properties...");

// Validate work_mem format
if (!WORK_MEM_VALIDATION_PATTERN.matcher(migrationWorkMem).matches()) {
var errorMsg = "Invalid work_mem configuration: " + migrationWorkMem
+ ". Must be a number followed by KB, MB, or GB (e.g., '64MB', '512KB', '1GB')";
log.error(errorMsg);
throw new IllegalArgumentException(errorMsg);
}

log.info("Reindex configuration validated successfully. Migration work_mem: {}", migrationWorkMem);
}
}
@@ -1,12 +1,14 @@
package org.folio.search.controller;

import static org.apache.commons.lang3.StringUtils.isNotBlank;

import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.folio.search.domain.dto.CreateIndexRequest;
import org.folio.search.domain.dto.FolioCreateIndexResponse;
import org.folio.search.domain.dto.FolioIndexOperationResponse;
import org.folio.search.domain.dto.IndexSettings;
import org.folio.search.domain.dto.ReindexFullRequest;
import org.folio.search.domain.dto.ReindexJob;
import org.folio.search.domain.dto.ReindexRequest;
import org.folio.search.domain.dto.ReindexStatusItem;
@@ -51,9 +53,16 @@ public ResponseEntity<FolioIndexOperationResponse> indexRecords(List<ResourceEve
}

@Override
public ResponseEntity<Void> reindexInstanceRecords(String tenantId, IndexSettings indexSettings) {
log.info("Attempting to run full-reindex for instance records [tenant: {}]", tenantId);
reindexService.submitFullReindex(tenantId, indexSettings);
public ResponseEntity<Void> reindexInstanceRecords(String tenantId, ReindexFullRequest reindexFullRequest) {
var targetTenantId = reindexFullRequest != null && isNotBlank(reindexFullRequest.getTenantId())
? reindexFullRequest.getTenantId()
: null;
var indexSettings = reindexFullRequest != null ? reindexFullRequest.getIndexSettings() : null;

log.info("Attempting to run full-reindex for instance records [requestingTenant: {}, targetTenant: {}]",
tenantId, targetTenantId != null ? targetTenantId : "all consortium members");

reindexService.submitFullReindex(tenantId, indexSettings, targetTenantId);
return ResponseEntity.ok().build();
}

Expand Up @@ -5,4 +5,8 @@ public class ReindexException extends RuntimeException {
public ReindexException(String errorMessage) {
super(errorMessage);
}

public ReindexException(String errorMessage, Throwable cause) {
super(errorMessage, cause);
}
}