iceberg/openhouse/internalcatalog/build.gradle (2 additions, 0 deletions)
@@ -11,7 +11,9 @@ plugins {

dependencies {
implementation 'com.github.spotbugs:spotbugs-annotations:4.8.1'
implementation 'com.github.ben-manes.caffeine:caffeine:2.8.8'
api 'org.springframework.retry:spring-retry:1.3.3'
implementation 'org.springframework:spring-context-support:5.3.18'
compileOnly "io.opentelemetry.instrumentation:opentelemetry-instrumentation-annotations:${otel_annotations_version}"
api 'io.opentelemetry:opentelemetry-api:1.47.0'
api project(':client:hts')
OpenHouseInternalCatalog.java
@@ -9,6 +9,7 @@
import com.linkedin.openhouse.common.api.spec.TableUri;
import com.linkedin.openhouse.common.exception.AlreadyExistsException;
import com.linkedin.openhouse.common.exception.NoSuchSoftDeletedUserTableException;
import com.linkedin.openhouse.internal.catalog.cache.TableMetadataCache;
import com.linkedin.openhouse.internal.catalog.fileio.FileIOManager;
import com.linkedin.openhouse.internal.catalog.mapper.HouseTableMapper;
import com.linkedin.openhouse.internal.catalog.model.HouseTable;
@@ -63,6 +64,8 @@ public class OpenHouseInternalCatalog extends BaseMetastoreCatalog {

@Autowired MeterRegistry meterRegistry;

@Autowired TableMetadataCache tableMetadataCache;

@Override
protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
FileIO fileIO = resolveFileIO(tableIdentifier);
@@ -74,7 +77,8 @@ protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
houseTableMapper,
tableIdentifier,
metricsReporter,
-        fileIOManager);
+        fileIOManager,
+        tableMetadataCache);
}

@Override
OpenHouseInternalTableOperations.java
@@ -12,6 +12,7 @@
import com.linkedin.openhouse.cluster.storage.StorageClient;
import com.linkedin.openhouse.cluster.storage.hdfs.HdfsStorageClient;
import com.linkedin.openhouse.cluster.storage.local.LocalStorageClient;
import com.linkedin.openhouse.internal.catalog.cache.TableMetadataCache;
import com.linkedin.openhouse.internal.catalog.exception.InvalidIcebergSnapshotException;
import com.linkedin.openhouse.internal.catalog.fileio.FileIOManager;
import com.linkedin.openhouse.internal.catalog.mapper.HouseTableMapper;
@@ -83,6 +84,8 @@ public class OpenHouseInternalTableOperations extends BaseMetastoreTableOperations {

FileIOManager fileIOManager;

TableMetadataCache tableMetadataCache;
Collaborator: This can be a generic file cache, no? I might call it a file cache and have the variable name be metadata.

Collaborator Author: There are two layers here: internalCatalogCacheManager can be generic for any file since it is just the Spring CacheManager, but the injected wrapper is intentionally TableMetadataCache and specific to table metadata.

Right now the cache namespace is tableMetadata and the value type is still TableMetadata.

We can easily reuse the generic internalCatalogCacheManager for other use cases later, like this:

  @CachePut(
      cacheManager = "internalCatalogCacheManager",
      cacheNames = "newUseCase",
      key = "#newUseCaseUUID")


private static final Gson GSON = new Gson();

private static final Cache<String, Integer> CACHE =
@@ -131,7 +134,10 @@ protected void doRefresh() {
protected void refreshMetadata(final String metadataLoc) {
long startTime = System.currentTimeMillis();
boolean needToReload = !Objects.equal(currentMetadataLocation(), metadataLoc);
-    Runnable r = () -> super.refreshFromMetadataLocation(metadataLoc);
+    Runnable r =
+        () ->
+            super.refreshFromMetadataLocation(
+                metadataLoc, null, 20, this::loadTableMetadataWithCache);
try {
if (needToReload) {
metricsReporter.executeWithStats(
@@ -337,6 +343,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
updatedMtDataRef, io().newOutputFile(newMetadataLocation)),
InternalCatalogMetricsConstant.METADATA_UPDATE_LATENCY,
getCatalogMetricTags());
tableMetadataCache.seed(newMetadataLocation, updatedMtDataRef);
log.info(
"updateMetadata to location {} succeeded, took {} ms",
newMetadataLocation,
@@ -354,7 +361,7 @@ updatedMtDataRef, io().newOutputFile(newMetadataLocation)),
writeSpan.end();
}

-    houseTable = houseTableMapper.toHouseTable(metadataToCommit, fileIO);
+    houseTable = houseTableMapper.toHouseTable(updatedMtDataRef, fileIO);
if (base != null
&& (properties.containsKey(CatalogConstants.OPENHOUSE_TABLEID_KEY)
&& !properties
@@ -387,7 +394,7 @@ updatedMtDataRef, io().newOutputFile(newMetadataLocation)),
* "forced refresh" in {@link OpenHouseInternalTableOperations#commit(TableMetadata,
* TableMetadata)}
*/
-      refreshFromMetadataLocation(newMetadataLocation);
+      refreshMetadata(newMetadataLocation);
}
if (isReplicatedTableCreate(properties)) {
updateMetadataFieldForTable(metadata, newMetadataLocation);
@@ -768,4 +775,9 @@ private List<String> getIntermediateSchemasFromProps(TableMetadata metadata) {
.create()
.fromJson(serializedNewIntermediateSchemas, new TypeToken<List<String>>() {}.getType());
}

private TableMetadata loadTableMetadataWithCache(String metadataLocation) {
return tableMetadataCache.load(
metadataLocation, () -> TableMetadataParser.read(io(), metadataLocation));
}
}
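The refreshMetadata change above swaps the single-argument refresh for an overload that accepts a custom metadata loader, which is where the cache plugs in. For reference, a sketch of the hook being called, as declared in Iceberg's BaseMetastoreTableOperations (paraphrased from Iceberg 1.x; verify against the pinned Iceberg version):

  // Iceberg invokes metadataLoader (with retries) to materialize the metadata
  // at newLocation; the PR passes this::loadTableMetadataWithCache, so repeat
  // reads of the same immutable location are served from the Caffeine cache.
  protected void refreshFromMetadataLocation(
      String newLocation,
      Predicate<Exception> shouldRetry, // the PR passes null: no custom retry predicate
      int numRetries, // the PR passes 20
      Function<String, TableMetadata> metadataLoader);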
CacheConfiguration.java (new file)
@@ -0,0 +1,35 @@
package com.linkedin.openhouse.internal.catalog.cache;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.linkedin.openhouse.internal.catalog.config.InternalCatalogSettings;
import java.util.List;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cache.caffeine.CaffeineCacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableCaching
public class CacheConfiguration {

@Bean
@ConditionalOnMissingBean(InternalCatalogSettings.class)
public InternalCatalogSettings internalCatalogSettings() {
return new InternalCatalogSettings();
}

@Bean
public CacheManager internalCatalogCacheManager(InternalCatalogSettings settings) {
CaffeineCacheManager cacheManager = new CaffeineCacheManager();
cacheManager.setAllowNullValues(false);
cacheManager.setCacheNames(List.of("tableMetadata"));
cacheManager.setCaffeine(
Caffeine.newBuilder()
.expireAfterWrite(settings.getMetadataCache().getTtl())
.maximumSize(settings.getMetadataCache().getMaxSize())
.recordStats());
return cacheManager;
}
}
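Because internalCatalogSettings() above is guarded by @ConditionalOnMissingBean, a deployment can tune the cache without touching this class by contributing its own settings bean. A hypothetical sketch (the configuration class name and the values are illustrative assumptions, not part of this PR):

  import com.linkedin.openhouse.internal.catalog.config.InternalCatalogSettings;
  import java.time.Duration;
  import org.springframework.context.annotation.Bean;
  import org.springframework.context.annotation.Configuration;

  @Configuration
  public class TunedCatalogSettingsConfiguration {

    // With this bean present, the @ConditionalOnMissingBean default above
    // backs off and these values drive the Caffeine builder instead.
    @Bean
    public InternalCatalogSettings internalCatalogSettings() {
      InternalCatalogSettings settings = new InternalCatalogSettings();
      settings.getMetadataCache().setTtl(Duration.ofMinutes(10)); // default: 5 minutes
      settings.getMetadataCache().setMaxSize(5000); // default: 1000
      return settings;
    }
  }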
SpringTableMetadataCache.java (new file)
@@ -0,0 +1,29 @@
package com.linkedin.openhouse.internal.catalog.cache;

import java.util.function.Supplier;
import org.apache.iceberg.TableMetadata;
import org.springframework.cache.annotation.CachePut;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Component;

@Component
public class SpringTableMetadataCache implements TableMetadataCache {

@Override
@Cacheable(
Collaborator: What would happen if there are concurrent requests for the same metadata location? Probably both could miss and read from HDFS. Consider https://docs.spring.io/spring-framework/reference/integration/cache/annotations.html#cache-annotations-cacheable-synchronized ?

Collaborator: That's fine, isn't it? This strictly reduces but does not eliminate HDFS load.

cacheManager = "internalCatalogCacheManager",
cacheNames = "tableMetadata",
key = "#metadataLocation")
public TableMetadata load(String metadataLocation, Supplier<TableMetadata> metadataLoader) {
return metadataLoader.get();
Member: Is the caching here on a per-request basis? What happens in the scenario of an immediate get after put, or multiple concurrent gets after a put?

Collaborator: Per pod is my understanding.

Collaborator Author: PUT seeds the cache with the newest UUID after inserting to HTS.

Caching table metadata by file location is safe because the locations are immutable, UUID-based paths. Each commit writes a new file like 00003-<uuid>.metadata.json.

The content at a given path never changes, so the cache key always maps to the same value. There's no invalidation problem because a new version is a new key, not an update to an old one.

Stale reads are harmless because the commit path uses compare-and-swap, not the cache. If you try to commit based on an old version, the CAS check rejects it at the HTS layer. The cache only affects read performance, not correctness.

}

@Override
@CachePut(
cacheManager = "internalCatalogCacheManager",
cacheNames = "tableMetadata",
key = "#metadataLocation")
Collaborator: So this abstracts the cache as a Spring component?

Collaborator Author (cbb330, Apr 8, 2026): Yeah. SpringTableMetadataCache is the Spring-managed bean we inject as the implementation. This gives us runtime dependency injection, so it allows us to override it in the li-openhouse variant at runtime if needed.

And @Cacheable / @CachePut delegate the actual cache read/write through Spring's cache abstraction rather than us calling Caffeine directly. This gives us auto-instrumentation, among other things.

internalCatalogCacheManager is a named reference to the logic actually used for the implementation, and in our config that's currently a CaffeineCacheManager.

So the end result is that callers stay behind the TableMetadataCache interface and don't need to know about the concrete cache implementation, and the implementation is mostly framework config.

public TableMetadata seed(String metadataLocation, TableMetadata tableMetadata) {
return tableMetadata;
}
}
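On the concurrency thread above: Spring's @Cacheable supports a sync attribute (backed natively by Caffeine) that collapses concurrent misses for the same key into a single loader call. A minimal sketch of what the reviewer's suggestion would look like on the load method; an alternative, not what this PR does:

  @Override
  @Cacheable(
      cacheManager = "internalCatalogCacheManager",
      cacheNames = "tableMetadata",
      key = "#metadataLocation",
      sync = true) // concurrent misses on one key block behind a single load
  public TableMetadata load(String metadataLocation, Supplier<TableMetadata> metadataLoader) {
    return metadataLoader.get();
  }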
TableMetadataCache.java (new file)
@@ -0,0 +1,11 @@
package com.linkedin.openhouse.internal.catalog.cache;

import java.util.function.Supplier;
import org.apache.iceberg.TableMetadata;

public interface TableMetadataCache {

TableMetadata load(String metadataLocation, Supplier<TableMetadata> metadataLoader);

TableMetadata seed(String metadataLocation, TableMetadata tableMetadata);
}
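Since callers inject only this interface, the runtime-override point made in the review amounts to registering a different bean. A hypothetical variant implementation (the class name and @Primary wiring are assumptions for illustration):

  import java.util.function.Supplier;
  import org.apache.iceberg.TableMetadata;
  import org.springframework.context.annotation.Primary;
  import org.springframework.stereotype.Component;

  @Component
  @Primary // chosen over SpringTableMetadataCache wherever TableMetadataCache is injected
  public class PassthroughTableMetadataCache implements TableMetadataCache {

    @Override
    public TableMetadata load(String metadataLocation, Supplier<TableMetadata> metadataLoader) {
      return metadataLoader.get(); // no caching: always defer to the loader
    }

    @Override
    public TableMetadata seed(String metadataLocation, TableMetadata tableMetadata) {
      return tableMetadata; // nothing to seed
    }
  }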
InternalCatalogSettings.java (new file)
@@ -0,0 +1,19 @@
package com.linkedin.openhouse.internal.catalog.config;

import java.time.Duration;
import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
public class InternalCatalogSettings {

private MetadataCache metadataCache = new MetadataCache();

@Getter
@Setter
public static class MetadataCache {
private Duration ttl = Duration.ofMinutes(5);
Collaborator: Was any profiling done on per-entry TableMetadata memory size to validate the 1000-entry max?

Collaborator Author: Yes, but it is in progress. I will post the results on the PR as a heatmap (x axis: TTL, y axis: size, z: hit rate).

Collaborator Author: I am also working on a way to visualize our efficiency intra-request. The goal is to have the right size / TTL for the cache to last for a whole request, where a request may be up to 3 minutes.

private long maxSize = 1000;
Collaborator: If this is a per-request cache, why do we need more than one entry stored, and a TTL?

Collaborator Author: The per-request cache is not able to emit Prometheus metrics, so we can't monitor it.

Hence I've changed strategy to use a per-pod cache keyed by UUID -- we get metrics, a better cache-hit ratio, and we don't affect our concurrency model.

Collaborator: > don't affect our concurrency model.

I'm not sure.

T0 - Table Manifest State X
T1 - Manifest is cached
T2 - Table Manifest updated to state Y
T3 - Cached value is read vs latest value and transaction fails.
T4 - Cached value is read vs latest value and transaction fails.
T5 - Cache TTLs.

A per-request cache's limited lifecycle makes this a non-issue.

Collaborator Author (cbb330, Apr 13, 2026): "The manifest" refers to a manifest file (the Avro file that tracks a list of data files). This is distinct from "the metadata", which specifically refers to a JSON file that represents table state, e.g. /data/openhouse/<db>/<table>/01226-03d77993-b04c-46a2-b3a8-dbb7b6b45783.metadata.json

And the metadata is immutable: once the UUID is generated and the file is written, it can never change.

So in the same format listed above, it is more accurately this:

T0 - Table Metadata State X
T1 - Metadata UUIDv1 is cached
T2 - Table Metadata updated to state Y
T3 - Cache miss because UUID is not found
T4 - Metadata UUIDv2 is cached
T5 - Metadata UUIDv1 TTLs

Member (abhisheknath2011, Apr 14, 2026): Trying to understand the approach here: why a per-pod cache? As there are 56 tables service pods, the issue with a per-pod cache is that cache misses could be more frequent, since most requests could land on different pods. What are we missing with a request-level cache? I believe the idea was to optimize multiple metadata calls for a given request.

Collaborator Author (cbb330, Apr 16, 2026): The primary reason is that a per-request cache is too ephemeral to capture any metrics, and our logs are only retained for 14 days, so with a 14-day lookback we can't see a trend of the before/after impact of caching.

The second reason is that a per-pod cache has a benefit for the getTable path.

Overall, our solutions look like this, in increasing order of complexity and value:

  1. no reuse
  2. reuse a variable
  3. reuse per-request cache
  4. per-pod memory cache <--- we are here
  5. dedicated cache service

And this PR sets the right abstractions to get to step 5 if we need to in the future.

}
}
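On the metrics motivation in the thread above: the cache manager builds Caffeine with recordStats(), so hit/miss/eviction counters are already recorded; surfacing them is a matter of binding the native cache to the MeterRegistry. A hypothetical sketch using Micrometer's CaffeineCacheMetrics (Spring Boot's cache metrics auto-configuration may already do this for CacheManager beans; treat this wiring as illustrative):

  import com.github.benmanes.caffeine.cache.Cache;
  import io.micrometer.core.instrument.MeterRegistry;
  import io.micrometer.core.instrument.binder.cache.CaffeineCacheMetrics;
  import org.springframework.cache.CacheManager;
  import org.springframework.cache.caffeine.CaffeineCache;

  // Bind the "tableMetadata" cache's stats (hits, misses, evictions, size)
  // to the same MeterRegistry the catalog already injects.
  void bindCacheMetrics(CacheManager cacheManager, MeterRegistry meterRegistry) {
    CaffeineCache springCache = (CaffeineCache) cacheManager.getCache("tableMetadata");
    Cache<Object, Object> nativeCache = springCache.getNativeCache();
    CaffeineCacheMetrics.monitor(meterRegistry, nativeCache, "tableMetadata");
  }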