diff --git a/README.md b/README.md index 7563472c..813fc4b1 100644 --- a/README.md +++ b/README.md @@ -5,9 +5,9 @@ ZIO telemetry is a purely-functional, type-safe [OpenTracing][link-otr] client, allowing to use opentracing with ZIO fibers. -Opentracing is a standard and API for distributed tracing, i.e. collecting timings, and logs across process boundaries. Well known implementations are [Jaeger][jaeger] and [Zipkin][zipkin]. +OpenTracing is a standard and API for distributed tracing, i.e. collecting timings, and logs across process boundaries. Well known implementations are [Jaeger][jaeger] and [Zipkin][zipkin]. -To use ZIO telemetry, you will need a `Clock` and a `Telemetry` Service in you environment: +To use ZIO telemetry, you will need a `Clock` and a `Telemetry` service in you environment: ```scala import io.opentracing.mock.MockTracer @@ -41,12 +41,12 @@ val zio = UIO.unit .log("doing some serious work here!") .span("child span") -// read back a baggage item using teh companion object instead of a combinator +// read back a baggage item using the companion object instead of a combinator val baggageIO = Telemetry.getBaggage("foo") ``` -To propagate contexts across process boundaries, extraction and injection can be used. The current span context is injected into a carrier, which is passed through some side channel to the next process. There it is injected back and a child span of it is started. For the example we use the standardized `TextMap` carrier. For details about extraction and injection, please refer to [Opentracing Documentation][otr-inject-extract]. +To propagate contexts across process boundaries, extraction and injection can be used. The current span context is injected into a carrier, which is passed through some side channel to the next process. There it is injected back and a child span of it is started. For the example we use the standardized `TextMap` carrier. For details about extraction and injection, please refer to [OpenTracing Documentation][otr-inject-extract]. Due to the use of the (mutable) opentracing carrier APIs, injection and extraction are not referentially transparent. diff --git a/src/main/scala/zio/telemetry/Telemetry.scala b/src/main/scala/zio/telemetry/Telemetry.scala index 0d3c5683..7a507026 100644 --- a/src/main/scala/zio/telemetry/Telemetry.scala +++ b/src/main/scala/zio/telemetry/Telemetry.scala @@ -1,18 +1,9 @@ package zio.telemetry -import io.opentracing.Span -import io.opentracing.SpanContext -import io.opentracing.Tracer -import io.opentracing.propagation.Format -import java.util.concurrent.TimeUnit -import scala.jdk.CollectionConverters._ -import zio.Exit +import zio.Cause import zio.FiberRef -import zio.IO -import zio.Task import zio.UIO -import zio.ZIO -import zio.ZManaged +import zio.URIO import zio.clock.Clock trait Telemetry extends Serializable { @@ -22,156 +13,13 @@ trait Telemetry extends Serializable { object Telemetry { trait Service { - def currentSpan: FiberRef[Span] - def tracer: Tracer - } - - def managed( - tracer: Tracer, - rootOpName: String = "ROOT" - ): ZManaged[Clock, Nothing, Telemetry.Service] = - ZManaged.make( - for { - span <- UIO(tracer.buildSpan(rootOpName).start()) - ref <- FiberRef.make(span) - tracer_ = tracer - } yield new Telemetry.Service { - override val currentSpan: FiberRef[Span] = ref - override val tracer: Tracer = tracer_ - } - )(_.currentSpan.get.flatMap(span => UIO(span.finish))) - - def underlying[R, R1 <: R with Telemetry, E, A]( - f: Tracer => ZIO[R, E, A] - ): ZIO[R1, E, A] = - getTracer.flatMap(f) - - def spanFrom[R, R1 <: R with Clock with Telemetry, E, A, C <: Object]( - format: Format[C], - carrier: C, - zio: ZIO[R, E, A], - opName: String, - tagError: Boolean = true, - logError: Boolean = true - ): ZIO[R1, E, A] = - getTracer.flatMap { tracer => - Task(tracer.extract(format, carrier)) - .fold(_ => None, Option.apply) - .flatMap { - case None => zio - case Some(spanCtx) => - span( - zio, - tracer.buildSpan(opName).asChildOf(spanCtx).start, - tagError, - logError - ) - } - } - - def inject[R, R1 <: R with Clock with Telemetry, E, A, C <: Object]( - format: Format[C], - carrier: C - ): ZIO[Telemetry, Nothing, Unit] = - for { - tracer <- getTracer - span <- getSpan - _ <- ZIO.effectTotal(tracer.inject(span.context(), format, carrier)) - } yield () - - def root[R, R1 <: R with Clock with Telemetry, E, A]( - zio: ZIO[R, E, A], - opName: String, - tagError: Boolean = true, - logError: Boolean = true - ): ZIO[R1, E, A] = - for { - tracer <- getTracer - root <- UIO(tracer.buildSpan(opName).start()) - r <- span(zio, root, tagError, logError) - } yield r - - def span[R, R1 <: R with Clock with Telemetry, E, A]( - zio: ZIO[R, E, A], - opName: String, - tagError: Boolean = true, - logError: Boolean = true - ): ZIO[R1, E, A] = - for { - tracer <- getTracer - old <- getSpan - child <- UIO(tracer.buildSpan(opName).asChildOf(old).start()) - r <- span(zio, child, tagError, logError) - } yield r - - def span[R, R1 <: R with Clock with Telemetry, E, A]( - zio: ZIO[R, E, A], - span: Span, - tagError: Boolean, - logError: Boolean - ): ZIO[R1, E, A] = - ZManaged - .make[R1, E, Span](getSpan <* setSpan(span)) { old => - getCurrentTimeMicros.flatMap(now => UIO(span.finish(now))) *> setSpan( - old - ) - } - .use( - _ => - zio.catchAllCause { cause => - tag("error", true).when(tagError) *> - log( - Map("error.object" -> cause, "stack" -> cause.prettyPrint) - ).when(logError) *> - IO.done(Exit.Failure(cause)) - } - ) - - def context: ZIO[Telemetry, Nothing, SpanContext] = - getSpan.map(_.context) - - def getBaggageItem(key: String): ZIO[Telemetry, Nothing, Option[String]] = - getSpan.map(_.getBaggageItem(key)).map(Option.apply) + type A - def setBaggageItem( - key: String, - value: String - ): ZIO[Telemetry, Nothing, Unit] = - getSpan.flatMap(span => UIO(span.setBaggageItem(key, value))).unit - - def tag(key: String, value: String): ZIO[Telemetry, Nothing, Unit] = - getSpan.flatMap(span => UIO(span.setTag(key, value))).unit - - def tag(key: String, value: Int): ZIO[Telemetry, Nothing, Unit] = - getSpan.flatMap(span => UIO(span.setTag(key, value))).unit - - def tag(key: String, value: Boolean): ZIO[Telemetry, Nothing, Unit] = - getSpan.flatMap(span => UIO(span.setTag(key, value))).unit - - def log(msg: String): ZIO[Clock with Telemetry, Nothing, Unit] = - for { - span <- getSpan - now <- getCurrentTimeMicros - _ <- UIO(span.log(now, msg)) - } yield () - - def log(fields: Map[String, _]): ZIO[Clock with Telemetry, Nothing, Unit] = - for { - span <- getSpan - now <- getCurrentTimeMicros - _ <- UIO(span.log(now, fields.asJava)) - } yield () - - private def getSpan: ZIO[Telemetry, Nothing, Span] = - ZIO.accessM[Telemetry](_.telemetry.currentSpan.get) - - private def setSpan(span: Span): ZIO[Telemetry, Nothing, Unit] = - ZIO.accessM[Telemetry](_.telemetry.currentSpan.set(span)) - - private def getTracer: ZIO[Telemetry, Nothing, Tracer] = - ZIO.environment[Telemetry].map(_.telemetry.tracer) - - private def getCurrentTimeMicros: ZIO[Clock, Nothing, Long] = - ZIO.accessM[Clock](_.clock.currentTime(TimeUnit.MICROSECONDS)) + def currentSpan: FiberRef[A] + def root(opName: String): URIO[Clock, A] + def span(span: A, opName: String): URIO[Clock, A] + def finish(span: A): URIO[Clock, Unit] + def error(span: A, cause: Cause[_], tagError: Boolean, logError: Boolean): UIO[Unit] + } } diff --git a/src/main/scala/zio/telemetry/opentracing/OpenTracing.scala b/src/main/scala/zio/telemetry/opentracing/OpenTracing.scala new file mode 100644 index 00000000..f1141ac1 --- /dev/null +++ b/src/main/scala/zio/telemetry/opentracing/OpenTracing.scala @@ -0,0 +1,16 @@ +package zio.telemetry.opentracing + +import io.opentracing.Span +import io.opentracing.Tracer +import zio.telemetry.Telemetry + +trait OpenTracing extends Telemetry { + override def telemetry: OpenTracing.Service +} + +object OpenTracing { + trait Service extends Telemetry.Service { + override type A = Span + private[opentracing] val tracer: Tracer + } +} diff --git a/src/main/scala/zio/telemetry/opentracing/package.scala b/src/main/scala/zio/telemetry/opentracing/package.scala new file mode 100644 index 00000000..c97cc772 --- /dev/null +++ b/src/main/scala/zio/telemetry/opentracing/package.scala @@ -0,0 +1,124 @@ +package zio.telemetry + +import java.util.concurrent.TimeUnit + +import io.opentracing.Span +import io.opentracing.SpanContext +import io.opentracing.Tracer +import io.opentracing.propagation.Format +import zio.Cause +import zio.FiberRef +import zio.Task +import zio.UIO +import zio.URIO +import zio.ZIO +import zio.ZManaged +import zio.clock.Clock + +import scala.jdk.CollectionConverters._ + +package object opentracing { + + def managed(tracer: Tracer, rootOpName: String = "ROOT"): ZManaged[Clock, Nothing, OpenTracing] = + ZManaged.make( + for { + span <- UIO(tracer.buildSpan(rootOpName).start()) + ref <- FiberRef.make(span) + tracer_ = tracer + } yield new OpenTracing { + + override val telemetry: OpenTracing.Service = new OpenTracing.Service { + override type A = Span + + override val tracer: Tracer = tracer_ + override val currentSpan: FiberRef[Span] = ref + + override def root(opName: String): URIO[Clock, Span] = + UIO(tracer.buildSpan(opName).start()) + + override def span(span: Span, opName: String): URIO[Clock, Span] = + for { + old <- currentSpan.get + child <- UIO(tracer.buildSpan(opName).asChildOf(old).start()) + } yield child + + override def finish(span: Span): URIO[Clock, Unit] = + URIO.accessM(_.clock.currentTime(TimeUnit.MICROSECONDS).map(span.finish)) + + override def error(span: Span, cause: Cause[_], tagError: Boolean, logError: Boolean): UIO[Unit] = + UIO(span.setTag("error", true)).when(tagError) *> + UIO(span.log(Map("error.object" -> cause, "stack" -> cause.prettyPrint).asJava)).when(logError) + + } + } + )(_.telemetry.currentSpan.get.flatMap(span => UIO(span.finish()))) + + implicit final class OpenTracingOps(val telemetry: OpenTracing.Service) extends AnyVal { + def spanFrom[R, R1 <: R with Clock, E, A, C <: Object]( + format: Format[C], + carrier: C, + zio: ZIO[R, E, A], + opName: String, + tagError: Boolean = true, + logError: Boolean = true + ): ZIO[R1, E, A] = + Task(telemetry.tracer.extract(format, carrier)) + .fold(_ => None, Option.apply) + .flatMap { + case None => zio + case Some(spanCtx) => + zio.span(telemetry)( + telemetry.tracer.buildSpan(opName).asChildOf(spanCtx).start, + tagError, + logError + ) + } + + def inject[R, R1 <: R with Clock with OpenTracing.Service, E, A, C <: Object]( + format: Format[C], + carrier: C + ): ZIO[Telemetry, Nothing, Unit] = + telemetry.currentSpan.get.flatMap { span => + ZIO.effectTotal(telemetry.tracer.inject(span.context(), format, carrier)).unit + } + + def context: ZIO[OpenTracing.Service, Nothing, SpanContext] = + telemetry.currentSpan.get.map(_.context) + + def getBaggageItem(key: String): UIO[Option[String]] = + getSpan.map(_.getBaggageItem(key)).map(Option(_)) + + def setBaggageItem(key: String, value: String): UIO[Unit] = + getSpan.map(_.setBaggageItem(key, value)).unit + + def tag(key: String, value: String): UIO[Unit] = + getSpan.map(_.setTag(key, value)).unit + + def tag(key: String, value: Int): UIO[Unit] = + getSpan.map(_.setTag(key, value)).unit + + def tag(key: String, value: Boolean): UIO[Unit] = + getSpan.map(_.setTag(key, value)).unit + + def log(msg: String): ZIO[Clock, Nothing, Unit] = + for { + span <- getSpan + now <- getCurrentTimeMicros + _ <- UIO(span.log(now, msg)) + } yield () + + def log(fields: Map[String, _]): ZIO[Clock, Nothing, Unit] = + for { + span <- getSpan + now <- getCurrentTimeMicros + _ <- UIO(span.log(now, fields.asJava)) + } yield () + + private def getSpan: UIO[Span] = + telemetry.currentSpan.get + + private def getCurrentTimeMicros: ZIO[Clock, Nothing, Long] = + ZIO.accessM(_.clock.currentTime(TimeUnit.MICROSECONDS)) + + } +} diff --git a/src/main/scala/zio/telemetry/package.scala b/src/main/scala/zio/telemetry/package.scala index 83c23b89..9f279d3f 100644 --- a/src/main/scala/zio/telemetry/package.scala +++ b/src/main/scala/zio/telemetry/package.scala @@ -1,48 +1,58 @@ package zio -import io.opentracing.Span -import io.opentracing.propagation.Format import zio.clock.Clock package object telemetry { - implicit class TelemetrySyntax[R, E, A](private val zio: ZIO[R, E, A]) extends AnyVal { + implicit class TelemetryOps[R, E, A](private val zio: ZIO[R, E, A]) extends AnyVal { - def spanFrom[C <: Object]( - format: Format[C], - carrier: C, + def root[R1 <: R with Clock with Telemetry]( opName: String, tagError: Boolean = true, logError: Boolean = true - ): ZIO[R with Clock with Telemetry, E, A] = - Telemetry.spanFrom( - format, - carrier, - zio, - opName, - tagError, - logError - ) - - def root( - opName: String, - tagError: Boolean = true, - logError: Boolean = true - ): ZIO[R with Clock with Telemetry, E, A] = - Telemetry.root(zio, opName, tagError, logError) + ): ZIO[R1, E, A] = + for { + telemetry <- getTelemetry + root <- telemetry.root(opName) + r <- span(telemetry)(root, tagError, logError) + } yield r - def span( + def span[R1 <: R with Clock with Telemetry]( opName: String, tagError: Boolean = true, logError: Boolean = true - ): ZIO[R with Clock with Telemetry, E, A] = - Telemetry.span(zio, opName, tagError, logError) + ): ZIO[R1, E, A] = + for { + telemetry <- getTelemetry + old <- getSpan(telemetry) + child <- telemetry.span(old, opName) + r <- span(telemetry)(child, tagError, logError) + } yield r - def span( - span: Span, + def span[R1 <: R with Clock](telemetry: Telemetry.Service)( + span: telemetry.A, tagError: Boolean, logError: Boolean - ): ZIO[R with Clock with Telemetry, E, A] = - Telemetry.span(zio, span, tagError, logError) + ): ZIO[R1, E, A] = + for { + old <- getSpan(telemetry) + r <- (setSpan(telemetry)(span) *> + zio.catchAllCause { cause => + telemetry.error(span, cause, tagError, logError) *> + IO.done(Exit.Failure(cause)) + }).ensuring( + telemetry.finish(span) *> + setSpan(telemetry)(old) + ) + } yield r + + private def setSpan(telemetry: Telemetry.Service)(span: telemetry.A): UIO[Unit] = + telemetry.currentSpan.set(span) + + private def getSpan(telemetry: Telemetry.Service): UIO[telemetry.A] = + telemetry.currentSpan.get + + private def getTelemetry: ZIO[Telemetry, Nothing, Telemetry.Service] = + ZIO.environment[Telemetry].map(_.telemetry) } } diff --git a/src/test/scala/zio/telemetry/TelemetryTest.scala b/src/test/scala/zio/telemetry/TelemetryTest.scala index d379cd5b..e19410bc 100644 --- a/src/test/scala/zio/telemetry/TelemetryTest.scala +++ b/src/test/scala/zio/telemetry/TelemetryTest.scala @@ -6,16 +6,16 @@ import io.opentracing.propagation.Format import io.opentracing.propagation.TextMapAdapter import scala.collection.mutable import scala.jdk.CollectionConverters._ -import zio.telemetry.Telemetry._ +import zio.duration._ +import zio.telemetry.opentracing._ import zio.telemetry.TelemetryTestUtils._ import zio.test._ import zio.test.Assertion._ import zio.test.DefaultRunnableSpec +import zio.test.environment.TestClock import zio.UIO import zio.ZIO import zio.ZManaged -import zio.test.environment.TestClock -import zio.duration._ object TelemetryTest extends DefaultRunnableSpec( @@ -88,12 +88,13 @@ object TelemetryTest for { tracer <- makeTracer tm = new TextMapAdapter(mutable.Map.empty.asJava) - _ <- makeService(tracer).use((for { - _ <- inject(Format.Builtin.TEXT_MAP, tm).span("foo") - _ <- UIO.unit - .spanFrom(Format.Builtin.TEXT_MAP, tm, "baz") - .span("bar") - } yield ()).provide) + _ <- makeService(tracer).use { env => + (env.telemetry.inject(Format.Builtin.TEXT_MAP, tm).span("foo") *> + env.telemetry + .spanFrom(Format.Builtin.TEXT_MAP, tm, UIO.unit, "baz") + .span("bar")) + .provide(env) + } } yield { val spans = tracer.finishedSpans().asScala val root = spans.find(_.operationName() == "ROOT") @@ -112,11 +113,14 @@ object TelemetryTest testM("tagging") { for { tracer <- makeTracer - _ <- makeService(tracer).use((for { - _ <- tag("boolean", true) - _ <- tag("int", 1) - _ <- tag("string", "foo") - } yield ()).provide) + _ <- makeService(tracer).use( + env => + (for { + _ <- env.telemetry.tag("boolean", true) + _ <- env.telemetry.tag("int", 1) + _ <- env.telemetry.tag("string", "foo") + } yield ()).provide(env) + ) } yield { val tags = tracer.finishedSpans().asScala.head.tags.asScala.toMap val expected = Map[String, Any]("boolean" -> true, "int" -> 1, "string" -> "foo") @@ -126,11 +130,14 @@ object TelemetryTest testM("logging") { for { tracer <- makeTracer - _ <- makeService(tracer).use((for { - _ <- log("message") - _ <- TestClock.adjust(1000.micros) - _ <- log(Map("msg" -> "message", "size" -> 1)) - } yield ()).provide) + _ <- makeService(tracer).use( + env => + (for { + _ <- env.telemetry.log("message") + _ <- TestClock.adjust(1000.micros) + _ <- env.telemetry.log(Map("msg" -> "message", "size" -> 1)) + } yield ()).provide(env) + ) } yield { val tags = tracer.finishedSpans().asScala.head.logEntries.asScala.map(le => le.timestampMicros -> le.fields.asScala) @@ -144,10 +151,11 @@ object TelemetryTest testM("baggage") { val test = for { - _ <- setBaggageItem("foo", "bar") - _ <- setBaggageItem("bar", "baz") - fooBag <- getBaggageItem("foo") - barBag <- getBaggageItem("bar") + env <- ZIO.environment[OpenTracing] + _ <- env.telemetry.setBaggageItem("foo", "bar") + _ <- env.telemetry.setBaggageItem("bar", "baz") + fooBag <- env.telemetry.getBaggageItem("foo") + barBag <- env.telemetry.getBaggageItem("bar") } yield assert(fooBag, isSome(equalTo("bar"))) && assert(barBag, isSome(equalTo("baz"))) test.provideSomeManaged(makeTracer.toManaged_.flatMap(makeService)) @@ -161,14 +169,14 @@ object TelemetryTestUtils { def makeService( tracer: MockTracer - ): ZManaged[TestClock, Nothing, TestClock with Telemetry] = + ): ZManaged[TestClock, Nothing, TestClock with OpenTracing] = for { - clockService <- ZIO.environment[TestClock].toManaged_ - telemetryService <- managed(tracer) - } yield new TestClock with Telemetry { + clockService <- ZIO.environment[TestClock].toManaged_ + telemetry_ <- managed(tracer) + } yield new TestClock with OpenTracing { override val clock: TestClock.Service[Any] = clockService.clock override val scheduler: TestClock.Service[Any] = clockService.scheduler - override def telemetry: Telemetry.Service = telemetryService + override def telemetry: OpenTracing.Service = telemetry_.telemetry } }