diff --git a/.sdkmanrc b/.sdkmanrc index b8a16ed8..04dd322e 100644 --- a/.sdkmanrc +++ b/.sdkmanrc @@ -1,3 +1,3 @@ # Enable auto-env through the sdkman_auto_env config # Add key=value pairs of SDKs to use below -java=11.0.22-tem +java=17.0.11-tem diff --git a/buildSrc/build.gradle.kts b/buildSrc/build.gradle.kts index af1b0015..e0fb1fe9 100644 --- a/buildSrc/build.gradle.kts +++ b/buildSrc/build.gradle.kts @@ -6,3 +6,7 @@ repositories { mavenLocal() gradlePluginPortal() } + +dependencies { + api("org.testcontainers:testcontainers:1.19.7") +} diff --git a/buildSrc/src/main/kotlin/Dependencies.kt b/buildSrc/src/main/kotlin/Dependencies.kt index b8feaa94..f459d387 100644 --- a/buildSrc/src/main/kotlin/Dependencies.kt +++ b/buildSrc/src/main/kotlin/Dependencies.kt @@ -4,7 +4,7 @@ object PlayioPlugin { object Version { - const val gradlePlugin = "0.2.1" + const val gradlePlugin = "0.3.0" } const val oss = "cloud.playio.gradle.oss" diff --git a/core/src/main/java/io/github/zero88/schedulerx/AsyncJob.java b/core/src/main/java/io/github/zero88/schedulerx/AsyncJob.java index 688dd313..73b28c1a 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/AsyncJob.java +++ b/core/src/main/java/io/github/zero88/schedulerx/AsyncJob.java @@ -37,7 +37,7 @@ default void execute(@NotNull JobData jobData, @NotNull ExecutionContext< /** * Async execute job *

- * WARNING: After execution, be aware to call a terminal operation of {@link Future} such + * CAUTION: After execution, be aware to call a terminal operation of {@link Future} such * as {@link Future#onSuccess(Handler)}, {@link Future#onFailure(Handler)} or {@link Future#onComplete(Handler)}. * The async job is already registered these handlers, if several {@code handler}s are registered, there is no * guarantee that they will be invoked in order of registration. diff --git a/core/src/main/java/io/github/zero88/schedulerx/JobData.java b/core/src/main/java/io/github/zero88/schedulerx/JobData.java index ae8cd856..218b0bf2 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/JobData.java +++ b/core/src/main/java/io/github/zero88/schedulerx/JobData.java @@ -1,5 +1,7 @@ package io.github.zero88.schedulerx; +import java.util.Optional; + import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -14,27 +16,6 @@ */ public interface JobData { - /** - * Get an input data. - *

- * It might be a static input value or a preloaded value from an external system - * or a configuration to instruct how to get actual input data of the job in runtime execution. - * - * @return input data - */ - @Nullable T get(); - - /** - * Declares a unique id in an external system that will be propagated to the job result. - *

- * That makes the integration between the job monitoring and the external system seamless and easier. - * - * @return an external id - * @see ExecutionResult#externalId() - * @since 2.0.0 - */ - default @Nullable Object externalId() { return null; } - /** * Create emtpy data with random external id in integer. * @@ -51,12 +32,13 @@ public interface JobData { * @return JobData contains null data * @since 2.0.0 */ - static JobData empty(@NotNull Object externalId) { + static JobData empty(Object externalId) { + Object id = Optional.ofNullable(externalId).orElseGet(Utils::randomPositiveInt); return new JobData<>() { public @Nullable D get() { return null; } @Override - public Object externalId() { return externalId; } + public Object externalId() { return id; } }; } @@ -78,13 +60,35 @@ static JobData create(@NotNull D data) { * @return JobData * @since 2.0.0 */ - static JobData create(@NotNull D data, @NotNull Object externalId) { + static JobData create(@NotNull D data, Object externalId) { + Object id = Optional.ofNullable(externalId).orElseGet(Utils::randomPositiveInt); return new JobData<>() { public D get() { return data; } @Override - public Object externalId() { return externalId; } + public Object externalId() { return id; } }; } + /** + * Get an input data. + *

+ * It might be a static input value or a preloaded value from an external system + * or a configuration to instruct how to get actual input data of the job in runtime execution. + * + * @return input data + */ + @Nullable T get(); + + /** + * Declares a unique id in an external system that will be propagated to the job result. + *

+ * That makes the integration between the job monitoring and the external system seamless and easier. + * + * @return an external id + * @see ExecutionResult#externalId() + * @since 2.0.0 + */ + @Nullable Object externalId(); + } diff --git a/core/src/main/java/io/github/zero88/schedulerx/JobFactory.java b/core/src/main/java/io/github/zero88/schedulerx/JobFactory.java new file mode 100644 index 00000000..8e2d4b4c --- /dev/null +++ b/core/src/main/java/io/github/zero88/schedulerx/JobFactory.java @@ -0,0 +1,88 @@ +package io.github.zero88.schedulerx; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.jetbrains.annotations.NotNull; + +import io.vertx.core.impl.logging.Logger; +import io.vertx.core.impl.logging.LoggerFactory; + +/** + * Factory for creating Job instances from class names or service loader. + * + * @since 2.0.0 + */ +public final class JobFactory { + + private static final Logger LOGGER = LoggerFactory.getLogger(JobFactory.class); + private static final Map> CACHE = new ConcurrentHashMap<>(); + + private JobFactory() { + } + + /** + * Get the singleton instance of JobFactory. + * + * @return the JobFactory instance + */ + public static JobFactory getInstance() { + return Holder.INSTANCE; + } + + /** + * Create a Job instance from a job class name. + * + * @param jobClassName the fully qualified class name of the job + * @param type of job input + * @param type of job output + * @return a Job instance + * @throws IllegalArgumentException if the job class cannot be found or instantiated + */ + @SuppressWarnings("unchecked") + public Job create(@NotNull String jobClassName) { + try { + // First check if we have a cached instance + if (CACHE.containsKey(jobClassName)) { + LOGGER.debug("Using cached job instance for: " + jobClassName); + return (Job) CACHE.get(jobClassName); + } + + // Try to load the class + Class jobClass = Class.forName(jobClassName); + + // Ensure it implements Job interface + if (!Job.class.isAssignableFrom(jobClass)) { + throw new IllegalArgumentException("Class " + jobClassName + " does not implement Job interface"); + } + + // Instantiate using default constructor + Job jobInstance = (Job) jobClass.getDeclaredConstructor().newInstance(); + LOGGER.debug("Created job instance using default constructor: " + jobClassName); + + // Cache the instance for future use + CACHE.put(jobClassName, jobInstance); + return jobInstance; + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException("Job class " + jobClassName + " must have a default constructor", e); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException("Job class not found: " + jobClassName, e); + } catch (Exception e) { + throw new IllegalArgumentException("Failed to instantiate job class: " + jobClassName, e); + } + } + + /** + * Clear the job instance cache. + */ + public void clearCache() { + CACHE.clear(); + } + + /** + * Holder for the singleton instance. + */ + private static class Holder { + private static final JobFactory INSTANCE = new JobFactory(); + } +} diff --git a/core/src/main/java/io/github/zero88/schedulerx/TimeClock.java b/core/src/main/java/io/github/zero88/schedulerx/TimeClock.java index b61203ae..85754868 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/TimeClock.java +++ b/core/src/main/java/io/github/zero88/schedulerx/TimeClock.java @@ -6,7 +6,7 @@ import org.jetbrains.annotations.NotNull; /** - * Represents for time clock + * Represents for a time clock * * @since 2.0.0 */ @@ -14,7 +14,7 @@ public interface TimeClock { /** - * Obtains the current instant from the system clock. + * Gets the current instant from the system clock. * * @return the current instant using the system clock, not null */ diff --git a/core/src/main/java/io/github/zero88/schedulerx/TimeoutPolicy.java b/core/src/main/java/io/github/zero88/schedulerx/TimeoutPolicy.java index b0016539..dbe541dc 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/TimeoutPolicy.java +++ b/core/src/main/java/io/github/zero88/schedulerx/TimeoutPolicy.java @@ -7,6 +7,8 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import io.vertx.core.json.JsonObject; + import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonGetter; import com.fasterxml.jackson.annotation.JsonProperty; @@ -33,16 +35,6 @@ public static TimeoutPolicy byDefault() { return create(null, null); } - /** - * Create timeout policy with execution timeout - * - * @param executionTimeout given execution timeout - * @return timeout policy - */ - public static TimeoutPolicy create(@NotNull Duration executionTimeout) { - return create(null, executionTimeout); - } - /** * Create timeout policy with execution timeout * @@ -63,6 +55,20 @@ public static TimeoutPolicy create(@JsonProperty("evaluationTimeout") @Nullable check.apply(executionTimeout, DefaultOptions.getInstance().executionMaxTimeout)); } + /** + * Create timeout policy with execution timeout + * + * @param executionTimeout given execution timeout + * @return timeout policy + */ + public static TimeoutPolicy create(@NotNull Duration executionTimeout) { + return create(null, executionTimeout); + } + + public static TimeoutPolicy create(JsonObject timeoutPolicy) { + return timeoutPolicy.mapTo(TimeoutPolicy.class); + } + /** * Declares the trigger evaluation timeout. Default is {@link DefaultOptions#evaluationMaxTimeout} * @@ -81,6 +87,13 @@ public static TimeoutPolicy create(@JsonProperty("evaluationTimeout") @Nullable @JsonGetter public @NotNull Duration executionTimeout() { return executionTimeout; } + @Override + public int hashCode() { + int result = evaluationTimeout != null ? evaluationTimeout.hashCode() : 0; + result = 31 * result + (executionTimeout != null ? executionTimeout.hashCode() : 0); + return result; + } + @Override public boolean equals(Object o) { if (this == o) @@ -94,11 +107,4 @@ public boolean equals(Object o) { Objects.equals(executionTimeout, that.executionTimeout); } - @Override - public int hashCode() { - int result = evaluationTimeout != null ? evaluationTimeout.hashCode() : 0; - result = 31 * result + (executionTimeout != null ? executionTimeout.hashCode() : 0); - return result; - } - } diff --git a/core/src/main/java/io/github/zero88/schedulerx/impl/Utils.java b/core/src/main/java/io/github/zero88/schedulerx/impl/Utils.java index 068951f1..baf7148e 100644 --- a/core/src/main/java/io/github/zero88/schedulerx/impl/Utils.java +++ b/core/src/main/java/io/github/zero88/schedulerx/impl/Utils.java @@ -1,7 +1,6 @@ package io.github.zero88.schedulerx.impl; import java.security.SecureRandom; -import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.concurrent.TimeUnit; @@ -10,18 +9,12 @@ @Internal public final class Utils { - public static String brackets(Object any) { return "[" + any + "]"; } - - /* - * The random number generator, in a holder class to defer initialization until needed. - */ - private static class Holder { - - static final SecureRandom numberGenerator = new SecureRandom(); - + private Utils() { } - private Utils() { } + public static String brackets(Object any) { + return "[" + any + "]"; + } public static int randomPositiveInt() { return Utils.Holder.numberGenerator.nextInt() & Integer.MAX_VALUE; @@ -62,4 +55,13 @@ public static T castOrNull(Object data, boolean nullOrThrow) { } } + /* + * The random number generator, in a holder class to defer initialization until needed. + */ + private static class Holder { + + static final SecureRandom numberGenerator = new SecureRandom(); + + } + } diff --git a/core/src/test/java/io/github/zero88/schedulerx/JobFactoryTest.java b/core/src/test/java/io/github/zero88/schedulerx/JobFactoryTest.java new file mode 100644 index 00000000..cf506953 --- /dev/null +++ b/core/src/test/java/io/github/zero88/schedulerx/JobFactoryTest.java @@ -0,0 +1,84 @@ +package io.github.zero88.schedulerx; + +import static io.github.zero88.schedulerx.impl.Utils.brackets; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +class JobFactoryTest { + + @BeforeEach + void setUp() { + JobFactory.getInstance().clearCache(); + } + + @Test + void test_create_job_with_default_constructor() { + Job job = JobFactory.getInstance().create(TestJobWithDefaultConstructor.class.getName()); + + assertNotNull(job); + assertInstanceOf(TestJobWithDefaultConstructor.class, job); + } + + @Test + void test_caching() { + Job job1 = JobFactory.getInstance().create(TestJobWithDefaultConstructor.class.getName()); + Job job2 = JobFactory.getInstance() + .create("io.github.zero88.schedulerx" + + ".JobFactoryTest$TestJobWithDefaultConstructor"); + + assertNotNull(job1); + assertNotNull(job2); + assertSame(job1, job2, "Should return cached instance"); + } + + @ParameterizedTest + // @formatter:off + @CsvSource({ + "non.existent.JobClass,Job class not found", + "io.github.zero88.schedulerx.TestUtils,does not implement Job interface", + "io.github.zero88.schedulerx.JobFactoryTest$TestJobWithWithoutDefaultConstructor,must have a default constructor" + }) + // @formatter:on + void test_fail_to_create_job(String jobClassName, String expectedMessage) { + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, + () -> JobFactory.getInstance().create(jobClassName)); + String exceptionMessage = exception.getMessage(); + String causeMessage = exception.getCause() != null ? exception.getCause().getMessage() : ""; + assertTrue(exceptionMessage.contains(expectedMessage) || causeMessage.contains(expectedMessage), + "Expected error message to contain: " + brackets(expectedMessage) + ", but got: " + + brackets(exceptionMessage) + " and cause: " + brackets(causeMessage)); + } + + static class TestJobWithDefaultConstructor implements Job { + + @Override + public void execute(@NotNull JobData jobData, @NotNull ExecutionContext executionContext) { + executionContext.complete("Default job executed"); + } + + } + + + static class TestJobWithWithoutDefaultConstructor implements Job { + + private final String arg; + + public TestJobWithWithoutDefaultConstructor(String arg) { this.arg = arg; } + + @Override + public void execute(@NotNull JobData jobData, @NotNull ExecutionContext executionContext) { + executionContext.complete("Default job executed: " + arg); + } + + } + +} diff --git a/core/src/test/java/io/github/zero88/schedulerx/TestUtils.java b/core/src/test/java/io/github/zero88/schedulerx/TestUtils.java index eac0c644..a6ef9226 100644 --- a/core/src/test/java/io/github/zero88/schedulerx/TestUtils.java +++ b/core/src/test/java/io/github/zero88/schedulerx/TestUtils.java @@ -22,19 +22,6 @@ public final class TestUtils { private static final Logger LOGGER = LoggerFactory.getLogger(TestUtils.class); - private TestUtils() { } - - @SuppressWarnings("java:S2925") - public static void block(Duration duration, VertxTestContext testContext) { - try { - LOGGER.info("Doing a mock stuff in " + brackets(duration) + "..."); - TimeUnit.MILLISECONDS.sleep(duration.toMillis()); - LOGGER.info("Wake up after " + brackets(duration) + "!!!"); - } catch (InterruptedException e) { - testContext.failNow(e); - } - } - public static List simulateRunActionInParallel(VertxTestContext testContext, Runnable action, int nbOfThreads) { final List store = new ArrayList<>(); @@ -47,6 +34,16 @@ public static List simulateRunActionInParallel(VertxTestContext testC return store; } + public static void block(Duration duration, VertxTestContext testContext) { + try { + LOGGER.info("Doing a mock stuff in " + brackets(duration) + "..."); + TimeUnit.MILLISECONDS.sleep(duration.toMillis()); + LOGGER.info("Wake up after " + brackets(duration) + "!!!"); + } catch (InterruptedException e) { + testContext.failNow(e); + } + } + public static ObjectMapper defaultMapper() { return DatabindCodec.mapper() .copy() diff --git a/core/src/test/java/io/github/zero88/schedulerx/custom/HttpClientJob.java b/core/src/test/java/io/github/zero88/schedulerx/custom/HttpClientJob.java index d9f0362f..1df7e36b 100644 --- a/core/src/test/java/io/github/zero88/schedulerx/custom/HttpClientJob.java +++ b/core/src/test/java/io/github/zero88/schedulerx/custom/HttpClientJob.java @@ -18,6 +18,9 @@ public Future asyncExecute(@NotNull JobData jobData, @NotNull ExecutionContext executionContext) { Vertx vertx = executionContext.vertx(); JsonObject config = jobData.get(); + if (config == null) { + return Future.failedFuture("Missing job data"); + } return vertx.createHttpClient() .request(HttpMethod.GET, config.getString("host"), config.getString("path")) .map(req -> req.setFollowRedirects(true)) diff --git a/core/src/test/java/io/github/zero88/schedulerx/trigger/CronTriggerTest.java b/core/src/test/java/io/github/zero88/schedulerx/trigger/CronTriggerTest.java index d493f887..0c0e5ad8 100644 --- a/core/src/test/java/io/github/zero88/schedulerx/trigger/CronTriggerTest.java +++ b/core/src/test/java/io/github/zero88/schedulerx/trigger/CronTriggerTest.java @@ -65,6 +65,7 @@ void test_serialize_deserialize(CronTrigger trigger, JsonObject json) throws Jso Assertions.assertEquals(t2, trigger); Assertions.assertEquals(t2.toJson(), trigger.toJson()); Assertions.assertEquals(t2.toJson().encode(), mapper.writeValueAsString(trigger)); + System.out.println(t2.toJson()); } static Stream invalidExpression() { diff --git a/core/src/test/java/io/github/zero88/schedulerx/trigger/EventTriggerTest.java b/core/src/test/java/io/github/zero88/schedulerx/trigger/EventTriggerTest.java index 1fb6a4f6..572c8fce 100644 --- a/core/src/test/java/io/github/zero88/schedulerx/trigger/EventTriggerTest.java +++ b/core/src/test/java/io/github/zero88/schedulerx/trigger/EventTriggerTest.java @@ -53,6 +53,7 @@ void test_serialize_deserialize(EventTrigger trigger, JsonObject json) throws Js Assertions.assertEquals(t3, trigger); Assertions.assertEquals(t3.toJson(), trigger.toJson()); Assertions.assertEquals(t3.toJson().encode(), mapper.writeValueAsString(trigger)); + System.out.println(t3.toJson()); } static Stream invalidData() { diff --git a/core/src/test/java/io/github/zero88/schedulerx/trigger/IntervalTriggerTest.java b/core/src/test/java/io/github/zero88/schedulerx/trigger/IntervalTriggerTest.java index 8dc53603..fe65d8a0 100644 --- a/core/src/test/java/io/github/zero88/schedulerx/trigger/IntervalTriggerTest.java +++ b/core/src/test/java/io/github/zero88/schedulerx/trigger/IntervalTriggerTest.java @@ -37,7 +37,8 @@ class IntervalTriggerTest { static void setup() { mapper = DatabindCodec.mapper() .findAndRegisterModules() - .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, + SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS); } static Stream validData() { @@ -78,6 +79,7 @@ void test_serialize_deserialize(IntervalTrigger trigger, JsonObject json) throws Assertions.assertEquals(t1.toJson(), trigger.toJson()); Assertions.assertEquals(t1.toJson().encode(), mapper.writeValueAsString(trigger)); Assertions.assertEquals(json.mapTo(IntervalTrigger.class), trigger); + System.out.println(t1.toJson()); } static Stream invalidData() { diff --git a/core/src/testFixtures/java/io/github/zero88/schedulerx/NoopJob.java b/core/src/testFixtures/java/io/github/zero88/schedulerx/NoopJob.java index 0cd648e0..567eb947 100644 --- a/core/src/testFixtures/java/io/github/zero88/schedulerx/NoopJob.java +++ b/core/src/testFixtures/java/io/github/zero88/schedulerx/NoopJob.java @@ -1,12 +1,12 @@ package io.github.zero88.schedulerx; /** - * Represents for dummy job that do nothing + * Represents for a fake job that does nothing * * @param Type of job input data * @param Type of job result data */ -public interface NoopJob extends Job { +public interface NoopJob extends SyncJob { static Job create() { return (jobData, executionContext) -> { }; } diff --git a/docs/build.gradle.kts b/docs/build.gradle.kts index f92093d3..9f7cf144 100644 --- a/docs/build.gradle.kts +++ b/docs/build.gradle.kts @@ -1,10 +1,14 @@ import cloud.playio.gradle.antora.tasks.AntoraCopyTask import cloud.playio.gradle.generator.docgen.AsciidocGenTask +import cloud.playio.gradle.pandoc.FormatFrom +import cloud.playio.gradle.pandoc.FormatTo +import cloud.playio.gradle.pandoc.tasks.PandocTask import cloud.playio.gradle.shared.prop plugins { id(PlayioPlugin.antora) id(PlayioPlugin.docgen) + id(PlayioPlugin.pandoc) } dependencies { @@ -32,19 +36,19 @@ documentation { javadocProjects.set((extensions["PROJECT_POOL"] as Map<*, Array>)[mainProject]!!.map(project::project)) } -// pandoc { -// from.set(FormatFrom.markdown) -// to.set(FormatTo.asciidoc) -// inputFile.set(rootDir.resolve("CHANGELOG.md")) -// outFile.set("pg-changelog.adoc") -// config { -// arguments.set(arrayOf("--trace")) -// } -// } + pandoc { + from.set(FormatFrom.markdown) + to.set(FormatTo.asciidoc) + inputFile.set(rootDir.resolve("CHANGELOG.md")) + outFile.set("pg-changelog.adoc") + config { + arguments.set(arrayOf("--trace")) + } + } } tasks { -// named("antoraPages") { from(withType()) } + named("antoraPages") { from(withType()) } named("antoraPartials") { from(withType()) include("*.adoc") diff --git a/event-trigger-jsonschema/build.gradle.kts b/manager/build.gradle.kts similarity index 76% rename from event-trigger-jsonschema/build.gradle.kts rename to manager/build.gradle.kts index 87d748df..63370e0a 100644 --- a/event-trigger-jsonschema/build.gradle.kts +++ b/manager/build.gradle.kts @@ -1,6 +1,8 @@ dependencies { api(project(":schedulerx")) api(VertxLibs.jsonSchema) + api(JacksonLibs.databind) + api(JacksonLibs.jsr310) compileOnly(UtilLibs.jetbrainsAnnotations) testImplementation(testFixtures(project(":schedulerx"))) diff --git a/manager/src/main/java/io/github/zero88/schedulerx/manager/JsonFileScheduleManager.java b/manager/src/main/java/io/github/zero88/schedulerx/manager/JsonFileScheduleManager.java new file mode 100644 index 00000000..40833eff --- /dev/null +++ b/manager/src/main/java/io/github/zero88/schedulerx/manager/JsonFileScheduleManager.java @@ -0,0 +1,94 @@ +package io.github.zero88.schedulerx.manager; + +import io.github.zero88.schedulerx.SchedulingMonitor; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.file.FileSystem; +import io.vertx.core.json.JsonObject; + +/** + * A concrete implementation of Manager that loads scheduler configurations from JSON files. + * The file path should be specified in the setup configuration under the "filePath" key. + */ +public class JsonFileScheduleManager implements SchedulingLoader { + + private static final String FILE_PATH_KEY = "filePath"; + private static final String DEFAULT_FILE_PATH = "schedulers.json"; + + + /** + * Creates a new JsonFileManager instance with a default monitor. + * + * @param vertx the Vertx instance + * @param defaultMonitor the default scheduling monitor + */ + public JsonFileScheduleManager(Vertx vertx, SchedulingMonitor defaultMonitor) { + super(vertx, defaultMonitor); + } + + @Override + protected Future loadData() { + String filePath = config.getString(FILE_PATH_KEY, DEFAULT_FILE_PATH); + LOGGER.info("Loading scheduler data from file: " + filePath); + + FileSystem fs = vertx.fileSystem(); + + return fs.exists(filePath) + .compose(exists -> { + if (!exists) { + LOGGER.warn("Scheduler configuration file does not exist: " + filePath); + return Future.succeededFuture(Buffer.buffer()); + } + return fs.readFile(filePath); + }) + .compose(buffer -> { + try { + JsonObject data = buffer.toJsonObject(); + LOGGER.info("Successfully loaded scheduler data from file: " + filePath); + return Future.succeededFuture(data); + } catch (Exception e) { + LOGGER.error("Failed to parse JSON from file: " + filePath, e); + return Future.failedFuture(new IllegalArgumentException("Invalid JSON in file: " + filePath, e)); + } + }) + .onFailure(t -> LOGGER.error("Failed to load data from file: " + filePath, t)); + } + + /** + * Saves the current scheduler configurations to the configured file. + * This is a utility method for persisting configurations. + * + * @param data the data to save + * @return a Future indicating completion + */ + public Future saveData(JsonObject data) { + String filePath = config.getString(FILE_PATH_KEY, DEFAULT_FILE_PATH); + LOGGER.info("Saving scheduler data to file: " + filePath); + + return vertx.fileSystem() + .writeFile(filePath, data.toBuffer()) + .onSuccess(v -> LOGGER.info("Successfully saved scheduler data to file: " + filePath)) + .onFailure(t -> LOGGER.error("Failed to save data to file: " + filePath, t)); + } + + /** + * Gets the configured file path. + * + * @return the file path + */ + public String getFilePath() { + return config.getString(FILE_PATH_KEY, DEFAULT_FILE_PATH); + } + + @Override + public Future load() { + return null; + } + + @Override + public JsonObject next() { + return null; + } + +} diff --git a/manager/src/main/java/io/github/zero88/schedulerx/manager/ManagerExceptions.java b/manager/src/main/java/io/github/zero88/schedulerx/manager/ManagerExceptions.java new file mode 100644 index 00000000..486af76d --- /dev/null +++ b/manager/src/main/java/io/github/zero88/schedulerx/manager/ManagerExceptions.java @@ -0,0 +1,14 @@ +package io.github.zero88.schedulerx.manager; + +public final class ManagerExceptions { + + public static class InitSchedulerException extends Exception { + + public InitSchedulerException(Throwable cause) { super(cause); } + + } + + + public static class RunSchedulerException extends RuntimeException { } + +} diff --git a/manager/src/main/java/io/github/zero88/schedulerx/manager/ScheduleManager.java b/manager/src/main/java/io/github/zero88/schedulerx/manager/ScheduleManager.java new file mode 100644 index 00000000..f62f66ad --- /dev/null +++ b/manager/src/main/java/io/github/zero88/schedulerx/manager/ScheduleManager.java @@ -0,0 +1,38 @@ +package io.github.zero88.schedulerx.manager; + +import org.jetbrains.annotations.NotNull; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonObject; + +/** + * The interface for managing schedulers in the scheduler.x framework. + * This interface defines methods for setting up the datasource for schedulers, then start. + * + * @since 2.0.0 + */ +public interface ScheduleManager { + + /** + * Initializes the manager, this method should be called before any other methods. + * + * @return this for fluent API + */ + ScheduleManager setup(@NotNull Vertx vertx, @NotNull JsonObject config); + + /** + * Loading available schedulers from datasource then start them up. + * + * @return a Future indicating completion + */ + Future run(); + + /** + * Cancels all the schedulers in the manager. + * + * @return a Future indicating completion + */ + Future shutdown(); + +} diff --git a/manager/src/main/java/io/github/zero88/schedulerx/manager/ScheduleManagerReport.java b/manager/src/main/java/io/github/zero88/schedulerx/manager/ScheduleManagerReport.java new file mode 100644 index 00000000..3c9841ad --- /dev/null +++ b/manager/src/main/java/io/github/zero88/schedulerx/manager/ScheduleManagerReport.java @@ -0,0 +1,19 @@ +package io.github.zero88.schedulerx.manager; + +import org.jetbrains.annotations.NotNull; + +import io.github.zero88.schedulerx.Scheduler; +import io.github.zero88.schedulerx.manager.impl.ScheduleManagerReportImpl; +import io.github.zero88.schedulerx.trigger.Trigger; + +public interface ScheduleManagerReport { + + static ScheduleManagerReport create() { + return new ScheduleManagerReportImpl(); + } + + void addSuccess(Scheduler scheduler); + + void addError(@NotNull Exception error); + +} diff --git a/manager/src/main/java/io/github/zero88/schedulerx/manager/SchedulerCreator.java b/manager/src/main/java/io/github/zero88/schedulerx/manager/SchedulerCreator.java new file mode 100644 index 00000000..e710ec7b --- /dev/null +++ b/manager/src/main/java/io/github/zero88/schedulerx/manager/SchedulerCreator.java @@ -0,0 +1,23 @@ +package io.github.zero88.schedulerx.manager; + +import org.jetbrains.annotations.NotNull; + +import io.github.zero88.schedulerx.Scheduler; +import io.github.zero88.schedulerx.manager.ManagerExceptions.InitSchedulerException; +import io.github.zero88.schedulerx.trigger.Trigger; +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonObject; + +public interface SchedulerCreator { + + /** + * Create scheduler from given data + * + * @param vertx vertx instance + * @param data schedule data in json object + * @return the creation result + * @throws InitSchedulerException if any error occurs during creation + */ + Scheduler create(@NotNull Vertx vertx, @NotNull JsonObject data) throws InitSchedulerException; + +} diff --git a/manager/src/main/java/io/github/zero88/schedulerx/manager/SchedulingLoader.java b/manager/src/main/java/io/github/zero88/schedulerx/manager/SchedulingLoader.java new file mode 100644 index 00000000..cfb5100e --- /dev/null +++ b/manager/src/main/java/io/github/zero88/schedulerx/manager/SchedulingLoader.java @@ -0,0 +1,12 @@ +package io.github.zero88.schedulerx.manager; + +import io.vertx.core.Future; +import io.vertx.core.json.JsonObject; + +public interface SchedulingLoader { + + Future load(); + + JsonObject next(); + +} diff --git a/manager/src/main/java/io/github/zero88/schedulerx/manager/impl/AbstractScheduleManager.java b/manager/src/main/java/io/github/zero88/schedulerx/manager/impl/AbstractScheduleManager.java new file mode 100644 index 00000000..a2bdefb3 --- /dev/null +++ b/manager/src/main/java/io/github/zero88/schedulerx/manager/impl/AbstractScheduleManager.java @@ -0,0 +1,191 @@ +package io.github.zero88.schedulerx.manager.impl; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.jetbrains.annotations.NotNull; + +import io.github.zero88.schedulerx.Scheduler; +import io.github.zero88.schedulerx.manager.ScheduleManager; +import io.github.zero88.schedulerx.manager.SchedulingLoader; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.impl.logging.Logger; +import io.vertx.core.impl.logging.LoggerFactory; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; +import io.vertx.core.json.jackson.DatabindCodec; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; + +/** + * Abstract base implementation of the Manager interface that provides common functionality + * for managing schedulers. Concrete implementations should extend this class and implement + * the abstract methods to provide specific data loading strategies. + */ +public abstract class AbstractScheduleManager implements InternalSchedulerManager { + + protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractScheduleManager.class); + protected final ConcurrentMap> schedulers = new ConcurrentHashMap<>(); + protected ObjectMapper mapper; + protected Vertx vertx; + protected JsonObject config; + protected boolean isSetup = false; + + @Override + public ScheduleManager setup(@NotNull Vertx vertx, @NotNull JsonObject config) { + this.vertx = Objects.requireNonNull(vertx, "Vertx instance is required"); + this.config = Objects.requireNonNull(config, "Config is required"); + this.mapper = DatabindCodec.mapper() + .findAndRegisterModules() + .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, + SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS); + this.isSetup = true; + LOGGER.info("Manager setup completed with config: " + this.config.encodePrettily()); + return this; + } + + @Override + public Future shutdown() { + LOGGER.info("Shutting down " + schedulers.size() + " scheduler(s)..."); + + List> stopFutures = new ArrayList<>(); + + for (Scheduler scheduler : schedulers.values()) { + Promise stopPromise = Promise.promise(); + try { + scheduler.cancel(); + stopPromise.complete(); + } catch (Exception e) { + LOGGER.warn("Error stopping scheduler", e); + stopPromise.fail(e); + } + stopFutures.add(stopPromise.future()); + } + + if (stopFutures.isEmpty()) { + return Future.succeededFuture(); + } + + return Future.all(stopFutures).mapEmpty().onSuccess(v -> { + schedulers.clear(); + LOGGER.info("All schedulers shutdown completed"); + }); + } + + /** + * Gets the current scheduler count. + * + * @return the number of active schedulers + */ + public int getSchedulerCount() { + return schedulers.size(); + } + + /** + * Gets a scheduler by ID. + * + * @param schedulerId the scheduler ID + * @return the scheduler or null if not found + */ + public Scheduler getScheduler(String schedulerId) { + return schedulers.get(schedulerId); + } + + /** + * Reloads the manager by shutting down existing schedulers and loading new ones. + * + * @return a Future indicating completion + */ + public Future reload() { + LOGGER.info("Reloading scheduler configurations..."); + return shutdown().flatMap(v -> load()).flatMap(v -> run()); + } + + @Override + public Future load() { + if (!isSetup) { + return Future.failedFuture(new IllegalStateException("Manager must be setup before loading")); + } + + return loadData().compose(this::createSchedulersFromData) + .onSuccess(v -> LOGGER.info("Successfully loaded schedulers")) + .onFailure(t -> LOGGER.error("Failed to load schedulers", t)); + } + + + /** + * Load data from the specific source (file, database, etc.). + * This method should be implemented by concrete classes. + * + * @return a Future containing the loaded data as JsonObject + */ + protected abstract Future loadData(); + + /** + * Creates schedulers from the loaded data. + * + * @param data the loaded data + * @return a Future indicating completion + */ + protected Future createSchedulersFromData(JsonObject data) { + Promise promise = Promise.promise(); + + try { + if (data == null || data.isEmpty()) { + LOGGER.warn("No scheduler data provided"); + promise.complete(); + return promise.future(); + } + + List> loadFutures = new ArrayList<>(); + + // Handle both single scheduler config and array of configs + if (data.containsKey("schedulers") && data.getValue("schedulers") instanceof JsonArray) { + JsonArray schedulerConfigs = data.getJsonArray("schedulers"); + for (int i = 0; i < schedulerConfigs.size(); i++) { + JsonObject config = schedulerConfigs.getJsonObject(i); + loadFutures.add(createSchedulerFromConfig(config, "scheduler_" + i)); + } + } else if (data.containsKey("trigger")) { + // Single scheduler configuration + loadFutures.add(createSchedulerFromConfig(data, "default_scheduler")); + } else if (data.getValue("schedulers") instanceof JsonObject) { + // Map of named schedulers + JsonObject namedSchedulers = data.getJsonObject("schedulers"); + for (String name : namedSchedulers.fieldNames()) { + JsonObject config = namedSchedulers.getJsonObject(name); + // Use name as a fallback scheduler ID, but if config has external_id in jobData, it will take + // precedence + loadFutures.add(createSchedulerFromConfig(config, name)); + } + } + + if (loadFutures.isEmpty()) { + LOGGER.warn("No valid scheduler configurations found in data"); + promise.complete(); + return promise.future(); + } + + Future.all(loadFutures).onComplete(ar -> { + if (ar.succeeded()) { + LOGGER.info("Successfully created " + loadFutures.size() + " scheduler(s)"); + promise.complete(); + } else { + LOGGER.error("Failed to create schedulers", ar.cause()); + promise.fail(ar.cause()); + } + }); + } catch (Exception e) { + LOGGER.error("Error during scheduler creation", e); + promise.fail(e); + } + + return promise.future(); + } +} diff --git a/manager/src/main/java/io/github/zero88/schedulerx/manager/impl/InternalSchedulerManager.java b/manager/src/main/java/io/github/zero88/schedulerx/manager/impl/InternalSchedulerManager.java new file mode 100644 index 00000000..a715e6e2 --- /dev/null +++ b/manager/src/main/java/io/github/zero88/schedulerx/manager/impl/InternalSchedulerManager.java @@ -0,0 +1,43 @@ +package io.github.zero88.schedulerx.manager.impl; + +import org.jetbrains.annotations.NotNull; + +import io.github.zero88.schedulerx.Scheduler; +import io.github.zero88.schedulerx.manager.ScheduleManager; +import io.github.zero88.schedulerx.manager.ScheduleManagerReport; +import io.github.zero88.schedulerx.manager.SchedulerCreator; +import io.github.zero88.schedulerx.manager.SchedulingLoader; +import io.github.zero88.schedulerx.trigger.Trigger; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonObject; + +public interface InternalSchedulerManager extends ScheduleManager { + + @Override + default Future run() { + ScheduleManagerReport report = ScheduleManagerReport.create(); + SchedulerCreator creator = creator(); + return loader().load().map(loader -> { + JsonObject data; + while ((data = loader.next()) != null) { + try { + final Scheduler scheduler = creator.create(vertx(), data); + // TODO: start by worker + scheduler.start(); + report.addSuccess(scheduler); + } catch (Exception e) { + report.addError(e); + } + } + return report; + }); + } + + @NotNull SchedulerCreator creator(); + + @NotNull T loader(); + + @NotNull Vertx vertx(); + +} diff --git a/manager/src/main/java/io/github/zero88/schedulerx/manager/impl/ScheduleManagerReportImpl.java b/manager/src/main/java/io/github/zero88/schedulerx/manager/impl/ScheduleManagerReportImpl.java new file mode 100644 index 00000000..3b10090d --- /dev/null +++ b/manager/src/main/java/io/github/zero88/schedulerx/manager/impl/ScheduleManagerReportImpl.java @@ -0,0 +1,21 @@ +package io.github.zero88.schedulerx.manager.impl; + +import org.jetbrains.annotations.NotNull; + +import io.github.zero88.schedulerx.Scheduler; +import io.github.zero88.schedulerx.manager.ScheduleManagerReport; +import io.github.zero88.schedulerx.trigger.Trigger; + +public class ScheduleManagerReportImpl implements ScheduleManagerReport { + + @Override + public void addSuccess(Scheduler scheduler) { + + } + + @Override + public void addError(@NotNull Exception error) { + + } + +} diff --git a/manager/src/main/java/io/github/zero88/schedulerx/manager/impl/SchedulerCreatorImpl.java b/manager/src/main/java/io/github/zero88/schedulerx/manager/impl/SchedulerCreatorImpl.java new file mode 100644 index 00000000..81371059 --- /dev/null +++ b/manager/src/main/java/io/github/zero88/schedulerx/manager/impl/SchedulerCreatorImpl.java @@ -0,0 +1,96 @@ +package io.github.zero88.schedulerx.manager.impl; + +import org.jetbrains.annotations.NotNull; + +import io.github.zero88.schedulerx.CronScheduler; +import io.github.zero88.schedulerx.EventScheduler; +import io.github.zero88.schedulerx.IntervalScheduler; +import io.github.zero88.schedulerx.Job; +import io.github.zero88.schedulerx.JobData; +import io.github.zero88.schedulerx.JobFactory; +import io.github.zero88.schedulerx.Scheduler; +import io.github.zero88.schedulerx.SchedulerBuilder; +import io.github.zero88.schedulerx.TimeoutPolicy; +import io.github.zero88.schedulerx.manager.ManagerExceptions.InitSchedulerException; +import io.github.zero88.schedulerx.manager.SchedulerCreator; +import io.github.zero88.schedulerx.trigger.CronTrigger; +import io.github.zero88.schedulerx.trigger.EventTrigger; +import io.github.zero88.schedulerx.trigger.IntervalTrigger; +import io.github.zero88.schedulerx.trigger.Trigger; +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonObject; + +public class SchedulerCreatorImpl implements SchedulerCreator { + + @Override + public Scheduler create(@NotNull Vertx vertx, @NotNull JsonObject schedulerJson) throws InitSchedulerException { + try { + return createScheduler(vertx, schedulerJson); + } catch (Exception e) { + throw new InitSchedulerException(e); + } + } + + /** + * Creates a scheduler from configuration data. + * + * @param vertx vertx instance + * @param schedulerJson the scheduler configuration + * @return a Future containing the scheduler ID + */ + protected Scheduler createScheduler(Vertx vertx, JsonObject schedulerJson) { + if (!schedulerJson.containsKey("trigger")) { + throw new IllegalArgumentException("Trigger configuration is required"); + } + + Trigger trigger = createTrigger(schedulerJson.getJsonObject("trigger")); + Job job = JobFactory.getInstance().create(schedulerJson.getString("jobClass")); + JobData jobData = createJobData(schedulerJson.getJsonObject("jobData")); + TimeoutPolicy timeoutPolicy = TimeoutPolicy.create(schedulerJson.getJsonObject("timeoutPolicy")); + //noinspection unchecked + return createBuilderFromTrigger(trigger).setVertx(vertx) + .setTrigger(trigger) + .setJob(job) + .setJobData(jobData) + .setTimeoutPolicy(timeoutPolicy) + // .setMonitor(monitor) + .build(); + } + + /** + * Creates a trigger based on type and configuration. + */ + protected Trigger createTrigger(JsonObject triggerConfig) { + String type = triggerConfig.getString("type"); + try { + return switch (type.toLowerCase()) { + case "interval" -> triggerConfig.mapTo(IntervalTrigger.class); + case "cron" -> triggerConfig.mapTo(CronTrigger.class); + case "event" -> triggerConfig.mapTo(EventTrigger.class); + default -> throw new IllegalArgumentException("Unsupported trigger type: " + type); + }; + } catch (Exception e) { + throw new IllegalArgumentException("Invalid trigger configuration: " + e.getMessage(), e); + } + } + + /** + * Creates job data from configuration. + */ + protected JobData createJobData(JsonObject jobDataConfig) { + Object externalId = jobDataConfig.getValue("external_id"); + jobDataConfig.remove("external_id"); + return JobData.create(jobDataConfig, externalId); + } + + @SuppressWarnings("rawtypes") + private SchedulerBuilder createBuilderFromTrigger(Trigger trigger) { + return switch (trigger.type().toLowerCase()) { + case "interval" -> IntervalScheduler.builder(); + case "cron" -> CronScheduler.builder(); + case "event" -> EventScheduler.builder(); + default -> throw new IllegalArgumentException("Unsupported trigger type: " + trigger.type()); + }; + } + +} diff --git a/manager/src/test/java/io/github/zero88/schedulerx/manager/ScheduleManagerIntegrationTest.java b/manager/src/test/java/io/github/zero88/schedulerx/manager/ScheduleManagerIntegrationTest.java new file mode 100644 index 00000000..dc54c33c --- /dev/null +++ b/manager/src/test/java/io/github/zero88/schedulerx/manager/ScheduleManagerIntegrationTest.java @@ -0,0 +1,268 @@ +package io.github.zero88.schedulerx.manager; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import io.github.zero88.schedulerx.SchedulingAsserter; +import io.github.zero88.schedulerx.SchedulingMonitor; +import io.vertx.core.Vertx; +import io.vertx.core.file.FileSystem; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; + +@ExtendWith(VertxExtension.class) +class ScheduleManagerIntegrationTest { + + private JsonFileScheduleManager manager; + private String testFilePath; + private FileSystem fs; + + @BeforeEach + void setUp(Vertx vertx) throws Exception { + fs = vertx.fileSystem(); + + // Create a temporary test file + Path tempFile = Files.createTempFile("scheduler-integration-test", ".json"); + testFilePath = tempFile.toString(); + } + + @AfterEach + void tearDown(VertxTestContext testContext) throws Exception { + if (manager != null) { + manager.shutdown() + .compose(v -> fs.delete(testFilePath).recover(t -> null)) + .onComplete(testContext.succeedingThenComplete()); + } else { + testContext.completeNow(); + } + } + + @Test + void test_manager_with_real_scheduler_execution(Vertx vertx, VertxTestContext testContext) + throws InterruptedException { + final AtomicInteger executionCount = new AtomicInteger(0); + final int expectedExecutions = 3; + + // Create a monitoring that counts executions using SchedulingAsserterBuilder + SchedulingMonitor monitor = SchedulingAsserter.builder() + .setTestContext(testContext) + .disableAutoCompleteTest() // We'll complete + // manually when we reach the expected count + .setEach(result -> { + int count = executionCount.incrementAndGet(); + if (count >= expectedExecutions) { + testContext.completeNow(); + } + }) + .build(); + + manager = new JsonFileScheduleManager(vertx, monitor); + + // Create configuration for a fast interval scheduler + JsonObject testData = new JsonObject().put("trigger", new JsonObject().put("type", "interval") + .put("interval", 1) // 1 second + .put("repeat", expectedExecutions)); + + fs.writeFile(testFilePath, testData.toBuffer()).compose(v -> { + manager.setup(vertx, new JsonObject().put("filePath", testFilePath)); + return manager.load(); + }).compose(v -> { + assertEquals(1, manager.getSchedulerCount()); + return manager.run(); + }).onComplete(testContext.succeeding(result -> { + // Execution counting is handled in the monitor + })); + + // Wait for the scheduler to execute the expected number of times + assertTrue(testContext.awaitCompletion(15, TimeUnit.SECONDS)); + assertTrue(executionCount.get() >= expectedExecutions); + } + + @Test + void test_manager_reload_functionality(Vertx vertx, VertxTestContext testContext) throws InterruptedException { + final AtomicInteger phase1Count = new AtomicInteger(0); + final AtomicInteger phase2Count = new AtomicInteger(0); + final AtomicInteger totalCount = new AtomicInteger(0); + + SchedulingMonitor monitor = SchedulingAsserter.builder() + .setTestContext(testContext) + .disableAutoCompleteTest() // We'll complete manually + // when phase 2 reaches the expected count + .setEach(result -> { + int total = totalCount.incrementAndGet(); + + if (total <= 2) { + phase1Count.incrementAndGet(); + } else { + phase2Count.incrementAndGet(); + if (phase2Count.get() >= 2) { + testContext.completeNow(); + } + } + }) + .build(); + + manager = new JsonFileScheduleManager(vertx, monitor); + + // Phase 1: Load initial configuration + JsonObject initialConfig = new JsonObject().put("trigger", new JsonObject().put("type", "interval") + .put("interval", 1) + .put("repeat", 2)); + + fs.writeFile(testFilePath, initialConfig.toBuffer()).compose(v -> { + manager.setup(vertx, new JsonObject().put("filePath", testFilePath)); + return manager.load(); + }).compose(v -> { + assertEquals(1, manager.getSchedulerCount()); + return manager.run(); + }).compose(v -> { + // Wait a bit for some executions, then reload + return vertx.executeBlocking(promise -> { + try { + Thread.sleep(3000); // Wait for 3 seconds + promise.complete(); + } catch (InterruptedException e) { + promise.fail(e); + } + }); + }).compose(v -> { + // Phase 2: Reload with new configuration + JsonObject newConfig = new JsonObject().put("trigger", new JsonObject().put("type", "interval") + .put("interval", 1) + .put("repeat", 3)); + + return fs.writeFile(testFilePath, newConfig.toBuffer()); + }).compose(v -> manager.reload()).onComplete(testContext.succeeding(result -> { + assertEquals(1, manager.getSchedulerCount()); + })); + + assertTrue(testContext.awaitCompletion(20, TimeUnit.SECONDS)); + assertTrue(phase1Count.get() >= 1, "Phase 1 should have at least 1 execution"); + assertTrue(phase2Count.get() >= 1, "Phase 2 should have at least 1 execution"); + } + + @Test + void test_manager_with_complex_configuration(Vertx vertx, VertxTestContext testContext) + throws InterruptedException { + manager = new JsonFileScheduleManager(vertx); + + // Create a complex configuration with multiple schedulers + JsonObject complexConfig = new JsonObject().put("schedulers", new JsonObject().put("fastScheduler", + new JsonObject().put( + "trigger", + new JsonObject().put( + "type", "interval") + .put( + "interval", + 2) + .put( + "repeat", + 2)) + .put( + "jobData", + new JsonObject().put( + "externalId", + "fast-job"))) + .put("cronScheduler", + new JsonObject().put( + "trigger", + new JsonObject().put( + "type", "cron") + .put( + "expression", + "0/5 * * ? * * *")) // Every 5 seconds + .put( + "jobData", + new JsonObject().put( + "externalId", + "cron-job")))); + + fs.writeFile(testFilePath, complexConfig.toBuffer()).compose(v -> { + manager.setup(vertx, new JsonObject().put("filePath", testFilePath)); + return manager.load(); + }).onComplete(testContext.succeeding(result -> { + assertEquals(2, manager.getSchedulerCount()); + assertNotNull(manager.getScheduler("fastScheduler")); + assertNotNull(manager.getScheduler("cronScheduler")); + testContext.completeNow(); + })); + + assertTrue(testContext.awaitCompletion(5, TimeUnit.SECONDS)); + } + + @Test + void test_manager_error_handling_with_invalid_config(Vertx vertx, VertxTestContext testContext) + throws InterruptedException { + manager = new JsonFileScheduleManager(vertx); + + // Configuration with invalid trigger data + JsonObject invalidConfig = new JsonObject().put("schedulers", new JsonArray().add( + new JsonObject().put( + "trigger", + new JsonObject().put( + "type", "interval") + .put("interval", -1))) // Invalid negative interval + .add( + new JsonObject().put("trigger", + new JsonObject().put( + "type", + "cron") + .put( + "expression", + "invalid-cron-expression")))); + + fs.writeFile(testFilePath, invalidConfig.toBuffer()).compose(v -> { + manager.setup(vertx, new JsonObject().put("filePath", testFilePath)); + return manager.load(); + }).onComplete(testContext.failing(throwable -> { + assertTrue(throwable instanceof IllegalArgumentException || + throwable.getCause() instanceof IllegalArgumentException); + assertEquals(0, manager.getSchedulerCount()); + testContext.completeNow(); + })); + + assertTrue(testContext.awaitCompletion(5, TimeUnit.SECONDS)); + } + + @Test + void test_full_lifecycle_with_file_operations(Vertx vertx, VertxTestContext testContext) + throws InterruptedException { + manager = new JsonFileScheduleManager(vertx); + + JsonObject initialData = new JsonObject().put("trigger", new JsonObject().put("type", "interval") + .put("interval", 5) + .put("repeat", 1)); + + fs.writeFile(testFilePath, initialData.toBuffer()).compose(v -> { + manager.setup(vertx, new JsonObject().put("filePath", testFilePath)); + return manager.load(); + }).compose(v -> { + assertEquals(1, manager.getSchedulerCount()); + return manager.run(); + }).compose(v -> { + // Save new data + JsonObject newData = new JsonObject().put("schedulers", new JsonArray().add( + new JsonObject().put("trigger", new JsonObject().put("type", "interval").put("interval", 3)))); + return manager.saveData(newData); + }).compose(v -> manager.reload()).compose(v -> manager.shutdown()).onComplete(testContext.succeeding(result -> { + assertEquals(0, manager.getSchedulerCount()); + testContext.completeNow(); + })); + + assertTrue(testContext.awaitCompletion(10, TimeUnit.SECONDS)); + } + +} diff --git a/manager/src/test/java/io/github/zero88/schedulerx/manager/impl/AbstractScheduleManagerTest.java b/manager/src/test/java/io/github/zero88/schedulerx/manager/impl/AbstractScheduleManagerTest.java new file mode 100644 index 00000000..ad8745da --- /dev/null +++ b/manager/src/test/java/io/github/zero88/schedulerx/manager/impl/AbstractScheduleManagerTest.java @@ -0,0 +1,281 @@ +package io.github.zero88.schedulerx.manager.impl; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.stream.Stream; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import io.github.zero88.schedulerx.SchedulingMonitor; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; + +@ExtendWith(VertxExtension.class) +class AbstractScheduleManagerTest { + + private TestScheduleManager manager; + private JsonObject testData; + + static Stream provide_valid_configs() { + return Stream.of( + Arguments.of("Simple interval trigger", new JsonObject() + .put("trigger", new JsonObject() + .put("type", "interval") + .put("interval", 5))), + + Arguments.of("Interval trigger with duration", new JsonObject() + .put("trigger", new JsonObject() + .put("type", "interval") + .put("interval", "PT10S") + .put("repeat", 5))), + + Arguments.of("Cron trigger", new JsonObject() + .put("trigger", new JsonObject() + .put("type", "cron") + .put("expression", "0 0/2 0 ? * * *"))), + + Arguments.of("Complex interval with rule", new JsonObject() + .put("trigger", new JsonObject() + .put("type", "interval") + .put("interval", "PT1H") + .put("initialDelay", "PT3S") + .put("rule", new JsonObject() + .put("until", "2023-10-20T10:10:00Z")))) + ); + } + + @BeforeEach + void setUp(Vertx vertx) { + manager = new TestScheduleManager(vertx); + testData = new JsonObject(); + } + + @AfterEach + void tearDown(VertxTestContext testContext) { + manager.shutdown() + .onComplete(testContext.succeedingThenComplete()); + } + + @Test + void test_setup_is_required_before_load(VertxTestContext testContext) { + manager.load() + .onComplete(testContext.failing(throwable -> { + assertInstanceOf(IllegalStateException.class, throwable); + assertTrue(throwable.getMessage().contains("Manager must be setup before loading")); + testContext.completeNow(); + })); + } + + @Test + void test_setup_with_config(VertxTestContext testContext) { + JsonObject config = new JsonObject().put("key", "value"); + + assertDoesNotThrow(() -> manager.setup(vertx, config)); + assertTrue(manager.isSetup); + testContext.completeNow(); + } + + @Test + void test_setup_with_null_config(VertxTestContext testContext) { + assertDoesNotThrow(() -> manager.setup(vertx, null)); + assertTrue(manager.isSetup); + testContext.completeNow(); + } + + @Test + void test_load_empty_data_should_succeed(VertxTestContext testContext) { + manager.setup(vertx, new JsonObject()); + manager.setDataToReturn(new JsonObject()); + + manager.load() + .onComplete(testContext.succeeding(result -> { + assertEquals(0, manager.getSchedulerCount()); + testContext.completeNow(); + })); + } + + @Test + void test_load_single_interval_scheduler(VertxTestContext testContext) { + JsonObject config = new JsonObject() + .put("trigger", new JsonObject() + .put("type", "interval") + .put("interval", 5) + .put("repeat", 3)); + + manager.setup(vertx, new JsonObject()); + manager.setDataToReturn(config); + + manager.load() + .onComplete(testContext.succeeding(result -> { + assertEquals(1, manager.getSchedulerCount()); + assertNotNull(manager.getScheduler("default_scheduler")); + testContext.completeNow(); + })); + } + + @Test + void test_load_multiple_schedulers_as_array(VertxTestContext testContext) { + JsonArray schedulers = new JsonArray() + .add(new JsonObject() + .put("trigger", new JsonObject() + .put("type", "interval") + .put("interval", 5))) + .add(new JsonObject() + .put("trigger", new JsonObject() + .put("type", "cron") + .put("expression", "0 0/2 0 ? * * *"))); + + JsonObject config = new JsonObject().put("schedulers", schedulers); + + manager.setup(vertx, new JsonObject()); + manager.setDataToReturn(config); + + manager.load() + .onComplete(testContext.succeeding(result -> { + assertEquals(2, manager.getSchedulerCount()); + assertNotNull(manager.getScheduler("scheduler_0")); + assertNotNull(manager.getScheduler("scheduler_1")); + testContext.completeNow(); + })); + } + + @Test + void test_run_without_schedulers(VertxTestContext testContext) { + manager.setup(vertx, new JsonObject()); + + manager.run() + .onComplete(testContext.succeeding(result -> { + assertEquals(0, manager.getSchedulerCount()); + testContext.completeNow(); + })); + } + + @Test + void test_complete_lifecycle(VertxTestContext testContext) { + JsonObject config = new JsonObject() + .put("trigger", new JsonObject() + .put("type", "interval") + .put("interval", 5) + .put("repeat", 2)); + + manager.setup(vertx, new JsonObject()); + manager.setDataToReturn(config); + + manager.load() + .compose(v -> { + assertEquals(1, manager.getSchedulerCount()); + return manager.run(); + }) + .compose(v -> manager.shutdown()) + .onComplete(testContext.succeeding(result -> { + assertEquals(0, manager.getSchedulerCount()); + testContext.completeNow(); + })); + } + + @Test + void test_reload_functionality(VertxTestContext testContext) { + // Initial config + JsonObject initialConfig = new JsonObject() + .put("trigger", new JsonObject() + .put("type", "interval") + .put("interval", 5)); + + manager.setup(vertx, new JsonObject()); + manager.setDataToReturn(initialConfig); + + manager.load() + .compose(v -> { + assertEquals(1, manager.getSchedulerCount()); + + // Change data for reload + JsonObject newConfig = new JsonObject() + .put("schedulers", new JsonArray() + .add(new JsonObject() + .put("trigger", new JsonObject() + .put("type", "cron") + .put("expression", "0 0/2 0 ? * * *"))) + .add(new JsonObject() + .put("trigger", new JsonObject() + .put("type", "interval") + .put("interval", 10)))); + + manager.setDataToReturn(newConfig); + return manager.reload(); + }) + .onComplete(testContext.succeeding(result -> { + assertEquals(2, manager.getSchedulerCount()); + testContext.completeNow(); + })); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("provide_valid_configs") + void test_load_various_valid_configurations(String description, JsonObject config, VertxTestContext testContext) { + manager.setup(vertx, new JsonObject()); + manager.setDataToReturn(config); + + manager.load() + .onComplete(testContext.succeeding(result -> { + assertTrue(manager.getSchedulerCount() > 0, "Should have loaded at least one scheduler"); + testContext.completeNow(); + })); + } + + @Test + void test_invalid_trigger_type_should_fail(VertxTestContext testContext) { + JsonObject config = new JsonObject() + .put("trigger", new JsonObject() + .put("type", "invalid_type")); + + manager.setup(vertx, new JsonObject()); + manager.setDataToReturn(config); + + manager.load() + .onComplete(testContext.failing(throwable -> { + assertInstanceOf(IllegalArgumentException.class, throwable); + assertTrue(throwable.getMessage().contains("Unsupported trigger type")); + testContext.completeNow(); + })); + } + + + // Test implementation of AbstractManager for testing purposes + static class TestScheduleManager extends AbstractScheduleManager { + private JsonObject dataToReturn; + + public TestScheduleManager(Vertx vertx) { + super(vertx); + } + + public TestScheduleManager(Vertx vertx, SchedulingMonitor monitor) { + super(vertx, monitor); + } + + public void setDataToReturn(JsonObject data) { + this.dataToReturn = data; + } + + @Override + protected Future loadData() { + if (dataToReturn == null) { + return Future.succeededFuture(new JsonObject()); + } + return Future.succeededFuture(dataToReturn); + } + } +} diff --git a/manager/src/test/java/io/github/zero88/schedulerx/manager/impl/JobClassScheduleManagerTest.java b/manager/src/test/java/io/github/zero88/schedulerx/manager/impl/JobClassScheduleManagerTest.java new file mode 100644 index 00000000..b53a318a --- /dev/null +++ b/manager/src/test/java/io/github/zero88/schedulerx/manager/impl/JobClassScheduleManagerTest.java @@ -0,0 +1,102 @@ +package io.github.zero88.schedulerx.manager.impl; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import io.github.zero88.schedulerx.ExecutionContext; +import io.github.zero88.schedulerx.ExecutionResult; +import io.github.zero88.schedulerx.Job; +import io.github.zero88.schedulerx.JobData; +import io.github.zero88.schedulerx.SchedulingMonitor; +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonObject; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; + +@ExtendWith(VertxExtension.class) +class JobClassScheduleManagerTest { + + private TestScheduleManager manager; + private AtomicReference lastJobResult; + + @BeforeEach + void setUp(Vertx vertx) { + lastJobResult = new AtomicReference<>(); + + // Create a monitor that will capture the job result + SchedulingMonitor monitor = new SchedulingMonitor() { + @Override + public void onEach(ExecutionResult result) { + if (result.data() != null) { + lastJobResult.set(result.data().toString()); + } + } + }; + + manager = new TestScheduleManager(vertx, monitor); + } + + @AfterEach + void tearDown(VertxTestContext testContext) { + manager.shutdown().onComplete(testContext.succeedingThenComplete()); + } + + @Test + void test_create_job_from_job_class(VertxTestContext testContext) { + // Create a configuration with a jobClass specification + JsonObject config = new JsonObject() + .put("trigger", new JsonObject() + .put("type", "interval") + .put("interval", 1) // 1 second interval + .put("repeat", 1)) // Just run once + .put("job", new JsonObject() + .put("jobClass", "io.github.zero88.schedulerx.manager.impl.TestCustomJob")) + .put("jobData", new JsonObject() + .put("external_id", "test-job-1") + .put("message", "Hello from test")); + + manager.setup(vertx, new JsonObject()); + manager.setDataToReturn(config); + + // Load and run the scheduler + manager.load() + .compose(v -> manager.run()) + .onComplete(testContext.succeeding(result -> { + // Verify a scheduler was created with our jobClass + assertEquals(1, manager.getSchedulerCount()); + assertNotNull(manager.getScheduler("test-job-1")); + + // Wait a bit for the job to execute + vertx.setTimer(2000, id -> { + // Verify our custom job executed properly + String resultMsg = lastJobResult.get(); + assertNotNull(resultMsg); + assertTrue(resultMsg.contains("Hello from test")); + testContext.completeNow(); + }); + })); + } +} + +/** + * Test custom job with a default constructor + */ +class TestCustomJob implements Job { + @Override + public void execute(JobData jobData, ExecutionContext executionContext) { + // Access config values from jobData instead of constructor + String message = "Default message"; + if (jobData.get() instanceof JsonObject jobDataJson) { + message = jobDataJson.getString("message", message); + } + executionContext.complete("TestCustomJob executed with message: " + message); + } +} diff --git a/manager/src/test/java/io/github/zero88/schedulerx/manager/impl/TestScheduleManager.java b/manager/src/test/java/io/github/zero88/schedulerx/manager/impl/TestScheduleManager.java new file mode 100644 index 00000000..d5bb111e --- /dev/null +++ b/manager/src/test/java/io/github/zero88/schedulerx/manager/impl/TestScheduleManager.java @@ -0,0 +1,35 @@ +package io.github.zero88.schedulerx.manager.impl; + +import io.github.zero88.schedulerx.SchedulingMonitor; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonObject; + +/** + * A test implementation of AbstractScheduleManager that returns predefined data. + */ +class TestScheduleManager extends AbstractScheduleManager { + + private JsonObject dataToReturn; + + TestScheduleManager(Vertx vertx) { + super(vertx); + } + + TestScheduleManager(Vertx vertx, SchedulingMonitor monitor) { + super(vertx, monitor); + } + + /** + * Set the data that should be returned by loadData(). + */ + void setDataToReturn(JsonObject dataToReturn) { + this.dataToReturn = dataToReturn; + } + + @Override + protected Future loadData() { + // Return predefined data for testing + return Future.succeededFuture(dataToReturn != null ? dataToReturn : new JsonObject()); + } +} diff --git a/manager/src/test/java/io/github/zero88/schedulerx/manager/impl/TriggerDeserializationTest.java b/manager/src/test/java/io/github/zero88/schedulerx/manager/impl/TriggerDeserializationTest.java new file mode 100644 index 00000000..095e5087 --- /dev/null +++ b/manager/src/test/java/io/github/zero88/schedulerx/manager/impl/TriggerDeserializationTest.java @@ -0,0 +1,78 @@ +package io.github.zero88.schedulerx.manager.impl; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import io.github.zero88.schedulerx.trigger.CronTrigger; +import io.github.zero88.schedulerx.trigger.IntervalTrigger; +import io.github.zero88.schedulerx.trigger.Trigger; +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonObject; +import io.vertx.junit5.VertxExtension; + +@ExtendWith(VertxExtension.class) +class TriggerDeserializationTest { + + @Test + void test_create_interval_trigger() { + TestScheduleManager manager = new TestScheduleManager(Vertx.vertx()); + + JsonObject triggerConfig = new JsonObject().put("type", "interval").put("interval", 60).put("repeat", 5); + + Trigger trigger = manager.createTrigger("interval", triggerConfig); + + assertNotNull(trigger); + assertInstanceOf(IntervalTrigger.class, trigger); + IntervalTrigger intervalTrigger = (IntervalTrigger) trigger; + assertEquals(5, intervalTrigger.getRepeat()); + assertEquals(60, intervalTrigger.getInterval()); + } + + @Test + void test_create_cron_trigger() { + TestScheduleManager manager = new TestScheduleManager(Vertx.vertx()); + + JsonObject triggerConfig = new JsonObject().put("type", "cron").put("expression", "0 0/5 * * * ? *"); + + Trigger trigger = manager.createTrigger("cron", triggerConfig); + + assertNotNull(trigger); + assertInstanceOf(CronTrigger.class, trigger); + CronTrigger cronTrigger = (CronTrigger) trigger; + assertEquals("0 0/5 * * * ? *", cronTrigger.getExpression()); + } + + @Test + void test_invalid_trigger_config() { + TestScheduleManager manager = new TestScheduleManager(Vertx.vertx()); + + JsonObject invalidConfig = new JsonObject().put("type", "interval") + .put("interval", -10); // Invalid negative interval + + Exception exception = assertThrows(IllegalArgumentException.class, () -> { + manager.createTrigger("interval", invalidConfig); + }); + + assertTrue(exception.getMessage().contains("Invalid trigger configuration")); + } + + @Test + void test_unsupported_trigger_type() { + TestScheduleManager manager = new TestScheduleManager(Vertx.vertx()); + + JsonObject config = new JsonObject().put("type", "unsupported"); + + Exception exception = assertThrows(IllegalArgumentException.class, () -> { + manager.createTrigger("unsupported", config); + }); + + assertTrue(exception.getMessage().contains("Unsupported trigger type")); + } + +} diff --git a/manager/src/test/resources/data1.json b/manager/src/test/resources/data1.json new file mode 100644 index 00000000..d7186459 --- /dev/null +++ b/manager/src/test/resources/data1.json @@ -0,0 +1,25 @@ +{ + "scheduler-1": { + "job": { + "classType": "http" + }, + "jobData": { + "url": "https://example.com/api", + "method": "GET", + "headers": { + } + }, + "trigger": { + "type": "interval", + "rule": { + "timeframes": [], + "beginTime": null, + "until": "2023-10-20T10:10:00Z", + "leeway": "PT0S" + }, + "repeat": -1, + "initialDelay": "PT3S", + "interval": "PT1H" + } + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 696e2dcd..51d172b6 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -15,7 +15,7 @@ pluginManagement { val projectName = "schedulerx" val profile: String by settings val pools = mutableMapOf( - projectName to arrayOf(":schedulerx", ":event-trigger-jsonschema"), + projectName to arrayOf(":schedulerx", ":event-trigger-jsonschema", ":manager"), "sample" to emptyArray(), "integtest" to emptyArray() )