diff --git a/iceberg/openhouse/internalcatalog/build.gradle b/iceberg/openhouse/internalcatalog/build.gradle
index b1e5c44c4..0537754f2 100644
--- a/iceberg/openhouse/internalcatalog/build.gradle
+++ b/iceberg/openhouse/internalcatalog/build.gradle
@@ -11,7 +11,9 @@ plugins {
 
 dependencies {
     implementation 'com.github.spotbugs:spotbugs-annotations:4.8.1'
+    implementation 'com.github.ben-manes.caffeine:caffeine:2.8.8'
     api 'org.springframework.retry:spring-retry:1.3.3'
+    implementation 'org.springframework:spring-context-support:5.3.18'
     compileOnly "io.opentelemetry.instrumentation:opentelemetry-instrumentation-annotations:${otel_annotations_version}"
     api 'io.opentelemetry:opentelemetry-api:1.47.0'
     api project(':client:hts')
diff --git a/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalCatalog.java b/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalCatalog.java
index d1fae0ad5..f73b2d71e 100644
--- a/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalCatalog.java
+++ b/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalCatalog.java
@@ -9,6 +9,7 @@
 import com.linkedin.openhouse.common.api.spec.TableUri;
 import com.linkedin.openhouse.common.exception.AlreadyExistsException;
 import com.linkedin.openhouse.common.exception.NoSuchSoftDeletedUserTableException;
+import com.linkedin.openhouse.internal.catalog.cache.TableMetadataCache;
 import com.linkedin.openhouse.internal.catalog.fileio.FileIOManager;
 import com.linkedin.openhouse.internal.catalog.mapper.HouseTableMapper;
 import com.linkedin.openhouse.internal.catalog.model.HouseTable;
@@ -63,6 +64,8 @@ public class OpenHouseInternalCatalog extends BaseMetastoreCatalog {
 
   @Autowired MeterRegistry meterRegistry;
 
+  @Autowired TableMetadataCache tableMetadataCache;
+
   @Override
   protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
     FileIO fileIO = resolveFileIO(tableIdentifier);
@@ -74,7 +77,8 @@ protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
         houseTableMapper,
         tableIdentifier,
         metricsReporter,
-        fileIOManager);
+        fileIOManager,
+        tableMetadataCache);
   }
 
   @Override
diff --git a/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperations.java b/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperations.java
index 13791048c..17497f98e 100644
--- a/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperations.java
+++ b/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperations.java
@@ -12,6 +12,7 @@
 import com.linkedin.openhouse.cluster.storage.StorageClient;
 import com.linkedin.openhouse.cluster.storage.hdfs.HdfsStorageClient;
 import com.linkedin.openhouse.cluster.storage.local.LocalStorageClient;
+import com.linkedin.openhouse.internal.catalog.cache.TableMetadataCache;
 import com.linkedin.openhouse.internal.catalog.exception.InvalidIcebergSnapshotException;
 import com.linkedin.openhouse.internal.catalog.fileio.FileIOManager;
 import com.linkedin.openhouse.internal.catalog.mapper.HouseTableMapper;
@@ -83,6 +84,8 @@ public class OpenHouseInternalTableOperations extends BaseMetastoreTableOperatio
 
   FileIOManager fileIOManager;
 
+  TableMetadataCache tableMetadataCache;
+
   private static final Gson GSON = new Gson();
 
   private static final Cache CACHE =
@@ -131,7 +134,10 @@ protected void doRefresh() {
   protected void refreshMetadata(final String metadataLoc) {
     long startTime = System.currentTimeMillis();
     boolean needToReload = !Objects.equal(currentMetadataLocation(), metadataLoc);
-    Runnable r = () -> super.refreshFromMetadataLocation(metadataLoc);
+    Runnable r =
+        () ->
+            super.refreshFromMetadataLocation(
+                metadataLoc, null, 20, this::loadTableMetadataWithCache);
     try {
       if (needToReload) {
         metricsReporter.executeWithStats(
@@ -337,6 +343,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
                   updatedMtDataRef, io().newOutputFile(newMetadataLocation)),
           InternalCatalogMetricsConstant.METADATA_UPDATE_LATENCY,
           getCatalogMetricTags());
+      tableMetadataCache.seed(newMetadataLocation, updatedMtDataRef);
       log.info(
           "updateMetadata to location {} succeeded, took {} ms",
           newMetadataLocation,
@@ -354,7 +361,7 @@ updatedMtDataRef, io().newOutputFile(newMetadataLocation)),
         writeSpan.end();
       }
 
-      houseTable = houseTableMapper.toHouseTable(metadataToCommit, fileIO);
+      houseTable = houseTableMapper.toHouseTable(updatedMtDataRef, fileIO);
       if (base != null
           && (properties.containsKey(CatalogConstants.OPENHOUSE_TABLEID_KEY)
               && !properties
@@ -387,7 +394,7 @@ updatedMtDataRef, io().newOutputFile(newMetadataLocation)),
        * "forced refresh" in {@link OpenHouseInternalTableOperations#commit(TableMetadata,
        * TableMetadata)}
        */
-      refreshFromMetadataLocation(newMetadataLocation);
+      refreshMetadata(newMetadataLocation);
     }
     if (isReplicatedTableCreate(properties)) {
       updateMetadataFieldForTable(metadata, newMetadataLocation);
@@ -768,4 +775,9 @@ private List getIntermediateSchemasFromProps(TableMetadata metadata) {
         .create()
         .fromJson(serializedNewIntermediateSchemas, new TypeToken>() {}.getType());
   }
+
+  private TableMetadata loadTableMetadataWithCache(String metadataLocation) {
+    return tableMetadataCache.load(
+        metadataLocation, () -> TableMetadataParser.read(io(), metadataLocation));
+  }
 }
diff --git a/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/cache/CacheConfiguration.java b/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/cache/CacheConfiguration.java
new file mode 100644
index 000000000..ca19b44e6
--- /dev/null
+++ b/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/cache/CacheConfiguration.java
@@ -0,0 +1,35 @@
+package com.linkedin.openhouse.internal.catalog.cache;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.linkedin.openhouse.internal.catalog.config.InternalCatalogSettings;
+import java.util.List;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.cache.CacheManager;
+import org.springframework.cache.annotation.EnableCaching;
+import org.springframework.cache.caffeine.CaffeineCacheManager;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+@EnableCaching
+public class CacheConfiguration {
+
+  @Bean
+  @ConditionalOnMissingBean(InternalCatalogSettings.class)
+  public InternalCatalogSettings internalCatalogSettings() {
+    return new InternalCatalogSettings();
+  }
+
+  @Bean
+  public CacheManager internalCatalogCacheManager(InternalCatalogSettings settings) {
+    CaffeineCacheManager cacheManager = new CaffeineCacheManager();
+    cacheManager.setAllowNullValues(false);
+    cacheManager.setCacheNames(List.of("tableMetadata"));
+    cacheManager.setCaffeine(
+        Caffeine.newBuilder()
+            .expireAfterWrite(settings.getMetadataCache().getTtl())
+            .maximumSize(settings.getMetadataCache().getMaxSize())
+            .recordStats());
+    return cacheManager;
+  }
+}
diff --git a/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/cache/SpringTableMetadataCache.java b/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/cache/SpringTableMetadataCache.java
new file mode 100644
index 000000000..f29504d32
--- /dev/null
+++ b/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/cache/SpringTableMetadataCache.java
@@ -0,0 +1,29 @@
+package com.linkedin.openhouse.internal.catalog.cache;
+
+import java.util.function.Supplier;
+import org.apache.iceberg.TableMetadata;
+import org.springframework.cache.annotation.CachePut;
+import org.springframework.cache.annotation.Cacheable;
+import org.springframework.stereotype.Component;
+
+@Component
+public class SpringTableMetadataCache implements TableMetadataCache {
+
+  @Override
+  @Cacheable(
+      cacheManager = "internalCatalogCacheManager",
+      cacheNames = "tableMetadata",
+      key = "#metadataLocation")
+  public TableMetadata load(String metadataLocation, Supplier<TableMetadata> metadataLoader) {
+    return metadataLoader.get();
+  }
+
+  @Override
+  @CachePut(
+      cacheManager = "internalCatalogCacheManager",
+      cacheNames = "tableMetadata",
+      key = "#metadataLocation")
+  public TableMetadata seed(String metadataLocation, TableMetadata tableMetadata) {
+    return tableMetadata;
+  }
+}
diff --git a/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/cache/TableMetadataCache.java b/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/cache/TableMetadataCache.java
new file mode 100644
index 000000000..1f10aefda
--- /dev/null
+++ b/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/cache/TableMetadataCache.java
@@ -0,0 +1,11 @@
+package com.linkedin.openhouse.internal.catalog.cache;
+
+import java.util.function.Supplier;
+import org.apache.iceberg.TableMetadata;
+
+public interface TableMetadataCache {
+
+  TableMetadata load(String metadataLocation, Supplier<TableMetadata> metadataLoader);
+
+  TableMetadata seed(String metadataLocation, TableMetadata tableMetadata);
+}
diff --git a/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/config/InternalCatalogSettings.java b/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/config/InternalCatalogSettings.java
new file mode 100644
index 000000000..078a778ac
--- /dev/null
+++ b/iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/config/InternalCatalogSettings.java
@@ -0,0 +1,19 @@
+package com.linkedin.openhouse.internal.catalog.config;
+
+import java.time.Duration;
+import lombok.Getter;
+import lombok.Setter;
+
+@Getter
+@Setter
+public class InternalCatalogSettings {
+
+  private MetadataCache metadataCache = new MetadataCache();
+
+  @Getter
+  @Setter
+  public static class MetadataCache {
+    private Duration ttl = Duration.ofMinutes(5);
+    private long maxSize = 1000;
+  }
+}
diff --git a/iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperationsTest.java b/iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperationsTest.java
index cad25eb12..98587a2a3 100644
--- a/iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperationsTest.java
+++ b/iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperationsTest.java
@@ -8,6 +8,7 @@
 import com.linkedin.openhouse.cluster.storage.StorageType;
 import com.linkedin.openhouse.cluster.storage.local.LocalStorage;
 import com.linkedin.openhouse.cluster.storage.local.LocalStorageClient;
+import com.linkedin.openhouse.internal.catalog.cache.TableMetadataCache;
 import com.linkedin.openhouse.internal.catalog.fileio.FileIOManager;
 import com.linkedin.openhouse.internal.catalog.mapper.HouseTableMapper;
 import com.linkedin.openhouse.internal.catalog.model.HouseTable;
@@ -34,7 +35,10 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import lombok.SneakyThrows;
 import org.apache.commons.compress.utils.Lists;
@@ -96,6 +100,7 @@ public class OpenHouseInternalTableOperationsTest {
   @Mock private FSDataInputStream mockFSDataInputStream;
   @Mock private FSDataOutputStream mockFSDataOutputStream;
 
+  private TableMetadataCache tableMetadataCache;
   private OpenHouseInternalTableOperations openHouseInternalTableOperations;
   private OpenHouseInternalTableOperations openHouseInternalTableOperationsWithMockMetrics;
 
@@ -107,6 +112,7 @@ private static String getTempLocation() {
   @BeforeEach
   void setup() {
     MockitoAnnotations.openMocks(this);
+    tableMetadataCache = new InMemoryTableMetadataCache();
     Mockito.when(mockHouseTableMapper.toHouseTable(Mockito.any(TableMetadata.class), Mockito.any()))
         .thenReturn(mockHouseTable);
     HadoopFileIO fileIO = new HadoopFileIO(new Configuration());
@@ -119,7 +125,8 @@ void setup() {
             mockHouseTableMapper,
             TEST_TABLE_IDENTIFIER,
             metricsReporter,
-            fileIOManager);
+            fileIOManager,
+            tableMetadataCache);
 
     // Create a separate instance with mock metrics reporter for testing metrics
     openHouseInternalTableOperationsWithMockMetrics =
@@ -129,7 +136,8 @@ void setup() {
             mockHouseTableMapper,
             TEST_TABLE_IDENTIFIER,
             mockMetricsReporter,
-            fileIOManager);
+            fileIOManager,
+            tableMetadataCache);
 
     LocalStorage localStorage = mock(LocalStorage.class);
     when(fileIOManager.getStorage(fileIO)).thenReturn(localStorage);
@@ -1068,6 +1076,113 @@ void testCommitMetadataUpdateLatencyHasHistogramBuckets() {
         this::executeCommitMetadata);
   }
 
+  @Test
+  void testRefreshReusesCachedMetadataAcrossOperations() {
+    HouseTablePrimaryKey primaryKey =
+        HouseTablePrimaryKey.builder()
+            .databaseId(TEST_TABLE_IDENTIFIER.namespace().toString())
+            .tableId(TEST_TABLE_IDENTIFIER.name())
+            .build();
+    when(mockHouseTableRepository.findById(primaryKey)).thenReturn(Optional.of(mockHouseTable));
+    when(mockHouseTable.getTableLocation()).thenReturn("test_metadata_location");
+
+    OpenHouseInternalTableOperations secondOperations =
+        new OpenHouseInternalTableOperations(
+            mockHouseTableRepository,
+            new HadoopFileIO(new Configuration()),
+            mockHouseTableMapper,
+            TEST_TABLE_IDENTIFIER,
+            new MetricsReporter(new SimpleMeterRegistry(), "TEST_CATALOG", Lists.newArrayList()),
+            fileIOManager,
+            tableMetadataCache);
+
+    try (MockedStatic<TableMetadataParser> parserMock =
+        Mockito.mockStatic(TableMetadataParser.class, Mockito.CALLS_REAL_METHODS)) {
+      parserMock
+          .when(
+              () ->
+                  TableMetadataParser.read(
+                      Mockito.any(FileIO.class), Mockito.eq("test_metadata_location")))
+          .thenReturn(BASE_TABLE_METADATA);
+
+      openHouseInternalTableOperations.refresh();
+      secondOperations.refresh();
+
+      parserMock.verify(
+          () ->
+              TableMetadataParser.read(
+                  Mockito.any(FileIO.class), Mockito.eq("test_metadata_location")),
+          times(1));
+    }
+  }
+
+  @Test
+  void testCommitSeedsCacheForSubsequentRefresh() {
+    AtomicReference<HouseTable> savedHouseTable = new AtomicReference<>();
+    HouseTablePrimaryKey primaryKey =
+        HouseTablePrimaryKey.builder()
+            .databaseId(TEST_TABLE_IDENTIFIER.namespace().toString())
+            .tableId(TEST_TABLE_IDENTIFIER.name())
+            .build();
+    when(mockHouseTableMapper.toHouseTable(Mockito.any(TableMetadata.class), Mockito.any()))
+        .thenAnswer(
+            invocation -> {
+              TableMetadata tableMetadata = invocation.getArgument(0);
+              HouseTable mappedHouseTable =
+                  HouseTable.builder()
+                      .databaseId(TEST_TABLE_IDENTIFIER.namespace().toString())
+                      .tableId(TEST_TABLE_IDENTIFIER.name())
+                      .tableLocation(
+                          tableMetadata.properties().get(getCanonicalFieldName("tableLocation")))
+                      .build();
+              savedHouseTable.set(mappedHouseTable);
+              return mappedHouseTable;
+            });
+    when(mockHouseTableRepository.save(Mockito.any(HouseTable.class)))
+        .thenAnswer(
+            invocation -> {
+              HouseTable houseTable = invocation.getArgument(0);
+              savedHouseTable.set(houseTable);
+              return houseTable;
+            });
+    when(mockHouseTableRepository.findById(primaryKey))
+        .thenAnswer(invocation -> Optional.ofNullable(savedHouseTable.get()));
+
+    OpenHouseInternalTableOperations refreshedOperations =
+        new OpenHouseInternalTableOperations(
+            mockHouseTableRepository,
+            new HadoopFileIO(new Configuration()),
+            mockHouseTableMapper,
+            TEST_TABLE_IDENTIFIER,
+            new MetricsReporter(new SimpleMeterRegistry(), "TEST_CATALOG", Lists.newArrayList()),
+            fileIOManager,
+            tableMetadataCache);
+
+    Map<String, String> properties = new HashMap<>(BASE_TABLE_METADATA.properties());
+    properties.put(getCanonicalFieldName("tableLocation"), TEST_LOCATION);
+    TableMetadata metadata = BASE_TABLE_METADATA.replaceProperties(properties);
+
+    try (MockedStatic<TableMetadataParser> parserMock =
+        Mockito.mockStatic(TableMetadataParser.class, Mockito.CALLS_REAL_METHODS)) {
+      parserMock
+          .when(
+              () ->
+                  TableMetadataParser.write(
+                      Mockito.any(TableMetadata.class),
+                      Mockito.any(org.apache.iceberg.io.OutputFile.class)))
+          .thenAnswer(invocation -> null);
+
+      openHouseInternalTableOperations.doCommit(BASE_TABLE_METADATA, metadata);
+      refreshedOperations.refresh();
+
+      String committedLocation = savedHouseTable.get().getTableLocation();
+      Assertions.assertEquals(committedLocation, refreshedOperations.currentMetadataLocation());
+      parserMock.verify(
+          () -> TableMetadataParser.read(Mockito.any(FileIO.class), Mockito.eq(committedLocation)),
+          never());
+    }
+  }
+
   /**
    * Common test method for verifying metrics exclude both database and table tags.
    *
@@ -1095,7 +1210,8 @@ private void testMetricExcludesDatabaseTag(
             mockHouseTableMapper,
             TEST_TABLE_IDENTIFIER,
             realMetricsReporter,
-            fileIOManager);
+            fileIOManager,
+            tableMetadataCache);
 
     // Setup test-specific mocks
     setupFunction.accept(operationsWithRealMetrics);
@@ -1157,7 +1273,8 @@ private void testMetricHasHistogramBuckets(
             mockHouseTableMapper,
             TEST_TABLE_IDENTIFIER,
             realMetricsReporter,
-            fileIOManager);
+            fileIOManager,
+            tableMetadataCache);
 
     // Setup test-specific mocks
     setupFunction.accept(operationsWithRealMetrics);
@@ -1861,4 +1978,19 @@ void testDoCommitCreatesOtelSpans() {
       tracerProvider.close();
     }
   }
+
+  private static final class InMemoryTableMetadataCache implements TableMetadataCache {
+    private final Map<String, TableMetadata> cache = new ConcurrentHashMap<>();
+
+    @Override
+    public TableMetadata load(String metadataLocation, Supplier<TableMetadata> metadataLoader) {
+      return cache.computeIfAbsent(metadataLocation, ignored -> metadataLoader.get());
+    }
+
+    @Override
+    public TableMetadata seed(String metadataLocation, TableMetadata tableMetadata) {
+      cache.put(metadataLocation, tableMetadata);
+      return tableMetadata;
+    }
+  }
 }
diff --git a/iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/cache/CacheConfigurationTest.java b/iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/cache/CacheConfigurationTest.java
new file mode 100644
index 000000000..ff7f7d400
--- /dev/null
+++ b/iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/cache/CacheConfigurationTest.java
@@ -0,0 +1,106 @@
+package com.linkedin.openhouse.internal.catalog.cache;
+
+import com.linkedin.openhouse.internal.catalog.config.InternalCatalogSettings;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.TableMetadata;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.springframework.boot.test.context.assertj.AssertableApplicationContext;
+import org.springframework.boot.test.context.runner.ApplicationContextRunner;
+import org.springframework.cache.caffeine.CaffeineCache;
+import org.springframework.cache.caffeine.CaffeineCacheManager;
+
+class CacheConfigurationTest {
+
+  private final ApplicationContextRunner contextRunner =
+      new ApplicationContextRunner().withUserConfiguration(CacheConfiguration.class);
+
+  private final ApplicationContextRunner tableMetadataCacheContextRunner =
+      new ApplicationContextRunner()
+          .withUserConfiguration(CacheConfiguration.class)
+          .withBean(SpringTableMetadataCache.class, SpringTableMetadataCache::new);
+
+  @Test
+  public void testDefaultMetadataCacheConfiguration() {
+    contextRunner
+        .withBean(InternalCatalogSettings.class, InternalCatalogSettings::new)
+        .run(context -> assertMetadataCacheConfiguration(context, Duration.ofMinutes(5), 1000));
+  }
+
+  @Test
+  public void testConfiguredMetadataCacheConfiguration() {
+    contextRunner
+        .withBean(InternalCatalogSettings.class, () -> buildSettings(Duration.ofMinutes(7), 42))
+        .run(context -> assertMetadataCacheConfiguration(context, Duration.ofMinutes(7), 42));
+  }
+
+  @Test
+  public void testSpringTableMetadataCacheUsesConfiguredTableMetadataCache() {
+    tableMetadataCacheContextRunner
+        .withBean(InternalCatalogSettings.class, () -> buildSettings(Duration.ofMinutes(7), 42))
+        .run(
+            context -> {
+              CaffeineCache tableMetadataCache =
+                  assertMetadataCacheConfiguration(context, Duration.ofMinutes(7), 42);
+              TableMetadataCache cache = context.getBean(TableMetadataCache.class);
+              String metadataLocation = "metadata-location";
+              TableMetadata seededMetadata = Mockito.mock(TableMetadata.class);
+              AtomicInteger loadCount = new AtomicInteger();
+
+              cache.seed(metadataLocation, seededMetadata);
+              TableMetadata loadedMetadata =
+                  cache.load(
+                      metadataLocation,
+                      () -> {
+                        loadCount.incrementAndGet();
+                        return Mockito.mock(TableMetadata.class);
+                      });
+
+              Assertions.assertSame(seededMetadata, loadedMetadata);
+              Assertions.assertEquals(0, loadCount.get());
+              Assertions.assertSame(
+                  seededMetadata, tableMetadataCache.get(metadataLocation, TableMetadata.class));
+            });
+  }
+
+  private InternalCatalogSettings buildSettings(Duration ttl, long maxSize) {
+    InternalCatalogSettings settings = new InternalCatalogSettings();
+    settings.getMetadataCache().setTtl(ttl);
+    settings.getMetadataCache().setMaxSize(maxSize);
+    return settings;
+  }
+
+  private CaffeineCache assertMetadataCacheConfiguration(
+      AssertableApplicationContext context, Duration expectedTtl, long expectedMaxSize) {
+    Assertions.assertNull(context.getStartupFailure());
+
+    InternalCatalogSettings settings = context.getBean(InternalCatalogSettings.class);
+    Assertions.assertEquals(expectedTtl, settings.getMetadataCache().getTtl());
+    Assertions.assertEquals(expectedMaxSize, settings.getMetadataCache().getMaxSize());
+
+    CaffeineCacheManager cacheManager =
+        context.getBean("internalCatalogCacheManager", CaffeineCacheManager.class);
+    Assertions.assertFalse(cacheManager.isAllowNullValues());
+    Assertions.assertEquals(List.of("tableMetadata"), List.copyOf(cacheManager.getCacheNames()));
+
+    CaffeineCache tableMetadataCache = (CaffeineCache) cacheManager.getCache("tableMetadata");
+    Assertions.assertNotNull(tableMetadataCache);
+
+    com.github.benmanes.caffeine.cache.Cache<Object, Object> nativeCache =
+        tableMetadataCache.getNativeCache();
+    Assertions.assertEquals(
+        expectedTtl.toNanos(),
+        nativeCache
+            .policy()
+            .expireAfterWrite()
+            .orElseThrow()
+            .getExpiresAfter(TimeUnit.NANOSECONDS));
+    Assertions.assertEquals(
+        expectedMaxSize, nativeCache.policy().eviction().orElseThrow().getMaximum());
+    return tableMetadataCache;
+  }
+}
diff --git a/services/common/build.gradle b/services/common/build.gradle
index 83c54bfde..f005a8c6c 100644
--- a/services/common/build.gradle
+++ b/services/common/build.gradle
@@ -36,7 +36,6 @@ dependencies {
     implementation 'io.opentelemetry:opentelemetry-sdk:1.47.0'
    implementation 'io.opentelemetry:opentelemetry-semconv:1.14.0-alpha'
     implementation 'org.apache.commons:commons-lang3:3.12.0'
-
     // version chosen to be consistent with the transitive dependency
     // from the springboot framework's version in other modules.
     testImplementation 'commons-io:commons-io:2.4'
diff --git a/services/tables/build.gradle b/services/tables/build.gradle
index daadd8297..271d8951e 100644
--- a/services/tables/build.gradle
+++ b/services/tables/build.gradle
@@ -43,6 +43,7 @@ dependencies {
     implementation 'com.cronutils:cron-utils:9.2.0'
     testImplementation 'org.junit.jupiter:junit-jupiter-engine:' + junit_version
     testImplementation 'org.springframework.security:spring-security-test:5.7.3'
+    testImplementation 'org.springframework:spring-context-support:5.3.18'
     testImplementation(testFixtures(project(':services:common')))
     testImplementation (project(':tables-test-fixtures:tables-test-fixtures_2.12')) {
         exclude group: 'com.linkedin.iceberg'
diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogBeans.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogBeans.java
new file mode 100644
index 000000000..df4c5f20d
--- /dev/null
+++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogBeans.java
@@ -0,0 +1,28 @@
+package com.linkedin.openhouse.tables.config;
+
+import com.linkedin.openhouse.internal.catalog.config.InternalCatalogSettings;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+@EnableConfigurationProperties(InternalCatalogProperties.class)
+public class InternalCatalogBeans {
+
+  @Bean
+  public InternalCatalogSettings internalCatalogSettings(InternalCatalogProperties properties) {
+    InternalCatalogSettings settings = new InternalCatalogSettings();
+    InternalCatalogProperties.MetadataCache metadataCacheOverrides = properties.getMetadataCache();
+
+    if (metadataCacheOverrides != null) {
+      if (metadataCacheOverrides.getTtl() != null) {
+        settings.getMetadataCache().setTtl(metadataCacheOverrides.getTtl());
+      }
+      if (metadataCacheOverrides.getMaxSize() != null) {
+        settings.getMetadataCache().setMaxSize(metadataCacheOverrides.getMaxSize());
+      }
+    }
+
+    return settings;
+  }
+}
diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java
new file mode 100644
index 000000000..e5a59d18b
--- /dev/null
+++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/config/InternalCatalogProperties.java
@@ -0,0 +1,21 @@
+package com.linkedin.openhouse.tables.config;
+
+import java.time.Duration;
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@Getter
+@Setter
+@ConfigurationProperties(prefix = "cluster.iceberg.tables")
+public class InternalCatalogProperties {
+
+  private MetadataCache metadataCache = new MetadataCache();
+
+  @Getter
+  @Setter
+  public static class MetadataCache {
+    private Duration ttl;
+    private Long maxSize;
+  }
+}
diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/config/InternalCatalogBeansTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/config/InternalCatalogBeansTest.java
new file mode 100644
index 000000000..39e01125a
--- /dev/null
+++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/config/InternalCatalogBeansTest.java
@@ -0,0 +1,76 @@
+package com.linkedin.openhouse.tables.config;
+
+import com.linkedin.openhouse.internal.catalog.cache.CacheConfiguration;
+import com.linkedin.openhouse.internal.catalog.config.InternalCatalogSettings;
+import java.time.Duration;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.assertj.AssertableApplicationContext;
+import org.springframework.boot.test.context.runner.ApplicationContextRunner;
+import org.springframework.cache.CacheManager;
+
+class InternalCatalogBeansTest {
+
+  private final ApplicationContextRunner contextRunner =
+      new ApplicationContextRunner().withUserConfiguration(InternalCatalogBeans.class);
+
+  private final ApplicationContextRunner crossModuleContextRunner =
+      new ApplicationContextRunner()
+          .withUserConfiguration(InternalCatalogBeans.class, CacheConfiguration.class);
+
+  @Test
+  public void testDefaultInternalCatalogSettings() {
+    contextRunner.run(
+        context -> {
+          assertMetadataCacheOverrides(context, null, null);
+          assertMetadataCacheSettings(context, Duration.ofMinutes(5), 1000);
+          Assertions.assertFalse(context.containsBean("internalCatalogCacheManager"));
+        });
+  }
+
+  @Test
+  public void testOverriddenInternalCatalogSettings() {
+    contextRunner
+        .withPropertyValues(
+            "cluster.iceberg.tables.metadata-cache.ttl=7m",
+            "cluster.iceberg.tables.metadata-cache.max-size=42")
+        .run(
+            context -> {
+              assertMetadataCacheOverrides(context, Duration.ofMinutes(7), 42L);
+              assertMetadataCacheSettings(context, Duration.ofMinutes(7), 42);
+              Assertions.assertFalse(context.containsBean("internalCatalogCacheManager"));
+            });
+  }
+
+  @Test
+  public void testInternalCatalogSettingsPropagateToCacheConfiguration() {
+    crossModuleContextRunner
+        .withPropertyValues(
+            "cluster.iceberg.tables.metadata-cache.ttl=7m",
+            "cluster.iceberg.tables.metadata-cache.max-size=42")
+        .run(
+            context -> {
+              assertMetadataCacheOverrides(context, Duration.ofMinutes(7), 42L);
+              assertMetadataCacheSettings(context, Duration.ofMinutes(7), 42);
+              Assertions.assertNotNull(context.getBean(CacheManager.class));
+            });
+  }
+
+  private void assertMetadataCacheOverrides(
+      AssertableApplicationContext context, Duration expectedTtl, Long expectedMaxSize) {
+    Assertions.assertNull(context.getStartupFailure());
+
+    InternalCatalogProperties properties = context.getBean(InternalCatalogProperties.class);
+    Assertions.assertEquals(expectedTtl, properties.getMetadataCache().getTtl());
+    Assertions.assertEquals(expectedMaxSize, properties.getMetadataCache().getMaxSize());
+  }
+
+  private void assertMetadataCacheSettings(
+      AssertableApplicationContext context, Duration expectedTtl, long expectedMaxSize) {
+    Assertions.assertNull(context.getStartupFailure());
+
+    InternalCatalogSettings settings = context.getBean(InternalCatalogSettings.class);
+    Assertions.assertEquals(expectedTtl, settings.getMetadataCache().getTtl());
+    Assertions.assertEquals(expectedMaxSize, settings.getMetadataCache().getMaxSize());
+  }
+}
diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RepositoryTestWithSettableComponents.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RepositoryTestWithSettableComponents.java
index 1ce1cae1d..dcc5b4b04 100644
--- a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RepositoryTestWithSettableComponents.java
+++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RepositoryTestWithSettableComponents.java
@@ -8,6 +8,7 @@
 import com.linkedin.openhouse.cluster.storage.StorageManager;
 import com.linkedin.openhouse.common.test.cluster.PropertyOverrideContextInitializer;
 import com.linkedin.openhouse.internal.catalog.OpenHouseInternalTableOperations;
+import com.linkedin.openhouse.internal.catalog.cache.TableMetadataCache;
 import com.linkedin.openhouse.internal.catalog.fileio.FileIOManager;
 import com.linkedin.openhouse.internal.catalog.mapper.HouseTableMapper;
 import com.linkedin.openhouse.internal.catalog.model.HouseTable;
@@ -26,9 +27,12 @@
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
 import javax.annotation.PostConstruct;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -88,6 +92,22 @@ private HouseTableRepository provideFailedHtsRepoWhenSave(String tableLocation)
     return htsRepo;
   }
 
+  private TableMetadataCache newTableMetadataCache() {
+    Map<String, TableMetadata> cache = new ConcurrentHashMap<>();
+    return new TableMetadataCache() {
+      @Override
+      public TableMetadata load(String metadataLocation, Supplier<TableMetadata> metadataLoader) {
+        return cache.computeIfAbsent(metadataLocation, ignored -> metadataLoader.get());
+      }
+
+      @Override
+      public TableMetadata seed(String metadataLocation, TableMetadata tableMetadata) {
+        cache.put(metadataLocation, tableMetadata);
+        return tableMetadata;
+      }
+    };
+  }
+
   @Test
   void testNoRetryInternalRepo() throws Exception {
     TableIdentifier tableIdentifier =
@@ -104,7 +124,8 @@ void testNoRetryInternalRepo() throws Exception {
             houseTableMapper,
             tableIdentifier,
             metricsReporter,
-            fileIOManager);
+            fileIOManager,
+            newTableMetadataCache());
     ((SettableCatalogForTest) catalog).setOperation(actualOps);
     TableDto creationDTO = TABLE_DTO.toBuilder().tableVersion(INITIAL_TABLE_VERSION).build();
     creationDTO = openHouseInternalRepository.save(creationDTO);
@@ -120,7 +141,13 @@ void testNoRetryInternalRepo() throws Exception {
         new MetricsReporter(this.meterRegistry, "test", Lists.newArrayList());
     OpenHouseInternalTableOperations mockOps =
         new OpenHouseInternalTableOperations(
-            htsRepo, fileIO, houseTableMapper, tableIdentifier, metricsReporter2, fileIOManager);
+            htsRepo,
+            fileIO,
+            houseTableMapper,
+            tableIdentifier,
+            metricsReporter2,
+            fileIOManager,
+            newTableMetadataCache());
     OpenHouseInternalTableOperations spyOperations = Mockito.spy(mockOps);
     BaseTable spyOptsMockedTable = Mockito.spy(new BaseTable(spyOperations, realTable.name()));
 
@@ -192,7 +219,8 @@ void testSaveClearsTransientCommitPropertiesDuringTransaction() throws Exception
             houseTableMapper,
             tableIdentifier,
             metricsReporter,
-            fileIOManager);
+            fileIOManager,
+            newTableMetadataCache());
     ((SettableCatalogForTest) catalog).setOperation(actualOps);
 
     TableDto creationDTO = TABLE_DTO.toBuilder().tableVersion(INITIAL_TABLE_VERSION).build();
@@ -251,7 +279,13 @@ void testFailedHtsRepoWhenGet() {
         new MetricsReporter(this.meterRegistry, "test", Lists.newArrayList());
     OpenHouseInternalTableOperations mockOps =
         new OpenHouseInternalTableOperations(
-            htsRepo, fileIO, houseTableMapper, tableIdentifier, metricsReporter, fileIOManager);
+            htsRepo,
+            fileIO,
+            houseTableMapper,
+            tableIdentifier,
+            metricsReporter,
+            fileIOManager,
+            newTableMetadataCache());
     OpenHouseInternalTableOperations spyOperations = Mockito.spy(mockOps);
     BaseTable spyOptsMockedTable = Mockito.spy(