Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 108 additions & 28 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,69 @@

<artifactId>db-scheduler-mongo</artifactId>
<groupId>com.github.piemjean</groupId>
<version>9.3</version>
<version>11.0.0-SNAPSHOT</version>
<name>db-scheduler: Mongodb support</name>
<description>Module providing Mongodb support for db-scheduler</description>
<url>https://github.com/piemjean/db-scheduler-mongo</url>

<licenses>
<license>
<name>The Apache Software License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
</license>
</licenses>

<developers>
<developer>
<name>piemjean</name>
<email>piem.jean@gmail.com</email>
</developer>
</developers>

<scm>
<connection>scm:git:git://github.com/piemjean/db-scheduler-mopngo.git</connection>
<developerConnection>scm:git:ssh://github.com:piemjean/db-scheduler-mopngo.git</developerConnection>
<url>https://github.com/piemjean/db-scheduler-mongo/tree/master</url>
</scm>

<distributionManagement>
<!-- Publish versioned releases here -->
<repository>
<id>${nexus.server.projet.id}</id>
<name>Releases of ${project.artifactId}</name>
<url>${nexus.releases.repository}/</url>
</repository>

<!-- Publish snapshots here -->
<snapshotRepository>
<uniqueVersion>true</uniqueVersion>
<id>${nexus.server.projet.id}</id>
<name>Snapshots of ${project.artifactId}</name>
<url>${nexus.snapshots.repository}/</url>
</snapshotRepository>

<!-- Utilisé par Maven site:stage et site:deploy -->
<site>
<id>${nexus.server.projet.id}</id>
<url>${project.url}</url>
</site>
</distributionManagement>

<properties>
<jdk.version>1.8</jdk.version>
<maven.compiler.source>${jdk.version}</maven.compiler.source>
<maven.compiler.target>${jdk.version}</maven.compiler.target>
<!-- Versions declaration -->
<db-scheduler.version>9.3</db-scheduler.version>
<mongo-java-driver.version>3.12.7</mongo-java-driver.version>
<db-scheduler.version>11.0</db-scheduler.version>
<mongo-java-driver.version>4.5.1</mongo-java-driver.version>
<slf4j-simple.version>1.7.30</slf4j-simple.version>
<embed-mongo.version>3.0.0</embed-mongo.version>
<embed-mongo.version>3.4.5</embed-mongo.version>
<junit-jupiter.version>5.6.0</junit-jupiter.version>
<mockito-core.version>3.7.7</mockito-core.version>
<assertj-version>3.19.0</assertj-version>
<guava.version>30.1-jre</guava.version>
<maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
Expand All @@ -37,7 +81,20 @@

<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<artifactId>mongodb-driver-sync</artifactId>
<version>${mongo-java-driver.version}</version>
</dependency>

<!-- mongodb-driver-sync using not matching version for unknown reason-->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-core</artifactId>
<version>${mongo-java-driver.version}</version>
</dependency>

<dependency>
<groupId>org.mongodb</groupId>
<artifactId>bson</artifactId>
<version>${mongo-java-driver.version}</version>
</dependency>

Expand Down Expand Up @@ -116,6 +173,12 @@
<artifactId>de.flapdoodle.embed.mongo</artifactId>
<version>${embed-mongo.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>com.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand All @@ -129,6 +192,46 @@

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.6</version>
<executions>
<execution>
<id>sign-artifacts</id>
<phase>verify</phase>
<goals>
<goal>sign</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
Expand All @@ -137,27 +240,4 @@
</plugins>
</build>

<distributionManagement>
<!-- Publish versioned releases here -->
<repository>
<id>${nexus.server.projet.id}</id>
<name>Releases of ${project.artifactId}</name>
<url>${nexus.releases.repository}/</url>
</repository>

<!-- Publish snapshots here -->
<snapshotRepository>
<uniqueVersion>true</uniqueVersion>
<id>${nexus.server.projet.id}</id>
<name>Snapshots of ${project.artifactId}</name>
<url>${nexus.snapshots.repository}/</url>
</snapshotRepository>

<!-- Utilisé par Maven site:stage et site:deploy -->
<site>
<id>${nexus.server.projet.id}</id>
<url>${project.url}</url>
</site>
</distributionManagement>

</project>
55 changes: 30 additions & 25 deletions src/main/java/com/github/kagkarlsson/scheduler/MongoScheduler.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package com.github.kagkarlsson.scheduler;

import com.github.kagkarlsson.scheduler.logging.LogLevel;
import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
import com.github.kagkarlsson.scheduler.task.OnStartup;
import com.github.kagkarlsson.scheduler.task.Task;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoClient;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -12,30 +14,33 @@

public class MongoScheduler extends Scheduler {

protected MongoScheduler(Clock clock,
TaskRepository schedulerTaskRepository,
TaskRepository clientTaskRepository, TaskResolver taskResolver, int threadpoolSize,
ExecutorService executorService,
SchedulerName schedulerName, Waiter executeDueWaiter, Duration heartbeatInterval,
boolean enableImmediateExecution,
StatsRegistry statsRegistry, int pollingLimit,
Duration deleteUnresolvedAfter, Duration shutdownMaxWait,
List<OnStartup> onStartup) {
super(clock, schedulerTaskRepository, clientTaskRepository, taskResolver, threadpoolSize,
executorService, schedulerName, executeDueWaiter, heartbeatInterval,
enableImmediateExecution,
statsRegistry, pollingLimit, deleteUnresolvedAfter, shutdownMaxWait, onStartup);
}
protected MongoScheduler(Clock clock,
TaskRepository schedulerTaskRepository,
TaskRepository clientTaskRepository, TaskResolver taskResolver, int threadpoolSize,
ExecutorService executorService,
SchedulerName schedulerName, Waiter executeDueWaiter, Duration heartbeatInterval,
boolean enableImmediateExecution,
StatsRegistry statsRegistry,
Duration deleteUnresolvedAfter, Duration shutdownMaxWait, LogLevel logLevel,
boolean logStackTrace,
List<OnStartup> onStartup) {
super(clock, schedulerTaskRepository, clientTaskRepository, taskResolver, threadpoolSize,
executorService, schedulerName, executeDueWaiter, heartbeatInterval,
enableImmediateExecution,
statsRegistry, PollingStrategyConfig.DEFAULT_FETCH, deleteUnresolvedAfter, shutdownMaxWait,
logLevel,
logStackTrace, onStartup);
}

public static MongoSchedulerBuilder create(MongoClient mongoClient, String database,
String collection, Task<?>... knownTasks) {
List<Task<?>> knownTasksList = new ArrayList<>();
knownTasksList.addAll(Arrays.asList(knownTasks));
return create(mongoClient, database, collection, knownTasksList);
}
public static MongoSchedulerBuilder create(MongoClient mongoClient, String database,
String collection, Task<?>... knownTasks) {
List<Task<?>> knownTasksList = new ArrayList<>();
knownTasksList.addAll(Arrays.asList(knownTasks));
return create(mongoClient, database, collection, knownTasksList);
}

public static MongoSchedulerBuilder create(MongoClient mongoClient, String database,
String collection, List<Task<?>> knownTasks) {
return new MongoSchedulerBuilder(mongoClient, database, collection, knownTasks);
}
public static MongoSchedulerBuilder create(MongoClient mongoClient, String database,
String collection, List<Task<?>> knownTasks) {
return new MongoSchedulerBuilder(mongoClient, database, collection, knownTasks);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
import static com.github.kagkarlsson.scheduler.Scheduler.THREAD_PREFIX;

import com.github.kagkarlsson.scheduler.task.Task;
import com.mongodb.MongoClient;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.mongodb.client.MongoClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -27,10 +28,11 @@ public class MongoSchedulerBuilder extends SchedulerBuilder {
*
* @param mongoClient - object handling mongo connection
* @param databaseName - mongo database name
* @param collection - name of collection in which tasks are stored
* @param knownTasks - list of known tasks
*/
protected MongoSchedulerBuilder(MongoClient mongoClient, String databaseName, String collection,
List<Task<?>> knownTasks) {
List<Task<?>> knownTasks) {
super(null, knownTasks);
this.mongoClient = mongoClient;
this.databaseName = databaseName;
Expand All @@ -39,20 +41,16 @@ protected MongoSchedulerBuilder(MongoClient mongoClient, String databaseName, St

@Override
public Scheduler build() {
if (pollingLimit < executorThreads) {
LOG.warn("Polling-limit is less than number of threads. Should be equal or higher.");
}

if (schedulerName == null) {
schedulerName = new SchedulerName.Hostname();
}

final TaskResolver taskResolver = new TaskResolver(statsRegistry, clock, knownTasks);

final MongoTaskRepository mongoTaskRepository = new MongoTaskRepository(taskResolver,
schedulerName, serializer, databaseName, tableName, mongoClient);
schedulerName, serializer, databaseName, tableName, mongoClient, clock);
final MongoTaskRepository clientTaskRepository = new MongoTaskRepository(taskResolver,
schedulerName, serializer, databaseName, tableName, mongoClient);
schedulerName, serializer, databaseName, tableName, mongoClient, clock);

ExecutorService candidateExecutorService = executorService;
if (candidateExecutorService == null) {
Expand All @@ -69,7 +67,7 @@ public Scheduler build() {
schedulerName.getName());

return new MongoScheduler(clock, mongoTaskRepository, clientTaskRepository, taskResolver, executorThreads, candidateExecutorService,
schedulerName, waiter, heartbeatInterval, enableImmediateExecution, statsRegistry, pollingLimit,
deleteUnresolvedAfter, shutdownMaxWait, startTasks);
schedulerName, waiter, heartbeatInterval, enableImmediateExecution, statsRegistry,
deleteUnresolvedAfter, shutdownMaxWait, logLevel, logStackTrace, startTasks);
}
}
Loading