Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ lazy val node = (project in file("node"))
apiServerDependencies ++ commonDependencies ++ kamonDependencies ++ protobufDependencies ++ Seq(
catsCore,
catsRetry,
influxdb,
grpcNetty,
grpcServices,
jline,
Expand Down
8 changes: 7 additions & 1 deletion node/src/main/resources/kamon.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,17 @@ kamon {
join-remote-parents-with-same-span-id = true
}

metric.tick-interval = 1 second

# Config for streaming metrics
influxdb {
# InfluxDB server host/URL and port (now an InfluxDB Cloud HTTPS endpoint with token auth, no longer a plain UDP hostname)
hostname = "127.0.0.1"
hostname = "https://us-east-1-1.aws.cloud2.influxdata.com"
port = 8089
token = "put token here"
database = "put bucket here"
organisation = "put organisation here"
precision = "ms"

# Max payload size for metrics data sent to InfluxDB — NOTE(review): comment said "UDP", but the reporter now writes via the HTTP API; confirm this limit is still honored
max-packet-size = 1024 bytes
Expand Down
Original file line number Diff line number Diff line change
@@ -1,70 +1,74 @@
package coop.rchain.node.diagnostics

import cats.effect.IO
import cats.effect.kernel.Async
import cats.effect.std.Supervisor
import cats.effect.unsafe.implicits.global
import cats.effect.kernel.{Async, Sync}
import cats.implicits.catsSyntaxOptionId
import com.influxdb.v3.client.InfluxDBClient
import com.influxdb.v3.client.config.ClientConfig
import com.typesafe.config.Config
import coop.rchain.node.diagnostics.BatchInfluxDBReporter.Settings
import coop.rchain.node.diagnostics.UdpInfluxDBReporter.Settings
import coop.rchain.shared.Log
import fs2.concurrent.Channel
import kamon.metric._
import kamon.Kamon
import kamon.influxdb.InfluxDBReporter
import kamon.module.{MetricReporter, ModuleFactory}
import kamon.metric._
import kamon.module.MetricReporter
import kamon.status.Environment
import kamon.tag.{Tag, TagSet}
import kamon.util.EnvironmentTags
import okhttp3._
import org.slf4j.LoggerFactory

import java.io.IOException
import java.time.Instant
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import scala.util.Try

@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
class BatchInfluxDBReporter[F[_]: Async](
@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements", "org.wartremover.warts.Var"))
class BatchInfluxDBReporter[F[_]: Async: Log](
dispatcher: cats.effect.std.Dispatcher[F],
config: Config = Kamon.config()
) extends MetricReporter {
private val logger = LoggerFactory.getLogger(classOf[BatchInfluxDBReporter[F]])
@SuppressWarnings(Array("org.wartremover.warts.Var"))
private var settings = readSettings(config)
private val client = buildClient(settings)
private val subject = dispatcher.unsafeRunSync(Channel.unbounded[F, Option[Seq[String]]])
private val logger = LoggerFactory.getLogger(classOf[BatchInfluxDBReporter[F]])
private val c = readSettings(config)
private var clientConfig = c._1
private var settings = c._2

start()
private val client = InfluxDBClient.getInstance(clientConfig)
private var subject = dispatcher.unsafeRunSync(Channel.unbounded[F, Option[Seq[String]]])

override def stop(): Unit = subject.send(None) // finish stream
override def stop(): Unit =
// finish stream
subject.send(None)

override def reconfigure(config: Config): Unit = {
stop()
settings = readSettings(config)
val c = readSettings(config)
clientConfig = c._1
settings = c._2
start()
}

override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit =
dispatcher.unsafeRunSync(subject.send(Seq(translateToLineProtocol(snapshot)).some))
dispatcher.unsafeRunSync {
subject.send(Seq(translateToLineProtocol(snapshot)).some)
}

private def start(): Unit = {
dispatcher.unsafeRunSync(Log[F].info(s"Starting BatchInfluxDBReporter with: ${clientConfig}"))
subject = dispatcher.unsafeRunSync(Channel.unbounded[F, Option[Seq[String]]])
// TODO implement accumulation over time interval settings.batchInterval
val batching = subject.stream.unNoneTerminate.evalMap(postMetrics).compile
dispatcher.unsafeRunAndForget(batching.drain)
}

private def readSettings(config: Config): Settings = {
private def readSettings(config: Config): (ClientConfig, Settings) = {
import scala.jdk.CollectionConverters._
val root = config.getConfig("kamon.influxdb")
val host = root.getString("hostname")
val authConfig = Try(root.getConfig("authentication")).toOption
val credentials =
authConfig.map(conf => Credentials.basic(conf.getString("user"), conf.getString("password")))
val port = root.getInt("port")
val database = root.getString("database")
val protocol = root.getString("protocol").toLowerCase
val url = s"$protocol://$host:$port/write?precision=ms&db=$database"
val root = config.getConfig("kamon.influxdb")
val host = root.getString("hostname")
val organisations = root.getString("organisation")
val accessToken = root.getString("token").toCharArray
val port = root.getInt("port")
val database = root.getString("database")
// val protocol = root.getString("protocol").toLowerCase
// val url = s"$protocol://$host:$port/write?precision=ms&db=$database"
val interval =
if (root.hasPath("batch-interval"))
Duration.fromNanos(root.getDuration("batch-interval").toNanos)
Expand All @@ -74,49 +78,27 @@ class BatchInfluxDBReporter[F[_]: Async](
val additionalTags =
EnvironmentTags.from(Environment.from(config), root.getConfig("additional-tags"))

Settings(
url,
val b = new ClientConfig.Builder
val cfg = b
.host(host)
.organization(organisations)
.database(database)
.token(accessToken)
.build()

val settings = Settings(
interval,
root.getDoubleList("percentiles").asScala.map(_.toDouble).toSeq,
credentials,
additionalTags,
precision
)

cfg -> settings

}

private def postMetrics(metrics: Seq[String]): F[Unit] =
Async[F].async_ {
case cb =>
val body = RequestBody.create(MediaType.parse("text/plain"), metrics.mkString)
val request = new Request.Builder()
.url(settings.url)
.post(body)
.build()

client
.newCall(request)
.enqueue(
new Callback {
def onFailure(call: Call, e: IOException): Unit = {
logger.error("Failed to POST metrics to InfluxDB", e)
cb(Right(()))
}

def onResponse(call: Call, response: Response): Unit = {
if (response.isSuccessful)
logger.trace("Successfully sent metrics to InfluxDB")
else {
logger.error(
"Metrics POST to InfluxDB failed with status code [{}], response body: {}",
response.code(),
response.body().string()
)
}
cb(Right(()))
}
}
)
}
Sync[F].delay { Try { client.writeRecord(metrics.mkString) } }

private def translateToLineProtocol(periodSnapshot: PeriodSnapshot): String = {
import periodSnapshot._
Expand Down Expand Up @@ -264,28 +246,4 @@ class BatchInfluxDBReporter[F[_]: Async](
.append(timestamp)
.append("\n")

protected def buildClient(settings: Settings): OkHttpClient = {
val basicBuilder = new OkHttpClient.Builder()
val authenticator = settings.credentials.map(
credentials =>
new Authenticator() {
def authenticate(route: Route, response: Response): Request =
response.request().newBuilder().header("Authorization", credentials).build()
}
)
authenticator
.foldLeft(basicBuilder) { case (builder, auth) => builder.authenticator(auth) }
.build()
}
}

object BatchInfluxDBReporter {
final case class Settings(
url: String,
batchInterval: FiniteDuration,
percentiles: Seq[Double],
credentials: Option[String],
additionalTags: TagSet,
measurementPrecision: String
)
}
Loading