From e5b2cefc7d92f969ab54aace871d0393c210f779 Mon Sep 17 00:00:00 2001 From: piemjean Date: Sat, 6 Feb 2021 11:35:43 +0100 Subject: [PATCH 1/6] update pom.xml for development snapshot --- pom.xml | 108 +++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 84 insertions(+), 24 deletions(-) diff --git a/pom.xml b/pom.xml index dfa0372..e50badc 100644 --- a/pom.xml +++ b/pom.xml @@ -7,11 +7,54 @@ db-scheduler-mongo com.github.piemjean - 9.3 + develop-SNAPSHOT db-scheduler: Mongodb support Module providing Mongodb support for db-scheduler https://github.com/piemjean/db-scheduler-mongo + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + + + + + + piemjean + piem.jean@gmail.com + + + + + scm:git:git://github.com/piemjean/db-scheduler-mopngo.git + scm:git:ssh://github.com:piemjean/db-scheduler-mopngo.git + https://github.com/piemjean/db-scheduler-mongo/tree/master + + + + + + ${nexus.server.projet.id} + Releases of ${project.artifactId} + ${nexus.releases.repository}/ + + + + + true + ${nexus.server.projet.id} + Snapshots of ${project.artifactId} + ${nexus.snapshots.repository}/ + + + + + ${nexus.server.projet.id} + ${project.url} + + + 1.8 ${jdk.version} @@ -129,6 +172,46 @@ + + org.apache.maven.plugins + maven-source-plugin + 3.2.1 + + + attach-sources + + jar-no-fork + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 3.2.0 + + + attach-javadocs + + jar + + + + + + org.apache.maven.plugins + maven-gpg-plugin + 1.6 + + + sign-artifacts + verify + + sign + + + + org.apache.maven.plugins maven-surefire-plugin @@ -137,27 +220,4 @@ - - - - ${nexus.server.projet.id} - Releases of ${project.artifactId} - ${nexus.releases.repository}/ - - - - - true - ${nexus.server.projet.id} - Snapshots of ${project.artifactId} - ${nexus.snapshots.repository}/ - - - - - ${nexus.server.projet.id} - ${project.url} - - - From 6d97fb50b63b705c42632fe432f3e5ffdd0d6e05 Mon Sep 17 00:00:00 2001 From: piemjean Date: Mon, 26 Apr 2021 09:46:27 +0200 Subject: [PATCH 2/6] Init of release 9.4.0 with adaptation from db-scheduler 9.4 --- pom.xml | 4 +- .../kagkarlsson/scheduler/MongoScheduler.java | 51 ++++++++++--------- .../scheduler/MongoSchedulerBuilder.java | 3 +- 3 files changed, 31 insertions(+), 27 deletions(-) diff --git a/pom.xml b/pom.xml index e50badc..7e3f420 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ db-scheduler-mongo com.github.piemjean - develop-SNAPSHOT + 9.4.0-SNAPSHOT db-scheduler: Mongodb support Module providing Mongodb support for db-scheduler https://github.com/piemjean/db-scheduler-mongo @@ -60,7 +60,7 @@ ${jdk.version} ${jdk.version} - 9.3 + 9.4 3.12.7 1.7.30 3.0.0 diff --git a/src/main/java/com/github/kagkarlsson/scheduler/MongoScheduler.java b/src/main/java/com/github/kagkarlsson/scheduler/MongoScheduler.java index e0bb964..49ce466 100644 --- a/src/main/java/com/github/kagkarlsson/scheduler/MongoScheduler.java +++ b/src/main/java/com/github/kagkarlsson/scheduler/MongoScheduler.java @@ -1,5 +1,6 @@ package com.github.kagkarlsson.scheduler; +import com.github.kagkarlsson.scheduler.logging.LogLevel; import com.github.kagkarlsson.scheduler.stats.StatsRegistry; import com.github.kagkarlsson.scheduler.task.OnStartup; import com.github.kagkarlsson.scheduler.task.Task; @@ -12,30 +13,32 @@ public class MongoScheduler extends Scheduler { - protected MongoScheduler(Clock clock, - TaskRepository schedulerTaskRepository, - TaskRepository clientTaskRepository, TaskResolver taskResolver, int threadpoolSize, - ExecutorService executorService, - SchedulerName schedulerName, Waiter executeDueWaiter, Duration heartbeatInterval, - boolean enableImmediateExecution, - StatsRegistry statsRegistry, int pollingLimit, - Duration deleteUnresolvedAfter, Duration shutdownMaxWait, - List onStartup) { - super(clock, schedulerTaskRepository, clientTaskRepository, taskResolver, threadpoolSize, - executorService, schedulerName, executeDueWaiter, heartbeatInterval, - enableImmediateExecution, - statsRegistry, pollingLimit, deleteUnresolvedAfter, shutdownMaxWait, onStartup); - } + protected MongoScheduler(Clock clock, + TaskRepository schedulerTaskRepository, + TaskRepository clientTaskRepository, TaskResolver taskResolver, int threadpoolSize, + ExecutorService executorService, + SchedulerName schedulerName, Waiter executeDueWaiter, Duration heartbeatInterval, + boolean enableImmediateExecution, + StatsRegistry statsRegistry, int pollingLimit, + Duration deleteUnresolvedAfter, Duration shutdownMaxWait, LogLevel logLevel, + boolean logStackTrace, + List onStartup) { + super(clock, schedulerTaskRepository, clientTaskRepository, taskResolver, threadpoolSize, + executorService, schedulerName, executeDueWaiter, heartbeatInterval, + enableImmediateExecution, + statsRegistry, pollingLimit, deleteUnresolvedAfter, shutdownMaxWait, logLevel, + logStackTrace, onStartup); + } - public static MongoSchedulerBuilder create(MongoClient mongoClient, String database, - String collection, Task... knownTasks) { - List> knownTasksList = new ArrayList<>(); - knownTasksList.addAll(Arrays.asList(knownTasks)); - return create(mongoClient, database, collection, knownTasksList); - } + public static MongoSchedulerBuilder create(MongoClient mongoClient, String database, + String collection, Task... knownTasks) { + List> knownTasksList = new ArrayList<>(); + knownTasksList.addAll(Arrays.asList(knownTasks)); + return create(mongoClient, database, collection, knownTasksList); + } - public static MongoSchedulerBuilder create(MongoClient mongoClient, String database, - String collection, List> knownTasks) { - return new MongoSchedulerBuilder(mongoClient, database, collection, knownTasks); - } + public static MongoSchedulerBuilder create(MongoClient mongoClient, String database, + String collection, List> knownTasks) { + return new MongoSchedulerBuilder(mongoClient, database, collection, knownTasks); + } } diff --git a/src/main/java/com/github/kagkarlsson/scheduler/MongoSchedulerBuilder.java b/src/main/java/com/github/kagkarlsson/scheduler/MongoSchedulerBuilder.java index 57c7ded..bc7e785 100644 --- a/src/main/java/com/github/kagkarlsson/scheduler/MongoSchedulerBuilder.java +++ b/src/main/java/com/github/kagkarlsson/scheduler/MongoSchedulerBuilder.java @@ -27,6 +27,7 @@ public class MongoSchedulerBuilder extends SchedulerBuilder { * * @param mongoClient - object handling mongo connection * @param databaseName - mongo database name + * @param collection - name of collection in which tasks are stored * @param knownTasks - list of known tasks */ protected MongoSchedulerBuilder(MongoClient mongoClient, String databaseName, String collection, @@ -70,6 +71,6 @@ public Scheduler build() { return new MongoScheduler(clock, mongoTaskRepository, clientTaskRepository, taskResolver, executorThreads, candidateExecutorService, schedulerName, waiter, heartbeatInterval, enableImmediateExecution, statsRegistry, pollingLimit, - deleteUnresolvedAfter, shutdownMaxWait, startTasks); + deleteUnresolvedAfter, shutdownMaxWait, logLevel, logStackTrace, startTasks); } } From 7edc3e5880726a78cd14d4c22638f5b86882f99b Mon Sep 17 00:00:00 2001 From: piemjean Date: Mon, 26 Apr 2021 09:49:37 +0200 Subject: [PATCH 3/6] Final version change in pom.xml for 9.4.0 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 7e3f420..138beeb 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ db-scheduler-mongo com.github.piemjean - 9.4.0-SNAPSHOT + 9.4.0 db-scheduler: Mongodb support Module providing Mongodb support for db-scheduler https://github.com/piemjean/db-scheduler-mongo From b746b05d378956cba77a1efa0796386ab7ad1218 Mon Sep 17 00:00:00 2001 From: piemjean Date: Mon, 26 Apr 2021 10:28:17 +0200 Subject: [PATCH 4/6] Update version in pom.xml to initiate update to db-scheduler 10.0 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 138beeb..fee6db5 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ db-scheduler-mongo com.github.piemjean - 9.4.0 + 10.0.0-SNAPSHOT db-scheduler: Mongodb support Module providing Mongodb support for db-scheduler https://github.com/piemjean/db-scheduler-mongo From 1a2f7c8a07ec91f65772db9d03f6b3e9f3cbc12d Mon Sep 17 00:00:00 2001 From: piemjean Date: Mon, 26 Apr 2021 10:50:22 +0200 Subject: [PATCH 5/6] Init of integration of db-scheduler 10.0 --- pom.xml | 2 +- .../github/kagkarlsson/scheduler/MongoScheduler.java | 5 +++-- .../kagkarlsson/scheduler/MongoSchedulerBuilder.java | 6 +----- .../kagkarlsson/scheduler/MongoTaskRepository.java | 11 ++++++++++- 4 files changed, 15 insertions(+), 9 deletions(-) diff --git a/pom.xml b/pom.xml index fee6db5..5a6f1cf 100644 --- a/pom.xml +++ b/pom.xml @@ -60,7 +60,7 @@ ${jdk.version} ${jdk.version} - 9.4 + 10.0 3.12.7 1.7.30 3.0.0 diff --git a/src/main/java/com/github/kagkarlsson/scheduler/MongoScheduler.java b/src/main/java/com/github/kagkarlsson/scheduler/MongoScheduler.java index 49ce466..7e04081 100644 --- a/src/main/java/com/github/kagkarlsson/scheduler/MongoScheduler.java +++ b/src/main/java/com/github/kagkarlsson/scheduler/MongoScheduler.java @@ -19,14 +19,15 @@ protected MongoScheduler(Clock clock, ExecutorService executorService, SchedulerName schedulerName, Waiter executeDueWaiter, Duration heartbeatInterval, boolean enableImmediateExecution, - StatsRegistry statsRegistry, int pollingLimit, + StatsRegistry statsRegistry, Duration deleteUnresolvedAfter, Duration shutdownMaxWait, LogLevel logLevel, boolean logStackTrace, List onStartup) { super(clock, schedulerTaskRepository, clientTaskRepository, taskResolver, threadpoolSize, executorService, schedulerName, executeDueWaiter, heartbeatInterval, enableImmediateExecution, - statsRegistry, pollingLimit, deleteUnresolvedAfter, shutdownMaxWait, logLevel, + statsRegistry, PollingStrategyConfig.DEFAULT_FETCH, deleteUnresolvedAfter, shutdownMaxWait, + logLevel, logStackTrace, onStartup); } diff --git a/src/main/java/com/github/kagkarlsson/scheduler/MongoSchedulerBuilder.java b/src/main/java/com/github/kagkarlsson/scheduler/MongoSchedulerBuilder.java index bc7e785..85961e4 100644 --- a/src/main/java/com/github/kagkarlsson/scheduler/MongoSchedulerBuilder.java +++ b/src/main/java/com/github/kagkarlsson/scheduler/MongoSchedulerBuilder.java @@ -40,10 +40,6 @@ protected MongoSchedulerBuilder(MongoClient mongoClient, String databaseName, St @Override public Scheduler build() { - if (pollingLimit < executorThreads) { - LOG.warn("Polling-limit is less than number of threads. Should be equal or higher."); - } - if (schedulerName == null) { schedulerName = new SchedulerName.Hostname(); } @@ -70,7 +66,7 @@ public Scheduler build() { schedulerName.getName()); return new MongoScheduler(clock, mongoTaskRepository, clientTaskRepository, taskResolver, executorThreads, candidateExecutorService, - schedulerName, waiter, heartbeatInterval, enableImmediateExecution, statsRegistry, pollingLimit, + schedulerName, waiter, heartbeatInterval, enableImmediateExecution, statsRegistry, deleteUnresolvedAfter, shutdownMaxWait, logLevel, logStackTrace, startTasks); } } diff --git a/src/main/java/com/github/kagkarlsson/scheduler/MongoTaskRepository.java b/src/main/java/com/github/kagkarlsson/scheduler/MongoTaskRepository.java index c240fab..27db12a 100644 --- a/src/main/java/com/github/kagkarlsson/scheduler/MongoTaskRepository.java +++ b/src/main/java/com/github/kagkarlsson/scheduler/MongoTaskRepository.java @@ -90,7 +90,6 @@ private void uniqueIndexCreation() { public boolean createIfNotExists(Execution execution) { LOG.debug("Creation request for execution {}", execution); // Search criterion : taskName, taskInstance - final Bson query = buildFilterFromExecution(execution, false); Optional taskEntityOpt = toEntity(execution); if (!taskEntityOpt.isPresent()) { return false; @@ -196,6 +195,11 @@ public void getScheduledExecutions( .filter(Optional::isPresent).map(Optional::get).forEach(consumer); } + @Override + public List lockAndGetDue(Instant instant, int i) { + throw new UnsupportedOperationException("lockAndFetch not supported in mongodatabase"); + } + @Override public void remove(Execution execution) { final Bson filter = buildFilterFromExecution(execution); @@ -355,6 +359,11 @@ public int removeExecutions(String taskName) { return Long.valueOf(deleted.getDeletedCount()).intValue(); } + @Override + public void checkSupportsLockAndFetch() { + throw new IllegalArgumentException("Mongodb does not support specific lock-and-fetch since operations are already atomic"); + } + /** * Mapping method to get an Entity from an Execution * From ae6af8509ff70c393a011e75b7f754ddf0a4a6a1 Mon Sep 17 00:00:00 2001 From: Karsten Ohme Date: Mon, 4 Apr 2022 23:00:39 +0200 Subject: [PATCH 6/6] Support for db-scheduler version 11 --- pom.xml | 5 +-- .../scheduler/MongoSchedulerBuilder.java | 4 +-- .../scheduler/MongoTaskRepository.java | 34 +++++++++++-------- .../scheduler/MongoTaskRepositoryTest.java | 25 ++++++++------ 4 files changed, 39 insertions(+), 29 deletions(-) diff --git a/pom.xml b/pom.xml index 5a6f1cf..4555872 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ db-scheduler-mongo com.github.piemjean - 10.0.0-SNAPSHOT + 11.0.0-SNAPSHOT db-scheduler: Mongodb support Module providing Mongodb support for db-scheduler https://github.com/piemjean/db-scheduler-mongo @@ -60,7 +60,7 @@ ${jdk.version} ${jdk.version} - 10.0 + 11.0 3.12.7 1.7.30 3.0.0 @@ -69,6 +69,7 @@ 3.19.0 30.1-jre 2.22.2 + UTF-8 diff --git a/src/main/java/com/github/kagkarlsson/scheduler/MongoSchedulerBuilder.java b/src/main/java/com/github/kagkarlsson/scheduler/MongoSchedulerBuilder.java index 85961e4..3fdbf6d 100644 --- a/src/main/java/com/github/kagkarlsson/scheduler/MongoSchedulerBuilder.java +++ b/src/main/java/com/github/kagkarlsson/scheduler/MongoSchedulerBuilder.java @@ -47,9 +47,9 @@ public Scheduler build() { final TaskResolver taskResolver = new TaskResolver(statsRegistry, clock, knownTasks); final MongoTaskRepository mongoTaskRepository = new MongoTaskRepository(taskResolver, - schedulerName, serializer, databaseName, tableName, mongoClient); + schedulerName, serializer, databaseName, tableName, mongoClient, clock); final MongoTaskRepository clientTaskRepository = new MongoTaskRepository(taskResolver, - schedulerName, serializer, databaseName, tableName, mongoClient); + schedulerName, serializer, databaseName, tableName, mongoClient, clock); ExecutorService candidateExecutorService = executorService; if (candidateExecutorService == null) { diff --git a/src/main/java/com/github/kagkarlsson/scheduler/MongoTaskRepository.java b/src/main/java/com/github/kagkarlsson/scheduler/MongoTaskRepository.java index 27db12a..ae0075e 100644 --- a/src/main/java/com/github/kagkarlsson/scheduler/MongoTaskRepository.java +++ b/src/main/java/com/github/kagkarlsson/scheduler/MongoTaskRepository.java @@ -17,6 +17,7 @@ import com.github.kagkarlsson.scheduler.TaskResolver.UnresolvedTask; import com.github.kagkarlsson.scheduler.task.Execution; +import com.github.kagkarlsson.scheduler.task.SchedulableInstance; import com.github.kagkarlsson.scheduler.task.Task; import com.github.kagkarlsson.scheduler.task.TaskInstance; import com.mongodb.ErrorCategory; @@ -30,6 +31,8 @@ import com.mongodb.client.model.IndexOptions; import com.mongodb.client.model.ReturnDocument; import com.mongodb.client.result.DeleteResult; + +import java.sql.PreparedStatement; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; @@ -53,15 +56,17 @@ public class MongoTaskRepository implements TaskRepository { private final TaskResolver taskResolver; private final SchedulerName schedulerSchedulerName; private final Serializer serializer; + private final Clock clock; private final MongoCollection collection; public MongoTaskRepository(TaskResolver taskResolver, - SchedulerName schedulerSchedulerName, - Serializer serializer, String databaseName, String tableName, MongoClient mongoClient) { + SchedulerName schedulerSchedulerName, + Serializer serializer, String databaseName, String tableName, MongoClient mongoClient, Clock clock) { this.taskResolver = taskResolver; this.schedulerSchedulerName = schedulerSchedulerName; this.serializer = serializer; + this.clock = clock; CodecRegistry pojoCodecRegistry = fromRegistries( MongoClientSettings.getDefaultCodecRegistry(), @@ -87,10 +92,14 @@ private void uniqueIndexCreation() { } @Override - public boolean createIfNotExists(Execution execution) { - LOG.debug("Creation request for execution {}", execution); + public boolean createIfNotExists(SchedulableInstance schedulableInstance) { + LOG.debug("Creation request for execution {}", schedulableInstance); + Optional execution = this.getExecution(schedulableInstance.getTaskName(), schedulableInstance.getId()); + if (execution.isPresent()) { + return false; + } // Search criterion : taskName, taskInstance - Optional taskEntityOpt = toEntity(execution); + Optional taskEntityOpt = toEntity(schedulableInstance); if (!taskEntityOpt.isPresent()) { return false; } @@ -370,26 +379,21 @@ public void checkSupportsLockAndFetch() { * @param in - Execution to map * @return TaskEntity mapped from execution */ - private Optional toEntity(Execution in) { + private Optional toEntity(SchedulableInstance in) { if (in == null) { return Optional.empty(); } TaskEntity out = new TaskEntity(); - Optional> taskInstanceOpt = Optional - .ofNullable(in.taskInstance); + Optional> taskInstanceOpt = Optional.ofNullable(in.getTaskInstance()); taskInstanceOpt.map(TaskInstance::getTaskName).ifPresent(out::setTaskName); taskInstanceOpt.map(TaskInstance::getId).ifPresent(out::setTaskInstance); taskInstanceOpt.map(TaskInstance::getData).map(serializer::serialize) .ifPresent(out::setTaskData); - out.setExecutionTime(in.getExecutionTime()); - out.setPicked(in.isPicked()); - out.setPickedBy(in.pickedBy); - out.setLastFailure(in.lastFailure); - out.setLastSuccess(in.lastSuccess); - out.setLastHeartbeat(in.lastHeartbeat); - out.setVersion(in.version); + out.setExecutionTime(in.getNextExecutionTime(clock.now())); + out.setPicked(false); + out.setVersion(1); return Optional.of(out); } diff --git a/src/test/java/com/github/kagkarlsson/scheduler/MongoTaskRepositoryTest.java b/src/test/java/com/github/kagkarlsson/scheduler/MongoTaskRepositoryTest.java index d437e24..3a5b572 100644 --- a/src/test/java/com/github/kagkarlsson/scheduler/MongoTaskRepositoryTest.java +++ b/src/test/java/com/github/kagkarlsson/scheduler/MongoTaskRepositoryTest.java @@ -3,9 +3,9 @@ import static org.assertj.core.api.Assertions.assertThat; import com.github.kagkarlsson.scheduler.TaskResolver.UnresolvedTask; -import com.github.kagkarlsson.scheduler.task.Execution; -import com.github.kagkarlsson.scheduler.task.Task; -import com.github.kagkarlsson.scheduler.task.TaskInstance; +import com.github.kagkarlsson.scheduler.task.*; +import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask; +import com.github.kagkarlsson.scheduler.task.helper.Tasks; import com.github.kagkarlsson.scheduler.utils.ExecutionBuilder; import com.github.kagkarlsson.scheduler.utils.TestUtils; import com.mongodb.ErrorCategory; @@ -62,15 +62,18 @@ void init() throws IOException { .thenReturn(Optional.of(taskResolved)); repository = new MongoTaskRepository(taskResolver, schedulerName, serializer, - "db-scheduler", "db-scheduler", emebddedMongodbExtension.getMongoClient()); + "db-scheduler", "db-scheduler", emebddedMongodbExtension.getMongoClient(), + new SystemClock()); } @Test void testCreateIfNotExistsOk() { - Execution execution = new ExecutionBuilder().taskName("idTask").taskInstanceId("idInstance") - .build(); + OneTimeTask task = Tasks + .oneTime("idTask", Object.class).execute((TaskInstance inst, + ExecutionContext ctx) -> { + }); - boolean created = repository.createIfNotExists(execution); + boolean created = repository.createIfNotExists(task.schedulableInstance("idInstance")); assertThat(created).isTrue(); @@ -94,8 +97,10 @@ void testCreateIfNotExistsOk() { @Test void testCreateIfNotExistsKo() { - Execution execution = new ExecutionBuilder().taskName("idTask").taskInstanceId("idInstance") - .build(); + OneTimeTask task = Tasks + .oneTime("idTask", Object.class).execute((TaskInstance inst, + ExecutionContext ctx) -> { + }); TaskEntity taskEntityInitial = new TaskEntity(); taskEntityInitial.setTaskInstance("idInstance"); @@ -103,7 +108,7 @@ void testCreateIfNotExistsKo() { this.emebddedMongodbExtension.getCollection().insertOne(taskEntityInitial); - boolean created = repository.createIfNotExists(execution); + boolean created = repository.createIfNotExists(task.schedulableInstance("idInstance")); // Check that no execution is created because it already exists assertThat(created).isFalse(); }