Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions handlers/instagram.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
InputMediaDocument,
InputMediaPhoto,
Message,
ReactionTypeEmoji,
)

from data.config import locale
Expand All @@ -33,10 +34,19 @@ async def handle_instagram_link(
lang: str,
file_mode: bool,
group_chat: bool,
status_message: Message | None = None,
) -> None:
client = InstagramClient()
media_info = await client.get_media(instagram_url)

if not status_message:
try:
await message.react(
[ReactionTypeEmoji(emoji="👨‍💻")], disable_notification=True
)
except TelegramBadRequest:
logger.debug("Failed to set processing reaction")

if media_info.is_video:
await bot.send_chat_action(
chat_id=message.chat.id, action="upload_video"
Expand Down
3 changes: 2 additions & 1 deletion handlers/link_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ async def handle_instagram_message(

try:
await handle_instagram_link(
message, instagram_url, lang, file_mode, group_chat
message, instagram_url, lang, file_mode, group_chat,
status_message=status_message,
)
except InstagramError as e:
if status_message:
Expand Down
100 changes: 69 additions & 31 deletions instagram_api/client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from __future__ import annotations

import asyncio
import logging
import re

from aiohttp import ClientTimeout

from data.config import config
from media_types.http_session import _get_http_session

Expand All @@ -24,6 +27,10 @@
"instagram-downloader-download-instagram-stories-videos4.p.rapidapi.com"
)

_MAX_ATTEMPTS = 3
_RETRY_DELAYS = (3, 5)
_REQUEST_TIMEOUT = ClientTimeout(total=10, connect=3)


class InstagramClient:
async def get_media(self, url: str) -> InstagramMediaInfo:
Expand All @@ -36,40 +43,71 @@ async def get_media(self, url: str) -> InstagramMediaInfo:
}
api_url = f"https://{_RAPIDAPI_HOST}/convert"

try:
async with session.get(
api_url, params={"url": url}, headers=headers
) as response:
if response.status == 404:
raise InstagramNotFoundError("Post not found or private")
if response.status == 429:
raise InstagramRateLimitError("API rate limit exceeded")
if response.status != 200:
text = await response.text()
logger.error(
f"Instagram API error {response.status}: {text}"
)
raise InstagramNetworkError(
f"API returned status {response.status}"
)
last_exc: Exception | None = None
for attempt in range(1, _MAX_ATTEMPTS + 1):
try:
async with session.get(
api_url,
params={"url": url},
headers=headers,
timeout=_REQUEST_TIMEOUT,
) as response:
if response.status == 404:
raise InstagramNotFoundError("Post not found or private")
if response.status == 429:
raise InstagramRateLimitError("API rate limit exceeded")
if response.status >= 500:
raise InstagramNetworkError(
f"API returned status {response.status}"
)
if response.status != 200:
text = await response.text()
logger.error(
f"Instagram API error {response.status}: {text}"
)
raise InstagramNetworkError(
f"API returned status {response.status}"
)

data = await response.json()
logger.debug(f"Instagram API response keys: {list(data.keys())}")
logger.debug(
f"Instagram API media count: {len(data.get('media', []))}"
)
for i, item in enumerate(data.get("media", [])):
data = await response.json()
logger.debug(f"Instagram API response keys: {list(data.keys())}")
logger.debug(
f" media[{i}]: type={item.get('type')}, "
f"url={item.get('url', '')[:120]}, "
f"thumbnail={str(item.get('thumbnail', ''))[:120]}, "
f"quality={item.get('quality')}"
f"Instagram API media count: {len(data.get('media', []))}"
)
except (InstagramNotFoundError, InstagramRateLimitError, InstagramNetworkError):
raise
except Exception as e:
logger.error(f"Instagram API request failed: {e}")
raise InstagramNetworkError(f"Request failed: {e}") from e
for i, item in enumerate(data.get("media", [])):
logger.debug(
f" media[{i}]: type={item.get('type')}, "
f"url={item.get('url', '')[:120]}, "
f"thumbnail={str(item.get('thumbnail', ''))[:120]}, "
f"quality={item.get('quality')}"
)
break # success
except InstagramNotFoundError:
raise
except (InstagramRateLimitError, InstagramNetworkError) as e:
last_exc = e
except Exception as e:
last_exc = InstagramNetworkError(f"Request failed: {e}")
last_exc.__cause__ = e

if attempt < _MAX_ATTEMPTS:
delay = _RETRY_DELAYS[attempt - 1]
logger.warning(
"Instagram API attempt %d/%d failed: %s — retrying in %ds",
attempt,
_MAX_ATTEMPTS,
last_exc,
delay,
)
await asyncio.sleep(delay)
else:
logger.error(
"Instagram API attempt %d/%d failed: %s — giving up",
attempt,
_MAX_ATTEMPTS,
last_exc,
)
raise last_exc # type: ignore[misc]

media_items = []
for item in data.get("media", []):
Expand Down
4 changes: 0 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ main = [
"APScheduler==3.11.2",
"Pillow==12.1.0",
"pillow-heif==1.1.1",
"yt-dlp==2026.02.04",
# curl_cffi version must be compatible with yt-dlp's BROWSER_TARGETS
# Check yt_dlp/networking/_curlcffi.py for supported versions when updating yt-dlp
"curl_cffi>=0.10.0,<0.15.0",
]

[tool.uv]
Expand Down
16 changes: 16 additions & 0 deletions tt-scrap/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Runtime image: slim Python 3.13; uv is used as both installer and launcher.
FROM python:3.13-slim

# Copy the statically-linked uv binary from the official distribution image.
COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv

WORKDIR /app

# Install dependencies first (cache layer)
# NOTE(review): stderr of the --frozen attempt is discarded; any failure
# (not just a missing uv.lock) silently falls back to an unlocked sync —
# confirm this best-effort behavior is intended.
COPY pyproject.toml uv.lock* ./
RUN uv sync --frozen --no-dev 2>/dev/null || uv sync --no-dev

# Copy application code
COPY app/ app/

EXPOSE 8000

# Serve the FastAPI app (app/app.py, variable `app`) on all interfaces.
CMD ["uv", "run", "uvicorn", "app.app:app", "--host", "0.0.0.0", "--port", "8000"]
98 changes: 98 additions & 0 deletions tt-scrap/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# Media Scraper API

Standalone FastAPI server for extracting video, slideshow, and music metadata from social media platforms. Built with a service-based architecture — each platform is a self-contained plugin under `app/services/`.

Currently supported: **TikTok**

## Running with uv

```bash
cd tt-scrap

# Install dependencies
uv sync

# Start the server
uv run uvicorn app.app:app --host 0.0.0.0 --port 8000

# With auto-reload for development
uv run uvicorn app.app:app --reload
```

## Running with Docker

```bash
cd tt-scrap

# Build
docker build -t tt-scrap .

# Run
docker run -p 8000:8000 tt-scrap

# Run with environment variables
docker run -p 8000:8000 \
-e PROXY_FILE=/data/proxies.txt \
-e LOG_LEVEL=DEBUG \
-v /path/to/proxies.txt:/data/proxies.txt \
tt-scrap
```

## API Endpoints

Routes are namespaced per service: `/{service}/...`

### TikTok

#### `GET /tiktok/video`

Extract video or slideshow metadata from a TikTok URL.

| Parameter | Type | Description |
|-----------|--------|-----------------------------------|
| `url` | string | TikTok video or slideshow URL |
| `raw` | bool | Return raw TikTok API data (default: false) |

#### `GET /tiktok/music`

Extract music metadata from a TikTok video.

| Parameter | Type | Description |
|------------|------|------------------------|
| `video_id` | int | TikTok video ID |
| `raw` | bool | Return raw data (default: false) |

### Shared

#### `GET /health`

Health check. Returns `{"status": "ok"}`.

#### `GET /docs`

Interactive OpenAPI documentation (Swagger UI).

## Environment Variables

### Global

| Variable | Default | Description |
|----------------------|---------|------------------------------------------|
| `PROXY_FILE` | `""` | Path to proxy file (one URL per line) |
| `PROXY_INCLUDE_HOST` | `false` | Include direct connection in proxy rotation |
| `LOG_LEVEL` | `INFO` | Logging level (DEBUG, INFO, WARNING, ERROR) |

### TikTok (`TIKTOK_` prefix)

| Variable | Default | Description |
|-----------------------------------|---------|------------------------------------------|
| `TIKTOK_URL_RESOLVE_MAX_RETRIES` | `3` | Max retries for short URL resolution |
| `TIKTOK_VIDEO_INFO_MAX_RETRIES` | `3` | Max retries for video info extraction |
| `YTDLP_COOKIES`                   | `""`    | Path to Netscape-format cookies file (note: no `TIKTOK_` prefix) |

## Adding a New Service

1. Create `app/services/<name>/` with `client.py`, `parser.py`, `routes.py`
2. Implement the `BaseClient` protocol (see `app/base_client.py`)
3. Create a factory function returning a `ServiceEntry`
4. Register it in `app/app.py` lifespan
Empty file added tt-scrap/app/__init__.py
Empty file.
98 changes: 98 additions & 0 deletions tt-scrap/app/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""FastAPI REST API server for media scraping."""

from __future__ import annotations

import logging
from contextlib import asynccontextmanager

from fastapi import FastAPI
from fastapi.responses import JSONResponse

from .config import settings
from .exceptions import (
ContentDeletedError,
ContentPrivateError,
ContentTooLongError,
ExtractionError,
InvalidLinkError,
NetworkError,
RateLimitError,
RegionBlockedError,
ScraperError,
UnsupportedServiceError,
)
from .models import ErrorResponse
from .proxy_manager import ProxyManager
from .registry import ServiceRegistry
from .routes import router
from .services import create_tiktok_service

logger = logging.getLogger(__name__)

# HTTP status code returned for each ScraperError subclass; subclasses
# missing from this table fall back to 500 in scraper_error_handler.
_ERROR_STATUS_MAP: dict[type[ScraperError], int] = {
    ContentDeletedError: 404,
    ContentPrivateError: 403,
    InvalidLinkError: 400,
    UnsupportedServiceError: 400,
    ContentTooLongError: 413,  # payload too large: content exceeds limits
    RateLimitError: 429,
    NetworkError: 502,  # upstream fetch failed, not our fault
    RegionBlockedError: 451,  # unavailable for legal/region reasons
    ExtractionError: 500,
}


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: configure logging, build the service registry,
    mount per-service routers, and shut services down cleanly on exit.

    Yields once the registry is attached to ``app.state.registry`` and all
    service routers are included on *app*.
    """
    # Unknown LOG_LEVEL names fall back to INFO instead of crashing startup.
    log_level = getattr(logging, settings.log_level.upper(), logging.INFO)
    logging.basicConfig(
        level=log_level,
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
    )

    # Proxy rotation is optional: only built when a proxy file is configured.
    proxy_manager = (
        ProxyManager.initialize(
            settings.proxy_file,
            include_host=settings.proxy_include_host,
        )
        if settings.proxy_file
        else None
    )

    registry = ServiceRegistry()
    tiktok = create_tiktok_service(proxy_manager=proxy_manager)
    registry.register(tiktok)
    # Routers must be mounted before the app starts serving requests.
    app.include_router(tiktok.router)

    app.state.registry = registry

    logger.info("Scraper API started")
    yield

    # Shut down every registered service; a failure in one service's
    # shutdown must not prevent the remaining services from shutting down.
    for service in registry.get_all():
        if service.shutdown:
            try:
                await service.shutdown()
            except Exception:
                logger.exception("Error shutting down service %r", service)

    logger.info("Scraper API stopped")


# Module-level ASGI application; uvicorn imports this as ``app.app:app``.
app = FastAPI(
    title="Media Scraper API",
    version="0.2.0",
    lifespan=lifespan,
)


@app.exception_handler(ScraperError)
async def scraper_error_handler(request, exc: ScraperError):
    """Render any ScraperError as a JSON error body with a mapped status.

    Exception types absent from ``_ERROR_STATUS_MAP`` default to 500.
    """
    exc_type = type(exc)
    body = ErrorResponse(
        error=str(exc),
        error_type=exc_type.__name__,
    ).model_dump()
    return JSONResponse(
        status_code=_ERROR_STATUS_MAP.get(exc_type, 500),
        content=body,
    )


app.include_router(router)
Loading