Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions apps/backend/src/queue/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,46 @@ import { retrieveRelevantKnowledge } from "../services/embedding";
import { logger } from "../utils/logger";
import { Server } from "socket.io";

// 正在处理中的任务集合 - 用于内存级别的幂等性检查
const processingTasks = new Set<string>();

/**
* 检查任务是否已经在处理中
*/
function isTaskProcessing(taskId: string): boolean {
return processingTasks.has(taskId);
}

/**
* 标记任务为处理中
*/
function markTaskProcessing(taskId: string): void {
processingTasks.add(taskId);
}

/**
* 标记任务为处理完成
*/
function markTaskCompleted(taskId: string): void {
processingTasks.delete(taskId);
}

/**
* 检查数据库中任务状态是否允许执行
*/
async function canExecuteTask(taskId: string): Promise<boolean> {
const task = await db.query.tasks.findFirst({
where: eq(schema.tasks.id, taskId),
});

if (!task) {
return false;
}

// 只允许 queued 或 failed 状态的任务执行
return task.status === "queued" || task.status === "failed";
}

const connection = new IORedis({
host: process.env.REDIS_HOST || "localhost",
port: parseInt(process.env.REDIS_PORT || "6379"),
Expand Down Expand Up @@ -80,6 +120,26 @@ export function startWorker(io: Server) {
async (job: Job<TaskPayload>) => {
const { taskId, novelId, chapterId, type, input } = job.data;

// 幂等性检查 1: 内存级别 - 检查是否已经在处理中
if (isTaskProcessing(taskId)) {
logger.warn(
`Task ${taskId} is already being processed, skipping duplicate execution`,
);
return { skipped: true, reason: "already_processing" };
}

// 幂等性检查 2: 数据库级别 - 检查任务状态
const canExecute = await canExecuteTask(taskId);
if (!canExecute) {
logger.warn(
`Task ${taskId} cannot be executed (status not queued/failed), skipping`,
);
return { skipped: true, reason: "invalid_status" };
}

// 标记任务为处理中
markTaskProcessing(taskId);

try {
// Update task status to running
await db
Expand Down Expand Up @@ -428,6 +488,9 @@ export function startWorker(io: Server) {
});

throw error;
} finally {
// 无论成功或失败,都标记任务为处理完成
markTaskCompleted(taskId);
}
},
{
Expand Down
185 changes: 172 additions & 13 deletions apps/backend/src/routes/task.routes.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,107 @@
import { Router } from "express";
import { db, schema } from "../database";
import { eq } from "drizzle-orm";
import { eq, and, sql } from "drizzle-orm";
import { AuthRequest } from "../middleware/auth";
import { novelQueue } from "../queue/worker";

const router: Router = Router();

// 任务类型分组定义 - 用于幂等性检查
const TASK_TYPE_GROUPS = {
// 小说级别的任务 - 同一小说同一时间只能有一个进行中的任务
novelLevel: ["outline", "title", "volume_planning"] as const,
// 分卷级别的任务
volumeLevel: ["chapter_planning"] as const,
// 章节级别的任务 - 同一章节同一时间只能有一个进行中的任务
chapterLevel: [
"chapter_outline",
"chapter_detail",
"content",
"consistency_check",
] as const,
};

/**
* 检查是否存在进行中的相同类型任务
* @param novelId 小说ID
* @param type 任务类型
* @param chapterId 章节ID(可选)
* @returns 存在则返回任务ID,否则返回null
*/
async function checkExistingTask(
novelId: string,
type: string,
chapterId?: string,
): Promise<string | null> {
// 使用 SQL 查询来避免类型问题
const conditions = [
eq(schema.tasks.novelId, novelId),
eq(schema.tasks.type, type as any),
sql`${schema.tasks.status} IN ('queued', 'running')`,
];

if (chapterId) {
conditions.push(eq(schema.tasks.chapterId, chapterId));
}

const existingTask = await db.query.tasks.findFirst({
where: and(...conditions),
orderBy: (tasks, { desc }) => [desc(tasks.createdAt)],
});

return existingTask?.id || null;
}

/**
* 检查小说级别任务是否存在冲突
* 某些任务类型(如大纲生成)应该互斥
*/
async function checkNovelLevelConflict(
novelId: string,
type: string,
): Promise<string | null> {
// 检查是否属于 novelLevel 组
if (TASK_TYPE_GROUPS.novelLevel.includes(type as any)) {
// 检查该小说是否有任何 novelLevel 任务在进行中
const existingTask = await db.query.tasks.findFirst({
where: and(
eq(schema.tasks.novelId, novelId),
sql`${schema.tasks.type} IN ('outline', 'title', 'volume_planning')`,
sql`${schema.tasks.status} IN ('queued', 'running')`,
),
orderBy: (tasks, { desc }) => [desc(tasks.createdAt)],
});
return existingTask?.id || null;
}
return null;
}

// Generate outline
router.post(
"/:novelId/generate/outline",
async (req: AuthRequest, res, next) => {
try {
const novelId = req.params.novelId;

// 幂等性检查:检查是否已有进行中的大纲生成任务
const existingTaskId = await checkNovelLevelConflict(novelId, "outline");
if (existingTaskId) {
const existingTask = await db.query.tasks.findFirst({
where: eq(schema.tasks.id, existingTaskId),
});
res.status(409).json({
error: "已有进行中的生成任务",
message: "该小说已有进行中的大纲生成任务,请等待完成后再试",
task: existingTask,
});
return;
}

// Create task
const [task] = await db
.insert(schema.tasks)
.values({
novelId: req.params.novelId,
novelId: novelId,
type: "outline",
status: "queued",
})
Expand All @@ -26,7 +112,7 @@ router.post(
"generate-outline",
{
taskId: task.id,
novelId: req.params.novelId,
novelId: novelId,
type: "outline",
},
{
Expand All @@ -50,12 +136,27 @@ router.post(
"/:novelId/generate/titles",
async (req: AuthRequest, res, next) => {
try {
const novelId = req.params.novelId;
const { outline } = req.body;

// 幂等性检查
const existingTaskId = await checkNovelLevelConflict(novelId, "title");
if (existingTaskId) {
const existingTask = await db.query.tasks.findFirst({
where: eq(schema.tasks.id, existingTaskId),
});
res.status(409).json({
error: "已有进行中的生成任务",
message: "该小说已有进行中的标题生成任务,请等待完成后再试",
task: existingTask,
});
return;
}

const [task] = await db
.insert(schema.tasks)
.values({
novelId: req.params.novelId,
novelId: novelId,
type: "title",
status: "queued",
})
Expand All @@ -65,7 +166,7 @@ router.post(
"generate-titles",
{
taskId: task.id,
novelId: req.params.novelId,
novelId: novelId,
type: "title",
input: { outline },
},
Expand All @@ -90,12 +191,30 @@ router.post(
"/:novelId/generate/volumes",
async (req: AuthRequest, res, next) => {
try {
const novelId = req.params.novelId;
const { outline } = req.body;

// 幂等性检查
const existingTaskId = await checkNovelLevelConflict(
novelId,
"volume_planning",
);
if (existingTaskId) {
const existingTask = await db.query.tasks.findFirst({
where: eq(schema.tasks.id, existingTaskId),
});
res.status(409).json({
error: "已有进行中的生成任务",
message: "该小说已有进行中的分卷规划任务,请等待完成后再试",
task: existingTask,
});
return;
}

const [task] = await db
.insert(schema.tasks)
.values({
novelId: req.params.novelId,
novelId: novelId,
type: "volume_planning",
status: "queued",
})
Expand All @@ -105,7 +224,7 @@ router.post(
"generate-volumes",
{
taskId: task.id,
novelId: req.params.novelId,
novelId: novelId,
type: "volume_planning",
input: {
outline,
Expand Down Expand Up @@ -133,13 +252,31 @@ router.post(
"/:novelId/generate/chapters",
async (req: AuthRequest, res, next) => {
try {
const novelId = req.params.novelId;
const { outline, volumeId, additionalRequirements, targetCount } =
req.body;

// 幂等性检查:检查是否已有进行中的章节规划任务
const existingTaskId = await checkExistingTask(
novelId,
"chapter_planning",
);
if (existingTaskId) {
const existingTask = await db.query.tasks.findFirst({
where: eq(schema.tasks.id, existingTaskId),
});
res.status(409).json({
error: "已有进行中的生成任务",
message: "该小说已有进行中的章节规划任务,请等待完成后再试",
task: existingTask,
});
return;
}

const [task] = await db
.insert(schema.tasks)
.values({
novelId: req.params.novelId,
novelId: novelId,
type: "chapter_planning",
status: "queued",
})
Expand All @@ -149,7 +286,7 @@ router.post(
"generate-chapters",
{
taskId: task.id,
novelId: req.params.novelId,
novelId: novelId,
type: "chapter_planning",
input: { outline, volumeId, additionalRequirements, targetCount },
},
Expand All @@ -174,11 +311,32 @@ router.post(
"/:novelId/chapters/:chapterId/generate",
async (req: AuthRequest, res, next) => {
try {
const novelId = req.params.novelId;
const chapterId = req.params.chapterId;

// 幂等性检查:检查该章节是否已有进行中的内容生成任务
const existingTaskId = await checkExistingTask(
novelId,
"content",
chapterId,
);
if (existingTaskId) {
const existingTask = await db.query.tasks.findFirst({
where: eq(schema.tasks.id, existingTaskId),
});
res.status(409).json({
error: "已有进行中的生成任务",
message: "该章节已有进行中的内容生成任务,请等待完成后再试",
task: existingTask,
});
return;
}

const [task] = await db
.insert(schema.tasks)
.values({
novelId: req.params.novelId,
chapterId: req.params.chapterId,
novelId: novelId,
chapterId: chapterId,
type: "content",
status: "queued",
})
Expand All @@ -188,9 +346,10 @@ router.post(
"generate-content",
{
taskId: task.id,
novelId: req.params.novelId,
chapterId: req.params.chapterId,
novelId: novelId,
chapterId: chapterId,
type: "content",
input: req.body,
},
{
attempts: 3,
Expand Down
Loading