diff --git a/pyproject.toml b/pyproject.toml index de8f3dc..2b79775 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,9 @@ dependencies = [ ] [project.optional-dependencies] +compress = [ + "Pillow>=10.0", +] dev = [ "pytest>=8.0", "ruff>=0.8", diff --git a/twitter_cli/cli.py b/twitter_cli/cli.py index 88784ec..9c249b1 100644 --- a/twitter_cli/cli.py +++ b/twitter_cli/cli.py @@ -1061,8 +1061,8 @@ def following(screen_name, max_count, as_json, as_yaml): _MAX_IMAGES = 4 # Twitter allows up to 4 images per tweet -def _upload_images(client, image_paths, rich_output=True): - # type: (TwitterClient, tuple, bool) -> list +def _upload_images(client, image_paths, rich_output=True, compress=None): + # type: (TwitterClient, tuple, bool, Optional[int]) -> list """Upload images and return list of media_id strings.""" if not image_paths: return [] @@ -1071,8 +1071,9 @@ def _upload_images(client, image_paths, rich_output=True): media_ids = [] for i, path in enumerate(image_paths, 1): if rich_output: - console.print("📤 Uploading image %d/%d: %s" % (i, len(image_paths), path)) - media_ids.append(client.upload_media(path)) + suffix = " (quality=%d)" % compress if compress else "" + console.print("📤 Uploading image %d/%d: %s%s" % (i, len(image_paths), path, suffix)) + media_ids.append(client.upload_media(path, compress=compress)) return media_ids @@ -1102,9 +1103,10 @@ def operation(client: TwitterClient) -> WritePayload: @click.argument("text") @click.option("--reply-to", "-r", default=None, help="Reply to this tweet ID.") @click.option("--image", "-i", "images", multiple=True, type=click.Path(exists=True), help="Attach image (up to 4). Repeatable.") +@click.option("--compress", type=click.IntRange(1, 100), default=None, help="Compress images at given quality (1-100). Requires Pillow.") @structured_output_options -def post(text, reply_to, images, as_json, as_yaml): - # type: (str, Optional[str], tuple, bool, bool) -> None +def post(text, reply_to, images, compress, as_json, as_yaml): + # type: (str, Optional[str], tuple, Optional[int], bool, bool) -> None """Post a new tweet. TEXT is the tweet content. Attach images with --image / -i (up to 4): @@ -1112,13 +1114,14 @@ def post(text, reply_to, images, as_json, as_yaml): \b twitter post "Hello!" --image photo.jpg twitter post "Gallery" -i a.png -i b.png -i c.jpg + twitter post "Compressed" -i big.jpg --compress 90 """ normalized_reply_to = _normalize_tweet_id(reply_to) if reply_to else None action = "Replying to %s" % normalized_reply_to if normalized_reply_to else "Posting tweet" rich_output = not _structured_mode(as_json=as_json, as_yaml=as_yaml) def operation(client: TwitterClient) -> WritePayload: - media_ids = _upload_images(client, images, rich_output=rich_output) + media_ids = _upload_images(client, images, rich_output=rich_output, compress=compress) tweet_id = client.create_tweet(text, reply_to_id=normalized_reply_to, media_ids=media_ids or None) return {"success": True, "action": "post", "id": tweet_id, "url": "https://x.com/i/status/%s" % tweet_id} @@ -1138,14 +1141,15 @@ def operation(client: TwitterClient) -> WritePayload: @click.argument("tweet_id") @click.argument("text") @click.option("--image", "-i", "images", multiple=True, type=click.Path(exists=True), help="Attach image (up to 4). Repeatable.") +@click.option("--compress", type=click.IntRange(1, 100), default=None, help="Compress images at given quality (1-100). Requires Pillow.") @structured_output_options -def reply_tweet(tweet_id, text, images, as_json, as_yaml): - # type: (str, str, tuple, bool, bool) -> None +def reply_tweet(tweet_id, text, images, compress, as_json, as_yaml): + # type: (str, str, tuple, Optional[int], bool, bool) -> None """Reply to a tweet. TWEET_ID is the tweet to reply to, TEXT is the reply content.""" tweet_id = _normalize_tweet_id(tweet_id) rich_output = not _structured_mode(as_json=as_json, as_yaml=as_yaml) def operation(client: TwitterClient) -> WritePayload: - media_ids = _upload_images(client, images, rich_output=rich_output) + media_ids = _upload_images(client, images, rich_output=rich_output, compress=compress) new_id = client.create_tweet(text, reply_to_id=tweet_id, media_ids=media_ids or None) return { "success": True, @@ -1171,14 +1175,15 @@ def operation(client: TwitterClient) -> WritePayload: @click.argument("tweet_id") @click.argument("text") @click.option("--image", "-i", "images", multiple=True, type=click.Path(exists=True), help="Attach image (up to 4). Repeatable.") +@click.option("--compress", type=click.IntRange(1, 100), default=None, help="Compress images at given quality (1-100). Requires Pillow.") @structured_output_options -def quote_tweet(tweet_id, text, images, as_json, as_yaml): - # type: (str, str, tuple, bool, bool) -> None +def quote_tweet(tweet_id, text, images, compress, as_json, as_yaml): + # type: (str, str, tuple, Optional[int], bool, bool) -> None """Quote-tweet a tweet. TWEET_ID is the tweet to quote, TEXT is the commentary.""" tweet_id = _normalize_tweet_id(tweet_id) rich_output = not _structured_mode(as_json=as_json, as_yaml=as_yaml) def operation(client: TwitterClient) -> WritePayload: - media_ids = _upload_images(client, images, rich_output=rich_output) + media_ids = _upload_images(client, images, rich_output=rich_output, compress=compress) new_id = client.quote_tweet(tweet_id, text, media_ids=media_ids or None) return { "success": True, diff --git a/twitter_cli/client.py b/twitter_cli/client.py index 6e7d707..23ae39d 100644 --- a/twitter_cli/client.py +++ b/twitter_cli/client.py @@ -451,9 +451,11 @@ def fetch_following(self, user_id, count=20): # ── Write operations ───────────────────────────────────────────── - # Supported image MIME types and max file size (5 MB) + # Supported image MIME types and max file sizes _SUPPORTED_IMAGE_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp"} - _MAX_IMAGE_SIZE = 5 * 1024 * 1024 # 5 MB + _MAX_IMAGE_SIZE = 5 * 1024 * 1024 # 5 MB (JPEG, PNG, WebP) + _MAX_GIF_SIZE = 15 * 1024 * 1024 # 15 MB (animated GIF) + _CHUNK_SIZE = 1024 * 1024 # 1 MB per APPEND segment def _write_delay(self): # type: () -> None @@ -462,29 +464,48 @@ def _write_delay(self): logger.debug("Write operation delay: %.1fs", delay) time.sleep(delay) - def upload_media(self, file_path): - # type: (str) -> str + def upload_media(self, file_path, compress=None): + # type: (str, Optional[int]) -> str """Upload an image file to Twitter. Returns the media_id string. - Uses Twitter's chunked upload API (INIT → APPEND → FINALIZE). - Supports JPEG, PNG, GIF, and WebP images up to 5 MB. + Uses Twitter's chunked upload API (INIT -> APPEND -> FINALIZE). + Supports JPEG, PNG, GIF, and WebP. + - Images (JPEG/PNG/WebP): up to 5 MB, single-segment upload. + - GIFs: up to 15 MB, multi-segment chunked upload (1 MB chunks). + + Args: + file_path: Path to the image file. + compress: Optional quality level (1-100). If set, re-encodes + JPEG/PNG/WebP via Pillow before upload. GIFs are + not compressed. """ if not os.path.isfile(file_path): raise MediaUploadError("File not found: %s" % file_path) - file_size = os.path.getsize(file_path) - if file_size > self._MAX_IMAGE_SIZE: - raise MediaUploadError( - "File too large: %.1f MB (max %.0f MB)" - % (file_size / (1024 * 1024), self._MAX_IMAGE_SIZE / (1024 * 1024)) - ) - media_type = mimetypes.guess_type(file_path)[0] or "" if media_type not in self._SUPPORTED_IMAGE_TYPES: raise MediaUploadError( "Unsupported image format: %s (supported: jpeg, png, gif, webp)" % media_type ) + # ── Optional compression ────────────────────────────────────── + upload_bytes = None # type: Optional[bytes] + if compress is not None and media_type != "image/gif": + upload_bytes = self._compress_image(file_path, media_type, compress) + file_size = len(upload_bytes) + logger.info("Compressed %s: %d -> %d bytes (quality=%d)", + file_path, os.path.getsize(file_path), file_size, compress) + else: + file_size = os.path.getsize(file_path) + + # ── Size validation ─────────────────────────────────────────── + max_size = self._MAX_GIF_SIZE if media_type == "image/gif" else self._MAX_IMAGE_SIZE + if file_size > max_size: + raise MediaUploadError( + "File too large: %.1f MB (max %.0f MB)" + % (file_size / (1024 * 1024), max_size / (1024 * 1024)) + ) + upload_url = "https://upload.twitter.com/i/media/upload.json" session = _get_cffi_session() @@ -496,6 +517,8 @@ def upload_media(self, file_path): "total_bytes": str(file_size), "media_type": media_type, } + if media_type == "image/gif": + init_data["media_category"] = "tweet_gif" resp = session.post(upload_url, headers=headers, data=init_data, timeout=30) if resp.status_code >= 400: raise MediaUploadError("INIT failed (HTTP %d): %s" % (resp.status_code, resp.text[:300])) @@ -508,23 +531,11 @@ def upload_media(self, file_path): raise MediaUploadError("INIT did not return media_id") logger.info("Media INIT: media_id=%s", media_id) - # ── APPEND ─────────────────────────────────────────────────── - with open(file_path, "rb") as f: - media_data = base64.b64encode(f.read()).decode("ascii") - - headers = self._build_headers(url=upload_url, method="POST") - # Remove JSON content-type — curl_cffi handles multipart encoding - headers.pop("Content-Type", None) - append_data = { - "command": "APPEND", - "media_id": media_id, - "segment_index": "0", - "media_data": media_data, - } - resp = session.post(upload_url, headers=headers, data=append_data, timeout=60) - if resp.status_code >= 400: - raise MediaUploadError("APPEND failed (HTTP %d): %s" % (resp.status_code, resp.text[:300])) - logger.info("Media APPEND: segment 0 uploaded") + # ── APPEND (chunked for GIFs, single-segment for images) ───── + if media_type == "image/gif" and file_size > self._CHUNK_SIZE: + self._append_chunked(session, upload_url, media_id, file_path, upload_bytes) + else: + self._append_single(session, upload_url, media_id, file_path, upload_bytes) # ── FINALIZE ───────────────────────────────────────────────── headers = self._build_headers(url=upload_url, method="POST") @@ -540,6 +551,96 @@ def upload_media(self, file_path): return media_id + def _append_single(self, session, upload_url, media_id, file_path, upload_bytes=None): + # type: (Any, str, str, str, Optional[bytes]) -> None + """Upload media in a single APPEND request (raw binary multipart).""" + headers = self._build_headers(url=upload_url, method="POST") + headers.pop("Content-Type", None) + + if upload_bytes is not None: + import io + files = {"media": ("media", io.BytesIO(upload_bytes), "application/octet-stream")} + else: + files = {"media": ("media", open(file_path, "rb"), "application/octet-stream")} + + data = { + "command": "APPEND", + "media_id": media_id, + "segment_index": "0", + } + resp = session.post(upload_url, headers=headers, data=data, files=files, timeout=60) + if upload_bytes is None: + files["media"][1].close() + if resp.status_code >= 400: + raise MediaUploadError("APPEND failed (HTTP %d): %s" % (resp.status_code, resp.text[:300])) + logger.info("Media APPEND: segment 0 uploaded (raw binary)") + + def _append_chunked(self, session, upload_url, media_id, file_path, upload_bytes=None): + # type: (Any, str, str, str, Optional[bytes]) -> None + """Upload media in multiple APPEND requests (1 MB chunks).""" + import io + + if upload_bytes is not None: + source = io.BytesIO(upload_bytes) + else: + source = open(file_path, "rb") + + try: + segment_index = 0 + while True: + chunk = source.read(self._CHUNK_SIZE) + if not chunk: + break + headers = self._build_headers(url=upload_url, method="POST") + headers.pop("Content-Type", None) + + files = {"media": ("media", io.BytesIO(chunk), "application/octet-stream")} + data = { + "command": "APPEND", + "media_id": media_id, + "segment_index": str(segment_index), + } + resp = session.post(upload_url, headers=headers, data=data, files=files, timeout=60) + if resp.status_code >= 400: + raise MediaUploadError( + "APPEND segment %d failed (HTTP %d): %s" + % (segment_index, resp.status_code, resp.text[:300]) + ) + logger.info("Media APPEND: segment %d uploaded (%d bytes)", segment_index, len(chunk)) + segment_index += 1 + finally: + source.close() + + logger.info("Media APPEND: %d segments uploaded (chunked)", segment_index) + + @staticmethod + def _compress_image(file_path, media_type, quality): + # type: (str, str, int) -> bytes + """Compress an image using Pillow. Returns the re-encoded bytes.""" + try: + from PIL import Image + except ImportError: + raise MediaUploadError( + "Pillow is required for --compress. Install it with: " + "pip install twitter-cli[compress]" + ) + + img = Image.open(file_path) + buf = __import__("io").BytesIO() + + if media_type == "image/jpeg": + if img.mode in ("RGBA", "P"): + img = img.convert("RGB") + img.save(buf, format="JPEG", quality=quality, optimize=True) + elif media_type == "image/webp": + img.save(buf, format="WEBP", quality=quality, optimize=True) + elif media_type == "image/png": + img.save(buf, format="PNG", optimize=True) + else: + raise MediaUploadError("Compression not supported for %s" % media_type) + + return buf.getvalue() + def create_tweet(self, text, reply_to_id=None, media_ids=None): # type: (str, Optional[str], Optional[List[str]]) -> str """Post a new tweet. Returns the new tweet ID.