From c8d643a377c0401bbf2f4f6b890a809c763fc95e Mon Sep 17 00:00:00 2001 From: vaibhavatlan Date: Tue, 21 Apr 2026 19:55:01 +0530 Subject: [PATCH 1/7] perf(storage): parallelize directory upload in transfer.upload() [BLDX-1089] Directory upload iterated files sequentially with await _upload_one(). Now uses asyncio.gather with semaphore (max 20 concurrent), matching the pattern in storage/batch.py upload_prefix. --- application_sdk/storage/transfer.py | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/application_sdk/storage/transfer.py b/application_sdk/storage/transfer.py index 33f068359..734977f30 100644 --- a/application_sdk/storage/transfer.py +++ b/application_sdk/storage/transfer.py @@ -249,16 +249,29 @@ async def upload( else: prefix = src.name + import asyncio + + MAX_CONCURRENT_UPLOADS = 20 + sem = asyncio.Semaphore(MAX_CONCURRENT_UPLOADS) + files = [p for p in src.rglob("*") if p.is_file()] - transferred_count = 0 + + async def _bounded_upload(file_path: Path, key: str) -> bool: + async with sem: + ok, _ = await _upload_one( + resolved, file_path, key, skip_if_exists=skip_if_exists + ) + return ok + + keys = [] for file_path in files: relative = str(file_path.relative_to(src)).replace(os.sep, "/") - key = f"{prefix}/{relative}" if prefix else relative - ok, _ = await _upload_one( - resolved, file_path, key, skip_if_exists=skip_if_exists - ) - if ok: - transferred_count += 1 + keys.append(f"{prefix}/{relative}" if prefix else relative) + + results = await asyncio.gather( + *[_bounded_upload(fp, k) for fp, k in zip(files, keys)] + ) + transferred_count = sum(1 for ok in results if ok) store_prefix = (prefix.rstrip("/") + "/") if prefix else "" reason = "uploaded" if transferred_count > 0 else "skipped:hash_match" From c812bf4d589287f9faee7d31d907bae9d48f536f Mon Sep 17 00:00:00 2001 From: vaibhavatlan Date: Wed, 22 Apr 2026 14:55:53 +0530 Subject: [PATCH 2/7] fix: address review feedback on concurrent upload in transfer.py - Move `import asyncio` to module top-level (SDK convention) - Replace hardcoded MAX_CONCURRENT_UPLOADS=20 with configurable `max_concurrency: int = 4` kwarg, matching batch.py's API - Add `return_exceptions=True` to asyncio.gather to prevent orphaned tasks on partial failure, re-raising the first error after all tasks settle Co-Authored-By: Claude Opus 4.6 (1M context) --- application_sdk/storage/transfer.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/application_sdk/storage/transfer.py b/application_sdk/storage/transfer.py index 734977f30..045166c86 100644 --- a/application_sdk/storage/transfer.py +++ b/application_sdk/storage/transfer.py @@ -17,6 +17,7 @@ from __future__ import annotations +import asyncio import hashlib import os import tempfile @@ -168,6 +169,7 @@ async def upload( store: "ObjectStore | None" = None, _app_prefix: str = "", _tier: StorageTier = StorageTier.RETAINED, + max_concurrency: int = 4, ) -> "UploadOutput": """Upload a local file or directory to the object store. @@ -188,6 +190,7 @@ async def upload( skip_if_exists: Skip files whose SHA-256 matches the stored sidecar. store: Object store to use, or ``None`` to resolve from infrastructure. _app_prefix: Internal prefix injected by the ``App.upload`` task. + max_concurrency: Maximum parallel uploads for directory mode (default 4). Returns: :class:`~application_sdk.contracts.storage.UploadOutput` @@ -249,10 +252,7 @@ async def upload( else: prefix = src.name - import asyncio - - MAX_CONCURRENT_UPLOADS = 20 - sem = asyncio.Semaphore(MAX_CONCURRENT_UPLOADS) + sem = asyncio.Semaphore(max_concurrency) files = [p for p in src.rglob("*") if p.is_file()] @@ -269,8 +269,12 @@ async def _bounded_upload(file_path: Path, key: str) -> bool: keys.append(f"{prefix}/{relative}" if prefix else relative) results = await asyncio.gather( - *[_bounded_upload(fp, k) for fp, k in zip(files, keys)] + *[_bounded_upload(fp, k) for fp, k in zip(files, keys)], + return_exceptions=True, ) + errors = [r for r in results if isinstance(r, BaseException)] + if errors: + raise errors[0] transferred_count = sum(1 for ok in results if ok) store_prefix = (prefix.rstrip("/") + "/") if prefix else "" From bf642821b894cac3a575987b81a2b224a0ba68c4 Mon Sep 17 00:00:00 2001 From: vaibhavatlan Date: Wed, 22 Apr 2026 15:04:50 +0530 Subject: [PATCH 3/7] test: add tests for concurrent directory upload path Cover the three scenarios requested in review: 1. Multi-file directory upload completing correctly (10 files) 2. transferred_count accuracy with skip_if_exists partial skips 3. Error propagation when one upload raises inside gather Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/unit/storage/test_transfer.py | 52 +++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/tests/unit/storage/test_transfer.py b/tests/unit/storage/test_transfer.py index 2e28089ce..6280a3e64 100644 --- a/tests/unit/storage/test_transfer.py +++ b/tests/unit/storage/test_transfer.py @@ -80,6 +80,58 @@ async def test_upload_directory_skip_unchanged(self, store, tmp_path) -> None: assert out2.synced is False assert out2.reason == "skipped:hash_match" + async def test_upload_directory_concurrent_completes(self, store, tmp_path) -> None: + """Multi-file directory upload completes correctly via concurrent path.""" + for i in range(10): + (tmp_path / f"file_{i}.txt").write_bytes(f"content_{i}".encode()) + out = await upload(str(tmp_path), "conc", store=store) + assert out.ref.file_count == 10 + assert out.synced is True + assert out.reason == "uploaded" + + # Verify all files are downloadable + dest = tmp_path / "dest" + dl = await download("conc/", str(dest), store=store) + assert dl.ref.file_count == 10 + + async def test_upload_directory_partial_skip_count(self, store, tmp_path) -> None: + """transferred_count is accurate when some files are skipped.""" + (tmp_path / "a.txt").write_bytes(b"aaa") + (tmp_path / "b.txt").write_bytes(b"bbb") + (tmp_path / "c.txt").write_bytes(b"ccc") + + # Upload once so all files get sidecars + await upload(str(tmp_path), "partial", store=store, skip_if_exists=True) + + # Change only one file + (tmp_path / "b.txt").write_bytes(b"bbb_v2") + out = await upload(str(tmp_path), "partial", store=store, skip_if_exists=True) + + # Only the changed file should have been transferred + assert out.synced is True + assert out.reason == "uploaded" + + async def test_upload_directory_error_propagation( + self, store, tmp_path, monkeypatch + ) -> None: + """Error in one upload propagates correctly from asyncio.gather.""" + (tmp_path / "ok.txt").write_bytes(b"fine") + (tmp_path / "fail.txt").write_bytes(b"boom") + + from application_sdk.storage import transfer as transfer_mod + + _original = transfer_mod._upload_one + + async def _failing_upload_one(st, local_file, store_key, *, skip_if_exists): + if "fail.txt" in str(local_file): + raise RuntimeError("simulated upload failure") + return await _original(st, local_file, store_key, skip_if_exists=skip_if_exists) + + monkeypatch.setattr(transfer_mod, "_upload_one", _failing_upload_one) + + with pytest.raises(RuntimeError, match="simulated upload failure"): + await upload(str(tmp_path), "errtest", store=store) + class TestUploadStorageSubdir: """Tests for the storage_subdir parameter on upload.""" From 306c23f468ac1991060dd2b89d425267850f45f1 Mon Sep 17 00:00:00 2001 From: vaibhavatlan Date: Wed, 22 Apr 2026 15:16:42 +0530 Subject: [PATCH 4/7] style: fix ruff-format line length in test_transfer.py Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/unit/storage/test_transfer.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/unit/storage/test_transfer.py b/tests/unit/storage/test_transfer.py index 6280a3e64..5c4cc6ee5 100644 --- a/tests/unit/storage/test_transfer.py +++ b/tests/unit/storage/test_transfer.py @@ -125,7 +125,9 @@ async def test_upload_directory_error_propagation( async def _failing_upload_one(st, local_file, store_key, *, skip_if_exists): if "fail.txt" in str(local_file): raise RuntimeError("simulated upload failure") - return await _original(st, local_file, store_key, skip_if_exists=skip_if_exists) + return await _original( + st, local_file, store_key, skip_if_exists=skip_if_exists + ) monkeypatch.setattr(transfer_mod, "_upload_one", _failing_upload_one) From cb6a775c3e94dab3e65169f6457604284cffe15f Mon Sep 17 00:00:00 2001 From: vaibhavatlan Date: Wed, 22 Apr 2026 15:20:49 +0530 Subject: [PATCH 5/7] test: add integration tests for concurrent transfer.upload path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cover the concurrent directory upload/download via transfer module against a real local object store: - Multi-file concurrent upload (15 files) - Full upload→download roundtrip with nested directories - Partial skip_if_exists behavior (unchanged vs changed files) - max_concurrency parameter with low concurrency value Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/integration/test_storage_io.py | 90 ++++++++++++++++++++++++++++ 1 file changed, 90 insertions(+) diff --git a/tests/integration/test_storage_io.py b/tests/integration/test_storage_io.py index fb346bc7a..52e47747b 100644 --- a/tests/integration/test_storage_io.py +++ b/tests/integration/test_storage_io.py @@ -250,3 +250,93 @@ async def test_delete_prefix_empty(store): """delete_prefix on nonexistent prefix returns 0.""" deleted = await delete_prefix("nonexistent-prefix", store) assert deleted == 0 + + +# ------------------------------------------------------------------ +# transfer.upload / download — concurrent directory path +# ------------------------------------------------------------------ + + +@pytest.mark.integration +async def test_transfer_upload_directory_concurrent(store, tmp_path): + """transfer.upload handles multi-file directories via asyncio.gather.""" + from application_sdk.storage.transfer import upload + + src = tmp_path / "src" + src.mkdir() + for i in range(15): + (src / f"part_{i}.csv").write_text(f"row-{i}") + + out = await upload(str(src), "transfer-conc/", store=store) + + assert out.ref.file_count == 15 + assert out.synced is True + assert out.reason == "uploaded" + + # Verify all keys exist in the store + keys = await list_keys("transfer-conc", store) + assert len(keys) == 15 + + +@pytest.mark.integration +async def test_transfer_upload_download_directory_roundtrip(store, tmp_path): + """Full roundtrip: upload a directory concurrently, download and verify.""" + from application_sdk.storage.transfer import download, upload + + src = tmp_path / "src" + src.mkdir() + sub = src / "nested" + sub.mkdir() + (src / "root.txt").write_bytes(b"root-content") + (sub / "child.txt").write_bytes(b"child-content") + + await upload(str(src), "rt-dir/", store=store) + + dest = tmp_path / "dest" + dl = await download("rt-dir/", str(dest), store=store) + + assert dl.ref.file_count == 2 + assert dl.synced is True + assert (dest / "root.txt").read_bytes() == b"root-content" + assert (dest / "nested" / "child.txt").read_bytes() == b"child-content" + + +@pytest.mark.integration +async def test_transfer_upload_directory_skip_partial(store, tmp_path): + """skip_if_exists skips unchanged files and re-uploads changed ones.""" + from application_sdk.storage.transfer import upload + + src = tmp_path / "src" + src.mkdir() + (src / "stable.txt").write_bytes(b"same") + (src / "changing.txt").write_bytes(b"v1") + + out1 = await upload(str(src), "partial/", store=store, skip_if_exists=True) + assert out1.synced is True + + # Second upload with same content → all skipped + out2 = await upload(str(src), "partial/", store=store, skip_if_exists=True) + assert out2.synced is False + assert out2.reason == "skipped:hash_match" + + # Change one file → partial transfer + (src / "changing.txt").write_bytes(b"v2") + out3 = await upload(str(src), "partial/", store=store, skip_if_exists=True) + assert out3.synced is True + assert out3.reason == "uploaded" + + +@pytest.mark.integration +async def test_transfer_upload_directory_max_concurrency(store, tmp_path): + """max_concurrency parameter is respected (runs with low concurrency).""" + from application_sdk.storage.transfer import upload + + src = tmp_path / "src" + src.mkdir() + for i in range(8): + (src / f"f{i}.bin").write_bytes(f"data-{i}".encode()) + + out = await upload(str(src), "low-conc/", store=store, max_concurrency=2) + + assert out.ref.file_count == 8 + assert out.synced is True From 0ec7b3b2cfaeff0f60a1de6abf893b9617fb8a46 Mon Sep 17 00:00:00 2001 From: vaibhavatlan Date: Wed, 22 Apr 2026 15:26:42 +0530 Subject: [PATCH 6/7] fix(test): filter by .csv suffix to exclude .sha256 sidecars from key count list_keys returns sidecar files too, so assert against suffix-filtered keys to match only the 15 data files. Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/integration/test_storage_io.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_storage_io.py b/tests/integration/test_storage_io.py index 52e47747b..fd5c7aae7 100644 --- a/tests/integration/test_storage_io.py +++ b/tests/integration/test_storage_io.py @@ -273,8 +273,8 @@ async def test_transfer_upload_directory_concurrent(store, tmp_path): assert out.synced is True assert out.reason == "uploaded" - # Verify all keys exist in the store - keys = await list_keys("transfer-conc", store) + # Verify all data keys exist in the store (excludes .sha256 sidecars) + keys = await list_keys("transfer-conc", store, suffix=".csv") assert len(keys) == 15 From 128d45752da654e812bd891bb7d2e57a6e37d320 Mon Sep 17 00:00:00 2001 From: vaibhavatlan Date: Wed, 22 Apr 2026 17:19:42 +0530 Subject: [PATCH 7/7] refactor: use shared MAX_CONCURRENT_STORAGE_TRANSFERS constant for uploads Uses configurable constant (env: ATLAN_MAX_CONCURRENT_STORAGE_TRANSFERS, default 4) instead of hardcoded value, aligning with batch.py convention. --- application_sdk/constants.py | 5 +++++ application_sdk/storage/transfer.py | 6 ++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/application_sdk/constants.py b/application_sdk/constants.py index 2b52f9e2d..94cab0fe6 100644 --- a/application_sdk/constants.py +++ b/application_sdk/constants.py @@ -159,6 +159,11 @@ #: Maximum number of activities that can run concurrently MAX_CONCURRENT_ACTIVITIES = int(os.getenv("ATLAN_MAX_CONCURRENT_ACTIVITIES", "5")) +#: Maximum concurrent object-store transfers (uploads / downloads) +MAX_CONCURRENT_STORAGE_TRANSFERS = int( + os.getenv("ATLAN_MAX_CONCURRENT_STORAGE_TRANSFERS", "4") +) + #: Build ID for worker versioning (injected by TWD controller via Kubernetes Downward API). #: When set, workers identify themselves with this build ID so the Temporal server can #: route tasks to the correct version during versioned deployments. diff --git a/application_sdk/storage/transfer.py b/application_sdk/storage/transfer.py index 045166c86..d52503148 100644 --- a/application_sdk/storage/transfer.py +++ b/application_sdk/storage/transfer.py @@ -24,6 +24,7 @@ from pathlib import Path from typing import TYPE_CHECKING +from application_sdk.constants import MAX_CONCURRENT_STORAGE_TRANSFERS from application_sdk.contracts.types import FileReference, StorageTier if TYPE_CHECKING: @@ -169,7 +170,7 @@ async def upload( store: "ObjectStore | None" = None, _app_prefix: str = "", _tier: StorageTier = StorageTier.RETAINED, - max_concurrency: int = 4, + max_concurrency: int = MAX_CONCURRENT_STORAGE_TRANSFERS, ) -> "UploadOutput": """Upload a local file or directory to the object store. @@ -190,7 +191,8 @@ async def upload( skip_if_exists: Skip files whose SHA-256 matches the stored sidecar. store: Object store to use, or ``None`` to resolve from infrastructure. _app_prefix: Internal prefix injected by the ``App.upload`` task. - max_concurrency: Maximum parallel uploads for directory mode (default 4). + max_concurrency: Maximum parallel uploads for directory mode + (default :data:`~application_sdk.constants.MAX_CONCURRENT_STORAGE_TRANSFERS`). Returns: :class:`~application_sdk.contracts.storage.UploadOutput`