Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/app.gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ export class AppGateway
// 모든 이벤트에 대한 디버그 리스너 추가
server.on('connection', (socket) => {
console.log(`[AppGateway] 클라이언트 연결됨: ${socket.id}`);

// 모든 이벤트 로깅
// socket.onAny((eventName, ...args) => {
// console.log(
Expand Down
66 changes: 45 additions & 21 deletions src/canvas/broadcast.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Injectable } from '@nestjs/common';
import { Injectable, OnModuleInit } from '@nestjs/common';
import { Server } from 'socket.io';
import { waitForSocketServer } from '../socket/socket.manager';

Expand All @@ -7,26 +7,47 @@ interface PixelUpdate {
x: number;
y: number;
color: string;
// owner: number;
}

@Injectable()
export class BroadcastService {
export class BroadcastService implements OnModuleInit {
private pixelBatchQueue = new Map<string, PixelUpdate[]>();
private batchTimeouts = new Map<string, NodeJS.Timeout>();
private readonly BATCH_SIZE = 30;
private batchTimeout: NodeJS.Timeout | null = null;
private readonly BATCH_SIZE = 50;
private readonly BATCH_TIMEOUT_MS = 16.67; // 60fps (1000ms / 60)
private server: Server | null = null;

private async getServer(): Promise<Server> {
if (this.server) return this.server;
this.server = await waitForSocketServer(); // 🔐 안전
return this.server;
// 모듈 초기화 시 서버 인스턴스 가져오기
async onModuleInit() {
try {
this.server = await waitForSocketServer();
console.log('[BroadcastService] Socket 서버 인스턴스 획득 성공');
} catch (error) {
console.error(
'[BroadcastService] Socket 서버 인스턴스 획득 실패:',
error
);
}
}

addPixelToBatch(pixel: PixelUpdate) {
// 즉시 브로드캐스트 옵션 추가
addPixelToBatch(pixel: PixelUpdate, immediate: boolean = false) {
if (!this.server) {
console.error('[BroadcastService] Socket 서버 인스턴스가 없습니다');
return;
}

const { canvas_id } = pixel;

// 즉시 브로드캐스트 요청이면 바로 처리
if (immediate) {
this.server.to(`canvas_${canvas_id}`).emit('pixel_update', {
pixels: [pixel],
});
return;
}

// 그 외는 기존 배치 처리 로직
if (!this.pixelBatchQueue.has(canvas_id)) {
this.pixelBatchQueue.set(canvas_id, []);
}
Expand All @@ -37,29 +58,32 @@ export class BroadcastService {
if (this.pixelBatchQueue.get(canvas_id)!.length >= this.BATCH_SIZE) {
this.flushBatch(canvas_id);
} else {
this.scheduleBatchFlush(canvas_id);
this.scheduleBatchFlush();
}
}

private scheduleBatchFlush(canvas_id: string) {
if (this.batchTimeouts.has(canvas_id)) return;
private scheduleBatchFlush() {
if (this.batchTimeout) return;

const timeout = setTimeout(() => {
this.flushBatch(canvas_id);
this.batchTimeouts.delete(canvas_id);
this.batchTimeout = setTimeout(() => {
this.flushAllBatches();
this.batchTimeout = null;
}, this.BATCH_TIMEOUT_MS);

this.batchTimeouts.set(canvas_id, timeout);
}

private async flushBatch(canvas_id: string) {
const io = await this.getServer();
private flushBatch(canvas_id: string) {
if (!this.server) {
console.error('[BroadcastService] Socket 서버 인스턴스가 없습니다');
return;
}

const pixels = this.pixelBatchQueue.get(canvas_id);
if (!pixels || pixels.length === 0) return;

// 배치로 전송
io.to(`canvas_${canvas_id}`).emit('pixel_update', { pixels: pixels });
this.server
.to(`canvas_${canvas_id}`)
.emit('pixel_update', { pixels: pixels });
this.pixelBatchQueue.set(canvas_id, []);
}

Expand Down
69 changes: 37 additions & 32 deletions src/canvas/canvas.gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,19 +143,27 @@ export class CanvasGateway implements OnGatewayInit {
return;
}

// 워커로 픽셀 이벤트 발행 (DB 저장을 위해)
await this.redis.publish(
'pixel:updated',
JSON.stringify({
canvasId: Number(pixel.canvas_id),
x: pixel.x,
y: pixel.y,
color: pixel.color,
owner: userId,
})
);
// this.broadcastService.addPixelToBatch(
// {
// canvas_id: pixel.canvas_id,
// x: pixel.x,
// y: pixel.y,
// color: pixel.color,
// },
// true
// );

console.log('픽셀 그리기 성공:', pixel);
// // 워커로 픽셀 이벤트 발행 (DB 저장을 위해)
// await this.redis.publish(
// 'pixel:updated',
// JSON.stringify({
// canvasId: Number(pixel.canvas_id),
// x: pixel.x,
// y: pixel.y,
// color: pixel.color,
// owner: userId,
// })
// );

// 비동기 브로드캐스트 (응답 속도 향상)
this.server.to(`canvas_${pixel.canvas_id}`).emit('pixel_update', {
Expand All @@ -167,12 +175,6 @@ export class CanvasGateway implements OnGatewayInit {
},
],
});
// this.broadcastService.addPixelToBatch({
// canvas_id: pixel.canvas_id,
// x: pixel.x,
// y: pixel.y,
// color: pixel.color,
// });
} catch (error) {
console.error('[Gateway] 픽셀 그리기 에러:', error);
client.emit('pixel_error', { message: '픽셀 그리기 실패' });
Expand Down Expand Up @@ -264,18 +266,28 @@ export class CanvasGateway implements OnGatewayInit {
return;
}

// 워커로 픽셀 이벤트 발행 (DB 저장을 위해)
await this.redis.publish(
'pixel:updated',
JSON.stringify({
canvasId: Number(pixel.canvas_id),
this.broadcastService.addPixelToBatch(
{
canvas_id: pixel.canvas_id,
x: pixel.x,
y: pixel.y,
color: pixel.color,
owner: userId,
})
},
false
);

// 워커로 픽셀 이벤트 발행 (DB 저장을 위해)
// await this.redis.publish(
// 'pixel:updated',
// JSON.stringify({
// canvasId: Number(pixel.canvas_id),
// x: pixel.x,
// y: pixel.y,
// color: pixel.color,
// owner: userId,
// })
// );

// 비동기 브로드캐스트 (응답 속도 향상)
// setImmediate(() => {
// this.server.to(`canvas_${pixel.canvas_id}`).emit('pixel_update', {
Expand All @@ -284,13 +296,6 @@ export class CanvasGateway implements OnGatewayInit {
// color: pixel.color,
// });
// });

this.broadcastService.addPixelToBatch({
canvas_id: pixel.canvas_id,
x: pixel.x,
y: pixel.y,
color: pixel.color,
});
} catch (error) {
console.error('[Gateway] 픽셀 그리기 에러:', error);
}
Expand Down
18 changes: 14 additions & 4 deletions src/canvas/canvas.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { DrawPixelResponse } from '../interface/DrawPixelResponse.interface';
import { PixelInfo } from '../interface/PixelInfo.interface';
import { CanvasHistory } from './entity/canvasHistory.entity';
import { CanvasStrategyFactory } from './strategy/createFactory.factory';
import { historyQueue } from '../queues/bullmq.queue';
import { historyQueue, updateQueue } from '../queues/bullmq.queue';

@Injectable()
export class CanvasService {
Expand Down Expand Up @@ -52,9 +52,15 @@ export class CanvasService {
const pipeline = this.redisClient.pipeline();
pipeline.hset(hashKey, field, pixelData);
pipeline.expire(hashKey, 3 * 24 * 60 * 60);
pipeline.sadd(`dirty_pixels:${canvas_id}`, field);

// pipeline.sadd(`dirty_pixels:${canvas_id}`, field);
await pipeline.exec();
await updateQueue.add('pixel-update', {
canvasId: Number(canvas_id),
x,
y,
color,
owner: userId,
});
return true;
} catch (error) {
console.error('픽셀 저장 실패:', error);
Expand Down Expand Up @@ -626,16 +632,20 @@ export class CanvasService {
if (isNaN(id)) return false;
const now = new Date();
const result = await this.canvasRepository.update(id, { endedAt: now });
// ended_at을 now로 바꾼 후, 5초 delay로 historyQueue에 잡 등록
// ended_at을 now로 바꾼 후, 다시 읽기
const canvas = await this.canvasRepository.findOneBy({ id });
if (canvas) {
// 기존 잡 삭제
await historyQueue.remove(`history-${id}`);
// 새 잡 등록
await historyQueue.add(
'canvas-history',
{
canvas_id: id,
size_x: canvas.sizeX,
size_y: canvas.sizeY,
type: canvas.type,
endedAt: canvas.endedAt, // 최신 endedAt 사용
},
{ jobId: `history-${id}`, delay: 5000 }
);
Expand Down
Loading
Loading