Skip to content

Commit a1a29d7

Browse files
author
Roman Janusz
committed
mongo transaction support, typed wrappers for MongoClient, MongoDatabase, ClientSession
1 parent e2c250e commit a1a29d7

File tree

8 files changed

+362
-72
lines changed

8 files changed

+362
-72
lines changed

commons-macros/src/main/scala/com/avsystem/commons/macros/misc/MiscMacros.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,13 @@ final class MiscMacros(ctx: blackbox.Context) extends AbstractMacroCommons(ctx)
7575
q"new ${c.prefix}.ValName(${owner.asTerm.getter.name.decodedName.toString})"
7676
}
7777

78+
def optionalizeFirstArg(expr: Tree): Tree = expr match {
79+
case Apply(MaybeTypeApply(Select(prefix, name: TermName), targs), head :: tail) =>
80+
q"if($head ne null) $expr else ${c.untypecheck(prefix)}.$name[..$targs](..$tail)"
81+
case _ =>
82+
c.abort(expr.pos, "function application expected")
83+
}
84+
7885
def compilationError(error: Tree): Tree = error match {
7986
case StringLiteral(errorStr) =>
8087
abortAt(errorStr, error.pos)
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package com.avsystem.commons
2+
package mongo.typed
3+
4+
import com.mongodb.reactivestreams.client.ClientSession
5+
import com.mongodb.session.ServerSession
6+
import com.mongodb.{ClientSessionOptions, ServerAddress, TransactionOptions}
7+
import monix.eval.Task
8+
import org.bson.{BsonDocument, BsonTimestamp}
9+
10+
import java.io.Closeable
11+
12+
class TypedClientSession(val nativeSession: ClientSession)
13+
extends Closeable with TypedMongoUtils {
14+
15+
def hasActiveTransaction: Boolean =
16+
nativeSession.hasActiveTransaction
17+
18+
def transactionOptions: TransactionOptions =
19+
nativeSession.getTransactionOptions
20+
21+
def startTransaction(
22+
transactionOptions: TransactionOptions = TransactionOptions.builder().build()
23+
): Unit =
24+
nativeSession.startTransaction(transactionOptions)
25+
26+
def commitTransaction: Task[Unit] =
27+
single(nativeSession.commitTransaction()).void
28+
29+
def abortTransaction: Task[Unit] =
30+
single(nativeSession.abortTransaction()).void
31+
32+
def pinnedServerAddress: Option[ServerAddress] =
33+
Option(nativeSession.getPinnedServerAddress)
34+
35+
def transactionContext(): Option[AnyRef] =
36+
Option(nativeSession.getTransactionContext)
37+
38+
def setTransactionContext(address: ServerAddress, transactionContext: Any): Unit =
39+
nativeSession.setTransactionContext(address, transactionContext)
40+
41+
def clearTransactionContext(): Unit =
42+
nativeSession.clearTransactionContext()
43+
44+
def recoveryToken(): Option[BsonDocument] =
45+
Option(nativeSession.getRecoveryToken)
46+
47+
def setRecoveryToken(recoverToken: BsonDocument): Unit =
48+
nativeSession.setRecoveryToken(recoverToken)
49+
50+
def options: ClientSessionOptions =
51+
nativeSession.getOptions
52+
53+
def casuallyConsistent: Boolean =
54+
nativeSession.isCausallyConsistent
55+
56+
def originator: AnyRef =
57+
nativeSession.getOriginator
58+
59+
def serverSession: ServerSession =
60+
nativeSession.getServerSession
61+
62+
def operationTime: BsonTimestamp =
63+
nativeSession.getOperationTime
64+
65+
def advanceOperationTime(operationTime: BsonTimestamp): Unit =
66+
nativeSession.advanceOperationTime(operationTime)
67+
68+
def advanceClusterTime(clusterTime: BsonDocument): Unit =
69+
nativeSession.advanceClusterTime(clusterTime)
70+
71+
def snapshotTimestamp: BsonTimestamp =
72+
nativeSession.getSnapshotTimestamp
73+
74+
def setSnapshotTimestamp(snapshotTimestamp: BsonTimestamp): Unit =
75+
nativeSession.setSnapshotTimestamp(snapshotTimestamp)
76+
77+
def clusterTime: BsonDocument =
78+
nativeSession.getClusterTime
79+
80+
def close(): Unit =
81+
nativeSession.close()
82+
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package com.avsystem.commons
2+
package mongo.typed
3+
4+
import com.avsystem.commons.mongo.BsonValueInput
5+
import com.avsystem.commons.serialization.GenCodec
6+
import com.mongodb.connection.ClusterDescription
7+
import com.mongodb.reactivestreams.client.{MongoClient, MongoClients}
8+
import com.mongodb.{ClientSessionOptions, ConnectionString, MongoClientSettings, MongoDriverInformation}
9+
import monix.eval.Task
10+
import monix.reactive.Observable
11+
import org.bson.Document
12+
13+
import java.io.Closeable
14+
15+
object TypedMongoClient {
16+
def apply(): TypedMongoClient =
17+
new TypedMongoClient(MongoClients.create())
18+
19+
def apply(connectionString: String): TypedMongoClient =
20+
new TypedMongoClient(MongoClients.create(connectionString))
21+
22+
def apply(connectionString: ConnectionString): TypedMongoClient =
23+
new TypedMongoClient(MongoClients.create(connectionString))
24+
25+
def apply(settings: MongoClientSettings): TypedMongoClient =
26+
new TypedMongoClient(MongoClients.create(settings))
27+
28+
def apply(
29+
connectionString: ConnectionString,
30+
driverInformation: MongoDriverInformation,
31+
): TypedMongoClient =
32+
new TypedMongoClient(MongoClients.create(connectionString, driverInformation))
33+
34+
def apply(
35+
settings: MongoClientSettings,
36+
driverInformation: MongoDriverInformation,
37+
): TypedMongoClient =
38+
new TypedMongoClient(MongoClients.create(settings, driverInformation))
39+
}
40+
41+
/**
42+
* A better-typed wrapper over [[MongoClient]]. Uses Monix [[Task]] and [[Observable]] instead of
43+
* [[org.reactivestreams.Publisher]]. Returns similar better-typed wrappers for database and client session objects.
44+
*/
45+
class TypedMongoClient(
46+
val nativeClient: MongoClient,
47+
val clientSession: OptArg[TypedClientSession] = OptArg.Empty
48+
) extends TypedMongoUtils with Closeable {
49+
private val sessionOrNull = clientSession.toOpt.map(_.nativeSession).orNull
50+
51+
def withSession(session: TypedClientSession): TypedMongoClient =
52+
new TypedMongoClient(nativeClient, session)
53+
54+
def getDatabase(name: String): TypedMongoDatabase =
55+
new TypedMongoDatabase(nativeClient.getDatabase(name))
56+
57+
def listDatabaseNames: Observable[String] =
58+
multi(optionalizeFirstArg(nativeClient.listDatabaseNames(sessionOrNull)))
59+
60+
def listDatabases: Observable[Document] =
61+
multi(optionalizeFirstArg(nativeClient.listDatabases(sessionOrNull)))
62+
63+
def listDatabases[T: GenCodec]: Observable[T] =
64+
listDatabases.map(doc => BsonValueInput.read[T](doc.toBsonDocument))
65+
66+
//TODO: `watch` methods
67+
68+
def startSession(
69+
options: ClientSessionOptions = ClientSessionOptions.builder().build()
70+
): Task[TypedClientSession] =
71+
single(nativeClient.startSession(options)).map(new TypedClientSession(_))
72+
73+
def clusterDescription(): ClusterDescription =
74+
nativeClient.getClusterDescription
75+
76+
def close(): Unit = nativeClient.close()
77+
}

0 commit comments

Comments
 (0)