Skip to content

[FEATURE] S3 이미지 읽기 구현#19

Merged
youyeon11 merged 9 commits intodevelopfrom
feat/s3-read-5
Sep 3, 2025
Merged

[FEATURE] S3 이미지 읽기 구현#19
youyeon11 merged 9 commits intodevelopfrom
feat/s3-read-5

Conversation

@youyeon11
Copy link
Contributor

📌 작업 목적

  • S3의 로직에 대한 테스트를 실시
  • Redis를 비동기 기반으로 전환

🗂 작업 유형

  • 기능 추가 (Feature)
  • 버그 수정 (Bug Fix)
  • 리팩터링 (Refactor)
  • 테스트 추가/수정 (Test)

🔨 주요 작업 내용

  • S3로 다운 받은 이미지에 대하여 Path로 따로 저장을 거치던 것을 수정하였습니다.
    ** s3_service.py **
    def download_file_from_presigned_url(self, presigned_url: str) -> BytesIO:
        response = requests.get(presigned_url)
        response.raise_for_status()
  • presignedUrl을 받으면 get method를 통하여 Byte 파일로 입력 후 전달
  • 이 반환을 predict가 사용하게 됨

  • s3_service에 대한 테스트 코드 작성
@patch("app.services.s3_service.requests.get")
    def test_download_file_success(self, mock_get):
        # arrange
        mock_get.return_value = MagicMock()
        mock_get.return_value.content = b"test file content"
        mock_get.return_value.raise_for_status = MagicMock()
- 해당 경로의 서비스에서의 request.get 함수를 mock 객체 대상으로 선정 ( requests.get() → mock_get() → MagicMock() 객체 반환. )
- 최종적으로 메모리 기반 바이너리 스트림인지에 대한 테스트

  • Redis Stream에 대한 작업을 비동기로 구현하여 redis_client라는 공통된 클래스를 사용하도록 설정하였습니다.
class RedisStreamClient:

    def __init__(self):
        self.redis_client = AsyncRedis.from_url(
            url=settings.REDIS_URL,
            decode_responses=True,
        )
- `async def` 키워드로 코루틴을 정의하고 이를 통해 비동기 작업을 수행합니다
- `await` 키워드를 사용하여 해당 비동기 작업이 완료 때까지 실행을 일시 중단 -> 그 동안 이벤트 루프가 다른 작업을 처리할 수 있도록 함

  • XACK 후의 XDEL 처리로 완료된 작업에 대하여는 더이상 스트림에 유지되지 않도록 설정
async def xack_and_del(self, stream_name: str, group_name: str, message_ids: List[str]) -> int:

  • lifespan.py 에서 설정한 비동기 컨텍스트 설정
@asynccontextmanager
async def lifespan(app: FastAPI):
    limiter = anyio.to_thread.current_default_thread_limiter()
    limiter.total_tokens = 100
- anyio.to_thread 를 통하여 동시 설정 가능한 스레드의 수를 우선 100으로 설정

📎 관련 이슈


💬 논의 및 고민한 점

  • 사실 로직의 흐름이 PresignedURL이므로, S3 설정을 추가하지 않아도 되어 다음 이슈 때 삭제하겠습니다.
  • TTL을 Spring에 맞춰 설정하여 타임 안에 처리하지 못한 작업은 삭제하는 후처리를 하겠습니다.
  • Task 부분 상세 작업 필요 (dict 타입 문제 발생 )

@youyeon11 youyeon11 changed the base branch from main to develop September 2, 2025 19:54
Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary of Changes

Hello @youyeon11, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

이 Pull Request는 S3 이미지 처리의 효율성을 높이고 Redis Stream 기반의 비동기 작업 처리 시스템을 더욱 견고하게 만드는 데 중점을 둡니다. 이미지 데이터를 파일 시스템에 저장하는 대신 메모리 스트림으로 직접 처리하여 성능을 개선했으며, Redis Stream 메시지 처리에 대한 오류 처리 및 완료 로직을 강화했습니다. 또한, 블루/그린 배포를 위한 인프라 설정을 추가하여 서비스의 안정적인 배포 및 운영을 지원합니다.

Highlights

  • S3 이미지 처리 로직 개선: S3에서 이미지를 다운로드할 때, 기존에는 임시 파일로 저장하는 방식이었으나, 이제는 BytesIO 스트림을 사용하여 메모리 내에서 직접 처리하도록 변경하여 디스크 I/O를 줄이고 효율성을 높였습니다.
  • Redis Stream 비동기 처리 전환 및 기능 강화: Redis Stream 작업을 비동기 기반으로 전면 전환하고, RedisStreamClient 클래스를 도입하여 xadd, xreadgroup, xack, xack_and_del, xclaim 등 Redis Stream 관련 기능을 통합 관리하도록 구현했습니다. 특히, 메시지 처리 완료 후 XACKXDEL을 통해 스트림에서 메시지를 제거하고, 처리 실패 시 Dead Letter Queue(DLQ)로 메시지를 전송하는 로직을 추가하여 견고성을 확보했습니다.
  • 배포 자동화 및 블루/그린 배포 환경 구축: 블루/그린 배포 전략을 지원하는 deploy.sh 스크립트와 docker-compose.yml, nginx.conf 파일을 추가하여 무중단 배포 환경을 구축했습니다. 이를 통해 서비스 업데이트 시 다운타임 없이 안정적인 전환이 가능해졌습니다.
  • S3 서비스 및 API 엔드포인트 테스트 코드 추가: S3 이미지 다운로드 서비스에 대한 단위 테스트를 추가하여 핵심 로직의 신뢰성을 확보하고, 헬스 체크 API 엔드포인트에 대한 테스트도 추가하여 서비스의 기본 동작을 검증합니다.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in issue comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@youyeon11
Copy link
Contributor Author

🤖 Gemini 코드 리뷰 결과

1. 주요 변경 사항 요약 및 의도 파악

이 PR은 S3 이미지 처리 로직의 개선과 Redis 스트림 처리 방식을 비동기로 전환하는 것을 목표로 합니다. S3에서 이미지를 다운로드하여 로컬에 저장하는 대신, 바이트 스트림으로 직접 처리하여 I/O 오버헤드를 줄이고 예측 작업을 수행합니다. Redis 스트림 작업을 비동기로 전환하여 동시성을 높이고 처리량을 개선하려는 의도로 보입니다. 추가적으로 S3 관련 설정을 정리하고, Redis 메시지 처리 후 삭제 로직을 구현하여 스트림의 크기를 관리하고자 합니다.

2. 코드 품질 및 가독성

  • 코드 스타일: 전반적으로 PEP 8 규칙을 잘 준수하고 있습니다. 다만, redis_client.py에서 xack_and_del 함수 내부의 XACK 주석은 불필요해 보입니다. 함수명 자체가 기능을 명확히 나타내고 있기 때문입니다.

  • 변수/함수명: stream_file, redis_client 등 변수 및 함수명은 명확하고 직관적입니다. predict 함수의 입력 인자명을 image_path에서 stream_file로 변경하여 변경된 로직을 잘 반영하고 있습니다.

  • 주석/문서화: redis_client.py의 각 함수에 대한 docstring이 잘 작성되어 있습니다. s3_service.pydownload_file_from_presigned_url 함수에도 변경 사항에 대한 주석이 추가되어 이해하기 쉽습니다. 다만, tasks.pyprocess_image_scan 함수 내부의 주석은 다소 불필요해 보입니다. 로직 자체가 간결하고 변수명이 명확하기 때문에 주석보다는 코드 자체로 충분히 이해 가능합니다.

  • 중복 코드: 현재까지는 중복 코드가 발견되지 않습니다.

3. 잠재적 버그 및 엣지 케이스

  • 논리적 오류: lifespan.py에서 limiter.total_tokens = 100으로 설정되어 있는데, 이 값이 시스템 자원과 어떤 관계가 있는지, 적절한 값인지 확인이 필요합니다. 동시 스레드 수 제한이라면 그 근거를 명시하는 것이 좋겠습니다.

  • 경쟁 상태 (Race Condition): 비동기 처리로 전환하면서 발생할 수 있는 잠재적인 경쟁 상태 문제를 고려해야 합니다. 특히 Redis 스트림에서 메시지를 가져오고 처리하는 과정에서 동시성 문제가 발생할 가능성이 있는지 꼼꼼하게 검토해야 합니다. worker.py에서 DLQ 처리 로직이 추가되었는데, DLQ에 메시지를 추가하는 부분에서도 경쟁 상태 문제가 발생할 수 있는지 확인 필요합니다.

  • 에러 핸들링: tasks.pyprocess_image_scan 함수에서 발생하는 예외를 Exception으로 너무 광범위하게 잡고 있습니다. 좀 더 구체적인 예외 유형으로 나누어 처리하고, 각 예외에 대한 적절한 로그 메시지를 남기는 것이 좋겠습니다. 예를 들어, S3 다운로드 실패, 이미지 처리 실패 등으로 구분하여 처리하는 것이 좋습니다.

4. 성능 및 효율성

  • 시간 복잡도: 이미지 처리 로직의 시간 복잡도는 모델의 크기와 이미지의 크기에 따라 달라질 것으로 예상됩니다. 대용량 이미지를 처리해야 하는 경우 성능 병목 현상이 발생할 수 있으므로, 필요에 따라 이미지 크기 조정이나 비동기 처리 등의 최적화 전략을 고려해야 합니다.

  • 자원 사용: 로컬 파일 저장 방식에서 바이트 스트림 처리 방식으로 변경하여 I/O 오버헤드를 줄였습니다. 좋은 개선입니다. 다만, BytesIO 객체를 사용하면서 메모리 사용량이 증가할 수 있으므로, 대용량 이미지 처리 시 메모리 관리에 주의해야 합니다.

  • 최적화 제안: predict 함수 내에서 이미지 변환 및 모델 추론 과정을 비동기로 처리하여 성능 향상을 도모할 수 있을지 검토해 보는 것도 좋을 것 같습니다.

5. 보안 및 아키텍처

  • 보안 취약점: presigned URL을 사용하고 있지만, URL의 유효 기간을 제한하고, 사용 후에는 즉시 무효화하는 등의 추가적인 보안 조치를 고려해야 합니다.

  • 아키텍처 적합성: 비동기 처리 방식으로의 전환은 시스템 전체의 성능 향상에 기여할 수 있을 것으로 예상됩니다. 다만, 비동기 처리로 인해 디버깅이 복잡해질 수 있으므로, 충분한 로깅 및 모니터링 시스템을 구축해야 합니다.

  • 확장성: Redis Stream을 사용하여 작업 큐를 구현한 것은 좋은 설계입니다. 향후 작업량 증가에 따라 worker 노드를 추가하여 쉽게 확장할 수 있습니다. 다만, PR 본문에 언급된 "Task 부분 상세 작업 필요 (dict 타입 문제 발생)" 부분은 명확히 설명되어 있지 않습니다. 이 부분에 대한 자세한 설명과 해결 방안을 추가하여 향후 유지보수 및 확장성을 확보해야 합니다.

Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

이번 PR은 S3 이미지 다운로드와 Redis 스트림 처리를 비동기 방식으로 전환하여 성능을 개선하려는 좋은 시도입니다. 하지만 비동기 코드 구현과 관련하여 몇 가지 심각한 문제점들이 발견되었습니다. asyncio.to_thread의 잘못된 사용, 코루틴에 대한 await 누락, 함수 호출 시의 타입 불일치 등이 있으며, 이는 런타임 오류를 유발할 가능성이 높습니다. 또한, 중복 코드 리팩토링, 적절한 로깅 프레임워크 사용, Redis 작업의 원자성 보장 등을 통해 코드 품질을 개선할 여지가 있습니다. 추가된 테스트 코드 또한 올바르게 동작하기 위해 수정이 필요합니다.

streams={settings.STREAM_JOB: ">"},
group_name=settings.GROUP_NAME,
consumer_name=settings.CONSUMER_NAME,
stream_name={settings.STREAM_JOB: ">"},

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

xreadgroup 메서드는 stream_name 인자로 스트림의 이름(문자열)을 기대하지만, 딕셔너리({settings.STREAM_JOB: ">"})가 전달되고 있습니다. xreadgroup 내부에서 id를 처리하므로 스트림 이름만 전달해야 합니다.

Suggested change
stream_name={settings.STREAM_JOB: ">"},
stream_name=settings.STREAM_JOB,

Comment on lines +34 to +39
task.add_done_callback(lambda t: asyncio.create_task(
self.redis_client.xack_and_del(settings.STREAM_JOB, settings.GROUP_NAME, msg_id)
if not t.exception() else
self.redis_client.xadd(f"{settings.STREAM_JOB}:DLQ",
{"id": msg_id, "error": str(t.exception()), **fields})
))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

self.redis_client.xack_and_del 메서드는 message_ids 인자로 문자열 리스트(List[str])를 기대하지만, 단일 문자열 msg_id가 전달되고 있습니다. [msg_id]와 같이 리스트로 감싸서 전달해야 합니다.

Suggested change
task.add_done_callback(lambda t: asyncio.create_task(
self.redis_client.xack_and_del(settings.STREAM_JOB, settings.GROUP_NAME, msg_id)
if not t.exception() else
self.redis_client.xadd(f"{settings.STREAM_JOB}:DLQ",
{"id": msg_id, "error": str(t.exception()), **fields})
))
task.add_done_callback(lambda t: asyncio.create_task(
self.redis_client.xack_and_del(settings.STREAM_JOB, settings.GROUP_NAME, [msg_id])
if not t.exception() else
self.redis_client.xadd(f"{settings.STREAM_JOB}:DLQ",
{"id": msg_id, "error": str(t.exception()), **fields})
))


def predict(self, image_path: Path) -> tuple[str, str, float]:
image = Image.open(image_path).convert('RGB')
async def predict(self, stream_file: BytesIO) -> tuple[str, str, float]:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

predict 메서드는 내부적으로 동기적인(blocking) 코드(Pillow, PyTorch)를 포함하고 있으므로 async def로 선언해서는 안 됩니다. app.worker.tasks에서 asyncio.to_thread를 사용하여 이 함수를 호출하려고 하는데, async def 함수는 to_thread로 실행할 수 없어 TypeError가 발생합니다. 이 메서드를 일반 동기 함수로 변경해야 합니다.

Suggested change
async def predict(self, stream_file: BytesIO) -> tuple[str, str, float]:
def predict(self, stream_file: BytesIO) -> tuple[str, str, float]:

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

anyio.to_thread와 async는 실행 방식에서 근본적인 차이가 있습니다. async 키워드는 비동기 함수를 선언하는 문법(코드 스타일)이고, anyio.to_thread는 이러한 비동기 함수 내에서 CPU 바운드 작업을 별도의 스레드로 옮겨 실행하여 메인 스레드의 블로킹을 방지하는 함수입니다. 즉, async는 비동기 코드의 작성 방식이고, anyio.to_thread는 비동기 코드에서 스레드를 활용해 동시성을 구현하는 기능입니다.

  1. anyio.to_thread 의 블로킹 작업 실행: anyio.to_thread는 CPU 집약적인(블로킹) 함수를 별도의 스레드 풀에 할당하여 실행합니다.
  2. 이벤트 루프 보호: async 함수는 기본적으로 이벤트 루프를 사용하는데, CPU를 많이 사용하는 작업은 이벤트 루프를 멈추게 합니다. anyio.to_thread는 이러한 블로킹 작업을 별도의 스레드에서 처리함으로써 메인 이벤트 루프가 다른 비동기 작업들을 계속 처리할 수 있도록 합니다.
  3. 동시성 구현: async 코드가 I/O 바운드 작업에 효율적인 반면, anyio.to_thread는 비동기 코드에서 스레드를 활용하여 CPU 바운드 작업과의 동시성을 효과적으로 관리하는 도구입니다.

Comment on lines +66 to +71
task.add_done_callback(lambda t: asyncio.create_task(
self.redis_client.xack_and_del(settings.STREAM_JOB, settings.GROUP_NAME, msg_id)
if not t.exception() else
self.redis_client.xadd(f"{settings.STREAM_JOB}:DLQ",
{"id": msg_id, "error": str(t.exception()), **fields})
))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

여기에서도 self.redis_client.xack_and_del 메서드에 msg_id를 리스트([msg_id])로 전달해야 합니다. 또한, 28-48행의 로직과 이 부분이 거의 동일하므로, 중복을 제거하기 위해 별도의 헬퍼 메서드로 추출하는 것을 고려해보세요.

Suggested change
task.add_done_callback(lambda t: asyncio.create_task(
self.redis_client.xack_and_del(settings.STREAM_JOB, settings.GROUP_NAME, msg_id)
if not t.exception() else
self.redis_client.xadd(f"{settings.STREAM_JOB}:DLQ",
{"id": msg_id, "error": str(t.exception()), **fields})
))
task.add_done_callback(lambda t: asyncio.create_task(
self.redis_client.xack_and_del(settings.STREAM_JOB, settings.GROUP_NAME, [msg_id])
if not t.exception() else
self.redis_client.xadd(f"{settings.STREAM_JOB}:DLQ",
{"id": msg_id, "error": str(t.exception()), **fields})
))


try:
s3_service.download_file_from_presigned_url(job.presignedUrl, temp_image_path)
stream_file = await asyncio.to_thread(s3_service.download_file_from_presigned_url(job.presignedUrl))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

asyncio.to_thread의 사용법이 잘못되었습니다. to_thread는 첫 번째 인자로 실행할 동기 함수를 받고, 나머지 인자들은 그 함수에 전달될 인자들입니다. 현재 코드는 s3_service.download_file_from_presigned_url 함수를 먼저 호출하여 이벤트 루프를 블로킹하고, 그 반환 값을 to_thread로 넘기고 있습니다.

Suggested change
stream_file = await asyncio.to_thread(s3_service.download_file_from_presigned_url(job.presignedUrl))
stream_file = await asyncio.to_thread(s3_service.download_file_from_presigned_url, job.presignedUrl)

Comment on lines 66 to 74
async def xack_and_del(self, stream_name: str, group_name: str, message_ids: List[str]) -> int:

acked_count = await self.redis_client.xack(stream_name, group_name, *message_ids)

# XACK가 성공하면 스트림에서 해당 메시지를 삭제 (XDEL)
if acked_count > 0:
await self.redis_client.xdel(stream_name, *message_ids)

return acked_count

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

XACKXDEL은 별개의 명령으로 실행됩니다. 두 명령이 원자적으로 실행되도록 보장하려면 Redis의 pipeline을 사용하는 것이 좋습니다. 이렇게 하면 XACK가 성공한 후에 XDEL이 실패하는 경우를 방지할 수 있습니다.


class JobWorker:
def __init__(self, redis_client: redis.Redis):
def __init__(self, redis_client: redis_client):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

redis_client의 타입 힌트가 redis_client 인스턴스 자체로 되어 있습니다. RedisStreamClient 클래스로 지정해야 올바른 타입 힌트가 됩니다. 파일 상단에 from app.worker.redis_client import RedisStreamClient를 추가하고 타입 힌트를 수정해주세요.

Suggested change
def __init__(self, redis_client: redis_client):
def __init__(self, redis_client: "RedisStreamClient"):

async def lifespan(app: FastAPI):
redis_client = redis.from_url(settings.REDIS_URL, decode_responses=True)
limiter = anyio.to_thread.current_default_thread_limiter()
limiter.total_tokens = 100

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

스레드 제한 수(100)가 하드코딩되어 있습니다. 이러한 값은 애플리케이션 설정(app/core/config.py)으로 옮겨 관리하는 것이 유지보수 측면에서 더 좋습니다.

finally:
if temp_image_path.exists():
temp_image_path.unlink()
print(f"[task] Image scan finished for job_id={correlation_id}")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

이 로그 메시지는 36행에 이미 존재하여 중복됩니다. finally 블록은 항상 실행되므로 36행의 로그를 제거하고 이 로그만 남기는 것이 더 깔끔합니다. 또한 print 대신 logging 모듈 사용을 고려해보세요.

def test_download_file_http_error(self, mock_get):
# arrange
mock_get.return_value = MagicMock()
mock_get.return_value.raise_for_status.side_effect = Exception("HTTP Error")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

HTTP 오류를 테스트할 때 requests.exceptions.HTTPError와 같이 더 구체적인 예외 타입을 확인하는 것이 좋습니다. Exception은 너무 광범위하여 의도치 않은 다른 예외 상황에서도 테스트가 통과될 수 있습니다.

@youyeon11 youyeon11 merged commit 6a8e492 into develop Sep 3, 2025
@youyeon11 youyeon11 mentioned this pull request Sep 7, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[FEATURE] S3 이미지 읽기 구현

1 participant