Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public class DirectTrinoPageSource implements ConnectorPageSource {
private ConnectorPageSource current;
private final LinkedList<ConnectorPageSource> pageSourceQueue;
private long completedBytes;
private long previousCurrentReadTimeNanos;
private long totalReadTimeNanos;

public DirectTrinoPageSource(LinkedList<ConnectorPageSource> pageSourceQueue) {
this.pageSourceQueue = pageSourceQueue;
Expand All @@ -45,7 +47,7 @@ public long getCompletedBytes() {

@Override
public long getReadTimeNanos() {
return current == null ? 0 : current.getReadTimeNanos();
return totalReadTimeNanos;
}

@Override
Expand All @@ -60,6 +62,7 @@ public Page getNextPage() {
return null;
}
Page dataPage = current.getNextPage();
recordTotalReadTimeNanos();
if (dataPage == null) {
advance();
return getNextPage();
Expand All @@ -84,6 +87,7 @@ private void advance() {
throw new RuntimeException("error happens while advance and close old page source.");
}
current = pageSourceQueue.poll();
previousCurrentReadTimeNanos = 0;
}

@Override
Expand Down Expand Up @@ -114,4 +118,16 @@ public long getMemoryUsage() {
public Metrics getMetrics() {
return current == null ? Metrics.EMPTY : current.getMetrics();
}

private void recordTotalReadTimeNanos(){
if (current == null){
return ;
}

long currentReadTimeNanos = current.getReadTimeNanos();
if (previousCurrentReadTimeNanos != currentReadTimeNanos){
totalReadTimeNanos += (currentReadTimeNanos-previousCurrentReadTimeNanos);
previousCurrentReadTimeNanos = currentReadTimeNanos;
}
}
}