diff --git a/pom.xml b/pom.xml index dfa0372..0b2ed24 100644 --- a/pom.xml +++ b/pom.xml @@ -7,25 +7,69 @@ db-scheduler-mongo com.github.piemjean - 9.3 + 11.0.0-SNAPSHOT db-scheduler: Mongodb support Module providing Mongodb support for db-scheduler https://github.com/piemjean/db-scheduler-mongo + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + + + + + + piemjean + piem.jean@gmail.com + + + + + scm:git:git://github.com/piemjean/db-scheduler-mopngo.git + scm:git:ssh://github.com:piemjean/db-scheduler-mopngo.git + https://github.com/piemjean/db-scheduler-mongo/tree/master + + + + + + ${nexus.server.projet.id} + Releases of ${project.artifactId} + ${nexus.releases.repository}/ + + + + + true + ${nexus.server.projet.id} + Snapshots of ${project.artifactId} + ${nexus.snapshots.repository}/ + + + + + ${nexus.server.projet.id} + ${project.url} + + + 1.8 ${jdk.version} ${jdk.version} - 9.3 - 3.12.7 + 11.0 + 4.5.1 1.7.30 - 3.0.0 + 3.4.5 5.6.0 3.7.7 3.19.0 30.1-jre 2.22.2 + UTF-8 @@ -37,7 +81,20 @@ org.mongodb - mongo-java-driver + mongodb-driver-sync + ${mongo-java-driver.version} + + + + + org.mongodb + mongodb-driver-core + ${mongo-java-driver.version} + + + + org.mongodb + bson ${mongo-java-driver.version} @@ -116,6 +173,12 @@ de.flapdoodle.embed.mongo ${embed-mongo.version} test + + + com.mongodb + mongo-java-driver + + @@ -129,6 +192,46 @@ + + org.apache.maven.plugins + maven-source-plugin + 3.2.1 + + + attach-sources + + jar-no-fork + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 3.2.0 + + + attach-javadocs + + jar + + + + + + org.apache.maven.plugins + maven-gpg-plugin + 1.6 + + + sign-artifacts + verify + + sign + + + + org.apache.maven.plugins maven-surefire-plugin @@ -137,27 +240,4 @@ - - - - ${nexus.server.projet.id} - Releases of ${project.artifactId} - ${nexus.releases.repository}/ - - - - - true - ${nexus.server.projet.id} - Snapshots of ${project.artifactId} - ${nexus.snapshots.repository}/ - - - - - ${nexus.server.projet.id} - ${project.url} - - - diff --git a/src/main/java/com/github/kagkarlsson/scheduler/MongoScheduler.java b/src/main/java/com/github/kagkarlsson/scheduler/MongoScheduler.java index e0bb964..2cf4dd3 100644 --- a/src/main/java/com/github/kagkarlsson/scheduler/MongoScheduler.java +++ b/src/main/java/com/github/kagkarlsson/scheduler/MongoScheduler.java @@ -1,9 +1,11 @@ package com.github.kagkarlsson.scheduler; +import com.github.kagkarlsson.scheduler.logging.LogLevel; import com.github.kagkarlsson.scheduler.stats.StatsRegistry; import com.github.kagkarlsson.scheduler.task.OnStartup; import com.github.kagkarlsson.scheduler.task.Task; -import com.mongodb.MongoClient; +import com.mongodb.client.MongoClient; + import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -12,30 +14,33 @@ public class MongoScheduler extends Scheduler { - protected MongoScheduler(Clock clock, - TaskRepository schedulerTaskRepository, - TaskRepository clientTaskRepository, TaskResolver taskResolver, int threadpoolSize, - ExecutorService executorService, - SchedulerName schedulerName, Waiter executeDueWaiter, Duration heartbeatInterval, - boolean enableImmediateExecution, - StatsRegistry statsRegistry, int pollingLimit, - Duration deleteUnresolvedAfter, Duration shutdownMaxWait, - List onStartup) { - super(clock, schedulerTaskRepository, clientTaskRepository, taskResolver, threadpoolSize, - executorService, schedulerName, executeDueWaiter, heartbeatInterval, - enableImmediateExecution, - statsRegistry, pollingLimit, deleteUnresolvedAfter, shutdownMaxWait, onStartup); - } + protected MongoScheduler(Clock clock, + TaskRepository schedulerTaskRepository, + TaskRepository clientTaskRepository, TaskResolver taskResolver, int threadpoolSize, + ExecutorService executorService, + SchedulerName schedulerName, Waiter executeDueWaiter, Duration heartbeatInterval, + boolean enableImmediateExecution, + StatsRegistry statsRegistry, + Duration deleteUnresolvedAfter, Duration shutdownMaxWait, LogLevel logLevel, + boolean logStackTrace, + List onStartup) { + super(clock, schedulerTaskRepository, clientTaskRepository, taskResolver, threadpoolSize, + executorService, schedulerName, executeDueWaiter, heartbeatInterval, + enableImmediateExecution, + statsRegistry, PollingStrategyConfig.DEFAULT_FETCH, deleteUnresolvedAfter, shutdownMaxWait, + logLevel, + logStackTrace, onStartup); + } - public static MongoSchedulerBuilder create(MongoClient mongoClient, String database, - String collection, Task... knownTasks) { - List> knownTasksList = new ArrayList<>(); - knownTasksList.addAll(Arrays.asList(knownTasks)); - return create(mongoClient, database, collection, knownTasksList); - } + public static MongoSchedulerBuilder create(MongoClient mongoClient, String database, + String collection, Task... knownTasks) { + List> knownTasksList = new ArrayList<>(); + knownTasksList.addAll(Arrays.asList(knownTasks)); + return create(mongoClient, database, collection, knownTasksList); + } - public static MongoSchedulerBuilder create(MongoClient mongoClient, String database, - String collection, List> knownTasks) { - return new MongoSchedulerBuilder(mongoClient, database, collection, knownTasks); - } + public static MongoSchedulerBuilder create(MongoClient mongoClient, String database, + String collection, List> knownTasks) { + return new MongoSchedulerBuilder(mongoClient, database, collection, knownTasks); + } } diff --git a/src/main/java/com/github/kagkarlsson/scheduler/MongoSchedulerBuilder.java b/src/main/java/com/github/kagkarlsson/scheduler/MongoSchedulerBuilder.java index 57c7ded..9e19f1e 100644 --- a/src/main/java/com/github/kagkarlsson/scheduler/MongoSchedulerBuilder.java +++ b/src/main/java/com/github/kagkarlsson/scheduler/MongoSchedulerBuilder.java @@ -4,10 +4,11 @@ import static com.github.kagkarlsson.scheduler.Scheduler.THREAD_PREFIX; import com.github.kagkarlsson.scheduler.task.Task; -import com.mongodb.MongoClient; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; + +import com.mongodb.client.MongoClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,10 +28,11 @@ public class MongoSchedulerBuilder extends SchedulerBuilder { * * @param mongoClient - object handling mongo connection * @param databaseName - mongo database name + * @param collection - name of collection in which tasks are stored * @param knownTasks - list of known tasks */ protected MongoSchedulerBuilder(MongoClient mongoClient, String databaseName, String collection, - List> knownTasks) { + List> knownTasks) { super(null, knownTasks); this.mongoClient = mongoClient; this.databaseName = databaseName; @@ -39,10 +41,6 @@ protected MongoSchedulerBuilder(MongoClient mongoClient, String databaseName, St @Override public Scheduler build() { - if (pollingLimit < executorThreads) { - LOG.warn("Polling-limit is less than number of threads. Should be equal or higher."); - } - if (schedulerName == null) { schedulerName = new SchedulerName.Hostname(); } @@ -50,9 +48,9 @@ public Scheduler build() { final TaskResolver taskResolver = new TaskResolver(statsRegistry, clock, knownTasks); final MongoTaskRepository mongoTaskRepository = new MongoTaskRepository(taskResolver, - schedulerName, serializer, databaseName, tableName, mongoClient); + schedulerName, serializer, databaseName, tableName, mongoClient, clock); final MongoTaskRepository clientTaskRepository = new MongoTaskRepository(taskResolver, - schedulerName, serializer, databaseName, tableName, mongoClient); + schedulerName, serializer, databaseName, tableName, mongoClient, clock); ExecutorService candidateExecutorService = executorService; if (candidateExecutorService == null) { @@ -69,7 +67,7 @@ public Scheduler build() { schedulerName.getName()); return new MongoScheduler(clock, mongoTaskRepository, clientTaskRepository, taskResolver, executorThreads, candidateExecutorService, - schedulerName, waiter, heartbeatInterval, enableImmediateExecution, statsRegistry, pollingLimit, - deleteUnresolvedAfter, shutdownMaxWait, startTasks); + schedulerName, waiter, heartbeatInterval, enableImmediateExecution, statsRegistry, + deleteUnresolvedAfter, shutdownMaxWait, logLevel, logStackTrace, startTasks); } } diff --git a/src/main/java/com/github/kagkarlsson/scheduler/MongoTaskRepository.java b/src/main/java/com/github/kagkarlsson/scheduler/MongoTaskRepository.java index c240fab..433b516 100644 --- a/src/main/java/com/github/kagkarlsson/scheduler/MongoTaskRepository.java +++ b/src/main/java/com/github/kagkarlsson/scheduler/MongoTaskRepository.java @@ -17,19 +17,21 @@ import com.github.kagkarlsson.scheduler.TaskResolver.UnresolvedTask; import com.github.kagkarlsson.scheduler.task.Execution; +import com.github.kagkarlsson.scheduler.task.SchedulableInstance; import com.github.kagkarlsson.scheduler.task.Task; import com.github.kagkarlsson.scheduler.task.TaskInstance; import com.mongodb.ErrorCategory; -import com.mongodb.MongoClient; import com.mongodb.MongoClientSettings; import com.mongodb.MongoWriteException; import com.mongodb.client.FindIterable; +import com.mongodb.client.MongoClient; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.FindOneAndUpdateOptions; import com.mongodb.client.model.IndexOptions; import com.mongodb.client.model.ReturnDocument; import com.mongodb.client.result.DeleteResult; + import java.time.Duration; import java.time.Instant; import java.util.ArrayList; @@ -53,15 +55,17 @@ public class MongoTaskRepository implements TaskRepository { private final TaskResolver taskResolver; private final SchedulerName schedulerSchedulerName; private final Serializer serializer; + private final Clock clock; private final MongoCollection collection; public MongoTaskRepository(TaskResolver taskResolver, - SchedulerName schedulerSchedulerName, - Serializer serializer, String databaseName, String tableName, MongoClient mongoClient) { + SchedulerName schedulerSchedulerName, + Serializer serializer, String databaseName, String tableName, MongoClient mongoClient, Clock clock) { this.taskResolver = taskResolver; this.schedulerSchedulerName = schedulerSchedulerName; this.serializer = serializer; + this.clock = clock; CodecRegistry pojoCodecRegistry = fromRegistries( MongoClientSettings.getDefaultCodecRegistry(), @@ -87,11 +91,14 @@ private void uniqueIndexCreation() { } @Override - public boolean createIfNotExists(Execution execution) { - LOG.debug("Creation request for execution {}", execution); + public boolean createIfNotExists(SchedulableInstance schedulableInstance) { + LOG.debug("Creation request for execution {}", schedulableInstance); + Optional execution = this.getExecution(schedulableInstance.getTaskName(), schedulableInstance.getId()); + if (execution.isPresent()) { + return false; + } // Search criterion : taskName, taskInstance - final Bson query = buildFilterFromExecution(execution, false); - Optional taskEntityOpt = toEntity(execution); + Optional taskEntityOpt = toEntity(schedulableInstance); if (!taskEntityOpt.isPresent()) { return false; } @@ -196,6 +203,11 @@ public void getScheduledExecutions( .filter(Optional::isPresent).map(Optional::get).forEach(consumer); } + @Override + public List lockAndGetDue(Instant instant, int i) { + throw new UnsupportedOperationException("lockAndFetch not supported in mongodatabase"); + } + @Override public void remove(Execution execution) { final Bson filter = buildFilterFromExecution(execution); @@ -355,32 +367,32 @@ public int removeExecutions(String taskName) { return Long.valueOf(deleted.getDeletedCount()).intValue(); } + @Override + public void checkSupportsLockAndFetch() { + throw new IllegalArgumentException("Mongodb does not support specific lock-and-fetch since operations are already atomic"); + } + /** * Mapping method to get an Entity from an Execution * * @param in - Execution to map * @return TaskEntity mapped from execution */ - private Optional toEntity(Execution in) { + private Optional toEntity(SchedulableInstance in) { if (in == null) { return Optional.empty(); } TaskEntity out = new TaskEntity(); - Optional> taskInstanceOpt = Optional - .ofNullable(in.taskInstance); + Optional> taskInstanceOpt = Optional.ofNullable(in.getTaskInstance()); taskInstanceOpt.map(TaskInstance::getTaskName).ifPresent(out::setTaskName); taskInstanceOpt.map(TaskInstance::getId).ifPresent(out::setTaskInstance); taskInstanceOpt.map(TaskInstance::getData).map(serializer::serialize) .ifPresent(out::setTaskData); - out.setExecutionTime(in.getExecutionTime()); - out.setPicked(in.isPicked()); - out.setPickedBy(in.pickedBy); - out.setLastFailure(in.lastFailure); - out.setLastSuccess(in.lastSuccess); - out.setLastHeartbeat(in.lastHeartbeat); - out.setVersion(in.version); + out.setExecutionTime(in.getNextExecutionTime(clock.now())); + out.setPicked(false); + out.setVersion(1); return Optional.of(out); } diff --git a/src/test/java/com/github/kagkarlsson/scheduler/EmebddedMongodbExtension.java b/src/test/java/com/github/kagkarlsson/scheduler/EmebddedMongodbExtension.java index 9a431f8..25890d3 100644 --- a/src/test/java/com/github/kagkarlsson/scheduler/EmebddedMongodbExtension.java +++ b/src/test/java/com/github/kagkarlsson/scheduler/EmebddedMongodbExtension.java @@ -5,8 +5,8 @@ import com.github.kagkarlsson.scheduler.utils.TestUtils; import com.github.kagkarlsson.scheduler.utils.TestUtils.MongoTools; -import com.mongodb.MongoClient; import com.mongodb.MongoClientSettings; +import com.mongodb.client.MongoClient; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import java.io.IOException; diff --git a/src/test/java/com/github/kagkarlsson/scheduler/MongoTaskRepositoryTest.java b/src/test/java/com/github/kagkarlsson/scheduler/MongoTaskRepositoryTest.java index d437e24..2c7c0cf 100644 --- a/src/test/java/com/github/kagkarlsson/scheduler/MongoTaskRepositoryTest.java +++ b/src/test/java/com/github/kagkarlsson/scheduler/MongoTaskRepositoryTest.java @@ -3,9 +3,9 @@ import static org.assertj.core.api.Assertions.assertThat; import com.github.kagkarlsson.scheduler.TaskResolver.UnresolvedTask; -import com.github.kagkarlsson.scheduler.task.Execution; -import com.github.kagkarlsson.scheduler.task.Task; -import com.github.kagkarlsson.scheduler.task.TaskInstance; +import com.github.kagkarlsson.scheduler.task.*; +import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask; +import com.github.kagkarlsson.scheduler.task.helper.Tasks; import com.github.kagkarlsson.scheduler.utils.ExecutionBuilder; import com.github.kagkarlsson.scheduler.utils.TestUtils; import com.mongodb.ErrorCategory; @@ -62,15 +62,18 @@ void init() throws IOException { .thenReturn(Optional.of(taskResolved)); repository = new MongoTaskRepository(taskResolver, schedulerName, serializer, - "db-scheduler", "db-scheduler", emebddedMongodbExtension.getMongoClient()); + "db-scheduler", "db-scheduler", emebddedMongodbExtension.getMongoClient(), + new SystemClock()); } @Test void testCreateIfNotExistsOk() { - Execution execution = new ExecutionBuilder().taskName("idTask").taskInstanceId("idInstance") - .build(); + OneTimeTask task = Tasks + .oneTime("idTask", Object.class).execute((TaskInstance inst, + ExecutionContext ctx) -> { + }); - boolean created = repository.createIfNotExists(execution); + boolean created = repository.createIfNotExists(task.schedulableInstance("idInstance")); assertThat(created).isTrue(); @@ -94,8 +97,10 @@ void testCreateIfNotExistsOk() { @Test void testCreateIfNotExistsKo() { - Execution execution = new ExecutionBuilder().taskName("idTask").taskInstanceId("idInstance") - .build(); + OneTimeTask task = Tasks + .oneTime("idTask", Object.class).execute((TaskInstance inst, + ExecutionContext ctx) -> { + }); TaskEntity taskEntityInitial = new TaskEntity(); taskEntityInitial.setTaskInstance("idInstance"); @@ -103,7 +108,7 @@ void testCreateIfNotExistsKo() { this.emebddedMongodbExtension.getCollection().insertOne(taskEntityInitial); - boolean created = repository.createIfNotExists(execution); + boolean created = repository.createIfNotExists(task.schedulableInstance("idInstance")); // Check that no execution is created because it already exists assertThat(created).isFalse(); } @@ -639,7 +644,7 @@ void testIndexUniqueness() { .collect(Collectors.toList()); // Check exception - ErrorCategory category = ErrorCategory.fromErrorCode(exception.getCode()); + ErrorCategory category = ErrorCategory.fromErrorCode(exception.getError().getCode()); assertThat(category).isEqualTo(ErrorCategory.DUPLICATE_KEY); // Check that only one document was inserted diff --git a/src/test/java/com/github/kagkarlsson/scheduler/utils/TestUtils.java b/src/test/java/com/github/kagkarlsson/scheduler/utils/TestUtils.java index 07b9848..314bdf1 100644 --- a/src/test/java/com/github/kagkarlsson/scheduler/utils/TestUtils.java +++ b/src/test/java/com/github/kagkarlsson/scheduler/utils/TestUtils.java @@ -1,6 +1,9 @@ package com.github.kagkarlsson.scheduler.utils; -import com.mongodb.MongoClient; +import com.mongodb.MongoClientSettings; +import com.mongodb.ServerAddress; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; import de.flapdoodle.embed.mongo.MongodExecutable; import de.flapdoodle.embed.mongo.MongodProcess; import de.flapdoodle.embed.mongo.MongodStarter; @@ -9,13 +12,10 @@ import de.flapdoodle.embed.mongo.distribution.Version; import de.flapdoodle.embed.process.runtime.Network; import java.io.IOException; -import java.net.ServerSocket; import java.time.Instant; -import java.time.LocalDateTime; -import java.time.ZoneOffset; import java.time.temporal.ChronoUnit; -import java.util.Objects; -import java.util.concurrent.ThreadLocalRandom; +import java.util.Arrays; +import java.util.Collections; public class TestUtils { @@ -61,7 +61,10 @@ public static MongoTools startEmbeddedMongo() throws IOException { MongodExecutable mongodExecutable = starter.prepare(mongodConfig); MongodProcess mongod = mongodExecutable.start(); - MongoClient mongoClient = new MongoClient("localhost", port); + MongoClient mongoClient = MongoClients.create(MongoClientSettings.builder() + .applyToClusterSettings(b -> b.hosts(Collections.singletonList( + new ServerAddress("localhost", port) + ))).build()); MongoTools mongoTools = new MongoTools(); mongoTools.setClient(mongoClient);