Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ jobs:

- name: Install System Libs
run: sudo apt-get install -y openssl libapr1

- name: Setup gradle
uses: gradle/gradle-build-action@v2
with:
github-token: ${{ secrets.GITHUB_TOKEN }}

- name: Check
run: make test
env:
Expand Down
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ dependencies {
implementation(libs.reactor.grpc.stub)
implementation(libs.grpc.proto.util)

implementation libs.prom.exporter.server
implementation libs.micrometer.registry.prometheus
implementation libs.micrometer.ctx.prop
implementation libs.lettuce.core
Expand Down
3 changes: 2 additions & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ kotlin-reflect = { module = "org.jetbrains.kotlin:kotlin-reflect", version.ref =

lettuce-core = "io.lettuce:lettuce-core:5.2.2.RELEASE"

prom-exporter-server = "io.prometheus:prometheus-metrics-exporter-httpserver:1.5.0"
micrometer-registry-prometheus = "io.micrometer:micrometer-registry-prometheus:1.16.0"
micrometer-ctx-prop = "io.micrometer:context-propagation:1.2.0"

Expand Down Expand Up @@ -104,7 +105,7 @@ spring-boot-starter-actuator = { module = "org.springframework.boot:spring-boot-

spring-boot-starter-test = { module = "org.springframework.boot:spring-boot-starter-test", version.ref = "spring-boot" }

testcontainers = "org.testcontainers:testcontainers:1.21.3"
testcontainers = "org.testcontainers:testcontainers:2.0.3"
testcontainers-ganache = "io.github.ganchix:testcontainers-java-module-ganache:0.0.4"

assertj = "org.assertj:assertj-core:3.23.1"
Expand Down
8 changes: 0 additions & 8 deletions src/main/kotlin/io/emeraldpay/dshackle/Starter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,6 @@ fun main(args: Array<String>) {
Hooks.enableAutomaticContextPropagation()
OpenSsl.ensureAvailability()

if (!System.getProperties().containsKey("sun.net.httpserver.maxReqTime")) {
System.setProperty("sun.net.httpserver.maxReqTime", "60")
}

if (!System.getProperties().containsKey("sun.net.httpserver.maxRspTime")) {
System.setProperty("sun.net.httpserver.maxRspTime", "600")
}

// add metrics for internal reactor schedulers
Schedulers.addExecutorServiceDecorator("key") { scheduler, execService ->
val schedulerName = Scannable.from(scheduler).scanOrDefault(Attr.NAME, scheduler.javaClass.name)
Expand Down
104 changes: 8 additions & 96 deletions src/main/kotlin/io/emeraldpay/dshackle/monitoring/MonitoringSetup.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package io.emeraldpay.dshackle.monitoring

import com.sun.net.httpserver.HttpServer
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.config.MonitoringConfig
import io.micrometer.core.instrument.Meter
Expand All @@ -28,19 +27,10 @@ import io.micrometer.core.instrument.binder.system.ProcessorMetrics
import io.micrometer.core.instrument.config.MeterFilter
import io.micrometer.prometheusmetrics.PrometheusConfig
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
import io.prometheus.metrics.exporter.httpserver.HTTPServer
import jakarta.annotation.PostConstruct
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service
import java.io.IOException
import java.net.InetAddress
import java.net.InetSocketAddress
import java.net.ServerSocket
import java.nio.charset.StandardCharsets
import java.util.concurrent.Executors
import java.util.concurrent.ThreadFactory
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.zip.GZIPOutputStream

@Service
class MonitoringSetup(
Expand All @@ -51,20 +41,6 @@ class MonitoringSetup(
private val log = LoggerFactory.getLogger(MonitoringSetup::class.java)
}

private fun isTcpPortAvailable(host: String, port: Int): Boolean {
try {
ServerSocket().use { serverSocket ->
// setReuseAddress(false) is required only on macOS,
// otherwise the code will not work correctly on that platform
serverSocket.reuseAddress = false
serverSocket.bind(InetSocketAddress(InetAddress.getByName(host), port), 1)
return true
}
} catch (ex: java.lang.Exception) {
return false
}
}

@PostConstruct
fun setup() {
val prometheusRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
Expand Down Expand Up @@ -93,77 +69,13 @@ class MonitoringSetup(
}

if (monitoringConfig.prometheus.enabled) {
// use standard JVM server with a single thread blocking processing
// prometheus is a single thread periodic call, no reason to setup anything complex
Thread {
var started = false
val scrapeThreadCounter = AtomicInteger(0)
val scrapeExecutor = Executors.newFixedThreadPool(
4,
ThreadFactory { runnable ->
Thread(runnable, "prometheus-scrape-${scrapeThreadCounter.incrementAndGet()}").apply {
isDaemon = true
}
},
)
while (true) {
if (isTcpPortAvailable(monitoringConfig.prometheus.host, monitoringConfig.prometheus.port)) {
started = true
try {
log.info("Run Prometheus metrics on ${monitoringConfig.prometheus.host}:${monitoringConfig.prometheus.port}${monitoringConfig.prometheus.path}")
val server = HttpServer.create(
InetSocketAddress(
monitoringConfig.prometheus.host,
monitoringConfig.prometheus.port,
),
0,
)
server.executor = scrapeExecutor
server.createContext(monitoringConfig.prometheus.path) { httpExchange ->
val startTime = System.nanoTime()
try {
val response = prometheusRegistry.scrape()
val responseBytes = response.toByteArray(StandardCharsets.UTF_8)
httpExchange.responseHeaders.add("Content-Encoding", "gzip")
httpExchange.responseHeaders.add("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
httpExchange.sendResponseHeaders(200, 0)
httpExchange.responseBody.use { os ->
GZIPOutputStream(os).use { gzos ->
gzos.write(responseBytes)
}
}
val durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)
if (durationMs > 5000) {
log.warn("Prometheus scrape served in {} ms", durationMs)
} else {
log.debug("Prometheus scrape served in {} ms", durationMs)
}
} catch (e: Exception) {
log.error("Failed to serve Prometheus metrics", e)
val messageBytes = "internal error".toByteArray(StandardCharsets.UTF_8)
runCatching {
httpExchange.responseHeaders.add("Content-Type", "text/plain; charset=utf-8")
httpExchange.sendResponseHeaders(500, messageBytes.size.toLong())
httpExchange.responseBody.use { os ->
os.write(messageBytes)
}
}
} finally {
httpExchange.close()
}
}
Thread(server::start).start()
} catch (e: IOException) {
log.error("Failed to start Prometheus Server", e)
}
} else {
if (!started) {
log.error("Can't start prometheus metrics on ${monitoringConfig.prometheus.host}:${monitoringConfig.prometheus.port}${monitoringConfig.prometheus.path}")
}
Thread.sleep(1000)
}
}
}.start()
HTTPServer
.builder()
.hostname(monitoringConfig.prometheus.host)
.port(monitoringConfig.prometheus.port)
.registry(prometheusRegistry.prometheusRegistry)
.metricsHandlerPath(monitoringConfig.prometheus.path)
.buildAndStart()
}
}
}
Loading