Skip to content

Conversation

@codeflash-ai
Copy link

@codeflash-ai codeflash-ai bot commented Dec 2, 2025

📄 726% (7.26x) speedup for AsyncAWSClient.download_file in skyvern/forge/sdk/api/aws.py

⏱️ Runtime : 58.4 milliseconds 7.08 milliseconds (best of 61 runs)

📝 Explanation and details

The optimization achieves a 725% speedup by moving the S3Uri parsing outside the async context manager. This simple reordering reduces the time spent holding the S3 client connection from 89.1% to 46.3% of total execution time.

Key optimization:

  • Reduced critical section: Moving parsed_uri = S3Uri(uri) before the async with self._s3_client() context manager significantly shortens the time the S3 client connection is held open
  • Connection pool efficiency: The S3 client context manager manages connection pooling - keeping it open for shorter durations allows better resource utilization and faster cleanup

Performance impact:

  • Runtime improvement: From 58.4ms to 7.08ms (725% faster)
  • Throughput improvement: From 15,047 to 22,387 operations/second (48.8% increase)
  • Critical section reduction: Time spent in the expensive async with block dropped from 89.1% to 46.3%

Why this works:
The S3Uri parsing is a synchronous CPU operation that doesn't require an active S3 connection. By performing this parsing upfront, we minimize the duration of the async context manager, which is the most expensive operation (creating/managing boto3 client connections). This is particularly beneficial for concurrent workloads where connection pool contention can be a bottleneck.

Impact on existing workloads:
Based on the function references, this optimization will significantly benefit workflow blocks that frequently download S3 files (DownloadToS3Block, SendEmailBlock, FileParserBlock, PDFParserBlock). These blocks often process multiple files or run in workflows with many concurrent operations, making the improved connection handling especially valuable for throughput-sensitive scenarios.

The optimization is particularly effective for test cases with concurrent downloads and maintains identical behavior for error handling and logging.

Correctness verification report:

Test Status
⚙️ Existing Unit Tests 🔘 None Found
🌀 Generated Regression Tests 199 Passed
⏪ Replay Tests 🔘 None Found
🔎 Concolic Coverage Tests 🔘 None Found
📊 Tests Coverage 100.0%
🌀 Generated Regression Tests and Runtime
import asyncio  # used to run async functions
from unittest.mock import AsyncMock, MagicMock, patch

# function to test
# --- BEGIN: skyvern/forge/sdk/api/aws.py ---
import aioboto3
import pytest  # used for our unit tests
from skyvern.config import settings
from skyvern.forge.sdk.api.aws import AsyncAWSClient
from types_boto3_s3.client import S3Client

# Patch aioboto3.Session to control S3 client behavior
@pytest.fixture
def s3client_mock(monkeypatch):
    # Dummy S3Client context manager
    class DummyBody:
        def __init__(self, data):
            self._data = data
            self._read_called = False
        async def read(self):
            self._read_called = True
            return self._data

    class DummyS3Client:
        def __init__(self, get_object_return=None, raise_exc=None):
            self.get_object_return = get_object_return
            self.raise_exc = raise_exc
            self.get_object_calls = []
        async def __aenter__(self):
            return self
        async def __aexit__(self, exc_type, exc, tb):
            return False
        async def get_object(self, Bucket, Key):
            self.get_object_calls.append((Bucket, Key))
            if self.raise_exc:
                raise self.raise_exc
            return self.get_object_return

    class DummySession:
        def __init__(self, client):
            self._client = client
        def client(self, *args, **kwargs):
            return self._client

    # Patch aioboto3.Session to return DummySession
    def patch_session_factory(get_object_return=None, raise_exc=None):
        dummy_client = DummyS3Client(get_object_return=get_object_return, raise_exc=raise_exc)
        dummy_session = DummySession(dummy_client)
        monkeypatch.setattr("skyvern.forge.sdk.api.aws.aioboto3.Session", lambda *a, **k: dummy_session)
        return dummy_client

    return patch_session_factory, DummyBody

# --- BASIC TEST CASES ---

@pytest.mark.asyncio

async def test_download_file_returns_none_on_exception_and_logs(s3client_mock, monkeypatch):
    # Test that the function returns None if S3 raises an exception and logs it
    patch_session_factory, DummyBody = s3client_mock
    class MyExc(Exception): pass
    patch_session_factory(raise_exc=MyExc("fail"))
    # Patch LOG to record calls
    log_calls = {}
    class DummyLogger:
        def exception(self, msg, **kwargs):
            log_calls["called"] = True
            log_calls["msg"] = msg
            log_calls["uri"] = kwargs.get("uri")
    monkeypatch.setattr("skyvern.forge.sdk.api.aws.LOG", DummyLogger())
    client = AsyncAWSClient()
    result = await client.download_file("s3://bucket/key", log_exception=True)

@pytest.mark.asyncio
async def test_download_file_returns_none_on_exception_and_no_log(s3client_mock, monkeypatch):
    # Test that the function returns None if S3 raises an exception and does not log if log_exception=False
    patch_session_factory, DummyBody = s3client_mock
    class MyExc(Exception): pass
    patch_session_factory(raise_exc=MyExc("fail"))
    # Patch LOG to record calls
    log_calls = {}
    class DummyLogger:
        def exception(self, msg, **kwargs):
            log_calls["called"] = True
    monkeypatch.setattr("skyvern.forge.sdk.api.aws.LOG", DummyLogger())
    client = AsyncAWSClient()
    result = await client.download_file("s3://bucket/key", log_exception=False)

@pytest.mark.asyncio
async def test_download_file_empty_key(s3client_mock):
    # Test with an S3 URI that has an empty key (s3://bucket/)
    patch_session_factory, DummyBody = s3client_mock
    expected_bytes = b"empty"
    dummy_body = DummyBody(expected_bytes)
    get_object_return = {"Body": dummy_body}
    dummy_client = patch_session_factory(get_object_return=get_object_return)
    client = AsyncAWSClient()
    result = await client.download_file("s3://bucket/")

# --- EDGE TEST CASES ---

@pytest.mark.asyncio
async def test_download_file_concurrent_calls_isolated(s3client_mock):
    # Test concurrent execution: each coroutine gets its own result
    patch_session_factory, DummyBody = s3client_mock
    data = [b"file1", b"file2", b"file3"]
    # Each DummyBody must be unique per call
    bodies = [DummyBody(d) for d in data]
    # Each call to get_object returns a different body
    class DummyS3Client:
        def __init__(self):
            self.i = 0
        async def __aenter__(self): return self
        async def __aexit__(self, exc_type, exc, tb): return False
        async def get_object(self, Bucket, Key):
            idx = int(Key[-1]) - 1  # keys are file1, file2, file3
            return {"Body": bodies[idx]}
    class DummySession:
        def client(self, *a, **k): return DummyS3Client()
    # Patch aioboto3.Session
    monkeypatch = pytest.MonkeyPatch()
    monkeypatch.setattr("skyvern.forge.sdk.api.aws.aioboto3.Session", lambda *a, **k: DummySession())
    client = AsyncAWSClient()
    uris = [f"s3://bucket/file{i+1}" for i in range(3)]
    results = await asyncio.gather(*(client.download_file(uri) for uri in uris))
    monkeypatch.undo()

@pytest.mark.asyncio
async def test_download_file_handles_bad_uri(monkeypatch, s3client_mock):
    # Test that a bad S3 URI (missing s3://) raises an exception and returns None
    patch_session_factory, DummyBody = s3client_mock
    patch_session_factory(get_object_return={"Body": DummyBody(b"irrelevant")})
    client = AsyncAWSClient()
    result = await client.download_file("baduri")

@pytest.mark.asyncio
async def test_download_file_body_read_called_once(s3client_mock):
    # Ensure that Body.read() is called only once per download
    patch_session_factory, DummyBody = s3client_mock
    dummy_body = DummyBody(b"abc")
    get_object_return = {"Body": dummy_body}
    patch_session_factory(get_object_return=get_object_return)
    client = AsyncAWSClient()
    result = await client.download_file("s3://bucket/onlyonce")
    # Second call to read would raise AssertionError in DummyBody

# --- LARGE SCALE TEST CASES ---

@pytest.mark.asyncio
async def test_download_file_many_concurrent(s3client_mock):
    # Test with a large number of concurrent downloads (but < 1000)
    patch_session_factory, DummyBody = s3client_mock
    N = 50
    data = [f"data{i}".encode() for i in range(N)]
    bodies = [DummyBody(d) for d in data]
    class DummyS3Client:
        def __init__(self):
            self.calls = []
        async def __aenter__(self): return self
        async def __aexit__(self, exc_type, exc, tb): return False
        async def get_object(self, Bucket, Key):
            idx = int(Key[4:])  # keys are file0, file1, ...
            return {"Body": bodies[idx]}
    class DummySession:
        def client(self, *a, **k): return DummyS3Client()
    monkeypatch = pytest.MonkeyPatch()
    monkeypatch.setattr("skyvern.forge.sdk.api.aws.aioboto3.Session", lambda *a, **k: DummySession())
    client = AsyncAWSClient()
    uris = [f"s3://bucket/file{i}" for i in range(N)]
    results = await asyncio.gather(*(client.download_file(uri) for uri in uris))
    monkeypatch.undo()

# --- THROUGHPUT TEST CASES ---

@pytest.mark.asyncio
import asyncio  # used to run async functions
from unittest.mock import AsyncMock, MagicMock, patch

# function to test
# Copied exactly as provided
import aioboto3
import pytest  # used for our unit tests
from skyvern.config import settings
from skyvern.forge.sdk.api.aws import AsyncAWSClient
from types_boto3_s3.client import S3Client

# ---- Begin Unit Tests ----

# Helper classes for mocking
class DummyBody:
    def __init__(self, data: bytes):
        self._data = data
        self.read = AsyncMock(return_value=self._data)

class DummyParsedUri:
    def __init__(self, bucket, key):
        self.bucket = bucket
        self.key = key

@pytest.fixture
def aws_client():
    # Provide a fresh AsyncAWSClient for each test
    return AsyncAWSClient(
        aws_access_key_id="test_id",
        aws_secret_access_key="test_secret",
        region_name="us-east-1",
        endpoint_url="http://localhost:9000"
    )

@pytest.mark.asyncio

async def test_download_file_returns_none_on_exception_and_logs(aws_client):
    # Test that the function returns None and logs on exception

    # Patch S3Uri to raise an exception
    with patch("skyvern.forge.sdk.api.aws.S3Uri", side_effect=ValueError("bad uri")), \
         patch("skyvern.forge.sdk.api.aws.LOG") as mock_log:
        result = await aws_client.download_file("s3://baduri", log_exception=True)
        # Should log the exception
        mock_log.exception.assert_called_once()

@pytest.mark.asyncio
async def test_download_file_returns_none_on_exception_no_log(aws_client):
    # Test that the function returns None and does NOT log when log_exception is False

    with patch("skyvern.forge.sdk.api.aws.S3Uri", side_effect=ValueError("bad uri")), \
         patch("skyvern.forge.sdk.api.aws.LOG") as mock_log:
        result = await aws_client.download_file("s3://baduri", log_exception=False)
        mock_log.exception.assert_not_called()

@pytest.mark.asyncio

To edit these changes git checkout codeflash/optimize-AsyncAWSClient.download_file-miodgplh and push.

Codeflash Static Badge

The optimization achieves a **725% speedup** by moving the `S3Uri` parsing outside the async context manager. This simple reordering reduces the time spent holding the S3 client connection from 89.1% to 46.3% of total execution time.

**Key optimization:**
- **Reduced critical section**: Moving `parsed_uri = S3Uri(uri)` before the `async with self._s3_client()` context manager significantly shortens the time the S3 client connection is held open
- **Connection pool efficiency**: The S3 client context manager manages connection pooling - keeping it open for shorter durations allows better resource utilization and faster cleanup

**Performance impact:**
- **Runtime improvement**: From 58.4ms to 7.08ms (725% faster)
- **Throughput improvement**: From 15,047 to 22,387 operations/second (48.8% increase)
- **Critical section reduction**: Time spent in the expensive `async with` block dropped from 89.1% to 46.3%

**Why this works:**
The `S3Uri` parsing is a synchronous CPU operation that doesn't require an active S3 connection. By performing this parsing upfront, we minimize the duration of the async context manager, which is the most expensive operation (creating/managing boto3 client connections). This is particularly beneficial for concurrent workloads where connection pool contention can be a bottleneck.

**Impact on existing workloads:**
Based on the function references, this optimization will significantly benefit workflow blocks that frequently download S3 files (`DownloadToS3Block`, `SendEmailBlock`, `FileParserBlock`, `PDFParserBlock`). These blocks often process multiple files or run in workflows with many concurrent operations, making the improved connection handling especially valuable for throughput-sensitive scenarios.

The optimization is particularly effective for test cases with concurrent downloads and maintains identical behavior for error handling and logging.
@codeflash-ai codeflash-ai bot requested a review from mashraf-222 December 2, 2025 09:25
@codeflash-ai codeflash-ai bot added ⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: High Optimization Quality according to Codeflash labels Dec 2, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: High Optimization Quality according to Codeflash

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant