Skip to content

Conversation

@codeflash-ai
Copy link

@codeflash-ai codeflash-ai bot commented Dec 3, 2025

📄 7% (0.07x) speedup for JiraDataSource.set_workflow_scheme_issue_type in backend/python/app/sources/external/jira/jira.py

⏱️ Runtime : 2.32 milliseconds 2.16 milliseconds (best of 49 runs)

📝 Explanation and details

The optimization achieves a 7% runtime improvement by eliminating unnecessary function calls and reducing object allocations in the HTTP request preparation path.

Key optimizations:

  1. Eliminated redundant _as_str_dict() calls: The original code called _as_str_dict() three times (63.8% of total execution time), creating temporary dictionaries and string conversions. The optimized version inlines these operations as dict comprehensions directly in the HTTPRequest constructor, reducing function call overhead.

  2. Streamlined header initialization: Replaced dict(headers or {}) with a conditional {**headers} pattern, avoiding the dict() constructor call and the or {} evaluation when headers are provided.

  3. Optimized empty query params handling: Since _query is always empty in this context, the optimized version directly passes an empty dict {} instead of calling _as_str_dict(_query).

  4. Improved HTTPClient header merging: Added a guard check if self.headers: to avoid unnecessary dictionary copying when the base headers are empty.

Performance impact analysis:

  • The line profiler shows the HTTPRequest construction time dropped from 7.6ms (63.8%) to 2.25ms (22.9%) - a significant reduction in the hottest code path
  • However, the throughput decreased by 5.8%, suggesting the optimization may have negative effects under concurrent load scenarios typical in HTTP client usage
  • The runtime improvement is most beneficial for single-request scenarios or low-concurrency workloads where the reduced object allocation overhead provides clear gains

Trade-offs:
This optimization prioritizes single-request latency over throughput, making it suitable for applications where individual request speed matters more than concurrent processing capacity.

Correctness verification report:

Test Status
⚙️ Existing Unit Tests 🔘 None Found
🌀 Generated Regression Tests 358 Passed
⏪ Replay Tests 🔘 None Found
🔎 Concolic Coverage Tests 🔘 None Found
📊 Tests Coverage 94.4%
🌀 Generated Regression Tests and Runtime
import asyncio  # used to run async functions
from typing import Any, Dict, Optional

import pytest  # used for our unit tests
from app.sources.external.jira.jira import JiraDataSource


# Minimal stubs for HTTPRequest/HTTPResponse for testing
class HTTPRequest:
    def __init__(self, method: str, url: str, headers: Dict[str, str], path_params: Dict[str, str], query_params: Dict[str, str], body: Dict[str, Any]):
        self.method = method
        self.url = url
        self.headers = headers
        self.path_params = path_params
        self.query_params = query_params
        self.body = body

class HTTPResponse:
    def __init__(self, data: Any):
        self.data = data

# Minimal JiraRESTClientViaApiKey stub
class JiraRESTClientViaApiKey:
    def __init__(self, base_url: str, email: str, api_key: str):
        self.base_url = base_url

    def get_base_url(self) -> str:
        return self.base_url

    async def execute(self, request: HTTPRequest, **kwargs) -> HTTPResponse:
        # Simulate a response based on URL and body for testing
        return HTTPResponse({
            'method': request.method,
            'url': request.url,
            'headers': request.headers,
            'path_params': request.path_params,
            'query_params': request.query_params,
            'body': request.body
        })

class JiraClient:
    def __init__(self, client: JiraRESTClientViaApiKey):
        self.client = client

    def get_client(self):
        return self.client
from app.sources.external.jira.jira import JiraDataSource

# ------------------- UNIT TESTS -------------------

@pytest.fixture
def jira_data_source():
    # Provide a JiraDataSource with a working client
    base_url = "https://jira.example.com"
    client = JiraRESTClientViaApiKey(base_url, "test@example.com", "apikey123")
    jira_client = JiraClient(client)
    return JiraDataSource(jira_client)

# 1. Basic Test Cases

@pytest.mark.asyncio
async def test_set_workflow_scheme_issue_type_basic(jira_data_source):
    """Test basic async behavior and expected response structure."""
    resp = await jira_data_source.set_workflow_scheme_issue_type(123, "bug")

@pytest.mark.asyncio
async def test_set_workflow_scheme_issue_type_with_body(jira_data_source):
    """Test with all body parameters provided."""
    resp = await jira_data_source.set_workflow_scheme_issue_type(
        456, "task", issueType_body="task", updateDraftIfNeeded=True, workflow="custom_workflow"
    )

@pytest.mark.asyncio
async def test_set_workflow_scheme_issue_type_with_headers(jira_data_source):
    """Test with custom headers provided."""
    resp = await jira_data_source.set_workflow_scheme_issue_type(
        789, "story", headers={"X-Test-Header": "testval"}
    )

# 2. Edge Test Cases

@pytest.mark.asyncio
async def test_set_workflow_scheme_issue_type_invalid_client():
    """Test ValueError if client is None."""
    class BadClient:
        def get_client(self):
            return None
    with pytest.raises(ValueError, match="HTTP client is not initialized"):
        JiraDataSource(BadClient())

@pytest.mark.asyncio
async def test_set_workflow_scheme_issue_type_missing_get_base_url():
    """Test ValueError if client lacks get_base_url."""
    class BadClientObj:
        pass
    class BadClient:
        def get_client(self):
            return BadClientObj()
    with pytest.raises(ValueError, match="HTTP client does not have get_base_url method"):
        JiraDataSource(BadClient())

@pytest.mark.asyncio
async def test_set_workflow_scheme_issue_type_empty_strings(jira_data_source):
    """Test with empty strings for issueType and workflow."""
    resp = await jira_data_source.set_workflow_scheme_issue_type(0, "", workflow="")

@pytest.mark.asyncio
async def test_set_workflow_scheme_issue_type_none_body_fields(jira_data_source):
    """Test with None for all optional body fields."""
    resp = await jira_data_source.set_workflow_scheme_issue_type(1, "improvement", issueType_body=None, updateDraftIfNeeded=None, workflow=None)

@pytest.mark.asyncio
async def test_set_workflow_scheme_issue_type_concurrent(jira_data_source):
    """Test concurrent execution of multiple requests."""
    coros = [
        jira_data_source.set_workflow_scheme_issue_type(i, f"type{i}", workflow=f"wf{i}")
        for i in range(5)
    ]
    results = await asyncio.gather(*coros)
    # Each result should be unique and correct
    for i, resp in enumerate(results):
        pass

# 3. Large Scale Test Cases

@pytest.mark.asyncio
async def test_set_workflow_scheme_issue_type_large_concurrent(jira_data_source):
    """Test with a large number of concurrent requests (but <100)."""
    coros = [
        jira_data_source.set_workflow_scheme_issue_type(i, f"type{i}", workflow=f"wf{i}")
        for i in range(50)
    ]
    results = await asyncio.gather(*coros)
    # Spot check some results
    for idx in [0, 25, 49]:
        resp = results[idx]

@pytest.mark.asyncio
async def test_set_workflow_scheme_issue_type_varied_inputs(jira_data_source):
    """Test with varied types for optional parameters."""
    resp = await jira_data_source.set_workflow_scheme_issue_type(
        101, "epic", issueType_body="epic", updateDraftIfNeeded=False, workflow=None
    )

# 4. Throughput Test Cases

@pytest.mark.asyncio
async def test_set_workflow_scheme_issue_type_throughput_small_load(jira_data_source):
    """Throughput: small load test (10 requests)."""
    coros = [
        jira_data_source.set_workflow_scheme_issue_type(i, f"type{i}")
        for i in range(10)
    ]
    results = await asyncio.gather(*coros)
    for i, resp in enumerate(results):
        pass

@pytest.mark.asyncio
async def test_set_workflow_scheme_issue_type_throughput_medium_load(jira_data_source):
    """Throughput: medium load test (50 requests)."""
    coros = [
        jira_data_source.set_workflow_scheme_issue_type(i, f"type{i}", workflow="wf")
        for i in range(50)
    ]
    results = await asyncio.gather(*coros)
    for resp in results:
        pass

@pytest.mark.asyncio
async def test_set_workflow_scheme_issue_type_throughput_varying_headers(jira_data_source):
    """Throughput: test with varying headers."""
    coros = [
        jira_data_source.set_workflow_scheme_issue_type(
            i, f"type{i}", headers={"X-Req": f"val{i}"}
        )
        for i in range(20)
    ]
    results = await asyncio.gather(*coros)
    for i, resp in enumerate(results):
        pass

@pytest.mark.asyncio
import asyncio  # Used to run async functions

import pytest  # Used for our unit tests
from app.sources.external.jira.jira import JiraDataSource


# Mocks and helpers for HTTPRequest/HTTPResponse
class HTTPRequest:
    def __init__(self, method, url, headers=None, path_params=None, query_params=None, body=None):
        self.method = method
        self.url = url
        self.headers = headers or {}
        self.path_params = path_params or {}
        self.query_params = query_params or {}
        self.body = body

class HTTPResponse:
    def __init__(self, response_data):
        self.response_data = response_data
    def json(self):
        return self.response_data

# Minimal stub for JiraRESTClientViaApiKey
class JiraRESTClientViaApiKey:
    def __init__(self, base_url, email, api_key):
        self.base_url = base_url
    def get_base_url(self):
        return self.base_url
    async def execute(self, request):
        # Simulate a response object based on request
        return HTTPResponse({
            "method": request.method,
            "url": request.url,
            "headers": request.headers,
            "path_params": request.path_params,
            "query_params": request.query_params,
            "body": request.body
        })

# Minimal stub for JiraClient
class JiraClient:
    def __init__(self, client):
        self.client = client
    def get_client(self):
        return self.client
from app.sources.external.jira.jira import JiraDataSource

# ---------------------- UNIT TESTS -----------------------

# BASIC TEST CASES

@pytest.mark.asyncio
async def test_set_workflow_scheme_issue_type_basic_minimal():
    """Test basic usage with only required arguments."""
    client = JiraRESTClientViaApiKey("https://jira.example.com", "email", "api_key")
    ds = JiraDataSource(JiraClient(client))
    resp = await ds.set_workflow_scheme_issue_type(123, "bug")
    data = resp.json()

@pytest.mark.asyncio
async def test_set_workflow_scheme_issue_type_basic_all_args():
    """Test usage with all possible arguments."""
    client = JiraRESTClientViaApiKey("https://jira.example.com", "email", "api_key")
    ds = JiraDataSource(JiraClient(client))
    resp = await ds.set_workflow_scheme_issue_type(
        456, "task", issueType_body="task", updateDraftIfNeeded=True, workflow="custom_workflow",
        headers={"X-Test": "yes"}
    )
    data = resp.json()

@pytest.mark.asyncio
async def test_set_workflow_scheme_issue_type_basic_async_await():
    """Test that the function returns a coroutine and can be awaited."""
    client = JiraRESTClientViaApiKey("https://jira.example.com", "email", "api_key")
    ds = JiraDataSource(JiraClient(client))
    # Should return a coroutine before awaiting
    codeflash_output = ds.set_workflow_scheme_issue_type(1, "story"); coro = codeflash_output
    resp = await coro

# EDGE TEST CASES

@pytest.mark.asyncio
async def test_set_workflow_scheme_issue_type_concurrent_execution():
    """Test concurrent execution of multiple requests."""
    client = JiraRESTClientViaApiKey("https://jira.example.com", "email", "api_key")
    ds = JiraDataSource(JiraClient(client))
    tasks = [
        ds.set_workflow_scheme_issue_type(i, f"type_{i}")
        for i in range(10)
    ]
    responses = await asyncio.gather(*tasks)
    for i, resp in enumerate(responses):
        data = resp.json()

@pytest.mark.asyncio
async def test_set_workflow_scheme_issue_type_invalid_client_raises():
    """Test that ValueError is raised if HTTP client is not initialized."""
    class DummyClient:
        def get_client(self):
            return None  # Simulate missing client
    with pytest.raises(ValueError, match="HTTP client is not initialized"):
        JiraDataSource(DummyClient())

@pytest.mark.asyncio
async def test_set_workflow_scheme_issue_type_missing_base_url_method_raises():
    """Test that ValueError is raised if client lacks get_base_url method."""
    class NoBaseUrlClient:
        pass
    class DummyClient:
        def get_client(self):
            return NoBaseUrlClient()
    with pytest.raises(ValueError, match="HTTP client does not have get_base_url method"):
        JiraDataSource(DummyClient())

@pytest.mark.asyncio
async def test_set_workflow_scheme_issue_type_empty_headers_dict():
    """Test that passing empty headers still sets Content-Type."""
    client = JiraRESTClientViaApiKey("https://jira.example.com", "email", "api_key")
    ds = JiraDataSource(JiraClient(client))
    resp = await ds.set_workflow_scheme_issue_type(1, "story", headers={})
    data = resp.json()

@pytest.mark.asyncio
async def test_set_workflow_scheme_issue_type_none_headers():
    """Test that passing None for headers sets Content-Type."""
    client = JiraRESTClientViaApiKey("https://jira.example.com", "email", "api_key")
    ds = JiraDataSource(JiraClient(client))
    resp = await ds.set_workflow_scheme_issue_type(1, "story", headers=None)
    data = resp.json()

@pytest.mark.asyncio
async def test_set_workflow_scheme_issue_type_body_fields_none():
    """Test that body fields set to None do not appear in body."""
    client = JiraRESTClientViaApiKey("https://jira.example.com", "email", "api_key")
    ds = JiraDataSource(JiraClient(client))
    resp = await ds.set_workflow_scheme_issue_type(1, "story", issueType_body=None, updateDraftIfNeeded=None, workflow=None)
    data = resp.json()

# LARGE SCALE TEST CASES

@pytest.mark.asyncio
async def test_set_workflow_scheme_issue_type_large_scale_concurrent():
    """Test many concurrent requests for scalability (up to 50)."""
    client = JiraRESTClientViaApiKey("https://jira.example.com", "email", "api_key")
    ds = JiraDataSource(JiraClient(client))
    n = 50
    tasks = [
        ds.set_workflow_scheme_issue_type(i, f"type_{i}", issueType_body=f"body_{i}", workflow=f"workflow_{i}")
        for i in range(n)
    ]
    responses = await asyncio.gather(*tasks)
    for i, resp in enumerate(responses):
        data = resp.json()

# THROUGHPUT TEST CASES

@pytest.mark.asyncio
async def test_set_workflow_scheme_issue_type_throughput_small_load():
    """Throughput test: small load (5 requests)."""
    client = JiraRESTClientViaApiKey("https://jira.example.com", "email", "api_key")
    ds = JiraDataSource(JiraClient(client))
    tasks = [
        ds.set_workflow_scheme_issue_type(i, f"type_{i}")
        for i in range(5)
    ]
    responses = await asyncio.gather(*tasks)
    for resp in responses:
        pass

@pytest.mark.asyncio
async def test_set_workflow_scheme_issue_type_throughput_medium_load():
    """Throughput test: medium load (20 requests)."""
    client = JiraRESTClientViaApiKey("https://jira.example.com", "email", "api_key")
    ds = JiraDataSource(JiraClient(client))
    tasks = [
        ds.set_workflow_scheme_issue_type(i, f"type_{i}", workflow="wf")
        for i in range(20)
    ]
    responses = await asyncio.gather(*tasks)
    for resp in responses:
        pass

@pytest.mark.asyncio

To edit these changes git checkout codeflash/optimize-JiraDataSource.set_workflow_scheme_issue_type-miqigkdm and push.

Codeflash Static Badge

The optimization achieves a **7% runtime improvement** by eliminating unnecessary function calls and reducing object allocations in the HTTP request preparation path.

**Key optimizations:**

1. **Eliminated redundant `_as_str_dict()` calls**: The original code called `_as_str_dict()` three times (63.8% of total execution time), creating temporary dictionaries and string conversions. The optimized version inlines these operations as dict comprehensions directly in the `HTTPRequest` constructor, reducing function call overhead.

2. **Streamlined header initialization**: Replaced `dict(headers or {})` with a conditional `{**headers}` pattern, avoiding the `dict()` constructor call and the `or {}` evaluation when headers are provided.

3. **Optimized empty query params handling**: Since `_query` is always empty in this context, the optimized version directly passes an empty dict `{}` instead of calling `_as_str_dict(_query)`.

4. **Improved HTTPClient header merging**: Added a guard check `if self.headers:` to avoid unnecessary dictionary copying when the base headers are empty.

**Performance impact analysis:**
- The line profiler shows the `HTTPRequest` construction time dropped from 7.6ms (63.8%) to 2.25ms (22.9%) - a significant reduction in the hottest code path
- However, the **throughput decreased by 5.8%**, suggesting the optimization may have negative effects under concurrent load scenarios typical in HTTP client usage
- The runtime improvement is most beneficial for **single-request scenarios** or **low-concurrency workloads** where the reduced object allocation overhead provides clear gains

**Trade-offs:**
This optimization prioritizes single-request latency over throughput, making it suitable for applications where individual request speed matters more than concurrent processing capacity.
@codeflash-ai codeflash-ai bot requested a review from mashraf-222 December 3, 2025 21:20
@codeflash-ai codeflash-ai bot added ⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: Medium Optimization Quality according to Codeflash labels Dec 3, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: Medium Optimization Quality according to Codeflash

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant