Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

ZIO telemetry is a purely-functional, type-safe [OpenTracing][link-otr] client, allowing to use opentracing with ZIO fibers.

Opentracing is a standard and API for distributed tracing, i.e. collecting timings, and logs across process boundaries. Well known implementations are [Jaeger][jaeger] and [Zipkin][zipkin].
OpenTracing is a standard and API for distributed tracing, i.e. collecting timings, and logs across process boundaries. Well known implementations are [Jaeger][jaeger] and [Zipkin][zipkin].

To use ZIO telemetry, you will need a `Clock` and a `Telemetry` Service in you environment:
To use ZIO telemetry, you will need a `Clock` and a `Telemetry` service in you environment:

```scala
import io.opentracing.mock.MockTracer
Expand Down Expand Up @@ -41,12 +41,12 @@ val zio = UIO.unit
.log("doing some serious work here!")
.span("child span")

// read back a baggage item using teh companion object instead of a combinator
// read back a baggage item using the companion object instead of a combinator
val baggageIO = Telemetry.getBaggage("foo")

```

To propagate contexts across process boundaries, extraction and injection can be used. The current span context is injected into a carrier, which is passed through some side channel to the next process. There it is injected back and a child span of it is started. For the example we use the standardized `TextMap` carrier. For details about extraction and injection, please refer to [Opentracing Documentation][otr-inject-extract].
To propagate contexts across process boundaries, extraction and injection can be used. The current span context is injected into a carrier, which is passed through some side channel to the next process. There it is injected back and a child span of it is started. For the example we use the standardized `TextMap` carrier. For details about extraction and injection, please refer to [OpenTracing Documentation][otr-inject-extract].

Due to the use of the (mutable) opentracing carrier APIs, injection and extraction are not referentially transparent.

Expand Down
170 changes: 9 additions & 161 deletions src/main/scala/zio/telemetry/Telemetry.scala
Original file line number Diff line number Diff line change
@@ -1,18 +1,9 @@
package zio.telemetry

import io.opentracing.Span
import io.opentracing.SpanContext
import io.opentracing.Tracer
import io.opentracing.propagation.Format
import java.util.concurrent.TimeUnit
import scala.jdk.CollectionConverters._
import zio.Exit
import zio.Cause
import zio.FiberRef
import zio.IO
import zio.Task
import zio.UIO
import zio.ZIO
import zio.ZManaged
import zio.URIO
import zio.clock.Clock

trait Telemetry extends Serializable {
Expand All @@ -22,156 +13,13 @@ trait Telemetry extends Serializable {
object Telemetry {

trait Service {
def currentSpan: FiberRef[Span]
def tracer: Tracer
}

def managed(
tracer: Tracer,
rootOpName: String = "ROOT"
): ZManaged[Clock, Nothing, Telemetry.Service] =
ZManaged.make(
for {
span <- UIO(tracer.buildSpan(rootOpName).start())
ref <- FiberRef.make(span)
tracer_ = tracer
} yield new Telemetry.Service {
override val currentSpan: FiberRef[Span] = ref
override val tracer: Tracer = tracer_
}
)(_.currentSpan.get.flatMap(span => UIO(span.finish)))

def underlying[R, R1 <: R with Telemetry, E, A](
f: Tracer => ZIO[R, E, A]
): ZIO[R1, E, A] =
getTracer.flatMap(f)

def spanFrom[R, R1 <: R with Clock with Telemetry, E, A, C <: Object](
format: Format[C],
carrier: C,
zio: ZIO[R, E, A],
opName: String,
tagError: Boolean = true,
logError: Boolean = true
): ZIO[R1, E, A] =
getTracer.flatMap { tracer =>
Task(tracer.extract(format, carrier))
.fold(_ => None, Option.apply)
.flatMap {
case None => zio
case Some(spanCtx) =>
span(
zio,
tracer.buildSpan(opName).asChildOf(spanCtx).start,
tagError,
logError
)
}
}

def inject[R, R1 <: R with Clock with Telemetry, E, A, C <: Object](
format: Format[C],
carrier: C
): ZIO[Telemetry, Nothing, Unit] =
for {
tracer <- getTracer
span <- getSpan
_ <- ZIO.effectTotal(tracer.inject(span.context(), format, carrier))
} yield ()

def root[R, R1 <: R with Clock with Telemetry, E, A](
zio: ZIO[R, E, A],
opName: String,
tagError: Boolean = true,
logError: Boolean = true
): ZIO[R1, E, A] =
for {
tracer <- getTracer
root <- UIO(tracer.buildSpan(opName).start())
r <- span(zio, root, tagError, logError)
} yield r

def span[R, R1 <: R with Clock with Telemetry, E, A](
zio: ZIO[R, E, A],
opName: String,
tagError: Boolean = true,
logError: Boolean = true
): ZIO[R1, E, A] =
for {
tracer <- getTracer
old <- getSpan
child <- UIO(tracer.buildSpan(opName).asChildOf(old).start())
r <- span(zio, child, tagError, logError)
} yield r

def span[R, R1 <: R with Clock with Telemetry, E, A](
zio: ZIO[R, E, A],
span: Span,
tagError: Boolean,
logError: Boolean
): ZIO[R1, E, A] =
ZManaged
.make[R1, E, Span](getSpan <* setSpan(span)) { old =>
getCurrentTimeMicros.flatMap(now => UIO(span.finish(now))) *> setSpan(
old
)
}
.use(
_ =>
zio.catchAllCause { cause =>
tag("error", true).when(tagError) *>
log(
Map("error.object" -> cause, "stack" -> cause.prettyPrint)
).when(logError) *>
IO.done(Exit.Failure(cause))
}
)

def context: ZIO[Telemetry, Nothing, SpanContext] =
getSpan.map(_.context)

def getBaggageItem(key: String): ZIO[Telemetry, Nothing, Option[String]] =
getSpan.map(_.getBaggageItem(key)).map(Option.apply)
type A

def setBaggageItem(
key: String,
value: String
): ZIO[Telemetry, Nothing, Unit] =
getSpan.flatMap(span => UIO(span.setBaggageItem(key, value))).unit

def tag(key: String, value: String): ZIO[Telemetry, Nothing, Unit] =
getSpan.flatMap(span => UIO(span.setTag(key, value))).unit

def tag(key: String, value: Int): ZIO[Telemetry, Nothing, Unit] =
getSpan.flatMap(span => UIO(span.setTag(key, value))).unit

def tag(key: String, value: Boolean): ZIO[Telemetry, Nothing, Unit] =
getSpan.flatMap(span => UIO(span.setTag(key, value))).unit

def log(msg: String): ZIO[Clock with Telemetry, Nothing, Unit] =
for {
span <- getSpan
now <- getCurrentTimeMicros
_ <- UIO(span.log(now, msg))
} yield ()

def log(fields: Map[String, _]): ZIO[Clock with Telemetry, Nothing, Unit] =
for {
span <- getSpan
now <- getCurrentTimeMicros
_ <- UIO(span.log(now, fields.asJava))
} yield ()

private def getSpan: ZIO[Telemetry, Nothing, Span] =
ZIO.accessM[Telemetry](_.telemetry.currentSpan.get)

private def setSpan(span: Span): ZIO[Telemetry, Nothing, Unit] =
ZIO.accessM[Telemetry](_.telemetry.currentSpan.set(span))

private def getTracer: ZIO[Telemetry, Nothing, Tracer] =
ZIO.environment[Telemetry].map(_.telemetry.tracer)

private def getCurrentTimeMicros: ZIO[Clock, Nothing, Long] =
ZIO.accessM[Clock](_.clock.currentTime(TimeUnit.MICROSECONDS))
def currentSpan: FiberRef[A]
def root(opName: String): URIO[Clock, A]
def span(span: A, opName: String): URIO[Clock, A]
def finish(span: A): URIO[Clock, Unit]
def error(span: A, cause: Cause[_], tagError: Boolean, logError: Boolean): UIO[Unit]
}

}
16 changes: 16 additions & 0 deletions src/main/scala/zio/telemetry/opentracing/OpenTracing.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package zio.telemetry.opentracing

import io.opentracing.Span
import io.opentracing.Tracer
import zio.telemetry.Telemetry

trait OpenTracing extends Telemetry {
override def telemetry: OpenTracing.Service
}

object OpenTracing {
trait Service extends Telemetry.Service {
override type A = Span
private[opentracing] val tracer: Tracer
}
}
124 changes: 124 additions & 0 deletions src/main/scala/zio/telemetry/opentracing/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package zio.telemetry

import java.util.concurrent.TimeUnit

import io.opentracing.Span
import io.opentracing.SpanContext
import io.opentracing.Tracer
import io.opentracing.propagation.Format
import zio.Cause
import zio.FiberRef
import zio.Task
import zio.UIO
import zio.URIO
import zio.ZIO
import zio.ZManaged
import zio.clock.Clock

import scala.jdk.CollectionConverters._

package object opentracing {

def managed(tracer: Tracer, rootOpName: String = "ROOT"): ZManaged[Clock, Nothing, OpenTracing] =
ZManaged.make(
for {
span <- UIO(tracer.buildSpan(rootOpName).start())
ref <- FiberRef.make(span)
tracer_ = tracer
} yield new OpenTracing {

override val telemetry: OpenTracing.Service = new OpenTracing.Service {
override type A = Span

override val tracer: Tracer = tracer_
override val currentSpan: FiberRef[Span] = ref

override def root(opName: String): URIO[Clock, Span] =
UIO(tracer.buildSpan(opName).start())

override def span(span: Span, opName: String): URIO[Clock, Span] =
for {
old <- currentSpan.get
child <- UIO(tracer.buildSpan(opName).asChildOf(old).start())
} yield child

override def finish(span: Span): URIO[Clock, Unit] =
URIO.accessM(_.clock.currentTime(TimeUnit.MICROSECONDS).map(span.finish))

override def error(span: Span, cause: Cause[_], tagError: Boolean, logError: Boolean): UIO[Unit] =
UIO(span.setTag("error", true)).when(tagError) *>
UIO(span.log(Map("error.object" -> cause, "stack" -> cause.prettyPrint).asJava)).when(logError)

}
}
)(_.telemetry.currentSpan.get.flatMap(span => UIO(span.finish())))

implicit final class OpenTracingOps(val telemetry: OpenTracing.Service) extends AnyVal {
def spanFrom[R, R1 <: R with Clock, E, A, C <: Object](
format: Format[C],
carrier: C,
zio: ZIO[R, E, A],
opName: String,
tagError: Boolean = true,
logError: Boolean = true
): ZIO[R1, E, A] =
Task(telemetry.tracer.extract(format, carrier))
.fold(_ => None, Option.apply)
.flatMap {
case None => zio
case Some(spanCtx) =>
zio.span(telemetry)(
telemetry.tracer.buildSpan(opName).asChildOf(spanCtx).start,
tagError,
logError
)
}

def inject[R, R1 <: R with Clock with OpenTracing.Service, E, A, C <: Object](
format: Format[C],
carrier: C
): ZIO[Telemetry, Nothing, Unit] =
telemetry.currentSpan.get.flatMap { span =>
ZIO.effectTotal(telemetry.tracer.inject(span.context(), format, carrier)).unit
}

def context: ZIO[OpenTracing.Service, Nothing, SpanContext] =
telemetry.currentSpan.get.map(_.context)

def getBaggageItem(key: String): UIO[Option[String]] =
getSpan.map(_.getBaggageItem(key)).map(Option(_))

def setBaggageItem(key: String, value: String): UIO[Unit] =
getSpan.map(_.setBaggageItem(key, value)).unit

def tag(key: String, value: String): UIO[Unit] =
getSpan.map(_.setTag(key, value)).unit

def tag(key: String, value: Int): UIO[Unit] =
getSpan.map(_.setTag(key, value)).unit

def tag(key: String, value: Boolean): UIO[Unit] =
getSpan.map(_.setTag(key, value)).unit

def log(msg: String): ZIO[Clock, Nothing, Unit] =
for {
span <- getSpan
now <- getCurrentTimeMicros
_ <- UIO(span.log(now, msg))
} yield ()

def log(fields: Map[String, _]): ZIO[Clock, Nothing, Unit] =
for {
span <- getSpan
now <- getCurrentTimeMicros
_ <- UIO(span.log(now, fields.asJava))
} yield ()

private def getSpan: UIO[Span] =
telemetry.currentSpan.get

private def getCurrentTimeMicros: ZIO[Clock, Nothing, Long] =
ZIO.accessM(_.clock.currentTime(TimeUnit.MICROSECONDS))

}
}
Loading