@@ -4,6 +4,7 @@ import type { RedisOptions as BullRedisOptions } from 'bullmq'
44import type { Plugin } from 'rollup'
55import { relative } from 'node:path'
66import scanFolder from './utils/scan-folder'
7+ import { generateWorkersEntryContent } from './utils/generate-workers-entry-content'
78
89// Module options TypeScript interface definition
910type ModuleRedisOptions = BullRedisOptions & { url ?: string }
@@ -61,98 +62,6 @@ export default defineNuxtModule<ModuleOptions>({
6162
6263 addServerPlugin ( tpl . dst )
6364
64- function generateWorkersEntryContent ( workerFiles : string [ ] ) : string {
65- const toImportArray = workerFiles . map ( id => `() => import(${ JSON . stringify ( id ) } )` ) . join ( ',\n ' )
66- return `
67- import { fileURLToPath } from 'node:url'
68- import { resolve as resolvePath } from 'node:path'
69- import { consola } from 'consola'
70- import { $workers } from '#processor-utils'
71-
72- // Initialize connection as early as possible so any imports that register
73- // workers/queues have a valid connection available.
74- const api = $workers()
75- api.setConnection(${ redisInline } )
76-
77- export async function createWorkersApp() {
78- // Avoid EPIPE when stdout/stderr are closed by terminal (e.g., Ctrl+C piping)
79- const handleStreamError = (err) => {
80- try {
81- const code = (typeof err === 'object' && err && 'code' in err) ? err.code : null
82- if (code === 'EPIPE') return
83- } catch (e) { console.warn?.('nuxt-processor: stream error inspection failed', e) }
84- throw err
85- }
86- try { process.stdout?.on?.('error', handleStreamError) } catch (err) { console.warn('nuxt-processor: failed to attach stdout error handler', err) }
87- try { process.stderr?.on?.('error', handleStreamError) } catch (err) { console.warn('nuxt-processor: failed to attach stderr error handler', err) }
88- const modules = [
89- ${ toImportArray }
90- ]
91- for (const loader of modules) {
92- await loader()
93- }
94- const logger = consola.create({}).withTag('nuxt-processor')
95- try {
96- const workerNames = Array.isArray(api.workers) ? api.workers.map(w => w && w.name).filter(Boolean) : []
97- logger.info('starting workers:\\n' + workerNames.map(n => ' - ' + n).join('\\n'))
98- for (const w of api.workers) {
99- w.on('error', (err) => logger.error('worker error', err))
100- }
101- // Explicitly start workers since autorun is disabled
102- for (const w of api.workers) {
103- try {
104- // run() returns a promise that resolves when the worker stops; do not await to avoid blocking
105- // eslint-disable-next-line promise/catch-or-return
106- w.run().catch((err) => logger.error('worker run error', err))
107- }
108- catch (err) {
109- logger.error('failed to start worker', err)
110- }
111- }
112- logger.success('workers started')
113- } catch (err) {
114- logger.error('failed to initialize workers', err)
115- }
116- return { stop: api.stopAll, workers: api.workers }
117- }
118-
119- const isMain = (() => {
120- try {
121- if (typeof process === 'undefined' || !process.argv || !process.argv[1]) return false
122- const argvPath = resolvePath(process.cwd?.() || '.', process.argv[1])
123- const filePath = fileURLToPath(import.meta.url)
124- return filePath === argvPath
125- } catch {
126- return false
127- }
128- })()
129- if (isMain) {
130- const logger = consola.create({}).withTag('nuxt-processor')
131- const appPromise = createWorkersApp().catch((err) => {
132- logger.error('failed to start workers', err)
133- process.exit(1)
134- })
135- const shutdown = async () => {
136- try { logger.info('closing workers...') } catch (err) { console.warn('nuxt-processor: failed to log shutdown start', err) }
137- try {
138- const app = await appPromise
139- try {
140- const names = (app?.workers || []).map(w => w && w.name).filter(Boolean)
141- logger.info('closing workers:\\n' + names.map(n => ' - ' + n).join('\\n'))
142- } catch (eL) { console.warn('nuxt-processor: failed to log workers list on shutdown', eL) }
143- await app.stop()
144- try { logger.success('workers closed') } catch (err2) { console.warn('nuxt-processor: failed to log shutdown complete', err2) }
145- }
146- finally { process.exit(0) }
147- }
148- ;['SIGINT','SIGTERM','SIGQUIT'].forEach(sig => process.on(sig, shutdown))
149- process.on('beforeExit', shutdown)
150- }
151-
152- export default { createWorkersApp }
153- `
154- }
155-
15665 // Alias inside the app to the identity API so user imports resolve at build-time
15766 nuxt . options . alias = nuxt . options . alias ?? { }
15867 nuxt . options . alias [ 'nuxt-processor' ] = resolve ( './runtime/server/handlers' )
@@ -200,7 +109,7 @@ declare module '#bullmq' {
200109 virtualCode = ''
201110 return
202111 }
203- virtualCode = generateWorkersEntryContent ( workerFiles )
112+ virtualCode = generateWorkersEntryContent ( workerFiles , redisInline )
204113 for ( const id of workerFiles ) {
205114 this . addWatchFile ( id )
206115 }
0 commit comments