Skip to content
Closed
1 change: 0 additions & 1 deletion src/app.gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ export class AppGateway
// 모든 이벤트에 대한 디버그 리스너 추가
server.on('connection', (socket) => {
console.log(`[AppGateway] 클라이언트 연결됨: ${socket.id}`);

// 모든 이벤트 로깅
// socket.onAny((eventName, ...args) => {
// console.log(
Expand Down
66 changes: 45 additions & 21 deletions src/canvas/broadcast.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Injectable } from '@nestjs/common';
import { Injectable, OnModuleInit } from '@nestjs/common';
import { Server } from 'socket.io';
import { waitForSocketServer } from '../socket/socket.manager';

Expand All @@ -7,26 +7,47 @@ interface PixelUpdate {
x: number;
y: number;
color: string;
// owner: number;
}

@Injectable()
export class BroadcastService {
export class BroadcastService implements OnModuleInit {
private pixelBatchQueue = new Map<string, PixelUpdate[]>();
private batchTimeouts = new Map<string, NodeJS.Timeout>();
private readonly BATCH_SIZE = 30;
private batchTimeout: NodeJS.Timeout | null = null;
private readonly BATCH_SIZE = 50;
private readonly BATCH_TIMEOUT_MS = 16.67; // 60fps (1000ms / 60)
private server: Server | null = null;

private async getServer(): Promise<Server> {
if (this.server) return this.server;
this.server = await waitForSocketServer(); // 🔐 안전
return this.server;
// 모듈 초기화 시 서버 인스턴스 가져오기
async onModuleInit() {
try {
this.server = await waitForSocketServer();
console.log('[BroadcastService] Socket 서버 인스턴스 획득 성공');
} catch (error) {
console.error(
'[BroadcastService] Socket 서버 인스턴스 획득 실패:',
error
);
}
}

addPixelToBatch(pixel: PixelUpdate) {
// 즉시 브로드캐스트 옵션 추가
addPixelToBatch(pixel: PixelUpdate, immediate: boolean = false) {
if (!this.server) {
console.error('[BroadcastService] Socket 서버 인스턴스가 없습니다');
return;
}

const { canvas_id } = pixel;

// 즉시 브로드캐스트 요청이면 바로 처리
if (immediate) {
this.server.to(`canvas_${canvas_id}`).emit('pixel_update', {
pixels: [pixel],
});
return;
}

// 그 외는 기존 배치 처리 로직
if (!this.pixelBatchQueue.has(canvas_id)) {
this.pixelBatchQueue.set(canvas_id, []);
}
Expand All @@ -37,29 +58,32 @@ export class BroadcastService {
if (this.pixelBatchQueue.get(canvas_id)!.length >= this.BATCH_SIZE) {
this.flushBatch(canvas_id);
} else {
this.scheduleBatchFlush(canvas_id);
this.scheduleBatchFlush();
}
}

private scheduleBatchFlush(canvas_id: string) {
if (this.batchTimeouts.has(canvas_id)) return;
private scheduleBatchFlush() {
if (this.batchTimeout) return;

const timeout = setTimeout(() => {
this.flushBatch(canvas_id);
this.batchTimeouts.delete(canvas_id);
this.batchTimeout = setTimeout(() => {
this.flushAllBatches();
this.batchTimeout = null;
}, this.BATCH_TIMEOUT_MS);

this.batchTimeouts.set(canvas_id, timeout);
}

private async flushBatch(canvas_id: string) {
const io = await this.getServer();
private flushBatch(canvas_id: string) {
if (!this.server) {
console.error('[BroadcastService] Socket 서버 인스턴스가 없습니다');
return;
}

const pixels = this.pixelBatchQueue.get(canvas_id);
if (!pixels || pixels.length === 0) return;

// 배치로 전송
io.to(`canvas_${canvas_id}`).emit('pixel_update', { pixels: pixels });
this.server
.to(`canvas_${canvas_id}`)
.emit('pixel_update', { pixels: pixels });
this.pixelBatchQueue.set(canvas_id, []);
}

Expand Down
69 changes: 37 additions & 32 deletions src/canvas/canvas.gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,19 +143,27 @@ export class CanvasGateway implements OnGatewayInit {
return;
}

// 워커로 픽셀 이벤트 발행 (DB 저장을 위해)
await this.redis.publish(
'pixel:updated',
JSON.stringify({
canvasId: Number(pixel.canvas_id),
x: pixel.x,
y: pixel.y,
color: pixel.color,
owner: userId,
})
);
// this.broadcastService.addPixelToBatch(
// {
// canvas_id: pixel.canvas_id,
// x: pixel.x,
// y: pixel.y,
// color: pixel.color,
// },
// true
// );

console.log('픽셀 그리기 성공:', pixel);
// // 워커로 픽셀 이벤트 발행 (DB 저장을 위해)
// await this.redis.publish(
// 'pixel:updated',
// JSON.stringify({
// canvasId: Number(pixel.canvas_id),
// x: pixel.x,
// y: pixel.y,
// color: pixel.color,
// owner: userId,
// })
// );

// 비동기 브로드캐스트 (응답 속도 향상)
this.server.to(`canvas_${pixel.canvas_id}`).emit('pixel_update', {
Expand All @@ -167,12 +175,6 @@ export class CanvasGateway implements OnGatewayInit {
},
],
});
// this.broadcastService.addPixelToBatch({
// canvas_id: pixel.canvas_id,
// x: pixel.x,
// y: pixel.y,
// color: pixel.color,
// });
} catch (error) {
console.error('[Gateway] 픽셀 그리기 에러:', error);
client.emit('pixel_error', { message: '픽셀 그리기 실패' });
Expand Down Expand Up @@ -264,18 +266,28 @@ export class CanvasGateway implements OnGatewayInit {
return;
}

// 워커로 픽셀 이벤트 발행 (DB 저장을 위해)
await this.redis.publish(
'pixel:updated',
JSON.stringify({
canvasId: Number(pixel.canvas_id),
this.broadcastService.addPixelToBatch(
{
canvas_id: pixel.canvas_id,
x: pixel.x,
y: pixel.y,
color: pixel.color,
owner: userId,
})
},
false
);

// 워커로 픽셀 이벤트 발행 (DB 저장을 위해)
// await this.redis.publish(
// 'pixel:updated',
// JSON.stringify({
// canvasId: Number(pixel.canvas_id),
// x: pixel.x,
// y: pixel.y,
// color: pixel.color,
// owner: userId,
// })
// );

// 비동기 브로드캐스트 (응답 속도 향상)
// setImmediate(() => {
// this.server.to(`canvas_${pixel.canvas_id}`).emit('pixel_update', {
Expand All @@ -284,13 +296,6 @@ export class CanvasGateway implements OnGatewayInit {
// color: pixel.color,
// });
// });

this.broadcastService.addPixelToBatch({
canvas_id: pixel.canvas_id,
x: pixel.x,
y: pixel.y,
color: pixel.color,
});
} catch (error) {
console.error('[Gateway] 픽셀 그리기 에러:', error);
}
Expand Down
18 changes: 14 additions & 4 deletions src/canvas/canvas.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { DrawPixelResponse } from '../interface/DrawPixelResponse.interface';
import { PixelInfo } from '../interface/PixelInfo.interface';
import { CanvasHistory } from './entity/canvasHistory.entity';
import { CanvasStrategyFactory } from './strategy/createFactory.factory';
import { historyQueue } from '../queues/bullmq.queue';
import { historyQueue, updateQueue } from '../queues/bullmq.queue';

@Injectable()
export class CanvasService {
Expand Down Expand Up @@ -52,9 +52,15 @@ export class CanvasService {
const pipeline = this.redisClient.pipeline();
pipeline.hset(hashKey, field, pixelData);
pipeline.expire(hashKey, 3 * 24 * 60 * 60);
pipeline.sadd(`dirty_pixels:${canvas_id}`, field);

// pipeline.sadd(`dirty_pixels:${canvas_id}`, field);
await pipeline.exec();
await updateQueue.add('pixel-update', {
canvasId: Number(canvas_id),
x,
y,
color,
owner: userId,
});
return true;
} catch (error) {
console.error('픽셀 저장 실패:', error);
Expand Down Expand Up @@ -626,16 +632,20 @@ export class CanvasService {
if (isNaN(id)) return false;
const now = new Date();
const result = await this.canvasRepository.update(id, { endedAt: now });
// ended_at을 now로 바꾼 후, 5초 delay로 historyQueue에 잡 등록
// ended_at을 now로 바꾼 후, 다시 읽기
const canvas = await this.canvasRepository.findOneBy({ id });
if (canvas) {
// 기존 잡 삭제
await historyQueue.remove(`history-${id}`);
// 새 잡 등록
await historyQueue.add(
'canvas-history',
{
canvas_id: id,
size_x: canvas.sizeX,
size_y: canvas.sizeY,
type: canvas.type,
endedAt: canvas.endedAt, // 최신 endedAt 사용
},
{ jobId: `history-${id}`, delay: 5000 }
);
Expand Down
Loading
Loading