From 58398109265d5cbfad681f4a36659720fdeda2fc Mon Sep 17 00:00:00 2001 From: kite_U Date: Sun, 7 Sep 2025 15:33:36 +0900 Subject: [PATCH 1/7] =?UTF-8?q?refactor:=20snake=20case=20->=20camel=20cas?= =?UTF-8?q?e=EB=A1=9C=20=EB=B3=80=EA=B2=BD=20(Spring=20=EC=84=9C=EB=B2=84?= =?UTF-8?q?=EC=99=80=EC=9D=98=20=ED=86=B5=EC=9D=BC=EC=84=B1)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/schemas/job.py | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/app/schemas/job.py b/app/schemas/job.py index 4313bc5..f75ed14 100644 --- a/app/schemas/job.py +++ b/app/schemas/job.py @@ -2,20 +2,25 @@ from pydantic import BaseModel, Field class JobRequest(BaseModel): - presigned_url: str = Field(..., description="다운로드할 이미지의 Presigned URL") + presignedUrl: str = Field(..., description="다운로드할 이미지의 Presigned URL") +""" +FastAPI가 image.results Stream에 발행하는 메시지 +""" class JobResult(BaseModel): - pill_name: str - correlation_id: str - label: str - confidence: float - finished_at: str + correlationId: str + pillName: str + isSafe: int + description: str + finishedAt: str +""" +Spring으로부터 받는 Job(image.jobs 구독) +""" class ImageJob(BaseModel): - correlation_id: str = Field(alias="correlationId") - presigned_url: str = Field(alias="presignedUrl") - reply_queue: str = Field(alias="replyQueue") - callback_url: str | None = Field(alias="callbackUrl") - content_type: str = Field(alias="contentType") - created_at: str = Field(alias="createdAt") - ttl_sec: int = Field(alias="ttlSec") + correlationId: str = Field(alias="correlationId") + presignedUrl: str = Field(alias="presignedUrl") + replyQueue: str = Field(alias="replyQueue") + contentType: str = Field(alias="contentType") + createdAt: str = Field(alias="createdAt") + ttlSec: int = Field(alias="ttlSec") From 39bb4ff379145e716b59e152173c89486ecd85e5 Mon Sep 17 00:00:00 2001 From: kite_U Date: Sun, 7 Sep 2025 15:34:19 +0900 Subject: [PATCH 2/7] =?UTF-8?q?refactor:=20snake=20case=20->=20camel=20cas?= =?UTF-8?q?e=EB=A1=9C=20=EB=B3=80=EA=B2=BD=20(Spring=20=EC=84=9C=EB=B2=84?= =?UTF-8?q?=EC=99=80=EC=9D=98=20=ED=86=B5=EC=9D=BC=EC=84=B1)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/endpoints/predictions.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/app/api/endpoints/predictions.py b/app/api/endpoints/predictions.py index 06fd1b2..4b8c66f 100644 --- a/app/api/endpoints/predictions.py +++ b/app/api/endpoints/predictions.py @@ -12,13 +12,12 @@ def get_redis_client(request: Request) -> redis.Redis: @router.post("/predict", status_code=202) async def create_prediction_job(job_request: JobRequest, redis_client: redis.Redis = Depends(get_redis_client)): - correlation_id = str(uuid.uuid4()) + correlationId = str(uuid.uuid4()) job = ImageJob( - correlationId=correlation_id, - presignedUrl=job_request.presigned_url, + correlationId=correlationId, + presignedUrl=job_request.presignedUrl, replyQueue=settings.STREAM_RESULT, - callbackUrl=None, - contentType="image/jpeg", + contentType=job_request.contentType, createdAt=datetime.utcnow().isoformat(), ttlSec=3600, ) @@ -30,4 +29,6 @@ async def create_prediction_job(job_request: JobRequest, redis_client: redis.Red approximate=True, ) - return {"job_id": correlation_id} \ No newline at end of file + print("분석 결과 발행 완료...") + + return {"job_id": correlationId} \ No newline at end of file From 43cb041e63d10c734afe1b0047c1dbbc7c0bba2d Mon Sep 17 00:00:00 2001 From: kite_U Date: Sun, 7 Sep 2025 15:34:51 +0900 Subject: [PATCH 3/7] =?UTF-8?q?fix:=20Message=20=EC=9E=91=EC=84=B1=20?= =?UTF-8?q?=EB=B0=8F=20=ED=95=84=EB=93=9C=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/endpoints/predictions.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/app/api/endpoints/predictions.py b/app/api/endpoints/predictions.py index 4b8c66f..6015b35 100644 --- a/app/api/endpoints/predictions.py +++ b/app/api/endpoints/predictions.py @@ -4,12 +4,16 @@ import redis.asyncio as redis import uuid from datetime import datetime +import json router = APIRouter() def get_redis_client(request: Request) -> redis.Redis: return request.app.state.redis_client +""" +테스트를 위한 임시적인 API +""" @router.post("/predict", status_code=202) async def create_prediction_job(job_request: JobRequest, redis_client: redis.Redis = Depends(get_redis_client)): correlationId = str(uuid.uuid4()) @@ -22,9 +26,17 @@ async def create_prediction_job(job_request: JobRequest, redis_client: redis.Red ttlSec=3600, ) - await redis_client.xadd( + # 타입에 맞도록 넣어주기 + correlationId = job.correlationId + payload = json.dumps(job.dict()) + print("분석 결과 발행 시작...") + entry_id = await redis_client.xadd( settings.STREAM_JOB, - {"json": job.model_dump_json()}, + { + "type": "image_results", + "payload": payload, + "correlationId": correlationId, + }, maxlen=10_000, approximate=True, ) From fd8dea87d1d0a10f90db0a5755cf86fc37253c0a Mon Sep 17 00:00:00 2001 From: kite_U Date: Sun, 7 Sep 2025 15:36:55 +0900 Subject: [PATCH 4/7] =?UTF-8?q?fix:=20Message=EC=97=90=20=EB=8C=80?= =?UTF-8?q?=ED=95=9C=20=EC=A0=84=EC=B2=98=EB=A6=AC=20=EC=B6=94=EA=B0=80(?= =?UTF-8?q?=EC=95=88=EC=A0=84=ED=95=9C=20=EC=A0=84=EC=B2=98=EB=A6=AC=20?= =?UTF-8?q?=ED=9B=84=20=EC=9D=BD=EA=B8=B0)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/worker/redis_client.py | 141 +++++++++++++++++++++++++++---------- 1 file changed, 103 insertions(+), 38 deletions(-) diff --git a/app/worker/redis_client.py b/app/worker/redis_client.py index 4f56c75..1a79bcb 100644 --- a/app/worker/redis_client.py +++ b/app/worker/redis_client.py @@ -1,9 +1,9 @@ -import asyncio import redis +import json from redis.asyncio import Redis as AsyncRedis from pydantic import BaseModel, Field -from typing import Any, Dict, Optional, List +from typing import Any, Dict, List, Optional, Union from app.core.config import settings # Redis Stream Definition @@ -12,7 +12,6 @@ class PublishRequest(BaseModel): payload: Dict[str, Any] class RedisStreamClient: - def __init__(self): self.redis_client = AsyncRedis.from_url( url=settings.REDIS_URL, @@ -21,38 +20,74 @@ def __init__(self): @classmethod def init(cls): - broker = cls() - return broker + return cls() + + @staticmethod + def _to_scalar(v: Any) -> Union[str, bytes, int, float]: + # Redis XADD : str/bytes/int/float 허용 + if isinstance(v, (str, bytes, int, float)): + return v + # 그 외는 JSON 문자열로 직렬화 (ensure_ascii=False로 한글 보존) + return json.dumps(v, ensure_ascii=False) + + + @classmethod + def _sanitize_fields_for_xadd(cls, fields: Dict[str, Any]) -> Dict[str, Union[str, bytes, int, float]]: + clean: Dict[str, Union[str, bytes, int, float]] = {} + for k, v in fields.items(): + if not isinstance(k, str): + k = str(k) + clean[k] = cls._to_scalar(v) + return clean + # Fast API 에서 Publish - async def xadd(self, stream_name: str, fields: Dict[str, Any]) -> str: - return await self.redis_client.xadd(stream_name, fields) + async def xadd( + self, + stream_name: str, + fields: Dict[str, Any], + *, + maxlen: Optional[int] = 10_000, + approximate: bool = True, + nomkstream: bool = False, + id: str = "*", + ) -> str: + safe_fields = self._sanitize_fields_for_xadd(fields) + return await self.redis_client.xadd( + stream_name, + safe_fields, + id=id, + maxlen=maxlen, + approximate=approximate, + nomkstream=nomkstream, + ) # Group 단위로 읽어오기 async def xreadgroup( - self, - group_name: str, - consumer_name: str, - stream_name: str, - count: Optional[int] = None, - block: Optional[int] = None, # ms 단위 - id: str = ">", # 새 메시지만 읽기 + self, + group_name: str, + consumer_name: str, + stream_name: str, + count: Optional[int] = None, + block: Optional[int] = None, # ms ) -> List[tuple]: + # Consumer Group 생성 (없으면) try: - # Create the consumer group (존재하지 않을 때) - self.redis_client.xgroup_create( - stream_name, group_name, id="$", mkstream=True + await self.redis_client.xgroup_create( + name=stream_name, + groupname=group_name, + id="0", + mkstream=True, ) except redis.exceptions.ResponseError as e: - # 이미 존재할 때 if "BUSYGROUP" not in str(e): raise - streams = {stream_name: id} + streams = {stream_name: ">"} # 신규 메시지 response = await self.redis_client.xreadgroup( - group_name, # groupname (positional) - consumer_name, # consumername (positional) - streams, # {stream: id} (positional) + group_name, + consumer_name, + streams, count=count, block=block, ) @@ -63,36 +98,38 @@ async def xack(self, stream_name: str, group_name: str, message_ids: List[str]) return await self.redis_client.xack(stream_name, group_name, *message_ids) # 완료 시 삭제 - async def xack_and_del(self, stream_name: str, group_name: str, message_ids: str) -> int: - - acked_count = await self.redis_client.xack(stream_name, group_name, *message_ids) - - # XACK가 성공하면 스트림에서 해당 메시지를 삭제 (XDEL) + async def xack_and_del( + self, + stream_name: str, + group_name: str, + message_ids: Union[str, List[str]], + ) -> int: + ids = [message_ids] if isinstance(message_ids, str) else list(message_ids) + acked_count = await self.redis_client.xack(stream_name, group_name, *ids) if acked_count > 0: - await self.redis_client.xdel(stream_name, *message_ids) - + await self.redis_client.xdel(stream_name, *ids) return acked_count + # Group 생성 async def xgroup_create(self, stream_name: str, group_name: str, id: str = "$") -> bool: try: - self.redis_client.xgroup_create(stream_name, group_name, id, mkstream=True) + await self.redis_client.xgroup_create(stream_name, group_name, id, mkstream=True) return True except redis.exceptions.ResponseError as e: if "BUSYGROUP" in str(e): print(f"Consumer group '{group_name}' already exists.") return False - raise e + raise # 메시지 재처리 지원 async def xclaim( - self, - stream_name: str, - group_name: str, - consumer_name: str, - min_idle_time: int, - message_ids: List[str], + self, + stream_name: str, + group_name: str, + consumer_name: str, + min_idle_time: int, + message_ids: List[str], ) -> List[tuple]: - return await self.redis_client.xclaim( stream_name=stream_name, group_name=group_name, @@ -101,6 +138,34 @@ async def xclaim( message_ids=message_ids, ) + # 자동으로 재청구 + async def xautoclaim( + self, + name: str, + groupname: str, + consumername: str, + min_idle_time: int, + start_id: str = "0-0", + count: Optional[int] = None, + justid: bool = False, + ): + res = await self.redis_client.xautoclaim( + name=name, + groupname=groupname, + consumername=consumername, + min_idle_time=min_idle_time, + start_id=start_id, + count=count, + justid=justid, + ) + # 2-튜플/3-튜플 호환하도록 전처리 + if isinstance(res, (list, tuple)) and len(res) == 3: + next_id, messages, _deleted = res + return next_id, messages + return res + + + # 종료 async def aclose(self): await self.redis_client.close() From 35cf9a4855e2c81e962f431071c7f864077dd72a Mon Sep 17 00:00:00 2001 From: kite_U Date: Sun, 7 Sep 2025 15:38:03 +0900 Subject: [PATCH 5/7] =?UTF-8?q?fix:=20=EB=B0=9C=ED=96=89=ED=95=98=EB=8A=94?= =?UTF-8?q?=20Message=20=EB=82=B4=EC=9A=A9=20=EC=88=98=EC=A0=95(correlatio?= =?UTF-8?q?nId,=20type,=20payload=20=EA=B0=81=EA=B0=81=20=ED=95=84?= =?UTF-8?q?=EB=93=9C=EB=A1=9C=20=EB=B0=9C=ED=96=89)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/worker/tasks.py | 47 +++++++++++++++++++++++++++++++-------------- 1 file changed, 33 insertions(+), 14 deletions(-) diff --git a/app/worker/tasks.py b/app/worker/tasks.py index a2a3bf2..3a22e33 100644 --- a/app/worker/tasks.py +++ b/app/worker/tasks.py @@ -8,34 +8,53 @@ from app.services.predictor_service import predictor_service from app.services.s3_service import s3_service +""" +이미지를 다운 -> 다운 한 것에 대하여 모델 분석 요청 +""" async def process_image_scan(job: ImageJob, redis_client: redis.Redis): - correlation_id = job.correlationId - print(f"[task] Start image scan for job_id={correlation_id}") + correlationId = job.correlationId + print(f"[task] Start image scan for job_id={correlationId}") try: - stream_file = await asyncio.to_thread(s3_service.download_file_from_presigned_url(job.presignedUrl)) - pill_name, label, confidence = await asyncio.to_thread(predictor_service.predict(stream_file)) + stream_file = await asyncio.to_thread( + s3_service.download_file_from_presigned_url, + job.presignedUrl + ) + + stream_file.seek(0) + + pillName, label, confidence = await asyncio.to_thread( + predictor_service.predict, + stream_file + ) + + # TODO: ChatGPT에 요청 결과 출력 - finished_at = datetime.utcnow().isoformat() + isSafe = 0 + description = "일단은 테스트입니다. 추후에 GPT 부분 추가할 예정" + finishedAt = datetime.utcnow().isoformat() result = JobResult( - pill_name=pill_name, - correlation_id=correlation_id, - label=label, - confidence=confidence, - finished_at=finished_at, + correlationId=correlationId, + pillName=pillName, + isSafe=isSafe, + description=description, + finishedAt=finishedAt, ) await redis_client.xadd( settings.STREAM_RESULT, - {"json": result.model_dump_json()}, + { + "correlationId": correlationId, + "type": "image_results", + "payload": result.model_dump_json()}, maxlen=10_000, approximate=True, ) - print(f"[task] Image scan finished for job_id={correlation_id}") + print(f"[task] Image scan successfully finished for job_id={correlationId}") except Exception as e: - print(f"[task] Failed to process job_id={correlation_id}: {e}") + print(f"[task] Failed to process job_id={correlationId}: {e}") finally: - print(f"[task] Image scan finished for job_id={correlation_id}") + print(f"[task] Image scan finished for job_id={correlationId}") From e8e0651fb684b41ccdb9e1e2e18e424589df3950 Mon Sep 17 00:00:00 2001 From: kite_U Date: Sun, 7 Sep 2025 15:38:34 +0900 Subject: [PATCH 6/7] =?UTF-8?q?fix:=20=EB=A9=94=EC=8B=9C=EC=A7=80=20Consum?= =?UTF-8?q?er=EC=9D=98=20=EC=95=88=EC=A0=84=ED=95=9C=20=EB=A9=94=EC=8B=9C?= =?UTF-8?q?=EC=A7=80=20=EC=9D=BD=EA=B8=B0=20=EC=A0=84=EC=B2=98=EB=A6=AC=20?= =?UTF-8?q?=ED=95=A8=EC=88=98=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/worker/worker.py | 136 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 108 insertions(+), 28 deletions(-) diff --git a/app/worker/worker.py b/app/worker/worker.py index 2db59b1..fbdec09 100644 --- a/app/worker/worker.py +++ b/app/worker/worker.py @@ -1,10 +1,49 @@ - +import json import asyncio from app.worker.redis_client import redis_client from app.core.config import settings from app.schemas.job import ImageJob from app.worker.tasks import process_image_scan +""" +Redis Stream에 정의한 유효한 형식 메시지를 위한 전처리 함수 +""" +def _to_scalr(v): + # XADD 허용 타입: str, bytes, int, float + if isinstance(v, (str, bytes, int, float)): + return v + # 그 외는 JSON str + return json.dumps(v, ensure_ascii=False) + +""" +Decoding +""" +def _decode(b): + if isinstance(b, (bytes, bytearray)): + return b.decode() + else: + return b + + +def _sanitize_fields_for_xadd(fields: dict) -> dict: + # 정제 + cleaned = {} + for k, v in fields.items(): + k = _decode(k) + if isinstance(v, (bytes, bytearray)): + try: + v = v.decode() + except Exception: + pass + else: + v = _to_scalr(v) + cleaned[k] = v + return cleaned + + +""" +"image.jobs"를 구독 +""" class JobWorker: def __init__(self, redis_client: redis_client): self.redis_client = redis_client @@ -16,6 +55,7 @@ async def run(self): while True: try: + # Consumer의 메시지 읽기 resp = await self.redis_client.xreadgroup( group_name=settings.GROUP_NAME, consumer_name=settings.CONSUMER_NAME, @@ -27,25 +67,52 @@ async def run(self): _, entries = resp[0] for msg_id, fields in entries: try: - job = ImageJob.model_validate_json(fields["json"]) - task = asyncio.create_task(process_image_scan(job, redis_client)) - print(f"[worker] {task} 발행 성공") - # 처리 성공 시에만 ack 후 del - task.add_done_callback(lambda t: asyncio.create_task( - self.redis_client.xack_and_del(settings.STREAM_JOB, settings.GROUP_NAME, msg_id) - if not t.exception() else - self.redis_client.xadd(f"{settings.STREAM_JOB}:DLQ", - {"id": msg_id, "error": str(t.exception()), **fields}) - )) + job_type = fields.get(b"type") or fields.get("type") + correlation_id = fields.get(b"correlationId") or fields.get("correlationId") + payload = fields.get(b"payload") or fields.get("payload") + + # type 검증 + if job_type in (b"image_jobs", "image_jobs"): + + # payload 전처리 + if isinstance(payload, (bytes, bytearray)): + payload_str = payload.decode() + else: + payload_str = payload if isinstance(payload, str) else json.dumps(payload) + + # 최종 반환 data + data = json.loads(payload_str) + print(f"Job received id={msg_id} correlationId={correlation_id} payload={data}") + + job = data + # XADD까지 호출 + task = asyncio.create_task(process_image_scan(job, redis_client)) + print(f"[worker] {task} 발행 성공") + + # 처리 성공 시에만 ack 후 del + task.add_done_callback(lambda t: asyncio.create_task( + self.redis_client.xack_and_del(settings.STREAM_JOB, settings.GROUP_NAME, msg_id) + if not t.exception() else + self.redis_client.xadd(f"{settings.STREAM_JOB}:DLQ", + {"id": msg_id, "error": str(t.exception()), **fields}) + )) + + else: + # job_type 불일치 경우 -> DLQ + clean = _sanitize_fields_for_xadd(fields) + clean.update({"id": _decode(msg_id), "error": "unexpected job type"}) + await self.redis_client.xadd(f"{settings.STREAM_JOB}:DLQ", clean) + except asyncio.CancelledError: # 취소되면 재전송되도록 ack 하지 않음 raise + except Exception as e: - await self.redis_client.xadd( - f"{settings.STREAM_JOB}:DLQ", - {"id": msg_id, "error": str(e), **fields}, - ) + clean = _sanitize_fields_for_xadd(fields) + clean.update({"id": _decode(msg_id), "error": str(e)}) + await self.redis_client.xadd(f"{settings.STREAM_JOB}:DLQ", clean) + # 주기적으로 AutoClaim now = asyncio.get_event_loop().time() if now - last_reclaim > reclaim_every_sec: last_reclaim = now @@ -59,21 +126,34 @@ async def run(self): ) for msg_id, fields in claimed: try: - job = ImageJob.model_validate_json(fields["json"]) - task = asyncio.create_task(process_image_scan(job, redis_client)) + payload = fields.get(b"payload") or fields.get("payload") + if isinstance(payload, (bytes, bytearray)): + payload = payload.decode() + job = ImageJob.model_validate_json(payload) + + task = asyncio.create_task(process_image_scan(job, self.redis_client)) print(f"[worker] {task} 발행 성공") - # 처리 성공 시에만 ack 후 del - task.add_done_callback(lambda t: asyncio.create_task( - self.redis_client.xack_and_del(settings.STREAM_JOB, settings.GROUP_NAME, msg_id) - if not t.exception() else - self.redis_client.xadd(f"{settings.STREAM_JOB}:DLQ", - {"id": msg_id, "error": str(t.exception()), **fields}) - )) + + def _on_done(t: asyncio.Task, *, msg_id=msg_id, fields=fields): + async def _ack_or_dlq(): + exc = t.exception() + if exc is None: + await self.redis_client.xack_and_del(settings.STREAM_JOB, settings.GROUP_NAME, + msg_id) + else: + clean = _sanitize_fields_for_xadd(fields) + clean.update({"id": _decode(msg_id), "error": str(exc)}) + await self.redis_client.xadd(f"{settings.STREAM_JOB}:DLQ", clean) + + asyncio.create_task(_ack_or_dlq()) + + task.add_done_callback(_on_done) + except Exception as e: - await self.redis_client.xadd( - f"{settings.STREAM_JOB}:DLQ", - {"id": msg_id, "error": str(e), **fields}, - ) + clean = _sanitize_fields_for_xadd(fields) + clean.update({"id": _decode(msg_id), "error": str(e)}) + await self.redis_client.xadd(f"{settings.STREAM_JOB}:DLQ", clean) + except asyncio.CancelledError: print("[worker] cancelled; bye") break From bfe07a6d796e3625d22c4583e36d9d243347affb Mon Sep 17 00:00:00 2001 From: kite_U Date: Sun, 7 Sep 2025 16:31:21 +0900 Subject: [PATCH 7/7] =?UTF-8?q?feat:=20BaseModle=EB=A1=9C=20validate=20?= =?UTF-8?q?=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/worker/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/worker/worker.py b/app/worker/worker.py index fbdec09..751455a 100644 --- a/app/worker/worker.py +++ b/app/worker/worker.py @@ -84,7 +84,7 @@ async def run(self): data = json.loads(payload_str) print(f"Job received id={msg_id} correlationId={correlation_id} payload={data}") - job = data + job = ImageJob.model_validate(data) # XADD까지 호출 task = asyncio.create_task(process_image_scan(job, redis_client)) print(f"[worker] {task} 발행 성공")