diff --git a/README.md b/README.md index 962c934..01ebb2d 100644 --- a/README.md +++ b/README.md @@ -61,21 +61,42 @@ print(f"✅ Uploaded! URI: ar://{result.id}") ### Core Classes -#### `Turbo(signer, network="mainnet")` +#### `Turbo(signer, network="mainnet", upload_url=None, payment_url=None)` Main client for interacting with Turbo services. **Parameters:** - `signer`: Either `EthereumSigner` or `ArweaveSigner` instance - `network`: `"mainnet"` or `"testnet"` (default: `"mainnet"`) +- `upload_url`: Optional custom upload service URL (overrides network default) +- `payment_url`: Optional custom payment service URL (overrides network default) + +```python +# Using default URLs (mainnet) +turbo = Turbo(signer) + +# Using testnet +turbo = Turbo(signer, network="testnet") + +# Using custom URLs +turbo = Turbo(signer, upload_url="https://my-upload-service.example.com") +``` **Methods:** -##### `upload(data, tags=None) -> TurboUploadResponse` +##### `upload(data, tags=None, on_progress=None, chunking=None, data_size=None) -> TurboUploadResponse` + +Upload data to the Turbo datachain. Supports both small files (single request) and large files (chunked multipart upload). -Upload data to the Turbo datachain. +**Parameters:** +- `data`: Data to upload (`bytes` or file-like `BinaryIO` object) +- `tags`: Optional list of metadata tags +- `on_progress`: Optional callback `(processed_bytes, total_bytes) -> None` +- `chunking`: Optional `ChunkingParams` for upload configuration +- `data_size`: Required when `data` is a file-like object ```python +# Simple upload result = turbo.upload( data=b"Your data here", tags=[ @@ -96,6 +117,51 @@ class TurboUploadResponse: winc: str # Winston credits cost ``` +##### Large File Uploads with Progress + +For files >= 5 MiB, the SDK automatically uses chunked multipart uploads. You can track progress with a callback: + +```python +def on_progress(processed: int, total: int): + pct = (processed / total) * 100 + print(f"Upload progress: {pct:.1f}%") + +# Upload a large file with progress tracking +with open("large-video.mp4", "rb") as f: + result = turbo.upload( + data=f, + data_size=os.path.getsize("large-video.mp4"), + tags=[{"name": "Content-Type", "value": "video/mp4"}], + on_progress=on_progress, + ) +``` + +##### Chunking Configuration + +Use `ChunkingParams` to customize chunked upload behavior: + +```python +from turbo_sdk import ChunkingParams + +result = turbo.upload( + data=large_data, + chunking=ChunkingParams( + chunk_byte_count=10 * 1024 * 1024, # 10 MiB chunks (default: 5 MiB) + max_chunk_concurrency=3, # Parallel chunk uploads (default: 1) + chunking_mode="auto", # "auto", "force", or "disabled" + ), + on_progress=lambda p, t: print(f"{p}/{t} bytes"), +) +``` + +**ChunkingParams options:** +- `chunk_byte_count`: Chunk size in bytes (5-500 MiB, default: 5 MiB) +- `max_chunk_concurrency`: Number of parallel chunk uploads (default: 1) +- `chunking_mode`: + - `"auto"` (default): Use chunked upload for files >= 5 MiB + - `"force"`: Always use chunked upload + - `"disabled"`: Always use single request upload + ##### `get_balance(address=None) -> TurboBalanceResponse` Get winston credit balance. Uses signed request for authenticated balance check when no address specified. @@ -179,6 +245,27 @@ Create signed headers for authenticated API requests. 
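# Hypothetical usage sketch: attach the signed headers to your own HTTP call, e.g. requests.get(url, headers=headers)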
headers = signer.create_signed_headers() ``` +### Exceptions + +The SDK provides specific exceptions for error handling: + +```python +from turbo_sdk import UnderfundedError, ChunkedUploadError + +try: + result = turbo.upload(large_data) +except UnderfundedError: + print("Insufficient balance - please top up your account") +except ChunkedUploadError as e: + print(f"Upload failed: {e}") +``` + +**Exception types:** +- `ChunkedUploadError`: Base exception for chunked upload failures +- `UnderfundedError`: Account has insufficient balance (HTTP 402) +- `UploadValidationError`: Upload validation failed (INVALID status) +- `UploadFinalizationError`: Finalization timed out or failed + ## Developers @@ -203,7 +290,15 @@ pip install -e ".[dev]" pytest ``` -That's it! The test suite includes comprehensive tests for all components without requiring network access. +3. **Run performance benchmarks** (requires funded wallet): + +```bash +export TURBO_TEST_WALLET=/path/to/wallet.json +export TURBO_UPLOAD_URL=https://upload.ardrive.dev # optional, defaults to testnet +pytest -m performance -v -s +``` + +The test suite includes comprehensive unit tests for all components. Performance tests measure real upload throughput against the Turbo service. ## Acknowledgments diff --git a/examples/arweave_upload.py b/examples/arweave_upload.py index d8d2cce..a104488 100644 --- a/examples/arweave_upload.py +++ b/examples/arweave_upload.py @@ -2,6 +2,7 @@ """ Example: Upload data using Arweave JWK wallet """ + from turbo_sdk import Turbo, ArweaveSigner import json import sys diff --git a/examples/ethereum_upload.py b/examples/ethereum_upload.py index 2ddd051..a3de494 100644 --- a/examples/ethereum_upload.py +++ b/examples/ethereum_upload.py @@ -2,6 +2,7 @@ """ Example: Upload data using Ethereum private key """ + from turbo_sdk import Turbo, EthereumSigner diff --git a/examples/test_wallet_integration.py b/examples/test_wallet_integration.py index 24ce5b0..d7e1159 100644 --- a/examples/test_wallet_integration.py +++ b/examples/test_wallet_integration.py @@ -2,6 +2,7 @@ """ Test integration with real Arweave wallet (without network calls) """ + import json from pathlib import Path from turbo_sdk import Turbo, ArweaveSigner diff --git a/pyproject.toml b/pyproject.toml index de27ffe..708b5f0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -79,7 +79,10 @@ testpaths = ["tests"] python_files = ["test_*.py"] python_classes = ["Test*"] python_functions = ["test_*"] -addopts = "-v --tb=short" +addopts = "-v --tb=short -m 'not performance'" +markers = [ + "performance: marks tests as performance benchmarks (may be slow and consume credits)" +] filterwarnings = [ "ignore::DeprecationWarning", "ignore::PendingDeprecationWarning" diff --git a/tests/test_chunked.py b/tests/test_chunked.py new file mode 100644 index 0000000..09eafd4 --- /dev/null +++ b/tests/test_chunked.py @@ -0,0 +1,496 @@ +"""Tests for chunked/multipart upload functionality""" + +import io +import pytest +from unittest.mock import Mock, patch, MagicMock +import json + +from turbo_sdk.types import ChunkingParams, TurboUploadStatus, ChunkedUploadInit +from turbo_sdk.chunked import ( + ChunkedUploader, + ChunkedUploadError, + UnderfundedError, + UploadValidationError, + UploadFinalizationError, +) + + +class TestChunkingParams: + """Test ChunkingParams dataclass""" + + def test_default_values(self): + """Test default parameter values""" + params = ChunkingParams() + assert params.chunk_byte_count == 5 * 1024 * 1024 # 5 MiB + assert params.max_chunk_concurrency 
== 1 + assert params.chunking_mode == "auto" + assert params.max_finalize_ms == 150_000 + + def test_custom_values(self): + """Test custom parameter values""" + params = ChunkingParams( + chunk_byte_count=10 * 1024 * 1024, + max_chunk_concurrency=4, + chunking_mode="force", + max_finalize_ms=300_000, + ) + assert params.chunk_byte_count == 10 * 1024 * 1024 + assert params.max_chunk_concurrency == 4 + assert params.chunking_mode == "force" + assert params.max_finalize_ms == 300_000 + + def test_chunk_size_validation_too_small(self): + """Test validation rejects chunk size below 5 MiB""" + with pytest.raises(ValueError, match="chunk_byte_count must be between"): + ChunkingParams(chunk_byte_count=1024 * 1024) # 1 MiB + + def test_chunk_size_validation_too_large(self): + """Test validation rejects chunk size above 500 MiB""" + with pytest.raises(ValueError, match="chunk_byte_count must be between"): + ChunkingParams(chunk_byte_count=600 * 1024 * 1024) # 600 MiB + + def test_chunk_size_at_boundaries(self): + """Test chunk size at valid boundaries""" + # Minimum boundary + params_min = ChunkingParams(chunk_byte_count=5 * 1024 * 1024) + assert params_min.chunk_byte_count == 5 * 1024 * 1024 + + # Maximum boundary + params_max = ChunkingParams(chunk_byte_count=500 * 1024 * 1024) + assert params_max.chunk_byte_count == 500 * 1024 * 1024 + + def test_concurrency_validation(self): + """Test validation rejects concurrency below 1""" + with pytest.raises(ValueError, match="max_chunk_concurrency must be at least 1"): + ChunkingParams(max_chunk_concurrency=0) + + +class TestChunkedUploader: + """Test ChunkedUploader class""" + + @pytest.fixture + def uploader(self): + """Create a ChunkedUploader instance for testing""" + return ChunkedUploader( + upload_url="https://upload.test.io", + token="arweave", + chunking_params=ChunkingParams(), + ) + + def test_init(self, uploader): + """Test uploader initialization""" + assert uploader.upload_url == "https://upload.test.io" + assert uploader.token == "arweave" + assert uploader.params.chunk_byte_count == 5 * 1024 * 1024 + + def test_chunking_version_header(self, uploader): + """Test x-chunking-version header is set""" + assert uploader._session.headers["x-chunking-version"] == "2" + + def test_poll_interval_small_file(self, uploader): + """Test poll interval for files < 100 MB""" + interval = uploader._get_poll_interval(50 * 1024 * 1024) # 50 MB + assert interval == 2.0 + + def test_poll_interval_medium_file(self, uploader): + """Test poll interval for files < 3 GiB""" + interval = uploader._get_poll_interval(1024 * 1024 * 1024) # 1 GiB + assert interval == 4.0 + + def test_poll_interval_large_file(self, uploader): + """Test poll interval for files >= 3 GiB""" + interval = uploader._get_poll_interval(5 * 1024 * 1024 * 1024) # 5 GiB + assert interval == 7.5 # 1.5 * 5 + + def test_poll_interval_max_cap(self, uploader): + """Test poll interval caps at 15 seconds""" + interval = uploader._get_poll_interval(20 * 1024 * 1024 * 1024) # 20 GiB + assert interval == 15.0 + + @patch("turbo_sdk.chunked.requests.Session") + def test_initiate_success(self, mock_session_class): + """Test successful upload initiation""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "id": "test-upload-id", + "min": 5242880, + "max": 524288000, + "chunkSize": 5242880, + } + + mock_session = Mock() + mock_session.get.return_value = mock_response + mock_session.headers = {} + mock_session_class.return_value = mock_session + + uploader = 
ChunkedUploader( + upload_url="https://upload.test.io", + token="arweave", + ) + + result = uploader.initiate() + + assert isinstance(result, ChunkedUploadInit) + assert result.id == "test-upload-id" + assert result.min == 5242880 + assert result.max == 524288000 + + @patch("turbo_sdk.chunked.requests.Session") + def test_initiate_service_unavailable(self, mock_session_class): + """Test initiation with 503 error""" + mock_response = Mock() + mock_response.status_code = 503 + mock_response.text = "Service Unavailable" + + mock_session = Mock() + mock_session.get.return_value = mock_response + mock_session.headers = {} + mock_session_class.return_value = mock_session + + uploader = ChunkedUploader( + upload_url="https://upload.test.io", + token="arweave", + ) + + with pytest.raises(ChunkedUploadError, match="Service unavailable"): + uploader.initiate() + + @patch("turbo_sdk.chunked.requests.Session") + def test_upload_chunk_success(self, mock_session_class): + """Test successful chunk upload""" + mock_response = Mock() + mock_response.status_code = 200 + + mock_session = Mock() + mock_session.post.return_value = mock_response + mock_session.headers = {} + mock_session_class.return_value = mock_session + + uploader = ChunkedUploader( + upload_url="https://upload.test.io", + token="arweave", + ) + + # Should not raise + uploader.upload_chunk("upload-id", 0, b"test data") + + mock_session.post.assert_called_once() + call_args = mock_session.post.call_args + assert "upload-id" in call_args[0][0] + assert "/0" in call_args[0][0] + + @patch("turbo_sdk.chunked.requests.Session") + def test_upload_chunk_underfunded(self, mock_session_class): + """Test chunk upload with 402 error""" + mock_response = Mock() + mock_response.status_code = 402 + + mock_session = Mock() + mock_session.post.return_value = mock_response + mock_session.headers = {} + mock_session_class.return_value = mock_session + + uploader = ChunkedUploader( + upload_url="https://upload.test.io", + token="arweave", + ) + + with pytest.raises(UnderfundedError): + uploader.upload_chunk("upload-id", 0, b"test data") + + @patch("turbo_sdk.chunked.requests.Session") + def test_upload_chunk_not_found(self, mock_session_class): + """Test chunk upload with 404 error""" + mock_response = Mock() + mock_response.status_code = 404 + + mock_session = Mock() + mock_session.post.return_value = mock_response + mock_session.headers = {} + mock_session_class.return_value = mock_session + + uploader = ChunkedUploader( + upload_url="https://upload.test.io", + token="arweave", + ) + + with pytest.raises(ChunkedUploadError, match="not found or expired"): + uploader.upload_chunk("upload-id", 0, b"test data") + + @patch("turbo_sdk.chunked.requests.Session") + def test_finalize_success(self, mock_session_class): + """Test successful finalization""" + mock_response = Mock() + mock_response.status_code = 202 + + mock_session = Mock() + mock_session.post.return_value = mock_response + mock_session.headers = {} + mock_session_class.return_value = mock_session + + uploader = ChunkedUploader( + upload_url="https://upload.test.io", + token="arweave", + ) + + # Should not raise + uploader.finalize("upload-id") + + call_args = mock_session.post.call_args + assert "finalize" in call_args[0][0] + + @patch("turbo_sdk.chunked.requests.Session") + def test_get_status_finalized(self, mock_session_class): + """Test getting finalized status""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "status": "FINALIZED", + "timestamp": 
1234567890, + "receipt": { + "id": "tx-id", + "owner": "owner-address", + "dataCaches": ["arweave.net"], + "fastFinalityIndexes": ["arweave.net"], + "winc": "1000", + }, + } + + mock_session = Mock() + mock_session.get.return_value = mock_response + mock_session.headers = {} + mock_session_class.return_value = mock_session + + uploader = ChunkedUploader( + upload_url="https://upload.test.io", + token="arweave", + ) + + status = uploader.get_status("upload-id") + + assert isinstance(status, TurboUploadStatus) + assert status.status == "FINALIZED" + assert status.id == "tx-id" + assert status.owner == "owner-address" + assert status.data_caches == ["arweave.net"] + + @patch("turbo_sdk.chunked.requests.Session") + def test_get_status_validating(self, mock_session_class): + """Test getting validating status (no receipt yet)""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "status": "VALIDATING", + "timestamp": 1234567890, + } + + mock_session = Mock() + mock_session.get.return_value = mock_response + mock_session.headers = {} + mock_session_class.return_value = mock_session + + uploader = ChunkedUploader( + upload_url="https://upload.test.io", + token="arweave", + ) + + status = uploader.get_status("upload-id") + + assert status.status == "VALIDATING" + assert status.id is None + assert status.owner is None + + @patch("turbo_sdk.chunked.time.sleep") + @patch("turbo_sdk.chunked.requests.Session") + def test_poll_for_finalization_success(self, mock_session_class, mock_sleep): + """Test successful finalization polling""" + # First call returns VALIDATING, second returns FINALIZED + mock_response_validating = Mock() + mock_response_validating.status_code = 200 + mock_response_validating.json.return_value = { + "status": "VALIDATING", + "timestamp": 1234567890, + } + + mock_response_finalized = Mock() + mock_response_finalized.status_code = 200 + mock_response_finalized.json.return_value = { + "status": "FINALIZED", + "timestamp": 1234567891, + "receipt": { + "id": "tx-id", + "owner": "owner", + "dataCaches": [], + "fastFinalityIndexes": [], + "winc": "0", + }, + } + + mock_session = Mock() + mock_session.get.side_effect = [mock_response_validating, mock_response_finalized] + mock_session.headers = {} + mock_session_class.return_value = mock_session + + uploader = ChunkedUploader( + upload_url="https://upload.test.io", + token="arweave", + ) + + status = uploader.poll_for_finalization("upload-id", 1024 * 1024) + + assert status.status == "FINALIZED" + assert mock_sleep.called + + @patch("turbo_sdk.chunked.time.sleep") + @patch("turbo_sdk.chunked.requests.Session") + def test_poll_for_finalization_underfunded(self, mock_session_class, mock_sleep): + """Test polling returns UNDERFUNDED status""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "status": "UNDERFUNDED", + "timestamp": 1234567890, + } + + mock_session = Mock() + mock_session.get.return_value = mock_response + mock_session.headers = {} + mock_session_class.return_value = mock_session + + uploader = ChunkedUploader( + upload_url="https://upload.test.io", + token="arweave", + ) + + with pytest.raises(UnderfundedError): + uploader.poll_for_finalization("upload-id", 1024 * 1024) + + @patch("turbo_sdk.chunked.time.sleep") + @patch("turbo_sdk.chunked.requests.Session") + def test_poll_for_finalization_invalid(self, mock_session_class, mock_sleep): + """Test polling returns INVALID status""" + mock_response = Mock() + mock_response.status_code 
= 200 + mock_response.json.return_value = { + "status": "INVALID", + "timestamp": 1234567890, + } + + mock_session = Mock() + mock_session.get.return_value = mock_response + mock_session.headers = {} + mock_session_class.return_value = mock_session + + uploader = ChunkedUploader( + upload_url="https://upload.test.io", + token="arweave", + ) + + with pytest.raises(UploadValidationError, match="INVALID"): + uploader.poll_for_finalization("upload-id", 1024 * 1024) + + +class TestChunkedUploaderSequentialUpload: + """Test sequential chunk upload""" + + @patch("turbo_sdk.chunked.requests.Session") + def test_upload_chunks_sequential_bytes(self, mock_session_class): + """Test sequential upload with bytes data""" + mock_session = Mock() + mock_session.post.return_value = Mock(status_code=200) + mock_session.headers = {} + mock_session_class.return_value = mock_session + + params = ChunkingParams(chunk_byte_count=5 * 1024 * 1024) + uploader = ChunkedUploader( + upload_url="https://upload.test.io", + token="arweave", + chunking_params=params, + ) + + # Create 12 MiB of data (should result in 3 chunks) + data = b"x" * (12 * 1024 * 1024) + progress_calls = [] + + def on_progress(processed, total): + progress_calls.append((processed, total)) + + uploader.upload_chunks_sequential("upload-id", data, len(data), on_progress) + + # Should have 3 chunk uploads + assert mock_session.post.call_count == 3 + + # Progress should be reported after each chunk + assert len(progress_calls) == 3 + assert progress_calls[-1][0] == len(data) + + @patch("turbo_sdk.chunked.requests.Session") + def test_upload_chunks_sequential_stream(self, mock_session_class): + """Test sequential upload with stream data""" + mock_session = Mock() + mock_session.post.return_value = Mock(status_code=200) + mock_session.headers = {} + mock_session_class.return_value = mock_session + + params = ChunkingParams(chunk_byte_count=5 * 1024 * 1024) + uploader = ChunkedUploader( + upload_url="https://upload.test.io", + token="arweave", + chunking_params=params, + ) + + # Create stream with 10 MiB + data = b"y" * (10 * 1024 * 1024) + stream = io.BytesIO(data) + + uploader.upload_chunks_sequential("upload-id", stream, len(data), None) + + # Should have 2 chunk uploads + assert mock_session.post.call_count == 2 + + +class TestChunkedUploaderConcurrentUpload: + """Test concurrent chunk upload""" + + @patch("turbo_sdk.chunked.requests.Session") + def test_upload_chunks_concurrent(self, mock_session_class): + """Test concurrent upload""" + mock_session = Mock() + mock_session.post.return_value = Mock(status_code=200) + mock_session.headers = {} + mock_session_class.return_value = mock_session + + params = ChunkingParams( + chunk_byte_count=5 * 1024 * 1024, + max_chunk_concurrency=3, + ) + uploader = ChunkedUploader( + upload_url="https://upload.test.io", + token="arweave", + chunking_params=params, + ) + + # Create 15 MiB of data + data = b"z" * (15 * 1024 * 1024) + + uploader.upload_chunks_concurrent("upload-id", data, len(data), None) + + # Should have 3 chunk uploads + assert mock_session.post.call_count == 3 + + +class TestUnderfundedError: + """Test UnderfundedError exception""" + + def test_default_message(self): + """Test default error message""" + error = UnderfundedError() + assert str(error) == "Insufficient balance for upload" + assert error.status_code == 402 + + def test_custom_message(self): + """Test custom error message""" + error = UnderfundedError("Custom message") + assert str(error) == "Custom message" + assert error.status_code == 
402 diff --git a/tests/test_chunked_performance.py b/tests/test_chunked_performance.py new file mode 100644 index 0000000..a4e4ec4 --- /dev/null +++ b/tests/test_chunked_performance.py @@ -0,0 +1,464 @@ +""" +Performance tests for chunked/multipart uploads vs traditional single-request uploads. + +These tests measure real upload performance against the Turbo service. +They are SKIPPED by default and must be run explicitly. + +Run with: + export TURBO_TEST_WALLET=/path/to/wallet.json + export TURBO_UPLOAD_URL=https://upload.ardrive.dev # optional, defaults to testnet + pytest -m performance -v -s + +Note: These tests upload real data and may consume credits. Use a funded wallet. + +Expected Results: +----------------- +Single-request uploads are often faster for raw throughput because they avoid: +- Multiple HTTP round trips for chunk uploads +- Server-side chunk assembly time +- Finalization polling delay + +However, chunked uploads provide important benefits: +- Progress reporting during upload (essential for UX with large files) +- Memory efficiency (can stream without loading entire file) +- Reliability (individual chunk failures can be retried) +- Foundation for resumable uploads (upload ID persistence) +- Better handling of network interruptions + +The auto-chunking threshold (5 MiB) balances these tradeoffs - small files +use efficient single requests while large files get progress visibility. +""" + +import io +import json +import os +import time +from dataclasses import dataclass +from typing import List, Optional + +import pytest + +from turbo_sdk import Turbo, ArweaveSigner, ChunkingParams + +# Test wallet path - must be set via environment variable +WALLET_PATH = os.environ.get("TURBO_TEST_WALLET", "") + +# Upload URL - defaults to testnet for safety +UPLOAD_URL = os.environ.get("TURBO_UPLOAD_URL", "https://upload.ardrive.dev") + + +@dataclass +class UploadMetrics: + """Metrics collected during an upload""" + + method: str + data_size: int + total_time: float + throughput_mbps: float + chunk_size: Optional[int] = None + concurrency: Optional[int] = None + tx_id: Optional[str] = None + + +def format_size(size_bytes: int) -> str: + """Format bytes as human-readable string""" + if size_bytes >= 1024 * 1024: + return f"{size_bytes / (1024 * 1024):.1f} MiB" + elif size_bytes >= 1024: + return f"{size_bytes / 1024:.1f} KiB" + return f"{size_bytes} B" + + +def format_throughput(mbps: float) -> str: + """Format throughput as human-readable string""" + if mbps >= 1: + return f"{mbps:.2f} MB/s" + return f"{mbps * 1024:.2f} KB/s" + + +@pytest.fixture(scope="module") +def turbo_client(): + """Create a Turbo client for testing""" + if not WALLET_PATH or not os.path.exists(WALLET_PATH): + pytest.skip(f"TURBO_TEST_WALLET not set or wallet not found at '{WALLET_PATH}'") + + with open(WALLET_PATH) as f: + jwk = json.load(f) + + signer = ArweaveSigner(jwk) + return Turbo(signer, upload_url=UPLOAD_URL) + + +@pytest.fixture(scope="module") +def test_data_small(): + """Generate 1 MiB test data""" + return os.urandom(1 * 1024 * 1024) + + +@pytest.fixture(scope="module") +def test_data_medium(): + """Generate 6 MiB test data (triggers auto-chunking)""" + return os.urandom(6 * 1024 * 1024) + + +@pytest.fixture(scope="module") +def test_data_large(): + """Generate 15 MiB test data""" + return os.urandom(15 * 1024 * 1024) + + +class TestUploadPerformance: + """Performance comparison tests for upload methods""" + + def _upload_single(self, turbo: Turbo, data: bytes, label: str) -> UploadMetrics: + 
"""Perform single-request upload and collect metrics""" + start = time.perf_counter() + + result = turbo.upload( + data, + tags=[{"name": "Test", "value": f"perf-single-{label}"}], + chunking=ChunkingParams(chunking_mode="disabled"), + ) + + elapsed = time.perf_counter() - start + throughput = (len(data) / (1024 * 1024)) / elapsed + + return UploadMetrics( + method="single", + data_size=len(data), + total_time=elapsed, + throughput_mbps=throughput, + tx_id=result.id, + ) + + def _upload_chunked( + self, + turbo: Turbo, + data: bytes, + label: str, + chunk_size: int = 5 * 1024 * 1024, + concurrency: int = 1, + ) -> UploadMetrics: + """Perform chunked upload and collect metrics""" + start = time.perf_counter() + + result = turbo.upload( + data, + tags=[{"name": "Test", "value": f"perf-chunked-{label}"}], + chunking=ChunkingParams( + chunking_mode="force", + chunk_byte_count=chunk_size, + max_chunk_concurrency=concurrency, + ), + ) + + elapsed = time.perf_counter() - start + throughput = (len(data) / (1024 * 1024)) / elapsed + + return UploadMetrics( + method=f"chunked-{concurrency}x", + data_size=len(data), + total_time=elapsed, + throughput_mbps=throughput, + chunk_size=chunk_size, + concurrency=concurrency, + tx_id=result.id, + ) + + def _print_metrics(self, metrics: List[UploadMetrics], title: str): + """Print metrics table""" + print(f"\n{'=' * 70}") + print(f" {title}") + print(f"{'=' * 70}") + print(f"{'Method':<20} {'Size':<12} {'Time':<10} {'Throughput':<15}") + print(f"{'-' * 70}") + + for m in metrics: + print( + f"{m.method:<20} " + f"{format_size(m.data_size):<12} " + f"{m.total_time:.2f}s{'':<5} " + f"{format_throughput(m.throughput_mbps):<15}" + ) + + print(f"{'=' * 70}\n") + + @pytest.mark.performance + def test_small_file_comparison(self, turbo_client, test_data_small): + """Compare upload methods for small files (1 MiB)""" + metrics = [] + + # Single request upload + m1 = self._upload_single(turbo_client, test_data_small, "1mib") + metrics.append(m1) + + # Chunked upload (forced) + m2 = self._upload_chunked(turbo_client, test_data_small, "1mib") + metrics.append(m2) + + self._print_metrics(metrics, "Small File (1 MiB) - Single vs Chunked") + + # For small files, single request should be faster (less overhead) + # But we just verify both complete successfully + assert m1.tx_id is not None + assert m2.tx_id is not None + + @pytest.mark.performance + def test_medium_file_comparison(self, turbo_client, test_data_medium): + """Compare upload methods for medium files (6 MiB)""" + metrics = [] + + # Single request upload + m1 = self._upload_single(turbo_client, test_data_medium, "6mib") + metrics.append(m1) + + # Chunked upload - 1 concurrent + m2 = self._upload_chunked(turbo_client, test_data_medium, "6mib", concurrency=1) + metrics.append(m2) + + # Chunked upload - 2 concurrent + m3 = self._upload_chunked(turbo_client, test_data_medium, "6mib", concurrency=2) + metrics.append(m3) + + self._print_metrics(metrics, "Medium File (6 MiB) - Single vs Chunked") + + assert all(m.tx_id is not None for m in metrics) + + @pytest.mark.performance + def test_large_file_comparison(self, turbo_client, test_data_large): + """Compare upload methods for large files (15 MiB)""" + metrics = [] + + # Single request upload + m1 = self._upload_single(turbo_client, test_data_large, "15mib") + metrics.append(m1) + + # Chunked upload - 1 concurrent + m2 = self._upload_chunked(turbo_client, test_data_large, "15mib", concurrency=1) + metrics.append(m2) + + # Chunked upload - 2 concurrent + m3 = 
self._upload_chunked(turbo_client, test_data_large, "15mib", concurrency=2) + metrics.append(m3) + + # Chunked upload - 3 concurrent + m4 = self._upload_chunked(turbo_client, test_data_large, "15mib", concurrency=3) + metrics.append(m4) + + self._print_metrics(metrics, "Large File (15 MiB) - Single vs Chunked") + + assert all(m.tx_id is not None for m in metrics) + + @pytest.mark.performance + def test_concurrency_scaling(self, turbo_client, test_data_large): + """Test how throughput scales with concurrency""" + metrics = [] + + for concurrency in [1, 2, 3, 4]: + m = self._upload_chunked( + turbo_client, + test_data_large, + f"15mib-c{concurrency}", + concurrency=concurrency, + ) + metrics.append(m) + + self._print_metrics(metrics, "Concurrency Scaling (15 MiB)") + + # Verify all uploads succeeded + assert all(m.tx_id is not None for m in metrics) + + # Check that higher concurrency generally improves throughput + # (may not always be true due to network conditions) + print("Throughput by concurrency:") + for m in metrics: + print(f" {m.concurrency}x: {format_throughput(m.throughput_mbps)}") + + @pytest.mark.performance + def test_chunk_size_comparison(self, turbo_client, test_data_large): + """Test impact of different chunk sizes""" + metrics = [] + + chunk_sizes = [ + 5 * 1024 * 1024, # 5 MiB (minimum) + 10 * 1024 * 1024, # 10 MiB + 15 * 1024 * 1024, # 15 MiB (single chunk for this data) + ] + + for chunk_size in chunk_sizes: + m = self._upload_chunked( + turbo_client, + test_data_large, + f"15mib-cs{chunk_size // (1024*1024)}", + chunk_size=chunk_size, + concurrency=2, + ) + metrics.append(m) + + self._print_metrics(metrics, "Chunk Size Comparison (15 MiB, 2x concurrency)") + + assert all(m.tx_id is not None for m in metrics) + + @pytest.mark.performance + def test_stream_upload_performance(self, turbo_client, test_data_medium): + """Test upload performance with file-like stream input""" + metrics = [] + + # Bytes upload + start = time.perf_counter() + result1 = turbo_client.upload( + test_data_medium, + tags=[{"name": "Test", "value": "perf-bytes"}], + chunking=ChunkingParams(chunking_mode="force"), + ) + elapsed = time.perf_counter() - start + metrics.append( + UploadMetrics( + method="bytes", + data_size=len(test_data_medium), + total_time=elapsed, + throughput_mbps=(len(test_data_medium) / (1024 * 1024)) / elapsed, + tx_id=result1.id, + ) + ) + + # Stream upload + stream = io.BytesIO(test_data_medium) + start = time.perf_counter() + result2 = turbo_client.upload( + stream, + tags=[{"name": "Test", "value": "perf-stream"}], + chunking=ChunkingParams(chunking_mode="force"), + data_size=len(test_data_medium), + ) + elapsed = time.perf_counter() - start + metrics.append( + UploadMetrics( + method="stream", + data_size=len(test_data_medium), + total_time=elapsed, + throughput_mbps=(len(test_data_medium) / (1024 * 1024)) / elapsed, + tx_id=result2.id, + ) + ) + + self._print_metrics(metrics, "Bytes vs Stream Input (6 MiB)") + + assert all(m.tx_id is not None for m in metrics) + + +class TestProgressCallbackPerformance: + """Test that progress callbacks don't significantly impact performance""" + + @pytest.mark.performance + def test_progress_callback_overhead(self, turbo_client, test_data_medium): + """Measure overhead of progress callbacks""" + + # Without callback + start = time.perf_counter() + r1 = turbo_client.upload( + test_data_medium, + tags=[{"name": "Test", "value": "perf-no-callback"}], + chunking=ChunkingParams(chunking_mode="force"), + ) + time_no_callback = 
time.perf_counter() - start + + # With callback + progress_events = [] + + def on_progress(processed, total): + progress_events.append((time.perf_counter(), processed, total)) + + start = time.perf_counter() + r2 = turbo_client.upload( + test_data_medium, + tags=[{"name": "Test", "value": "perf-with-callback"}], + on_progress=on_progress, + chunking=ChunkingParams(chunking_mode="force"), + ) + time_with_callback = time.perf_counter() - start + + overhead_pct = ((time_with_callback - time_no_callback) / time_no_callback) * 100 + + print(f"\n{'=' * 50}") + print("Progress Callback Overhead Test") + print("=" * 50) + print(f"Without callback: {time_no_callback:.2f}s") + print(f"With callback: {time_with_callback:.2f}s") + print(f"Overhead: {overhead_pct:+.1f}%") + print(f"Callback events: {len(progress_events)}") + print(f"{'=' * 50}\n") + + assert r1.id is not None + assert r2.id is not None + assert len(progress_events) > 0 + + # Callback overhead should be minimal (< 20%) + # Note: network variance may cause this to fluctuate + assert overhead_pct < 50, f"Callback overhead too high: {overhead_pct}%" + + +def run_benchmark(): + """Run all benchmarks and print summary""" + if not WALLET_PATH or not os.path.exists(WALLET_PATH): + print(f"Error: Wallet not found at '{WALLET_PATH}'") + print("Set TURBO_TEST_WALLET environment variable to your wallet path") + return + + with open(WALLET_PATH) as f: + jwk = json.load(f) + + signer = ArweaveSigner(jwk) + turbo = Turbo(signer, upload_url=UPLOAD_URL) + + print(f"\nWallet: {signer.get_wallet_address()}") + print(f"Upload URL: {UPLOAD_URL}") + print("Running performance benchmarks...\n") + + test = TestUploadPerformance() + + # Run tests with different file sizes + sizes = [ + (1 * 1024 * 1024, "1 MiB"), + (6 * 1024 * 1024, "6 MiB"), + (15 * 1024 * 1024, "15 MiB"), + ] + + all_metrics = [] + + for size, label in sizes: + print(f"Testing {label}...") + data = os.urandom(size) + + # Single upload + m1 = test._upload_single(turbo, data, label.replace(" ", "")) + all_metrics.append(m1) + + # Chunked with different concurrency + for conc in [1, 2, 3]: + m = test._upload_chunked( + turbo, data, f"{label.replace(' ', '')}-c{conc}", concurrency=conc + ) + all_metrics.append(m) + + # Print final summary + print("\n" + "=" * 80) + print(" FINAL SUMMARY") + print("=" * 80) + print(f"{'Method':<25} {'Size':<12} {'Time':<10} {'Throughput':<15} {'TX ID':<20}") + print("-" * 80) + + for m in all_metrics: + print( + f"{m.method:<25} " + f"{format_size(m.data_size):<12} " + f"{m.total_time:.2f}s{'':<5} " + f"{format_throughput(m.throughput_mbps):<15} " + f"{m.tx_id[:16] if m.tx_id else 'N/A'}..." + ) + + print("=" * 80) + + +if __name__ == "__main__": + run_benchmark() diff --git a/tests/test_stream_signing.py b/tests/test_stream_signing.py index f89d99b..939b366 100644 --- a/tests/test_stream_signing.py +++ b/tests/test_stream_signing.py @@ -18,7 +18,6 @@ ) from turbo_sdk.signers import EthereumSigner - # Test private key (not a real key, just for testing) TEST_PRIVATE_KEY = "0x" + "ab" * 32 diff --git a/tests/test_streaming_dataitem.py b/tests/test_streaming_dataitem.py new file mode 100644 index 0000000..3017b33 --- /dev/null +++ b/tests/test_streaming_dataitem.py @@ -0,0 +1,485 @@ +""" +Unit tests for StreamingDataItem and create_data_header. + +Tests verify that streaming DataItem construction produces identical binary +output to the in-memory approach, ensuring compatibility with the server. 
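+The streaming path signs the data in a single pass and prepends a separately
+built header, so byte-for-byte equality with DataItem.get_raw() is the key
+invariant checked here.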
+""" + +import io +import os +import pytest +from turbo_sdk.bundle import ( + create_data, + create_data_header, + sign, + StreamingDataItem, +) +from turbo_sdk.signers import EthereumSigner + +# Test private key (not a real key, just for testing) +TEST_PRIVATE_KEY = "0x" + "ab" * 32 + + +class TestCreateDataHeader: + """Tests for create_data_header function.""" + + @pytest.fixture + def signer(self): + """Create a real Ethereum signer for testing.""" + return EthereumSigner(TEST_PRIVATE_KEY) + + def test_header_matches_dataitem_prefix(self, signer): + """Header bytes should match the prefix of a full DataItem.""" + test_data = b"Hello, Arweave!" + tags = [{"name": "Content-Type", "value": "text/plain"}] + + # Create full DataItem in memory + data_item = create_data(bytearray(test_data), signer, tags) + sign(data_item, signer) + full_raw = bytes(data_item.get_raw()) + + # Create header only + header = create_data_header( + signer=signer, + signature=data_item.raw_signature, + tags=tags, + anchor=data_item.raw_anchor, + ) + + # Header should be a prefix of the full DataItem + assert full_raw.startswith(header) + + # Header + data should equal full DataItem + reconstructed = header + test_data + assert reconstructed == full_raw + + def test_header_with_no_tags(self, signer): + """Should work correctly with no tags.""" + test_data = b"No tags here" + + data_item = create_data(bytearray(test_data), signer, tags=None) + sign(data_item, signer) + full_raw = bytes(data_item.get_raw()) + + header = create_data_header( + signer=signer, + signature=data_item.raw_signature, + tags=None, + anchor=data_item.raw_anchor, + ) + + reconstructed = header + test_data + assert reconstructed == full_raw + + def test_header_with_multiple_tags(self, signer): + """Should work correctly with multiple tags.""" + test_data = b"Multiple tags" + tags = [ + {"name": "Content-Type", "value": "application/octet-stream"}, + {"name": "App-Name", "value": "TestApp"}, + {"name": "Version", "value": "1.0.0"}, + ] + + data_item = create_data(bytearray(test_data), signer, tags=tags) + sign(data_item, signer) + full_raw = bytes(data_item.get_raw()) + + header = create_data_header( + signer=signer, + signature=data_item.raw_signature, + tags=tags, + anchor=data_item.raw_anchor, + ) + + reconstructed = header + test_data + assert reconstructed == full_raw + + def test_signature_length_validation(self, signer): + """Should reject signatures with wrong length.""" + with pytest.raises(ValueError, match="Signature must be"): + create_data_header( + signer=signer, + signature=b"too_short", + tags=None, + anchor=os.urandom(32), + ) + + def test_anchor_length_validation(self, signer): + """Should reject anchors with wrong length.""" + # Ethereum signature is 65 bytes + signature = b"x" * 65 + + with pytest.raises(ValueError, match="Anchor must be exactly 32 bytes"): + create_data_header( + signer=signer, + signature=signature, + tags=None, + anchor=b"short_anchor", + ) + + def test_random_anchor_generated_when_none(self, signer): + """Should generate random anchor when none provided.""" + signature = b"x" * 65 + + header1 = create_data_header( + signer=signer, + signature=signature, + tags=None, + anchor=None, + ) + + header2 = create_data_header( + signer=signer, + signature=signature, + tags=None, + anchor=None, + ) + + # Headers should differ due to different random anchors + assert header1 != header2 + + +class TestStreamingDataItem: + """Tests for StreamingDataItem class.""" + + @pytest.fixture + def signer(self): + """Create a real 
Ethereum signer for testing.""" + return EthereumSigner(TEST_PRIVATE_KEY) + + def test_streaming_matches_inmemory(self, signer): + """Streaming DataItem should produce identical bytes to in-memory.""" + test_data = b"Hello, streaming world!" * 100 + tags = [{"name": "Content-Type", "value": "text/plain"}] + + # In-memory approach + data_item = create_data(bytearray(test_data), signer, tags) + sign(data_item, signer) + expected = bytes(data_item.get_raw()) + + # Streaming approach - use same anchor for comparison + stream = io.BytesIO(test_data) + streaming = StreamingDataItem( + data_stream=stream, + data_size=len(test_data), + signer=signer, + tags=tags, + ) + # Manually set the anchor to match + streaming._anchor = data_item.raw_anchor + streaming._data_stream.seek(0) + + # Now prepare - this will use our preset anchor + from turbo_sdk.bundle.sign import sign_stream + from turbo_sdk.bundle.tags import encode_tags + + encoded_tags = encode_tags(tags) + signature = sign_stream( + signature_type=signer.signature_type, + raw_owner=signer.public_key, + raw_target=b"", + raw_anchor=streaming._anchor, + raw_tags=encoded_tags, + data_stream=streaming._data_stream, + data_size=len(test_data), + signer=signer, + ) + + streaming._data_stream.seek(0) + streaming._header = create_data_header( + signer=signer, + signature=signature, + tags=tags, + anchor=streaming._anchor, + ) + streaming._prepared = True + + actual = streaming.read(-1) + + assert actual == expected + + def test_prepare_returns_correct_size(self, signer): + """prepare() should return correct total size.""" + test_data = b"x" * 1000 + stream = io.BytesIO(test_data) + + streaming = StreamingDataItem( + data_stream=stream, + data_size=len(test_data), + signer=signer, + tags=[{"name": "Test", "value": "value"}], + ) + + total_size = streaming.prepare() + + # Total should be header + data + assert total_size == streaming.header_size + len(test_data) + assert total_size == streaming.total_size + + def test_read_chunks_correctly(self, signer): + """Reading in chunks should produce same result as reading all.""" + test_data = b"chunked read test data " * 50 + stream = io.BytesIO(test_data) + + streaming = StreamingDataItem( + data_stream=stream, + data_size=len(test_data), + signer=signer, + tags=None, + ) + total_size = streaming.prepare() + + # Read in chunks + chunks = [] + while True: + chunk = streaming.read(100) + if not chunk: + break + chunks.append(chunk) + + chunked_result = b"".join(chunks) + + # Reset and read all at once + streaming.reset() + all_at_once = streaming.read(-1) + + assert chunked_result == all_at_once + assert len(chunked_result) == total_size + + def test_read_empty_returns_empty(self, signer): + """read(0) should return empty bytes.""" + test_data = b"test" + stream = io.BytesIO(test_data) + + streaming = StreamingDataItem( + data_stream=stream, + data_size=len(test_data), + signer=signer, + tags=None, + ) + streaming.prepare() + + result = streaming.read(0) + assert result == b"" + + def test_raises_if_not_prepared(self, signer): + """Should raise error if read() called before prepare().""" + stream = io.BytesIO(b"test") + + streaming = StreamingDataItem( + data_stream=stream, + data_size=4, + signer=signer, + tags=None, + ) + + with pytest.raises(RuntimeError, match="Must call prepare"): + streaming.read(10) + + def test_raises_on_non_seekable_stream(self, signer): + """Should raise error for non-seekable streams.""" + + class NonSeekableStream: + def read(self, size=-1): + return b"data" + + def seekable(self): 
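+                # Simulates a pipe- or socket-like source that cannot be rewound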
+ return False + + streaming = StreamingDataItem( + data_stream=NonSeekableStream(), + data_size=4, + signer=signer, + tags=None, + ) + + with pytest.raises(RuntimeError, match="seekable stream"): + streaming.prepare() + + def test_reset_allows_rereading(self, signer): + """reset() should allow reading the data again.""" + test_data = b"reset test" + stream = io.BytesIO(test_data) + + streaming = StreamingDataItem( + data_stream=stream, + data_size=len(test_data), + signer=signer, + tags=None, + ) + streaming.prepare() + + first_read = streaming.read(-1) + streaming.reset() + second_read = streaming.read(-1) + + assert first_read == second_read + + def test_progress_callback_during_signing(self, signer): + """Progress callback should be called during prepare().""" + test_data = b"x" * 10000 + stream = io.BytesIO(test_data) + progress_calls = [] + + def on_progress(processed, total): + progress_calls.append((processed, total)) + + streaming = StreamingDataItem( + data_stream=stream, + data_size=len(test_data), + signer=signer, + tags=None, + on_sign_progress=on_progress, + ) + streaming.prepare() + + assert len(progress_calls) > 0 + assert progress_calls[-1][0] == len(test_data) + assert progress_calls[-1][1] == len(test_data) + + def test_large_data_streaming(self, signer): + """Should handle large data without memory issues.""" + # 1 MiB of data + test_data = os.urandom(1024 * 1024) + stream = io.BytesIO(test_data) + + streaming = StreamingDataItem( + data_stream=stream, + data_size=len(test_data), + signer=signer, + tags=[{"name": "Size", "value": "1MB"}], + ) + + total_size = streaming.prepare() + + # Read in chunks and verify total size + total_read = 0 + while True: + chunk = streaming.read(64 * 1024) # 64 KiB chunks + if not chunk: + break + total_read += len(chunk) + + assert total_read == total_size + + def test_header_then_data_boundary(self, signer): + """Reading across header/data boundary should work correctly.""" + test_data = b"boundary test data" + stream = io.BytesIO(test_data) + + streaming = StreamingDataItem( + data_stream=stream, + data_size=len(test_data), + signer=signer, + tags=None, + ) + streaming.prepare() + + # Get header size + header_size = streaming.header_size + + # Read exactly to header boundary + header_part = streaming.read(header_size) + assert len(header_part) == header_size + + # Read the data part + data_part = streaming.read(-1) + assert data_part == test_data + + def test_seekable_returns_false(self, signer): + """StreamingDataItem should not be seekable after prepare().""" + stream = io.BytesIO(b"test") + + streaming = StreamingDataItem( + data_stream=stream, + data_size=4, + signer=signer, + tags=None, + ) + + assert streaming.seekable() is False + + +class TestStreamingDataItemIntegration: + """Integration tests for StreamingDataItem with real file-like objects.""" + + @pytest.fixture + def signer(self): + """Create a real Ethereum signer for testing.""" + return EthereumSigner(TEST_PRIVATE_KEY) + + def test_with_tempfile(self, signer, tmp_path): + """Should work correctly with actual file objects.""" + from turbo_sdk.bundle.sign import sign_stream + from turbo_sdk.bundle.tags import encode_tags + + # Create a temporary file + test_file = tmp_path / "test_data.bin" + test_data = os.urandom(5000) + test_file.write_bytes(test_data) + + # Compare with in-memory result + data_item = create_data(bytearray(test_data), signer, tags=None) + sign(data_item, signer) + + # Get the raw_tags from data_item for consistency + encoded_tags = encode_tags([]) + + 
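+        # Stream the same bytes through StreamingDataItem, pinning the anchor from the
+        # in-memory DataItem so the two outputs can be compared byte-for-byte.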
with open(test_file, "rb") as f: + streaming = StreamingDataItem( + data_stream=f, + data_size=len(test_data), + signer=signer, + tags=None, + ) + # Use same anchor + streaming._anchor = data_item.raw_anchor + + signature = sign_stream( + signature_type=signer.signature_type, + raw_owner=signer.public_key, + raw_target=b"", + raw_anchor=streaming._anchor, + raw_tags=encoded_tags, + data_stream=f, + data_size=len(test_data), + signer=signer, + ) + + f.seek(0) + streaming._header = create_data_header( + signer=signer, + signature=signature, + tags=None, + anchor=streaming._anchor, + ) + streaming._prepared = True + + streamed_result = streaming.read(-1) + + expected = bytes(data_item.get_raw()) + assert streamed_result == expected + + def test_with_tempfile_using_prepare(self, signer, tmp_path): + """Test using the normal prepare() flow with a temp file.""" + # Create a temporary file + test_file = tmp_path / "test_data.bin" + test_data = os.urandom(10000) + test_file.write_bytes(test_data) + + with open(test_file, "rb") as f: + streaming = StreamingDataItem( + data_stream=f, + data_size=len(test_data), + signer=signer, + tags=[{"name": "App", "value": "Test"}], + ) + + total_size = streaming.prepare() + result = streaming.read(-1) + + # Verify size matches + assert len(result) == total_size + + # Verify it starts with correct signature type (Ethereum = 3) + assert result[0:2] == b"\x03\x00" # Little-endian 3 diff --git a/turbo_sdk/__init__.py b/turbo_sdk/__init__.py index 6239851..6d45b47 100644 --- a/turbo_sdk/__init__.py +++ b/turbo_sdk/__init__.py @@ -22,15 +22,45 @@ """ from .client import Turbo -from .types import TurboUploadResponse, TurboBalanceResponse +from .types import ( + TurboUploadResponse, + TurboBalanceResponse, + TurboUploadStatus, + ChunkingParams, + ChunkedUploadInit, + ProgressCallback, + ChunkingMode, +) from .signers import EthereumSigner, ArweaveSigner +from .chunked import ( + ChunkedUploader, + ChunkedUploadError, + UnderfundedError, + UploadValidationError, + UploadFinalizationError, +) __version__ = "0.1.0" __all__ = [ + # Client "Turbo", + # Response types "TurboUploadResponse", "TurboBalanceResponse", + "TurboUploadStatus", + # Chunking + "ChunkingParams", + "ChunkedUploadInit", + "ChunkedUploader", + "ProgressCallback", + "ChunkingMode", + # Errors + "ChunkedUploadError", + "UnderfundedError", + "UploadValidationError", + "UploadFinalizationError", + # Signers "EthereumSigner", "ArweaveSigner", ] diff --git a/turbo_sdk/bundle/__init__.py b/turbo_sdk/bundle/__init__.py index 2e87e29..0b4f972 100644 --- a/turbo_sdk/bundle/__init__.py +++ b/turbo_sdk/bundle/__init__.py @@ -1,6 +1,6 @@ from .constants import SIG_CONFIG, MAX_TAG_BYTES, MIN_BINARY_SIZE from .dataitem import DataItem -from .create import create_data +from .create import create_data, create_data_header from .sign import ( sign, deep_hash, @@ -12,6 +12,7 @@ ) from .tags import encode_tags, decode_tags from .utils import set_bytes, byte_array_to_long +from .stream import StreamingDataItem __all__ = [ "SIG_CONFIG", @@ -19,6 +20,7 @@ "MIN_BINARY_SIZE", "DataItem", "create_data", + "create_data_header", "sign", "deep_hash", "get_signature_data", @@ -30,4 +32,5 @@ "decode_tags", "set_bytes", "byte_array_to_long", + "StreamingDataItem", ] diff --git a/turbo_sdk/bundle/create.py b/turbo_sdk/bundle/create.py index a547e94..44b95fd 100644 --- a/turbo_sdk/bundle/create.py +++ b/turbo_sdk/bundle/create.py @@ -133,3 +133,123 @@ def create_data( set_bytes(binary, data, offset) return DataItem(binary) + + +def 
create_data_header( + signer, + signature: bytes, + tags: Optional[List[Dict[str, str]]] = None, + target: Optional[str] = None, + anchor: Optional[bytes] = None, +) -> bytes: + """ + Create the header portion of a DataItem (everything before the data). + Used for streaming uploads where data is appended separately. + + Args: + signer: The signer object with signature_type, public_key, etc. + signature: The pre-computed signature bytes + tags: Optional list of tags as dictionaries with 'name' and 'value' keys + target: Optional target address (hex string) + anchor: Raw anchor bytes (32 bytes). If None, random anchor is generated. + + Returns: + Header bytes that can be prepended to streamed data + """ + # Get signature configuration + sig_config = SIG_CONFIG[signer.signature_type] + sig_length = sig_config["sigLength"] + pub_length = sig_config["pubLength"] + + # Validate signature length + if len(signature) != sig_length: + raise ValueError(f"Signature must be {sig_length} bytes, got {len(signature)}") + + # Process tags + if tags is None: + tags = [] + encoded_tags = encode_tags(tags) + + # Process target + target_bytes = bytearray(32) + target_present = False + if target: + target_hex = target.replace("0x", "") + target_data = bytes.fromhex(target_hex) + if len(target_data) > 32: + raise ValueError("Target must be 32 bytes or less") + for i, b in enumerate(target_data): + target_bytes[i] = b + target_present = True + + # Process anchor + anchor_bytes = bytearray(32) + if anchor is not None: + if len(anchor) != 32: + raise ValueError("Anchor must be exactly 32 bytes") + for i, b in enumerate(anchor): + anchor_bytes[i] = b + else: + random_anchor = os.urandom(32) + for i, b in enumerate(random_anchor): + anchor_bytes[i] = b + + # Calculate header size (everything except data) + header_size = ( + 2 # signature type + + sig_length # signature + + pub_length # owner/public key + + 1 # target present flag + + (32 if target_present else 0) # target + + 1 # anchor present flag + + 32 # anchor (always present) + + 8 # number of tags + + 8 # tag data length + + len(encoded_tags) # tag data + ) + + # Create binary buffer + binary = bytearray(header_size) + offset = 0 + + # 1. Signature type (2 bytes, little-endian) + struct.pack_into(" int: + """ + Sign the data (streaming) and prepare the header. + + This reads through the entire data stream to compute the signature, + then seeks back to the start for the upload phase. + + Returns: + Total size in bytes (header + data) + + Raises: + RuntimeError: If stream is not seekable + """ + if self._prepared: + return len(self._header) + self._data_size + + # Verify stream is seekable + if not self._data_stream.seekable(): + raise RuntimeError( + "StreamingDataItem requires a seekable stream. " + "For non-seekable streams, load data into memory first." 
+ ) + + # Generate random anchor + self._anchor = os.urandom(32) + + # Encode tags for signing + encoded_tags = encode_tags(self._tags) + + # Compute signature by streaming through data + signature = sign_stream( + signature_type=self._signer.signature_type, + raw_owner=self._signer.public_key, + raw_target=b"", + raw_anchor=self._anchor, + raw_tags=encoded_tags, + data_stream=self._data_stream, + data_size=self._data_size, + signer=self._signer, + on_progress=self._on_sign_progress, + ) + + # Seek back to start for upload + self._data_stream.seek(0) + + # Build header with the computed signature + self._header = create_data_header( + signer=self._signer, + signature=signature, + tags=self._tags, + anchor=self._anchor, + ) + + self._prepared = True + return len(self._header) + self._data_size + + @property + def total_size(self) -> int: + """Total size in bytes (header + data). Must call prepare() first.""" + if not self._prepared: + raise RuntimeError("Must call prepare() first") + return len(self._header) + self._data_size + + @property + def header_size(self) -> int: + """Size of the header in bytes. Must call prepare() first.""" + if not self._prepared: + raise RuntimeError("Must call prepare() first") + return len(self._header) + + def read(self, size: int = -1) -> bytes: + """ + Read bytes from the streaming DataItem. + + Reads header bytes first, then data bytes. This method is compatible + with the BinaryIO interface expected by ChunkedUploader. + + Args: + size: Number of bytes to read. -1 means read all remaining. + + Returns: + Bytes read (may be less than size if at end) + + Raises: + RuntimeError: If prepare() has not been called + """ + if not self._prepared: + raise RuntimeError("Must call prepare() first") + + if size == 0: + return b"" + + result = bytearray() + + # Determine how many bytes to read + if size < 0: + # Read everything remaining + remaining = float("inf") + else: + remaining = size + + # Read from header first + if self._header_offset < len(self._header): + header_remaining = len(self._header) - self._header_offset + to_read = min(header_remaining, remaining) + header_chunk = self._header[ + self._header_offset : self._header_offset + int(to_read) + ] + result.extend(header_chunk) + self._header_offset += len(header_chunk) + remaining -= len(header_chunk) + + # Then read from data stream + if remaining > 0: + if remaining == float("inf"): + data_chunk = self._data_stream.read() + else: + data_chunk = self._data_stream.read(int(remaining)) + if data_chunk: + result.extend(data_chunk) + + return bytes(result) + + def seekable(self) -> bool: + """Return False - StreamingDataItem is forward-only after prepare().""" + return False + + def reset(self) -> None: + """ + Reset the streaming position to allow re-reading. + + This seeks the underlying data stream back to the start and + resets the header offset. 
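+        For example, a caller can reset and re-read the item when an upload
+        attempt needs to be retried from the beginning.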
+ """ + if not self._prepared: + raise RuntimeError("Must call prepare() first") + self._header_offset = 0 + self._data_stream.seek(0) diff --git a/turbo_sdk/chunked.py b/turbo_sdk/chunked.py new file mode 100644 index 0000000..65bc3f0 --- /dev/null +++ b/turbo_sdk/chunked.py @@ -0,0 +1,306 @@ +"""Chunked/multipart upload support for large files""" + +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from typing import BinaryIO, Optional, Union + +import requests + +from .types import ( + ChunkedUploadInit, + TurboUploadStatus, + ChunkingParams, + ProgressCallback, + TurboUploadResponse, +) + + +class ChunkedUploadError(Exception): + """Base exception for chunked upload errors""" + + pass + + +class UnderfundedError(ChunkedUploadError): + """Raised when account has insufficient balance (402)""" + + def __init__(self, message: str = "Insufficient balance for upload"): + self.status_code = 402 + super().__init__(message) + + +class UploadValidationError(ChunkedUploadError): + """Raised when upload validation fails""" + + pass + + +class UploadFinalizationError(ChunkedUploadError): + """Raised when upload finalization fails or times out""" + + pass + + +class ChunkedUploader: + """Manages multipart upload lifecycle for large files""" + + CHUNKING_VERSION = "2" + FINALIZED_STATES = {"FINALIZED"} + ERROR_STATES = {"INVALID", "UNDERFUNDED", "APPROVAL_FAILED"} + + def __init__( + self, + upload_url: str, + token: str, + chunking_params: Optional[ChunkingParams] = None, + ): + self.upload_url = upload_url + self.token = token + self.params = chunking_params or ChunkingParams() + self._session = requests.Session() + self._session.headers.update({"x-chunking-version": self.CHUNKING_VERSION}) + + def _get_poll_interval(self, total_bytes: int) -> float: + """Calculate poll interval based on data size (from TS SDK)""" + mb_100 = 100 * 1024 * 1024 + gb_3 = 3 * 1024 * 1024 * 1024 + + if total_bytes < mb_100: + return 2.0 + elif total_bytes < gb_3: + return 4.0 + else: + # 1.5 seconds per GiB, max 15 seconds + gib = total_bytes / (1024 * 1024 * 1024) + return min(1.5 * gib, 15.0) + + def _get_max_finalize_time(self, total_bytes: int) -> float: + """Calculate max finalization wait time (2.5 min per GiB)""" + gib = max(1, total_bytes / (1024 * 1024 * 1024)) + return gib * 150_000 / 1000 # Convert ms to seconds + + def initiate(self) -> ChunkedUploadInit: + """Initiate a new chunked upload session""" + url = f"{self.upload_url}/chunks/{self.token}/-1/-1" + params = {"chunkSize": self.params.chunk_byte_count} + + response = self._session.get(url, params=params) + + if response.status_code == 200: + data = response.json() + return ChunkedUploadInit( + id=data["id"], + min=data["min"], + max=data["max"], + chunk_size=data.get("chunkSize", self.params.chunk_byte_count), + ) + elif response.status_code == 503: + raise ChunkedUploadError(f"Service unavailable: {response.text}") + else: + raise ChunkedUploadError( + f"Failed to initiate upload: {response.status_code} - {response.text}" + ) + + def upload_chunk(self, upload_id: str, offset: int, data: bytes) -> None: + """Upload a single chunk""" + url = f"{self.upload_url}/chunks/{self.token}/{upload_id}/{offset}" + + response = self._session.post( + url, + data=data, + headers={"Content-Type": "application/octet-stream"}, + ) + + if response.status_code == 200: + return + elif response.status_code == 402: + raise UnderfundedError() + elif response.status_code == 404: + raise ChunkedUploadError("Upload session not found or expired") + 
else: + raise ChunkedUploadError( + f"Chunk upload failed: {response.status_code} - {response.text}" + ) + + def finalize(self, upload_id: str) -> None: + """Finalize the chunked upload (enqueue for processing)""" + url = f"{self.upload_url}/chunks/{self.token}/{upload_id}/finalize" + + response = self._session.post(url) + + if response.status_code == 202: + return + elif response.status_code == 402: + raise UnderfundedError() + elif response.status_code == 404: + raise ChunkedUploadError("Upload session not found or expired") + else: + raise ChunkedUploadError(f"Finalize failed: {response.status_code} - {response.text}") + + def get_status(self, upload_id: str) -> TurboUploadStatus: + """Get current upload status""" + url = f"{self.upload_url}/chunks/{self.token}/{upload_id}/status" + + response = self._session.get(url) + + if response.status_code == 200: + data = response.json() + receipt = data.get("receipt", {}) + return TurboUploadStatus( + status=data["status"], + timestamp=data["timestamp"], + id=receipt.get("id"), + owner=receipt.get("owner"), + data_caches=receipt.get("dataCaches", []), + fast_finality_indexes=receipt.get("fastFinalityIndexes", []), + winc=receipt.get("winc"), + ) + elif response.status_code == 404: + raise ChunkedUploadError("Upload session not found") + else: + raise ChunkedUploadError( + f"Status check failed: {response.status_code} - {response.text}" + ) + + def poll_for_finalization(self, upload_id: str, total_bytes: int) -> TurboUploadStatus: + """Poll until upload is finalized or fails""" + poll_interval = self._get_poll_interval(total_bytes) + max_wait = self._get_max_finalize_time(total_bytes) + start_time = time.time() + + while True: + elapsed = time.time() - start_time + if elapsed > max_wait: + raise UploadFinalizationError(f"Finalization timed out after {elapsed:.1f}s") + + status = self.get_status(upload_id) + + if status.status in self.FINALIZED_STATES: + return status + elif status.status == "UNDERFUNDED": + raise UnderfundedError() + elif status.status in self.ERROR_STATES: + raise UploadValidationError(f"Upload failed with status: {status.status}") + + time.sleep(poll_interval) + + def upload_chunks_sequential( + self, + upload_id: str, + data: Union[bytes, BinaryIO], + total_size: int, + on_progress: Optional[ProgressCallback] = None, + ) -> None: + """Upload chunks sequentially""" + chunk_size = self.params.chunk_byte_count + offset = 0 + processed = 0 + + while offset < total_size: + # Read chunk + if isinstance(data, bytes): + chunk = data[offset : offset + chunk_size] + else: + chunk = data.read(chunk_size) + if not chunk: + break + + # Upload chunk + self.upload_chunk(upload_id, offset, chunk) + + processed += len(chunk) + offset += len(chunk) + + if on_progress: + on_progress(processed, total_size) + + def upload_chunks_concurrent( + self, + upload_id: str, + data: bytes, + total_size: int, + on_progress: Optional[ProgressCallback] = None, + ) -> None: + """Upload chunks concurrently (requires bytes, not stream)""" + chunk_size = self.params.chunk_byte_count + max_workers = self.params.max_chunk_concurrency + + # Build list of chunks + chunks = [] + offset = 0 + while offset < total_size: + chunk_data = data[offset : offset + chunk_size] + chunks.append((offset, chunk_data)) + offset += len(chunk_data) + + processed = 0 + lock = None + if on_progress: + import threading + + lock = threading.Lock() + + def upload_one(args): + nonlocal processed + chunk_offset, chunk_data = args + self.upload_chunk(upload_id, chunk_offset, chunk_data) + if 
on_progress and lock: + with lock: + nonlocal processed + processed += len(chunk_data) + on_progress(processed, total_size) + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = [executor.submit(upload_one, chunk) for chunk in chunks] + for future in as_completed(futures): + future.result() # Raise any exceptions + + def upload( + self, + data: Union[bytes, BinaryIO], + total_size: Optional[int] = None, + on_progress: Optional[ProgressCallback] = None, + ) -> TurboUploadResponse: + """ + Perform complete chunked upload + + Args: + data: Data to upload (bytes or file-like object) + total_size: Total size in bytes (required for BinaryIO) + on_progress: Optional progress callback + + Returns: + TurboUploadResponse with transaction details + """ + # Determine total size + if isinstance(data, bytes): + total_size = len(data) + elif total_size is None: + raise ValueError("total_size required for file-like objects") + + # Initiate upload + init = self.initiate() + + try: + # Upload chunks + if self.params.max_chunk_concurrency > 1 and isinstance(data, bytes): + self.upload_chunks_concurrent(init.id, data, total_size, on_progress) + else: + self.upload_chunks_sequential(init.id, data, total_size, on_progress) + + # Finalize + self.finalize(init.id) + + # Poll for completion + status = self.poll_for_finalization(init.id, total_size) + + return TurboUploadResponse( + id=status.id or "", + owner=status.owner or "", + data_caches=status.data_caches, + fast_finality_indexes=status.fast_finality_indexes, + winc=status.winc or "0", + ) + except Exception: + # Could implement cleanup here in future + raise diff --git a/turbo_sdk/client.py b/turbo_sdk/client.py index bc3df46..6f9f207 100644 --- a/turbo_sdk/client.py +++ b/turbo_sdk/client.py @@ -1,7 +1,16 @@ +import io + import requests -from typing import List, Dict, Optional -from .types import TurboUploadResponse, TurboBalanceResponse -from .bundle import create_data, sign +from typing import BinaryIO, List, Dict, Optional, Union + +from .types import ( + TurboUploadResponse, + TurboBalanceResponse, + ChunkingParams, + ProgressCallback, +) +from .bundle import create_data, sign, StreamingDataItem +from .chunked import ChunkedUploader class Turbo: @@ -21,52 +30,119 @@ class Turbo: 3: "ethereum", # Ethereum ECDSA } - def __init__(self, signer, network: str = "mainnet"): + def __init__( + self, + signer, + network: str = "mainnet", + upload_url: Optional[str] = None, + payment_url: Optional[str] = None, + ): """ Initialize Turbo client Args: signer: Signer instance (ArweaveSigner or EthereumSigner) - network: Network ("mainnet" or "testnet") + network: Network ("mainnet" or "testnet") - used for default URLs + upload_url: Optional custom upload service URL (overrides network default) + payment_url: Optional custom payment service URL (overrides network default) """ self.signer = signer self.network = network - self.upload_url = self.SERVICE_URLS[network]["upload"] - self.payment_url = self.SERVICE_URLS[network]["payment"] + self.upload_url = upload_url or self.SERVICE_URLS[network]["upload"] + self.payment_url = payment_url or self.SERVICE_URLS[network]["payment"] # Determine token type from signer using lookup self.token = self.TOKEN_MAP.get(signer.signature_type) if not self.token: raise ValueError(f"Unsupported signer type: {signer.signature_type}") + # Default threshold for auto-chunking (5 MiB) + CHUNKING_THRESHOLD = 5 * 1024 * 1024 + def upload( - self, data: bytes, tags: Optional[List[Dict[str, str]]] = None + self, + data: 
Union[bytes, BinaryIO], + tags: Optional[List[Dict[str, str]]] = None, + on_progress: Optional[ProgressCallback] = None, + chunking: Optional[ChunkingParams] = None, + data_size: Optional[int] = None, ) -> TurboUploadResponse: """ Upload data with automatic signing Args: - data: Data to upload + data: Data to upload (bytes or file-like object) tags: Optional metadata tags + on_progress: Optional callback for progress reporting (processed_bytes, total_bytes) + chunking: Optional chunking configuration (defaults to auto mode) + data_size: Required when data is a file-like object Returns: TurboUploadResponse with transaction details Raises: Exception: If upload fails + UnderfundedError: If account balance is insufficient """ + # Determine data size + if isinstance(data, bytes): + size = len(data) + elif data_size is not None: + size = data_size + else: + raise ValueError("data_size is required when data is a file-like object") + + # Determine chunking mode + params = chunking or ChunkingParams() + use_chunked = self._should_use_chunked_upload(size, params) + + if use_chunked: + return self._upload_chunked(data, size, tags, on_progress, params) + else: + return self._upload_single(data, size, tags, on_progress) + + def _should_use_chunked_upload(self, size: int, params: ChunkingParams) -> bool: + """Determine if chunked upload should be used""" + if params.chunking_mode == "disabled": + return False + if params.chunking_mode == "force": + return True + # Auto mode: use chunked for files >= threshold + return size >= self.CHUNKING_THRESHOLD + + def _upload_single( + self, + data: Union[bytes, BinaryIO], + size: int, + tags: Optional[List[Dict[str, str]]], + on_progress: Optional[ProgressCallback], + ) -> TurboUploadResponse: + """Upload using single request (for small files)""" + # Read data if it's a stream + if not isinstance(data, bytes): + data = data.read() # Create and sign DataItem data_item = create_data(bytearray(data), self.signer, tags) sign(data_item, self.signer) + # Report signing complete (half the work) + if on_progress: + on_progress(size // 2, size) + # Upload to Turbo endpoint url = f"{self.upload_url}/tx/{self.token}" raw_data = data_item.get_raw() - headers = {"Content-Type": "application/octet-stream", "Content-Length": str(len(raw_data))} + headers = { + "Content-Type": "application/octet-stream", + "Content-Length": str(len(raw_data)), + } response = requests.post(url, data=raw_data, headers=headers) + if on_progress: + on_progress(size, size) + if response.status_code == 200: result = response.json() return TurboUploadResponse( @@ -79,6 +155,47 @@ def upload( else: raise Exception(f"Upload failed: {response.status_code} - {response.text}") + def _upload_chunked( + self, + data: Union[bytes, BinaryIO], + size: int, + tags: Optional[List[Dict[str, str]]], + on_progress: Optional[ProgressCallback], + params: ChunkingParams, + ) -> TurboUploadResponse: + """Upload using chunked/multipart upload (for large files)""" + # Wrap bytes in BytesIO for unified streaming path + if isinstance(data, bytes): + data_stream = io.BytesIO(data) + else: + data_stream = data + + # Use StreamingDataItem for all chunked uploads + # This signs data by streaming through it, avoiding memory duplication + streaming_item = StreamingDataItem( + data_stream=data_stream, + data_size=size, + signer=self.signer, + tags=tags, + ) + + # Prepare signs the data (streaming) and builds the header + total_size = streaming_item.prepare() + + # Create chunked uploader + uploader = ChunkedUploader( + 
upload_url=self.upload_url, + token=self.token, + chunking_params=params, + ) + + # Upload using the streaming item as a file-like object + return uploader.upload( + data=streaming_item, + total_size=total_size, + on_progress=on_progress, + ) + def get_balance(self, address: Optional[str] = None) -> TurboBalanceResponse: """ Get winston credit balance using signed request diff --git a/turbo_sdk/types.py b/turbo_sdk/types.py index fc878fa..432a96f 100644 --- a/turbo_sdk/types.py +++ b/turbo_sdk/types.py @@ -1,5 +1,50 @@ -from dataclasses import dataclass -from typing import List +from dataclasses import dataclass, field +from typing import List, Callable, Literal, Optional + +# Type aliases +ChunkingMode = Literal["auto", "force", "disabled"] +ProgressCallback = Callable[[int, int], None] # (processed_bytes, total_bytes) + + +@dataclass +class ChunkingParams: + """Configuration for chunked/multipart uploads""" + + chunk_byte_count: int = 5 * 1024 * 1024 # 5 MiB default + max_chunk_concurrency: int = 1 + chunking_mode: ChunkingMode = "auto" + max_finalize_ms: int = 150_000 # 2.5 minutes per GiB + + def __post_init__(self): + min_chunk = 5 * 1024 * 1024 # 5 MiB + max_chunk = 500 * 1024 * 1024 # 500 MiB + if not (min_chunk <= self.chunk_byte_count <= max_chunk): + raise ValueError(f"chunk_byte_count must be between {min_chunk} and {max_chunk} bytes") + if self.max_chunk_concurrency < 1: + raise ValueError("max_chunk_concurrency must be at least 1") + + +@dataclass +class ChunkedUploadInit: + """Response from chunked upload initiation""" + + id: str # Upload session ID + min: int # Minimum chunk size + max: int # Maximum chunk size + chunk_size: int # Requested chunk size + + +@dataclass +class TurboUploadStatus: + """Status of an upload (used for chunked upload polling)""" + + status: str # VALIDATING, ASSEMBLING, FINALIZING, FINALIZED, INVALID, UNDERFUNDED + timestamp: int + id: Optional[str] = None + owner: Optional[str] = None + data_caches: List[str] = field(default_factory=list) + fast_finality_indexes: List[str] = field(default_factory=list) + winc: Optional[str] = None @dataclass