diff --git a/build.sbt b/build.sbt index cda4b9726b1..a8a598513cc 100644 --- a/build.sbt +++ b/build.sbt @@ -298,6 +298,7 @@ lazy val node = (project in file("node")) apiServerDependencies ++ commonDependencies ++ kamonDependencies ++ protobufDependencies ++ Seq( catsCore, catsRetry, + influxdb, grpcNetty, grpcServices, jline, diff --git a/node/src/main/resources/kamon.conf b/node/src/main/resources/kamon.conf index 3c981719244..198a7cfc1ee 100644 --- a/node/src/main/resources/kamon.conf +++ b/node/src/main/resources/kamon.conf @@ -6,11 +6,17 @@ kamon { join-remote-parents-with-same-span-id = true } + metric.tick-interval = 1 second + # Config for streaming metrics influxdb { # InfluxDB server hostname and UDP port - hostname = "127.0.0.1" + hostname = "https://us-east-1-1.aws.cloud2.influxdata.com" port = 8089 + token = "put token here" + database = "put bucket here" + organisation = "put organisation here" + precision = "ms" # Max packet size for UDP metrics data sent to InfluxDB max-packet-size = 1024 bytes diff --git a/node/src/main/scala/coop/rchain/node/diagnostics/BatchInfluxDBReporter.scala b/node/src/main/scala/coop/rchain/node/diagnostics/BatchInfluxDBReporter.scala index 7b89f101ce5..fafa0522c79 100644 --- a/node/src/main/scala/coop/rchain/node/diagnostics/BatchInfluxDBReporter.scala +++ b/node/src/main/scala/coop/rchain/node/diagnostics/BatchInfluxDBReporter.scala @@ -1,70 +1,74 @@ package coop.rchain.node.diagnostics -import cats.effect.IO -import cats.effect.kernel.Async -import cats.effect.std.Supervisor -import cats.effect.unsafe.implicits.global +import cats.effect.kernel.{Async, Sync} import cats.implicits.catsSyntaxOptionId +import com.influxdb.v3.client.InfluxDBClient +import com.influxdb.v3.client.config.ClientConfig import com.typesafe.config.Config -import coop.rchain.node.diagnostics.BatchInfluxDBReporter.Settings +import coop.rchain.node.diagnostics.UdpInfluxDBReporter.Settings +import coop.rchain.shared.Log import fs2.concurrent.Channel 
-import kamon.metric._ import kamon.Kamon -import kamon.influxdb.InfluxDBReporter -import kamon.module.{MetricReporter, ModuleFactory} +import kamon.metric._ +import kamon.module.MetricReporter import kamon.status.Environment import kamon.tag.{Tag, TagSet} import kamon.util.EnvironmentTags -import okhttp3._ import org.slf4j.LoggerFactory -import java.io.IOException import java.time.Instant import java.util.concurrent.TimeUnit import scala.concurrent.duration._ import scala.util.Try -@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements")) -class BatchInfluxDBReporter[F[_]: Async]( +@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements", "org.wartremover.warts.Var")) +class BatchInfluxDBReporter[F[_]: Async: Log]( dispatcher: cats.effect.std.Dispatcher[F], config: Config = Kamon.config() ) extends MetricReporter { - private val logger = LoggerFactory.getLogger(classOf[BatchInfluxDBReporter[F]]) - @SuppressWarnings(Array("org.wartremover.warts.Var")) - private var settings = readSettings(config) - private val client = buildClient(settings) - private val subject = dispatcher.unsafeRunSync(Channel.unbounded[F, Option[Seq[String]]]) + private val logger = LoggerFactory.getLogger(classOf[BatchInfluxDBReporter[F]]) + private val c = readSettings(config) + private var clientConfig = c._1 + private var settings = c._2 - start() + private val client = InfluxDBClient.getInstance(clientConfig) + private var subject = dispatcher.unsafeRunSync(Channel.unbounded[F, Option[Seq[String]]]) - override def stop(): Unit = subject.send(None) // finish stream + override def stop(): Unit = + // finish stream (run the effect; a bare subject.send(None) would be discarded unexecuted) + dispatcher.unsafeRunSync(subject.send(None)) override def reconfigure(config: Config): Unit = { stop() - settings = readSettings(config) + val c = readSettings(config) + clientConfig = c._1 + settings = c._2 start() } override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = - dispatcher.unsafeRunSync(subject.send(Seq(translateToLineProtocol(snapshot)).some)) +
dispatcher.unsafeRunSync { + subject.send(Seq(translateToLineProtocol(snapshot)).some) + } private def start(): Unit = { + dispatcher.unsafeRunSync(Log[F].info(s"Starting BatchInfluxDBReporter with: ${clientConfig}")) + subject = dispatcher.unsafeRunSync(Channel.unbounded[F, Option[Seq[String]]]) // TODO implement accumulation over time interval settings.batchInterval val batching = subject.stream.unNoneTerminate.evalMap(postMetrics).compile dispatcher.unsafeRunAndForget(batching.drain) } - private def readSettings(config: Config): Settings = { + private def readSettings(config: Config): (ClientConfig, Settings) = { import scala.jdk.CollectionConverters._ - val root = config.getConfig("kamon.influxdb") - val host = root.getString("hostname") - val authConfig = Try(root.getConfig("authentication")).toOption - val credentials = - authConfig.map(conf => Credentials.basic(conf.getString("user"), conf.getString("password"))) - val port = root.getInt("port") - val database = root.getString("database") - val protocol = root.getString("protocol").toLowerCase - val url = s"$protocol://$host:$port/write?precision=ms&db=$database" + val root = config.getConfig("kamon.influxdb") + val host = root.getString("hostname") + val organisations = root.getString("organisation") + val accessToken = root.getString("token").toCharArray + val port = root.getInt("port") + val database = root.getString("database") +// val protocol = root.getString("protocol").toLowerCase +// val url = s"$protocol://$host:$port/write?precision=ms&db=$database" val interval = if (root.hasPath("batch-interval")) Duration.fromNanos(root.getDuration("batch-interval").toNanos) @@ -74,49 +78,27 @@ class BatchInfluxDBReporter[F[_]: Async]( val additionalTags = EnvironmentTags.from(Environment.from(config), root.getConfig("additional-tags")) - Settings( - url, + val b = new ClientConfig.Builder + val cfg = b + .host(host) + .organization(organisations) + .database(database) + .token(accessToken) + .build() + + val 
settings = Settings( interval, root.getDoubleList("percentiles").asScala.map(_.toDouble).toSeq, - credentials, additionalTags, precision ) + + cfg -> settings + } private def postMetrics(metrics: Seq[String]): F[Unit] = - Async[F].async_ { - case cb => - val body = RequestBody.create(MediaType.parse("text/plain"), metrics.mkString) - val request = new Request.Builder() - .url(settings.url) - .post(body) - .build() - - client - .newCall(request) - .enqueue( - new Callback { - def onFailure(call: Call, e: IOException): Unit = { - logger.error("Failed to POST metrics to InfluxDB", e) - cb(Right(())) - } - - def onResponse(call: Call, response: Response): Unit = { - if (response.isSuccessful) - logger.trace("Successfully sent metrics to InfluxDB") - else { - logger.error( - "Metrics POST to InfluxDB failed with status code [{}], response body: {}", - response.code(), - response.body().string() - ) - } - cb(Right(())) - } - } - ) - } + Sync[F].delay { Try(client.writeRecord(metrics.mkString)).failed.foreach(e => logger.error("Failed to write metrics to InfluxDB", e)) } private def translateToLineProtocol(periodSnapshot: PeriodSnapshot): String = { import periodSnapshot._ @@ -264,28 +246,4 @@ class BatchInfluxDBReporter[F[_]: Async]( .append(timestamp) .append("\n") - protected def buildClient(settings: Settings): OkHttpClient = { - val basicBuilder = new OkHttpClient.Builder() - val authenticator = settings.credentials.map( - credentials => - new Authenticator() { - def authenticate(route: Route, response: Response): Request = - response.request().newBuilder().header("Authorization", credentials).build() - } - ) - authenticator - .foldLeft(basicBuilder) { case (builder, auth) => builder.authenticator(auth) } - .build() - } -} - -object BatchInfluxDBReporter { - final case class Settings( - url: String, - batchInterval: FiniteDuration, - percentiles: Seq[Double], - credentials: Option[String], - additionalTags: TagSet, - measurementPrecision: String - ) } diff --git a/node/src/main/scala/coop/rchain/node/diagnostics/UdpInfluxDBReporter.scala
b/node/src/main/scala/coop/rchain/node/diagnostics/UdpInfluxDBReporter.scala index c892c96d542..abd01c4f6bb 100644 --- a/node/src/main/scala/coop/rchain/node/diagnostics/UdpInfluxDBReporter.scala +++ b/node/src/main/scala/coop/rchain/node/diagnostics/UdpInfluxDBReporter.scala @@ -14,197 +14,198 @@ import kamon.util.EnvironmentTags import java.time.Instant import java.util.concurrent.TimeUnit - -// TODO use Dispatcher to execute inside F context? -@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements")) -class UdpInfluxDBReporter(config: Config = Kamon.config()) extends MetricReporter { - - @SuppressWarnings(Array("org.wartremover.warts.Var")) - private var settings: Settings = readConfiguration(config) - private val clientChannel: DatagramChannel = DatagramChannel.open() - - override def stop(): Unit = {} - - override def reconfigure(config: Config): Unit = - settings = readConfiguration(config) - - protected def getTimestamp(instant: Instant): String = - settings.measurementPrecision match { - case "s" => - instant.getEpochSecond.toString - case "ms" => - instant.toEpochMilli.toString - case "u" | "µ" => - ((BigInt(instant.getEpochSecond) * 1000000) + TimeUnit.NANOSECONDS.toMicros( - instant.getNano.toLong - )).toString - case "ns" => - ((BigInt(instant.getEpochSecond) * 1000000000) + instant.getNano).toString - } - - private def readConfiguration(config: Config): Settings = { - import scala.jdk.CollectionConverters._ - val influxConfig = config.getConfig("kamon.influxdb") - val address = - new InetSocketAddress(influxConfig.getString("hostname"), influxConfig.getInt("port")) - val maxPacketSize = influxConfig.getBytes("max-packet-size") - val percentiles = influxConfig.getDoubleList("percentiles").asScala.map(_.toDouble).toSeq - val precision = influxConfig.getString("precision") - val additionalTags = - EnvironmentTags.from(Environment.from(config), config.getConfig("additional-tags")) - - Settings(address, maxPacketSize, percentiles, additionalTags, 
precision) - } - - def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = { - import snapshot._ - val packetBuffer = - new MetricDataPacketBuffer(settings.maxPacketSize, clientChannel, settings.address) - val builder = new StringBuilder - val timestamp = getTimestamp(snapshot.to) - - counters.foreach { c => - writeLongMetricValue(builder, c, "count", timestamp) - packetBuffer.appendMeasurement(builder.toString) - builder.clear() - } - - gauges.foreach { g => - writeDoubleMetricValue(builder, g, "value", timestamp) - packetBuffer.appendMeasurement(builder.toString) - builder.clear() - } - - histograms.foreach { h => - writeMetricDistribution(builder, h, settings.percentiles, timestamp) - packetBuffer.appendMeasurement(builder.toString) - builder.clear() - } - - rangeSamplers.foreach { rs => - writeMetricDistribution(builder, rs, settings.percentiles, timestamp) - packetBuffer.appendMeasurement(builder.toString) - builder.clear() - } - } - - private def writeLongMetricValue( - builder: StringBuilder, - metric: MetricSnapshot.Values[Long], - fieldName: String, - timestamp: String - ): Unit = - metric.instruments.foreach { instrument => - writeNameAndTags(builder, metric.name, instrument.tags) - writeIntField(builder, fieldName, instrument.value, appendSeparator = false) - writeTimestamp(builder, timestamp) - } - - private def writeDoubleMetricValue( - builder: StringBuilder, - metric: MetricSnapshot.Values[Double], - fieldName: String, - timestamp: String - ): Unit = - metric.instruments.foreach { instrument => - writeNameAndTags(builder, metric.name, instrument.tags) - writeDoubleField(builder, fieldName, instrument.value, appendSeparator = false) - writeTimestamp(builder, timestamp) - } - - private def writeMetricDistribution( - builder: StringBuilder, - metric: MetricSnapshot.Distributions, - percentiles: Seq[Double], - timestamp: String - ): Unit = - metric.instruments.foreach { instrument => - if (instrument.value.count > 0) { - writeNameAndTags(builder, 
metric.name, instrument.tags) - writeIntField(builder, "count", instrument.value.count) - writeIntField(builder, "sum", instrument.value.sum) - writeIntField(builder, "mean", instrument.value.sum / instrument.value.count) - writeIntField(builder, "min", instrument.value.min) - - percentiles.foreach { p => - writeDoubleField( - builder, - "p" + String.valueOf(p), - instrument.value.percentile(p).value.toDouble - ) - } - - writeIntField(builder, "max", instrument.value.max, appendSeparator = false) - writeTimestamp(builder, timestamp) - } - } - - private def writeNameAndTags(builder: StringBuilder, name: String, metricTags: TagSet): Unit = { - builder - .append(escapeName(name)) - - val tags = (if (settings.additionalTags.nonEmpty()) metricTags.withTags(settings.additionalTags) - else metricTags).all() - - if (tags.nonEmpty) { - tags.foreach { t => - builder - .append(',') - .append(escapeString(t.key)) - .append("=") - .append(escapeString(Tag.unwrapValue(t).toString)) - } - } - - builder.append(' ') - } - - private def escapeName(in: String): String = - in.replace(" ", "\\ ") - .replace(",", "\\,") - - private def escapeString(in: String): String = - in.replace(" ", "\\ ") - .replace("=", "\\=") - .replace(",", "\\,") - - def writeDoubleField( - builder: StringBuilder, - fieldName: String, - value: Double, - appendSeparator: Boolean = true - ): Unit = { - builder - .append(fieldName) - .append('=') - .append(String.valueOf(value)) - - if (appendSeparator) - builder.append(',') - } - - def writeIntField( - builder: StringBuilder, - fieldName: String, - value: Long, - appendSeparator: Boolean = true - ): Unit = { - builder - .append(fieldName) - .append('=') - .append(String.valueOf(value)) - .append('i') - - if (appendSeparator) - builder.append(',') - } - - def writeTimestamp(builder: StringBuilder, timestamp: String): Unit = - builder - .append(' ') - .append(timestamp) - .append("\n") -} +import scala.concurrent.duration.FiniteDuration + +//// TODO use Dispatcher 
to execute inside F context? +//@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements")) +//class UdpInfluxDBReporter(config: Config = Kamon.config()) extends MetricReporter { +// +// @SuppressWarnings(Array("org.wartremover.warts.Var")) +// private var settings: Settings = readConfiguration(config) +// private val clientChannel: DatagramChannel = DatagramChannel.open() +// +// override def stop(): Unit = {} +// +// override def reconfigure(config: Config): Unit = +// settings = readConfiguration(config) +// +// protected def getTimestamp(instant: Instant): String = +// settings.measurementPrecision match { +// case "s" => +// instant.getEpochSecond.toString +// case "ms" => +// instant.toEpochMilli.toString +// case "u" | "µ" => +// ((BigInt(instant.getEpochSecond) * 1000000) + TimeUnit.NANOSECONDS.toMicros( +// instant.getNano.toLong +// )).toString +// case "ns" => +// ((BigInt(instant.getEpochSecond) * 1000000000) + instant.getNano).toString +// } +// +// private def readConfiguration(config: Config): Settings = { +// import scala.jdk.CollectionConverters._ +// val influxConfig = config.getConfig("kamon.influxdb") +// val address = +// new InetSocketAddress(influxConfig.getString("hostname"), influxConfig.getInt("port")) +// val maxPacketSize = influxConfig.getBytes("max-packet-size") +// val percentiles = influxConfig.getDoubleList("percentiles").asScala.map(_.toDouble).toSeq +// val precision = influxConfig.getString("precision") +// val additionalTags = +// EnvironmentTags.from(Environment.from(config), config.getConfig("additional-tags")) +// +// Settings(address, maxPacketSize, percentiles, additionalTags, precision) +// } +// +// def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = { +// import snapshot._ +// val packetBuffer = +// new MetricDataPacketBuffer(settings.maxPacketSize, clientChannel, settings.address) +// val builder = new StringBuilder +// val timestamp = getTimestamp(snapshot.to) +// +// counters.foreach { c => +// 
writeLongMetricValue(builder, c, "count", timestamp) +// packetBuffer.appendMeasurement(builder.toString) +// builder.clear() +// } +// +// gauges.foreach { g => +// writeDoubleMetricValue(builder, g, "value", timestamp) +// packetBuffer.appendMeasurement(builder.toString) +// builder.clear() +// } +// +// histograms.foreach { h => +// writeMetricDistribution(builder, h, settings.percentiles, timestamp) +// packetBuffer.appendMeasurement(builder.toString) +// builder.clear() +// } +// +// rangeSamplers.foreach { rs => +// writeMetricDistribution(builder, rs, settings.percentiles, timestamp) +// packetBuffer.appendMeasurement(builder.toString) +// builder.clear() +// } +// } +// +// private def writeLongMetricValue( +// builder: StringBuilder, +// metric: MetricSnapshot.Values[Long], +// fieldName: String, +// timestamp: String +// ): Unit = +// metric.instruments.foreach { instrument => +// writeNameAndTags(builder, metric.name, instrument.tags) +// writeIntField(builder, fieldName, instrument.value, appendSeparator = false) +// writeTimestamp(builder, timestamp) +// } +// +// private def writeDoubleMetricValue( +// builder: StringBuilder, +// metric: MetricSnapshot.Values[Double], +// fieldName: String, +// timestamp: String +// ): Unit = +// metric.instruments.foreach { instrument => +// writeNameAndTags(builder, metric.name, instrument.tags) +// writeDoubleField(builder, fieldName, instrument.value, appendSeparator = false) +// writeTimestamp(builder, timestamp) +// } +// +// private def writeMetricDistribution( +// builder: StringBuilder, +// metric: MetricSnapshot.Distributions, +// percentiles: Seq[Double], +// timestamp: String +// ): Unit = +// metric.instruments.foreach { instrument => +// if (instrument.value.count > 0) { +// writeNameAndTags(builder, metric.name, instrument.tags) +// writeIntField(builder, "count", instrument.value.count) +// writeIntField(builder, "sum", instrument.value.sum) +// writeIntField(builder, "mean", instrument.value.sum / 
instrument.value.count) +// writeIntField(builder, "min", instrument.value.min) +// +// percentiles.foreach { p => +// writeDoubleField( +// builder, +// "p" + String.valueOf(p), +// instrument.value.percentile(p).value.toDouble +// ) +// } +// +// writeIntField(builder, "max", instrument.value.max, appendSeparator = false) +// writeTimestamp(builder, timestamp) +// } +// } +// +// private def writeNameAndTags(builder: StringBuilder, name: String, metricTags: TagSet): Unit = { +// builder +// .append(escapeName(name)) +// +// val tags = (if (settings.additionalTags.nonEmpty()) metricTags.withTags(settings.additionalTags) +// else metricTags).all() +// +// if (tags.nonEmpty) { +// tags.foreach { t => +// builder +// .append(',') +// .append(escapeString(t.key)) +// .append("=") +// .append(escapeString(Tag.unwrapValue(t).toString)) +// } +// } +// +// builder.append(' ') +// } +// +// private def escapeName(in: String): String = +// in.replace(" ", "\\ ") +// .replace(",", "\\,") +// +// private def escapeString(in: String): String = +// in.replace(" ", "\\ ") +// .replace("=", "\\=") +// .replace(",", "\\,") +// +// def writeDoubleField( +// builder: StringBuilder, +// fieldName: String, +// value: Double, +// appendSeparator: Boolean = true +// ): Unit = { +// builder +// .append(fieldName) +// .append('=') +// .append(String.valueOf(value)) +// +// if (appendSeparator) +// builder.append(',') +// } +// +// def writeIntField( +// builder: StringBuilder, +// fieldName: String, +// value: Long, +// appendSeparator: Boolean = true +// ): Unit = { +// builder +// .append(fieldName) +// .append('=') +// .append(String.valueOf(value)) +// .append('i') +// +// if (appendSeparator) +// builder.append(',') +// } +// +// def writeTimestamp(builder: StringBuilder, timestamp: String): Unit = +// builder +// .append(' ') +// .append(timestamp) +// .append("\n") +//} @SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements")) object UdpInfluxDBReporter { @@ -239,8 
+240,7 @@ object UdpInfluxDBReporter { } final case class Settings( - address: InetSocketAddress, - maxPacketSize: Long, + batchInterval: FiniteDuration, percentiles: Seq[Double], additionalTags: TagSet, measurementPrecision: String diff --git a/node/src/main/scala/coop/rchain/node/runtime/NetworkServers.scala b/node/src/main/scala/coop/rchain/node/runtime/NetworkServers.scala index b9c7fd0bab6..6e77fb7cfa9 100644 --- a/node/src/main/scala/coop/rchain/node/runtime/NetworkServers.scala +++ b/node/src/main/scala/coop/rchain/node/runtime/NetworkServers.scala @@ -185,25 +185,25 @@ object NetworkServers { reportingRoutes ) - def metricsInit[F[_]: Async]( + def metricsInit[F[_]: Async: Log]( nodeConf: NodeConf, kamonConf: Config, prometheusReporter: NewPrometheusReporter - ): Resource[F, Unit] = { - def start: F[Unit] = Dispatcher.parallel[F].use { d => - Sync[F].delay { - Kamon.reconfigure(kamonConf.withFallback(Kamon.config())) - if (nodeConf.metrics.influxdb) - Kamon.addReporter("BatchInfluxDB", new BatchInfluxDBReporter[F](d)).void() - if (nodeConf.metrics.influxdbUdp) - Kamon.addReporter("UdpInfluxDb", new UdpInfluxDBReporter()).void() - if (nodeConf.metrics.prometheus) Kamon.addReporter("Prometheus", prometheusReporter).void() - if (nodeConf.metrics.zipkin) Kamon.addReporter("Zipkin", new ZipkinReporter()).void() - // TODO API for processMetrics is changed in new version of Kamon. It has been never used so comment out. 
- // reconsider use in future -// if (nodeConf.metrics.sigar) -// kamon.instrumentation.system.process.ProcessMetrics.startCollecting() - } + ): Resource[F, Unit] = Dispatcher.parallel[F].flatMap { d => + def start: F[Unit] = Sync[F].delay { + Kamon.reconfigure(kamonConf.withFallback(Kamon.config())) + + if (nodeConf.metrics.influxdb) + Kamon.addReporter("BatchInfluxDB", new BatchInfluxDBReporter[F](d)).void() + // if (nodeConf.metrics.influxdbUdp) + // Kamon.addReporter("UdpInfluxDb", new UdpInfluxDBReporter()).void() + if (nodeConf.metrics.prometheus) Kamon.addReporter("Prometheus", prometheusReporter).void() + if (nodeConf.metrics.zipkin) Kamon.addReporter("Zipkin", new ZipkinReporter()).void() + // TODO API for processMetrics is changed in new version of Kamon. It has been never used so comment out. + // reconsider use in future + // if (nodeConf.metrics.sigar) + // kamon.instrumentation.system.process.ProcessMetrics.startCollecting() + Kamon.init(kamonConf.withFallback(Kamon.config())) } import scala.concurrent.ExecutionContext.Implicits.global diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 782b1f45706..6389c71e6d7 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -65,7 +65,7 @@ object Dependencies { .intransitive() //we only use the lib for one util class (org.lightningj.util.ZBase32) that has no dependencies val lmdbjava = "org.lmdbjava" % "lmdbjava" % "0.9.0" val logbackClassic = "ch.qos.logback" % "logback-classic" % "1.4.6" - val logstashLogback = "net.logstash.logback" % "logstash-logback-encoder" % "6.6" + val logstashLogback = "net.logstash.logback" % "logstash-logback-encoder" % "7.4" val lz4 = "org.lz4" % "lz4-java" % "1.7.1" val magnolia = "com.propensive" %% "magnolia" % "0.17.0" val mockito = "org.mockito" %% "mockito-scala-cats" % "1.17.14" % "test" @@ -81,12 +81,13 @@ object Dependencies { val scalapbRuntime = "com.thesamet.scalapb" %% "scalapb-runtime" % 
scalapb.compiler.Version.scalapbVersion % "protobuf" val scalapbRuntimeLib = "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion val scalapbRuntimegGrpc = "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion - val grpcNetty = "io.grpc" % "grpc-netty" % scalapb.compiler.Version.grpcJavaVersion - val grpcServices = "io.grpc" % "grpc-services" % scalapb.compiler.Version.grpcJavaVersion - val nettyBoringSsl = "io.netty" % "netty-tcnative-boringssl-static" % "2.0.46.Final" - val nettyTcnative = "io.netty" % "netty-tcnative" % "2.0.46.Final" classifier osClassifier - val nettyTcnativeLinux = "io.netty" % "netty-tcnative" % "2.0.46.Final" classifier "linux-x86_64" - val nettyTcnativeFedora = "io.netty" % "netty-tcnative" % "2.0.46.Final" classifier "linux-x86_64-fedora" + val grpcServices = "io.grpc" % "grpc-services" % "1.60.0" + val grpcNetty = "io.grpc" % "grpc-netty" % "1.60.0" + val grpcNettyShaded = "io.grpc" % "grpc-netty-shaded" % "1.60.0" + val nettyBoringSsl = "io.netty" % "netty-tcnative-boringssl-static" % "2.0.50.Final" + val nettyTcnative = "io.netty" % "netty-tcnative" % "2.0.50.Final" classifier osClassifier + val nettyTcnativeLinux = "io.netty" % "netty-tcnative" % "2.0.50.Final" classifier "linux-x86_64" + val nettyTcnativeFedora = "io.netty" % "netty-tcnative" % "2.0.50.Final" classifier "linux-x86_64-fedora" val scalaCompat = "org.scala-lang.modules" %% "scala-collection-compat" % "2.6.0" val scalatest = "org.scalatest" %% "scalatest" % "3.2.13" % "test" val scalatestPlus = "org.scalatestplus" %% "scalacheck-1-16" % "3.2.13.0" % "test" @@ -98,7 +99,7 @@ object Dependencies { val slf4j = "org.slf4j" % "slf4j-api" % slf4jVersion val weupnp = "org.bitlet" % "weupnp" % "0.1.4" val sourcecode = "com.lihaoyi" %% "sourcecode" % "0.2.1" - val grpcNettyShaded = "io.grpc" % "grpc-netty-shaded" % scalapb.compiler.Version.grpcJavaVersion + val influxdb = "com.influxdb" % "influxdb3-java" % 
"0.4.0" // format: on @@ -120,29 +121,37 @@ object Dependencies { sourcecode, scalatest, // Overrides for transitive dependencies (we don't use them directly, hence no val-s), - "com.squareup.okhttp3" % "okhttp" % "3.12.1", - "org.objenesis" % "objenesis" % "3.2", - "org.typelevel" % "jawn-parser_2.13" % "1.1.2", - "com.github.jnr" % "jnr-ffi" % "2.2.13", - "com.lihaoyi" %% "geny" % "1.0.0", - "org.scala-lang.modules" %% "scala-xml" % "2.1.0", - "com.typesafe" % "config" % "1.4.2", - // Added to resolve conflicts in scalapb plugin v0.11.3 - "com.google.code.gson" % "gson" % "2.10.1", - "com.google.protobuf" % "protobuf-java" % "3.12.2", - "com.google.errorprone" % "error_prone_annotations" % "2.18.0", - "io.perfmark" % "perfmark-api" % "0.23.0", - "org.codehaus.mojo" % "animal-sniffer-annotations" % "1.19", - "io.circe" %% "circe-jawn" % "0.14.1", - "io.circe" %% "circe-core" % "0.14.1", - "com.comcast" %% "ip4s-core" % "3.0.4", - "org.typelevel" %% "cats-free" % "2.9.0", - "org.typelevel" %% "literally" % "1.0.2", + "com.squareup.okhttp3" % "okhttp" % "3.12.1", + "org.objenesis" % "objenesis" % "3.2", + "org.typelevel" % "jawn-parser_2.13" % "1.1.2", + "com.github.jnr" % "jnr-ffi" % "2.2.13", + "com.lihaoyi" %% "geny" % "1.0.0", + "org.scala-lang.modules" %% "scala-xml" % "2.1.0", + "com.typesafe" % "config" % "1.4.2", + "com.fasterxml.jackson.core" % "jackson-databind" % "2.15.2", + "com.fasterxml.jackson.core" % "jackson-annotations" % "2.15.2", + "com.fasterxml.jackson.core" % "jackson-core" % "2.15.2", + "com.google.code.gson" % "gson" % "2.10.1", + "com.google.protobuf" % "protobuf-java" % "3.19.6", + "com.google.errorprone" % "error_prone_annotations" % "2.18.0", + "io.perfmark" % "perfmark-api" % "0.23.0", + "org.codehaus.mojo" % "animal-sniffer-annotations" % "1.19", + "io.circe" %% "circe-jawn" % "0.14.1", + "io.circe" %% "circe-core" % "0.14.1", + "com.comcast" %% "ip4s-core" % "3.0.4", + "org.typelevel" %% "cats-free" % "2.9.0", + "org.typelevel" %% 
"literally" % "1.0.2", // Strange version conflict, it requires the same version but in square brackets (range?). // e.g. io.grpc:grpc-core:1.37.0 ([1.37.0] wanted) // https://stackoverflow.com/questions/59423185/strange-versions-conflict-in-sbt-strict-mode - "io.grpc" % "grpc-api" % scalapb.compiler.Version.grpcJavaVersion, - "io.grpc" % "grpc-core" % scalapb.compiler.Version.grpcJavaVersion + "io.grpc" % "grpc-netty" % "1.60.0", + "io.grpc" % "grpc-api" % "1.60.0", + "io.grpc" % "grpc-context" % "1.56.0", + "io.grpc" % "grpc-core" % "1.60.0", + "io.grpc" % "grpc-protobuf" % "1.60.0", + "io.grpc" % "grpc-stub" % "1.60.0", + "io.netty" % "netty-tcnative-boringssl-static" % "2.0.50.Final", + "com.google.j2objc" % "j2objc-annotations" % "1.3" ) private val kindProjector = compilerPlugin( @@ -178,7 +187,7 @@ object Dependencies { Seq(scalapbRuntimeLib) val kamonDependencies: Seq[ModuleID] = - Seq(kamonCore, kamonSystemMetrics, kamonPrometheus, kamonZipkin, kamonInfluxDb) + Seq(kamonCore, kamonSystemMetrics, kamonPrometheus, kamonZipkin, influxdb) val apiServerDependencies: Seq[ModuleID] = http4sDependencies ++ circeDependencies diff --git a/project/plugins.sbt b/project/plugins.sbt index 119fc0ec11e..baa8316f2e3 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,6 +1,6 @@ addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.6") // Yes it's weird to do the following, but it's what is mandated by the scalapb documentation -libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.11.3" +libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.11.15" addSbtPlugin("com.typesafe.sbt" % "sbt-license-report" % "1.2.0") addSbtPlugin("org.wartremover" % "sbt-wartremover" % "3.0.14") @@ -18,5 +18,5 @@ addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "2.6") addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.9.2") addSbtPlugin("io.spray" % "sbt-revolver" % "0.9.1") addSbtPlugin("com.sksamuel.scapegoat" %% "sbt-scapegoat" % "1.1.1") 
-addSbtPlugin("org.typelevel" % "sbt-fs2-grpc" % "2.5.11") +addSbtPlugin("org.typelevel" % "sbt-fs2-grpc" % "2.7.14") addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.10.4") diff --git a/rholang/src/main/scala/coop/rchain/rholang/interpreter/errors.scala b/rholang/src/main/scala/coop/rchain/rholang/interpreter/errors.scala index c1b51786662..89e59777d99 100644 --- a/rholang/src/main/scala/coop/rchain/rholang/interpreter/errors.scala +++ b/rholang/src/main/scala/coop/rchain/rholang/interpreter/errors.scala @@ -1,7 +1,6 @@ package coop.rchain.rholang.interpreter import coop.rchain.rholang.interpreter.compiler.SourcePosition -import net.logstash.logback.encoder.org.apache.commons.lang3.exception.ExceptionUtils object errors { @@ -107,7 +106,7 @@ object errors { interpreterErrors: Vector[InterpreterError], errors: Vector[Throwable] ) extends InterpreterError( - s"Error: Aggregate Error\n${(interpreterErrors ++ errors).map(ExceptionUtils.getStackTrace).mkString}" + s"Error: Aggregate Error\n${(interpreterErrors ++ errors).map(_.getStackTrace).mkString}" ) // Current implementation of SpaceMatcher (extractDataCandidates) causes unmatched comms