Merged
32 changes: 31 additions & 1 deletion dapr_agents/agents/configs.py
@@ -2,7 +2,17 @@

import re
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, MutableMapping, Optional, Sequence, Type
from typing import (
Any,
Callable,
Dict,
List,
MutableMapping,
Optional,
Sequence,
Type,
Union,
)

from pydantic import BaseModel

@@ -265,3 +275,23 @@ class AgentExecutionConfig:
# TODO: add stop_at_tokens
max_iterations: int = 10
tool_choice: Optional[str] = "auto"


@dataclass
class WorkflowRetryPolicy:
"""
Configuration for durable retry policies in workflows.

Attributes:
max_attempts: Maximum number of retry attempts.
initial_backoff_seconds: Initial backoff interval in seconds.
max_backoff_seconds: Maximum backoff interval in seconds.
backoff_multiplier: Multiplier for exponential backoff.
retry_timeout: Optional total timeout for all retries in seconds.
"""

max_attempts: Optional[int] = 1
Contributor: The issue is asking for infinite retries by default. In Dapr we have -1 in places to signal infinity. Would it make sense to allow infinite retries here in the same manner as well?

Contributor Author: Unfortunately that's not how it is in the upstream SDK.

Contributor: So as-is, there is no way to specify indefinite retries? Maybe we should tweak the upstream SDK eventually to support this :) This PR is a good start!

Contributor Author: @cicoyle yes, I concur. This should be fixed upstream in the python-sdk and then carried down here. I'll put it on our todo list!

initial_backoff_seconds: Optional[int] = 5
max_backoff_seconds: Optional[int] = 30
backoff_multiplier: Optional[float] = 1.5
Comment on lines +293 to +296

Collaborator: Any reason there are four fields here, yet the docs show five options we can set?
https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-features-concepts/#retry-policies

Contributor Author: It's an oversight on my part. I've added the fifth as well!

retry_timeout: Optional[Union[int, None]] = None
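
Taken together, a minimal usage sketch of the new config (DurableAgent and its remaining constructor arguments are assumed from durable.py below and elided here):

from dapr_agents.agents.configs import WorkflowRetryPolicy

# All five knobs map onto dapr.ext.workflow.RetryPolicy in durable.py below.
policy = WorkflowRetryPolicy(
    max_attempts=3,              # must be at least 1; validated in DurableAgent
    initial_backoff_seconds=5,
    max_backoff_seconds=30,
    backoff_multiplier=1.5,
    retry_timeout=120,           # optional cap on total retry time, in seconds
)

# agent = DurableAgent(..., retry_policy=policy)  # other arguments elided
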
35 changes: 34 additions & 1 deletion dapr_agents/agents/durable.py
@@ -1,8 +1,10 @@
from __future__ import annotations

from datetime import timedelta
import json
import logging
from typing import Any, Dict, Iterable, List, Optional
from os import getenv

import dapr.ext.workflow as wf

@@ -14,6 +16,7 @@
AgentRegistryConfig,
AgentStateConfig,
WorkflowGrpcOptions,
WorkflowRetryPolicy,
)
from dapr_agents.agents.prompting import AgentProfileConfig
from dapr_agents.agents.schemas import (
Expand All @@ -26,7 +29,6 @@
from dapr_agents.types import (
AgentError,
LLMChatResponse,
ToolExecutionRecord,
ToolMessage,
UserMessage,
)
@@ -76,6 +78,7 @@ def __init__(
agent_metadata: Optional[Dict[str, Any]] = None,
workflow_grpc: Optional[WorkflowGrpcOptions] = None,
runtime: Optional[wf.WorkflowRuntime] = None,
retry_policy: WorkflowRetryPolicy = WorkflowRetryPolicy(),
) -> None:
"""
Initialize behavior, infrastructure, and workflow runtime.
@@ -104,6 +107,7 @@ def __init__(
agent_metadata: Extra metadata to publish to the registry.
workflow_grpc: Optional gRPC overrides for the workflow runtime channel.
runtime: Optional pre-existing workflow runtime to attach to.
retry_policy: Durable retry policy configuration.
"""
super().__init__(
pubsub=pubsub,
@@ -132,6 +136,28 @@ def __init__(
self._registered = False
self._started = False

try:
retries = int(getenv("DAPR_API_MAX_RETRIES", ""))
except ValueError:
retries = retry_policy.max_attempts

if retries < 1:
raise (
ValueError("max_attempts or DAPR_API_MAX_RETRIES must be at least 1.")
)
Comment on lines +145 to +147

Collaborator: We should consider whether we want to deviate here. The default I see in the docs is actually 0 for this value, but for durable agent durability I think we want retries enabled; just want confirmation on that :)
https://docs.dapr.io/developing-applications/sdks/python/python-client/#initialising-the-client

Contributor Author (@CasperGN, Dec 18, 2025): I have to check on 1, else it might fail upstream.


self._retry_policy: wf.RetryPolicy = wf.RetryPolicy(
max_number_of_attempts=retries,
first_retry_interval=timedelta(
seconds=retry_policy.initial_backoff_seconds
),
max_retry_interval=timedelta(seconds=retry_policy.max_backoff_seconds),
backoff_coefficient=retry_policy.backoff_multiplier,
retry_timeout=timedelta(seconds=retry_policy.retry_timeout)
if retry_policy.retry_timeout
else None,
)

# ------------------------------------------------------------------
# Runtime accessors
# ------------------------------------------------------------------
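
The constructor above resolves the attempt count with DAPR_API_MAX_RETRIES taking precedence over max_attempts whenever the variable parses as an integer. A standalone sketch of that precedence (resolve_max_attempts is a hypothetical helper mirroring the logic, not part of the PR):

import os

from dapr_agents.agents.configs import WorkflowRetryPolicy

def resolve_max_attempts(policy: WorkflowRetryPolicy) -> int:
    # The env var wins when set to a valid integer; int("") raises ValueError,
    # so an unset variable falls back to the configured max_attempts.
    try:
        retries = int(os.getenv("DAPR_API_MAX_RETRIES", ""))
    except ValueError:
        retries = policy.max_attempts
    if retries < 1:
        raise ValueError("max_attempts or DAPR_API_MAX_RETRIES must be at least 1.")
    return retries

policy = WorkflowRetryPolicy(max_attempts=2)
assert resolve_max_attempts(policy) == 2   # unset env var: config value wins

os.environ["DAPR_API_MAX_RETRIES"] = "7"
assert resolve_max_attempts(policy) == 7   # env var overrides the config
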
@@ -203,6 +229,7 @@ def agent_workflow(self, ctx: wf.DaprWorkflowContext, message: dict):
"start_time": ctx.current_utc_datetime.isoformat(),
"trace_context": otel_span_context,
},
retry_policy=self._retry_policy,
)

final_message: Dict[str, Any] = {}
@@ -226,6 +253,7 @@ def agent_workflow(self, ctx: wf.DaprWorkflowContext, message: dict):
"instance_id": ctx.instance_id,
"time": ctx.current_utc_datetime.isoformat(),
},
retry_policy=self._retry_policy,
)

tool_calls = assistant_response.get("tool_calls") or []
@@ -246,6 +274,7 @@ def agent_workflow(self, ctx: wf.DaprWorkflowContext, message: dict):
"time": ctx.current_utc_datetime.isoformat(),
"order": idx,
},
retry_policy=self._retry_policy,
)
for idx, tc in enumerate(tool_calls)
]
@@ -257,6 +286,7 @@ def agent_workflow(self, ctx: wf.DaprWorkflowContext, message: dict):
"tool_results": tool_results,
"instance_id": ctx.instance_id,
},
retry_policy=self._retry_policy,
)

task = None # prepare for next turn
@@ -298,6 +328,7 @@ def agent_workflow(self, ctx: wf.DaprWorkflowContext, message: dict):
yield ctx.call_activity(
self.broadcast_message_to_agents,
input={"message": final_message},
retry_policy=self._retry_policy,
)

# Optionally send a direct response back to the trigger origin.
@@ -309,6 +340,7 @@ def agent_workflow(self, ctx: wf.DaprWorkflowContext, message: dict):
"target_agent": source,
"target_instance_id": trigger_instance_id,
},
retry_policy=self._retry_policy,
)

# Finalize the workflow entry in durable state.
@@ -320,6 +352,7 @@ def agent_workflow(self, ctx: wf.DaprWorkflowContext, message: dict):
"end_time": ctx.current_utc_datetime.isoformat(),
"triggering_workflow_instance_id": trigger_instance_id,
},
retry_policy=self._retry_policy,
)

if not ctx.is_replaying:
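
Every ctx.call_activity in agent_workflow now carries the shared retry policy, so transient activity failures are retried durably instead of failing the whole run. A self-contained sketch of the same pattern against the public dapr.ext.workflow API (the workflow and activity here are illustrative, not from the PR, whose activities are agent steps like broadcast_message_to_agents):

from datetime import timedelta

import dapr.ext.workflow as wf

wfr = wf.WorkflowRuntime()

@wfr.activity(name="echo")
def echo(ctx: wf.WorkflowActivityContext, message: str) -> str:
    return message

@wfr.workflow(name="demo_workflow")
def demo_workflow(ctx: wf.DaprWorkflowContext, message: str):
    policy = wf.RetryPolicy(
        first_retry_interval=timedelta(seconds=5),
        max_number_of_attempts=3,
        backoff_coefficient=1.5,
        max_retry_interval=timedelta(seconds=30),
    )
    # Same shape as the calls above: the activity is retried per the policy.
    result = yield ctx.call_activity(echo, input=message, retry_policy=policy)
    return result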