diff --git a/.github/actions/setup/action.yml b/.github/actions/setup/action.yml index a0b1b81..89f70de 100644 --- a/.github/actions/setup/action.yml +++ b/.github/actions/setup/action.yml @@ -16,6 +16,7 @@ runs: node-version: ${{ inputs.node-version }} - name: Restore build outputs + id: cache uses: actions/cache@v4 with: path: | @@ -25,7 +26,15 @@ runs: **/dist key: ${{ runner.os }}-build-${{ hashFiles('**/yarn.lock') }}-${{ github.sha }} - - name: Setup Latest Yarn - uses: threeal/setup-yarn-action@v2.0.0 - with: - version: berry + - name: Enable Corepack + shell: bash + run: corepack enable + + - name: Install Yarn Berry + shell: bash + run: yarn set version berry + + - name: Install Dependencies + if: steps.cache.outputs.cache-hit != 'true' + shell: bash + run: yarn install --frozen-lockfile diff --git a/packages/engine/src/dependency-registry.ts b/packages/engine/src/dependency-registry.ts new file mode 100644 index 0000000..002c253 --- /dev/null +++ b/packages/engine/src/dependency-registry.ts @@ -0,0 +1,67 @@ +import { Backend } from "@sidequest/backend"; +import { NonNullableEngineConfig } from "./engine"; + +/** + * Enumeration of available dependency tokens for the dependency registry. + * Used as keys to register and retrieve dependencies throughout the engine. + */ +export enum Dependency { + /** Engine configuration */ + Config = "config", + /** Backend instance */ + Backend = "backend", +} + +/** + * Type mapping interface that associates each dependency token with its corresponding type. + * This ensures type safety when registering and retrieving dependencies. + */ +interface DependencyRegistryTypes { + [Dependency.Config]: NonNullableEngineConfig; + [Dependency.Backend]: Backend; +} + +/** + * A type-safe dependency injection container for managing core engine dependencies. + * Provides methods to register, retrieve, and clear dependencies used throughout the engine lifecycle. + */ +class DependencyRegistry { + /** + * Internal storage for registered dependencies. + */ + private registry = new Map(); + + /** + * Retrieves a registered dependency by its token. + * @param token - The dependency token to look up + * @returns The registered dependency instance, or undefined if not found + */ + get(token: T): DependencyRegistryTypes[T] | undefined { + return this.registry.get(token) as DependencyRegistryTypes[T] | undefined; + } + + /** + * Registers a dependency instance with the specified token. + * @param token - The dependency token to register under + * @param instance - The dependency instance to register + * @returns The registered instance + */ + register(token: T, instance: DependencyRegistryTypes[T]) { + this.registry.set(token, instance); + return instance; + } + + /** + * Clears all registered dependencies from the registry. + * Useful for cleanup and testing scenarios. + */ + clear() { + this.registry.clear(); + } +} + +/** + * Singleton instance of the dependency registry. + * This is the main container used throughout the engine to manage dependencies. + */ +export const dependencyRegistry = new DependencyRegistry(); diff --git a/packages/engine/src/engine.ts b/packages/engine/src/engine.ts index 6338e09..bebfd12 100644 --- a/packages/engine/src/engine.ts +++ b/packages/engine/src/engine.ts @@ -1,4 +1,4 @@ -import { Backend, BackendConfig, LazyBackend, MISC_FALLBACK, NewQueueData, QUEUE_FALLBACK } from "@sidequest/backend"; +import { BackendConfig, LazyBackend, MISC_FALLBACK, NewQueueData, QUEUE_FALLBACK } from "@sidequest/backend"; import { configureLogger, JobClassType, logger, LoggerOptions } from "@sidequest/core"; import { ChildProcess, fork } from "child_process"; import { existsSync } from "fs"; @@ -6,6 +6,7 @@ import { cpus } from "os"; import { fileURLToPath } from "url"; import { inspect } from "util"; import { DEFAULT_WORKER_PATH } from "./constants"; +import { Dependency, dependencyRegistry } from "./dependency-registry"; import { JOB_BUILDER_FALLBACK } from "./job/constants"; import { ScheduledJobRegistry } from "./job/cron-registry"; import { JobBuilder, JobBuilderDefaults } from "./job/job-builder"; @@ -116,19 +117,6 @@ export type NonNullableEngineConfig = { * The main engine for managing job queues and workers in Sidequest. */ export class Engine { - /** - * Backend instance used by the engine. - * This is initialized when the engine is configured or started. - */ - private backend?: Backend; - - /** - * Current configuration of the engine. - * This is set when the engine is configured or started. - * It contains all the necessary settings for the engine to operate, such as backend, queues, logger options, and job defaults. - */ - private config?: NonNullableEngineConfig; - /** * Main worker process that runs the Sidequest engine. * This is created when the engine is started and handles job processing. @@ -147,11 +135,11 @@ export class Engine { * @returns The resolved configuration. */ async configure(config?: EngineConfig): Promise { - if (this.config) { + if (this.getConfig()) { logger("Engine").debug("Sidequest already configured"); - return this.config; + return this.getConfig()!; } - this.config = { + const nonNullConfig: NonNullableEngineConfig = { queues: config?.queues ?? [], backend: { driver: config?.backend?.driver ?? "@sidequest/sqlite-backend", @@ -190,21 +178,22 @@ export class Engine { jobsFilePath: config?.jobsFilePath?.trim() ?? "", jobPollingInterval: config?.jobPollingInterval ?? 100, }; + dependencyRegistry.register(Dependency.Config, nonNullConfig); this.validateConfig(); - logger("Engine").debug(`Configuring Sidequest engine: ${inspect(this.config)}`); + logger("Engine").debug(`Configuring Sidequest engine: ${inspect(nonNullConfig)}`); - if (this.config.logger) { - configureLogger(this.config.logger); + if (nonNullConfig.logger) { + configureLogger(nonNullConfig.logger); } - this.backend = new LazyBackend(this.config.backend); - if (!this.config.skipMigration) { - await this.backend.migrate(); + const backend = dependencyRegistry.register(Dependency.Backend, new LazyBackend(nonNullConfig.backend)); + if (!nonNullConfig.skipMigration) { + await backend.migrate(); } - return this.config; + return nonNullConfig; } /** @@ -224,18 +213,19 @@ export class Engine { * - Logs the resolved jobs file path when using manual job resolution */ validateConfig() { - if (this.config!.maxConcurrentJobs !== undefined && this.config!.maxConcurrentJobs < 1) { + const config = this.getConfig(); + if (config!.maxConcurrentJobs !== undefined && config!.maxConcurrentJobs < 1) { throw new Error(`Invalid "maxConcurrentJobs" value: must be at least 1.`); } - if (this.config!.manualJobResolution) { - if (this.config!.jobsFilePath) { - const scriptUrl = resolveScriptPath(this.config!.jobsFilePath); + if (config!.manualJobResolution) { + if (config!.jobsFilePath) { + const scriptUrl = resolveScriptPath(config!.jobsFilePath); if (!existsSync(fileURLToPath(scriptUrl))) { throw new Error(`The specified jobsFilePath does not exist. Resolved to: ${scriptUrl}`); } - logger("Engine").info(`Using manual jobs file at: ${this.config!.jobsFilePath}`); - this.config!.jobsFilePath = scriptUrl; + logger("Engine").info(`Using manual jobs file at: ${config!.jobsFilePath}`); + config!.jobsFilePath = scriptUrl; } else { // This should throw an error if not found findSidequestJobsScriptInParentDirs(); @@ -253,13 +243,13 @@ export class Engine { return; } - await this.configure(config); + const nonNullConfig = await this.configure(config); - logger("Engine").info(`Starting Sidequest using backend ${this.config!.backend.driver}`); + logger("Engine").info(`Starting Sidequest using backend ${nonNullConfig.backend.driver}`); - if (this.config!.queues) { - for (const queue of this.config!.queues) { - await grantQueueConfig(this.backend!, queue, this.config!.queueDefaults, true); + if (nonNullConfig.queues) { + for (const queue of nonNullConfig.queues) { + await grantQueueConfig(dependencyRegistry.get(Dependency.Backend)!, queue, nonNullConfig.queueDefaults, true); } } @@ -276,7 +266,7 @@ export class Engine { this.mainWorker.on("message", (msg) => { if (msg === "ready") { logger("Engine").debug("Main worker is ready"); - this.mainWorker?.send({ type: "start", sidequestConfig: this.config! }); + this.mainWorker?.send({ type: "start", sidequestConfig: nonNullConfig }); clearTimeout(timeout); resolve(); } @@ -291,7 +281,7 @@ export class Engine { }; runWorker(); - gracefulShutdown(this.close.bind(this), "Engine", this.config!.gracefulShutdown); + gracefulShutdown(this.close.bind(this), "Engine", nonNullConfig.gracefulShutdown); } }); } @@ -301,7 +291,7 @@ export class Engine { * @returns The current configuration, if set. */ getConfig() { - return this.config; + return dependencyRegistry.get(Dependency.Config); } /** @@ -309,7 +299,7 @@ export class Engine { * @returns The backend instance, if set. */ getBackend() { - return this.backend; + return dependencyRegistry.get(Dependency.Backend); } /** @@ -331,18 +321,18 @@ export class Engine { await promise; } try { - await this.backend?.close(); + await dependencyRegistry.get(Dependency.Backend)?.close(); } catch (error) { logger("Engine").error("Error closing backend:", error); } - this.config = undefined; - this.backend = undefined; this.mainWorker = undefined; // Reset the shutting down flag after closing // This allows the engine to be reconfigured or restarted later clearGracefulShutdown(); logger("Engine").debug("Sidequest engine closed."); this.shuttingDown = false; + // Clear the dependency registry to allow fresh configuration later + dependencyRegistry.clear(); } } @@ -352,7 +342,9 @@ export class Engine { * @returns A new JobBuilder instance for the job class. */ build(JobClass: T) { - if (!this.config || !this.backend) { + const backend = this.getBackend(); + const config = this.getConfig(); + if (!config || !backend) { throw new Error("Engine not configured. Call engine.configure() or engine.start() first."); } if (this.shuttingDown) { @@ -360,15 +352,15 @@ export class Engine { } logger("Engine").debug(`Building job for class: ${JobClass.name}`); return new JobBuilder( - this.backend, + backend, JobClass, { - ...this.config.jobDefaults, + ...config.jobDefaults, // We need to do this check again because available at is a getter. It needs to be set at job creation time. // If not set, it will use the fallback value which is outdated from config. - availableAt: this.config.jobDefaults.availableAt ?? JOB_BUILDER_FALLBACK.availableAt!, + availableAt: config.jobDefaults.availableAt ?? JOB_BUILDER_FALLBACK.availableAt!, }, - this.config.manualJobResolution, + config.manualJobResolution, ); } } diff --git a/sidequest.jobs.js b/sidequest.jobs.js new file mode 100644 index 0000000..fb064e2 --- /dev/null +++ b/sidequest.jobs.js @@ -0,0 +1,9 @@ +import { + EnqueueFromWithinJob, + FailingJob, + RetryJob, + SuccessJob, + TimeoutJob, +} from "./tests/integration/jobs/test-jobs.js"; + +export { EnqueueFromWithinJob, FailingJob, RetryJob, SuccessJob, TimeoutJob }; diff --git a/tests/fixture.ts b/tests/fixture.ts index 550d8ab..16a2d86 100644 --- a/tests/fixture.ts +++ b/tests/fixture.ts @@ -1,6 +1,7 @@ import { test as baseTest } from "vitest"; import { Backend } from "../packages/backends/backend/src"; import { Engine, NonNullableEngineConfig } from "../packages/engine/src"; +import { dependencyRegistry } from "../packages/engine/src/dependency-registry"; export interface SidequestTestFixture { engine: Engine; @@ -29,6 +30,7 @@ export const sidequestTest = baseTest.extend({ } await engine.close(); + dependencyRegistry.clear(); }, backend: async ({ engine }, use) => { diff --git a/tests/integration/shared-test-suite.js b/tests/integration/shared-test-suite.js index a89f7cd..80efc09 100644 --- a/tests/integration/shared-test-suite.js +++ b/tests/integration/shared-test-suite.js @@ -556,7 +556,7 @@ export function createIntegrationTestSuite(Sidequest, jobs, moduleType = "ESM") // Wait for the two scheduled executions await vi.waitUntil(async () => { const currentJobs = await Sidequest.job.list(); - return currentJobs.length === 2 && currentJobs.every((job) => job.state === "completed"); + return currentJobs.length >= 2 && currentJobs.every((job) => job.state === "completed"); }, 5000); }); @@ -609,7 +609,7 @@ export function createIntegrationTestSuite(Sidequest, jobs, moduleType = "ESM") let currentJobs; await vi.waitUntil(async () => { currentJobs = await Sidequest.job.list(); - return currentJobs.length === 3 && currentJobs.every((job) => job.state === "completed"); + return currentJobs.length >= 3 && currentJobs.every((job) => job.state === "completed"); }, 5000); const job1Executions = currentJobs.filter((job) => job.args[0] === "job-1"); @@ -634,7 +634,7 @@ export function createIntegrationTestSuite(Sidequest, jobs, moduleType = "ESM") let currentJobs; await vi.waitUntil(async () => { currentJobs = await Sidequest.job.list(); - return currentJobs.length === 2 && currentJobs.every((job) => job.state === "completed"); + return currentJobs.length >= 2 && currentJobs.every((job) => job.state === "completed"); }, 5000); expect(currentJobs.length).toBe(2); diff --git a/tests/integration/sidequest.jobs.js b/tests/integration/sidequest.jobs.js new file mode 100644 index 0000000..68cd8c6 --- /dev/null +++ b/tests/integration/sidequest.jobs.js @@ -0,0 +1,3 @@ +import { EnqueueFromWithinJob, FailingJob, RetryJob, SuccessJob, TimeoutJob } from "./jobs/test-jobs.js"; + +export { EnqueueFromWithinJob, FailingJob, RetryJob, SuccessJob, TimeoutJob };