Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ build/
.pytest_cache/
.mypy_cache/
.ruff_cache/
artifacts/
1 change: 1 addition & 0 deletions app/api/v1/endpoints/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ async def get_job_result(
status=job.status,
caption=result.caption if result else None,
instagram_meta=result.instagram_meta if result else None,
extraction_result=result.extraction_result if result else None,
error_message=job.error_message,
updated_at=job.updated_at,
)
6 changes: 6 additions & 0 deletions app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ class Settings(BaseSettings):
kakao_timeout_seconds: int = 5
kakao_max_places_per_candidate: int = 5

hf_extraction_endpoint_url: str = ""
hf_extraction_api_token: str = ""
hf_extraction_model_name: str = "Qwen/Qwen2.5-3B-Instruct"
hf_extraction_timeout_seconds: int = 20
hf_extraction_max_new_tokens: int = 512

@field_validator("processing_schema")
@classmethod
def validate_schema_name(cls, value: str) -> str:
Expand Down
6 changes: 6 additions & 0 deletions app/domain/job/__init__.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,29 @@
from app.domain.job.model import (
CrawlArtifact,
ExtractionCertainty,
ExtractionResult,
ExtractedCandidate,
JobRecord,
JobResultRecord,
JobStatus,
PlaceCandidate,
as_candidate_dict,
as_extraction_result_dict,
as_place_dict,
)
from app.domain.job.service import CreateJobCommand, InvalidJobRequest, JobService

__all__ = [
"CrawlArtifact",
"ExtractionCertainty",
"ExtractionResult",
"ExtractedCandidate",
"JobRecord",
"JobResultRecord",
"JobStatus",
"PlaceCandidate",
"as_candidate_dict",
"as_extraction_result_dict",
"as_place_dict",
"CreateJobCommand",
"InvalidJobRequest",
Expand Down
26 changes: 26 additions & 0 deletions app/domain/job/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,21 @@ class JobStatus(str, Enum):
FAILED = "FAILED"


class ExtractionCertainty(str, Enum):
    """Confidence bucket reported by the LLM for an extraction result."""

    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"


@dataclass(slots=True)
class ExtractionResult:
    """Store information extracted from a caption by the LLM.

    Any field except ``certainty`` may be None when the model could not
    determine a value. The evidence fields are expected to be verbatim
    substrings of the input caption (per the extraction prompt) — the
    dataclass itself does not enforce this.
    """

    # Extracted store name, or None when unknown.
    store_name: str | None
    # Extracted address, or None when unknown.
    address: str | None
    # Caption snippet supporting store_name, or None.
    store_name_evidence: str | None
    # Caption snippet supporting address, or None.
    address_evidence: str | None
    # Model-reported confidence level.
    certainty: ExtractionCertainty


@dataclass(slots=True)
class JobRecord:
job_id: UUID
Expand All @@ -30,6 +45,7 @@ class JobResultRecord:
job_id: UUID
caption: str | None
instagram_meta: dict[str, Any] | None
extraction_result: dict[str, Any] | None
created_at: datetime
updated_at: datetime

Expand Down Expand Up @@ -86,3 +102,13 @@ def as_candidate_dict(candidate: ExtractedCandidate) -> dict[str, Any]:
"source_sentence": candidate.source_sentence,
"raw_candidate": candidate.raw_candidate,
}


def as_extraction_result_dict(result: ExtractionResult) -> dict[str, Any]:
    """Serialize an ExtractionResult into a plain JSON-compatible dict."""
    serialized: dict[str, Any] = {
        field_name: getattr(result, field_name)
        for field_name in (
            "store_name",
            "address",
            "store_name_evidence",
            "address_evidence",
        )
    }
    # The enum is flattened to its string value for storage / transport.
    serialized["certainty"] = result.certainty.value
    return serialized
3 changes: 3 additions & 0 deletions app/infra/db/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ async def upsert_job_result(
job_id: UUID,
caption: str | None,
instagram_meta: dict[str, Any] | None,
extraction_result: dict[str, Any] | None = None,
) -> JobResultRecord:
_ = extraction_result
sql = f"""
INSERT INTO {self._results_table}
(job_id, caption, instagram_meta)
Expand Down Expand Up @@ -144,6 +146,7 @@ def _to_job_result_record(self, row: asyncpg.Record) -> JobResultRecord:
job_id=row["job_id"],
caption=row["caption"],
instagram_meta=self._json_to_dict(row["instagram_meta"]),
extraction_result=None,
created_at=row["created_at"],
updated_at=row["updated_at"],
)
Expand Down
13 changes: 13 additions & 0 deletions app/infra/llm/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from app.infra.llm.client import (
HFExtractionClient,
HFExtractionError,
extract_json_object,
extract_text_from_hf_payload,
)

__all__ = [
"HFExtractionClient",
"HFExtractionError",
"extract_json_object",
"extract_text_from_hf_payload",
]
167 changes: 167 additions & 0 deletions app/infra/llm/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
from __future__ import annotations

import json
import re
from typing import Any

import httpx
from pydantic import ValidationError

from app.core.config import Settings
from app.domain.job import ExtractionResult
from app.schemas.extraction import ExtractionLLMResponse

# System prompt pinning the model to a single strict JSON object with the
# exact keys ExtractionLLMResponse validates. Evidence values must be copied
# verbatim from the caption, and certainty is constrained to high/medium/low.
EXTRACTION_SYSTEM_PROMPT = (
    "You extract store information from Korean restaurant social media captions. "
    "Return only one JSON object with these exact keys: store_name, address, "
    "store_name_evidence, address_evidence, certainty. Use null when a value is "
    "unknown. Evidence values must be substrings copied from the input caption. "
    "certainty must be one of high, medium, or low. Do not include explanations, "
    "Markdown, or any text outside the JSON object."
)


class HFExtractionError(Exception):
    """Raised when the HF extraction request fails or its output is unusable."""

    pass


class HFExtractionClient:
    """Thin async client for a Hugging Face chat-completions extraction endpoint.

    Sends the caption text with a strict JSON-only system prompt and parses
    the generated text back into a domain ``ExtractionResult``.
    """

    def __init__(
        self,
        settings: Settings,
        *,
        transport: httpx.AsyncBaseTransport | None = None,
    ) -> None:
        # transport is injectable so tests can stub the HTTP layer.
        self._settings = settings
        self._transport = transport

    async def extract(
        self,
        *,
        text: str,
        source_url: str,
        media_type: str | None,
    ) -> ExtractionResult | None:
        """Run LLM extraction over *text*.

        Returns None for blank input. Raises HFExtractionError on missing
        configuration, any transport failure, an HTTP error status, or
        unusable model output.
        """
        if not text.strip():
            return None
        if not self._settings.hf_extraction_endpoint_url:
            raise HFExtractionError("HF extraction endpoint URL is empty")
        if not self._settings.hf_extraction_api_token:
            raise HFExtractionError("HF extraction API token is empty")

        payload = self._build_payload(
            text=text,
            source_url=source_url,
            media_type=media_type,
        )
        headers = {
            "Authorization": f"Bearer {self._settings.hf_extraction_api_token}",
            "Content-Type": "application/json",
        }
        timeout = httpx.Timeout(self._settings.hf_extraction_timeout_seconds)

        try:
            async with httpx.AsyncClient(
                timeout=timeout,
                transport=self._transport,
            ) as client:
                response = await client.post(
                    self._settings.hf_extraction_endpoint_url,
                    headers=headers,
                    json=payload,
                )
        except httpx.RequestError as exc:
            # RequestError is the common base of TimeoutException and
            # NetworkError and additionally covers protocol / decoding /
            # unsupported-protocol failures that the previous narrow catch
            # let escape as raw httpx exceptions.
            raise HFExtractionError(str(exc)) from exc

        if response.status_code >= 400:
            raise HFExtractionError(f"HF request failed ({response.status_code})")

        try:
            response_payload = response.json()
        except json.JSONDecodeError as exc:
            raise HFExtractionError("HF response is not valid JSON") from exc

        generated_text = extract_text_from_hf_payload(response_payload)
        generated_json = extract_json_object(generated_text)

        try:
            return ExtractionLLMResponse.model_validate(generated_json).to_domain()
        except ValidationError as exc:
            raise HFExtractionError("HF response failed schema validation") from exc

    def _build_payload(
        self,
        *,
        text: str,
        source_url: str,
        media_type: str | None,
    ) -> dict[str, Any]:
        """Build an OpenAI-style chat-completions request body."""
        # source_url / media_type are part of the call contract but are not
        # yet sent to the model; kept for forward compatibility.
        _ = source_url, media_type
        return {
            "model": self._settings.hf_extraction_model_name,
            "messages": [
                {"role": "system", "content": EXTRACTION_SYSTEM_PROMPT},
                {"role": "user", "content": text},
            ],
            # Deterministic decoding for structured extraction.
            "temperature": 0.0,
            "max_tokens": self._settings.hf_extraction_max_new_tokens,
        }


def extract_text_from_hf_payload(payload: Any) -> str:
    """Pull the generated text out of the known HF response shapes.

    Accepts a bare string, a non-empty list (only the first element is
    inspected), or a dict carrying one of: ``generated_text``,
    ``output``/``outputs``, or OpenAI-style ``choices``.
    Raises HFExtractionError when no text can be located.
    """
    if isinstance(payload, str):
        return payload

    if isinstance(payload, list):
        if not payload:
            raise HFExtractionError("HF response list is empty")
        # Batch-style responses: only the first entry matters.
        return extract_text_from_hf_payload(payload[0])

    if not isinstance(payload, dict):
        raise HFExtractionError("HF response has unsupported shape")

    text_field = payload.get("generated_text")
    if isinstance(text_field, str):
        return text_field

    # "output" takes precedence over "outputs" when both are present and truthy.
    alt_field = payload.get("output") or payload.get("outputs")
    if isinstance(alt_field, str):
        return alt_field

    choice_list = payload.get("choices")
    if isinstance(choice_list, list) and choice_list:
        first_choice = choice_list[0]
        if isinstance(first_choice, dict):
            message = first_choice.get("message")
            if isinstance(message, dict):
                content = message.get("content")
                if isinstance(content, str):
                    return content
            plain_text = first_choice.get("text")
            if isinstance(plain_text, str):
                return plain_text

    raise HFExtractionError("HF response does not contain generated text")


def extract_json_object(text: str) -> dict[str, Any]:
    """Parse exactly one JSON object out of LLM-generated text.

    Strips a surrounding Markdown code fence if present, then tries a direct
    parse; on failure it falls back to the outermost ``{...}`` span.
    Raises HFExtractionError when no valid JSON object can be recovered.
    """
    candidate = (text or "").strip()
    if not candidate:
        raise HFExtractionError("Generated text is empty")

    fence = re.fullmatch(r"```(?:json)?\s*(.*?)\s*```", candidate, re.DOTALL | re.IGNORECASE)
    if fence is not None:
        candidate = fence.group(1).strip()

    try:
        decoded: Any = json.loads(candidate)
    except json.JSONDecodeError:
        # Salvage the outermost brace-delimited span from surrounding chatter.
        opening = candidate.find("{")
        closing = candidate.rfind("}")
        if opening < 0 or closing <= opening:
            raise HFExtractionError("Generated text does not contain a JSON object") from None
        try:
            decoded = json.loads(candidate[opening : closing + 1])
        except json.JSONDecodeError as exc:
            raise HFExtractionError("Generated text contains invalid JSON") from exc

    if not isinstance(decoded, dict):
        raise HFExtractionError("Generated JSON is not an object")
    return decoded
4 changes: 4 additions & 0 deletions app/schemas/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from app.schemas.extraction import ExtractionLLMResponse
from app.schemas.jobs import (
ApiErrorResponse,
CreateJobRequest,
CreateJobResponse,
JobResultResponse,
JobStatusResponse,
ExtractionResultResponse,
)

__all__ = [
Expand All @@ -12,4 +14,6 @@
"CreateJobResponse",
"JobResultResponse",
"JobStatusResponse",
"ExtractionResultResponse",
"ExtractionLLMResponse",
]
52 changes: 52 additions & 0 deletions app/schemas/extraction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from __future__ import annotations

from typing import Literal

from pydantic import BaseModel, ConfigDict, field_validator

from app.domain.job.model import ExtractionCertainty, ExtractionResult


class ExtractionLLMResponse(BaseModel):
    """Pydantic view of the raw JSON object produced by the extraction LLM.

    Unknown keys are ignored; empty or whitespace-only strings are normalized
    to None before field validation, and the certainty label is lowercased so
    the Literal constraint accepts any input casing.
    """

    model_config = ConfigDict(extra="ignore")

    store_name: str | None = None
    address: str | None = None
    store_name_evidence: str | None = None
    address_evidence: str | None = None
    certainty: Literal["high", "medium", "low"] | None = None

    @field_validator(
        "store_name",
        "address",
        "store_name_evidence",
        "address_evidence",
        mode="before",
    )
    @classmethod
    def normalize_optional_string(cls, value: object) -> object:
        # Collapse blank strings to None; pass non-strings through untouched
        # so pydantic reports its own type errors for them.
        if not isinstance(value, str):
            return value
        return value.strip() or None

    @field_validator("certainty", mode="before")
    @classmethod
    def normalize_certainty(cls, value: object) -> object:
        # Accept any casing / surrounding whitespace around the label.
        if not isinstance(value, str):
            return value
        return value.strip().lower() or None

    def to_domain(self) -> ExtractionResult:
        """Convert to the domain dataclass, defaulting missing certainty to low."""
        certainty_label = self.certainty if self.certainty is not None else "low"
        return ExtractionResult(
            store_name=self.store_name,
            address=self.address,
            store_name_evidence=self.store_name_evidence,
            address_evidence=self.address_evidence,
            certainty=ExtractionCertainty(certainty_label),
        )
9 changes: 9 additions & 0 deletions app/schemas/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@
from app.domain.job.model import JobStatus


class ExtractionResultResponse(BaseModel):
    """API-facing shape of a stored LLM extraction result."""

    # Extracted store name; None when the model could not determine one.
    store_name: str | None
    # Extracted address; None when unknown.
    address: str | None
    # Caption snippets backing each extracted value — presumably verbatim
    # substrings per the extraction prompt; confirm against the LLM contract.
    store_name_evidence: str | None
    address_evidence: str | None
    # Model-reported confidence bucket.
    certainty: Literal["high", "medium", "low"]


class CreateJobRequest(BaseModel):
url: HttpUrl = Field(..., examples=["https://www.instagram.com/reel/abcde/"])
room_id: UUID
Expand Down Expand Up @@ -40,6 +48,7 @@ class JobResultResponse(BaseModel):
status: JobStatus
caption: str | None
instagram_meta: dict[str, object] | None
extraction_result: ExtractionResultResponse | None = None
error_message: str | None
updated_at: datetime

Expand Down
Loading
Loading