From 11f5219e6d06ff0759e89dd1c3c5fa642decbf59 Mon Sep 17 00:00:00 2001 From: wangdongwei Date: Thu, 5 Sep 2024 16:59:17 +0800 Subject: [PATCH] Fix getReadTimeNanos calculation error, return a accumulated time for DirectTrinoPageSource --- .../paimon/trino/DirectTrinoPageSource.java | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/DirectTrinoPageSource.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/DirectTrinoPageSource.java index 1144557..aed2cfe 100644 --- a/paimon-trino-common/src/main/java/org/apache/paimon/trino/DirectTrinoPageSource.java +++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/DirectTrinoPageSource.java @@ -32,6 +32,8 @@ public class DirectTrinoPageSource implements ConnectorPageSource { private ConnectorPageSource current; private final LinkedList pageSourceQueue; private long completedBytes; + private long previousCurrentReadTimeNanos; + private long totalReadTimeNanos; public DirectTrinoPageSource(LinkedList pageSourceQueue) { this.pageSourceQueue = pageSourceQueue; @@ -45,7 +47,7 @@ public long getCompletedBytes() { @Override public long getReadTimeNanos() { - return current == null ? 0 : current.getReadTimeNanos(); + return totalReadTimeNanos; } @Override @@ -60,6 +62,7 @@ public Page getNextPage() { return null; } Page dataPage = current.getNextPage(); + recordTotalReadTimeNanos(); if (dataPage == null) { advance(); return getNextPage(); @@ -84,6 +87,7 @@ private void advance() { throw new RuntimeException("error happens while advance and close old page source."); } current = pageSourceQueue.poll(); + previousCurrentReadTimeNanos = 0; } @Override @@ -114,4 +118,16 @@ public long getMemoryUsage() { public Metrics getMetrics() { return current == null ? Metrics.EMPTY : current.getMetrics(); } + + private void recordTotalReadTimeNanos(){ + if (current == null){ + return ; + } + + long currentReadTimeNanos = current.getReadTimeNanos(); + if (previousCurrentReadTimeNanos != currentReadTimeNanos){ + totalReadTimeNanos += (currentReadTimeNanos-previousCurrentReadTimeNanos); + previousCurrentReadTimeNanos = currentReadTimeNanos; + } + } }