Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.emeraldpay.dshackle.config.reload

import io.emeraldpay.dshackle.monitoring.HealthCheckSetup
import org.springframework.stereotype.Component

@Component
class ReloadConfigHealthService(
private val healthCheckSetup: HealthCheckSetup,
) {

fun reloadHealth() {
healthCheckSetup.reload()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package io.emeraldpay.dshackle.config.reload

import io.emeraldpay.dshackle.Config
import io.emeraldpay.dshackle.FileResolver
import io.emeraldpay.dshackle.config.HealthConfig
import io.emeraldpay.dshackle.config.HealthConfigReader
import io.emeraldpay.dshackle.config.MainConfig
import io.emeraldpay.dshackle.config.UpstreamsConfig
import io.emeraldpay.dshackle.config.UpstreamsConfigReader
Expand All @@ -17,12 +19,33 @@ class ReloadConfigService(
) {
private val optionsReader = ChainOptionsReader()
private val upstreamsConfigReader = UpstreamsConfigReader(fileResolver, optionsReader)
private val healthConfigReader = HealthConfigReader()

fun readUpstreamsConfig() = upstreamsConfigReader.read(config.getConfigPath().inputStream())!!
fun readUpstreamsConfig(): UpstreamsConfig =
config.getConfigPath().inputStream().use { input ->
upstreamsConfigReader.read(input)
?: throw IllegalStateException("Cluster config is not defined in ${config.getConfigPath()}")
}

fun currentUpstreamsConfig() = mainConfig.initialConfig!!

fun updateUpstreamsConfig(newConfig: UpstreamsConfig) {
mainConfig.upstreams = newConfig
}

fun readHealthConfig(): HealthConfig =
config.getConfigPath().inputStream().use { input ->
healthConfigReader.read(input) ?: HealthConfig.default()
}

fun currentHealthConfig(): HealthConfig = mainConfig.health

fun updateHealthConfig(newConfig: HealthConfig) {
val current = mainConfig.health
current.host = newConfig.host
current.port = newConfig.port
current.path = newConfig.path
current.chains.clear()
current.chains.putAll(newConfig.chains)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.emeraldpay.dshackle.config.reload

import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.Global.Companion.chainById
import io.emeraldpay.dshackle.config.HealthConfig
import io.emeraldpay.dshackle.config.UpstreamsConfig
import io.emeraldpay.dshackle.foundation.ChainOptions
import org.slf4j.LoggerFactory
Expand All @@ -15,6 +16,7 @@ import java.util.stream.Collectors
class ReloadConfigSetup(
private val reloadConfigService: ReloadConfigService,
private val reloadConfigUpstreamService: ReloadConfigUpstreamService,
private val reloadConfigHealthService: ReloadConfigHealthService,
) : SignalHandler {

companion object {
Expand Down Expand Up @@ -59,28 +61,45 @@ class ReloadConfigSetup(
private fun reloadConfig(): Boolean {
val newUpstreamsConfig = reloadConfigService.readUpstreamsConfig()
val currentUpstreamsConfig = reloadConfigService.currentUpstreamsConfig()
val newHealthConfig = reloadConfigService.readHealthConfig()
val currentHealthConfig = reloadConfigService.currentHealthConfig()

if (newUpstreamsConfig == currentUpstreamsConfig) {
val upstreamsChanged = newUpstreamsConfig != currentUpstreamsConfig
val healthChanged = !healthConfigEquals(newHealthConfig, currentHealthConfig)

if (!upstreamsChanged && !healthChanged) {
return false
}

val chainsToReload = analyzeDefaultOptions(
currentUpstreamsConfig.defaultOptions,
newUpstreamsConfig.defaultOptions,
)
val upstreamsAnalyzeData = analyzeUpstreams(
currentUpstreamsConfig.upstreams,
newUpstreamsConfig.upstreams,
)
if (upstreamsChanged) {
val chainsToReload = analyzeDefaultOptions(
currentUpstreamsConfig.defaultOptions,
newUpstreamsConfig.defaultOptions,
)
val upstreamsAnalyzeData = analyzeUpstreams(
currentUpstreamsConfig.upstreams,
newUpstreamsConfig.upstreams,
)

val upstreamsToRemove = upstreamsAnalyzeData.removed
.filterNot { chainsToReload.contains(it.second) }
.toSet()
val upstreamsToAdd = upstreamsAnalyzeData.added
val upstreamsToRemove = upstreamsAnalyzeData.removed
.filterNot { chainsToReload.contains(it.second) }
.toSet()
val upstreamsToAdd = upstreamsAnalyzeData.added

reloadConfigService.updateUpstreamsConfig(newUpstreamsConfig)
reloadConfigService.updateUpstreamsConfig(newUpstreamsConfig)

reloadConfigUpstreamService.reloadUpstreams(chainsToReload, upstreamsToRemove, upstreamsToAdd, newUpstreamsConfig)
reloadConfigUpstreamService.reloadUpstreams(
chainsToReload,
upstreamsToRemove,
upstreamsToAdd,
newUpstreamsConfig,
)
}

if (healthChanged) {
reloadConfigService.updateHealthConfig(newHealthConfig)
reloadConfigHealthService.reloadHealth()
}

return true
}
Expand Down Expand Up @@ -163,4 +182,11 @@ class ReloadConfigSetup(
val added: Set<Pair<String, Chain>> = emptySet(),
val removed: Set<Pair<String, Chain>> = emptySet(),
)

private fun healthConfigEquals(a: HealthConfig, b: HealthConfig): Boolean {
return a.host == b.host &&
a.port == b.port &&
a.path == b.path &&
a.chains == b.chains
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,42 +37,21 @@ class HealthCheckSetup(
private val log = LoggerFactory.getLogger(HealthCheckSetup::class.java)
}

lateinit var server: HttpServer
@Volatile
private var server: HttpServer? = null
private val serverLock = Any()

@PostConstruct
fun start() {
if (!healthConfig.isEnabled()) {
return
synchronized(serverLock) {
startServer()
}
// use standard JVM server with a single thread blocking processing
// health check is a rare operation, no reason to set up anything complex
try {
log.info("Run Health Server on ${healthConfig.host}:${healthConfig.port}${healthConfig.path}")
server = HttpServer.create(
InetSocketAddress(
healthConfig.host,
healthConfig.port,
),
0,
)
server.createContext(healthConfig.path) { httpExchange ->
val response = if (httpExchange.requestURI.query == "detailed") {
getDetailedHealth()
} else {
getHealth()
}
val ok = response.ok
val data = response.details.joinToString("\n")
val code = if (ok) HttpStatus.OK else HttpStatus.SERVICE_UNAVAILABLE
log.debug("Health check response: ${code.value()} ${code.reasonPhrase} $data")
httpExchange.sendResponseHeaders(code.value(), data.toByteArray().size.toLong())
httpExchange.responseBody.use { os ->
os.write(data.toByteArray())
}
}
Thread(server::start).start()
} catch (e: IOException) {
log.error("Failed to start Health Server", e)
}

fun reload() {
synchronized(serverLock) {
shutdownServer()
startServer()
}
}

Expand Down Expand Up @@ -144,11 +123,56 @@ class HealthCheckSetup(
)
}

private fun startServer() {
if (!healthConfig.isEnabled()) {
return
}
// use standard JVM server with a single thread blocking processing
// health check is a rare operation, no reason to set up anything complex
try {
log.info("Run Health Server on ${healthConfig.host}:${healthConfig.port}${healthConfig.path}")
val newServer = HttpServer.create(
InetSocketAddress(
healthConfig.host,
healthConfig.port,
),
0,
)
newServer.createContext(healthConfig.path) { httpExchange ->
val response = if (httpExchange.requestURI.query == "detailed") {
getDetailedHealth()
} else {
getHealth()
}
val ok = response.ok
val data = response.details.joinToString("\n")
val code = if (ok) HttpStatus.OK else HttpStatus.SERVICE_UNAVAILABLE
log.debug("Health check response: ${code.value()} ${code.reasonPhrase} $data")
httpExchange.sendResponseHeaders(code.value(), data.toByteArray().size.toLong())
httpExchange.responseBody.use { os ->
os.write(data.toByteArray())
}
}
server = newServer
Thread(newServer::start).start()
} catch (e: IOException) {
log.error("Failed to start Health Server", e)
server = null
}
}

private fun shutdownServer() {
server?.let {
log.info("Shutting down health Server...")
it.stop(0)
server = null
}
}

@PreDestroy
fun shutdown() {
if (::server.isInitialized) {
log.info("Shutting down health Server...")
server.stop(0)
synchronized(serverLock) {
shutdownServer()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,22 @@ class GenericWsHead(
private fun registerHeadResubscribeFlux(): Disposable {
val connectionStates = wsSubscriptions.connectionInfoFlux()
.map {
if (it.connectionId == connectionId.get() && it.connectionState == WsConnection.ConnectionState.DISCONNECTED) {
val currentConnectionId = connectionId.get()
if (
it.connectionState == WsConnection.ConnectionState.DISCONNECTED &&
(currentConnectionId == null || it.connectionId == currentConnectionId)
) {
headLivenessSink.emitNext(HeadLivenessState.DISCONNECTED) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED }
subscribed.set(false)
connected.set(false)
connectionId.set(null)
if (currentConnectionId != null) {
connectionId.compareAndSet(currentConnectionId, null)
}
} else if (it.connectionState == WsConnection.ConnectionState.CONNECTED) {
connected.set(true)
if (currentConnectionId == null) {
connectionId.compareAndSet(null, it.connectionId)
}
return@map true
}
return@map false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import io.emeraldpay.dshackle.Chain.POLYGON__MAINNET
import io.emeraldpay.dshackle.Config
import io.emeraldpay.dshackle.FileResolver
import io.emeraldpay.dshackle.cache.Caches
import io.emeraldpay.dshackle.config.HealthConfigReader
import io.emeraldpay.dshackle.config.MainConfig
import io.emeraldpay.dshackle.config.UpstreamsConfig
import io.emeraldpay.dshackle.config.UpstreamsConfigReader
Expand Down Expand Up @@ -43,6 +44,7 @@ class ReloadConfigTest {

private val optionsReader = ChainOptionsReader()
private val upstreamsConfigReader = UpstreamsConfigReader(fileResolver, optionsReader)
private val healthConfigReader = HealthConfigReader()

private val config = mock<Config>()
private val reloadConfigService = ReloadConfigService(config, fileResolver, mainConfig)
Expand Down Expand Up @@ -76,7 +78,8 @@ class ReloadConfigTest {
currentMultistreamHolder,
configuredUpstreams,
)
val reloadConfig = ReloadConfigSetup(reloadConfigService, reloadConfigUpstreamService)
val reloadConfigHealthService = mock<ReloadConfigHealthService>()
val reloadConfig = ReloadConfigSetup(reloadConfigService, reloadConfigUpstreamService, reloadConfigHealthService)

val initialConfigIs = ResourceUtils.getFile("classpath:configs/upstreams-initial.yaml").inputStream()
val initialConfig = upstreamsConfigReader.read(initialConfigIs)!!
Expand All @@ -95,6 +98,7 @@ class ReloadConfigTest {
)
verify(msEth, never()).stop()
verify(msPoly, never()).stop()
verify(reloadConfigHealthService, never()).reloadHealth()

assertEquals(3, mainConfig.upstreams!!.upstreams.size)
assertEquals(newConfig, mainConfig.upstreams)
Expand Down Expand Up @@ -127,7 +131,8 @@ class ReloadConfigTest {
currentMultistreamHolder,
configuredUpstreams,
)
val reloadConfig = ReloadConfigSetup(reloadConfigService, reloadConfigUpstreamService)
val reloadConfigHealthService = mock<ReloadConfigHealthService>()
val reloadConfig = ReloadConfigSetup(reloadConfigService, reloadConfigUpstreamService, reloadConfigHealthService)
val initialConfigIs = ResourceUtils.getFile("classpath:configs/upstreams-initial.yaml").inputStream()
val initialConfig = upstreamsConfigReader.read(initialConfigIs)!!
val newConfig = upstreamsConfigReader.read(newConfigFile.inputStream())!!
Expand All @@ -146,6 +151,36 @@ class ReloadConfigTest {
assertTrue(msPoly.isRunning())
assertEquals(1, mainConfig.upstreams!!.upstreams.size)
assertEquals(newConfig, mainConfig.upstreams)
verify(reloadConfigHealthService, never()).reloadHealth()
}

@Test
fun `reload health config changes`() {
val initialConfigFile = ResourceUtils.getFile("classpath:configs/reload-health-initial.yaml")
val newConfigFile = ResourceUtils.getFile("classpath:configs/reload-health-changed.yaml")

whenever(config.getConfigPath()).thenReturn(newConfigFile)

val initialUpstreams = initialConfigFile.inputStream().use { upstreamsConfigReader.read(it)!! }
mainConfig.upstreams = initialUpstreams
val initialHealth = initialConfigFile.inputStream().use { healthConfigReader.read(it)!! }
mainConfig.health = initialHealth

val reloadConfigUpstreamService = mock<ReloadConfigUpstreamService>()
val reloadConfigHealthService = mock<ReloadConfigHealthService>()
val reloadConfig = ReloadConfigSetup(reloadConfigService, reloadConfigUpstreamService, reloadConfigHealthService)

reloadConfig.handle(Signal("HUP"))

verify(reloadConfigUpstreamService, never()).reloadUpstreams(any(), any(), any(), any())
verify(reloadConfigHealthService).reloadHealth()

assertEquals("0.0.0.0", mainConfig.health.host)
assertEquals(10003, mainConfig.health.port)
assertEquals("/readyz", mainConfig.health.path)
assertEquals(2, mainConfig.health.chains.size)
assertEquals(2, mainConfig.health.chains[ETHEREUM__MAINNET]?.minAvailable)
assertEquals(1, mainConfig.health.chains[POLYGON__MAINNET]?.minAvailable)
}

@Test
Expand All @@ -156,14 +191,16 @@ class ReloadConfigTest {
mainConfig.upstreams?.upstreams?.get(0)?.methodGroups = UpstreamsConfig.MethodGroups(setOf("new"), setOf())

val reloadConfigUpstreamService = mock<ReloadConfigUpstreamService>()
val reloadConfigHealthService = mock<ReloadConfigHealthService>()

val reloadConfig = ReloadConfigSetup(reloadConfigService, reloadConfigUpstreamService)
val reloadConfig = ReloadConfigSetup(reloadConfigService, reloadConfigUpstreamService, reloadConfigHealthService)

whenever(config.getConfigPath()).thenReturn(initialConfigFile)

reloadConfig.handle(Signal("HUP"))

verify(reloadConfigUpstreamService, never()).reloadUpstreams(any(), any(), any(), any())
verify(reloadConfigHealthService, never()).reloadHealth()

assertEquals(initialConfig, mainConfig.upstreams)
}
Expand Down
Loading
Loading