forked from Y-Research-SBU/QuantAgent
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathindicator_agent.py
More file actions
77 lines (70 loc) · 3.19 KB
/
indicator_agent.py
File metadata and controls
77 lines (70 loc) · 3.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
"""
Agent for technical indicator analysis in high-frequency trading (HFT) context.
Uses LLM and toolkit to compute and interpret indicators like MACD, RSI, ROC, Stochastic, and Williams %R.
"""
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import ToolMessage, AIMessage
import json
def create_indicator_agent(llm, toolkit):
"""
Create an indicator analysis agent node for HFT. The agent uses LLM and indicator tools to analyze OHLCV data.
"""
def indicator_agent_node(state):
# --- Tool definitions ---
tools = [
toolkit.compute_macd,
toolkit.compute_rsi,
toolkit.compute_roc,
toolkit.compute_stoch,
toolkit.compute_willr,
]
time_frame = state['time_frame']
# --- System prompt for LLM ---
prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"You are a high-frequency trading (HFT) analyst assistant operating under time-sensitive conditions. "
"You must analyze technical indicators to support fast-paced trading execution.\n\n"
"You have access to tools: compute_rsi, compute_macd, compute_roc, compute_stoch, and compute_willr. "
"Use them by providing appropriate arguments like `kline_data` and the respective periods.\n\n"
f"⚠️ The OHLC data provided is from a {time_frame} intervals, reflecting recent market behavior. "
"You must interpret this data quickly and accurately.\n\n"
"Here is the OHLC data:\n{kline_data}.\n\n"
"Call necessary tools, and analyze the results.\n"
),
MessagesPlaceholder(variable_name="messages"),
]
).partial(
kline_data=json.dumps(state["kline_data"], indent=2)
)
chain = prompt | llm.bind_tools(tools)
messages = state["messages"]
# --- Step 1: Ask for tool calls ---
ai_response = chain.invoke(messages)
messages.append(ai_response)
# --- Step 2: Collect tool results ---
if hasattr(ai_response, "tool_calls"):
for call in ai_response.tool_calls:
tool_name = call["name"]
tool_args = call["args"]
# Always provide kline_data
import copy
tool_args["kline_data"] = copy.deepcopy(state["kline_data"])
# Lookup tool by name
tool_fn = next(t for t in tools if t.name == tool_name)
tool_result = tool_fn.invoke(tool_args)
# Append result as ToolMessage
messages.append(
ToolMessage(
tool_call_id=call["id"],
content=json.dumps(tool_result)
)
)
# --- Step 3: Re-run the chain with tool results ---
final_response = chain.invoke(messages)
return {
"messages": messages + [final_response],
"indicator_report": final_response.content,
}
return indicator_agent_node