diff --git a/pom.xml b/pom.xml index dfa0372..4555872 100644 --- a/pom.xml +++ b/pom.xml @@ -7,17 +7,60 @@ db-scheduler-mongo com.github.piemjean - 9.3 + 11.0.0-SNAPSHOT db-scheduler: Mongodb support Module providing Mongodb support for db-scheduler https://github.com/piemjean/db-scheduler-mongo + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + + + + + + piemjean + piem.jean@gmail.com + + + + + scm:git:git://github.com/piemjean/db-scheduler-mopngo.git + scm:git:ssh://github.com:piemjean/db-scheduler-mopngo.git + https://github.com/piemjean/db-scheduler-mongo/tree/master + + + + + + ${nexus.server.projet.id} + Releases of ${project.artifactId} + ${nexus.releases.repository}/ + + + + + true + ${nexus.server.projet.id} + Snapshots of ${project.artifactId} + ${nexus.snapshots.repository}/ + + + + + ${nexus.server.projet.id} + ${project.url} + + + 1.8 ${jdk.version} ${jdk.version} - 9.3 + 11.0 3.12.7 1.7.30 3.0.0 @@ -26,6 +69,7 @@ 3.19.0 30.1-jre 2.22.2 + UTF-8 @@ -129,6 +173,46 @@ + + org.apache.maven.plugins + maven-source-plugin + 3.2.1 + + + attach-sources + + jar-no-fork + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 3.2.0 + + + attach-javadocs + + jar + + + + + + org.apache.maven.plugins + maven-gpg-plugin + 1.6 + + + sign-artifacts + verify + + sign + + + + org.apache.maven.plugins maven-surefire-plugin @@ -137,27 +221,4 @@ - - - - ${nexus.server.projet.id} - Releases of ${project.artifactId} - ${nexus.releases.repository}/ - - - - - true - ${nexus.server.projet.id} - Snapshots of ${project.artifactId} - ${nexus.snapshots.repository}/ - - - - - ${nexus.server.projet.id} - ${project.url} - - - diff --git a/src/main/java/com/github/kagkarlsson/scheduler/MongoScheduler.java b/src/main/java/com/github/kagkarlsson/scheduler/MongoScheduler.java index e0bb964..7e04081 100644 --- a/src/main/java/com/github/kagkarlsson/scheduler/MongoScheduler.java +++ b/src/main/java/com/github/kagkarlsson/scheduler/MongoScheduler.java @@ -1,5 +1,6 @@ package com.github.kagkarlsson.scheduler; +import com.github.kagkarlsson.scheduler.logging.LogLevel; import com.github.kagkarlsson.scheduler.stats.StatsRegistry; import com.github.kagkarlsson.scheduler.task.OnStartup; import com.github.kagkarlsson.scheduler.task.Task; @@ -12,30 +13,33 @@ public class MongoScheduler extends Scheduler { - protected MongoScheduler(Clock clock, - TaskRepository schedulerTaskRepository, - TaskRepository clientTaskRepository, TaskResolver taskResolver, int threadpoolSize, - ExecutorService executorService, - SchedulerName schedulerName, Waiter executeDueWaiter, Duration heartbeatInterval, - boolean enableImmediateExecution, - StatsRegistry statsRegistry, int pollingLimit, - Duration deleteUnresolvedAfter, Duration shutdownMaxWait, - List onStartup) { - super(clock, schedulerTaskRepository, clientTaskRepository, taskResolver, threadpoolSize, - executorService, schedulerName, executeDueWaiter, heartbeatInterval, - enableImmediateExecution, - statsRegistry, pollingLimit, deleteUnresolvedAfter, shutdownMaxWait, onStartup); - } + protected MongoScheduler(Clock clock, + TaskRepository schedulerTaskRepository, + TaskRepository clientTaskRepository, TaskResolver taskResolver, int threadpoolSize, + ExecutorService executorService, + SchedulerName schedulerName, Waiter executeDueWaiter, Duration heartbeatInterval, + boolean enableImmediateExecution, + StatsRegistry statsRegistry, + Duration deleteUnresolvedAfter, Duration shutdownMaxWait, LogLevel logLevel, + boolean logStackTrace, + List onStartup) { + super(clock, schedulerTaskRepository, clientTaskRepository, taskResolver, threadpoolSize, + executorService, schedulerName, executeDueWaiter, heartbeatInterval, + enableImmediateExecution, + statsRegistry, PollingStrategyConfig.DEFAULT_FETCH, deleteUnresolvedAfter, shutdownMaxWait, + logLevel, + logStackTrace, onStartup); + } - public static MongoSchedulerBuilder create(MongoClient mongoClient, String database, - String collection, Task... knownTasks) { - List> knownTasksList = new ArrayList<>(); - knownTasksList.addAll(Arrays.asList(knownTasks)); - return create(mongoClient, database, collection, knownTasksList); - } + public static MongoSchedulerBuilder create(MongoClient mongoClient, String database, + String collection, Task... knownTasks) { + List> knownTasksList = new ArrayList<>(); + knownTasksList.addAll(Arrays.asList(knownTasks)); + return create(mongoClient, database, collection, knownTasksList); + } - public static MongoSchedulerBuilder create(MongoClient mongoClient, String database, - String collection, List> knownTasks) { - return new MongoSchedulerBuilder(mongoClient, database, collection, knownTasks); - } + public static MongoSchedulerBuilder create(MongoClient mongoClient, String database, + String collection, List> knownTasks) { + return new MongoSchedulerBuilder(mongoClient, database, collection, knownTasks); + } } diff --git a/src/main/java/com/github/kagkarlsson/scheduler/MongoSchedulerBuilder.java b/src/main/java/com/github/kagkarlsson/scheduler/MongoSchedulerBuilder.java index 57c7ded..3fdbf6d 100644 --- a/src/main/java/com/github/kagkarlsson/scheduler/MongoSchedulerBuilder.java +++ b/src/main/java/com/github/kagkarlsson/scheduler/MongoSchedulerBuilder.java @@ -27,6 +27,7 @@ public class MongoSchedulerBuilder extends SchedulerBuilder { * * @param mongoClient - object handling mongo connection * @param databaseName - mongo database name + * @param collection - name of collection in which tasks are stored * @param knownTasks - list of known tasks */ protected MongoSchedulerBuilder(MongoClient mongoClient, String databaseName, String collection, @@ -39,10 +40,6 @@ protected MongoSchedulerBuilder(MongoClient mongoClient, String databaseName, St @Override public Scheduler build() { - if (pollingLimit < executorThreads) { - LOG.warn("Polling-limit is less than number of threads. Should be equal or higher."); - } - if (schedulerName == null) { schedulerName = new SchedulerName.Hostname(); } @@ -50,9 +47,9 @@ public Scheduler build() { final TaskResolver taskResolver = new TaskResolver(statsRegistry, clock, knownTasks); final MongoTaskRepository mongoTaskRepository = new MongoTaskRepository(taskResolver, - schedulerName, serializer, databaseName, tableName, mongoClient); + schedulerName, serializer, databaseName, tableName, mongoClient, clock); final MongoTaskRepository clientTaskRepository = new MongoTaskRepository(taskResolver, - schedulerName, serializer, databaseName, tableName, mongoClient); + schedulerName, serializer, databaseName, tableName, mongoClient, clock); ExecutorService candidateExecutorService = executorService; if (candidateExecutorService == null) { @@ -69,7 +66,7 @@ public Scheduler build() { schedulerName.getName()); return new MongoScheduler(clock, mongoTaskRepository, clientTaskRepository, taskResolver, executorThreads, candidateExecutorService, - schedulerName, waiter, heartbeatInterval, enableImmediateExecution, statsRegistry, pollingLimit, - deleteUnresolvedAfter, shutdownMaxWait, startTasks); + schedulerName, waiter, heartbeatInterval, enableImmediateExecution, statsRegistry, + deleteUnresolvedAfter, shutdownMaxWait, logLevel, logStackTrace, startTasks); } } diff --git a/src/main/java/com/github/kagkarlsson/scheduler/MongoTaskRepository.java b/src/main/java/com/github/kagkarlsson/scheduler/MongoTaskRepository.java index c240fab..ae0075e 100644 --- a/src/main/java/com/github/kagkarlsson/scheduler/MongoTaskRepository.java +++ b/src/main/java/com/github/kagkarlsson/scheduler/MongoTaskRepository.java @@ -17,6 +17,7 @@ import com.github.kagkarlsson.scheduler.TaskResolver.UnresolvedTask; import com.github.kagkarlsson.scheduler.task.Execution; +import com.github.kagkarlsson.scheduler.task.SchedulableInstance; import com.github.kagkarlsson.scheduler.task.Task; import com.github.kagkarlsson.scheduler.task.TaskInstance; import com.mongodb.ErrorCategory; @@ -30,6 +31,8 @@ import com.mongodb.client.model.IndexOptions; import com.mongodb.client.model.ReturnDocument; import com.mongodb.client.result.DeleteResult; + +import java.sql.PreparedStatement; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; @@ -53,15 +56,17 @@ public class MongoTaskRepository implements TaskRepository { private final TaskResolver taskResolver; private final SchedulerName schedulerSchedulerName; private final Serializer serializer; + private final Clock clock; private final MongoCollection collection; public MongoTaskRepository(TaskResolver taskResolver, - SchedulerName schedulerSchedulerName, - Serializer serializer, String databaseName, String tableName, MongoClient mongoClient) { + SchedulerName schedulerSchedulerName, + Serializer serializer, String databaseName, String tableName, MongoClient mongoClient, Clock clock) { this.taskResolver = taskResolver; this.schedulerSchedulerName = schedulerSchedulerName; this.serializer = serializer; + this.clock = clock; CodecRegistry pojoCodecRegistry = fromRegistries( MongoClientSettings.getDefaultCodecRegistry(), @@ -87,11 +92,14 @@ private void uniqueIndexCreation() { } @Override - public boolean createIfNotExists(Execution execution) { - LOG.debug("Creation request for execution {}", execution); + public boolean createIfNotExists(SchedulableInstance schedulableInstance) { + LOG.debug("Creation request for execution {}", schedulableInstance); + Optional execution = this.getExecution(schedulableInstance.getTaskName(), schedulableInstance.getId()); + if (execution.isPresent()) { + return false; + } // Search criterion : taskName, taskInstance - final Bson query = buildFilterFromExecution(execution, false); - Optional taskEntityOpt = toEntity(execution); + Optional taskEntityOpt = toEntity(schedulableInstance); if (!taskEntityOpt.isPresent()) { return false; } @@ -196,6 +204,11 @@ public void getScheduledExecutions( .filter(Optional::isPresent).map(Optional::get).forEach(consumer); } + @Override + public List lockAndGetDue(Instant instant, int i) { + throw new UnsupportedOperationException("lockAndFetch not supported in mongodatabase"); + } + @Override public void remove(Execution execution) { final Bson filter = buildFilterFromExecution(execution); @@ -355,32 +368,32 @@ public int removeExecutions(String taskName) { return Long.valueOf(deleted.getDeletedCount()).intValue(); } + @Override + public void checkSupportsLockAndFetch() { + throw new IllegalArgumentException("Mongodb does not support specific lock-and-fetch since operations are already atomic"); + } + /** * Mapping method to get an Entity from an Execution * * @param in - Execution to map * @return TaskEntity mapped from execution */ - private Optional toEntity(Execution in) { + private Optional toEntity(SchedulableInstance in) { if (in == null) { return Optional.empty(); } TaskEntity out = new TaskEntity(); - Optional> taskInstanceOpt = Optional - .ofNullable(in.taskInstance); + Optional> taskInstanceOpt = Optional.ofNullable(in.getTaskInstance()); taskInstanceOpt.map(TaskInstance::getTaskName).ifPresent(out::setTaskName); taskInstanceOpt.map(TaskInstance::getId).ifPresent(out::setTaskInstance); taskInstanceOpt.map(TaskInstance::getData).map(serializer::serialize) .ifPresent(out::setTaskData); - out.setExecutionTime(in.getExecutionTime()); - out.setPicked(in.isPicked()); - out.setPickedBy(in.pickedBy); - out.setLastFailure(in.lastFailure); - out.setLastSuccess(in.lastSuccess); - out.setLastHeartbeat(in.lastHeartbeat); - out.setVersion(in.version); + out.setExecutionTime(in.getNextExecutionTime(clock.now())); + out.setPicked(false); + out.setVersion(1); return Optional.of(out); } diff --git a/src/test/java/com/github/kagkarlsson/scheduler/MongoTaskRepositoryTest.java b/src/test/java/com/github/kagkarlsson/scheduler/MongoTaskRepositoryTest.java index d437e24..3a5b572 100644 --- a/src/test/java/com/github/kagkarlsson/scheduler/MongoTaskRepositoryTest.java +++ b/src/test/java/com/github/kagkarlsson/scheduler/MongoTaskRepositoryTest.java @@ -3,9 +3,9 @@ import static org.assertj.core.api.Assertions.assertThat; import com.github.kagkarlsson.scheduler.TaskResolver.UnresolvedTask; -import com.github.kagkarlsson.scheduler.task.Execution; -import com.github.kagkarlsson.scheduler.task.Task; -import com.github.kagkarlsson.scheduler.task.TaskInstance; +import com.github.kagkarlsson.scheduler.task.*; +import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask; +import com.github.kagkarlsson.scheduler.task.helper.Tasks; import com.github.kagkarlsson.scheduler.utils.ExecutionBuilder; import com.github.kagkarlsson.scheduler.utils.TestUtils; import com.mongodb.ErrorCategory; @@ -62,15 +62,18 @@ void init() throws IOException { .thenReturn(Optional.of(taskResolved)); repository = new MongoTaskRepository(taskResolver, schedulerName, serializer, - "db-scheduler", "db-scheduler", emebddedMongodbExtension.getMongoClient()); + "db-scheduler", "db-scheduler", emebddedMongodbExtension.getMongoClient(), + new SystemClock()); } @Test void testCreateIfNotExistsOk() { - Execution execution = new ExecutionBuilder().taskName("idTask").taskInstanceId("idInstance") - .build(); + OneTimeTask task = Tasks + .oneTime("idTask", Object.class).execute((TaskInstance inst, + ExecutionContext ctx) -> { + }); - boolean created = repository.createIfNotExists(execution); + boolean created = repository.createIfNotExists(task.schedulableInstance("idInstance")); assertThat(created).isTrue(); @@ -94,8 +97,10 @@ void testCreateIfNotExistsOk() { @Test void testCreateIfNotExistsKo() { - Execution execution = new ExecutionBuilder().taskName("idTask").taskInstanceId("idInstance") - .build(); + OneTimeTask task = Tasks + .oneTime("idTask", Object.class).execute((TaskInstance inst, + ExecutionContext ctx) -> { + }); TaskEntity taskEntityInitial = new TaskEntity(); taskEntityInitial.setTaskInstance("idInstance"); @@ -103,7 +108,7 @@ void testCreateIfNotExistsKo() { this.emebddedMongodbExtension.getCollection().insertOne(taskEntityInitial); - boolean created = repository.createIfNotExists(execution); + boolean created = repository.createIfNotExists(task.schedulableInstance("idInstance")); // Check that no execution is created because it already exists assertThat(created).isFalse(); }