Skip to content

Commit c5362c8

Browse files
author
Roman Janusz
committed
added some nicer utilities for transactions + some tests
1 parent a1a29d7 commit c5362c8

File tree

3 files changed

+71
-1
lines changed

3 files changed

+71
-1
lines changed

commons-mongo/jvm/src/main/scala/com/avsystem/commons/mongo/typed/TypedClientSession.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.avsystem.commons
22
package mongo.typed
33

4+
import cats.effect.ExitCase
45
import com.mongodb.reactivestreams.client.ClientSession
56
import com.mongodb.session.ServerSession
67
import com.mongodb.{ClientSessionOptions, ServerAddress, TransactionOptions}
@@ -29,6 +30,18 @@ class TypedClientSession(val nativeSession: ClientSession)
2930
def abortTransaction: Task[Unit] =
3031
single(nativeSession.abortTransaction()).void
3132

33+
def inTransaction[T](
34+
transactionOptions: TransactionOptions = TransactionOptions.builder().build()
35+
)(
36+
task: Task[T]
37+
): Task[T] = Task.defer {
38+
startTransaction(transactionOptions)
39+
task.guaranteeCase {
40+
case ExitCase.Completed => commitTransaction
41+
case _ => abortTransaction
42+
}
43+
}
44+
3245
def pinnedServerAddress: Option[ServerAddress] =
3346
Option(nativeSession.getPinnedServerAddress)
3447

commons-mongo/jvm/src/main/scala/com/avsystem/commons/mongo/typed/TypedMongoClient.scala

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ package mongo.typed
33

44
import com.avsystem.commons.mongo.BsonValueInput
55
import com.avsystem.commons.serialization.GenCodec
6+
import com.mongodb._
67
import com.mongodb.connection.ClusterDescription
78
import com.mongodb.reactivestreams.client.{MongoClient, MongoClients}
8-
import com.mongodb.{ClientSessionOptions, ConnectionString, MongoClientSettings, MongoDriverInformation}
99
import monix.eval.Task
1010
import monix.reactive.Observable
1111
import org.bson.Document
@@ -70,6 +70,21 @@ class TypedMongoClient(
7070
): Task[TypedClientSession] =
7171
single(nativeClient.startSession(options)).map(new TypedClientSession(_))
7272

73+
def inSession[T](
74+
options: ClientSessionOptions = ClientSessionOptions.builder().build()
75+
)(
76+
task: TypedClientSession => Task[T]
77+
): Task[T] =
78+
startSession(options).bracket(task)(s => Task(s.close()))
79+
80+
def inTransaction[T](
81+
sessionOptions: ClientSessionOptions = ClientSessionOptions.builder().build(),
82+
transactionOptions: TransactionOptions = TransactionOptions.builder().build(),
83+
)(
84+
task: TypedClientSession => Task[T]
85+
): Task[T] =
86+
inSession(sessionOptions)(s => s.inTransaction(transactionOptions)(task(s)))
87+
7388
def clusterDescription(): ClusterDescription =
7489
nativeClient.getClusterDescription
7590

commons-mongo/jvm/src/test/scala/com/avsystem/commons/mongo/typed/TypedMongoCollectionTest.scala

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,4 +225,46 @@ class TypedMongoCollectionTest extends AnyFunSuite with ScalaFutures with Before
225225
val filter = Rtaie.ref(_.str).is("foo")
226226
assert(rtaieColl.findOne(filter, Rtaie.IdRef zip Rtaie.SelfRef).value.contains((id, entity)))
227227
}
228+
229+
test("successful transaction") {
230+
val query = Rte.IdRef.is(entities(0).id)
231+
val prop = Rte.ref(_.int)
232+
233+
val transaction =
234+
client.inTransaction() { session =>
235+
val coll = rteColl.withSession(session)
236+
for {
237+
_ <- coll.updateOne(query, prop.inc(1))
238+
fromOutside <- rteColl.findOne(query, prop)
239+
fromInside <- coll.findOne(query, prop)
240+
} yield (fromOutside, fromInside)
241+
}
242+
243+
val (fromOutside, fromInside) = transaction.value
244+
val committed = rteColl.findOne(query, prop).value
245+
246+
// no dirty reads, old value should be visible before commit
247+
assert(fromOutside.contains(entities(0).int))
248+
// we should see the new value from within the transaction
249+
assert(fromInside.contains(entities(0).int + 1))
250+
// we should see the new value after commit
251+
assert(committed.contains(entities(0).int + 1))
252+
}
253+
254+
test("failed transaction") {
255+
val query = Rte.IdRef.is(entities(0).id)
256+
val prop = Rte.ref(_.int)
257+
258+
val transaction =
259+
client.inTransaction() { session =>
260+
val coll = rteColl.withSession(session)
261+
coll.updateOne(query, prop.inc(1)) *> Task.raiseError(new Exception)
262+
}
263+
264+
transaction.materialize.value
265+
266+
val committed = rteColl.findOne(query, prop).value
267+
// we should see the old value because the transaction should be aborted
268+
assert(committed.contains(entities(0).int))
269+
}
228270
}

0 commit comments

Comments
 (0)