Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@


// Our Scala versions.
lazy val `scala-2.12` = "2.12.13"
lazy val `scala-2.13` = "2.13.6"
lazy val `scala-3.0` = "3.0.2"
lazy val `scala-3` = "3.0.2"

// This is used in a couple places
lazy val fs2Version = "3.1.1"
Expand Down Expand Up @@ -46,7 +45,7 @@ lazy val commonSettings = Seq(

// Compilation
scalaVersion := `scala-2.13`,
crossScalaVersions := Seq(`scala-2.12`, `scala-2.13`, `scala-3.0`),
crossScalaVersions := Seq(`scala-2.13`, `scala-3`),
scalacOptions -= "-language:experimental.macros", // doesn't work cross-version
Compile / doc / scalacOptions --= Seq("-Xfatal-warnings"),
Compile / doc / scalacOptions ++= Seq(
Expand Down Expand Up @@ -115,6 +114,7 @@ lazy val core = crossProject(JVMPlatform, JSPlatform)
"org.scodec" %%% "scodec-cats" % "1.1.0",
"org.tpolecat" %%% "natchez-core" % natchezVersion,
"org.tpolecat" %%% "sourcepos" % "1.0.1",
"org.tpolecat" %%% "pool-party" % "0.0.4",
"org.scala-lang.modules" %%% "scala-collection-compat" % "2.4.4",
) ++ Seq(
"com.beachape" %%% "enumeratum" % "1.6.1",
Expand Down
33 changes: 30 additions & 3 deletions modules/core/shared/src/main/scala/Session.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import fs2.io.net.{ Network, SocketGroup }
import fs2.Pipe
import fs2.Stream
import natchez.Trace
import org.tpolecat.poolparty.PooledResourceBuilder
import skunk.codec.all.bool
import skunk.data._
import skunk.net.Protocol
Expand All @@ -22,6 +23,9 @@ import skunk.net.SSLNegotiation
import skunk.data.TransactionIsolationLevel
import skunk.data.TransactionAccessMode
import skunk.net.protocol.Describe
import org.tpolecat.poolparty.PoolEvent
import org.tpolecat.poolparty.PoolEvent.FinalizerFailure
import org.tpolecat.poolparty.PoolEvent.HealthCheckFailure

/**
* Represents a live connection to a Postgres database. Operations provided here are safe to use
Expand Down Expand Up @@ -235,6 +239,26 @@ object Session {

}

object PoolReporters {

/**
* A very minimal pool event reporter, which dumps all pool events when `debug` is true
* and dumps stack traces if a health check or finalizer fails with an exception (otherwise
* these exceptions are lost since they're serviced by a worker thread). We should make this
* more configurable.
* @param debug if true, log all events
*/
def default[F[_]: Console: Applicative](debug: Boolean): PoolEvent[Session[F]] => F[Unit] = e =>
Console[F].println(s"Pool: $e").whenA(debug) *> {
e match {
case FinalizerFailure(_, _, _, ex) => Console[F].printStackTrace(ex)
case HealthCheckFailure(_, _, _, ex) => Console[F].printStackTrace(ex)
case _ => Applicative[F].unit
}
}

}

/**
* Resource yielding a `SessionPool` managing up to `max` concurrent `Session`s. Typically you
* will `use` this resource once on application startup and pass the resulting
Expand All @@ -259,7 +283,7 @@ object Session {
* @param queryCache Size of the cache for query checking
* @group Constructors
*/
def pooled[F[_]: Concurrent: Trace: Network: Console](
def pooled[F[_]: Temporal: Trace: Network: Console](
host: String,
port: Int = 5432,
user: String,
Expand All @@ -282,7 +306,10 @@ object Session {
for {
dc <- Resource.eval(Describe.Cache.empty[F](commandCache, queryCache))
sslOp <- Resource.eval(ssl.toSSLNegotiationOptions(if (debug) logger.some else none))
pool <- Pool.of(session(Network[F], sslOp, dc), max)(Recyclers.full)
pool <- PooledResourceBuilder.of(session(Network[F], sslOp, dc), max)
.withHealthCheck(Recyclers.full[F].run)
.withReporter(PoolReporters.default[F](debug))
.build
} yield pool

}
Expand All @@ -293,7 +320,7 @@ object Session {
* single-session pool. This method is shorthand for `Session.pooled(..., max = 1, ...).flatten`.
* @see pooled
*/
def single[F[_]: Concurrent: Trace: Network: Console](
def single[F[_]: Temporal: Trace: Network: Console](
host: String,
port: Int = 5432,
user: String,
Expand Down
168 changes: 0 additions & 168 deletions modules/core/shared/src/main/scala/util/Pool.scala

This file was deleted.

Loading