diff --git a/.github/workflows/gradle.yml b/.github/workflows/gradle.yml new file mode 100644 index 000000000..954e44c00 --- /dev/null +++ b/.github/workflows/gradle.yml @@ -0,0 +1,25 @@ +name: CI + +on: + push: + branches: [ master ] + pull_request: + branches: [ master ] + +jobs: + build: + runs-on: ubuntu-latest + strategy: + matrix: + java: [ '8', '11', '14' ] + + steps: + - uses: actions/checkout@v2 + - name: Setup Java + uses: actions/setup-java@v1 + with: + java-version: ${{ matrix.java }} + - name: Build with Gradle + run: ./gradlew build + - name: Publish coverage + run: ./gradlew jacocoTestReport coveralls diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index e28794d45..000000000 --- a/.travis.yml +++ /dev/null @@ -1,13 +0,0 @@ -language: java - -jdk: - - openjdk8 - - openjdk11 - - openjdk14 - - openjdk15 - -services: - - docker - -after_success: - - ./gradlew jacocoTestReport coveralls diff --git a/README.md b/README.md index d047bb217..00769c68b 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -[![Build Status](https://travis-ci.org/bwaldvogel/mongo-java-server.png?branch=master)](https://travis-ci.org/bwaldvogel/mongo-java-server) +[![CI](https://github.com/bwaldvogel/mongo-java-server/workflows/CI/badge.svg)](https://github.com/bwaldvogel/mongo-java-server/actions) [![Maven Central](https://maven-badges.herokuapp.com/maven-central/de.bwaldvogel/mongo-java-server/badge.svg)](http://maven-badges.herokuapp.com/maven-central/de.bwaldvogel/mongo-java-server) [![Coverage Status](https://coveralls.io/repos/github/bwaldvogel/mongo-java-server/badge.svg?branch=master)](https://coveralls.io/github/bwaldvogel/mongo-java-server?branch=master) [![BSD 3-Clause License](https://img.shields.io/github/license/bwaldvogel/mongo-java-server.svg)](https://opensource.org/licenses/BSD-3-Clause) diff --git a/core/build.gradle b/core/build.gradle index 8d546231b..20e1d50c5 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -2,6 +2,7 @@ dependencies { implementation group: 'io.netty', name: 'netty-transport', version: 'latest.release' implementation group: 'io.netty', name: 'netty-codec', version: 'latest.release' implementation group: 'io.netty', name: 'netty-handler', version: 'latest.release' + api group: 'com.h2database', name: 'h2', version: 'latest.release' testImplementation group: 'org.mongodb', name: 'mongo-java-driver', version: 'latest.release' testImplementation "org.mockito:mockito-core:latest.release" diff --git a/core/gradle.lockfile b/core/gradle.lockfile index 2c20a1360..4cf8f8c43 100644 --- a/core/gradle.lockfile +++ b/core/gradle.lockfile @@ -3,12 +3,13 @@ # This file is expected to be part of source control. ch.qos.logback:logback-classic:1.2.3=testRuntimeClasspath ch.qos.logback:logback-core:1.2.3=testRuntimeClasspath -io.netty:netty-buffer:4.1.54.Final=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath -io.netty:netty-codec:4.1.54.Final=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath -io.netty:netty-common:4.1.54.Final=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath -io.netty:netty-handler:4.1.54.Final=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath -io.netty:netty-resolver:4.1.54.Final=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath -io.netty:netty-transport:4.1.54.Final=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +com.h2database:h2:1.4.200=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-buffer:4.1.56.Final=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-codec:4.1.56.Final=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-common:4.1.56.Final=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-handler:4.1.56.Final=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-resolver:4.1.56.Final=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-transport:4.1.56.Final=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath net.bytebuddy:byte-buddy-agent:1.10.18=testCompileClasspath,testRuntimeClasspath net.bytebuddy:byte-buddy:1.10.18=testCompileClasspath,testRuntimeClasspath nl.jqno.equalsverifier:equalsverifier:3.5=testCompileClasspath,testRuntimeClasspath diff --git a/core/src/main/java/de/bwaldvogel/mongo/AsyncMongoCollection.java b/core/src/main/java/de/bwaldvogel/mongo/AsyncMongoCollection.java index abde2b8ab..9d0fbf360 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/AsyncMongoCollection.java +++ b/core/src/main/java/de/bwaldvogel/mongo/AsyncMongoCollection.java @@ -1,5 +1,6 @@ package de.bwaldvogel.mongo; +import java.io.Serializable; import java.util.concurrent.CompletionStage; import de.bwaldvogel.mongo.backend.QueryParameters; diff --git a/core/src/main/java/de/bwaldvogel/mongo/AsyncMongoDatabase.java b/core/src/main/java/de/bwaldvogel/mongo/AsyncMongoDatabase.java index 800990532..3229a5223 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/AsyncMongoDatabase.java +++ b/core/src/main/java/de/bwaldvogel/mongo/AsyncMongoDatabase.java @@ -2,6 +2,7 @@ import java.util.concurrent.CompletionStage; +import de.bwaldvogel.mongo.backend.MongoSession; import de.bwaldvogel.mongo.backend.QueryResult; import de.bwaldvogel.mongo.bson.Document; import de.bwaldvogel.mongo.oplog.Oplog; @@ -13,7 +14,7 @@ public interface AsyncMongoDatabase { - CompletionStage handleCommandAsync(Channel channel, String command, Document query, Oplog oplog); + CompletionStage handleCommandAsync(Channel channel, String command, Document query, Oplog oplog, MongoSession mongoSession); CompletionStage handleQueryAsync(MongoQuery query); diff --git a/core/src/main/java/de/bwaldvogel/mongo/MongoBackend.java b/core/src/main/java/de/bwaldvogel/mongo/MongoBackend.java index 99b619692..22bb6eb2c 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/MongoBackend.java +++ b/core/src/main/java/de/bwaldvogel/mongo/MongoBackend.java @@ -132,4 +132,6 @@ default CompletionStage closeAsync() { MongoBackend version(ServerVersion version); + void setServerAddress(String serverAddress); + } diff --git a/core/src/main/java/de/bwaldvogel/mongo/MongoCollection.java b/core/src/main/java/de/bwaldvogel/mongo/MongoCollection.java index f10ea038d..6e32d49ab 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/MongoCollection.java +++ b/core/src/main/java/de/bwaldvogel/mongo/MongoCollection.java @@ -8,6 +8,7 @@ import de.bwaldvogel.mongo.backend.ArrayFilters; import de.bwaldvogel.mongo.backend.Index; +import de.bwaldvogel.mongo.backend.MongoSession; import de.bwaldvogel.mongo.backend.QueryParameters; import de.bwaldvogel.mongo.backend.QueryResult; import de.bwaldvogel.mongo.bson.Document; @@ -67,6 +68,8 @@ default QueryResult handleQuery(Document query, int numberToSkip, int limit) { QueryResult handleQuery(QueryParameters queryParameters); + QueryResult handleQuery(QueryParameters queryParameters, MongoSession mongoSession); + @Override default CompletionStage handleQueryAsync(QueryParameters queryParameters) { return FutureUtils.wrap(() -> handleQuery(queryParameters)); @@ -81,6 +84,11 @@ default List insertDocuments(List documents) { Document updateDocuments(Document selector, Document update, ArrayFilters arrayFilters, boolean isMulti, boolean isUpsert, Oplog oplog); + default Document updateDocuments(Document selector, Document update, ArrayFilters arrayFilters, + boolean isMulti, boolean isUpsert, Oplog oplog, MongoSession mongoSession) { + return updateDocuments(selector, update, arrayFilters, isMulti, isUpsert, oplog, mongoSession); + } + default int deleteDocuments(Document selector, int limit) { return deleteDocuments(selector, limit, NoopOplog.get()); } @@ -89,6 +97,8 @@ default int deleteDocuments(Document selector, int limit) { Document handleDistinct(Document query); + Document handleDistinct(Document query, MongoSession mongoSession); + Document getStats(); Document validate(); @@ -97,6 +107,8 @@ default int deleteDocuments(Document selector, int limit) { int count(Document query, int skip, int limit); + int count(Document query, int skip, int limit, MongoSession mongoSession); + default boolean isEmpty() { return count() == 0; } @@ -112,5 +124,4 @@ default int getNumIndexes() { void renameTo(MongoDatabase newDatabase, String newCollectionName); void drop(); - } diff --git a/core/src/main/java/de/bwaldvogel/mongo/MongoDatabase.java b/core/src/main/java/de/bwaldvogel/mongo/MongoDatabase.java index dbe066930..f29cfdfe6 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/MongoDatabase.java +++ b/core/src/main/java/de/bwaldvogel/mongo/MongoDatabase.java @@ -4,6 +4,7 @@ import de.bwaldvogel.mongo.backend.CollectionOptions; import de.bwaldvogel.mongo.backend.QueryResult; +import de.bwaldvogel.mongo.backend.MongoSession; import de.bwaldvogel.mongo.bson.Document; import de.bwaldvogel.mongo.oplog.Oplog; import de.bwaldvogel.mongo.util.FutureUtils; @@ -19,11 +20,11 @@ public interface MongoDatabase extends AsyncMongoDatabase { void handleClose(Channel channel); - Document handleCommand(Channel channel, String command, Document query, Oplog oplog); + Document handleCommand(Channel channel, String command, Document query, Oplog oplog, MongoSession mongoSession); @Override - default CompletionStage handleCommandAsync(Channel channel, String command, Document query, Oplog oplog) { - return FutureUtils.wrap(() -> handleCommand(channel, command, query, oplog)); + default CompletionStage handleCommandAsync(Channel channel, String command, Document query, Oplog oplog, MongoSession mongoSession) { + return FutureUtils.wrap(() -> handleCommand(channel, command, query, oplog, mongoSession)); } QueryResult handleQuery(MongoQuery query); @@ -55,6 +56,8 @@ default CompletionStage handleDeleteAsync(MongoDelete delete, Oplog oplog) void handleUpdate(MongoUpdate update, Oplog oplog); + void handleUpdate(MongoUpdate update, Oplog oplog, MongoSession mongoSession); + @Override default CompletionStage handleUpdateAsync(MongoUpdate update, Oplog oplog) { return FutureUtils.wrap(() -> { diff --git a/core/src/main/java/de/bwaldvogel/mongo/MongoServer.java b/core/src/main/java/de/bwaldvogel/mongo/MongoServer.java index 6bb8c79a9..92552a772 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/MongoServer.java +++ b/core/src/main/java/de/bwaldvogel/mongo/MongoServer.java @@ -102,6 +102,10 @@ public void initChannel(SocketChannel ch) throws Exception { }); channel = bootstrap.bind().syncUninterruptibly().channel(); + InetSocketAddress localAddress = getLocalAddress(); + if (localAddress != null) { + backend.setServerAddress(String.format("%s:%d", localAddress.getHostName(), localAddress.getPort())); + } log.info("started {}", this); } catch (RuntimeException e) { diff --git a/core/src/main/java/de/bwaldvogel/mongo/ServerVersion.java b/core/src/main/java/de/bwaldvogel/mongo/ServerVersion.java index 72488a510..62f4bace2 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/ServerVersion.java +++ b/core/src/main/java/de/bwaldvogel/mongo/ServerVersion.java @@ -8,7 +8,8 @@ public enum ServerVersion { MONGO_3_0(Arrays.asList(3, 0, 0), 2), - MONGO_3_6(Arrays.asList(3, 6, 0), 6); + MONGO_3_6(Arrays.asList(3, 6, 0), 6), + MONGO_4_2(Arrays.asList(4, 2, 0), 8); private final List versionArray; private final int wireVersion; diff --git a/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoBackend.java b/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoBackend.java index 2be6192c8..3e269bf05 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoBackend.java +++ b/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoBackend.java @@ -13,10 +13,13 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; +import org.h2.mvstore.tx.Transaction; +import org.h2.mvstore.tx.TransactionStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +55,7 @@ public abstract class AbstractMongoBackend implements MongoBackend { protected static final String OPLOG_COLLECTION_NAME = "oplog.rs"; - static final String ADMIN_DB_NAME = "admin"; + public static final String ADMIN_DB_NAME = "admin"; private final Map databases = new ConcurrentHashMap<>(); @@ -64,6 +67,10 @@ public abstract class AbstractMongoBackend implements MongoBackend { private final CursorRegistry cursorRegistry = new CursorRegistry(); protected Oplog oplog = NoopOplog.get(); + private String serverAddress; + + protected final ConcurrentHashMap sessions = new ConcurrentHashMap<>(); + protected TransactionStore transactionStore; protected AbstractMongoBackend() { this(defaultClock()); @@ -150,7 +157,7 @@ private Document getLog(String argument) { return response; } - private Document handleAdminCommand(String command, Document query) { + protected Document handleAdminCommand(String command, Document query) { if (command.equalsIgnoreCase("listdatabases")) { List databases = listDatabaseNames().stream() .sorted() @@ -201,13 +208,21 @@ private Document handleAdminCommand(String command, Document query) { } else if (command.equalsIgnoreCase("ping")) { return successResponse(); } else if (command.equalsIgnoreCase("endSessions")) { - log.debug("endSessions on admin database"); + handleEndSessions(query); return successResponse(); } else { throw new NoSuchCommandException(command); } } + private void handleEndSessions(Document query) { + log.debug("endSessions"); + ArrayList endingSessions = (ArrayList)query.get("endSessions"); + endingSessions.stream().map(s -> s.get("id")) + .filter(sessions::containsKey) + .forEach(sessions::remove); + } + private static Document successResponse() { Document response = new Document(); Utils.markOkay(response); @@ -314,6 +329,12 @@ private Document handleCommandSync(Channel channel, String databaseName, String response.put("maxWireVersion", Integer.valueOf(version.getWireVersion())); response.put("minWireVersion", Integer.valueOf(0)); response.put("localTime", Instant.now(clock)); + response.put("setName", "rs0"); + response.put("hosts", Collections.singleton(serverAddress)); + response.put("me", serverAddress); + response.put("primary", serverAddress); + response.put("logicalSessionTimeoutMinutes", 100); + response.put("connectionId", 21210); Utils.markOkay(response); return response; } else if (command.equalsIgnoreCase("buildinfo")) { @@ -328,6 +349,12 @@ private Document handleCommandSync(Channel channel, String databaseName, String return handleGetMore(databaseName, command, query); } else if (command.equalsIgnoreCase("killCursors")) { return handleKillCursors(query); + } else if (command.equalsIgnoreCase("commitTransaction")) { + UUID sessionId = Utils.getSessionId(query); + sessions.get(sessionId).commit(); + Document response = new Document("lsid", sessionId); + Utils.markOkay(response); + return response; } return null; } @@ -342,8 +369,26 @@ public Document handleCommand(Channel channel, String databaseName, String comma if (databaseName.equals(ADMIN_DB_NAME)) { return handleAdminCommand(command, query); } + MongoSession mongoSession = MongoSession.NoopSession(); + if (query != null) { + if ((boolean)query.getOrDefault("autocommit", true)) { + return resolveDatabase(databaseName).handleCommand(channel, command, query, oplog, null); + } - return resolveDatabase(databaseName).handleCommand(channel, command, query, oplog); + UUID sessionId = Utils.getSessionId(query); + if (sessionId == null) { + throw new RuntimeException("SessionId cannot be null. Make sure you are using a mongo driver version that support sessions and transactions."); + } + if (sessions.containsKey(sessionId)) { + mongoSession = sessions.get(sessionId); + } else { + Transaction transaction = transactionStore.begin(); + log.info(String.format("Starting new transaction with id %d: %s", transaction.getId(), transaction.getName())); + mongoSession = new MongoSession(sessionId, transactionStore.begin()); + sessions.put(sessionId, mongoSession); + } + } + return resolveDatabase(databaseName).handleCommand(channel, command, query, oplog, mongoSession); } @Override @@ -373,7 +418,7 @@ public CompletionStage handleCommandAsync(Channel channel, String data return FutureUtils.wrap(() -> handleAdminCommand(command, query)); } - return resolveDatabase(database).handleCommandAsync(channel, command, query, oplog); + return resolveDatabase(database).handleCommandAsync(channel, command, query, oplog, null); } @Override @@ -498,6 +543,11 @@ public void dropDatabase(String databaseName) { } } + @Override + public void setServerAddress(String serverAddress) { + this.serverAddress = serverAddress; + } + @Override public void handleClose(Channel channel) { for (MongoDatabase db : databases.values()) { diff --git a/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoCollection.java b/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoCollection.java index 9269f828d..caaf21650 100755 --- a/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoCollection.java +++ b/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoCollection.java @@ -2,6 +2,7 @@ import static de.bwaldvogel.mongo.backend.Constants.ID_FIELD; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -9,6 +10,7 @@ import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.TreeSet; @@ -57,19 +59,20 @@ protected boolean documentMatchesQuery(Document document, Document query) { } protected QueryResult queryDocuments(Document query, Document orderBy, int numberToSkip, int limit, int batchSize, - Document fieldSelector) { + Document fieldSelector, MongoSession mongoSession) { for (Index

index : indexes) { if (index.canHandle(query)) { Iterable

positions = index.getPositions(query); - return matchDocuments(query, positions, orderBy, numberToSkip, limit, batchSize, fieldSelector); + return matchDocuments(query, positions, orderBy, numberToSkip, limit, batchSize, fieldSelector, mongoSession); } } - return matchDocuments(query, orderBy, numberToSkip, limit, batchSize, fieldSelector); + return matchDocuments(query, orderBy, numberToSkip, limit, batchSize, fieldSelector, mongoSession); } protected abstract QueryResult matchDocuments(Document query, Document orderBy, int numberToSkip, - int numberToReturn, int batchSize, Document fieldSelector); + int numberToReturn, int batchSize, Document fieldSelector, + MongoSession mongoSession); protected QueryResult matchDocumentsFromStream(Stream documentStream, Document query, Document orderBy, int numberToSkip, int limit, int batchSize, Document fieldSelector) { @@ -77,6 +80,18 @@ protected QueryResult matchDocumentsFromStream(Stream documentStream, return matchDocumentsFromStream(query, documentStream, numberToSkip, limit, batchSize, documentComparator, fieldSelector); } + protected QueryResult matchDocumentsFromStream(Stream documentStream, Document query, Document orderBy, + int numberToSkip, int limit, int batchSize, Document fieldSelector, + MongoSession mongoSession) { + if (mongoSession == null) { + return matchDocumentsFromStream(documentStream, query, orderBy, numberToSkip, limit, batchSize, fieldSelector); + } + Comparator documentComparator = deriveComparator(orderBy); + return matchDocumentsFromStream(query, documentStream, numberToSkip, limit, batchSize, documentComparator, fieldSelector + ); + + } + protected QueryResult matchDocumentsFromStream(Document query, Stream documentStream, int numberToSkip, int limit, int batchSize, Comparator documentComparator, @@ -107,11 +122,11 @@ protected QueryResult matchDocumentsFromStream(Document query, Stream protected QueryResult matchDocuments(Document query, Iterable

positions, Document orderBy, int numberToSkip, int limit, int batchSize, - Document fieldSelector) { + Document fieldSelector, MongoSession mongoSession) { Stream documentStream = StreamSupport.stream(positions.spliterator(), false) - .map(position -> getDocument(position)); + .map(position -> getDocument(position, mongoSession)); - return matchDocumentsFromStream(documentStream, query, orderBy, numberToSkip, limit, batchSize, fieldSelector); + return matchDocumentsFromStream(documentStream, query, orderBy, numberToSkip, limit, batchSize, fieldSelector, mongoSession); } protected static boolean isNaturalDescending(Document orderBy) { @@ -141,6 +156,13 @@ protected static DocumentComparator deriveComparator(Document orderBy) { return null; } + protected Document getDocument(P position, MongoSession mongoSession) { + if (mongoSession == null) { + return getDocument(position); + } + throw new RuntimeException("Not implemented"); + } + protected abstract Document getDocument(P position); protected abstract void updateDataSize(int sizeDelta); @@ -362,6 +384,10 @@ static void validateUpdateQuery(Document update) { @Override public Document findAndModify(Document query) { + return findAndModify(query, null); + } + + public Document findAndModify(Document query, MongoSession mongoSession) { boolean returnNew = Utils.isTrue(query.get("new")); if (!query.containsKey("remove") && !query.containsKey("update")) { @@ -394,11 +420,11 @@ public Document findAndModify(Document query) { Integer matchPos = matcher.matchPosition(document, (Document) queryObject.get("query")); ArrayFilters arrayFilters = ArrayFilters.parse(query, updateQuery); - Document oldDocument = updateDocument(document, updateQuery, arrayFilters, matchPos); + Map.Entry oldAndNewDocs = updateDocument(document, updateQuery, arrayFilters, matchPos, mongoSession); if (returnNew) { - returnDocument = document; + returnDocument = oldAndNewDocs.getValue(); } else { - returnDocument = oldDocument; + returnDocument = oldAndNewDocs.getKey(); } lastErrorObject = new Document("updatedExisting", Boolean.TRUE); lastErrorObject.put("n", Integer.valueOf(1)); @@ -433,6 +459,11 @@ public Document findAndModify(Document query) { @Override public QueryResult handleQuery(QueryParameters queryParameters) { + return handleQuery(queryParameters, null); + } + + @Override + public QueryResult handleQuery(QueryParameters queryParameters, MongoSession mongoSession) { final Document query; final Document orderBy; Document querySelector = queryParameters.getQuerySelector(); @@ -448,7 +479,7 @@ public QueryResult handleQuery(QueryParameters queryParameters) { } return queryDocuments(query, orderBy, queryParameters.getNumberToSkip(), queryParameters.getLimit(), - queryParameters.getBatchSize(), queryParameters.getProjection()); + queryParameters.getBatchSize(), queryParameters.getProjection(), mongoSession); } @Override @@ -480,11 +511,15 @@ private static Document toErrorDocument(MongoServerError e, int index) { @Override public Document handleDistinct(Document query) { + return handleDistinct(query, null); + } + + public Document handleDistinct(Document query, MongoSession mongoSession) { String key = (String) query.get("key"); Document filter = (Document) query.getOrDefault("query", new Document()); Set values = new TreeSet<>(ValueComparator.ascWithoutListHandling().withDefaultComparatorForUuids()); - for (Document document : queryDocuments(filter, null, 0, 0, 0, null)) { + for (Document document : queryDocuments(filter, null, 0, 0, 0, null, mongoSession)) { Object value = Utils.getSubdocumentValueCollectionAware(document, key); if (!(value instanceof Missing)) { if (value instanceof Collection) { @@ -517,6 +552,11 @@ public int deleteDocuments(Document selector, int limit, Oplog oplog) { @Override public Document updateDocuments(Document selector, Document updateQuery, ArrayFilters arrayFilters, boolean isMulti, boolean isUpsert, Oplog oplog) { + return updateDocuments(selector, updateQuery, arrayFilters, isMulti, isUpsert, oplog, null); + } + + public Document updateDocuments(Document selector, Document updateQuery, ArrayFilters arrayFilters, + boolean isMulti, boolean isUpsert, Oplog oplog, MongoSession mongoSession) { if (isMulti) { for (String key : updateQuery.keySet()) { if (!key.startsWith("$")) { @@ -527,11 +567,13 @@ public Document updateDocuments(Document selector, Document updateQuery, ArrayFi int nMatched = 0; List updatedIds = new ArrayList<>(); - for (Document document : queryDocuments(selector, null, 0, 0, 0, null)) { + for (Document document : queryDocuments(selector, null, 0, 0, 0, null, mongoSession)) { Integer matchPos = matcher.matchPosition(document, selector); - Document oldDocument = updateDocument(document, updateQuery, arrayFilters, matchPos); - if (!Utils.nullAwareEquals(oldDocument, document)) { - updatedIds.add(document.get(getIdField())); + Map.Entry oldAndNew = updateDocument(document, updateQuery, arrayFilters, matchPos, mongoSession); + Document oldDocument = oldAndNew.getKey(); + Document newDocument = oldAndNew.getValue(); + if (!Utils.nullAwareEquals(oldDocument, newDocument)) { + updatedIds.add(newDocument.get(getIdField())); } nMatched++; @@ -556,8 +598,8 @@ public Document updateDocuments(Document selector, Document updateQuery, ArrayFi return result; } - private Document updateDocument(Document document, Document updateQuery, - ArrayFilters arrayFilters, Integer matchPos) { + private Map.Entry updateDocument(Document document, Document updateQuery, + ArrayFilters arrayFilters, Integer matchPos, MongoSession mongoSession) { Document oldDocument = document.cloneDeeply(); Document newDocument = calculateUpdateDocument(document, updateQuery, arrayFilters, matchPos, false); @@ -585,16 +627,16 @@ private Document updateDocument(Document document, Document updateQuery, } // update the fields - for (String key : newDocument.keySet()) { - if (key.contains(".")) { - throw new MongoServerException( - "illegal field name. must not happen as it must be caught by the driver"); - } - document.put(key, newDocument.get(key)); - } - handleUpdate(position, oldDocument, document); +// for (String key : newDocument.keySet()) { +// if (key.contains(".")) { +// throw new MongoServerException( +// "illegal field name. must not happen as it must be caught by the driver"); +// } +// document.put(key, newDocument.get(key)); +// } + handleUpdate(position, oldDocument, newDocument, mongoSession); } - return oldDocument; + return new AbstractMap.SimpleEntry<>(oldDocument, newDocument); } private P getSinglePosition(Document document) { @@ -610,6 +652,8 @@ private P getSinglePosition(Document document) { protected abstract void handleUpdate(P position, Document oldDocument, Document newDocument); + protected abstract void handleUpdate(P position, Document oldDocument, Document newDocument, MongoSession mongoSession); + private Document handleUpsert(Document updateQuery, Document selector, ArrayFilters arrayFilters) { Document document = convertSelectorToDocument(selector); @@ -644,6 +688,11 @@ public List> getIndexes() { @Override public int count(Document query, int skip, int limit) { + return count(query, skip, limit, null); + } + + @Override + public int count(Document query, int skip, int limit, MongoSession mongoSession) { if (query == null || query.keySet().isEmpty()) { int count = count(); if (skip > 0) { @@ -657,7 +706,9 @@ public int count(Document query, int skip, int limit) { int numberToReturn = Math.max(limit, 0); int count = 0; - Iterator it = queryDocuments(query, null, skip, numberToReturn, 0, new Document(getIdField(), 1)).iterator(); + Iterator it = queryDocuments( + query, null, skip, numberToReturn, 0, new Document(getIdField(), 1), mongoSession + ).iterator(); while (it.hasNext()) { it.next(); count++; @@ -770,6 +821,13 @@ protected P findDocumentPosition(Document document) { protected abstract Stream> streamAllDocumentsWithPosition(); + protected Stream> streamAllDocumentsWithPosition(MongoSession mongoSession) { + if (mongoSession == null) { + return streamAllDocumentsWithPosition(); + } + throw new RuntimeException("Not Implemented"); + } + private boolean isSystemCollection() { return AbstractMongoDatabase.isSystemCollection(getCollectionName()); } diff --git a/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoDatabase.java b/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoDatabase.java index 76e728b7d..ece56f7f0 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoDatabase.java +++ b/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoDatabase.java @@ -38,6 +38,7 @@ import de.bwaldvogel.mongo.wire.message.MongoQuery; import de.bwaldvogel.mongo.wire.message.MongoUpdate; import io.netty.channel.Channel; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; public abstract class AbstractMongoDatabase

implements MongoDatabase { @@ -64,6 +65,12 @@ protected AbstractMongoDatabase(String databaseName, CursorRegistry cursorRegist this.cursorRegistry = cursorRegistry; } + protected AbstractMongoDatabase(AbstractMongoDatabase

mongoDatabase) { + this.databaseName = mongoDatabase.getDatabaseName(); + this.cursorRegistry = new CursorRegistry(); + collections.putAll(mongoDatabase.collections); + } + protected void initializeNamespacesAndIndexes() { this.namespaces = openOrCreateCollection(NAMESPACES_COLLECTION_NAME, CollectionOptions.withIdField("name")); this.collections.put(namespaces.getCollectionName(), namespaces); @@ -96,7 +103,7 @@ public String toString() { return getClass().getSimpleName() + "(" + getDatabaseName() + ")"; } - private Document commandError(Channel channel, String command, Document query) { + protected Document commandError(Channel channel, String command, Document query) { // getlasterror must not clear the last error if (command.equalsIgnoreCase("getlasterror")) { return commandGetLastError(channel, command, query); @@ -107,13 +114,13 @@ private Document commandError(Channel channel, String command, Document query) { } // handle command synchronously - private Document handleCommandSync(Channel channel, String command, Document query, Oplog oplog) { + private Document handleCommandSync(Channel channel, String command, Document query, Oplog oplog, MongoSession mongoSession) { if (command.equalsIgnoreCase("find")) { return commandFind(command, query); } else if (command.equalsIgnoreCase("insert")) { return commandInsert(channel, command, query, oplog); } else if (command.equalsIgnoreCase("update")) { - return commandUpdate(channel, command, query, oplog); + return commandUpdate(channel, command, query, oplog, mongoSession); } else if (command.equalsIgnoreCase("delete")) { return commandDelete(channel, command, query, oplog); } else if (command.equalsIgnoreCase("create")) { @@ -167,7 +174,7 @@ private Document handleCommandSync(Channel channel, String command, Document que } @Override - public Document handleCommand(Channel channel, String command, Document query, Oplog oplog) { + public Document handleCommand(Channel channel, String command, Document query, Oplog oplog, MongoSession mongoSession) { Document commandErrorDocument = commandError(channel, command, query); if (commandErrorDocument != null) { return commandErrorDocument; @@ -175,11 +182,13 @@ public Document handleCommand(Channel channel, String command, Document query, O clearLastStatus(channel); - return handleCommandSync(channel, command, query, oplog); + return handleCommandSync(channel, command, query, oplog, mongoSession); } @Override - public CompletionStage handleCommandAsync(Channel channel, String command, Document query, Oplog oplog) { + public CompletionStage handleCommandAsync( + Channel channel, String command, Document query, Oplog oplog, MongoSession mongoSession + ) { Document commandErrorDocument = commandError(channel, command, query); if (commandErrorDocument != null) { return FutureUtils.wrap(() -> commandErrorDocument); @@ -191,7 +200,7 @@ public CompletionStage handleCommandAsync(Channel channel, String comm return commandFindAsync(command, query); } - return FutureUtils.wrap(() -> handleCommandSync(channel, command, query, oplog)); + return FutureUtils.wrap(() -> handleCommandSync(channel, command, query, oplog, mongoSession)); } private Document listCollections() { @@ -310,7 +319,7 @@ private Document commandInsert(Channel channel, String command, Document query, return result; } - private Document commandUpdate(Channel channel, String command, Document query, Oplog oplog) { + private Document commandUpdate(Channel channel, String command, Document query, Oplog oplog, MongoSession mongoSession) { clearLastStatus(channel); String collectionName = query.get(command).toString(); boolean isOrdered = Utils.isTrue(query.get("ordered")); @@ -334,7 +343,7 @@ private Document commandUpdate(Channel channel, String command, Document query, boolean upsert = Utils.isTrue(updateObj.get("upsert")); final Document result; try { - result = updateDocuments(collectionName, selector, update, arrayFilters, multi, upsert, oplog); + result = updateDocuments(collectionName, selector, update, arrayFilters, multi, upsert, oplog, mongoSession); } catch (MongoServerException e) { writeErrors.add(toWriteError(i, e)); continue; @@ -559,6 +568,10 @@ private Document commandGetLastError(Channel channel, String command, Document q case "$db": Assert.equals(value, getDatabaseName()); break; + case "lsid": + break; + case "$readPreference": + break; default: throw new MongoServerException("unknown subcommand: " + subCommand); } @@ -769,6 +782,11 @@ public void handleDelete(MongoDelete delete, Oplog oplog) { @Override public void handleUpdate(MongoUpdate updateCommand, Oplog oplog) { + handleUpdate(updateCommand, oplog, null); + } + + @Override + public void handleUpdate(MongoUpdate updateCommand, Oplog oplog, MongoSession mongoSession) { Channel channel = updateCommand.getChannel(); String collectionName = updateCommand.getCollectionName(); Document selector = updateCommand.getSelector(); @@ -779,7 +797,7 @@ public void handleUpdate(MongoUpdate updateCommand, Oplog oplog) { clearLastStatus(channel); try { - Document result = updateDocuments(collectionName, selector, update, arrayFilters, multi, upsert, oplog); + Document result = updateDocuments(collectionName, selector, update, arrayFilters, multi, upsert, oplog, mongoSession); putLastResult(channel, result); } catch (MongoServerException e) { putLastError(channel, e); @@ -922,14 +940,14 @@ private Document deleteDocuments(Channel channel, String collectionName, Documen private Document updateDocuments(String collectionName, Document selector, Document update, ArrayFilters arrayFilters, - boolean multi, boolean upsert, Oplog oplog) { + boolean multi, boolean upsert, Oplog oplog, MongoSession mongoSession) { if (isSystemCollection(collectionName)) { throw new MongoServerError(10156, "cannot update system collection"); } MongoCollection

collection = resolveOrCreateCollection(collectionName); - return collection.updateDocuments(selector, update, arrayFilters, multi, upsert, oplog); + return collection.updateDocuments(selector, update, arrayFilters, multi, upsert, oplog, mongoSession); } private void putLastError(Channel channel, MongoServerException ex) { diff --git a/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractSynchronizedMongoDatabase.java b/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractSynchronizedMongoDatabase.java index 8937870c8..f93a63bed 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractSynchronizedMongoDatabase.java +++ b/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractSynchronizedMongoDatabase.java @@ -10,6 +10,10 @@ protected AbstractSynchronizedMongoDatabase(String databaseName, CursorRegistry super(databaseName, cursorRegistry); } + protected AbstractSynchronizedMongoDatabase(AbstractSynchronizedMongoDatabase database) { + super(database); + } + @Override protected synchronized MongoCollection

resolveOrCreateCollection(String collectionName) { return super.resolveOrCreateCollection(collectionName); diff --git a/core/src/main/java/de/bwaldvogel/mongo/backend/CursorRegistry.java b/core/src/main/java/de/bwaldvogel/mongo/backend/CursorRegistry.java index f8c7e5a9c..f5a664819 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/backend/CursorRegistry.java +++ b/core/src/main/java/de/bwaldvogel/mongo/backend/CursorRegistry.java @@ -1,5 +1,6 @@ package de.bwaldvogel.mongo.backend; +import java.io.Serializable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; diff --git a/core/src/main/java/de/bwaldvogel/mongo/backend/MongoSession.java b/core/src/main/java/de/bwaldvogel/mongo/backend/MongoSession.java new file mode 100644 index 000000000..f9c1c027a --- /dev/null +++ b/core/src/main/java/de/bwaldvogel/mongo/backend/MongoSession.java @@ -0,0 +1,53 @@ +package de.bwaldvogel.mongo.backend; + +import java.util.UUID; +import org.h2.mvstore.tx.Transaction; + +public class MongoSession implements Cloneable { + public final UUID id; + private Transaction tx; + + public MongoSession(UUID id, Transaction tx) { + this.id = id; + this.tx = tx; + } + + public MongoSession() { + this.id = UUID.randomUUID(); + } + + @Override + public int hashCode() { + return id.hashCode(); + } + + @Override + public boolean equals(Object obj) { + MongoSession other = (MongoSession) obj; + return id.equals(other.id); + } + + public Transaction getTransaction() { + return tx; + } + + public void commit() { + try { + if (tx != null) { + tx.commit(); + } + } catch (Exception ex) { + tx.rollback(); + } + } + + public MongoSession clone() { + return new MongoSession(id, tx); + } + + public static MongoSession NoopSession() { + return new MongoSession(); + } + + +} diff --git a/core/src/main/java/de/bwaldvogel/mongo/backend/ReadOnlyProxy.java b/core/src/main/java/de/bwaldvogel/mongo/backend/ReadOnlyProxy.java index 5e29e68d0..bda314b51 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/backend/ReadOnlyProxy.java +++ b/core/src/main/java/de/bwaldvogel/mongo/backend/ReadOnlyProxy.java @@ -148,4 +148,8 @@ public void handleKillCursors(MongoKillCursors mongoKillCursors) { backend.handleKillCursors(mongoKillCursors); } + @Override + public void setServerAddress(String serverAddress) { + } + } diff --git a/core/src/main/java/de/bwaldvogel/mongo/backend/Utils.java b/core/src/main/java/de/bwaldvogel/mongo/backend/Utils.java index 59bb8c329..e524dd0a2 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/backend/Utils.java +++ b/core/src/main/java/de/bwaldvogel/mongo/backend/Utils.java @@ -9,6 +9,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -623,4 +624,11 @@ public static void copySubdocumentValue(Document input, Document result, String } } + public static UUID getSessionId(Document document) { + if (document != null && document.containsKey("lsid")) { + return (UUID) ((Document)document.get("lsid")).get("id"); + } + return null; + } + } diff --git a/core/src/main/java/de/bwaldvogel/mongo/backend/aggregation/Expression.java b/core/src/main/java/de/bwaldvogel/mongo/backend/aggregation/Expression.java index 44d73bca0..ac16c6651 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/backend/aggregation/Expression.java +++ b/core/src/main/java/de/bwaldvogel/mongo/backend/aggregation/Expression.java @@ -37,6 +37,7 @@ import de.bwaldvogel.mongo.backend.Utils; import de.bwaldvogel.mongo.backend.ValueComparator; import de.bwaldvogel.mongo.bson.Document; +import de.bwaldvogel.mongo.exception.ErrorCode; import de.bwaldvogel.mongo.exception.FailedToOptimizePipelineError; import de.bwaldvogel.mongo.exception.MongoServerError; @@ -1381,6 +1382,85 @@ Object apply(List expressionValue, Document document) { } }, + $toDouble { + @Override + Object apply(List expressionValue, Document document) { + Object value = requireSingleValue(expressionValue); + if (Missing.isNullOrMissing(value)) { + return null; + } else if (value instanceof Number) { + Number number = (Number) value; + return number.doubleValue(); + } else if (value instanceof Boolean) { + Boolean booleanValue = (Boolean) value; + return booleanValue.booleanValue() ? 1.0 : 0.0; + } else if (value instanceof Instant) { + Instant instant = (Instant) value; + return (double) instant.toEpochMilli(); + } else { + try { + String string = convertToString(value); + return Double.valueOf(string); + } catch (NumberFormatException e) { + throw new MongoServerError(ErrorCode.ConversionFailure, + "Failed to parse number '" + value + "' in $convert with no onError value:" + + " Did not consume whole number."); + } + } + } + }, + + $toInt { + @Override + Object apply(List expressionValue, Document document) { + Object value = requireSingleValue(expressionValue); + if (Missing.isNullOrMissing(value)) { + return null; + } else if (value instanceof Number) { + Number number = (Number) value; + return number.intValue(); + } else if (value instanceof Boolean) { + Boolean booleanValue = (Boolean) value; + return booleanValue.booleanValue() ? 1 : 0; + } else { + try { + String string = convertToString(value); + return Integer.valueOf(string); + } catch (NumberFormatException e) { + throw new MongoServerError(ErrorCode.ConversionFailure, + "Failed to parse number '" + value + "' in $convert with no onError value."); + } + } + } + }, + + $toLong { + @Override + Object apply(List expressionValue, Document document) { + Object value = requireSingleValue(expressionValue); + if (Missing.isNullOrMissing(value)) { + return null; + } else if (value instanceof Number) { + Number number = (Number) value; + return number.longValue(); + } else if (value instanceof Boolean) { + Boolean booleanValue = (Boolean) value; + return booleanValue.booleanValue() ? 1L : 0L; + } else if (value instanceof Instant) { + Instant instant = (Instant) value; + return instant.toEpochMilli(); + } else { + try { + String string = convertToString(value); + return Long.valueOf(string); + } catch (NumberFormatException e) { + throw new MongoServerError(ErrorCode.ConversionFailure, + "Failed to parse number '" + value + "' in $convert with no onError value."); + } + } + } + }, + $toLower { @Override Object apply(List expressionValue, Document document) { diff --git a/core/src/main/java/de/bwaldvogel/mongo/backend/aggregation/stage/ProjectStage.java b/core/src/main/java/de/bwaldvogel/mongo/backend/aggregation/stage/ProjectStage.java index c225188b1..b9ff7e0bd 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/backend/aggregation/stage/ProjectStage.java +++ b/core/src/main/java/de/bwaldvogel/mongo/backend/aggregation/stage/ProjectStage.java @@ -2,7 +2,9 @@ import static de.bwaldvogel.mongo.backend.Constants.ID_FIELD; +import java.util.List; import java.util.Map.Entry; +import java.util.stream.Collectors; import java.util.stream.Stream; import de.bwaldvogel.mongo.backend.Missing; @@ -54,7 +56,15 @@ Document projectDocument(Document document) { } else { Utils.removeSubdocumentValue(result, field); } - } else if (projectionValue == null) { + } + else if (projectionValue instanceof List) { + List resolvedProjectionValues = ((List) projectionValue) + .stream() + .map(value -> Expression.evaluateDocument(value, document)) + .collect(Collectors.toList()); + result.put(field, resolvedProjectionValues); + } + else if (projectionValue == null) { result.put(field, null); } else { Object value = Expression.evaluateDocument(projectionValue, document); diff --git a/core/src/main/java/de/bwaldvogel/mongo/bson/Document.java b/core/src/main/java/de/bwaldvogel/mongo/bson/Document.java index ab9437c85..9e03cc95f 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/bson/Document.java +++ b/core/src/main/java/de/bwaldvogel/mongo/bson/Document.java @@ -11,7 +11,9 @@ import de.bwaldvogel.mongo.backend.Missing; -public final class Document implements Map, Bson { +import org.h2.value.VersionedValue; + +public final class Document extends VersionedValue implements Map, Bson { private static final long serialVersionUID = 1L; diff --git a/core/src/main/java/de/bwaldvogel/mongo/exception/ErrorCode.java b/core/src/main/java/de/bwaldvogel/mongo/exception/ErrorCode.java index 862b14003..25bcc7710 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/exception/ErrorCode.java +++ b/core/src/main/java/de/bwaldvogel/mongo/exception/ErrorCode.java @@ -15,9 +15,16 @@ public enum ErrorCode { InvalidOptions(72), IndexOptionsConflict(85), CannotIndexParallelArrays(171), + ConversionFailure(241), DuplicateKey(11000); private final int id; - ErrorCode(int id) { this.id = id; } - public int getValue() { return id; } + + ErrorCode(int id) { + this.id = id; + } + + public int getValue() { + return id; + } } diff --git a/core/src/main/java/de/bwaldvogel/mongo/oplog/CollectionBackedOplog.java b/core/src/main/java/de/bwaldvogel/mongo/oplog/CollectionBackedOplog.java index e584e38ab..f8842f9d7 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/oplog/CollectionBackedOplog.java +++ b/core/src/main/java/de/bwaldvogel/mongo/oplog/CollectionBackedOplog.java @@ -1,5 +1,6 @@ package de.bwaldvogel.mongo.oplog; +import java.time.Clock; import java.util.List; import java.util.UUID; import java.util.function.Function; @@ -34,6 +35,13 @@ public CollectionBackedOplog(MongoBackend backend, MongoCollection col this.cursorRegistry = cursorRegistry; } + public CollectionBackedOplog(Clock clock, MongoCollection collection, CursorRegistry cursorRegistry) { + this.oplogClock = new OplogClock(clock); + this.collection = collection; + this.cursorRegistry = cursorRegistry; + this.backend = null; + } + @Override public void handleInsert(String namespace, List documents) { if (isOplogCollection(namespace)) { diff --git a/core/src/test/java/de/bwaldvogel/mongo/backend/AbstractMongoBackendTest.java b/core/src/test/java/de/bwaldvogel/mongo/backend/AbstractMongoBackendTest.java index d2f207712..1817ec2b8 100644 --- a/core/src/test/java/de/bwaldvogel/mongo/backend/AbstractMongoBackendTest.java +++ b/core/src/test/java/de/bwaldvogel/mongo/backend/AbstractMongoBackendTest.java @@ -46,7 +46,7 @@ protected MongoDatabase openOrCreateDatabase(String databaseName) { Utils.markOkay(fakeResponse); fakeResponse.put("message", "fakeResponse"); - when(mockDatabase.handleCommand(any(), any(), any(), any())).thenReturn(fakeResponse); + when(mockDatabase.handleCommand(any(), any(), any(), any(), any())).thenReturn(fakeResponse); return mockDatabase; } diff --git a/core/src/test/java/de/bwaldvogel/mongo/backend/AbstractMongoCollectionTest.java b/core/src/test/java/de/bwaldvogel/mongo/backend/AbstractMongoCollectionTest.java index 8517e8903..15e12c6f8 100644 --- a/core/src/test/java/de/bwaldvogel/mongo/backend/AbstractMongoCollectionTest.java +++ b/core/src/test/java/de/bwaldvogel/mongo/backend/AbstractMongoCollectionTest.java @@ -6,6 +6,8 @@ import java.util.stream.Stream; +import de.bwaldvogel.mongo.exception.InvalidOptionsException; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; @@ -45,7 +47,8 @@ protected void removeDocument(Object position) { @Override protected QueryResult matchDocuments(Document query, Document orderBy, int numberToSkip, - int numberToReturn, int batchSize, Document fieldSelector) { + int numberToReturn, int batchSize, Document fieldSelector, + MongoSession mongoSession) { throw new UnsupportedOperationException(); } @@ -63,6 +66,11 @@ protected void handleUpdate(Object position, Document oldDocument, Document newD // noop } + @Override + protected void handleUpdate(Object position, Document oldDocument, Document newDocument, MongoSession mongoSession) { + throw new InvalidOptionsException("Sessions and transaction is not supported on on this backend, please use H2Backend"); + } + @Override protected Stream> streamAllDocumentsWithPosition() { throw new UnsupportedOperationException(); diff --git a/core/src/test/java/de/bwaldvogel/mongo/backend/AbstractMongoDatabaseTest.java b/core/src/test/java/de/bwaldvogel/mongo/backend/AbstractMongoDatabaseTest.java index 3cfc6537a..d7ea49de0 100644 --- a/core/src/test/java/de/bwaldvogel/mongo/backend/AbstractMongoDatabaseTest.java +++ b/core/src/test/java/de/bwaldvogel/mongo/backend/AbstractMongoDatabaseTest.java @@ -8,6 +8,8 @@ import java.util.List; import java.util.concurrent.CompletionStage; +import de.bwaldvogel.mongo.MongoDatabase; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; @@ -15,6 +17,7 @@ import de.bwaldvogel.mongo.MongoCollection; import de.bwaldvogel.mongo.bson.Document; import io.netty.channel.Channel; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; class AbstractMongoDatabaseTest { @@ -67,7 +70,7 @@ void testHandleCommandAsyncFindReturnEmpty() throws Exception { Document query = new Document(); query.put("find", "testCollection"); - CompletionStage responseFuture = database.handleCommandAsync(channel, "find", query, null); + CompletionStage responseFuture = database.handleCommandAsync(channel, "find", query, null, null); Document response = responseFuture.toCompletableFuture().get(); assertThat(response).isNotNull(); @@ -81,7 +84,7 @@ void testHandleCommandAsyncFindReturnSomething() throws Exception { Document query = new Document(); query.put("find", "mockCollection"); - CompletionStage responseFuture = database.handleCommandAsync(channel, "find", query, null); + CompletionStage responseFuture = database.handleCommandAsync(channel, "find", query, null, null); Document response = responseFuture.toCompletableFuture().get(); assertThat(response).isNotNull(); diff --git a/core/src/test/java/de/bwaldvogel/mongo/backend/MongoSessionTest.java b/core/src/test/java/de/bwaldvogel/mongo/backend/MongoSessionTest.java new file mode 100644 index 000000000..51f10e68e --- /dev/null +++ b/core/src/test/java/de/bwaldvogel/mongo/backend/MongoSessionTest.java @@ -0,0 +1,36 @@ +package de.bwaldvogel.mongo.backend; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +class MongoSessionTest { + + MongoSession mongoSession; + + @BeforeEach + public void setup() { + mongoSession = new MongoSession(); + } + + @Test + void testHashCode() { + assertEquals(mongoSession.id.hashCode(), mongoSession.hashCode()); + } + + @Test + void testEquals() { + MongoSession eqSession = mongoSession.clone(); + MongoSession diffSession = new MongoSession(); + + assertTrue(mongoSession.equals(eqSession)); + assertFalse(mongoSession.equals(diffSession)); + } + + @Test + void noopSession() { + MongoSession noopSession = MongoSession.NoopSession(); + assertNull(noopSession.getTransaction()); + } +} \ No newline at end of file diff --git a/core/src/test/java/de/bwaldvogel/mongo/backend/aggregation/stage/ProjectStageTest.java b/core/src/test/java/de/bwaldvogel/mongo/backend/aggregation/stage/ProjectStageTest.java index 943b2866c..7b37f2500 100644 --- a/core/src/test/java/de/bwaldvogel/mongo/backend/aggregation/stage/ProjectStageTest.java +++ b/core/src/test/java/de/bwaldvogel/mongo/backend/aggregation/stage/ProjectStageTest.java @@ -4,6 +4,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import java.util.Collections; + import org.junit.jupiter.api.Test; import de.bwaldvogel.mongo.bson.Document; @@ -41,6 +43,24 @@ void testProject_withNestedInclusion() throws Exception { .isEqualTo(json("_id: 1, x: {b: 2, c: 3}")); } + @Test + void testProject_withFieldToBeEvaluated() { + Document projection = new Document(); + projection.put("_id", 1); + projection.put("x", new Document("count", "$count")); + assertThat(project(json("_id: 1, count: 5"), projection)) + .isEqualTo(json("_id: 1, x: {count: 5}")); + } + + @Test + void testProject_withFieldWithinArrayToBeEvaluated() { + Document projection = new Document(); + projection.put("_id", 1); + projection.put("x", Collections.singletonList(new Document("count", "$count"))); + assertThat(project(json("_id: 1, count: 5"), projection)) + .isEqualTo(json("_id: 1, x: [{count: 5}]")); + } + private static Document project(Document document, Document projection) { return new ProjectStage(projection).projectDocument(document); } diff --git a/gradle.lockfile b/gradle.lockfile index 33ded0d37..a1c421d10 100644 --- a/gradle.lockfile +++ b/gradle.lockfile @@ -1,12 +1,13 @@ # This is a Gradle generated file for dependency locking. # Manual edits can break the build and are not advised. # This file is expected to be part of source control. -io.netty:netty-buffer:4.1.54.Final=default,runtimeClasspath,testRuntimeClasspath -io.netty:netty-codec:4.1.54.Final=default,runtimeClasspath,testRuntimeClasspath -io.netty:netty-common:4.1.54.Final=default,runtimeClasspath,testRuntimeClasspath -io.netty:netty-handler:4.1.54.Final=default,runtimeClasspath,testRuntimeClasspath -io.netty:netty-resolver:4.1.54.Final=default,runtimeClasspath,testRuntimeClasspath -io.netty:netty-transport:4.1.54.Final=default,runtimeClasspath,testRuntimeClasspath +com.h2database:h2:1.4.200=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-buffer:4.1.56.Final=default,runtimeClasspath,testRuntimeClasspath +io.netty:netty-codec:4.1.56.Final=default,runtimeClasspath,testRuntimeClasspath +io.netty:netty-common:4.1.56.Final=default,runtimeClasspath,testRuntimeClasspath +io.netty:netty-handler:4.1.56.Final=default,runtimeClasspath,testRuntimeClasspath +io.netty:netty-resolver:4.1.56.Final=default,runtimeClasspath,testRuntimeClasspath +io.netty:netty-transport:4.1.56.Final=default,runtimeClasspath,testRuntimeClasspath org.jacoco:org.jacoco.agent:0.8.6=jacocoAgent,jacocoAnt org.jacoco:org.jacoco.ant:0.8.6=jacocoAnt org.jacoco:org.jacoco.core:0.8.6=jacocoAnt @@ -16,4 +17,4 @@ org.ow2.asm:asm-commons:8.0.1=jacocoAnt org.ow2.asm:asm-tree:8.0.1=jacocoAnt org.ow2.asm:asm:8.0.1=jacocoAnt org.slf4j:slf4j-api:1.7.30=default,runtimeClasspath,testRuntimeClasspath -empty=annotationProcessor,archives,compile,compileClasspath,compileOnly,runtime,shadow,signatures,testAnnotationProcessor,testCompile,testCompileClasspath,testCompileOnly,testRuntime +empty=annotationProcessor,archives,compile,compileOnly,runtime,shadow,signatures,testAnnotationProcessor,testCompile,testCompileOnly,testRuntime diff --git a/h2-backend/gradle.lockfile b/h2-backend/gradle.lockfile index c837103ca..bdceb14b6 100644 --- a/h2-backend/gradle.lockfile +++ b/h2-backend/gradle.lockfile @@ -4,12 +4,12 @@ ch.qos.logback:logback-classic:1.2.3=testRuntimeClasspath ch.qos.logback:logback-core:1.2.3=testRuntimeClasspath com.h2database:h2:1.4.200=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath -io.netty:netty-buffer:4.1.54.Final=default,runtimeClasspath,testRuntimeClasspath -io.netty:netty-codec:4.1.54.Final=default,runtimeClasspath,testRuntimeClasspath -io.netty:netty-common:4.1.54.Final=default,runtimeClasspath,testRuntimeClasspath -io.netty:netty-handler:4.1.54.Final=default,runtimeClasspath,testRuntimeClasspath -io.netty:netty-resolver:4.1.54.Final=default,runtimeClasspath,testRuntimeClasspath -io.netty:netty-transport:4.1.54.Final=default,runtimeClasspath,testRuntimeClasspath +io.netty:netty-buffer:4.1.56.Final=default,runtimeClasspath,testRuntimeClasspath +io.netty:netty-codec:4.1.56.Final=default,runtimeClasspath,testRuntimeClasspath +io.netty:netty-common:4.1.56.Final=default,runtimeClasspath,testRuntimeClasspath +io.netty:netty-handler:4.1.56.Final=default,runtimeClasspath,testRuntimeClasspath +io.netty:netty-resolver:4.1.56.Final=default,runtimeClasspath,testRuntimeClasspath +io.netty:netty-transport:4.1.56.Final=default,runtimeClasspath,testRuntimeClasspath net.bytebuddy:byte-buddy-agent:1.10.18=testCompileClasspath,testRuntimeClasspath net.bytebuddy:byte-buddy:1.10.18=testCompileClasspath,testRuntimeClasspath org.apiguardian:apiguardian-api:1.1.0=testCompileClasspath,testRuntimeClasspath @@ -39,14 +39,14 @@ org.ow2.asm:asm:7.2=jacocoAnt org.reactivestreams:reactive-streams:1.0.3=testCompileClasspath,testRuntimeClasspath org.slf4j:jcl-over-slf4j:1.7.30=testRuntimeClasspath org.slf4j:slf4j-api:1.7.30=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath -org.springframework.data:spring-data-commons:2.2.11.RELEASE=testCompileClasspath,testRuntimeClasspath -org.springframework.data:spring-data-mongodb:2.2.11.RELEASE=testCompileClasspath,testRuntimeClasspath -org.springframework:spring-aop:5.3.1=testCompileClasspath,testRuntimeClasspath -org.springframework:spring-beans:5.3.1=testCompileClasspath,testRuntimeClasspath -org.springframework:spring-context:5.3.1=testCompileClasspath,testRuntimeClasspath -org.springframework:spring-core:5.3.1=testCompileClasspath,testRuntimeClasspath -org.springframework:spring-expression:5.3.1=testCompileClasspath,testRuntimeClasspath -org.springframework:spring-jcl:5.3.1=testCompileClasspath,testRuntimeClasspath -org.springframework:spring-test:5.3.1=testCompileClasspath,testRuntimeClasspath -org.springframework:spring-tx:5.2.10.RELEASE=testCompileClasspath,testRuntimeClasspath +org.springframework.data:spring-data-commons:2.2.12.RELEASE=testCompileClasspath,testRuntimeClasspath +org.springframework.data:spring-data-mongodb:2.2.12.RELEASE=testCompileClasspath,testRuntimeClasspath +org.springframework:spring-aop:5.3.2=testCompileClasspath,testRuntimeClasspath +org.springframework:spring-beans:5.3.2=testCompileClasspath,testRuntimeClasspath +org.springframework:spring-context:5.3.2=testCompileClasspath,testRuntimeClasspath +org.springframework:spring-core:5.3.2=testCompileClasspath,testRuntimeClasspath +org.springframework:spring-expression:5.3.2=testCompileClasspath,testRuntimeClasspath +org.springframework:spring-jcl:5.3.2=testCompileClasspath,testRuntimeClasspath +org.springframework:spring-test:5.3.2=testCompileClasspath,testRuntimeClasspath +org.springframework:spring-tx:5.2.12.RELEASE=testCompileClasspath,testRuntimeClasspath empty=annotationProcessor,archives,compile,compileOnly,runtime,signatures,testAnnotationProcessor,testCompile,testCompileOnly,testRuntime diff --git a/h2-backend/src/main/java/de/bwaldvogel/mongo/backend/h2/H2Backend.java b/h2-backend/src/main/java/de/bwaldvogel/mongo/backend/h2/H2Backend.java index 269aa4aa2..3c6e88d61 100644 --- a/h2-backend/src/main/java/de/bwaldvogel/mongo/backend/h2/H2Backend.java +++ b/h2-backend/src/main/java/de/bwaldvogel/mongo/backend/h2/H2Backend.java @@ -3,6 +3,7 @@ import java.time.Clock; import org.h2.mvstore.MVStore; +import org.h2.mvstore.tx.TransactionStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +40,7 @@ public H2Backend(MVStore mvStore) { public H2Backend(MVStore mvStore, Clock clock) { super(clock); this.mvStore = mvStore; - + transactionStore = new TransactionStore(mvStore); mvStore.getMapNames().stream() .filter(mapName -> mapName.startsWith(H2Database.DATABASES_PREFIX)) .map(mapName -> { @@ -101,4 +102,5 @@ public String toString() { return getClass().getSimpleName() + "[" + mvStore.getFileStore().getFileName() + "]"; } } + } diff --git a/h2-backend/src/main/java/de/bwaldvogel/mongo/backend/h2/H2Collection.java b/h2-backend/src/main/java/de/bwaldvogel/mongo/backend/h2/H2Collection.java index 5ad8e67a5..d2a87b807 100644 --- a/h2-backend/src/main/java/de/bwaldvogel/mongo/backend/h2/H2Collection.java +++ b/h2-backend/src/main/java/de/bwaldvogel/mongo/backend/h2/H2Collection.java @@ -1,10 +1,14 @@ package de.bwaldvogel.mongo.backend.h2; +import java.util.AbstractMap; import java.util.NoSuchElementException; import java.util.UUID; import java.util.stream.Stream; import org.h2.mvstore.MVMap; +import org.h2.mvstore.tx.Transaction; +import org.h2.mvstore.tx.TransactionMap; +import org.h2.value.VersionedValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -15,6 +19,7 @@ import de.bwaldvogel.mongo.backend.CursorRegistry; import de.bwaldvogel.mongo.backend.DocumentWithPosition; import de.bwaldvogel.mongo.backend.Missing; +import de.bwaldvogel.mongo.backend.MongoSession; import de.bwaldvogel.mongo.backend.QueryResult; import de.bwaldvogel.mongo.backend.Utils; import de.bwaldvogel.mongo.backend.ValueComparator; @@ -24,18 +29,18 @@ public class H2Collection extends AbstractSynchronizedMongoCollection { private static final Logger log = LoggerFactory.getLogger(H2Collection.class); - private final MVMap dataMap; + private final MVMap dataMap; private final MVMap metaMap; private static final String DATA_SIZE_KEY = "dataSize"; public H2Collection(MongoDatabase database, String collectionName, CollectionOptions options, - MVMap dataMap, MVMap metaMap, CursorRegistry cursorRegistry) { + MVMap dataMap, MVMap metaMap, CursorRegistry cursorRegistry) { super(database, collectionName, options, cursorRegistry); this.dataMap = dataMap; this.metaMap = metaMap; if (!this.metaMap.containsKey(DATA_SIZE_KEY)) { - this.metaMap.put(DATA_SIZE_KEY, Long.valueOf(0)); + this.metaMap.put(DATA_SIZE_KEY, 0L); } else { log.debug("dataSize of {}: {}", getFullName(), getDataSize()); } @@ -45,7 +50,7 @@ public H2Collection(MongoDatabase database, String collectionName, CollectionOpt protected void updateDataSize(int sizeDelta) { synchronized (metaMap) { Number value = (Number) metaMap.get(DATA_SIZE_KEY); - Long newValue = Long.valueOf(value.longValue() + sizeDelta); + Long newValue = value.longValue() + sizeDelta; metaMap.put(DATA_SIZE_KEY, newValue); } } @@ -64,12 +69,27 @@ protected Object addDocumentInternal(Document document) { } else { key = UUID.randomUUID(); } - - Document previous = dataMap.put(Missing.ofNullable(key), document); + Document previous = (Document) dataMap.put(Missing.ofNullable(key), document); Assert.isNull(previous, () -> "Document with key '" + key + "' already existed in " + this + ": " + previous); return key; } + @Override + protected void handleUpdate(Object position, Document oldDocument, Document newDocument) { + dataMap.put(Missing.ofNullable(position), newDocument); + } + + @Override + protected void handleUpdate(Object position, Document oldDocument, Document newDocument, MongoSession mongoSession) { + if (mongoSession == null) { + handleUpdate(position, oldDocument, newDocument); + return; + } + Transaction tx = mongoSession.getTransaction(); + TransactionMap txMap = tx.openMap(dataMap); + txMap.put(Missing.ofNullable(position), newDocument); + } + @Override public int count() { return dataMap.size(); @@ -80,14 +100,22 @@ public boolean isEmpty() { return dataMap.isEmpty(); } + protected Document getDocument(Object position, MongoSession mongoSession) { + if (mongoSession == null) { + return getDocument(position); + } + TransactionMap txMap = mongoSession.getTransaction().openMap(dataMap); + return txMap.get(position); + } + @Override protected Document getDocument(Object position) { - return dataMap.get(position); + return (Document) dataMap.get(position).getCommittedValue(); } @Override protected void removeDocument(Object position) { - Document remove = dataMap.remove(position); + Document remove = (Document) dataMap.remove(position); if (remove == null) { throw new NoSuchElementException("No document with key " + position); } @@ -95,27 +123,35 @@ protected void removeDocument(Object position) { @Override protected Stream> streamAllDocumentsWithPosition() { - return dataMap.entrySet().stream() - .map(entry -> new DocumentWithPosition<>(entry.getValue(), entry.getKey())); + return streamAllDocumentsWithPosition(dataMap); + } + + @Override + protected Stream> streamAllDocumentsWithPosition(MongoSession mongoSession) { + if (mongoSession == null) { + return streamAllDocumentsWithPosition(); + } + TransactionMap txMap = mongoSession.getTransaction().openMap(dataMap); + return streamAllDocumentsWithPosition(txMap.map); + } + + private Stream> streamAllDocumentsWithPosition(AbstractMap map) { + return map.entrySet().stream() + .map(entry -> new DocumentWithPosition<>((Document) entry.getValue(), entry.getKey())); } @Override protected QueryResult matchDocuments(Document query, Document orderBy, int numberToSkip, int limit, int batchSize, - Document fieldSelector) { + Document fieldSelector, MongoSession mongoSession) { final Stream documentStream; if (isNaturalDescending(orderBy)) { - documentStream = streamAllDocumentsWithPosition() + documentStream = streamAllDocumentsWithPosition(mongoSession) .sorted((o1, o2) -> ValueComparator.desc().compare(o1.getPosition(), o2.getPosition())) .map(DocumentWithPosition::getDocument); } else { - documentStream = dataMap.values().stream(); + documentStream = dataMap.values().stream().map(v -> (Document) v); } - return matchDocumentsFromStream(documentStream, query, orderBy, numberToSkip, limit, batchSize, fieldSelector); - } - - @Override - protected void handleUpdate(Object position, Document oldDocument, Document newDocument) { - dataMap.put(Missing.ofNullable(position), newDocument); + return matchDocumentsFromStream(documentStream, query, orderBy, numberToSkip, limit, batchSize, fieldSelector, mongoSession); } } diff --git a/h2-backend/src/main/java/de/bwaldvogel/mongo/backend/h2/H2Database.java b/h2-backend/src/main/java/de/bwaldvogel/mongo/backend/h2/H2Database.java index 278a00011..58b79f0b9 100644 --- a/h2-backend/src/main/java/de/bwaldvogel/mongo/backend/h2/H2Database.java +++ b/h2-backend/src/main/java/de/bwaldvogel/mongo/backend/h2/H2Database.java @@ -7,6 +7,8 @@ import org.h2.mvstore.FileStore; import org.h2.mvstore.MVMap; import org.h2.mvstore.MVStore; +import org.h2.mvstore.tx.TransactionStore; +import org.h2.value.VersionedValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,10 +31,14 @@ public class H2Database extends AbstractSynchronizedMongoDatabase { static final String DATABASES_PREFIX = "databases."; private final MVStore mvStore; + private TransactionStore transactionStore; public H2Database(String databaseName, MVStore mvStore, CursorRegistry cursorRegistry) { super(databaseName, cursorRegistry); this.mvStore = mvStore; + transactionStore = new TransactionStore(mvStore); + transactionStore.init(); + initializeNamespacesAndIndexes(); } @@ -74,7 +80,7 @@ public void drop(Oplog oplog) { @Override protected MongoCollection openOrCreateCollection(String collectionName, CollectionOptions options) { String fullCollectionName = getFullCollectionNamespace(collectionName); - MVMap dataMap = mvStore.openMap(DATABASES_PREFIX + fullCollectionName); + MVMap dataMap = mvStore.openMap(DATABASES_PREFIX + fullCollectionName); MVMap metaMap = mvStore.openMap(META_PREFIX + fullCollectionName); return new H2Collection(this, collectionName, options, dataMap, metaMap, cursorRegistry); } diff --git a/h2-backend/src/test/java/de/bwaldvogel/mongo/backend/memory/H2TransactionTest.java b/h2-backend/src/test/java/de/bwaldvogel/mongo/backend/memory/H2TransactionTest.java new file mode 100644 index 000000000..83f54f544 --- /dev/null +++ b/h2-backend/src/test/java/de/bwaldvogel/mongo/backend/memory/H2TransactionTest.java @@ -0,0 +1,13 @@ +package de.bwaldvogel.mongo.backend.memory; + +import de.bwaldvogel.mongo.MongoBackend; +import de.bwaldvogel.mongo.ServerVersion; +import de.bwaldvogel.mongo.backend.AbstractTransactionTest; +import de.bwaldvogel.mongo.backend.h2.H2Backend; + +public class H2TransactionTest extends AbstractTransactionTest { + @Override + protected MongoBackend createBackend() throws Exception { + return H2Backend.inMemory(clock).version(ServerVersion.MONGO_4_2); + } +} diff --git a/memory-backend/gradle.lockfile b/memory-backend/gradle.lockfile index 5f51f1769..bdceb14b6 100644 --- a/memory-backend/gradle.lockfile +++ b/memory-backend/gradle.lockfile @@ -3,12 +3,13 @@ # This file is expected to be part of source control. ch.qos.logback:logback-classic:1.2.3=testRuntimeClasspath ch.qos.logback:logback-core:1.2.3=testRuntimeClasspath -io.netty:netty-buffer:4.1.54.Final=default,runtimeClasspath,testRuntimeClasspath -io.netty:netty-codec:4.1.54.Final=default,runtimeClasspath,testRuntimeClasspath -io.netty:netty-common:4.1.54.Final=default,runtimeClasspath,testRuntimeClasspath -io.netty:netty-handler:4.1.54.Final=default,runtimeClasspath,testRuntimeClasspath -io.netty:netty-resolver:4.1.54.Final=default,runtimeClasspath,testRuntimeClasspath -io.netty:netty-transport:4.1.54.Final=default,runtimeClasspath,testRuntimeClasspath +com.h2database:h2:1.4.200=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-buffer:4.1.56.Final=default,runtimeClasspath,testRuntimeClasspath +io.netty:netty-codec:4.1.56.Final=default,runtimeClasspath,testRuntimeClasspath +io.netty:netty-common:4.1.56.Final=default,runtimeClasspath,testRuntimeClasspath +io.netty:netty-handler:4.1.56.Final=default,runtimeClasspath,testRuntimeClasspath +io.netty:netty-resolver:4.1.56.Final=default,runtimeClasspath,testRuntimeClasspath +io.netty:netty-transport:4.1.56.Final=default,runtimeClasspath,testRuntimeClasspath net.bytebuddy:byte-buddy-agent:1.10.18=testCompileClasspath,testRuntimeClasspath net.bytebuddy:byte-buddy:1.10.18=testCompileClasspath,testRuntimeClasspath org.apiguardian:apiguardian-api:1.1.0=testCompileClasspath,testRuntimeClasspath @@ -38,14 +39,14 @@ org.ow2.asm:asm:7.2=jacocoAnt org.reactivestreams:reactive-streams:1.0.3=testCompileClasspath,testRuntimeClasspath org.slf4j:jcl-over-slf4j:1.7.30=testRuntimeClasspath org.slf4j:slf4j-api:1.7.30=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath -org.springframework.data:spring-data-commons:2.2.11.RELEASE=testCompileClasspath,testRuntimeClasspath -org.springframework.data:spring-data-mongodb:2.2.11.RELEASE=testCompileClasspath,testRuntimeClasspath -org.springframework:spring-aop:5.3.1=testCompileClasspath,testRuntimeClasspath -org.springframework:spring-beans:5.3.1=testCompileClasspath,testRuntimeClasspath -org.springframework:spring-context:5.3.1=testCompileClasspath,testRuntimeClasspath -org.springframework:spring-core:5.3.1=testCompileClasspath,testRuntimeClasspath -org.springframework:spring-expression:5.3.1=testCompileClasspath,testRuntimeClasspath -org.springframework:spring-jcl:5.3.1=testCompileClasspath,testRuntimeClasspath -org.springframework:spring-test:5.3.1=testCompileClasspath,testRuntimeClasspath -org.springframework:spring-tx:5.2.10.RELEASE=testCompileClasspath,testRuntimeClasspath +org.springframework.data:spring-data-commons:2.2.12.RELEASE=testCompileClasspath,testRuntimeClasspath +org.springframework.data:spring-data-mongodb:2.2.12.RELEASE=testCompileClasspath,testRuntimeClasspath +org.springframework:spring-aop:5.3.2=testCompileClasspath,testRuntimeClasspath +org.springframework:spring-beans:5.3.2=testCompileClasspath,testRuntimeClasspath +org.springframework:spring-context:5.3.2=testCompileClasspath,testRuntimeClasspath +org.springframework:spring-core:5.3.2=testCompileClasspath,testRuntimeClasspath +org.springframework:spring-expression:5.3.2=testCompileClasspath,testRuntimeClasspath +org.springframework:spring-jcl:5.3.2=testCompileClasspath,testRuntimeClasspath +org.springframework:spring-test:5.3.2=testCompileClasspath,testRuntimeClasspath +org.springframework:spring-tx:5.2.12.RELEASE=testCompileClasspath,testRuntimeClasspath empty=annotationProcessor,archives,compile,compileOnly,runtime,signatures,testAnnotationProcessor,testCompile,testCompileOnly,testRuntime diff --git a/memory-backend/src/main/java/de/bwaldvogel/mongo/backend/memory/MemoryCollection.java b/memory-backend/src/main/java/de/bwaldvogel/mongo/backend/memory/MemoryCollection.java index 9ca22ff2b..5dc403cf7 100644 --- a/memory-backend/src/main/java/de/bwaldvogel/mongo/backend/memory/MemoryCollection.java +++ b/memory-backend/src/main/java/de/bwaldvogel/mongo/backend/memory/MemoryCollection.java @@ -14,8 +14,12 @@ import de.bwaldvogel.mongo.backend.CollectionOptions; import de.bwaldvogel.mongo.backend.CursorRegistry; import de.bwaldvogel.mongo.backend.DocumentWithPosition; +import de.bwaldvogel.mongo.backend.MongoSession; import de.bwaldvogel.mongo.backend.QueryResult; import de.bwaldvogel.mongo.bson.Document; +import de.bwaldvogel.mongo.exception.InvalidOptionsException; +import de.bwaldvogel.mongo.exception.MongoServerException; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; public class MemoryCollection extends AbstractSynchronizedMongoCollection { @@ -54,7 +58,9 @@ protected Integer addDocumentInternal(Document document) { } @Override - protected QueryResult matchDocuments(Document query, Document orderBy, int numberToSkip, int limit, int batchSize, Document fieldSelector) { + protected QueryResult matchDocuments( + Document query, Document orderBy, int numberToSkip, int limit, int batchSize, Document fieldSelector, + MongoSession mongoSession) { Iterable documents = iterateAllDocuments(orderBy); Stream documentStream = StreamSupport.stream(documents.spliterator(), false); return matchDocumentsFromStream(documentStream, query, orderBy, numberToSkip, limit, batchSize, fieldSelector); @@ -108,7 +114,19 @@ protected Document getDocument(Integer position) { @Override protected void handleUpdate(Integer position, Document oldDocument, Document newDocument) { - // noop + Document doc = documents.get(position); + for (String key : newDocument.keySet()) { + if (key.contains(".")) { + throw new MongoServerException( + "illegal field name. must not happen as it must be caught by the driver"); + } + doc.put(key, newDocument.get(key)); + } + } + + @Override + protected void handleUpdate(Integer position, Document oldDocument, Document newDocument, MongoSession mongoSession) { + handleUpdate(position, oldDocument, newDocument); } } diff --git a/postgresql-backend/gradle.lockfile b/postgresql-backend/gradle.lockfile index aa47e0ccc..14b670973 100644 --- a/postgresql-backend/gradle.lockfile +++ b/postgresql-backend/gradle.lockfile @@ -8,16 +8,17 @@ com.fasterxml.jackson.core:jackson-core:2.12.0=compileClasspath,default,runtimeC com.fasterxml.jackson.core:jackson-databind:2.12.0=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.12.0=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath com.fasterxml.jackson:jackson-bom:2.12.0=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath -com.github.docker-java:docker-java-api:3.2.5=testCompileClasspath,testRuntimeClasspath -com.github.docker-java:docker-java-transport-zerodep:3.2.5=testCompileClasspath,testRuntimeClasspath -com.github.docker-java:docker-java-transport:3.2.5=testCompileClasspath,testRuntimeClasspath +com.github.docker-java:docker-java-api:3.2.7=testCompileClasspath,testRuntimeClasspath +com.github.docker-java:docker-java-transport-zerodep:3.2.7=testCompileClasspath,testRuntimeClasspath +com.github.docker-java:docker-java-transport:3.2.7=testCompileClasspath,testRuntimeClasspath +com.h2database:h2:1.4.200=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath com.zaxxer:HikariCP:3.4.5=testCompileClasspath,testRuntimeClasspath -io.netty:netty-buffer:4.1.54.Final=default,runtimeClasspath,testRuntimeClasspath -io.netty:netty-codec:4.1.54.Final=default,runtimeClasspath,testRuntimeClasspath -io.netty:netty-common:4.1.54.Final=default,runtimeClasspath,testRuntimeClasspath -io.netty:netty-handler:4.1.54.Final=default,runtimeClasspath,testRuntimeClasspath -io.netty:netty-resolver:4.1.54.Final=default,runtimeClasspath,testRuntimeClasspath -io.netty:netty-transport:4.1.54.Final=default,runtimeClasspath,testRuntimeClasspath +io.netty:netty-buffer:4.1.56.Final=default,runtimeClasspath,testRuntimeClasspath +io.netty:netty-codec:4.1.56.Final=default,runtimeClasspath,testRuntimeClasspath +io.netty:netty-common:4.1.56.Final=default,runtimeClasspath,testRuntimeClasspath +io.netty:netty-handler:4.1.56.Final=default,runtimeClasspath,testRuntimeClasspath +io.netty:netty-resolver:4.1.56.Final=default,runtimeClasspath,testRuntimeClasspath +io.netty:netty-transport:4.1.56.Final=default,runtimeClasspath,testRuntimeClasspath junit:junit:4.13.1=testCompileClasspath,testRuntimeClasspath net.bytebuddy:byte-buddy-agent:1.10.18=testCompileClasspath,testRuntimeClasspath net.bytebuddy:byte-buddy:1.10.18=testCompileClasspath,testRuntimeClasspath @@ -55,15 +56,15 @@ org.rnorth.duct-tape:duct-tape:1.0.8=testCompileClasspath,testRuntimeClasspath org.rnorth.visible-assertions:visible-assertions:2.1.2=testCompileClasspath,testRuntimeClasspath org.slf4j:jcl-over-slf4j:1.7.30=testRuntimeClasspath org.slf4j:slf4j-api:1.7.30=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath -org.springframework.data:spring-data-commons:2.2.11.RELEASE=testCompileClasspath,testRuntimeClasspath -org.springframework.data:spring-data-mongodb:2.2.11.RELEASE=testCompileClasspath,testRuntimeClasspath -org.springframework:spring-aop:5.3.1=testCompileClasspath,testRuntimeClasspath -org.springframework:spring-beans:5.3.1=testCompileClasspath,testRuntimeClasspath -org.springframework:spring-context:5.3.1=testCompileClasspath,testRuntimeClasspath -org.springframework:spring-core:5.3.1=testCompileClasspath,testRuntimeClasspath -org.springframework:spring-expression:5.3.1=testCompileClasspath,testRuntimeClasspath -org.springframework:spring-jcl:5.3.1=testCompileClasspath,testRuntimeClasspath -org.springframework:spring-test:5.3.1=testCompileClasspath,testRuntimeClasspath -org.springframework:spring-tx:5.2.10.RELEASE=testCompileClasspath,testRuntimeClasspath -org.testcontainers:testcontainers:1.15.0=testCompileClasspath,testRuntimeClasspath +org.springframework.data:spring-data-commons:2.2.12.RELEASE=testCompileClasspath,testRuntimeClasspath +org.springframework.data:spring-data-mongodb:2.2.12.RELEASE=testCompileClasspath,testRuntimeClasspath +org.springframework:spring-aop:5.3.2=testCompileClasspath,testRuntimeClasspath +org.springframework:spring-beans:5.3.2=testCompileClasspath,testRuntimeClasspath +org.springframework:spring-context:5.3.2=testCompileClasspath,testRuntimeClasspath +org.springframework:spring-core:5.3.2=testCompileClasspath,testRuntimeClasspath +org.springframework:spring-expression:5.3.2=testCompileClasspath,testRuntimeClasspath +org.springframework:spring-jcl:5.3.2=testCompileClasspath,testRuntimeClasspath +org.springframework:spring-test:5.3.2=testCompileClasspath,testRuntimeClasspath +org.springframework:spring-tx:5.2.12.RELEASE=testCompileClasspath,testRuntimeClasspath +org.testcontainers:testcontainers:1.15.1=testCompileClasspath,testRuntimeClasspath empty=annotationProcessor,archives,compile,compileOnly,runtime,signatures,testAnnotationProcessor,testCompile,testCompileOnly,testRuntime diff --git a/postgresql-backend/src/main/java/de/bwaldvogel/mongo/backend/postgresql/PostgresqlCollection.java b/postgresql-backend/src/main/java/de/bwaldvogel/mongo/backend/postgresql/PostgresqlCollection.java index 67c2acbef..7a62a69a5 100644 --- a/postgresql-backend/src/main/java/de/bwaldvogel/mongo/backend/postgresql/PostgresqlCollection.java +++ b/postgresql-backend/src/main/java/de/bwaldvogel/mongo/backend/postgresql/PostgresqlCollection.java @@ -19,6 +19,7 @@ import de.bwaldvogel.mongo.backend.CollectionOptions; import de.bwaldvogel.mongo.backend.CursorRegistry; import de.bwaldvogel.mongo.backend.DocumentWithPosition; +import de.bwaldvogel.mongo.backend.MongoSession; import de.bwaldvogel.mongo.backend.QueryResult; import de.bwaldvogel.mongo.bson.Document; import de.bwaldvogel.mongo.exception.DuplicateKeyError; @@ -57,7 +58,7 @@ public int count() { @Override protected QueryResult matchDocuments(Document query, Document orderBy, int numberToSkip, int numberToReturn, int batchSize, - Document fieldSelector) { + Document fieldSelector, MongoSession mongoSession) { String sql = "SELECT data FROM " + getQualifiedTablename() + " " + convertOrderByToSql(orderBy); try (Connection connection = backend.getConnection(); PreparedStatement stmt = connection.prepareStatement(sql); @@ -129,7 +130,7 @@ private static int getSortValue(Document orderBy, String key) { @Override protected QueryResult matchDocuments(Document query, Iterable positions, Document orderBy, int numberToSkip, int limit, int batchSize, - Document fieldSelector) { + Document fieldSelector, MongoSession mongoSession) { throw new UnsupportedOperationException("not yet implemented"); } @@ -281,6 +282,11 @@ protected void handleUpdate(Long position, Document oldDocument, Document newDoc } } + @Override + protected void handleUpdate(Long position, Document oldDocument, Document newDocument, MongoSession mongoSession) { + handleUpdate(position, oldDocument, newDocument); + } + @Override public void renameTo(MongoDatabase newDatabase, String newCollectionName) { String oldTablename = PostgresqlCollection.getTablename(getCollectionName()); diff --git a/postgresql-backend/src/main/java/de/bwaldvogel/mongo/backend/postgresql/PostgresqlDatabase.java b/postgresql-backend/src/main/java/de/bwaldvogel/mongo/backend/postgresql/PostgresqlDatabase.java index c40a4c71b..de7be8610 100644 --- a/postgresql-backend/src/main/java/de/bwaldvogel/mongo/backend/postgresql/PostgresqlDatabase.java +++ b/postgresql-backend/src/main/java/de/bwaldvogel/mongo/backend/postgresql/PostgresqlDatabase.java @@ -6,6 +6,7 @@ import java.util.List; import de.bwaldvogel.mongo.MongoCollection; +import de.bwaldvogel.mongo.MongoDatabase; import de.bwaldvogel.mongo.backend.AbstractSynchronizedMongoDatabase; import de.bwaldvogel.mongo.backend.CollectionOptions; import de.bwaldvogel.mongo.backend.CursorRegistry; @@ -14,6 +15,7 @@ import de.bwaldvogel.mongo.backend.postgresql.index.PostgresUniqueIndex; import de.bwaldvogel.mongo.exception.MongoServerException; import de.bwaldvogel.mongo.oplog.Oplog; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; public class PostgresqlDatabase extends AbstractSynchronizedMongoDatabase { diff --git a/test-common/gradle.lockfile b/test-common/gradle.lockfile index a66fc8eec..9425b4f8f 100644 --- a/test-common/gradle.lockfile +++ b/test-common/gradle.lockfile @@ -4,15 +4,16 @@ ch.qos.logback:logback-classic:1.2.3=default,runtimeClasspath,testRuntimeClasspath ch.qos.logback:logback-core:1.2.3=default,runtimeClasspath,testRuntimeClasspath com.fasterxml.jackson.core:jackson-annotations:2.10.3=testCompileClasspath,testRuntimeClasspath -com.github.docker-java:docker-java-api:3.2.5=testCompileClasspath,testRuntimeClasspath -com.github.docker-java:docker-java-transport-zerodep:3.2.5=testCompileClasspath,testRuntimeClasspath -com.github.docker-java:docker-java-transport:3.2.5=testCompileClasspath,testRuntimeClasspath -io.netty:netty-buffer:4.1.54.Final=default,runtimeClasspath,testRuntimeClasspath -io.netty:netty-codec:4.1.54.Final=default,runtimeClasspath,testRuntimeClasspath -io.netty:netty-common:4.1.54.Final=default,runtimeClasspath,testRuntimeClasspath -io.netty:netty-handler:4.1.54.Final=default,runtimeClasspath,testRuntimeClasspath -io.netty:netty-resolver:4.1.54.Final=default,runtimeClasspath,testRuntimeClasspath -io.netty:netty-transport:4.1.54.Final=default,runtimeClasspath,testRuntimeClasspath +com.github.docker-java:docker-java-api:3.2.7=testCompileClasspath,testRuntimeClasspath +com.github.docker-java:docker-java-transport-zerodep:3.2.7=testCompileClasspath,testRuntimeClasspath +com.github.docker-java:docker-java-transport:3.2.7=testCompileClasspath,testRuntimeClasspath +com.h2database:h2:1.4.200=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-buffer:4.1.56.Final=default,runtimeClasspath,testRuntimeClasspath +io.netty:netty-codec:4.1.56.Final=default,runtimeClasspath,testRuntimeClasspath +io.netty:netty-common:4.1.56.Final=default,runtimeClasspath,testRuntimeClasspath +io.netty:netty-handler:4.1.56.Final=default,runtimeClasspath,testRuntimeClasspath +io.netty:netty-resolver:4.1.56.Final=default,runtimeClasspath,testRuntimeClasspath +io.netty:netty-transport:4.1.56.Final=default,runtimeClasspath,testRuntimeClasspath junit:junit:4.12=testCompileClasspath,testRuntimeClasspath net.bytebuddy:byte-buddy-agent:1.10.18=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath net.bytebuddy:byte-buddy:1.10.18=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath @@ -48,15 +49,15 @@ org.rnorth.duct-tape:duct-tape:1.0.8=testCompileClasspath,testRuntimeClasspath org.rnorth.visible-assertions:visible-assertions:2.1.2=testCompileClasspath,testRuntimeClasspath org.slf4j:jcl-over-slf4j:1.7.30=testRuntimeClasspath org.slf4j:slf4j-api:1.7.30=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath -org.springframework.data:spring-data-commons:2.2.11.RELEASE=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath -org.springframework.data:spring-data-mongodb:2.2.11.RELEASE=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath -org.springframework:spring-aop:5.3.1=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath -org.springframework:spring-beans:5.3.1=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath -org.springframework:spring-context:5.3.1=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath -org.springframework:spring-core:5.3.1=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath -org.springframework:spring-expression:5.3.1=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath -org.springframework:spring-jcl:5.3.1=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath -org.springframework:spring-test:5.3.1=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath -org.springframework:spring-tx:5.2.10.RELEASE=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath -org.testcontainers:testcontainers:1.15.0=testCompileClasspath,testRuntimeClasspath +org.springframework.data:spring-data-commons:2.2.12.RELEASE=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.springframework.data:spring-data-mongodb:2.2.12.RELEASE=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.springframework:spring-aop:5.3.2=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.springframework:spring-beans:5.3.2=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.springframework:spring-context:5.3.2=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.springframework:spring-core:5.3.2=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.springframework:spring-expression:5.3.2=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.springframework:spring-jcl:5.3.2=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.springframework:spring-test:5.3.2=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.springframework:spring-tx:5.2.12.RELEASE=compileClasspath,default,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.testcontainers:testcontainers:1.15.1=testCompileClasspath,testRuntimeClasspath empty=annotationProcessor,archives,compile,compileOnly,runtime,signatures,testAnnotationProcessor,testCompile,testCompileOnly,testRuntime diff --git a/test-common/src/main/java/de/bwaldvogel/mongo/backend/AbstractAggregationTest.java b/test-common/src/main/java/de/bwaldvogel/mongo/backend/AbstractAggregationTest.java index 8c8775f33..e1e799ece 100644 --- a/test-common/src/main/java/de/bwaldvogel/mongo/backend/AbstractAggregationTest.java +++ b/test-common/src/main/java/de/bwaldvogel/mongo/backend/AbstractAggregationTest.java @@ -6,7 +6,7 @@ import static de.bwaldvogel.mongo.backend.TestUtils.jsonList; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; -import java.util.ArrayList; +import java.time.Instant; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -1999,8 +1999,7 @@ void testAggregateWithRedact() { collection.insertOne(json("_id: 1")); collection.insertOne(json("_id: 2")); - List redactedDocuments = collection.aggregate(pipeline).into(new ArrayList<>()); - assertThat(redactedDocuments) + assertThat(collection.aggregate(pipeline)) .containsOnly(json("_id: 1")); } @@ -2014,6 +2013,115 @@ public void testAggregateWithGeoNear() throws Exception { .withMessageContaining("Command failed with error -1: '$geoNear is not yet implemented. See https://github.com/bwaldvogel/mongo-java-server/issues/138'"); } + // https://github.com/bwaldvogel/mongo-java-server/issues/172 + @Test + void testAggregateWithToDouble() throws Exception { + List pipeline = jsonList("$project: {value: {$toDouble: '$x'}}"); + + collection.insertOne(json("_id: 1, x: '12'")); + collection.insertOne(json("_id: 2, x: '7.5'")); + collection.insertOne(json("_id: 3, x: 9")); + collection.insertOne(json("_id: 4, x: false")); + collection.insertOne(json("_id: 5, x: true")); + collection.insertOne(json("_id: 6").append("x", Instant.ofEpochMilli(1234567890L))); + collection.insertOne(json("_id: 7")); + collection.insertOne(json("_id: 8, x: null")); + + assertThat(collection.aggregate(pipeline)) + .containsOnly( + json("_id: 1, value: 12.0"), + json("_id: 2, value: 7.5"), + json("_id: 3, value: 9.0"), + json("_id: 4, value: 0.0"), + json("_id: 5, value: 1.0"), + json("_id: 6, value: 1234567890.0"), + json("_id: 7, value: null"), + json("_id: 8, value: null") + ); + } + + @Test + void testAggregateWithConvertToDouble_illegalValue() throws Exception { + List pipeline = jsonList("$project: {value: {$toDouble: '$x'}}"); + + collection.insertOne(json("_id: 1, x: 'abc'")); + + assertThatExceptionOfType(MongoCommandException.class) + .isThrownBy(() -> collection.aggregate(pipeline).first()) + .withMessageContaining("Command failed with error 241 (ConversionFailure): " + + "'Failed to parse number 'abc' in $convert with no onError value: Did not consume whole number.'"); + } + + @Test + void testAggregateWithToInt() throws Exception { + List pipeline = jsonList("$project: {value: {$toInt: '$x'}}"); + + collection.insertOne(json("_id: 1, x: '12'")); + collection.insertOne(json("_id: 2, x: 9")); + collection.insertOne(json("_id: 3, x: false")); + collection.insertOne(json("_id: 4, x: true")); + collection.insertOne(json("_id: 5")); + collection.insertOne(json("_id: 6, x: null")); + + assertThat(collection.aggregate(pipeline)) + .containsOnly( + json("_id: 1, value: 12"), + json("_id: 2, value: 9"), + json("_id: 3, value: 0"), + json("_id: 4, value: 1"), + json("_id: 5, value: null"), + json("_id: 6, value: null") + ); + } + + @Test + void testAggregateWithConvertToInt_illegalValue() throws Exception { + List pipeline = jsonList("$project: {value: {$toInt: '$x'}}"); + + collection.insertOne(json("_id: 1, x: 'abc'")); + + assertThatExceptionOfType(MongoCommandException.class) + .isThrownBy(() -> collection.aggregate(pipeline).first()) + .withMessageContaining("Command failed with error 241 (ConversionFailure): " + + "'Failed to parse number 'abc' in $convert with no onError value"); + } + + @Test + void testAggregateWithToLong() throws Exception { + List pipeline = jsonList("$project: {value: {$toLong: '$x'}}"); + + collection.insertOne(json("_id: 1, x: '12'")); + collection.insertOne(json("_id: 2, x: 9")); + collection.insertOne(json("_id: 3, x: false")); + collection.insertOne(json("_id: 4, x: true")); + collection.insertOne(json("_id: 5")); + collection.insertOne(json("_id: 6, x: null")); + collection.insertOne(json("_id: 7").append("x", Instant.ofEpochMilli(123456789L))); + + assertThat(collection.aggregate(pipeline)) + .containsOnly( + json("_id: 1").append("value", 12L), + json("_id: 2").append("value", 9L), + json("_id: 3").append("value", 0L), + json("_id: 4").append("value", 1L), + json("_id: 5").append("value", null), + json("_id: 6").append("value", null), + json("_id: 7").append("value", 123456789L) + ); + } + + @Test + void testAggregateWithConvertToLong_illegalValue() throws Exception { + List pipeline = jsonList("$project: {value: {$toLong: '$x'}}"); + + collection.insertOne(json("_id: 1, x: 'abc'")); + + assertThatExceptionOfType(MongoCommandException.class) + .isThrownBy(() -> collection.aggregate(pipeline).first()) + .withMessageContaining("Command failed with error 241 (ConversionFailure): " + + "'Failed to parse number 'abc' in $convert with no onError value"); + } + private static Function withSortedStringList(String key) { return document -> { @SuppressWarnings("unchecked") diff --git a/test-common/src/main/java/de/bwaldvogel/mongo/backend/AbstractTest.java b/test-common/src/main/java/de/bwaldvogel/mongo/backend/AbstractTest.java index 08e3a7d45..399d4eada 100644 --- a/test-common/src/main/java/de/bwaldvogel/mongo/backend/AbstractTest.java +++ b/test-common/src/main/java/de/bwaldvogel/mongo/backend/AbstractTest.java @@ -42,7 +42,7 @@ public abstract class AbstractTest { static com.mongodb.reactivestreams.client.MongoCollection asyncCollection; private static MongoServer mongoServer; - private static MongoClient asyncClient; + protected static MongoClient asyncClient; protected static InetSocketAddress serverAddress; protected static MongoBackend backend; @@ -61,7 +61,7 @@ public void setUp() throws Exception { protected void dropAllDatabases() { for (String databaseName : syncClient.listDatabaseNames()) { - if (databaseName.equals("admin") || databaseName.equals("local")) { + if (databaseName.equals("admin") || databaseName.equals("local") || databaseName.equals("config")) { continue; } syncClient.dropDatabase(databaseName); diff --git a/test-common/src/main/java/de/bwaldvogel/mongo/backend/AbstractTransactionTest.java b/test-common/src/main/java/de/bwaldvogel/mongo/backend/AbstractTransactionTest.java new file mode 100644 index 000000000..d31fa615d --- /dev/null +++ b/test-common/src/main/java/de/bwaldvogel/mongo/backend/AbstractTransactionTest.java @@ -0,0 +1,50 @@ +package de.bwaldvogel.mongo.backend; + +import static com.mongodb.client.model.Updates.set; +import static de.bwaldvogel.mongo.backend.TestUtils.json; + +import org.bson.Document; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.mongodb.ReadConcern; +import com.mongodb.ReadPreference; +import com.mongodb.TransactionOptions; +import com.mongodb.WriteConcern; +import com.mongodb.client.ClientSession; + +public abstract class AbstractTransactionTest extends AbstractTest { + + private static final Logger log = LoggerFactory.getLogger(AbstractTransactionTest.class); + + @Test + public void testTransactionUpdate() { + collection.insertOne(json("_id: 5, value: 100")); + ClientSession clientSession = syncClient.startSession(); + TransactionOptions txnOptions = TransactionOptions.builder() + .readPreference(ReadPreference.primary()) + .readConcern(ReadConcern.LOCAL) + .writeConcern(WriteConcern.MAJORITY) + .build(); + clientSession.startTransaction(txnOptions); + collection.updateOne(clientSession, json("_id: 5"), set("value", 3)); + collection.updateOne(clientSession, json("_id: 5"), set("value", 2)); + + Document doc = collection.find(json("_id: 5")).first(); + assertThat(doc).isNotNull(); + assertThat(doc.get("value")).isEqualTo(100); + + try { + clientSession.commitTransaction(); + } catch (RuntimeException e) { + log.error(e.getMessage()); + } finally { + clientSession.close(); + } + + doc = collection.find(json("_id: 5")).first(); + assertThat(doc).isNotNull(); + assertThat(doc.get("value")).isEqualTo(2); + } +} \ No newline at end of file diff --git a/test-common/src/test/java/de/bwaldvogel/mongo/RealMongoReplicaSet.java b/test-common/src/test/java/de/bwaldvogel/mongo/RealMongoReplicaSet.java new file mode 100644 index 000000000..a2fd59484 --- /dev/null +++ b/test-common/src/test/java/de/bwaldvogel/mongo/RealMongoReplicaSet.java @@ -0,0 +1,62 @@ +package de.bwaldvogel.mongo; + +import static org.testcontainers.containers.Network.newNetwork; + +import java.net.InetSocketAddress; + +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; + +public final class RealMongoReplicaSet { + public static RealMongoReplicaSet INSTANCE = new RealMongoReplicaSet(); + private static final int MONGO_PORT = 27017; + private static final String MONGO_IMAGE = "mongo:4.2.8"; + + private InetSocketAddress address; + + @SuppressWarnings("checkstyle:IllegalCatch") + private RealMongoReplicaSet() { + Network network = newNetwork(); + + GenericContainer m1 = new GenericContainer<>(MONGO_IMAGE) + .withNetwork(network) + .withNetworkAliases("M1") + .withExposedPorts(MONGO_PORT) + .withCommand("--replSet rs0 --bind_ip localhost,M1"); + + GenericContainer m2 = new GenericContainer<>(MONGO_IMAGE) + .withNetwork(network) + .withNetworkAliases("M2") + .withExposedPorts(MONGO_PORT) + .withCommand("--replSet rs0 --bind_ip localhost,M2"); + + GenericContainer m3 = new GenericContainer<>(MONGO_IMAGE) + .withNetwork(network) + .withNetworkAliases("M3") + .withExposedPorts(MONGO_PORT) + .withCommand("--replSet rs0 --bind_ip localhost,M3"); + + m1.start(); + m2.start(); + m3.start(); + + try { + m1.execInContainer("/bin/bash", "-c", + "mongo --eval 'printjson(rs.initiate({_id:\"rs0\"," + + "members:[{_id:0,host:\"M1:27017\"},{_id:1,host:\"M2:27017\"},{_id:2,host:\"M3:27017\"}]}))' " + + "--quiet"); + m1.execInContainer("/bin/bash", "-c", + "until mongo --eval \"printjson(rs.isMaster())\" | grep ismaster | grep true > /dev/null 2>&1;" + + "do sleep 1;done"); + } catch (Exception e) { + throw new IllegalStateException("Failed to initiate rs.", e); + } + + address = new InetSocketAddress(m1.getContainerIpAddress(), m1.getFirstMappedPort()); + } + + public InetSocketAddress getAddress() { + return address; + } + +} \ No newline at end of file diff --git a/test-common/src/test/java/de/bwaldvogel/mongo/RealMongoTransactionsTest.java b/test-common/src/test/java/de/bwaldvogel/mongo/RealMongoTransactionsTest.java new file mode 100644 index 000000000..fbdbf2d86 --- /dev/null +++ b/test-common/src/test/java/de/bwaldvogel/mongo/RealMongoTransactionsTest.java @@ -0,0 +1,38 @@ +package de.bwaldvogel.mongo; + +import java.net.InetSocketAddress; + +import org.junit.jupiter.api.BeforeAll; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import de.bwaldvogel.mongo.backend.AbstractTransactionTest; + +public class RealMongoTransactionsTest extends AbstractTransactionTest { + + private static final Logger log = LoggerFactory.getLogger(RealMongoTransactionsTest.class); + private static RealMongoReplicaSet mongoReplicaSet; + + @BeforeAll + public static void setUpMongoContainer() { + if (Boolean.getBoolean("mongo-java-server-use-existing-container")) { + log.info("Not starting a test container in favor of an existing container."); + return; + } + mongoReplicaSet = RealMongoReplicaSet.INSTANCE; + } + + @Override + protected MongoBackend createBackend() throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + protected void setUpBackend() throws Exception { + if (mongoReplicaSet != null) { + serverAddress = mongoReplicaSet.getAddress(); + } else { + serverAddress = new InetSocketAddress("127.0.0.1", 27018); + } + } +} \ No newline at end of file