Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package io.emeraldpay.dshackle.upstream.ripple

import io.emeraldpay.dshackle.Defaults
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundDetector
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType
import io.emeraldpay.dshackle.upstream.rpcclient.ListParams
import reactor.core.publisher.Flux

class RippleLowerBoundStateDetector(
Expand All @@ -15,10 +19,41 @@ class RippleLowerBoundStateDetector(
}

override fun internalDetectLowerBound(): Flux<LowerBoundData> {
return Flux.just(LowerBoundData(1, LowerBoundType.STATE))
return upstream.getIngressReader()
.read(ChainRequest("server_state", ListParams()))
.timeout(Defaults.internalCallsTimeout)
.map {
val resp = Global.objectMapper.readValue(it.getResult(), RippleState::class.java)
parseCompleteLedgersLowerBound(resp.state.completeLedgers)
}
.filter { it != null }
.map { it!! }
.flatMapMany { lowerBound ->
Flux.fromIterable(listOf(LowerBoundData(lowerBound, LowerBoundType.STATE)))
}
}

override fun types(): Set<LowerBoundType> {
return setOf(LowerBoundType.STATE)
}

private fun parseCompleteLedgersLowerBound(completeLedgers: String?): Long? {
if (completeLedgers.isNullOrBlank()) {
return null
}
if (completeLedgers == "empty") {
return null
}

return completeLedgers.split(",")
.mapNotNull { part ->
val token = part.trim()
if (token.isEmpty()) {
null
} else {
token.substringBefore("-").toLongOrNull()
}
}
.minOrNull()
}
}
Loading