diff --git a/.gitignore b/.gitignore
index e550c2b..7cdcb2e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,3 +5,6 @@ __pycache__/
.exports/
exports/
.env
+build/
+dist/
+.DS_Store
diff --git a/README.md b/README.md
index 01a843a..a02e53d 100644
--- a/README.md
+++ b/README.md
@@ -35,29 +35,148 @@ Additional background lives in the internal wiki: [Arkadium IT Knowledge Base](h
## Usage
+### Quick Start (Interactive Mode)
+
+The simplest way to export a chat is to run without any arguments:
+
+```bash
+teams-export
+```
+
+This will:
+1. Authenticate with Microsoft Graph
+2. Show an interactive menu with your 20 most recent chats
+3. Let you select the chat by number
+4. Export today's messages in Jira-friendly format
+
+### Export by User Email (1:1 chats)
+
+```bash
+teams-export --user "john.smith@company.com"
+```
+
+### Export by Chat Name (Group chats)
+
+```bash
+teams-export --chat "Project Alpha Team"
+```
+
+### Export with Date Range
+
+```bash
+# Specific dates
+teams-export --user "john.smith@company.com" --from 2025-10-23 --to 2025-10-25
+
+# Using keywords
+teams-export --user "john.smith@company.com" --from "last week" --to "today"
```
-teams-export --user "john.smith@company.com" --from 2025-10-23 --to 2025-10-23 --format json
+
+### Export in Different Formats
+
+```bash
+# Markdown (default) - works in Jira, GitHub, Confluence, etc.
+teams-export --user "john.smith@company.com" --format jira
+
+# JSON for programmatic processing
+teams-export --user "john.smith@company.com" --format json
+
+# CSV for spreadsheet analysis
+teams-export --user "john.smith@company.com" --format csv
```
-- `--user` targets 1:1 chats by participant name or email.
-- `--chat` targets group chats by display name.
-- `--from` / `--to` accept `YYYY-MM-DD`, `today`, or `last week`.
-- `--format` supports `json` (default) or `csv`.
+The default Markdown format includes:
+- Standard Markdown syntax (compatible with Jira, GitHub, Confluence)
+- Clickable links for attachments
+- Inline image rendering for shared images
+- Message quotes and formatting preserved
+
+### Other Options
+
- `--list` prints available chats with participants.
-- `--all` exports every chat in the provided window.
+- `--all` exports every chat in the provided window (uses parallel processing for speed).
- `--force-login` clears the cache and forces a new device code login.
+- `--refresh-cache` forces refresh of chat list (bypasses 24-hour cache).
+- `--output-dir` specifies where to save exports (default: `./exports/`).
+
+**Interactive Menu Controls:**
+- Enter number (1-20) to select a chat
+- Press `s` to search across all chats
+- Press `c` to refresh chat list from API
+- Press `q` to quit
+
+### Examples
+
+```bash
+# Interactive selection with custom date range
+teams-export --from "2025-10-01" --to "2025-10-31"
-Exports are saved under `./exports/` by default with filenames like `john_smith_2025-10-23.json`.
+# Export all chats from last week in parallel
+teams-export --all --from "last week" --format jira
-## Token Cache
+# List all available chats
+teams-export --list
+# Export specific user's chat for today
+teams-export --user "jane.doe@company.com"
+```
+
+Exports are saved under `./exports/` by default with filenames like `john_smith_2025-10-23.md` (for Markdown/Jira format) or `john_smith_2025-10-23.json`.
+
+## Caching
+
+### Token Cache
MSAL token cache is stored at `~/.teams-exporter/token_cache.json`. The cache refreshes automatically; re-run with `--force-login` to regenerate the device flow.
+### Chat List Cache
+To speed up repeated operations, the chat list is cached locally for 24 hours at `~/.teams-exporter/cache/chats_cache.json`.
+
+**First run:** Loads all chats from API (~30-60 seconds for 1000+ chats)
+**Subsequent runs (within 24h):** Instant load from cache
+
+To refresh the cache:
+- **Interactive menu**: Press `c` during chat selection to refresh and reload
+- **Command line**: Use `--refresh-cache` flag to force refresh before showing menu
+
+**Note:** Chats are sorted by last message timestamp (using `lastMessagePreview`), matching the behavior of the Teams desktop client.
+
+### Graph API Sorting Limitation
+
+The Microsoft Graph API's `/me/chats` endpoint does **not** support the `$orderby` query parameter ([see official documentation](https://learn.microsoft.com/en-us/graph/api/chat-list?view=graph-rest-1.0&tabs=http#optional-query-parameters)). This means:
+
+- Chats cannot be sorted server-side by last message time
+- All chats must be loaded to achieve correct chronological sorting
+- Client-side sorting is performed using `lastMessagePreview.createdDateTime`
+
+This is why the initial load fetches all chats (with progress indication) rather than loading only the most recent N chats. The 24-hour cache ensures subsequent runs are instant.
+
+## Features
+
+### Performance Optimizations
+- **Chat list caching**: 24-hour local cache makes repeated runs instant
+- **Parallel exports**: When using `--all`, exports multiple chats concurrently (up to 3 at once)
+- **Automatic retry**: Handles API rate limiting (429) and server errors (5xx) with exponential backoff
+- **Optimized pagination**: Fetches 50 messages per request (Graph API maximum)
+- **Smart filtering**: Stops fetching when messages are outside the date range
+
+### User Experience Improvements
+- **Interactive chat selection**: Beautiful menu with chat names, types, and last activity
+- **Multiple match handling**: If search finds multiple chats, shows menu instead of error
+- **Markdown format**: Standard Markdown output that works in Jira, GitHub, Confluence, and other platforms
+ - Clean HTML conversion (removes tags, preserves formatting)
+ - Blockquote formatting (`>`) for message content
+ - Standard Markdown headers (`##`, `###`) and emphasis (`**bold**`, `*italic*`)
+ - Attachment support with clickable links
+  - **Image support**: Images from chat attachments rendered as `![alt](path)` Markdown image links
+ - Reaction indicators
+ - Proper timestamp formatting
+- **Smart defaults**: Defaults to today's date if not specified
+- **Progress tracking**: Shows real-time progress for multi-chat exports
+
## Limitations
- Requires delegated permissions for the signed-in user.
- Attachments are referenced in the output but not downloaded.
-- Microsoft Graph API throttling is not yet handled with automatic retries.
+- Parallel exports limited to 3 concurrent requests to avoid API throttling.
## Security Notes
diff --git a/pyproject.toml b/pyproject.toml
index 627f05e..19cab67 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -15,7 +15,9 @@ dependencies = [
"typer>=0.12",
"requests>=2.32",
"msal>=1.28",
- "python-dateutil>=2.9"
+ "python-dateutil>=2.9",
+ "wcwidth>=0.2",
+ "python-docx>=1.0"
]
[project.scripts]
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..8c0f37e
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,9 @@
+"""Setup script for teams-export.
+
+This file is for compatibility with older build tools.
+The main configuration is in pyproject.toml.
+"""
+from setuptools import setup
+
+# Configuration is in pyproject.toml
+setup()
diff --git a/src/teams_export/cache.py b/src/teams_export/cache.py
new file mode 100644
index 0000000..798cc22
--- /dev/null
+++ b/src/teams_export/cache.py
@@ -0,0 +1,82 @@
+"""Local caching for chat lists to speed up repeated operations."""
+
+from __future__ import annotations
+
+import json
+import time
+from pathlib import Path
+from typing import List, Optional
+
+
+DEFAULT_CACHE_DIR = Path("~/.teams-exporter/cache").expanduser()
+CACHE_TTL_SECONDS = 86400 # 24 hours (1 day)
+
+
+class ChatCache:
+ """Simple file-based cache for chat lists."""
+
+ def __init__(self, cache_dir: Path = DEFAULT_CACHE_DIR):
+ self.cache_dir = cache_dir
+ self.cache_file = cache_dir / "chats_cache.json"
+
+ def get(self, user_id: str) -> Optional[List[dict]]:
+ """Get cached chats for a user if still valid.
+
+ Args:
+ user_id: User identifier (from token claims or 'me')
+
+ Returns:
+ List of chats if cache is valid, None otherwise
+ """
+ if not self.cache_file.exists():
+ return None
+
+ try:
+ with self.cache_file.open("r", encoding="utf-8") as f:
+ cache_data = json.load(f)
+
+ # Check if cache is for the same user
+ if cache_data.get("user_id") != user_id:
+ return None
+
+ # Check if cache is still fresh
+ cached_time = cache_data.get("timestamp", 0)
+ age = time.time() - cached_time
+ if age > CACHE_TTL_SECONDS:
+ return None
+
+ chats = cache_data.get("chats", [])
+ return chats if chats else None
+
+ except (json.JSONDecodeError, KeyError, OSError):
+ return None
+
+ def set(self, user_id: str, chats: List[dict]) -> None:
+ """Cache chat list for a user.
+
+ Args:
+ user_id: User identifier
+ chats: List of chat objects to cache
+ """
+ self.cache_dir.mkdir(parents=True, exist_ok=True)
+
+ cache_data = {
+ "user_id": user_id,
+ "timestamp": time.time(),
+ "chats": chats,
+ }
+
+ try:
+ with self.cache_file.open("w", encoding="utf-8") as f:
+ json.dump(cache_data, f, indent=2)
+ except OSError:
+ # Silently fail if can't write cache
+ pass
+
+ def clear(self) -> None:
+ """Clear the cache file."""
+ try:
+ if self.cache_file.exists():
+ self.cache_file.unlink()
+ except OSError:
+ pass
diff --git a/src/teams_export/cli.py b/src/teams_export/cli.py
index 375ad33..f9b3a8c 100644
--- a/src/teams_export/cli.py
+++ b/src/teams_export/cli.py
@@ -1,15 +1,20 @@
from __future__ import annotations
+import sys
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from datetime import datetime, timedelta
from pathlib import Path
from typing import Iterable
import typer
from .auth import AuthError, acquire_token
+from .cache import ChatCache
from .config import ConfigError, load_config
from .dates import DateParseError, resolve_range
from .exporter import ChatNotFoundError, choose_chat, export_chat
from .graph import GraphClient
+from .interactive import select_chat_interactive
app = typer.Typer(
add_completion=False,
@@ -47,6 +52,83 @@ def _print_chat_list(chats: Iterable[dict]) -> None:
)
+def _select_date_range_interactive() -> tuple[datetime, datetime] | None:
+ """Interactively select date range for export.
+
+ Returns:
+ Tuple of (start_dt, end_dt), or None if user cancels
+ """
+    typer.echo("\n📅 Select export period:")
+ typer.echo(" 1. Today (last 24 hours)")
+ typer.echo(" 2. Last 7 days")
+ typer.echo(" 3. Last 30 days")
+ typer.echo(" 4. Last 90 days")
+ typer.echo(" 5. All time (last 1 year)")
+ typer.echo(" 6. Custom date range")
+ typer.echo(" q. Cancel export")
+
+ choice = typer.prompt("\nYour choice", default="1").strip().lower()
+
+ if choice == "q":
+ return None
+
+ now = datetime.now().astimezone()
+
+ if choice == "1":
+ # Last 24 hours (today)
+ start_dt = now - timedelta(hours=24)
+ end_dt = now
+ elif choice == "2":
+ start_dt = now - timedelta(days=7)
+ end_dt = now
+ elif choice == "3":
+ start_dt = now - timedelta(days=30)
+ end_dt = now
+ elif choice == "4":
+ start_dt = now - timedelta(days=90)
+ end_dt = now
+ elif choice == "5":
+ start_dt = now - timedelta(days=365)
+ end_dt = now
+ elif choice == "6":
+ # Custom range
+ from_str = typer.prompt("Start date (YYYY-MM-DD, 'today', or 'last week')")
+ to_str = typer.prompt("End date (YYYY-MM-DD, 'today', or 'last week')", default="today")
+ try:
+ start_dt, end_dt = resolve_range(from_str, to_str)
+ except DateParseError as exc:
+ typer.secho(f"Invalid date: {exc}", fg=typer.colors.RED)
+ return None
+ else:
+ typer.secho("Invalid choice", fg=typer.colors.RED)
+ return None
+
+    typer.echo(f" ✓ Period: {start_dt.date()} to {end_dt.date()}")
+ return start_dt, end_dt
+
+
+def _load_chats_with_progress(client: GraphClient) -> list[dict]:
+ """Load all chats from Microsoft Graph with progress indicator.
+
+ Returns:
+ List of chat objects
+ """
+ def show_progress(count: int) -> None:
+ sys.stdout.write(f"\rLoading chats... {count} loaded")
+ sys.stdout.flush()
+
+ typer.echo("Loading chats from Microsoft Graph...")
+ chats = client.list_chats(limit=None, progress_callback=show_progress)
+
+ # Clear progress line
+ if chats:
+ sys.stdout.write("\r" + " " * 50 + "\r")
+ sys.stdout.flush()
+        typer.secho(f"✓ Loaded {len(chats)} chats", fg=typer.colors.GREEN)
+
+ return chats
+
+
@app.command()
def main(
participant: str = typer.Option(
@@ -74,11 +156,11 @@ def main(
help='End date (YYYY-MM-DD, "today", or "last week").',
),
output_format: str = typer.Option(
- "json",
+ "jira",
"--format",
"-o",
case_sensitive=False,
- help="Export format: json or csv.",
+ help="Export format: jira (markdown), html, docx (Word - best for Jira/Confluence), json, or csv.",
),
output_dir: Path = typer.Option(
Path("exports"),
@@ -101,6 +183,16 @@ def main(
"--force-login",
help="Skip cache and refresh the device login flow.",
),
+ refresh_cache: bool = typer.Option(
+ False,
+ "--refresh-cache",
+ help="Force refresh of chat list cache.",
+ ),
+ download_attachments: bool = typer.Option(
+ True,
+ "--download-attachments/--no-download-attachments",
+ help="Download images and attachments locally (default: enabled).",
+ ),
) -> None:
try:
config = load_config()
@@ -108,63 +200,177 @@ def main(
typer.secho(f"Configuration error: {exc}", fg=typer.colors.RED)
raise typer.Exit(code=1)
- try:
- start_dt, end_dt = resolve_range(from_date, to_date)
- except DateParseError as exc:
- typer.secho(f"Invalid date input: {exc}", fg=typer.colors.RED)
- raise typer.Exit(code=2)
+ # Parse dates if provided, otherwise they'll be set interactively
+ start_dt = None
+ end_dt = None
+ if from_date or to_date:
+ try:
+ start_dt, end_dt = resolve_range(from_date, to_date)
+ except DateParseError as exc:
+ typer.secho(f"Invalid date input: {exc}", fg=typer.colors.RED)
+ raise typer.Exit(code=2)
typer.echo("Authenticating with Microsoft Graph…")
try:
token = acquire_token(config, message_callback=typer.echo, force_refresh=force_login)
+        typer.secho("✓ Authenticated successfully", fg=typer.colors.GREEN)
except AuthError as exc:
typer.secho(f"Authentication failed: {exc}", fg=typer.colors.RED)
raise typer.Exit(code=3)
with GraphClient(token) as client:
- chats = client.list_chats()
+ # Try to load from cache first
+ cache = ChatCache()
+ user_id = "me" # Simple identifier for caching
+ chats = None
+
+ if not refresh_cache:
+ chats = cache.get(user_id)
+ if chats:
+                typer.secho(f"✓ Loaded {len(chats)} chats from cache (24h TTL, press 'c' in menu to refresh)", fg=typer.colors.CYAN)
+
+ # If no cache or refresh requested, load from API
+ if chats is None:
+ chats = _load_chats_with_progress(client)
+ # Save to cache for next time
+ if chats:
+ cache.set(user_id, chats)
+
if list_chats:
- typer.echo("Chat ID\tType\tTitle\tParticipants")
+ typer.echo("\nChat ID\tType\tTitle\tParticipants")
_print_chat_list(chats)
raise typer.Exit()
+ # Check if we found any chats
+ if not chats:
+ typer.secho("No chats found.", fg=typer.colors.YELLOW)
+ raise typer.Exit(code=0)
+
exports: list[tuple[str, Path, int]] = []
+ # Set default date range if not provided (for --all and --user/--chat modes)
+ if start_dt is None and (export_all or participant or chat_name):
+ # Default to last 24 hours (today) if dates not specified
+ now = datetime.now().astimezone()
+ start_dt = now - timedelta(hours=24)
+ end_dt = now
+ typer.echo(f"Using default date range: last 24 hours ({start_dt.date()} to {end_dt.date()})")
+
if export_all:
selected_chats = chats
else:
if not participant and not chat_name:
- prompt_value = typer.prompt("Enter chat partner name/email (leave blank to use chat name)", default="")
- if prompt_value:
- participant = prompt_value
+ # Interactive mode - show chat menu (with cache refresh support)
+ while True:
+ try:
+ chat = select_chat_interactive(
+ chats,
+ prompt_message="Select a chat to export:",
+ showing_limited=False,
+ )
+
+ # Check if user requested cache refresh
+ if isinstance(chat, dict) and chat.get("__action__") == "refresh_cache":
+ chats = _load_chats_with_progress(client)
+ if chats:
+ cache.set(user_id, chats)
+ continue # Show menu again with refreshed data
+
+ # Ask for date range if not provided
+ if start_dt is None:
+ date_range = _select_date_range_interactive()
+ if date_range is None:
+ typer.echo("Export cancelled")
+ raise typer.Exit(code=0)
+ start_dt, end_dt = date_range
+
+ selected_chats = [chat]
+ break
+ except typer.Abort:
+ raise typer.Exit(code=0)
+ else:
+ # Search mode - try to find by participant or chat name
+ try:
+ result = choose_chat(chats, participant=participant, chat_name=chat_name)
+ except ChatNotFoundError as exc:
+ typer.secho(str(exc), fg=typer.colors.RED)
+ raise typer.Exit(code=4)
+
+ # If multiple matches, let user choose interactively
+ if isinstance(result, list):
+ typer.echo(f"\nFound {len(result)} matching chats.")
+ try:
+ chat = select_chat_interactive(
+ result,
+ prompt_message="Multiple chats matched. Please select one:",
+ )
+ selected_chats = [chat]
+ except typer.Abort:
+ raise typer.Exit(code=0)
else:
- chat_name = typer.prompt("Enter chat display name", default="") or None
- try:
- chat = choose_chat(chats, participant=participant, chat_name=chat_name)
- except ChatNotFoundError as exc:
- typer.secho(str(exc), fg=typer.colors.RED)
- raise typer.Exit(code=4)
- selected_chats = [chat]
+ selected_chats = [result]
total_messages = 0
- for chat in selected_chats:
- title = _chat_title(chat)
- typer.echo(f"Exporting chat: {title}")
- try:
- output_path, count = export_chat(
- client,
- chat,
- start_dt,
- end_dt,
- output_dir=output_dir,
- output_format=output_format,
- )
- except ValueError as exc:
- typer.secho(str(exc), fg=typer.colors.RED)
- raise typer.Exit(code=5)
-
- exports.append((title, output_path, count))
- total_messages += count
+
+ # Use parallel processing for multiple chats
+ if len(selected_chats) > 1:
+ typer.echo(f"\nExporting {len(selected_chats)} chats in parallel...")
+
+ def export_single_chat(chat):
+ title = _chat_title(chat)
+ try:
+ output_path, count = export_chat(
+ client,
+ chat,
+ start_dt,
+ end_dt,
+ output_dir=output_dir,
+ output_format=output_format,
+ download_attachments=download_attachments,
+ )
+ return (title, output_path, count, None)
+ except Exception as exc:
+ return (title, None, 0, str(exc))
+
+ # Use ThreadPoolExecutor for parallel downloads (limited to 3 concurrent)
+ with ThreadPoolExecutor(max_workers=3) as executor:
+ futures = {executor.submit(export_single_chat, chat): chat for chat in selected_chats}
+
+ completed = 0
+ for future in as_completed(futures):
+ title, output_path, count, error = future.result()
+ completed += 1
+
+ if error:
+ typer.secho(f"[{completed}/{len(selected_chats)}] Failed: {title} - {error}", fg=typer.colors.RED)
+ else:
+ exports.append((title, output_path, count))
+ total_messages += count
+ typer.secho(
+ f"[{completed}/{len(selected_chats)}] Exported {count} messages from {title}",
+ fg=typer.colors.GREEN
+ )
+ else:
+ # Single chat - process directly
+ for chat in selected_chats:
+ title = _chat_title(chat)
+ typer.echo(f"Exporting chat: {title}")
+ try:
+ output_path, count = export_chat(
+ client,
+ chat,
+ start_dt,
+ end_dt,
+ output_dir=output_dir,
+ output_format=output_format,
+ download_attachments=download_attachments,
+ )
+ except ValueError as exc:
+ typer.secho(str(exc), fg=typer.colors.RED)
+ raise typer.Exit(code=5)
+
+ exports.append((title, output_path, count))
+ total_messages += count
for title, path, count in exports:
typer.echo(f"Exported {count} messages from {title}; saved to {path}")
diff --git a/src/teams_export/exporter.py b/src/teams_export/exporter.py
index fafa734..f67d396 100644
--- a/src/teams_export/exporter.py
+++ b/src/teams_export/exporter.py
@@ -5,10 +5,12 @@
import re
from pathlib import Path
from typing import Iterable, List, Sequence
+from urllib.parse import urlparse
from dateutil import parser
from .graph import GraphClient
+from .formatters import write_jira_markdown, write_html, write_docx
class ChatNotFoundError(RuntimeError):
@@ -36,8 +38,13 @@ def choose_chat(
*,
participant: str | None = None,
chat_name: str | None = None,
-) -> dict:
- """Select a chat by participant identifier or chat display name."""
+) -> dict | List[dict]:
+ """Select a chat by participant identifier or chat display name.
+
+ Returns:
+ Either a single chat dict if exactly one match, or a list of matches
+ if multiple chats matched the criteria.
+ """
name_norm = _normalise(chat_name) if chat_name else None
participant_norm = _normalise(participant) if participant else None
@@ -64,12 +71,11 @@ def choose_chat(
"No chat matches the provided identifiers. Try running with --list to"
" review available chats."
)
- if len(matches) > 1:
- ids = ", ".join(chat.get("id", "?") for chat in matches)
- raise ChatNotFoundError(
- f"Multiple chats matched the request. Narrow your query. Matches: {ids}"
- )
- return matches[0]
+ if len(matches) == 1:
+ return matches[0]
+
+ # Return all matches for interactive selection
+ return matches
def _normalise_filename(identifier: str) -> str:
@@ -78,8 +84,9 @@ def _normalise_filename(identifier: str) -> str:
def _transform_message(message: dict) -> dict:
- sender_info = message.get("from", {}).get("user", {})
- sender_fallback = message.get("from", {}).get("application", {})
+ from_field = message.get("from") or {}
+ sender_info = from_field.get("user") or {}
+ sender_fallback = from_field.get("application") or {}
sender_display = sender_info.get("displayName") or sender_fallback.get("displayName")
sender_email = sender_info.get("userPrincipalName") or sender_info.get("email")
@@ -130,6 +137,230 @@ def _write_csv(messages: Sequence[dict], output_path: Path) -> None:
writer.writerow({key: message.get(key) for key in fieldnames})
+def _get_extension_from_mime(mime_type: str) -> str:
+ """Get file extension from MIME type."""
+ mime_to_ext = {
+ # Images
+ 'image/png': '.png',
+ 'image/jpeg': '.jpg',
+ 'image/jpg': '.jpg',
+ 'image/gif': '.gif',
+ 'image/bmp': '.bmp',
+ 'image/webp': '.webp',
+ 'image/svg+xml': '.svg',
+ 'image/tiff': '.tiff',
+ 'image/x-icon': '.ico',
+ # Documents
+ 'application/pdf': '.pdf',
+ 'application/msword': '.doc',
+ 'application/vnd.openxmlformats-officedocument.wordprocessingml.document': '.docx',
+ 'application/vnd.ms-excel': '.xls',
+ 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': '.xlsx',
+ 'application/vnd.ms-powerpoint': '.ppt',
+ 'application/vnd.openxmlformats-officedocument.presentationml.presentation': '.pptx',
+ 'application/vnd.oasis.opendocument.text': '.odt',
+ 'application/vnd.oasis.opendocument.spreadsheet': '.ods',
+ 'application/vnd.oasis.opendocument.presentation': '.odp',
+ # Archives
+ 'application/zip': '.zip',
+ 'application/x-zip-compressed': '.zip',
+ 'application/x-rar-compressed': '.rar',
+ 'application/x-7z-compressed': '.7z',
+ 'application/gzip': '.gz',
+ 'application/x-tar': '.tar',
+ # Text
+ 'text/plain': '.txt',
+ 'text/csv': '.csv',
+ 'text/html': '.html',
+ 'text/css': '.css',
+ 'text/javascript': '.js',
+ 'application/json': '.json',
+ 'application/xml': '.xml',
+ 'text/xml': '.xml',
+ 'text/markdown': '.md',
+ # Code
+ 'application/x-python': '.py',
+ 'text/x-python': '.py',
+ 'application/x-sh': '.sh',
+ # Video
+ 'video/mp4': '.mp4',
+ 'video/mpeg': '.mpeg',
+ 'video/quicktime': '.mov',
+ 'video/x-msvideo': '.avi',
+ 'video/webm': '.webm',
+ # Audio
+ 'audio/mpeg': '.mp3',
+ 'audio/wav': '.wav',
+ 'audio/ogg': '.ogg',
+ 'audio/webm': '.weba',
+ }
+ return mime_to_ext.get(mime_type.lower(), '.bin')
+
+
+def _download_attachment(client: GraphClient, url: str, output_path: Path) -> tuple[bool, str | None]:
+ """Download an attachment from a URL to local file.
+
+ Returns:
+ Tuple of (success: bool, content_type: str | None)
+ """
+ try:
+ # Use the authenticated session from GraphClient
+ response = client._session.get(url, timeout=30)
+ if response.status_code == 200:
+ output_path.write_bytes(response.content)
+ content_type = response.headers.get('Content-Type', '').split(';')[0].strip()
+ return True, content_type
+ else:
+ print(f"Failed to download {url}: HTTP {response.status_code}")
+ return False, None
+ except Exception as e:
+ print(f"Error downloading {url}: {e}")
+ return False, None
+
+
+def _extract_attachment_urls(messages: Sequence[dict]) -> List[tuple[str, bool]]:
+ """Extract all attachment URLs from messages (both inline images and file attachments).
+
+ Returns:
+ List of tuples (url, is_image) where is_image indicates if the attachment is an image.
+ """
+ import re
+
+ urls = []
+ for message in messages:
+ # Extract inline images from HTML content
+ content = message.get("content", "")
+ if content:
+            img_pattern = r'<img[^>]+src=["\']([^"\']+)["\'][^>]*>'
+ for match in re.finditer(img_pattern, content, flags=re.IGNORECASE):
+ url = match.group(1)
+ if url and url.startswith("http"):
+ urls.append((url, True)) # Inline images are always images
+
+ # Extract from attachments array
+ attachments = message.get("attachments", [])
+ for att in attachments:
+ # Try different possible URL fields
+ url = (
+ att.get("contentUrl") or
+ att.get("content") or
+ att.get("url") or
+ att.get("thumbnailUrl") or
+ (att.get("hostedContents", {}).get("contentUrl") if isinstance(att.get("hostedContents"), dict) else None)
+ )
+ if url and url.startswith("http"):
+ # Check if it's an image
+ content_type = att.get("contentType", "")
+ name = att.get("name", "")
+ is_image = (
+ content_type.startswith("image/") if content_type else
+ any(name.lower().endswith(ext) for ext in ['.png', '.jpg', '.jpeg', '.gif', '.bmp', '.svg', '.webp'])
+ )
+ urls.append((url, is_image))
+
+ return urls
+
+
+def _download_attachments(
+ client: GraphClient,
+ messages: Sequence[dict],
+ attachments_dir: Path,
+) -> dict[str, str]:
+ """Download all attachments (images and files) and return URL -> local path mapping.
+
+ Args:
+ client: Authenticated Graph API client
+ messages: List of message dictionaries
+ attachments_dir: Directory to save attachments
+
+ Returns:
+ Dictionary mapping original URL to local relative path
+ """
+ attachments_dir.mkdir(parents=True, exist_ok=True)
+
+ url_tuples = _extract_attachment_urls(messages)
+ unique_url_tuples = list(dict.fromkeys(url_tuples)) # Remove duplicates while preserving order
+
+ url_mapping = {}
+
+ if not unique_url_tuples:
+ return url_mapping
+
+ # Count images and non-images
+ image_count = sum(1 for _, is_image in unique_url_tuples if is_image)
+ file_count = len(unique_url_tuples) - image_count
+
+ if image_count and file_count:
+ print(f"\nDownloading {image_count} image(s) and {file_count} file(s)...")
+ elif image_count:
+ print(f"\nDownloading {image_count} image(s)...")
+ else:
+ print(f"\nDownloading {file_count} file(s)...")
+
+ for idx, (url, is_image) in enumerate(unique_url_tuples, 1):
+ # Generate base filename (without extension) from URL or use index
+ try:
+ parsed = urlparse(url)
+ path_parts = parsed.path.split('/')
+ # Try to get a meaningful name from the URL
+ if path_parts and path_parts[-1]:
+ base_filename = path_parts[-1]
+ # Remove extension if present, we'll add correct one later
+ if '.' in base_filename:
+ base_filename = base_filename.rsplit('.', 1)[0]
+ else:
+ # Use appropriate prefix based on file type
+ prefix = "image" if is_image else "file"
+ base_filename = f"{prefix}_{idx:03d}"
+ except Exception:
+ # Use appropriate prefix based on file type
+ prefix = "image" if is_image else "file"
+ base_filename = f"{prefix}_{idx:03d}"
+
+ # Sanitize base filename
+ base_filename = re.sub(r'[^\w\-]', '_', base_filename)
+
+ # Download to temporary path first to get Content-Type
+ temp_filename = f"{base_filename}_temp"
+ temp_path = attachments_dir / temp_filename
+
+ success, content_type = _download_attachment(client, url, temp_path)
+
+ if success:
+ # Determine correct extension from Content-Type
+ if content_type:
+ extension = _get_extension_from_mime(content_type)
+ else:
+ # Fallback based on type
+ extension = '.png' if is_image else '.bin'
+
+ # Create final filename with correct extension
+ final_filename = f"{base_filename}{extension}"
+ final_path = attachments_dir / final_filename
+
+ # Avoid overwriting if file already exists
+ counter = 1
+ while final_path.exists():
+ final_filename = f"{base_filename}_{counter}{extension}"
+ final_path = attachments_dir / final_filename
+ counter += 1
+
+ # Rename from temp to final name
+ temp_path.rename(final_path)
+
+ # Store relative path (relative to the markdown file)
+ relative_path = f"{attachments_dir.name}/{final_path.name}"
+ url_mapping[url] = relative_path
+            print(f" [{idx}/{len(unique_url_tuples)}] Downloaded: {final_path.name}")
+ else:
+ # Clean up temp file if exists
+ if temp_path.exists():
+ temp_path.unlink()
+            print(f" [{idx}/{len(unique_url_tuples)}] Failed: {url}")
+
+ return url_mapping
+
+
def export_chat(
client: GraphClient,
chat: dict,
@@ -138,6 +369,7 @@ def export_chat(
*,
output_dir: Path,
output_format: str = "json",
+ download_attachments: bool = True,
) -> tuple[Path, int]:
chat_id = chat.get("id")
if not chat_id:
@@ -149,7 +381,20 @@ def export_chat(
identifier = members[0] if members else chat_id
filename_stem = _normalise_filename(identifier)
output_dir.mkdir(parents=True, exist_ok=True)
- suffix = output_format.lower()
+
+ # Normalize format and determine extension
+ fmt = output_format.lower()
+ if fmt in ("jira", "jira-markdown", "markdown"):
+ suffix = "md"
+ fmt = "jira"
+ elif fmt == "html":
+ suffix = "html"
+ elif fmt in ("docx", "word"):
+ suffix = "docx"
+ fmt = "docx"
+ else:
+ suffix = fmt
+
if start_dt.date() == end_dt.date():
date_fragment = start_dt.date().isoformat()
else:
@@ -168,14 +413,59 @@ def _stop_condition(message: dict) -> bool:
raw_messages = client.list_chat_messages(chat_id, stop_condition=_stop_condition)
filtered_messages = [m for m in raw_messages if _within_range(m, start_dt, end_dt)]
+
+ # Sort messages from oldest to newest (Graph API returns newest first)
+ filtered_messages.sort(
+ key=lambda m: m.get("createdDateTime") or m.get("lastModifiedDateTime") or ""
+ )
+
messages = [_transform_message(m) for m in filtered_messages]
message_count = len(messages)
- if output_format.lower() == "json":
+ # Download attachments if requested (only for formats that support it)
+ url_mapping = {}
+ attachments_dir = None
+ if download_attachments and fmt in ("jira", "html", "docx") and messages:
+ # Create attachments directory next to output file
+ attachments_dir_name = output_path.stem + "_files"
+ attachments_dir = output_path.parent / attachments_dir_name
+ url_mapping = _download_attachments(client, messages, attachments_dir)
+
+ if fmt == "json":
_write_json(messages, output_path)
- elif output_format.lower() == "csv":
+ elif fmt == "csv":
_write_csv(messages, output_path)
+ elif fmt == "jira":
+ # Prepare chat metadata for Jira formatter
+ chat_title = chat.get("topic") or chat.get("displayName") or identifier
+ participants_list = _member_labels(chat)
+ chat_info = {
+ "title": chat_title,
+ "participants": ", ".join(participants_list) if participants_list else "N/A",
+ "date_range": f"{start_dt.date()} to {end_dt.date()}",
+ }
+ write_jira_markdown(messages, output_path, chat_info=chat_info, url_mapping=url_mapping)
+ elif fmt == "html":
+ # Prepare chat metadata for HTML formatter
+ chat_title = chat.get("topic") or chat.get("displayName") or identifier
+ participants_list = _member_labels(chat)
+ chat_info = {
+ "title": chat_title,
+ "participants": ", ".join(participants_list) if participants_list else "N/A",
+ "date_range": f"{start_dt.date()} to {end_dt.date()}",
+ }
+ write_html(messages, output_path, chat_info=chat_info, url_mapping=url_mapping)
+ elif fmt == "docx":
+ # Prepare chat metadata for Word document formatter
+ chat_title = chat.get("topic") or chat.get("displayName") or identifier
+ participants_list = _member_labels(chat)
+ chat_info = {
+ "title": chat_title,
+ "participants": ", ".join(participants_list) if participants_list else "N/A",
+ "date_range": f"{start_dt.date()} to {end_dt.date()}",
+ }
+ write_docx(messages, output_path, chat_info=chat_info, url_mapping=url_mapping)
else:
- raise ValueError("Unsupported export format. Choose json or csv.")
+ raise ValueError("Unsupported export format. Choose json, csv, jira, html, or docx.")
return output_path, message_count
diff --git a/src/teams_export/formatters.py b/src/teams_export/formatters.py
new file mode 100644
index 0000000..4bd4fb6
--- /dev/null
+++ b/src/teams_export/formatters.py
@@ -0,0 +1,787 @@
+"""Output formatters for Teams chat exports."""
+
+from __future__ import annotations
+
+import html
+import re
+import base64
+from typing import Sequence
+from pathlib import Path
+
+
+def _extract_images_from_html(content: str | None) -> list[dict]:
+ """Extract inline images from HTML content.
+
+ Returns list of dicts with 'src' and 'alt' keys.
+ """
+ if not content:
+ return []
+
+ images = []
+    # Find all <img> tags and extract src and alt attributes
+    img_pattern = r'<img[^>]+src=["\']([^"\']+)["\'][^>]*>'
+ for match in re.finditer(img_pattern, content, flags=re.IGNORECASE):
+ img_tag = match.group(0)
+ src = match.group(1)
+
+ # Try to extract alt text
+ alt_match = re.search(r'alt=["\']([^"\']*)["\']', img_tag, flags=re.IGNORECASE)
+ alt = alt_match.group(1) if alt_match else "image"
+
+ # Try to extract itemid for better name
+ itemid_match = re.search(r'itemid=["\']([^"\']+)["\']', img_tag, flags=re.IGNORECASE)
+ if itemid_match and itemid_match.group(1):
+ alt = itemid_match.group(1)
+
+ images.append({"src": src, "alt": alt})
+
+ return images
+
+
+def _strip_html(content: str | None) -> str:
+ """Remove HTML tags and decode entities to plain text."""
+ if not content:
+ return ""
+
+ # Decode HTML entities first
+ text = html.unescape(content)
+
+    # Remove <img> tags (they are extracted separately)
+    text = re.sub(r'<img[^>]+>', '', text, flags=re.IGNORECASE)
+
+    # Replace common HTML elements with markdown equivalents
+    text = re.sub(r'<br\s*/?>', '\n', text, flags=re.IGNORECASE)
+    text = re.sub(r'<p[^>]*>', '\n', text, flags=re.IGNORECASE)
+    text = re.sub(r'</p>', '\n', text, flags=re.IGNORECASE)
+    text = re.sub(r'<div[^>]*>', '\n', text, flags=re.IGNORECASE)
+    text = re.sub(r'</div>', '\n', text, flags=re.IGNORECASE)
+
+    # Bold and italic
+    text = re.sub(r'<b[^>]*>(.*?)</b>', r'*\1*', text, flags=re.IGNORECASE | re.DOTALL)
+    text = re.sub(r'<strong[^>]*>(.*?)</strong>', r'*\1*', text, flags=re.IGNORECASE | re.DOTALL)
+    text = re.sub(r'<i[^>]*>(.*?)</i>', r'_\1_', text, flags=re.IGNORECASE | re.DOTALL)
+    text = re.sub(r'<em[^>]*>(.*?)</em>', r'_\1_', text, flags=re.IGNORECASE | re.DOTALL)
+
+    # Links - convert to "text (url)" format
+    text = re.sub(r'<a[^>]*href=["\']([^"\']+)["\'][^>]*>(.*?)</a>', r'\2 (\1)', text, flags=re.IGNORECASE | re.DOTALL)
+
+ # Remove all other HTML tags
+ text = re.sub(r'<[^>]+>', '', text)
+
+ # Clean up excessive whitespace
+ text = re.sub(r'\n\s*\n\s*\n+', '\n\n', text)
+ text = text.strip()
+
+ return text
+
+
+def _format_jira_message(message: dict, index: int, url_mapping: dict[str, str] | None = None) -> str:
+ """Format a single message in standard Markdown.
+
+ Args:
+ message: Message dictionary
+ index: Message index
+ url_mapping: Optional mapping of remote URL to local file path
+ """
+ sender = message.get("sender") or "Unknown"
+ timestamp = message.get("timestamp", "")
+
+ # Format timestamp to be more readable
+ if timestamp:
+ # Extract just the date and time, skip milliseconds
+ try:
+            # Format: 2025-10-23T14:30:45.123Z -> 2025-10-23 14:30:45 UTC
+ timestamp_clean = timestamp.split('.')[0].replace('T', ' ')
+ if 'Z' in timestamp:
+ timestamp_clean = timestamp_clean.replace('Z', ' UTC')
+ except Exception:
+ timestamp_clean = timestamp
+ else:
+ timestamp_clean = "No timestamp"
+
+ # Extract inline images from HTML content first
+ html_content = message.get("content", "")
+ inline_images = _extract_images_from_html(html_content)
+
+ # Then strip HTML to get text content
+ content = _strip_html(html_content)
+
+ # Format attachments if present
+ attachments = message.get("attachments", [])
+ attachment_lines = []
+
+ # Add inline images first
+ for img in inline_images:
+ src = img.get("src", "")
+ alt = img.get("alt", "image")
+ if src:
+ # Use local path if available, otherwise use remote URL
+ display_url = url_mapping.get(src, src) if url_mapping else src
+            attachment_lines.append(f"![{alt}]({display_url})")
+
+ # Then add file attachments
+ if attachments:
+ for att in attachments:
+ name = att.get("name") or "Attachment"
+ content_type = att.get("contentType", "")
+
+ # Try to get URL from different possible fields (in order of preference)
+ url = (
+ att.get("contentUrl") or
+ att.get("content") or
+ att.get("url") or
+ att.get("thumbnailUrl") or
+ (att.get("hostedContents", {}).get("contentUrl") if isinstance(att.get("hostedContents"), dict) else None)
+ )
+
+ # Check if it's an image
+ is_image = (
+ content_type.startswith("image/") if content_type else
+ any(name.lower().endswith(ext) for ext in ['.png', '.jpg', '.jpeg', '.gif', '.bmp', '.svg', '.webp'])
+ )
+
+ if is_image and url:
+ # Use local path if available, otherwise use remote URL
+ display_url = url_mapping.get(url, url) if url_mapping else url
+ # Format as markdown image
+                attachment_lines.append(f"![{name}]({display_url})")
+ elif url:
+ # Use local path if available, otherwise use remote URL
+ display_url = url_mapping.get(url, url) if url_mapping else url
+ # Format as markdown link
+                attachment_lines.append(f"📎 [{name}]({display_url})")
+ else:
+ # Just show the name if no URL found
+                attachment_lines.append(f"📎 {name} (no URL)")
+
+ # Handle empty content
+ if not content:
+ content_type = message.get("type", "")
+ if content_type == "systemEventMessage":
+ content = "[System event]"
+ elif not attachment_lines:
+ # Only show "[No content]" if there are no attachments either
+ content = "[No content]"
+
+ # Format reactions if present
+ reactions = message.get("reactions", [])
+ reaction_text = ""
+ if reactions:
+ reaction_emojis = []
+ for reaction in reactions:
+ reaction_type = reaction.get("reactionType", "")
+ if reaction_type:
+ reaction_emojis.append(reaction_type)
+ if reaction_emojis:
+ reaction_text = f" [{', '.join(reaction_emojis)}]"
+
+ # Build the message block in standard Markdown format
+ lines = [
+        f"**{sender}** — *{timestamp_clean}*{reaction_text}",
+ "",
+ ]
+
+ # Add content if present
+ if content:
+ # Format content as blockquote (add '> ' prefix to each line)
+ content_lines = content.split('\n')
+ quoted_content = '\n'.join(f"> {line}" if line else ">" for line in content_lines)
+ lines.append(quoted_content)
+ lines.append("")
+
+ # Add attachments if present
+ if attachment_lines:
+ lines.extend(attachment_lines)
+ lines.append("")
+
+ return "\n".join(lines)
+
+
+def write_jira_markdown(
+ messages: Sequence[dict],
+ output_path: Path,
+ chat_info: dict | None = None,
+ url_mapping: dict[str, str] | None = None,
+) -> None:
+ """Write messages in standard Markdown format (works in Jira, GitHub, and other platforms).
+
+ Args:
+ messages: List of message dictionaries
+ output_path: Path to write markdown file
+ chat_info: Optional chat metadata (title, participants, date range)
+ url_mapping: Optional mapping of remote URLs to local file paths
+ """
+
+ lines = []
+
+ # Add header with chat info
+ if chat_info:
+ chat_title = chat_info.get("title", "Teams Chat Export")
+ participants = chat_info.get("participants", "")
+ date_range = chat_info.get("date_range", "")
+
+ lines.append(f"## {chat_title}")
+ lines.append("")
+ if participants:
+ lines.append(f"**Participants:** {participants}")
+ if date_range:
+ lines.append(f"**Date Range:** {date_range}")
+ lines.append("")
+ lines.append("---")
+ lines.append("")
+
+ # Add messages
+ if messages:
+ lines.append(f"### Messages ({len(messages)} total)")
+ lines.append("")
+
+ for idx, message in enumerate(messages, 1):
+ lines.append(_format_jira_message(message, idx, url_mapping=url_mapping))
+ # No extra empty line needed - _format_jira_message adds it
+ else:
+ lines.append("*No messages found in the specified date range.*")
+
+ # Write to file
+ content = "\n".join(lines)
+ output_path.write_text(content, encoding="utf-8")
+
+
+def _image_to_base64(image_path: Path) -> str | None:
+ """Convert image file to base64 data URL.
+
+ Returns:
+ Data URL string like "data:image/png;base64,iVBORw0KG..." or None if failed
+ """
+ try:
+ # Read image bytes
+ image_bytes = image_path.read_bytes()
+
+ # Encode to base64
+ base64_data = base64.b64encode(image_bytes).decode('utf-8')
+
+ # Determine MIME type from extension
+ ext = image_path.suffix.lower()
+ mime_type = {
+ '.png': 'image/png',
+ '.jpg': 'image/jpeg',
+ '.jpeg': 'image/jpeg',
+ '.gif': 'image/gif',
+ '.bmp': 'image/bmp',
+ '.webp': 'image/webp',
+ '.svg': 'image/svg+xml',
+ '.tiff': 'image/tiff',
+ '.tif': 'image/tiff',
+ }.get(ext, 'image/png')
+
+ return f"data:{mime_type};base64,{base64_data}"
+ except Exception as e:
+ print(f"Warning: Failed to encode {image_path}: {e}")
+ return None
+
+
+def _format_html_message(message: dict, index: int, url_mapping: dict[str, str] | None = None, base_dir: Path | None = None) -> str:
+ """Format a single message as HTML with embedded images.
+
+ Args:
+ message: Message dictionary
+ index: Message index
+ url_mapping: Mapping of remote URLs to local file paths
+ base_dir: Base directory for resolving relative image paths
+ """
+ sender = message.get("sender") or "Unknown"
+ timestamp = message.get("timestamp", "")
+
+ # Format timestamp
+ if timestamp:
+ try:
+ timestamp_clean = timestamp.split('.')[0].replace('T', ' ')
+ if 'Z' in timestamp:
+ timestamp_clean = timestamp_clean.replace('Z', ' UTC')
+ except Exception:
+ timestamp_clean = timestamp
+ else:
+ timestamp_clean = "No timestamp"
+
+ # Extract inline images from HTML content first
+ html_content = message.get("content", "")
+ inline_images = _extract_images_from_html(html_content)
+
+ # Strip HTML to get text content
+ content = _strip_html(html_content)
+
+ # Escape HTML in content
+ content = html.escape(content) if content else ""
+
+    # Replace newlines with <br> for HTML rendering
+    content = content.replace('\n', '<br>')
+
+ # Format attachments
+ attachments = message.get("attachments", [])
+ attachment_html = []
+
+ # Add inline images first
+ for img in inline_images:
+ src = img.get("src", "")
+ alt = img.get("alt", "image")
+ if src:
+ # Try to get local path from url_mapping
+ local_path = url_mapping.get(src) if url_mapping else None
+
+ if local_path and base_dir:
+ # Convert local file to base64
+ try:
+ img_path = base_dir / local_path
+ if img_path.exists():
+ data_url = _image_to_base64(img_path)
+ if data_url:
+ src = data_url
+ except Exception:
+ pass # Keep original URL if conversion fails
+
+            attachment_html.append(f'<img src="{src}" alt="{html.escape(alt)}" style="max-width: 100%;"><br>')
+
+ # Then add file attachments
+ if attachments:
+ for att in attachments:
+ name = att.get("name") or "Attachment"
+ content_type = att.get("contentType", "")
+
+ url = (
+ att.get("contentUrl") or
+ att.get("content") or
+ att.get("url") or
+ att.get("thumbnailUrl") or
+ (att.get("hostedContents", {}).get("contentUrl") if isinstance(att.get("hostedContents"), dict) else None)
+ )
+
+ is_image = (
+ content_type.startswith("image/") if content_type else
+ any(name.lower().endswith(ext) for ext in ['.png', '.jpg', '.jpeg', '.gif', '.bmp', '.svg', '.webp'])
+ )
+
+ if is_image and url:
+ # Try to get local path from url_mapping
+ local_path = url_mapping.get(url) if url_mapping else None
+
+ if local_path and base_dir:
+ # Convert local file to base64
+ try:
+ img_path = base_dir / local_path
+ if img_path.exists():
+ data_url = _image_to_base64(img_path)
+ if data_url:
+ url = data_url
+ except Exception:
+ pass # Keep original URL if conversion fails
+
+                attachment_html.append(f'<img src="{url}" alt="{html.escape(name)}" style="max-width: 100%;"><br>')
+ elif url:
+ # Try to get local path from url_mapping for non-image attachments
+ local_path = url_mapping.get(url) if url_mapping else None
+ display_url = local_path if local_path else url
+                attachment_html.append(f'📎 <a href="{html.escape(display_url)}">{html.escape(name)}</a><br>')
+ else:
+                attachment_html.append(f'📎 {html.escape(name)} (no URL)<br>')
+
+ # Handle empty content
+ if not content:
+ content_type = message.get("type", "")
+ if content_type == "systemEventMessage":
+ content = "[System event]"
+ elif not attachment_html:
+ content = "[No content]"
+
+ # Format reactions
+ reactions = message.get("reactions", [])
+ reaction_html = ""
+ if reactions:
+ reaction_emojis = []
+ for reaction in reactions:
+ reaction_type = reaction.get("reactionType", "")
+ if reaction_type:
+ reaction_emojis.append(html.escape(reaction_type))
+ if reaction_emojis:
+ reaction_html = f" [{', '.join(reaction_emojis)}]"
+
+    # Build HTML message block
+    html_parts = [
+        '<div class="message">',
+        f'<div class="message-header"><b>{html.escape(sender)}</b> — {timestamp_clean}{reaction_html}</div>',
+    ]
+
+    if content:
+        html_parts.append(f'<div class="message-content">{content}</div>')
+
+    if attachment_html:
+        html_parts.append('<div class="attachments">')
+        html_parts.extend(attachment_html)
+        html_parts.append('</div>')
+
+    html_parts.append('</div>')
+
+ return "\n".join(html_parts)
+
+
+def write_html(
+ messages: Sequence[dict],
+ output_path: Path,
+ chat_info: dict | None = None,
+ url_mapping: dict[str, str] | None = None,
+) -> None:
+ """Write messages as HTML with embedded base64 images.
+
+ This format is perfect for copy-pasting into Jira/Confluence:
+ 1. Open the HTML file in a browser
+ 2. Select all (Ctrl+A)
+ 3. Copy (Ctrl+C)
+ 4. Paste into Jira/Confluence - images will be embedded!
+
+ Args:
+ messages: List of message dictionaries
+ output_path: Path to write HTML file
+ chat_info: Optional chat metadata (title, participants, date range)
+ url_mapping: Optional mapping of remote URLs to local file paths
+ """
+    html_parts = [
+        '<!DOCTYPE html>',
+        '<html>',
+        '<head>',
+        '    <meta charset="utf-8">',
+        '    <title>Teams Chat Export</title>',
+        '</head>',
+        '<body>',
+        '    <div class="content">',
+    ]
+
+ # Add header with chat info
+ if chat_info:
+ chat_title = chat_info.get("title", "Teams Chat Export")
+ participants = chat_info.get("participants", "")
+ date_range = chat_info.get("date_range", "")
+
+        html_parts.append(f'<h2>{html.escape(chat_title)}</h2>')
+        if participants:
+            html_parts.append(f'<p><b>Participants:</b> {html.escape(participants)}</p>')
+        if date_range:
+            html_parts.append(f'<p><b>Date Range:</b> {html.escape(date_range)}</p>')
+        html_parts.append('<hr>')
+
+    # Add messages
+    if messages:
+        html_parts.append(f'<h3>Messages ({len(messages)} total)</h3>')
+
+        for idx, message in enumerate(messages, 1):
+            html_parts.append(_format_html_message(message, idx, url_mapping=url_mapping, base_dir=output_path.parent))
+    else:
+        html_parts.append('<p><i>No messages found in the specified date range.</i></p>')
+
+    html_parts.extend([
+        '</div>',  # Close content div
+        '</body>',
+        '</html>',
+    ])
+
+ # Write to file
+ content = "\n".join(html_parts)
+ output_path.write_text(content, encoding="utf-8")
+
+
+def write_docx(
+ messages: Sequence[dict],
+ output_path: Path,
+ chat_info: dict | None = None,
+ url_mapping: dict[str, str] | None = None,
+) -> None:
+ """Write messages as Word document with embedded images.
+
+ This format is perfect for copy-pasting into Jira/Confluence:
+ 1. Open the .docx file in Word (or LibreOffice)
+ 2. Select all (Ctrl+A)
+ 3. Copy (Ctrl+C)
+ 4. Paste into Jira/Confluence - images will be embedded!
+
+ Args:
+ messages: List of message dictionaries
+ output_path: Path to write .docx file
+ chat_info: Optional chat metadata (title, participants, date range)
+ url_mapping: Optional mapping of remote URLs to local file paths
+ """
+ try:
+ from docx import Document
+ from docx.shared import Inches, Pt, RGBColor
+ from docx.enum.text import WD_PARAGRAPH_ALIGNMENT
+ except ImportError:
+ raise ImportError(
+ "python-docx is required for Word document export. "
+ "Install it with: pip install python-docx"
+ )
+
+ doc = Document()
+
+ # Add title and metadata
+ if chat_info:
+ title = doc.add_heading(chat_info.get("title", "Teams Chat Export"), level=1)
+ title.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
+
+ if chat_info.get("participants"):
+ p = doc.add_paragraph()
+ p.add_run("Participants: ").bold = True
+ p.add_run(chat_info["participants"])
+
+ if chat_info.get("date_range"):
+ p = doc.add_paragraph()
+ p.add_run("Date Range: ").bold = True
+ p.add_run(chat_info["date_range"])
+
+ doc.add_paragraph("_" * 50) # Separator
+
+ # Get base directory for resolving image paths
+ base_dir = output_path.parent
+
+ # Add each message
+ for idx, message in enumerate(messages, 1):
+ sender = message.get("sender") or "Unknown"
+ timestamp = message.get("timestamp", "")
+
+ # Format timestamp to be more readable
+ if timestamp:
+ try:
+                # Format: 2025-10-23T14:30:45.123Z -> 2025-10-23 14:30:45 UTC
+ timestamp_clean = timestamp.split('.')[0].replace('T', ' ')
+ if 'Z' in timestamp:
+ timestamp_clean = timestamp_clean.replace('Z', ' UTC')
+ except Exception:
+ timestamp_clean = timestamp
+ else:
+ timestamp_clean = "No timestamp"
+
+ # Message header (sender and timestamp)
+ p = doc.add_paragraph()
+ sender_run = p.add_run(f"{sender}")
+ sender_run.bold = True
+ sender_run.font.size = Pt(11)
+ sender_run.font.color.rgb = RGBColor(0, 120, 212) # Microsoft blue
+
+        time_run = p.add_run(f" — {timestamp_clean}")
+ time_run.font.size = Pt(10)
+ time_run.font.color.rgb = RGBColor(102, 102, 102) # Gray
+
+ # Add reactions if present
+ reactions = message.get("reactions", [])
+ if reactions:
+ reaction_emojis = []
+ for reaction in reactions:
+ reaction_type = reaction.get("reactionType", "")
+ if reaction_type:
+ reaction_emojis.append(reaction_type)
+ if reaction_emojis:
+ reaction_run = p.add_run(f" [{', '.join(reaction_emojis)}]")
+ reaction_run.font.color.rgb = RGBColor(102, 102, 102)
+
+ # Message content
+ html_content = message.get("content", "")
+ text_content = _strip_html(html_content)
+ if text_content:
+ p = doc.add_paragraph(text_content)
+ p.paragraph_format.left_indent = Inches(0.3)
+
+ # Extract and add inline images
+ inline_images = _extract_images_from_html(html_content)
+
+ for img in inline_images:
+ src = img.get("src", "")
+ # Try to get local path from url_mapping
+ local_path = url_mapping.get(src) if url_mapping else None
+
+ if local_path and base_dir:
+ img_path = base_dir / local_path
+ if img_path.exists():
+ try:
+ # Add image with max width of 5 inches
+ p = doc.add_paragraph()
+ run = p.add_run()
+ run.add_picture(str(img_path), width=Inches(5))
+ p.paragraph_format.left_indent = Inches(0.3)
+ except Exception as e:
+ # If image can't be added, add a note
+ p = doc.add_paragraph(f"[Image: {img.get('alt', 'image')} - failed to embed: {e}]")
+ p.paragraph_format.left_indent = Inches(0.3)
+
+ # Process file attachments
+ attachments = message.get("attachments", [])
+ if attachments:
+ for att in attachments:
+ name = att.get("name") or "Attachment"
+ content_type = att.get("contentType", "")
+
+ url = (
+ att.get("contentUrl") or
+ att.get("content") or
+ att.get("url") or
+ att.get("thumbnailUrl") or
+ (att.get("hostedContents", {}).get("contentUrl") if isinstance(att.get("hostedContents"), dict) else None)
+ )
+
+ is_image = (
+ content_type.startswith("image/") if content_type else
+ any(name.lower().endswith(ext) for ext in ['.png', '.jpg', '.jpeg', '.gif', '.bmp', '.svg', '.webp'])
+ )
+
+ if is_image and url:
+ # Try to get local path from url_mapping
+ local_path = url_mapping.get(url) if url_mapping else None
+
+ if local_path and base_dir:
+ img_path = base_dir / local_path
+ if img_path.exists():
+ try:
+ # Add image with max width of 5 inches
+ p = doc.add_paragraph()
+ run = p.add_run()
+ run.add_picture(str(img_path), width=Inches(5))
+ p.paragraph_format.left_indent = Inches(0.3)
+ except Exception as e:
+ # If image can't be added, add a note
+ p = doc.add_paragraph(f"[Image: {name} - failed to embed: {e}]")
+ p.paragraph_format.left_indent = Inches(0.3)
+ elif url:
+ # Non-image attachment - add as hyperlink
+ local_path = url_mapping.get(url) if url_mapping else None
+ display_url = local_path if local_path else url
+
+ p = doc.add_paragraph()
+ p.paragraph_format.left_indent = Inches(0.3)
+                    run = p.add_run("📎 ")
+ hyperlink_run = p.add_run(name)
+ hyperlink_run.font.color.rgb = RGBColor(0, 0, 255)
+ hyperlink_run.font.underline = True
+ # Note: Word hyperlinks require more complex code, so we just style it
+ # Users can click the file in the _files folder directly
+
+ # Add spacing between messages
+ doc.add_paragraph()
+
+ # Save document
+ doc.save(str(output_path))
diff --git a/src/teams_export/graph.py b/src/teams_export/graph.py
index eeb22fc..1419eb2 100644
--- a/src/teams_export/graph.py
+++ b/src/teams_export/graph.py
@@ -1,11 +1,14 @@
from __future__ import annotations
+import time
from typing import Callable, Dict, Iterable, Iterator, List, Optional
import requests
GRAPH_BASE_URL = "https://graph.microsoft.com/v1.0"
DEFAULT_TIMEOUT = 60
+MAX_RETRIES = 4
+INITIAL_RETRY_DELAY = 2.0 # seconds
class GraphError(RuntimeError):
@@ -24,23 +27,96 @@ def __init__(self, token: str, base_url: str = GRAPH_BASE_URL) -> None:
)
self._base_url = base_url.rstrip("/")
+ def _request_with_retry(
+ self,
+ url: str,
+ params: Dict[str, str] | None = None,
+ ) -> requests.Response:
+ """Execute a GET request with exponential backoff retry on rate limiting."""
+ last_exception = None
+
+ for attempt in range(MAX_RETRIES):
+ try:
+ resp = self._session.get(url, params=params, timeout=DEFAULT_TIMEOUT)
+
+ # Handle rate limiting (429) with retry
+ if resp.status_code == 429:
+ retry_after = resp.headers.get("Retry-After")
+ if retry_after:
+ try:
+ wait_time = int(retry_after)
+ except ValueError:
+ wait_time = INITIAL_RETRY_DELAY * (2 ** attempt)
+ else:
+ wait_time = INITIAL_RETRY_DELAY * (2 ** attempt)
+
+ if attempt < MAX_RETRIES - 1:
+ print(f"Rate limited. Waiting {wait_time}s before retry {attempt + 1}/{MAX_RETRIES}...")
+ time.sleep(wait_time)
+ continue
+ else:
+ raise GraphError(self._format_error(resp))
+
+ # Handle other 5xx errors with retry
+ if 500 <= resp.status_code < 600:
+ if attempt < MAX_RETRIES - 1:
+ wait_time = INITIAL_RETRY_DELAY * (2 ** attempt)
+ print(f"Server error {resp.status_code}. Retrying in {wait_time}s...")
+ time.sleep(wait_time)
+ continue
+
+ # Success or non-retryable error
+ return resp
+
+ except requests.exceptions.RequestException as exc:
+ last_exception = exc
+ if attempt < MAX_RETRIES - 1:
+ wait_time = INITIAL_RETRY_DELAY * (2 ** attempt)
+ print(f"Network error: {exc}. Retrying in {wait_time}s...")
+ time.sleep(wait_time)
+ continue
+
+ # If we exhausted retries
+ if last_exception:
+ raise GraphError(f"Request failed after {MAX_RETRIES} attempts: {last_exception}")
+ raise GraphError(f"Request failed after {MAX_RETRIES} attempts")
+
def _paginate(
self,
url: str,
params: Dict[str, str] | None = None,
*,
stop_condition: Optional[Callable[[dict], bool]] = None,
+ progress_callback: Optional[Callable[[int], None]] = None,
+ max_items: Optional[int] = None,
) -> Iterator[dict]:
+ """Paginate through API results with optional progress tracking and limits.
+
+ Args:
+ url: API endpoint URL
+ params: Query parameters for first request
+ stop_condition: Function that returns True to stop iteration
+ progress_callback: Called with count after each page is fetched
+ max_items: Maximum number of items to fetch (None = unlimited)
+ """
+ count = 0
while url:
- resp = self._session.get(url, params=params, timeout=DEFAULT_TIMEOUT)
+ resp = self._request_with_retry(url, params=params)
params = None # Only include params on first request.
if resp.status_code >= 400:
raise GraphError(self._format_error(resp))
payload = resp.json()
for item in payload.get("value", []):
yield item
+ count += 1
if stop_condition and stop_condition(item):
return
+ if max_items and count >= max_items:
+ return
+
+ if progress_callback:
+ progress_callback(count)
+
url = payload.get("@odata.nextLink")
def _format_error(self, response: requests.Response) -> str:
@@ -55,12 +131,32 @@ def _format_error(self, response: requests.Response) -> str:
return f"Graph API error {code or response.status_code}: {message}"
return f"Graph API error {response.status_code}: {base}"
- def list_chats(self) -> List[dict]:
+ def list_chats(
+ self,
+ *,
+ limit: Optional[int] = None,
+ progress_callback: Optional[Callable[[int], None]] = None,
+ ) -> List[dict]:
+ """List accessible chats with optional limit and progress tracking.
+
+ Args:
+ limit: Maximum number of chats to fetch (None = all chats)
+ progress_callback: Function called with count after each page
+
+ Returns:
+ List of chat objects with expanded members and lastMessagePreview
+ """
url = f"{self._base_url}/me/chats"
params = {
- "$expand": "members",
+ "$expand": "members,lastMessagePreview",
+ "$top": "50", # Fetch 50 chats per request
}
- return list(self._paginate(url, params=params))
+ return list(self._paginate(
+ url,
+ params=params,
+ max_items=limit,
+ progress_callback=progress_callback,
+ ))
def list_chat_messages(
self,
@@ -70,7 +166,7 @@ def list_chat_messages(
) -> List[dict]:
url = f"{self._base_url}/me/chats/{chat_id}/messages"
params = {
- "$top": "50",
+ "$top": "50", # Graph API maximum for chat messages endpoint
}
return list(self._paginate(url, params=params, stop_condition=stop_condition))
diff --git a/src/teams_export/interactive.py b/src/teams_export/interactive.py
new file mode 100644
index 0000000..8dc4a38
--- /dev/null
+++ b/src/teams_export/interactive.py
@@ -0,0 +1,314 @@
+"""Interactive chat selection utilities."""
+
+from __future__ import annotations
+
+import sys
+from typing import List, Sequence
+
+import typer
+import wcwidth
+
+
+def _visual_width(text: str) -> int:
+ """Calculate the visual width of text in terminal (handles emoji correctly)."""
+ return wcwidth.wcswidth(text)
+
+
+def _truncate_to_width(text: str, max_width: int, ellipsis: str = "...") -> str:
+ """Truncate text to fit within visual width, accounting for emoji.
+
+ Args:
+ text: Text to truncate
+ max_width: Maximum visual width in terminal
+ ellipsis: String to append when truncating
+
+ Returns:
+ Truncated text that fits within max_width
+ """
+ if _visual_width(text) <= max_width:
+ return text
+
+ ellipsis_width = _visual_width(ellipsis)
+ target_width = max_width - ellipsis_width
+
+ if target_width <= 0:
+ return ellipsis[:max_width]
+
+ # Build string up to target width
+ result = ""
+ current_width = 0
+
+ for char in text:
+ char_width = wcwidth.wcwidth(char)
+ if char_width < 0: # Control characters
+ char_width = 0
+
+ if current_width + char_width > target_width:
+ break
+
+ result += char
+ current_width += char_width
+
+ return result + ellipsis
+
+
+def _pad_to_width(text: str, target_width: int) -> str:
+ """Pad text to target visual width with spaces.
+
+ Args:
+ text: Text to pad
+ target_width: Target visual width
+
+ Returns:
+ Text padded with spaces to reach target_width
+ """
+ current_width = _visual_width(text)
+ if current_width >= target_width:
+ return text
+
+ padding_needed = target_width - current_width
+ return text + (" " * padding_needed)
+
+
+def _chat_display_name(chat: dict) -> str:
+ """Get a readable display name for a chat."""
+ topic = chat.get("topic") or chat.get("displayName")
+ if topic:
+ return topic
+
+ members = chat.get("members", [])
+ if members:
+ names = []
+ for m in members:
+ name = m.get("displayName") or m.get("email")
+ if name:
+ names.append(name)
+ if names:
+ return ", ".join(names)
+
+ return chat.get("id", "Unknown chat")
+
+
+def _chat_type_label(chat: dict) -> str:
+ """Get a human-readable chat type label."""
+ chat_type = chat.get("chatType", "").lower()
+ if chat_type == "oneonone":
+ return "1:1"
+ elif chat_type == "group":
+ return "Group"
+ elif chat_type == "meeting":
+ return "Meeting"
+ return chat_type.title() if chat_type else "Unknown"
+
+
+def _chat_last_updated(chat: dict) -> str:
+ """Extract last message timestamp for sorting.
+
+ Uses lastMessagePreview.createdDateTime which reflects the actual
+ last message time (what desktop Teams uses for sorting).
+ Falls back to lastUpdatedDateTime if preview not available.
+ """
+ # Try to get last message timestamp (most accurate)
+ last_message_preview = chat.get("lastMessagePreview")
+ if last_message_preview and isinstance(last_message_preview, dict):
+ created = last_message_preview.get("createdDateTime")
+ if created:
+ return created
+
+ # Fallback to chat's lastUpdatedDateTime
+ return chat.get("lastUpdatedDateTime", "")
+
+
+def select_chat_interactive(
+ chats: Sequence[dict],
+ *,
+ prompt_message: str = "Select a chat:",
+ show_limit: int = 20,
+ showing_limited: bool = False,
+) -> dict:
+ """Present an interactive menu to choose from multiple chats.
+
+ Args:
+ chats: List of chat objects to choose from
+ prompt_message: Message to display before the menu
+ show_limit: Maximum number of chats to show initially
+ showing_limited: Whether we're showing a limited subset of all chats
+
+ Returns:
+ Selected chat object
+
+ Raises:
+ typer.Abort: If user cancels selection
+ """
+
+ if not chats:
+ typer.secho("No chats available to select.", fg=typer.colors.RED)
+ raise typer.Abort()
+
+ if len(chats) == 1:
+ return chats[0]
+
+ # Sort by last updated (most recent first)
+ sorted_chats = sorted(chats, key=_chat_last_updated, reverse=True)
+
+ # Show up to show_limit chats
+ display_chats = sorted_chats[:show_limit]
+
+ typer.echo(f"\n{prompt_message}")
+ if showing_limited:
+ typer.secho(
+ "(Showing limited subset. Use --user/--chat to search for specific chats)",
+ fg=typer.colors.CYAN,
+ )
+ typer.echo("=" * 80)
+ typer.echo(f"{'#':<4} {'Type':<8} {'Chat Name':<50} {'Last Updated':<20}")
+ typer.echo("-" * 80)
+
+ for idx, chat in enumerate(display_chats, 1):
+ name = _chat_display_name(chat)
+ chat_type = _chat_type_label(chat)
+
+ # Get timestamp from lastMessagePreview (most accurate) or fallback
+ last_message_preview = chat.get("lastMessagePreview")
+ if last_message_preview and isinstance(last_message_preview, dict):
+ last_updated = last_message_preview.get("createdDateTime", "N/A")
+ else:
+ last_updated = chat.get("lastUpdatedDateTime", "N/A")
+
+ # Truncate and pad fields to fixed visual widths (handles emoji correctly)
+ name_formatted = _pad_to_width(_truncate_to_width(name, 50), 50)
+ chat_type_formatted = _pad_to_width(chat_type, 8)
+ idx_formatted = _pad_to_width(str(idx), 4)
+
+ # Format timestamp
+ if last_updated and last_updated != "N/A":
+ try:
+ # Show just date and time without milliseconds
+ timestamp_display = last_updated.split('.')[0].replace('T', ' ')
+ except Exception:
+ timestamp_display = last_updated[:19]
+ else:
+ timestamp_display = "N/A"
+
+ timestamp_formatted = _pad_to_width(timestamp_display, 20)
+
+ typer.echo(f"{idx_formatted}{chat_type_formatted}{name_formatted}{timestamp_formatted}")
+
+ if len(sorted_chats) > show_limit:
+ typer.echo("-" * 80)
+ typer.echo(f"... and {len(sorted_chats) - show_limit} more chats (showing most recent {show_limit})")
+
+ typer.echo("=" * 80)
+
+ # Get user selection
+ while True:
+ try:
+ selection = typer.prompt(
+ f"\nEnter chat number (1-{len(display_chats)}), 's' to search, 'c' to refresh cache, or 'q' to quit",
+ default="",
+ )
+
+ if selection.lower() in ("q", "quit", "exit"):
+ typer.echo("Selection cancelled.")
+ raise typer.Abort()
+
+ if not selection:
+ continue
+
+ # Refresh cache mode
+ if selection.lower() in ("c", "cache", "refresh"):
+ typer.secho("Requesting cache refresh...", fg=typer.colors.YELLOW)
+ # Return a special marker to signal cache refresh
+ return {"__action__": "refresh_cache"}
+
+ # Search mode
+ if selection.lower() in ("s", "search"):
+ search_query = typer.prompt("\nEnter search term (chat name or participant)")
+ if not search_query:
+ continue
+
+ # Search in all chats, not just displayed ones
+ search_results = filter_chats_by_query(sorted_chats, search_query)
+
+ if not search_results:
+ typer.secho(f"No chats found matching '{search_query}'", fg=typer.colors.YELLOW)
+ continue
+
+ if len(search_results) == 1:
+ selected_chat = search_results[0]
+ selected_name = _chat_display_name(selected_chat)
+                    typer.secho(f"✓ Found and selected: {selected_name}", fg=typer.colors.GREEN)
+ return selected_chat
+
+ # Show search results
+ typer.echo(f"\nFound {len(search_results)} matching chats:")
+ typer.echo("-" * 80)
+ for idx, chat in enumerate(search_results[:20], 1):
+ name = _chat_display_name(chat)
+ # Truncate with proper emoji handling
+ name = _truncate_to_width(name, 60)
+ typer.echo(f"{idx:<4} {name}")
+
+ if len(search_results) > 20:
+ typer.echo(f"... and {len(search_results) - 20} more matches")
+ typer.echo("-" * 80)
+
+ result_selection = typer.prompt(f"Enter number (1-{min(20, len(search_results))})", default="")
+ if result_selection.isdigit():
+ result_idx = int(result_selection)
+ if 1 <= result_idx <= min(20, len(search_results)):
+ selected_chat = search_results[result_idx - 1]
+ selected_name = _chat_display_name(selected_chat)
+                        typer.secho(f"\n✓ Selected: {selected_name}", fg=typer.colors.GREEN)
+ return selected_chat
+ continue
+
+ choice = int(selection)
+ if 1 <= choice <= len(display_chats):
+ selected_chat = display_chats[choice - 1]
+ selected_name = _chat_display_name(selected_chat)
+                typer.secho(f"\n✓ Selected: {selected_name}", fg=typer.colors.GREEN)
+ return selected_chat
+ else:
+ typer.secho(
+ f"Please enter a number between 1 and {len(display_chats)}.",
+ fg=typer.colors.YELLOW,
+ )
+ except ValueError:
+ typer.secho("Invalid input. Please enter a number, 's' to search, or 'c' to refresh cache.", fg=typer.colors.YELLOW)
+ except (KeyboardInterrupt, EOFError):
+ typer.echo("\nSelection cancelled.")
+ raise typer.Abort()
+
+
+def filter_chats_by_query(chats: Sequence[dict], query: str) -> List[dict]:
+ """Filter chats by a search query (case-insensitive substring match).
+
+ Searches in:
+ - Chat topic/display name
+ - Member names
+ - Member emails
+ """
+ if not query:
+ return list(chats)
+
+ query_lower = query.lower()
+ matches = []
+
+ for chat in chats:
+ # Check chat name
+ name = _chat_display_name(chat).lower()
+ if query_lower in name:
+ matches.append(chat)
+ continue
+
+ # Check members
+ members = chat.get("members", [])
+ for member in members:
+ display_name = (member.get("displayName") or "").lower()
+ email = (member.get("email") or "").lower()
+ if query_lower in display_name or query_lower in email:
+ matches.append(chat)
+ break
+
+ return matches