From 34a3da805dcee6be5be7dec21e040535ea38a72d Mon Sep 17 00:00:00 2001 From: Artem Rootman <4586640+artemrootman@users.noreply.github.com> Date: Thu, 30 Oct 2025 15:23:27 +0000 Subject: [PATCH 1/3] Add dynamic reload support for HealthConfig with runtime HealthCheck restart --- .../reload/ReloadConfigHealthService.kt | 14 +++ .../config/reload/ReloadConfigService.kt | 25 ++++- .../config/reload/ReloadConfigSetup.kt | 56 ++++++++--- .../dshackle/monitoring/HealthCheckSetup.kt | 94 ++++++++++++------- .../config/reload/ReloadConfigTest.kt | 43 ++++++++- .../configs/reload-health-changed.yaml | 44 +++++++++ .../configs/reload-health-initial.yaml | 42 +++++++++ 7 files changed, 264 insertions(+), 54 deletions(-) create mode 100644 src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigHealthService.kt create mode 100644 src/test/resources/configs/reload-health-changed.yaml create mode 100644 src/test/resources/configs/reload-health-initial.yaml diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigHealthService.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigHealthService.kt new file mode 100644 index 000000000..d06acb287 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigHealthService.kt @@ -0,0 +1,14 @@ +package io.emeraldpay.dshackle.config.reload + +import io.emeraldpay.dshackle.monitoring.HealthCheckSetup +import org.springframework.stereotype.Component + +@Component +class ReloadConfigHealthService( + private val healthCheckSetup: HealthCheckSetup, +) { + + fun reloadHealth() { + healthCheckSetup.reload() + } +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigService.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigService.kt index 76135cb15..a90ea8b85 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigService.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigService.kt @@ -2,6 +2,8 @@ package io.emeraldpay.dshackle.config.reload import io.emeraldpay.dshackle.Config import io.emeraldpay.dshackle.FileResolver +import io.emeraldpay.dshackle.config.HealthConfig +import io.emeraldpay.dshackle.config.HealthConfigReader import io.emeraldpay.dshackle.config.MainConfig import io.emeraldpay.dshackle.config.UpstreamsConfig import io.emeraldpay.dshackle.config.UpstreamsConfigReader @@ -17,12 +19,33 @@ class ReloadConfigService( ) { private val optionsReader = ChainOptionsReader() private val upstreamsConfigReader = UpstreamsConfigReader(fileResolver, optionsReader) + private val healthConfigReader = HealthConfigReader() - fun readUpstreamsConfig() = upstreamsConfigReader.read(config.getConfigPath().inputStream())!! + fun readUpstreamsConfig(): UpstreamsConfig = + config.getConfigPath().inputStream().use { input -> + upstreamsConfigReader.read(input) + ?: throw IllegalStateException("Cluster config is not defined in ${config.getConfigPath()}") + } fun currentUpstreamsConfig() = mainConfig.initialConfig!! fun updateUpstreamsConfig(newConfig: UpstreamsConfig) { mainConfig.upstreams = newConfig } + + fun readHealthConfig(): HealthConfig = + config.getConfigPath().inputStream().use { input -> + healthConfigReader.read(input) ?: HealthConfig.default() + } + + fun currentHealthConfig(): HealthConfig = mainConfig.health + + fun updateHealthConfig(newConfig: HealthConfig) { + val current = mainConfig.health + current.host = newConfig.host + current.port = newConfig.port + current.path = newConfig.path + current.chains.clear() + current.chains.putAll(newConfig.chains) + } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigSetup.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigSetup.kt index ecece93c0..b033a737d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigSetup.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigSetup.kt @@ -2,6 +2,7 @@ package io.emeraldpay.dshackle.config.reload import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.Global.Companion.chainById +import io.emeraldpay.dshackle.config.HealthConfig import io.emeraldpay.dshackle.config.UpstreamsConfig import io.emeraldpay.dshackle.foundation.ChainOptions import org.slf4j.LoggerFactory @@ -15,6 +16,7 @@ import java.util.stream.Collectors class ReloadConfigSetup( private val reloadConfigService: ReloadConfigService, private val reloadConfigUpstreamService: ReloadConfigUpstreamService, + private val reloadConfigHealthService: ReloadConfigHealthService, ) : SignalHandler { companion object { @@ -59,28 +61,45 @@ class ReloadConfigSetup( private fun reloadConfig(): Boolean { val newUpstreamsConfig = reloadConfigService.readUpstreamsConfig() val currentUpstreamsConfig = reloadConfigService.currentUpstreamsConfig() + val newHealthConfig = reloadConfigService.readHealthConfig() + val currentHealthConfig = reloadConfigService.currentHealthConfig() - if (newUpstreamsConfig == currentUpstreamsConfig) { + val upstreamsChanged = newUpstreamsConfig != currentUpstreamsConfig + val healthChanged = !healthConfigEquals(newHealthConfig, currentHealthConfig) + + if (!upstreamsChanged && !healthChanged) { return false } - val chainsToReload = analyzeDefaultOptions( - currentUpstreamsConfig.defaultOptions, - newUpstreamsConfig.defaultOptions, - ) - val upstreamsAnalyzeData = analyzeUpstreams( - currentUpstreamsConfig.upstreams, - newUpstreamsConfig.upstreams, - ) + if (upstreamsChanged) { + val chainsToReload = analyzeDefaultOptions( + currentUpstreamsConfig.defaultOptions, + newUpstreamsConfig.defaultOptions, + ) + val upstreamsAnalyzeData = analyzeUpstreams( + currentUpstreamsConfig.upstreams, + newUpstreamsConfig.upstreams, + ) - val upstreamsToRemove = upstreamsAnalyzeData.removed - .filterNot { chainsToReload.contains(it.second) } - .toSet() - val upstreamsToAdd = upstreamsAnalyzeData.added + val upstreamsToRemove = upstreamsAnalyzeData.removed + .filterNot { chainsToReload.contains(it.second) } + .toSet() + val upstreamsToAdd = upstreamsAnalyzeData.added - reloadConfigService.updateUpstreamsConfig(newUpstreamsConfig) + reloadConfigService.updateUpstreamsConfig(newUpstreamsConfig) - reloadConfigUpstreamService.reloadUpstreams(chainsToReload, upstreamsToRemove, upstreamsToAdd, newUpstreamsConfig) + reloadConfigUpstreamService.reloadUpstreams( + chainsToReload, + upstreamsToRemove, + upstreamsToAdd, + newUpstreamsConfig, + ) + } + + if (healthChanged) { + reloadConfigService.updateHealthConfig(newHealthConfig) + reloadConfigHealthService.reloadHealth() + } return true } @@ -163,4 +182,11 @@ class ReloadConfigSetup( val added: Set> = emptySet(), val removed: Set> = emptySet(), ) + + private fun healthConfigEquals(a: HealthConfig, b: HealthConfig): Boolean { + return a.host == b.host && + a.port == b.port && + a.path == b.path && + a.chains == b.chains + } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/HealthCheckSetup.kt b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/HealthCheckSetup.kt index 30a7d3b8a..c89b5a49e 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/monitoring/HealthCheckSetup.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/monitoring/HealthCheckSetup.kt @@ -37,42 +37,21 @@ class HealthCheckSetup( private val log = LoggerFactory.getLogger(HealthCheckSetup::class.java) } - lateinit var server: HttpServer + @Volatile + private var server: HttpServer? = null + private val serverLock = Any() @PostConstruct fun start() { - if (!healthConfig.isEnabled()) { - return + synchronized(serverLock) { + startServer() } - // use standard JVM server with a single thread blocking processing - // health check is a rare operation, no reason to set up anything complex - try { - log.info("Run Health Server on ${healthConfig.host}:${healthConfig.port}${healthConfig.path}") - server = HttpServer.create( - InetSocketAddress( - healthConfig.host, - healthConfig.port, - ), - 0, - ) - server.createContext(healthConfig.path) { httpExchange -> - val response = if (httpExchange.requestURI.query == "detailed") { - getDetailedHealth() - } else { - getHealth() - } - val ok = response.ok - val data = response.details.joinToString("\n") - val code = if (ok) HttpStatus.OK else HttpStatus.SERVICE_UNAVAILABLE - log.debug("Health check response: ${code.value()} ${code.reasonPhrase} $data") - httpExchange.sendResponseHeaders(code.value(), data.toByteArray().size.toLong()) - httpExchange.responseBody.use { os -> - os.write(data.toByteArray()) - } - } - Thread(server::start).start() - } catch (e: IOException) { - log.error("Failed to start Health Server", e) + } + + fun reload() { + synchronized(serverLock) { + shutdownServer() + startServer() } } @@ -144,11 +123,56 @@ class HealthCheckSetup( ) } + private fun startServer() { + if (!healthConfig.isEnabled()) { + return + } + // use standard JVM server with a single thread blocking processing + // health check is a rare operation, no reason to set up anything complex + try { + log.info("Run Health Server on ${healthConfig.host}:${healthConfig.port}${healthConfig.path}") + val newServer = HttpServer.create( + InetSocketAddress( + healthConfig.host, + healthConfig.port, + ), + 0, + ) + newServer.createContext(healthConfig.path) { httpExchange -> + val response = if (httpExchange.requestURI.query == "detailed") { + getDetailedHealth() + } else { + getHealth() + } + val ok = response.ok + val data = response.details.joinToString("\n") + val code = if (ok) HttpStatus.OK else HttpStatus.SERVICE_UNAVAILABLE + log.debug("Health check response: ${code.value()} ${code.reasonPhrase} $data") + httpExchange.sendResponseHeaders(code.value(), data.toByteArray().size.toLong()) + httpExchange.responseBody.use { os -> + os.write(data.toByteArray()) + } + } + server = newServer + Thread(newServer::start).start() + } catch (e: IOException) { + log.error("Failed to start Health Server", e) + server = null + } + } + + private fun shutdownServer() { + server?.let { + log.info("Shutting down health Server...") + it.stop(0) + server = null + } + } + @PreDestroy fun shutdown() { - if (::server.isInitialized) { - log.info("Shutting down health Server...") - server.stop(0) + synchronized(serverLock) { + shutdownServer() } } diff --git a/src/test/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigTest.kt index 69c6f40ac..c915b9bf3 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigTest.kt @@ -7,6 +7,7 @@ import io.emeraldpay.dshackle.Config import io.emeraldpay.dshackle.FileResolver import io.emeraldpay.dshackle.cache.Caches import io.emeraldpay.dshackle.config.MainConfig +import io.emeraldpay.dshackle.config.HealthConfigReader import io.emeraldpay.dshackle.config.UpstreamsConfig import io.emeraldpay.dshackle.config.UpstreamsConfigReader import io.emeraldpay.dshackle.foundation.ChainOptionsReader @@ -43,6 +44,7 @@ class ReloadConfigTest { private val optionsReader = ChainOptionsReader() private val upstreamsConfigReader = UpstreamsConfigReader(fileResolver, optionsReader) + private val healthConfigReader = HealthConfigReader() private val config = mock() private val reloadConfigService = ReloadConfigService(config, fileResolver, mainConfig) @@ -76,7 +78,8 @@ class ReloadConfigTest { currentMultistreamHolder, configuredUpstreams, ) - val reloadConfig = ReloadConfigSetup(reloadConfigService, reloadConfigUpstreamService) + val reloadConfigHealthService = mock() + val reloadConfig = ReloadConfigSetup(reloadConfigService, reloadConfigUpstreamService, reloadConfigHealthService) val initialConfigIs = ResourceUtils.getFile("classpath:configs/upstreams-initial.yaml").inputStream() val initialConfig = upstreamsConfigReader.read(initialConfigIs)!! @@ -95,6 +98,7 @@ class ReloadConfigTest { ) verify(msEth, never()).stop() verify(msPoly, never()).stop() + verify(reloadConfigHealthService, never()).reloadHealth() assertEquals(3, mainConfig.upstreams!!.upstreams.size) assertEquals(newConfig, mainConfig.upstreams) @@ -127,7 +131,8 @@ class ReloadConfigTest { currentMultistreamHolder, configuredUpstreams, ) - val reloadConfig = ReloadConfigSetup(reloadConfigService, reloadConfigUpstreamService) + val reloadConfigHealthService = mock() + val reloadConfig = ReloadConfigSetup(reloadConfigService, reloadConfigUpstreamService, reloadConfigHealthService) val initialConfigIs = ResourceUtils.getFile("classpath:configs/upstreams-initial.yaml").inputStream() val initialConfig = upstreamsConfigReader.read(initialConfigIs)!! val newConfig = upstreamsConfigReader.read(newConfigFile.inputStream())!! @@ -146,6 +151,36 @@ class ReloadConfigTest { assertTrue(msPoly.isRunning()) assertEquals(1, mainConfig.upstreams!!.upstreams.size) assertEquals(newConfig, mainConfig.upstreams) + verify(reloadConfigHealthService, never()).reloadHealth() + } + + @Test + fun `reload health config changes`() { + val initialConfigFile = ResourceUtils.getFile("classpath:configs/reload-health-initial.yaml") + val newConfigFile = ResourceUtils.getFile("classpath:configs/reload-health-changed.yaml") + + whenever(config.getConfigPath()).thenReturn(newConfigFile) + + val initialUpstreams = initialConfigFile.inputStream().use { upstreamsConfigReader.read(it)!! } + mainConfig.upstreams = initialUpstreams + val initialHealth = initialConfigFile.inputStream().use { healthConfigReader.read(it)!! } + mainConfig.health = initialHealth + + val reloadConfigUpstreamService = mock() + val reloadConfigHealthService = mock() + val reloadConfig = ReloadConfigSetup(reloadConfigService, reloadConfigUpstreamService, reloadConfigHealthService) + + reloadConfig.handle(Signal("HUP")) + + verify(reloadConfigUpstreamService, never()).reloadUpstreams(any(), any(), any(), any()) + verify(reloadConfigHealthService).reloadHealth() + + assertEquals("0.0.0.0", mainConfig.health.host) + assertEquals(10003, mainConfig.health.port) + assertEquals("/readyz", mainConfig.health.path) + assertEquals(2, mainConfig.health.chains.size) + assertEquals(2, mainConfig.health.chains[ETHEREUM__MAINNET]?.minAvailable) + assertEquals(1, mainConfig.health.chains[POLYGON__MAINNET]?.minAvailable) } @Test @@ -156,14 +191,16 @@ class ReloadConfigTest { mainConfig.upstreams?.upstreams?.get(0)?.methodGroups = UpstreamsConfig.MethodGroups(setOf("new"), setOf()) val reloadConfigUpstreamService = mock() + val reloadConfigHealthService = mock() - val reloadConfig = ReloadConfigSetup(reloadConfigService, reloadConfigUpstreamService) + val reloadConfig = ReloadConfigSetup(reloadConfigService, reloadConfigUpstreamService, reloadConfigHealthService) whenever(config.getConfigPath()).thenReturn(initialConfigFile) reloadConfig.handle(Signal("HUP")) verify(reloadConfigUpstreamService, never()).reloadUpstreams(any(), any(), any(), any()) + verify(reloadConfigHealthService, never()).reloadHealth() assertEquals(initialConfig, mainConfig.upstreams) } diff --git a/src/test/resources/configs/reload-health-changed.yaml b/src/test/resources/configs/reload-health-changed.yaml new file mode 100644 index 000000000..356edc9ea --- /dev/null +++ b/src/test/resources/configs/reload-health-changed.yaml @@ -0,0 +1,44 @@ +cluster: + upstreams: + - id: local1 + chain: ethereum + labels: + provider: provider1 + connection: + ethereum-pos: + execution: + rpc: + url: "http://localhost" + ws: + url: "ws://localhost" + - id: local2 + chain: ethereum + labels: + provider: provider1 + connection: + ethereum-pos: + execution: + rpc: + url: "http://localhost" + ws: + url: "ws://localhost" + - id: local3 + chain: polygon + labels: + provider: provider1 + connection: + ethereum-pos: + execution: + rpc: + url: "http://localhost" + ws: + url: "ws://localhost" +health: + host: 0.0.0.0 + port: 10003 + path: /readyz + blockchains: + - chain: ethereum + min-available: 2 + - chain: polygon + min-available: 1 diff --git a/src/test/resources/configs/reload-health-initial.yaml b/src/test/resources/configs/reload-health-initial.yaml new file mode 100644 index 000000000..67a3c729d --- /dev/null +++ b/src/test/resources/configs/reload-health-initial.yaml @@ -0,0 +1,42 @@ +cluster: + upstreams: + - id: local1 + chain: ethereum + labels: + provider: provider1 + connection: + ethereum-pos: + execution: + rpc: + url: "http://localhost" + ws: + url: "ws://localhost" + - id: local2 + chain: ethereum + labels: + provider: provider1 + connection: + ethereum-pos: + execution: + rpc: + url: "http://localhost" + ws: + url: "ws://localhost" + - id: local3 + chain: polygon + labels: + provider: provider1 + connection: + ethereum-pos: + execution: + rpc: + url: "http://localhost" + ws: + url: "ws://localhost" +health: + host: 127.0.0.1 + port: 8082 + path: /health + blockchains: + - chain: ethereum + min-available: 1 From a09e264dcaea01923d2f7aec8728aedf865f1009 Mon Sep 17 00:00:00 2001 From: Artem Rootman <4586640+artemrootman@users.noreply.github.com> Date: Thu, 30 Oct 2025 15:31:17 +0000 Subject: [PATCH 2/3] fix: reorder imports in ReloadConfigTest for ktlint --- .../io/emeraldpay/dshackle/config/reload/ReloadConfigTest.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigTest.kt index c915b9bf3..ae6eed738 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigTest.kt @@ -6,8 +6,8 @@ import io.emeraldpay.dshackle.Chain.POLYGON__MAINNET import io.emeraldpay.dshackle.Config import io.emeraldpay.dshackle.FileResolver import io.emeraldpay.dshackle.cache.Caches -import io.emeraldpay.dshackle.config.MainConfig import io.emeraldpay.dshackle.config.HealthConfigReader +import io.emeraldpay.dshackle.config.MainConfig import io.emeraldpay.dshackle.config.UpstreamsConfig import io.emeraldpay.dshackle.config.UpstreamsConfigReader import io.emeraldpay.dshackle.foundation.ChainOptionsReader From 71053283823fe295851085cf846bad7978a5af86 Mon Sep 17 00:00:00 2001 From: Artem Rootman <4586640+artemrootman@users.noreply.github.com> Date: Thu, 30 Oct 2025 15:41:28 +0000 Subject: [PATCH 3/3] fix: ensure ws head liveness emits on disconnect --- .../dshackle/upstream/ethereum/GenericWsHead.kt | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHead.kt index 8523a12e4..e0511216e 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHead.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHead.kt @@ -204,13 +204,22 @@ class GenericWsHead( private fun registerHeadResubscribeFlux(): Disposable { val connectionStates = wsSubscriptions.connectionInfoFlux() .map { - if (it.connectionId == connectionId.get() && it.connectionState == WsConnection.ConnectionState.DISCONNECTED) { + val currentConnectionId = connectionId.get() + if ( + it.connectionState == WsConnection.ConnectionState.DISCONNECTED && + (currentConnectionId == null || it.connectionId == currentConnectionId) + ) { headLivenessSink.emitNext(HeadLivenessState.DISCONNECTED) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED } subscribed.set(false) connected.set(false) - connectionId.set(null) + if (currentConnectionId != null) { + connectionId.compareAndSet(currentConnectionId, null) + } } else if (it.connectionState == WsConnection.ConnectionState.CONNECTED) { connected.set(true) + if (currentConnectionId == null) { + connectionId.compareAndSet(null, it.connectionId) + } return@map true } return@map false