Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 86 additions & 25 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,60 @@

<artifactId>db-scheduler-mongo</artifactId>
<groupId>com.github.piemjean</groupId>
<version>9.3</version>
<version>11.0.0-SNAPSHOT</version>
<name>db-scheduler: Mongodb support</name>
<description>Module providing Mongodb support for db-scheduler</description>
<url>https://github.com/piemjean/db-scheduler-mongo</url>

<licenses>
<license>
<name>The Apache Software License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
</license>
</licenses>

<developers>
<developer>
<name>piemjean</name>
<email>piem.jean@gmail.com</email>
</developer>
</developers>

<scm>
<connection>scm:git:git://github.com/piemjean/db-scheduler-mopngo.git</connection>
<developerConnection>scm:git:ssh://github.com:piemjean/db-scheduler-mopngo.git</developerConnection>
<url>https://github.com/piemjean/db-scheduler-mongo/tree/master</url>
</scm>

<distributionManagement>
<!-- Publish versioned releases here -->
<repository>
<id>${nexus.server.projet.id}</id>
<name>Releases of ${project.artifactId}</name>
<url>${nexus.releases.repository}/</url>
</repository>

<!-- Publish snapshots here -->
<snapshotRepository>
<uniqueVersion>true</uniqueVersion>
<id>${nexus.server.projet.id}</id>
<name>Snapshots of ${project.artifactId}</name>
<url>${nexus.snapshots.repository}/</url>
</snapshotRepository>

<!-- Utilisé par Maven site:stage et site:deploy -->
<site>
<id>${nexus.server.projet.id}</id>
<url>${project.url}</url>
</site>
</distributionManagement>

<properties>
<jdk.version>1.8</jdk.version>
<maven.compiler.source>${jdk.version}</maven.compiler.source>
<maven.compiler.target>${jdk.version}</maven.compiler.target>
<!-- Versions declaration -->
<db-scheduler.version>9.3</db-scheduler.version>
<db-scheduler.version>11.0</db-scheduler.version>
<mongo-java-driver.version>3.12.7</mongo-java-driver.version>
<slf4j-simple.version>1.7.30</slf4j-simple.version>
<embed-mongo.version>3.0.0</embed-mongo.version>
Expand All @@ -26,6 +69,7 @@
<assertj-version>3.19.0</assertj-version>
<guava.version>30.1-jre</guava.version>
<maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
Expand Down Expand Up @@ -129,6 +173,46 @@

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.6</version>
<executions>
<execution>
<id>sign-artifacts</id>
<phase>verify</phase>
<goals>
<goal>sign</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
Expand All @@ -137,27 +221,4 @@
</plugins>
</build>

<distributionManagement>
<!-- Publish versioned releases here -->
<repository>
<id>${nexus.server.projet.id}</id>
<name>Releases of ${project.artifactId}</name>
<url>${nexus.releases.repository}/</url>
</repository>

<!-- Publish snapshots here -->
<snapshotRepository>
<uniqueVersion>true</uniqueVersion>
<id>${nexus.server.projet.id}</id>
<name>Snapshots of ${project.artifactId}</name>
<url>${nexus.snapshots.repository}/</url>
</snapshotRepository>

<!-- Utilisé par Maven site:stage et site:deploy -->
<site>
<id>${nexus.server.projet.id}</id>
<url>${project.url}</url>
</site>
</distributionManagement>

</project>
52 changes: 28 additions & 24 deletions src/main/java/com/github/kagkarlsson/scheduler/MongoScheduler.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.github.kagkarlsson.scheduler;

import com.github.kagkarlsson.scheduler.logging.LogLevel;
import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
import com.github.kagkarlsson.scheduler.task.OnStartup;
import com.github.kagkarlsson.scheduler.task.Task;
Expand All @@ -12,30 +13,33 @@

public class MongoScheduler extends Scheduler {

protected MongoScheduler(Clock clock,
TaskRepository schedulerTaskRepository,
TaskRepository clientTaskRepository, TaskResolver taskResolver, int threadpoolSize,
ExecutorService executorService,
SchedulerName schedulerName, Waiter executeDueWaiter, Duration heartbeatInterval,
boolean enableImmediateExecution,
StatsRegistry statsRegistry, int pollingLimit,
Duration deleteUnresolvedAfter, Duration shutdownMaxWait,
List<OnStartup> onStartup) {
super(clock, schedulerTaskRepository, clientTaskRepository, taskResolver, threadpoolSize,
executorService, schedulerName, executeDueWaiter, heartbeatInterval,
enableImmediateExecution,
statsRegistry, pollingLimit, deleteUnresolvedAfter, shutdownMaxWait, onStartup);
}
protected MongoScheduler(Clock clock,
TaskRepository schedulerTaskRepository,
TaskRepository clientTaskRepository, TaskResolver taskResolver, int threadpoolSize,
ExecutorService executorService,
SchedulerName schedulerName, Waiter executeDueWaiter, Duration heartbeatInterval,
boolean enableImmediateExecution,
StatsRegistry statsRegistry,
Duration deleteUnresolvedAfter, Duration shutdownMaxWait, LogLevel logLevel,
boolean logStackTrace,
List<OnStartup> onStartup) {
super(clock, schedulerTaskRepository, clientTaskRepository, taskResolver, threadpoolSize,
executorService, schedulerName, executeDueWaiter, heartbeatInterval,
enableImmediateExecution,
statsRegistry, PollingStrategyConfig.DEFAULT_FETCH, deleteUnresolvedAfter, shutdownMaxWait,
logLevel,
logStackTrace, onStartup);
}

public static MongoSchedulerBuilder create(MongoClient mongoClient, String database,
String collection, Task<?>... knownTasks) {
List<Task<?>> knownTasksList = new ArrayList<>();
knownTasksList.addAll(Arrays.asList(knownTasks));
return create(mongoClient, database, collection, knownTasksList);
}
public static MongoSchedulerBuilder create(MongoClient mongoClient, String database,
String collection, Task<?>... knownTasks) {
List<Task<?>> knownTasksList = new ArrayList<>();
knownTasksList.addAll(Arrays.asList(knownTasks));
return create(mongoClient, database, collection, knownTasksList);
}

public static MongoSchedulerBuilder create(MongoClient mongoClient, String database,
String collection, List<Task<?>> knownTasks) {
return new MongoSchedulerBuilder(mongoClient, database, collection, knownTasks);
}
public static MongoSchedulerBuilder create(MongoClient mongoClient, String database,
String collection, List<Task<?>> knownTasks) {
return new MongoSchedulerBuilder(mongoClient, database, collection, knownTasks);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class MongoSchedulerBuilder extends SchedulerBuilder {
*
* @param mongoClient - object handling mongo connection
* @param databaseName - mongo database name
* @param collection - name of collection in which tasks are stored
* @param knownTasks - list of known tasks
*/
protected MongoSchedulerBuilder(MongoClient mongoClient, String databaseName, String collection,
Expand All @@ -39,20 +40,16 @@ protected MongoSchedulerBuilder(MongoClient mongoClient, String databaseName, St

@Override
public Scheduler build() {
if (pollingLimit < executorThreads) {
LOG.warn("Polling-limit is less than number of threads. Should be equal or higher.");
}

if (schedulerName == null) {
schedulerName = new SchedulerName.Hostname();
}

final TaskResolver taskResolver = new TaskResolver(statsRegistry, clock, knownTasks);

final MongoTaskRepository mongoTaskRepository = new MongoTaskRepository(taskResolver,
schedulerName, serializer, databaseName, tableName, mongoClient);
schedulerName, serializer, databaseName, tableName, mongoClient, clock);
final MongoTaskRepository clientTaskRepository = new MongoTaskRepository(taskResolver,
schedulerName, serializer, databaseName, tableName, mongoClient);
schedulerName, serializer, databaseName, tableName, mongoClient, clock);

ExecutorService candidateExecutorService = executorService;
if (candidateExecutorService == null) {
Expand All @@ -69,7 +66,7 @@ public Scheduler build() {
schedulerName.getName());

return new MongoScheduler(clock, mongoTaskRepository, clientTaskRepository, taskResolver, executorThreads, candidateExecutorService,
schedulerName, waiter, heartbeatInterval, enableImmediateExecution, statsRegistry, pollingLimit,
deleteUnresolvedAfter, shutdownMaxWait, startTasks);
schedulerName, waiter, heartbeatInterval, enableImmediateExecution, statsRegistry,
deleteUnresolvedAfter, shutdownMaxWait, logLevel, logStackTrace, startTasks);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.github.kagkarlsson.scheduler.TaskResolver.UnresolvedTask;
import com.github.kagkarlsson.scheduler.task.Execution;
import com.github.kagkarlsson.scheduler.task.SchedulableInstance;
import com.github.kagkarlsson.scheduler.task.Task;
import com.github.kagkarlsson.scheduler.task.TaskInstance;
import com.mongodb.ErrorCategory;
Expand All @@ -30,6 +31,8 @@
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.ReturnDocument;
import com.mongodb.client.result.DeleteResult;

import java.sql.PreparedStatement;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
Expand All @@ -53,15 +56,17 @@ public class MongoTaskRepository implements TaskRepository {
private final TaskResolver taskResolver;
private final SchedulerName schedulerSchedulerName;
private final Serializer serializer;
private final Clock clock;

private final MongoCollection<TaskEntity> collection;

public MongoTaskRepository(TaskResolver taskResolver,
SchedulerName schedulerSchedulerName,
Serializer serializer, String databaseName, String tableName, MongoClient mongoClient) {
SchedulerName schedulerSchedulerName,
Serializer serializer, String databaseName, String tableName, MongoClient mongoClient, Clock clock) {
this.taskResolver = taskResolver;
this.schedulerSchedulerName = schedulerSchedulerName;
this.serializer = serializer;
this.clock = clock;

CodecRegistry pojoCodecRegistry = fromRegistries(
MongoClientSettings.getDefaultCodecRegistry(),
Expand All @@ -87,11 +92,14 @@ private void uniqueIndexCreation() {
}

@Override
public boolean createIfNotExists(Execution execution) {
LOG.debug("Creation request for execution {}", execution);
public boolean createIfNotExists(SchedulableInstance schedulableInstance) {
LOG.debug("Creation request for execution {}", schedulableInstance);
Optional<Execution> execution = this.getExecution(schedulableInstance.getTaskName(), schedulableInstance.getId());
if (execution.isPresent()) {
return false;
}
// Search criterion : taskName, taskInstance
final Bson query = buildFilterFromExecution(execution, false);
Optional<TaskEntity> taskEntityOpt = toEntity(execution);
Optional<TaskEntity> taskEntityOpt = toEntity(schedulableInstance);
if (!taskEntityOpt.isPresent()) {
return false;
}
Expand Down Expand Up @@ -196,6 +204,11 @@ public void getScheduledExecutions(
.filter(Optional::isPresent).map(Optional::get).forEach(consumer);
}

@Override
public List<Execution> lockAndGetDue(Instant instant, int i) {
throw new UnsupportedOperationException("lockAndFetch not supported in mongodatabase");
}

@Override
public void remove(Execution execution) {
final Bson filter = buildFilterFromExecution(execution);
Expand Down Expand Up @@ -355,32 +368,32 @@ public int removeExecutions(String taskName) {
return Long.valueOf(deleted.getDeletedCount()).intValue();
}

@Override
public void checkSupportsLockAndFetch() {
throw new IllegalArgumentException("Mongodb does not support specific lock-and-fetch since operations are already atomic");
}

/**
* Mapping method to get an Entity from an Execution
*
* @param in - Execution to map
* @return TaskEntity mapped from execution
*/
private Optional<TaskEntity> toEntity(Execution in) {
private Optional<TaskEntity> toEntity(SchedulableInstance<?> in) {
if (in == null) {
return Optional.empty();
}

TaskEntity out = new TaskEntity();
Optional<TaskInstance<?>> taskInstanceOpt = Optional
.ofNullable(in.taskInstance);
Optional<TaskInstance<?>> taskInstanceOpt = Optional.ofNullable(in.getTaskInstance());
taskInstanceOpt.map(TaskInstance::getTaskName).ifPresent(out::setTaskName);
taskInstanceOpt.map(TaskInstance::getId).ifPresent(out::setTaskInstance);
taskInstanceOpt.map(TaskInstance::getData).map(serializer::serialize)
.ifPresent(out::setTaskData);

out.setExecutionTime(in.getExecutionTime());
out.setPicked(in.isPicked());
out.setPickedBy(in.pickedBy);
out.setLastFailure(in.lastFailure);
out.setLastSuccess(in.lastSuccess);
out.setLastHeartbeat(in.lastHeartbeat);
out.setVersion(in.version);
out.setExecutionTime(in.getNextExecutionTime(clock.now()));
out.setPicked(false);
out.setVersion(1);

return Optional.of(out);
}
Expand Down
Loading