Refactor: Abstract BaseNode for common graph logic#31
Refactor: Abstract BaseNode for common graph logic#31AkashBhadana wants to merge 2 commits intocuga-project:mainfrom
Conversation
Added an abstract BaseNode class that serves as a common template for graph nodes. This centralizes shared logic such as state handling, routing, and agent invocation into a single reusable implementation.
📝 WalkthroughWalkthroughThis PR centralizes node orchestration by adding a BaseNode with a HumanInTheLoopHandler and feature flags, and simplifies FinalAnswerNode to a single async _invoke that delegates response generation to the shared base processing pipeline. Changes
Sequence Diagram(s)sequenceDiagram
participant State as AgentState
participant Node as BaseNode.node_handler
participant Pre as pre_invoke_handler
participant Agent as Agent (FinalAnswerAgent / BaseAgent)
participant Proc as _generate_and_process_response
participant HITL as HumanInTheLoopHandler
State->>Node: invoke node_handler(state, agent, name)
Node->>Pre: pre_invoke_handler(state, name)
alt pre-invoke routed
Pre->>State: update state, emit AIMessage, set tracking
end
Node->>Agent: call _invoke(state, agent, name)
Agent->>Proc: run agent, parse response, log messages
Proc->>State: update final output / mark end-of-flow
Proc->>HITL: if HITL required -> handle_human_response(state, name)
HITL->>State: return Command (e.g., SAVE_REUSE / CONTINUE)
HITL->>Node: node_handler resumes/reroutes as needed
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@src/cuga/backend/cuga_graph/nodes/answer/final_answer.py`:
- Around line 31-36: The call to _generate_and_process_response in
FinalAnswerNode (final_answer.py) passes the wrong keyword: replace
output_model=FinalAnswerOutput with response_parser=FinalAnswerOutput so the
parameter name matches BaseNode._generate_and_process_response; ensure you only
change the keyword name (keep state=state, agent=agent, name=name) and run tests
to confirm no TypeError occurs.
In `@src/cuga/backend/cuga_graph/nodes/shared/base_node.py`:
- Around line 227-229: Wrap the call to json.loads(response.content) used when
building parsed_output in a try/except that catches json.JSONDecodeError (and
optionally TypeError) and raises/logs a clearer error including the raw
response.content and context (e.g., which node or request) so debugging
malformed agent output is easier; update the code around parsed_output =
response_parser(**json.loads(response.content)) to parse the JSON inside the try
block, and on exception call processLogger.error or raise a new
ValueError/RuntimeError with a message that includes response.content and
mentions response_parser/base_node to locate the failure.
🧹 Nitpick comments (3)
src/cuga/backend/cuga_graph/nodes/shared/base_node.py (3)
27-29: GlobalActivityTrackerinstance may complicate testing.The module-level
trackersingleton makes it harder to mock or isolate in unit tests. Consider injecting the tracker as a dependency or using a factory pattern to allow test isolation.
91-92: Use explicitNonecheck instead of truthiness.
if not agentwould fail for any agent that evaluates to falsy (e.g., a mock object). Useif agent is Nonefor explicit null checking.Suggested fix
- if not agent: + if agent is None: raise ValueError("Agent must be provided")
149-202: Consider extracting common logic to reduce duplication.All three sender-check branches follow the same pattern: set
state.sender, build an output dict, append anAIMessage, track the step, and return aCommandtoEND. Only the "thoughts" message andfinal_answersource differ.Example refactor using a helper
`@staticmethod` def _build_pre_invoke_response( state: AgentState, name: str, thoughts: list[str], final_answer: str ) -> Command: state.sender = name state.final_answer = final_answer output = {"thoughts": thoughts, "final_answer": final_answer} state.messages.append(AIMessage(content=json.dumps(output), name=name)) tracker.collect_step(Step(name=name, data=json.dumps(output))) return Command(update=state.model_dump(), goto=NodeNames.END) `@staticmethod` async def pre_invoke_handler(state: AgentState, name: str) -> Optional[Command]: if state.sender == NodeNames.CHAT_AGENT: return BaseNode._build_pre_invoke_response( state, name, ["Chat response provided directly."], state.chat_agent_messages[-1].content ) if state.sender == NodeNames.TASK_ANALYZER_AGENT and state.final_answer: return BaseNode._build_pre_invoke_response( state, name, ["No applications matched the request. Providing available applications information."], state.final_answer ) if state.sender == NodeNames.CUGA_LITE: return BaseNode._build_pre_invoke_response( state, name, ["Cuga lite response provided directly."], state.final_answer ) return None
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
src/cuga/backend/cuga_graph/nodes/answer/final_answer.pysrc/cuga/backend/cuga_graph/nodes/shared/base_node.py
🧰 Additional context used
🧬 Code graph analysis (1)
src/cuga/backend/cuga_graph/nodes/answer/final_answer.py (3)
src/cuga/backend/cuga_graph/nodes/answer/final_answer_agent/final_answer_agent.py (1)
FinalAnswerAgent(27-106)src/cuga/backend/cuga_graph/nodes/shared/base_node.py (3)
BaseNode(85-250)_invoke(206-213)_generate_and_process_response(216-250)src/cuga/backend/cuga_graph/utils/nodes_names.py (1)
NodeNames(7-24)
🪛 Ruff (0.14.11)
src/cuga/backend/cuga_graph/nodes/shared/base_node.py
92-92: Avoid specifying long messages outside the exception class
(TRY003)
🔇 Additional comments (3)
src/cuga/backend/cuga_graph/nodes/shared/base_node.py (2)
35-79: LGTM!The
HumanInTheLoopHandlerprovides a clean, extensible pattern for action dispatch with sensible defaults and the ability to register custom handlers.
128-140: Post-invocation routing may override_invokeresults unexpectedly.The check
state.sender == NodeNames.PLAN_CONTROLLER_AGENTat line 132 evaluates the sender value that existed beforeinvoke_methodran. If_invokemodifiesstate.sender(asFinalAnswerNode._invokedoes at line 38), this condition might not reflect the intended state. Additionally, when the condition is true, thecommandreturned frominvoke_methodis discarded entirely.Verify that this behavior is intentional and that discarding the
_invokeresult in favor of HITL routing is the expected flow.src/cuga/backend/cuga_graph/nodes/answer/final_answer.py (1)
14-18: LGTM!Clean inheritance pattern - the constructor properly delegates to
BaseNodeand the class is focused on domain-specific behavior.
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
- Corrected parameter name from 'output_model' to 'response_parser' in FinalAnswerNode. - Added try-except block for JSON decoding in BaseNode to handle malformed responses.
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/cuga/backend/cuga_graph/nodes/shared/base_node.py`:
- Around line 129-141: Capture the original sender into a local variable before
calling invoke_method (e.g., original_sender = state.sender), then after
awaiting invoke_method(state, agent, name) use original_sender in the
post-invocation check instead of state.sender to decide the HITL routing; keep
the rest of the logic (set state.hitl_action via create_save_reuse_action(), set
state.sender = name, return Command(update=state.model_dump(),
goto=NodeNames.SUGGEST_HUMAN_ACTIONS)) unchanged so any mutations made by
invoke_method still persist but the routing decision is based on who initiated
the request.
🧹 Nitpick comments (4)
src/cuga/backend/cuga_graph/nodes/shared/base_node.py (4)
92-93: Consider more explicit agent validation.The check
if not agentrelies on truthiness, which could pass for any truthy object. Consider usingif agent is Noneor adding a type check to ensureagentis actually aBaseAgentinstance.Suggested improvement
- if not agent: - raise ValueError("Agent must be provided") + if agent is None: + raise ValueError("Agent must be provided")Or for stricter validation:
if not isinstance(agent, BaseAgent): raise TypeError(f"Expected BaseAgent instance, got {type(agent).__name__}")
143-204: Significant code duplication across pre-invoke branches.The three branches for
CHAT_AGENT,TASK_ANALYZER_AGENT, andCUGA_LITEshare nearly identical logic. Consider extracting a helper to reduce duplication and improve maintainability.Additionally, this method is marked
asyncbut contains noawaitexpressions.Suggested refactor
`@staticmethod` - async def pre_invoke_handler( + def pre_invoke_handler( state: AgentState, name: str, ) -> Optional[Command]: """Handles specific cases before the main agent invocation.""" + def _create_early_return(thoughts: list[str], final_answer: str) -> Command: + state.sender = name + state.final_answer = final_answer + output = {"thoughts": thoughts, "final_answer": final_answer} + state.messages.append(AIMessage(content=json.dumps(output), name=name)) + tracker.collect_step(Step(name=name, data=json.dumps(output))) + return Command(update=state.model_dump(), goto=NodeNames.END) + if state.sender == NodeNames.CHAT_AGENT: - state.sender = name final_answer = state.chat_agent_messages[-1].content - - state.final_answer = final_answer - output = { - "thoughts": ["Chat response provided directly."], - "final_answer": final_answer, - } - - state.messages.append( - AIMessage(content=json.dumps(output), name=name) - ) - tracker.collect_step( - Step(name=name, data=json.dumps(output)) - ) - - return Command(update=state.model_dump(), goto=NodeNames.END) + return _create_early_return(["Chat response provided directly."], final_answer) - if state.sender == NodeNames.TASK_ANALYZER_AGENT and state.final_answer: - # ... similar reduction for other branches + if state.sender == NodeNames.TASK_ANALYZER_AGENT and state.final_answer: + return _create_early_return( + ["No applications matched the request. Providing available applications information."], + state.final_answer, + ) + + if state.sender == NodeNames.CUGA_LITE: + return _create_early_return( + ["Cuga lite response provided directly."], + state.final_answer, + ) + + return NoneNote: If you remove
async, update the call site on line 125 accordingly.
206-214: RedundantNotImplementedErrorin abstract method.The
@abstractmethoddecorator already prevents instantiation of classes that don't implement_invoke. The explicitraise NotImplementedErroris unnecessary.Suggested simplification
`@abstractmethod` async def _invoke( self, state: AgentState, agent: BaseAgent, name: str, ) -> Command: """Node-specific invocation logic.""" - raise NotImplementedError + ...
228-234: Uselogging.exceptionto capture full traceback.Per static analysis,
logging.exceptionautomatically includes the traceback, which is more useful for debugging thanlogging.errorwhen an exception is being handled.Suggested fix
try: parsed_output = response_parser( **json.loads(response.content) ) except json.JSONDecodeError: - logging.error(f"Failed to decode JSON from agent response: {response.content}") + logging.exception( + "Failed to decode JSON from agent response: %s", response.content + ) raise
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
src/cuga/backend/cuga_graph/nodes/answer/final_answer.pysrc/cuga/backend/cuga_graph/nodes/shared/base_node.py
🚧 Files skipped from review as they are similar to previous changes (1)
- src/cuga/backend/cuga_graph/nodes/answer/final_answer.py
🧰 Additional context used
🪛 Ruff (0.14.11)
src/cuga/backend/cuga_graph/nodes/shared/base_node.py
93-93: Avoid specifying long messages outside the exception class
(TRY003)
233-233: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
🔇 Additional comments (3)
src/cuga/backend/cuga_graph/nodes/shared/base_node.py (3)
1-31: Module setup looks appropriate.Imports are well-organized and the module-level feature flags provide a clean way to control HITL behavior. The singleton
trackerinstance is a common pattern for activity tracking.
36-81: Clean HITL handler implementation.The strategy pattern for action handlers is well-suited for extensibility. The default fallback to
NodeNames.ENDprovides safe behavior for unhandled actions.
236-255: Response processing logic is well-structured.The chat message construction, activity tracking, and variable replacement flow logically. State updates are appropriately sequenced.
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
| # Main invocation | ||
| command = await invoke_method(state, agent, name) | ||
|
|
||
| # Post-invocation HITL routing | ||
| if ENABLE_SAVE_REUSE and state.sender == NodeNames.PLAN_CONTROLLER_AGENT: | ||
| state.hitl_action = create_save_reuse_action() | ||
| state.sender = name | ||
| return Command( | ||
| update=state.model_dump(), | ||
| goto=NodeNames.SUGGEST_HUMAN_ACTIONS, | ||
| ) | ||
|
|
||
| return command |
There was a problem hiding this comment.
Post-invocation check may read stale state.sender.
The condition on line 133 checks state.sender == NodeNames.PLAN_CONTROLLER_AGENT after invoke_method has executed. If _invoke or any called method modifies state.sender, this check will evaluate against the modified value rather than the original sender.
Consider capturing the original sender before invocation if the post-invocation routing should be based on who initiated the request:
Suggested fix
+ original_sender = state.sender
+
# Main invocation
command = await invoke_method(state, agent, name)
# Post-invocation HITL routing
- if ENABLE_SAVE_REUSE and state.sender == NodeNames.PLAN_CONTROLLER_AGENT:
+ if ENABLE_SAVE_REUSE and original_sender == NodeNames.PLAN_CONTROLLER_AGENT:
state.hitl_action = create_save_reuse_action()
state.sender = name
return Command(
update=state.model_dump(),
goto=NodeNames.SUGGEST_HUMAN_ACTIONS,
)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| # Main invocation | |
| command = await invoke_method(state, agent, name) | |
| # Post-invocation HITL routing | |
| if ENABLE_SAVE_REUSE and state.sender == NodeNames.PLAN_CONTROLLER_AGENT: | |
| state.hitl_action = create_save_reuse_action() | |
| state.sender = name | |
| return Command( | |
| update=state.model_dump(), | |
| goto=NodeNames.SUGGEST_HUMAN_ACTIONS, | |
| ) | |
| return command | |
| # Main invocation | |
| original_sender = state.sender | |
| command = await invoke_method(state, agent, name) | |
| # Post-invocation HITL routing | |
| if ENABLE_SAVE_REUSE and original_sender == NodeNames.PLAN_CONTROLLER_AGENT: | |
| state.hitl_action = create_save_reuse_action() | |
| state.sender = name | |
| return Command( | |
| update=state.model_dump(), | |
| goto=NodeNames.SUGGEST_HUMAN_ACTIONS, | |
| ) | |
| return command |
🤖 Prompt for AI Agents
In `@src/cuga/backend/cuga_graph/nodes/shared/base_node.py` around lines 129 -
141, Capture the original sender into a local variable before calling
invoke_method (e.g., original_sender = state.sender), then after awaiting
invoke_method(state, agent, name) use original_sender in the post-invocation
check instead of state.sender to decide the HITL routing; keep the rest of the
logic (set state.hitl_action via create_save_reuse_action(), set state.sender =
name, return Command(update=state.model_dump(),
goto=NodeNames.SUGGEST_HUMAN_ACTIONS)) unchanged so any mutations made by
invoke_method still persist but the routing decision is based on who initiated
the request.
This refactoring introduces an abstract
BaseNodeclass that serves as a common template for all graph nodes, significantly reducing code duplication and improving overall maintainability.Motivation
During an analysis of the agent architecture, it became clear that individual node implementations (e.g.,
FinalAnswerNode,PlanControllerNode) contained a large amount of repeated boilerplate code for state handling, routing, and other shared responsibilities. This duplication increased complexity and made the codebase harder to maintain and evolve.Changes Implemented
Created Abstract
BaseNodeA new
BaseNodeclass was created innodes/shared/base_node.py. It uses the Template Method design pattern to encapsulate all generic node logic, including:node_handlerexecution flowRefactored
FinalAnswerNodeThe
FinalAnswerNodeclass was refactored to inherit fromBaseNode. This change reduced its implementation by over 70%, resulting in a clean and focused class that contains only logic specific to generating a final answer.Impact
Reduces Boilerplate
Future nodes can be implemented with minimal code by inheriting shared behavior from
BaseNode.Improves Maintainability
Core node execution logic is now centralized, allowing changes to be made in one place rather than across multiple node implementations.
Increases Code Readability
Node classes are shorter, clearer, and easier to understand, as they now express only their unique, domain-specific behavior.
Summary by CodeRabbit
Refactor
New Features
✏️ Tip: You can customize this high-level summary in your review settings.