From ee78b5834bec4cc9f199985b722dfe905c7d6705 Mon Sep 17 00:00:00 2001 From: usingnameing Date: Mon, 21 Jul 2025 16:02:01 +0900 Subject: [PATCH 1/5] SCRUM-247 : redis i/o optimized --- src/app.gateway.ts | 1 - src/canvas/broadcast.service.ts | 147 +++++++++++++++++++++---- src/canvas/canvas.gateway.ts | 69 ++++++------ src/canvas/canvas.service.ts | 12 +- src/queues/bullmq.queue.ts | 14 ++- src/queues/bullmq.worker.ts | 23 ++-- src/queues/pixelUpdate.worker.ts | 183 +++++++++++++++++++++++++++++++ 7 files changed, 380 insertions(+), 69 deletions(-) create mode 100644 src/queues/pixelUpdate.worker.ts diff --git a/src/app.gateway.ts b/src/app.gateway.ts index a37ebb8..da0d547 100644 --- a/src/app.gateway.ts +++ b/src/app.gateway.ts @@ -84,7 +84,6 @@ export class AppGateway // 모든 이벤트에 대한 디버그 리스너 추가 server.on('connection', (socket) => { console.log(`[AppGateway] 클라이언트 연결됨: ${socket.id}`); - // 모든 이벤트 로깅 // socket.onAny((eventName, ...args) => { // console.log( diff --git a/src/canvas/broadcast.service.ts b/src/canvas/broadcast.service.ts index 4faa686..c3bd032 100644 --- a/src/canvas/broadcast.service.ts +++ b/src/canvas/broadcast.service.ts @@ -1,4 +1,85 @@ -import { Injectable } from '@nestjs/common'; +// import { Injectable } from '@nestjs/common'; +// import { Server } from 'socket.io'; +// import { waitForSocketServer } from '../socket/socket.manager'; + +// interface PixelUpdate { +// canvas_id: string; +// x: number; +// y: number; +// color: string; +// // owner: number; +// } + +// @Injectable() +// export class BroadcastService { +// private pixelBatchQueue = new Map(); +// private batchTimeouts = new Map(); +// private readonly BATCH_SIZE = 30; +// private readonly BATCH_TIMEOUT_MS = 16.67; // 60fps (1000ms / 60) +// private server: Server | null = null; + +// private async getServer(): Promise { +// if (this.server) return this.server; +// this.server = await waitForSocketServer(); // 🔐 안전 +// return this.server; +// } + +// addPixelToBatch(pixel: PixelUpdate, immediate: boolean = false) { +// const { canvas_id } = pixel; + +// // 즉시 브로드캐스트 요청이면 바로 처리 +// if (immediate) { +// const io = await this.getServer(); +// io.to(`canvas_${canvas_id}`).emit('pixel_update', { +// pixels: [pixel], +// }); +// return; +// } + +// if (!this.pixelBatchQueue.has(canvas_id)) { +// this.pixelBatchQueue.set(canvas_id, []); +// } + +// this.pixelBatchQueue.get(canvas_id)!.push(pixel); + +// // 배치 크기 도달 시 즉시 flush +// if (this.pixelBatchQueue.get(canvas_id)!.length >= this.BATCH_SIZE) { +// this.flushBatch(canvas_id); +// } else { +// this.scheduleBatchFlush(canvas_id); +// } +// } + +// private scheduleBatchFlush(canvas_id: string) { +// if (this.batchTimeouts.has(canvas_id)) return; + +// const timeout = setTimeout(() => { +// this.flushBatch(canvas_id); +// this.batchTimeouts.delete(canvas_id); +// }, this.BATCH_TIMEOUT_MS); + +// this.batchTimeouts.set(canvas_id, timeout); +// } + +// private async flushBatch(canvas_id: string) { +// const io = await this.getServer(); + +// const pixels = this.pixelBatchQueue.get(canvas_id); +// if (!pixels || pixels.length === 0) return; + +// // 배치로 전송 +// io.to(`canvas_${canvas_id}`).emit('pixel_update', { pixels: pixels }); +// this.pixelBatchQueue.set(canvas_id, []); +// } + +// private flushAllBatches() { +// for (const canvas_id of this.pixelBatchQueue.keys()) { +// this.flushBatch(canvas_id); +// } +// } +// } + +import { Injectable, OnModuleInit } from '@nestjs/common'; import { Server } from 'socket.io'; import { waitForSocketServer } from '../socket/socket.manager'; @@ -7,26 +88,47 @@ interface PixelUpdate { x: number; y: number; color: string; - // owner: number; } @Injectable() -export class BroadcastService { +export class BroadcastService implements OnModuleInit { private pixelBatchQueue = new Map(); - private batchTimeouts = new Map(); - private readonly BATCH_SIZE = 30; + private batchTimeout: NodeJS.Timeout | null = null; + private readonly BATCH_SIZE = 50; private readonly BATCH_TIMEOUT_MS = 16.67; // 60fps (1000ms / 60) private server: Server | null = null; - private async getServer(): Promise { - if (this.server) return this.server; - this.server = await waitForSocketServer(); // 🔐 안전 - return this.server; + // 모듈 초기화 시 서버 인스턴스 가져오기 + async onModuleInit() { + try { + this.server = await waitForSocketServer(); + console.log('[BroadcastService] Socket 서버 인스턴스 획득 성공'); + } catch (error) { + console.error( + '[BroadcastService] Socket 서버 인스턴스 획득 실패:', + error + ); + } } - addPixelToBatch(pixel: PixelUpdate) { + // 즉시 브로드캐스트 옵션 추가 + addPixelToBatch(pixel: PixelUpdate, immediate: boolean = false) { + if (!this.server) { + console.error('[BroadcastService] Socket 서버 인스턴스가 없습니다'); + return; + } + const { canvas_id } = pixel; + // 즉시 브로드캐스트 요청이면 바로 처리 + if (immediate) { + this.server.to(`canvas_${canvas_id}`).emit('pixel_update', { + pixels: [pixel], + }); + return; + } + + // 그 외는 기존 배치 처리 로직 if (!this.pixelBatchQueue.has(canvas_id)) { this.pixelBatchQueue.set(canvas_id, []); } @@ -37,29 +139,32 @@ export class BroadcastService { if (this.pixelBatchQueue.get(canvas_id)!.length >= this.BATCH_SIZE) { this.flushBatch(canvas_id); } else { - this.scheduleBatchFlush(canvas_id); + this.scheduleBatchFlush(); } } - private scheduleBatchFlush(canvas_id: string) { - if (this.batchTimeouts.has(canvas_id)) return; + private scheduleBatchFlush() { + if (this.batchTimeout) return; - const timeout = setTimeout(() => { - this.flushBatch(canvas_id); - this.batchTimeouts.delete(canvas_id); + this.batchTimeout = setTimeout(() => { + this.flushAllBatches(); + this.batchTimeout = null; }, this.BATCH_TIMEOUT_MS); - - this.batchTimeouts.set(canvas_id, timeout); } - private async flushBatch(canvas_id: string) { - const io = await this.getServer(); + private flushBatch(canvas_id: string) { + if (!this.server) { + console.error('[BroadcastService] Socket 서버 인스턴스가 없습니다'); + return; + } const pixels = this.pixelBatchQueue.get(canvas_id); if (!pixels || pixels.length === 0) return; // 배치로 전송 - io.to(`canvas_${canvas_id}`).emit('pixel_update', { pixels: pixels }); + this.server + .to(`canvas_${canvas_id}`) + .emit('pixel_update', { pixels: pixels }); this.pixelBatchQueue.set(canvas_id, []); } diff --git a/src/canvas/canvas.gateway.ts b/src/canvas/canvas.gateway.ts index 598f335..8cf92a2 100644 --- a/src/canvas/canvas.gateway.ts +++ b/src/canvas/canvas.gateway.ts @@ -143,19 +143,27 @@ export class CanvasGateway implements OnGatewayInit { return; } - // 워커로 픽셀 이벤트 발행 (DB 저장을 위해) - await this.redis.publish( - 'pixel:updated', - JSON.stringify({ - canvasId: Number(pixel.canvas_id), - x: pixel.x, - y: pixel.y, - color: pixel.color, - owner: userId, - }) - ); + // this.broadcastService.addPixelToBatch( + // { + // canvas_id: pixel.canvas_id, + // x: pixel.x, + // y: pixel.y, + // color: pixel.color, + // }, + // true + // ); - console.log('픽셀 그리기 성공:', pixel); + // // 워커로 픽셀 이벤트 발행 (DB 저장을 위해) + // await this.redis.publish( + // 'pixel:updated', + // JSON.stringify({ + // canvasId: Number(pixel.canvas_id), + // x: pixel.x, + // y: pixel.y, + // color: pixel.color, + // owner: userId, + // }) + // ); // 비동기 브로드캐스트 (응답 속도 향상) this.server.to(`canvas_${pixel.canvas_id}`).emit('pixel_update', { @@ -167,12 +175,6 @@ export class CanvasGateway implements OnGatewayInit { }, ], }); - // this.broadcastService.addPixelToBatch({ - // canvas_id: pixel.canvas_id, - // x: pixel.x, - // y: pixel.y, - // color: pixel.color, - // }); } catch (error) { console.error('[Gateway] 픽셀 그리기 에러:', error); client.emit('pixel_error', { message: '픽셀 그리기 실패' }); @@ -264,18 +266,28 @@ export class CanvasGateway implements OnGatewayInit { return; } - // 워커로 픽셀 이벤트 발행 (DB 저장을 위해) - await this.redis.publish( - 'pixel:updated', - JSON.stringify({ - canvasId: Number(pixel.canvas_id), + this.broadcastService.addPixelToBatch( + { + canvas_id: pixel.canvas_id, x: pixel.x, y: pixel.y, color: pixel.color, - owner: userId, - }) + }, + false ); + // 워커로 픽셀 이벤트 발행 (DB 저장을 위해) + // await this.redis.publish( + // 'pixel:updated', + // JSON.stringify({ + // canvasId: Number(pixel.canvas_id), + // x: pixel.x, + // y: pixel.y, + // color: pixel.color, + // owner: userId, + // }) + // ); + // 비동기 브로드캐스트 (응답 속도 향상) // setImmediate(() => { // this.server.to(`canvas_${pixel.canvas_id}`).emit('pixel_update', { @@ -284,13 +296,6 @@ export class CanvasGateway implements OnGatewayInit { // color: pixel.color, // }); // }); - - this.broadcastService.addPixelToBatch({ - canvas_id: pixel.canvas_id, - x: pixel.x, - y: pixel.y, - color: pixel.color, - }); } catch (error) { console.error('[Gateway] 픽셀 그리기 에러:', error); } diff --git a/src/canvas/canvas.service.ts b/src/canvas/canvas.service.ts index f339b4c..a019bc7 100644 --- a/src/canvas/canvas.service.ts +++ b/src/canvas/canvas.service.ts @@ -10,7 +10,7 @@ import { DrawPixelResponse } from '../interface/DrawPixelResponse.interface'; import { PixelInfo } from '../interface/PixelInfo.interface'; import { CanvasHistory } from './entity/canvasHistory.entity'; import { CanvasStrategyFactory } from './strategy/createFactory.factory'; -import { historyQueue } from '../queues/bullmq.queue'; +import { historyQueue, updateQueue } from '../queues/bullmq.queue'; @Injectable() export class CanvasService { @@ -52,9 +52,15 @@ export class CanvasService { const pipeline = this.redisClient.pipeline(); pipeline.hset(hashKey, field, pixelData); pipeline.expire(hashKey, 3 * 24 * 60 * 60); - pipeline.sadd(`dirty_pixels:${canvas_id}`, field); - + // pipeline.sadd(`dirty_pixels:${canvas_id}`, field); await pipeline.exec(); + await updateQueue.add('pixel-update', { + canvasId: Number(canvas_id), + x, + y, + color, + owner: userId, + }); return true; } catch (error) { console.error('픽셀 저장 실패:', error); diff --git a/src/queues/bullmq.queue.ts b/src/queues/bullmq.queue.ts index bee93dc..9827a1b 100644 --- a/src/queues/bullmq.queue.ts +++ b/src/queues/bullmq.queue.ts @@ -39,4 +39,16 @@ const alarmQueue = new Queue('canvas-alarm', { }, }); -export { pixelQueue, historyQueue, alarmQueue }; +const updateQueue = new Queue('pixel-update', { + connection: redisConnection, + defaultJobOptions: { + removeOnComplete: true, + attempts: 3, + backoff: { + type: 'exponential', + delay: 5000, + }, + }, +}); + +export { pixelQueue, historyQueue, alarmQueue, updateQueue }; diff --git a/src/queues/bullmq.worker.ts b/src/queues/bullmq.worker.ts index be29527..85ccb82 100644 --- a/src/queues/bullmq.worker.ts +++ b/src/queues/bullmq.worker.ts @@ -7,6 +7,7 @@ import Redis from 'ioredis'; import { Chat } from '../group/entity/chat.entity'; import { PixelInfo } from '../interface/PixelInfo.interface'; import './history.worker'; +import './pixelUpdate.worker'; config(); @@ -303,17 +304,17 @@ const forceFlushInterval = setInterval(async () => { // Redis 이벤트 리스너 설정 async function setupRedisEventListeners() { // 픽셀 변경 이벤트 구독 (기본 Redis 사용) - const pixelSubscriber = new Redis(redisConnection); - await pixelSubscriber.subscribe('pixel:updated'); - - pixelSubscriber.on('message', async (channel, message) => { - try { - const { canvasId, x, y, color, owner } = JSON.parse(message); - await addPixelToBatch(canvasId, x, y, color, owner); - } catch (error) { - console.error('[Worker] 픽셀 이벤트 처리 에러:', error); - } - }); + // const pixelSubscriber = new Redis(redisConnection); + // await pixelSubscriber.subscribe('pixel:updated'); + + // pixelSubscriber.on('message', async (channel, message) => { + // try { + // const { canvasId, x, y, color, owner } = JSON.parse(message); + // await addPixelToBatch(canvasId, x, y, color, owner); + // } catch (error) { + // console.error('[Worker] 픽셀 이벤트 처리 에러:', error); + // } + // }); // 채팅 메시지 이벤트 구독 (기본 Redis 사용) const chatSubscriber = new Redis(redisConnection); diff --git a/src/queues/pixelUpdate.worker.ts b/src/queues/pixelUpdate.worker.ts new file mode 100644 index 0000000..3a4915f --- /dev/null +++ b/src/queues/pixelUpdate.worker.ts @@ -0,0 +1,183 @@ +import { Job, Worker } from 'bullmq'; +import { AppDataSource } from '../data-source'; +import { redisConnection } from './bullmq.config'; + +const PIXEL_BATCH_SIZE = 200; +const CHUNK_SIZE = 100; // 대량 업데이트 시 청크 처리 +const BATCH_TIMEOUT_MS = 1000; + +interface PixelUpdate { + canvasId: number; + x: number; + y: number; + color: string; + owner: number | null; +} + +// 데이터 저장 구조 +const pixelUpdateMap = new Map(); + +// BullMQ 워커 설정 +const updateWorker = new Worker( + 'pixel-update', + async (job: Job) => { + const data: PixelUpdate = job.data as PixelUpdate; + + // 픽셀 데이터 큐에 추가 (중복 자동 제거) + const key = `${data.canvasId}:${data.x}:${data.y}`; + pixelUpdateMap.set(key, data); + + // 배치 크기 도달 시 즉시 flush + if (pixelUpdateMap.size >= PIXEL_BATCH_SIZE) { + await flushPixelToDb(); + } + }, + { + concurrency: 4, + connection: { + ...redisConnection, + commandTimeout: 30000, + connectTimeout: 30000, + }, + removeOnComplete: { count: 100 }, + removeOnFail: { count: 50 }, + } +); + +// 개선된 flushPixelToDb 함수 +async function flushPixelToDb() { + if (pixelUpdateMap.size === 0) return; + + // 현재 맵의 스냅샷 생성 (안전한 처리를 위해) + const pixelsToProcess = Array.from(pixelUpdateMap.values()); + pixelUpdateMap.clear(); + + // 캔버스별로 그룹화 + const canvasGroups = new Map(); + for (const pixel of pixelsToProcess) { + const canvasId = pixel.canvasId; + if (!canvasGroups.has(canvasId)) { + canvasGroups.set(canvasId, []); + } + canvasGroups.get(canvasId)!.push(pixel); + } + + const now = new Date(); + + // 각 캔버스별로 처리 + for (const [canvasId, pixels] of canvasGroups.entries()) { + // 청크 단위로 나누어 처리 (대량 데이터 처리 시 필요) + for (let i = 0; i < pixels.length; i += CHUNK_SIZE) { + const chunk = pixels.slice(i, i + CHUNK_SIZE); + + const queryRunner = AppDataSource.createQueryRunner(); + await queryRunner.connect(); + await queryRunner.startTransaction(); + + try { + // SQL 쿼리 파라미터 준비 + const params: any[] = []; + let paramIndex = 1; + + // 픽셀 좌표 조건 생성 + const coordConditions = chunk + .map((p) => { + const condition = `(x = $${paramIndex} AND y = $${paramIndex + 1})`; + params.push(p.x, p.y); + paramIndex += 2; + return condition; + }) + .join(' OR '); + + // 색상 및 소유자 업데이트를 위한 CASE 문 + const colorCases = chunk + .map((p) => { + const caseStr = `WHEN x = $${params.indexOf(p.x) + 1} AND y = $${params.indexOf(p.y) + 1} THEN $${paramIndex}`; + params.push(p.color); + paramIndex++; + return caseStr; + }) + .join('\n'); + + const ownerCases = chunk + .map((p) => { + const caseStr = `WHEN x = $${params.indexOf(p.x) + 1} AND y = $${params.indexOf(p.y) + 1} THEN $${paramIndex}`; + params.push(p.owner === null ? null : Number(p.owner)); + paramIndex++; + return caseStr; + }) + .join('\n'); + + // 최종 쿼리 + const query = ` + UPDATE pixels + SET + color = CASE + ${colorCases} + ELSE color + END, + owner = CASE + ${ownerCases} + ELSE owner + END, + updated_at = $${paramIndex} + WHERE canvas_id = $${paramIndex + 1} + AND (${coordConditions}) + `; + + // 추가 파라미터 + params.push(now, canvasId); + + // 쿼리 실행 + await queryRunner.query(query, params); + await queryRunner.commitTransaction(); + + // console.log( + // `[PixelUpdate] 청크 처리 완료: canvasId=${canvasId}, ${chunk.length}개 픽셀` + // ); + } catch (error) { + await queryRunner.rollbackTransaction(); + console.error( + `[PixelUpdate] 청크 처리 실패: canvasId=${canvasId}`, + error + ); + } finally { + await queryRunner.release(); + } + } + } +} + +// 주기적 flush 설정 +setInterval(() => { + if (pixelUpdateMap.size > 0) { + flushPixelToDb(); + } +}, BATCH_TIMEOUT_MS); + +// 메모리 모니터링 +setInterval(() => { + const memUsage = process.memoryUsage(); + console.log( + `[PixelUpdate] 메모리: ${Math.round(memUsage.heapUsed / 1024 / 1024)}MB, 큐 크기: ${pixelUpdateMap.size}픽셀` + ); +}, 60000); + +// 이벤트 리스너 추가 +// updateWorker.on('completed', (job) => { +// console.log(`[PixelUpdate] 작업 완료: ${job.id}`); +// }); + +updateWorker.on('failed', (job, err) => { + console.error(`[PixelUpdate] 작업 실패: ${job?.id}`, err); +}); + +updateWorker.on('error', (err) => { + console.error('[PixelUpdate] 워커 에러:', err); +}); + +updateWorker.on('stalled', (jobId) => { + console.warn(`[PixelUpdate] 작업 지연: ${jobId}`); +}); + +export { updateWorker }; From a4015abd351d70bb504b2bcafd43934a63285c05 Mon Sep 17 00:00:00 2001 From: usingnameing Date: Mon, 21 Jul 2025 16:06:11 +0900 Subject: [PATCH 2/5] =?UTF-8?q?=EC=A3=BC=EC=84=9D=20=EC=A0=9C=EA=B1=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/canvas/broadcast.service.ts | 81 --------------------------------- 1 file changed, 81 deletions(-) diff --git a/src/canvas/broadcast.service.ts b/src/canvas/broadcast.service.ts index c3bd032..1620a4c 100644 --- a/src/canvas/broadcast.service.ts +++ b/src/canvas/broadcast.service.ts @@ -1,84 +1,3 @@ -// import { Injectable } from '@nestjs/common'; -// import { Server } from 'socket.io'; -// import { waitForSocketServer } from '../socket/socket.manager'; - -// interface PixelUpdate { -// canvas_id: string; -// x: number; -// y: number; -// color: string; -// // owner: number; -// } - -// @Injectable() -// export class BroadcastService { -// private pixelBatchQueue = new Map(); -// private batchTimeouts = new Map(); -// private readonly BATCH_SIZE = 30; -// private readonly BATCH_TIMEOUT_MS = 16.67; // 60fps (1000ms / 60) -// private server: Server | null = null; - -// private async getServer(): Promise { -// if (this.server) return this.server; -// this.server = await waitForSocketServer(); // 🔐 안전 -// return this.server; -// } - -// addPixelToBatch(pixel: PixelUpdate, immediate: boolean = false) { -// const { canvas_id } = pixel; - -// // 즉시 브로드캐스트 요청이면 바로 처리 -// if (immediate) { -// const io = await this.getServer(); -// io.to(`canvas_${canvas_id}`).emit('pixel_update', { -// pixels: [pixel], -// }); -// return; -// } - -// if (!this.pixelBatchQueue.has(canvas_id)) { -// this.pixelBatchQueue.set(canvas_id, []); -// } - -// this.pixelBatchQueue.get(canvas_id)!.push(pixel); - -// // 배치 크기 도달 시 즉시 flush -// if (this.pixelBatchQueue.get(canvas_id)!.length >= this.BATCH_SIZE) { -// this.flushBatch(canvas_id); -// } else { -// this.scheduleBatchFlush(canvas_id); -// } -// } - -// private scheduleBatchFlush(canvas_id: string) { -// if (this.batchTimeouts.has(canvas_id)) return; - -// const timeout = setTimeout(() => { -// this.flushBatch(canvas_id); -// this.batchTimeouts.delete(canvas_id); -// }, this.BATCH_TIMEOUT_MS); - -// this.batchTimeouts.set(canvas_id, timeout); -// } - -// private async flushBatch(canvas_id: string) { -// const io = await this.getServer(); - -// const pixels = this.pixelBatchQueue.get(canvas_id); -// if (!pixels || pixels.length === 0) return; - -// // 배치로 전송 -// io.to(`canvas_${canvas_id}`).emit('pixel_update', { pixels: pixels }); -// this.pixelBatchQueue.set(canvas_id, []); -// } - -// private flushAllBatches() { -// for (const canvas_id of this.pixelBatchQueue.keys()) { -// this.flushBatch(canvas_id); -// } -// } -// } - import { Injectable, OnModuleInit } from '@nestjs/common'; import { Server } from 'socket.io'; import { waitForSocketServer } from '../socket/socket.manager'; From 96b38a2a08f177929e06c8818d3fa87e0e6076af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EB=B0=B1=EC=A7=80=EC=9B=90?= Date: Mon, 21 Jul 2025 19:38:40 +0900 Subject: [PATCH 3/5] =?UTF-8?q?SCRUM-253=20:=20=20=EA=B2=8C=EC=9E=84=20?= =?UTF-8?q?=EC=BA=94=EB=B2=84=EC=8A=A4=20=ED=9E=88=EC=8A=A4=ED=86=A0?= =?UTF-8?q?=EB=A6=AC=20=EC=83=9D=EC=84=B1=20=EB=B0=8F=20=ED=86=B5=EA=B3=84?= =?UTF-8?q?=20=EC=A0=95=EC=83=81=ED=99=94?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/canvas/canvas.service.ts | 6 +- src/game/game-flush.service.ts | 163 --------------------------------- src/game/game-logic.service.ts | 59 +++++++++--- src/game/game-pixel.service.ts | 1 - src/game/game.module.ts | 3 - src/queues/history.worker.ts | 4 +- 6 files changed, 52 insertions(+), 184 deletions(-) delete mode 100644 src/game/game-flush.service.ts diff --git a/src/canvas/canvas.service.ts b/src/canvas/canvas.service.ts index a019bc7..857336e 100644 --- a/src/canvas/canvas.service.ts +++ b/src/canvas/canvas.service.ts @@ -632,9 +632,12 @@ export class CanvasService { if (isNaN(id)) return false; const now = new Date(); const result = await this.canvasRepository.update(id, { endedAt: now }); - // ended_at을 now로 바꾼 후, 5초 delay로 historyQueue에 잡 등록 + // ended_at을 now로 바꾼 후, 다시 읽기 const canvas = await this.canvasRepository.findOneBy({ id }); if (canvas) { + // 기존 잡 삭제 + await historyQueue.remove(`history-${id}`); + // 새 잡 등록 await historyQueue.add( 'canvas-history', { @@ -642,6 +645,7 @@ export class CanvasService { size_x: canvas.sizeX, size_y: canvas.sizeY, type: canvas.type, + endedAt: canvas.endedAt, // 최신 endedAt 사용 }, { jobId: `history-${id}`, delay: 5000 } ); diff --git a/src/game/game-flush.service.ts b/src/game/game-flush.service.ts deleted file mode 100644 index b61709b..0000000 --- a/src/game/game-flush.service.ts +++ /dev/null @@ -1,163 +0,0 @@ -// Redis에 임시로 저장된 "dirty" 데이터(변경된 픽셀, 유저 상태)를 -// 주기적으로 DB에 일괄 반영(Flush) 하는 역할. -// 1초마다 또는 10개 이상 변경 시 batch로 DB update. - -import { Injectable, Inject } from '@nestjs/common'; -import Redis from 'ioredis'; -import { DataSource } from 'typeorm'; - -@Injectable() -export class GameFlushService { - constructor( - @Inject('REDIS_CLIENT') private readonly redis: Redis, - @Inject('DATA_SOURCE') private readonly dataSource: DataSource - ) {} - - // 픽셀 dirty set에 추가 - async addDirtyPixel(canvasId: string, x: number, y: number) { - await this.redis.sadd(`dirty_pixels:${canvasId}`, `${x}:${y}`); - } - - // 유저 dirty set에 추가 - async addDirtyUser(canvasId: string, userId: string) { - await this.redis.sadd(`dirty_users:${canvasId}`, userId); - await this.redis.expire(`dirty_users:${canvasId}`, 3600); - } - - // 픽셀 batch flush - async flushDirtyPixels(canvasId: string) { - if (!this.dataSource.isInitialized) { - console.warn( - '[GameFlushService] DataSource not initialized. Re-initializing...' - ); - await this.dataSource.initialize(); - } - const dirtySetKey = `dirty_pixels:${canvasId}`; - const fields = await this.redis.smembers(dirtySetKey); - if (fields.length === 0) return; - const hashKey = `canvas:${canvasId}`; - const pipeline = this.redis.pipeline(); - for (const field of fields) { - pipeline.hget(hashKey, field); - } - const results = await pipeline.exec(); - // DB 일괄 update - if (!results) return; - for (let i = 0; i < fields.length; i++) { - const [x, y] = fields[i].split(':').map(Number); - const redisResult = results[i]; - if (!redisResult || typeof redisResult[1] !== 'string') continue; - const value = redisResult[1]; - if (value) { - const [color, owner] = value.split('|'); - await this.dataSource.query( - 'UPDATE pixels SET color=$1, owner=$2 WHERE canvas_id=$3 AND x=$4 AND y=$5', - [color, owner || null, canvasId, x, y] - ); - } - } - await this.redis.del(dirtySetKey); - } - - // 유저 batch flush - async flushDirtyUsers(canvasId: string) { - if (!this.dataSource.isInitialized) { - console.warn( - '[GameFlushService] DataSource not initialized. Re-initializing...' - ); - await this.dataSource.initialize(); - } - const dirtySetKey = `dirty_users:${canvasId}`; - await this.redis.expire(dirtySetKey, 3600); - const userIds = await this.redis.smembers(dirtySetKey); - if (userIds.length === 0) { - return; - } - - for (const userId of userIds) { - // 사용자 ID 유효성 검증 - if (!userId || userId === '0' || isNaN(Number(userId))) { - console.warn( - `[GameFlushService] 유효하지 않은 사용자 ID 감지: userId=${userId}, canvasId=${canvasId}` - ); - continue; - } - - // 사용자가 실제로 users 테이블에 존재하는지 확인 - try { - const userExists = await this.dataSource.query( - 'SELECT id FROM users WHERE id = $1', - [userId] - ); - - if (userExists.length === 0) { - console.warn( - `[GameFlushService] 존재하지 않는 사용자 ID 감지: userId=${userId}, canvasId=${canvasId}` - ); - continue; - } - } catch (error) { - console.error( - `[GameFlushService] 사용자 존재 여부 확인 중 에러: userId=${userId}, canvasId=${canvasId}`, - error - ); - continue; - } - - // own_count, try_count, dead, life 등 Redis에서 조회 - const [ownCount, tryCount, dead, life] = await Promise.all([ - this.redis.hget(`game:${canvasId}:user:${userId}`, 'own_count'), - this.redis.hget(`game:${canvasId}:user:${userId}`, 'try_count'), - this.redis.hget(`game:${canvasId}:user:${userId}`, 'dead'), - this.redis.hget(`game:${canvasId}:user:${userId}`, 'life'), - ]); - - // user_canvas 테이블에 UPSERT - await this.dataSource.query( - `INSERT INTO user_canvas (user_id, canvas_id, own_count, try_count, joined_at) - VALUES ($1, $2, $3, $4, NOW()) - ON CONFLICT (user_id, canvas_id) - DO UPDATE SET own_count = $3, try_count = $4`, - [userId, canvasId, ownCount || 0, tryCount || 0] - ); - - // game_user_result에도 life 반영 (있을 경우) - await this.dataSource.query( - 'UPDATE game_user_result SET life=$1 WHERE canvas_id=$2 AND user_id=$3', - [life || 2, canvasId, userId] - ); - } - await this.redis.del(dirtySetKey); - } - - // 1초마다 또는 batch 10개마다 flush - async flushLoop(canvasId: string) { - setInterval(async () => { - const pixelCount = await this.redis.scard(`dirty_pixels:${canvasId}`); - const userCount = await this.redis.scard(`dirty_users:${canvasId}`); - - // 게임에서는 더 자주 flush (1개 이상이면 flush) - if (pixelCount >= 10) { - await this.flushDirtyPixels(canvasId); - } - if (userCount >= 10) { - await this.flushDirtyUsers(canvasId); - } - - // 30초마다 강제 flush (안전장치) - const now = Date.now(); - const lastForceFlush = await this.redis.get( - `last_force_flush:${canvasId}` - ); - if (!lastForceFlush || now - parseInt(lastForceFlush) > 30000) { - await this.flushDirtyPixels(canvasId); - await this.flushDirtyUsers(canvasId); - await this.redis.setex( - `last_force_flush:${canvasId}`, - 60, - now.toString() - ); - } - }, 1000); - } -} diff --git a/src/game/game-logic.service.ts b/src/game/game-logic.service.ts index 860d887..429b1f9 100644 --- a/src/game/game-logic.service.ts +++ b/src/game/game-logic.service.ts @@ -10,7 +10,6 @@ import { Server, Socket } from 'socket.io'; import { CanvasService } from '../canvas/canvas.service'; import { GameStateService } from './game-state.service'; import { GamePixelService } from './game-pixel.service'; -import { GameFlushService } from './game-flush.service'; import Redis from 'ioredis'; import { DataSource } from 'typeorm'; @@ -21,7 +20,6 @@ export class GameLogicService { private readonly canvasService: CanvasService, private readonly gameStateService: GameStateService, private readonly gamePixelService: GamePixelService, - private readonly gameFlushService: GameFlushService, @Inject('REDIS_CLIENT') private readonly redis: Redis, // Redis 인스턴스 주입 private readonly dataSource: DataSource // DataSource 주입 ) {} @@ -90,10 +88,8 @@ export class GameLogicService { data.canvas_id, pixel.owner ); - await this.gameFlushService.addDirtyUser(data.canvas_id, pixel.owner); } await this.gameStateService.incrUserOwnCount(data.canvas_id, userId); - await this.gameFlushService.addDirtyUser(data.canvas_id, userId); await this.canvasService.applyDrawPixel({ canvas_id: data.canvas_id, x: data.x, @@ -101,6 +97,7 @@ export class GameLogicService { color: data.color, userId: Number(userId), }); + // 픽셀 변경 시 updateQueue.add('pixel-update', {...})로 바로 큐에 추가 server.to(`canvas_${data.canvas_id}`).emit('game_pixel_update', { x: data.x, y: data.y, @@ -127,7 +124,7 @@ export class GameLogicService { data.canvas_id, userId ); - await this.gameFlushService.addDirtyUser(data.canvas_id, userId); + if (life <= 0) { // 사망 처리: 픽셀 자유화, dead_user 브로드캐스트 const freedPixels = await this.gamePixelService.freeAllPixelsOfUser( @@ -138,7 +135,6 @@ export class GameLogicService { await this.gameStateService.addDeadUser(data.canvas_id, userId); // own_count 0으로 강제 세팅 await this.gameStateService.setUserOwnCount(data.canvas_id, userId, 0); - await this.gameFlushService.addDirtyUser(data.canvas_id, userId); server.to(`canvas_${data.canvas_id}`).emit('dead_user', { username: await this.getUserNameById(userId), pixels: freedPixels.map((p) => ({ @@ -245,7 +241,7 @@ export class GameLogicService { const flushKey = `flush_started:${canvasId}`; const isFlushStarted = await this.redis.get(flushKey); if (!isFlushStarted) { - await this.gameFlushService.flushLoop(canvasId); + // await this.gameFlushService.flushLoop(canvasId); await this.redis.setex(flushKey, 3600, '1'); // 1시간 동안 유지 } } @@ -314,13 +310,7 @@ export class GameLogicService { // 캔버스 종료(시간 만료) 시 강제 게임 종료 및 결과 브로드캐스트 async forceGameEnd(canvasId: string, server?: any) { try { - // 게임 종료 직전 모든 유저 상태 DB 반영 (life 등) - if ( - this.gameFlushService && - typeof this.gameFlushService.flushDirtyUsers === 'function' - ) { - await this.gameFlushService.flushDirtyUsers(canvasId); - } + // dirty set 구조 제거로 인해 별도 flush 불필요, 바로 다음 로직 실행 // 랭킹 계산 시간 측정 시작 const rankingStart = Date.now(); // 모든 유저, 사망자 목록 조회 @@ -354,7 +344,46 @@ export class GameLogicService { const rankingEnd = Date.now(); // 게임 결과를 DB에 저장 (rank만 업데이트, userId 직접 사용) await this.updateGameResultsByUserId(canvasId, ranked); - // canvas-history 잡 추가 완전 제거 (alarm.worker/배치에서만 관리) + // 게임 종료 시점에 user_canvas 테이블에 유저별 통계 upsert + for (const user of userStats) { + try { + await this.dataSource.query( + `INSERT INTO user_canvas (user_id, canvas_id, own_count, try_count, joined_at) + VALUES ($1, $2, $3, $4, NOW()) + ON CONFLICT (user_id, canvas_id) + DO UPDATE SET own_count = $3, try_count = $4`, + [user.userId, canvasId, user.own_count || 0, user.try_count || 0] + ); + } catch (err) { + console.error('[forceGameEnd] user_canvas upsert 실패:', user.userId, err); + } + } + // 게임 종료 시 canvas-history 잡 추가 (히스토리 워커) + try { + const { historyQueue } = await import('../queues/bullmq.queue'); + // 캔버스 정보 조회 (크기 등 필요시) + const canvasInfo = await this.canvasService.getCanvasById(canvasId); + const meta = canvasInfo?.metaData; + await historyQueue.add( + 'canvas-history', + { + canvas_id: canvasId, + size_x: Number(meta?.sizeX), + size_y: Number(meta?.sizeY), + type: meta?.type, + startedAt: meta?.startedAt, + endedAt: meta?.endedAt, + created_at: meta?.createdAt, + updated_at: new Date(), + }, + { jobId: `history-${canvasId}`, delay: 5000 } + ); + } catch (e) { + console.error( + `[GameLogicService] forceGameEnd: 워커 큐에 canvas-history 잡 추가 실패: canvasId=${canvasId}`, + e + ); + } // 결과 브로드캐스트 if (server) { server diff --git a/src/game/game-pixel.service.ts b/src/game/game-pixel.service.ts index 07c0c0b..f3cc3fc 100644 --- a/src/game/game-pixel.service.ts +++ b/src/game/game-pixel.service.ts @@ -55,7 +55,6 @@ export class GamePixelService { const pixelData = `#000000|`; // owner 없음 pipeline.hset(hashKey, field, pixelData); - pipeline.sadd(`dirty_pixels:${canvasId}`, field); freedPixels.push({ x: pixel.x, y: pixel.y, color: '#000000' }); } diff --git a/src/game/game.module.ts b/src/game/game.module.ts index e29f3fe..9a8712e 100644 --- a/src/game/game.module.ts +++ b/src/game/game.module.ts @@ -2,7 +2,6 @@ import { Module, forwardRef } from '@nestjs/common'; import { GameLogicService } from './game-logic.service'; import { GameStateService } from './game-state.service'; import { GamePixelService } from './game-pixel.service'; -import { GameFlushService } from './game-flush.service'; import { GameController } from './game.controller'; import { GameService } from './game.service'; import { TypeOrmModule } from '@nestjs/typeorm'; @@ -24,7 +23,6 @@ import { DatabaseModule } from '../database/database.module'; GameLogicService, GameStateService, GamePixelService, - GameFlushService, GameService, ], controllers: [GameController], @@ -32,7 +30,6 @@ import { DatabaseModule } from '../database/database.module'; GameLogicService, GameStateService, GamePixelService, - GameFlushService, GameService, ], }) diff --git a/src/queues/history.worker.ts b/src/queues/history.worker.ts index 4fe3c6d..3f7a1f4 100644 --- a/src/queues/history.worker.ts +++ b/src/queues/history.worker.ts @@ -14,6 +14,8 @@ const historyWorker = new Worker( 'canvas-history', async (job: Job) => { const { canvas_id, size_x, size_y, type } = job.data; + const width = Number(size_x); + const height = Number(size_y); if (!job.data) throw new Error('job.data is undefined'); @@ -22,7 +24,7 @@ const historyWorker = new Worker( 'select x, y, color from pixels where canvas_id = $1::INTEGER', [canvas_id] ); - const buffer = await generatorPixelToImg(pixelData, size_x, size_y); + const buffer = await generatorPixelToImg(pixelData, width, height); const contentType = 'image/png'; const key = `history/${canvas_id}/${randomUUID()}.png`; await uploadBufferToS3(buffer, key, contentType); From aff13d85572e4391fb1c479e6f8b8944d94eb1bd Mon Sep 17 00:00:00 2001 From: usingnameing Date: Mon, 21 Jul 2025 22:13:52 +0900 Subject: [PATCH 4/5] SCRUM-258 : history pixel data origin change --- src/queues/history.worker.ts | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/src/queues/history.worker.ts b/src/queues/history.worker.ts index 3f7a1f4..eae5ba2 100644 --- a/src/queues/history.worker.ts +++ b/src/queues/history.worker.ts @@ -4,6 +4,7 @@ import { AppDataSource } from '../data-source'; import { generatorPixelToImg } from '../util/imageGenerator.util'; import { uploadBufferToS3 } from '../util/s3UploadFile.util'; import { randomUUID } from 'crypto'; +import { redisClient } from './bullmq.redis'; import { redisConnection } from './bullmq.config'; import { CanvasHistory } from '../canvas/entity/canvasHistory.entity'; @@ -19,11 +20,34 @@ const historyWorker = new Worker( if (!job.data) throw new Error('job.data is undefined'); - const pixelData: { x: number; y: number; color: string }[] = - await pixelRepository.query( + const hashKey = `canvas:${canvas_id}`; + const redisPixels = await redisClient.hgetall(hashKey); + + let pixelData: Array<{ x: number; y: number; color: string }> = []; + + if (Object.keys(redisPixels).length > 0) { + for (const field in redisPixels) { + const [x, y] = field.split(':').map(Number); + const value = redisPixels[field]; + let color: string; + + if (value.includes('|')) { + // 새로운 파이프로 구분된 형태 처리 + const [colorPart] = value.split('|')[0]; + color = colorPart; + } else { + // 기존 color만 저장된 형태 처리 (하위 호환성) + color = value; + } + pixelData.push({ x, y, color }); + } + } else { + pixelData = await pixelRepository.query( 'select x, y, color from pixels where canvas_id = $1::INTEGER', [canvas_id] ); + } + const buffer = await generatorPixelToImg(pixelData, width, height); const contentType = 'image/png'; const key = `history/${canvas_id}/${randomUUID()}.png`; From f7166eeeeb3646b4510e72064d30e3eb2f5e6ef0 Mon Sep 17 00:00:00 2001 From: usingnameing Date: Tue, 22 Jul 2025 01:18:59 +0900 Subject: [PATCH 5/5] SCRUM-263 : canvas history bugfix --- src/queues/bullmq.redis.ts | 9 +++++++-- src/queues/bullmq.worker.ts | 8 +++++--- src/queues/history.worker.ts | 6 ++---- src/util/imageGenerator.util.ts | 4 +++- 4 files changed, 17 insertions(+), 10 deletions(-) diff --git a/src/queues/bullmq.redis.ts b/src/queues/bullmq.redis.ts index 0fb7abe..8d74c1b 100644 --- a/src/queues/bullmq.redis.ts +++ b/src/queues/bullmq.redis.ts @@ -1,5 +1,10 @@ -// bullmq.redis.ts 혹은 bullmq.client.ts 등 새 파일 생성 -import { Redis } from 'ioredis'; +import Redis from 'ioredis'; import { redisConnection } from './bullmq.config'; +// 공유 Redis 클라이언트 생성 export const redisClient = new Redis(redisConnection); + +// 프로세스 종료 시 연결 정리 +process.on('exit', () => { + redisClient.disconnect(); +}); diff --git a/src/queues/bullmq.worker.ts b/src/queues/bullmq.worker.ts index 85ccb82..45deecd 100644 --- a/src/queues/bullmq.worker.ts +++ b/src/queues/bullmq.worker.ts @@ -6,6 +6,7 @@ import { Pixel } from '../pixel/entity/pixel.entity'; import Redis from 'ioredis'; import { Chat } from '../group/entity/chat.entity'; import { PixelInfo } from '../interface/PixelInfo.interface'; +import { redisClient } from './bullmq.redis'; import './history.worker'; import './pixelUpdate.worker'; @@ -391,10 +392,11 @@ process.on('SIGINT', gracefulShutdown); // 메인 워커 실행 void (async () => { try { - // console.log('[Worker] 워커 프로세스 시작...'); - redis = new Redis(redisConnection); + console.log('[Worker] 워커 프로세스 시작...'); + // redis = new Redis(redisConnection); + redis = redisClient; await redis.ping(); - // console.log('[Worker] Redis 연결 성공'); + console.log('[Worker] Redis 연결 성공'); // await AppDataSource.initialize(); await initializeDataSourceWithRetry(); // console.log('[Worker] DataSource 초기화 완료'); diff --git a/src/queues/history.worker.ts b/src/queues/history.worker.ts index eae5ba2..81d8752 100644 --- a/src/queues/history.worker.ts +++ b/src/queues/history.worker.ts @@ -20,8 +20,7 @@ const historyWorker = new Worker( if (!job.data) throw new Error('job.data is undefined'); - const hashKey = `canvas:${canvas_id}`; - const redisPixels = await redisClient.hgetall(hashKey); + const redisPixels = await redisClient.hgetall(`canvas:${canvas_id}`); let pixelData: Array<{ x: number; y: number; color: string }> = []; @@ -33,8 +32,7 @@ const historyWorker = new Worker( if (value.includes('|')) { // 새로운 파이프로 구분된 형태 처리 - const [colorPart] = value.split('|')[0]; - color = colorPart; + color = value.split('|')[0]; } else { // 기존 color만 저장된 형태 처리 (하위 호환성) color = value; diff --git a/src/util/imageGenerator.util.ts b/src/util/imageGenerator.util.ts index c9df5a7..d6ba527 100644 --- a/src/util/imageGenerator.util.ts +++ b/src/util/imageGenerator.util.ts @@ -12,7 +12,9 @@ export async function generatorPixelToImg( height: number, width: number ): Promise { - const canvas = createCanvas(width, height); + console.log('width: ', width); + console.log('height: ', height); + const canvas = createCanvas(height, width); const ctx = canvas.getContext('2d'); ctx.imageSmoothingEnabled = false;