Skip to content

Commit e6c063f

Browse files
committed
Add PythonChatModel Example in e2e test
1 parent 8d04d7f commit e6c063f

File tree

3 files changed

+217
-0
lines changed

3 files changed

+217
-0
lines changed

e2e-test/integration-test/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ under the License.
7474
<artifactId>flink-agents-integrations-embedding-models-ollama</artifactId>
7575
<version>${project.version}</version>
7676
</dependency>
77+
<dependency>
78+
<groupId>org.apache.flink</groupId>
79+
<artifactId>flink-python</artifactId>
80+
<version>${flink.version}</version>
81+
</dependency>
7782
</dependencies>
7883

7984
</project>
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.flink.agents.integration.test;
19+
20+
import org.apache.flink.agents.api.Agent;
21+
import org.apache.flink.agents.api.InputEvent;
22+
import org.apache.flink.agents.api.OutputEvent;
23+
import org.apache.flink.agents.api.annotation.Action;
24+
import org.apache.flink.agents.api.annotation.ChatModelConnection;
25+
import org.apache.flink.agents.api.annotation.ChatModelSetup;
26+
import org.apache.flink.agents.api.annotation.Tool;
27+
import org.apache.flink.agents.api.annotation.ToolParam;
28+
import org.apache.flink.agents.api.chat.messages.ChatMessage;
29+
import org.apache.flink.agents.api.chat.messages.MessageRole;
30+
import org.apache.flink.agents.api.context.RunnerContext;
31+
import org.apache.flink.agents.api.event.ChatRequestEvent;
32+
import org.apache.flink.agents.api.event.ChatResponseEvent;
33+
import org.apache.flink.agents.api.resource.ResourceDescriptor;
34+
import org.apache.flink.agents.api.resource.ResourceType;
35+
import org.apache.flink.agents.api.resource.python.PythonChatModelConnection;
36+
import org.apache.flink.agents.api.resource.python.PythonChatModelSetup;
37+
38+
import java.util.Collections;
39+
import java.util.List;
40+
41+
/**
42+
* Agent example integrating a built-in Python chat model into Flink Agents.
43+
*
44+
* <p>This class demonstrates how to:
45+
*
46+
* <ul>
47+
* <li>Declare a chat model connection using {@link ChatModelConnection} metadata pointing to
48+
* {@link PythonChatModelConnection}
49+
* <li>Declare a chat model setup using {@link ChatModelSetup} metadata pointing to {@link
50+
* PythonChatModelSetup}
51+
* <li>Expose callable tools via {@link Tool} annotated static methods (temperature conversion,
52+
* BMI, random number)
53+
* <li>Fetch a chat model from the {@link RunnerContext} and perform a single-turn chat
54+
* <li>Emit the model response as an {@link OutputEvent}
55+
* </ul>
56+
*
57+
* <p>The {@code pythonChatModel()} method publishes a resource with type {@link
58+
* ResourceType#CHAT_MODEL} so it can be retrieved at runtime inside the {@code process} action. The
59+
* resource is configured with the connection name, the model name and the list of tool names that
60+
* the model is allowed to call.
61+
*/
62+
public class AgentWithPythonChatModel extends Agent {
63+
@ChatModelConnection
64+
public static ResourceDescriptor pythonChatModelConnection() {
65+
return ResourceDescriptor.Builder.newBuilder(PythonChatModelConnection.class.getName())
66+
.addInitialArgument(
67+
"module", "flink_agents.integrations.chat_models.tongyi_chat_model")
68+
.addInitialArgument("clazz", "TongyiChatModelConnection")
69+
.build();
70+
}
71+
72+
@ChatModelSetup
73+
public static ResourceDescriptor pythonChatModel() {
74+
return ResourceDescriptor.Builder.newBuilder(PythonChatModelSetup.class.getName())
75+
.addInitialArgument("connection", "pythonChatModelConnection")
76+
.addInitialArgument(
77+
"module", "flink_agents.integrations.chat_models.tongyi_chat_model")
78+
.addInitialArgument("clazz", "TongyiChatModelSetup")
79+
.addInitialArgument("model", "qwen-plus")
80+
.addInitialArgument(
81+
"tools",
82+
List.of("calculateBMI", "convertTemperature", "createRandomNumber"))
83+
.addInitialArgument("extract_reasoning", "true")
84+
.build();
85+
}
86+
87+
@Tool(description = "Converts temperature between Celsius and Fahrenheit")
88+
public static double convertTemperature(
89+
@ToolParam(name = "value", description = "Temperature value to convert") Double value,
90+
@ToolParam(
91+
name = "fromUnit",
92+
description = "Source unit ('C' for Celsius or 'F' for Fahrenheit)")
93+
String fromUnit,
94+
@ToolParam(
95+
name = "toUnit",
96+
description = "Target unit ('C' for Celsius or 'F' for Fahrenheit)")
97+
String toUnit) {
98+
99+
fromUnit = fromUnit.toUpperCase();
100+
toUnit = toUnit.toUpperCase();
101+
102+
if (fromUnit.equals(toUnit)) {
103+
return value;
104+
}
105+
106+
if (fromUnit.equals("C") && toUnit.equals("F")) {
107+
return (value * 9 / 5) + 32;
108+
} else if (fromUnit.equals("F") && toUnit.equals("C")) {
109+
return (value - 32) * 5 / 9;
110+
} else {
111+
throw new IllegalArgumentException("Invalid temperature units. Use 'C' or 'F'");
112+
}
113+
}
114+
115+
@Tool(description = "Calculates Body Mass Index (BMI)")
116+
public static double calculateBMI(
117+
@ToolParam(name = "weightKg", description = "Weight in kilograms") Double weightKg,
118+
@ToolParam(name = "heightM", description = "Height in meters") Double heightM) {
119+
120+
if (weightKg <= 0 || heightM <= 0) {
121+
throw new IllegalArgumentException("Weight and height must be positive values");
122+
}
123+
return weightKg / (heightM * heightM);
124+
}
125+
126+
@Tool(description = "Create a random number")
127+
public static double createRandomNumber() {
128+
return Math.random();
129+
}
130+
131+
@Action(listenEvents = {InputEvent.class})
132+
public static void process(InputEvent event, RunnerContext ctx) throws Exception {
133+
ctx.sendEvent(
134+
new ChatRequestEvent(
135+
"pythonChatModel",
136+
Collections.singletonList(
137+
new ChatMessage(MessageRole.USER, (String) event.getInput()))));
138+
}
139+
140+
@Action(listenEvents = {ChatResponseEvent.class})
141+
public static void processChatResponse(ChatResponseEvent event, RunnerContext ctx) {
142+
ctx.sendEvent(new OutputEvent(event.getResponse().getContent()));
143+
}
144+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.agents.integration.test;
20+
21+
import org.apache.flink.agents.api.AgentsExecutionEnvironment;
22+
import org.apache.flink.api.java.functions.KeySelector;
23+
import org.apache.flink.streaming.api.datastream.DataStream;
24+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
25+
26+
/**
27+
* Example application that applies {@link AgentWithPythonChatModel} to a DataStream of user
28+
* prompts.
29+
*/
30+
public class AgentWithPythonChatModelExample {
31+
/** Runs the example pipeline. */
32+
public static void main(String[] args) throws Exception {
33+
// Create the execution environment
34+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
35+
env.setParallelism(1);
36+
37+
// Use prompts that trigger different tool calls in the agent
38+
DataStream<String> inputStream =
39+
env.fromData(
40+
"Convert 25 degrees Celsius to Fahrenheit",
41+
"What is 98.6 Fahrenheit in Celsius?",
42+
"Change 32 degrees Celsius to Fahrenheit",
43+
"If it's 75 degrees Fahrenheit, what would that be in Celsius?",
44+
"Convert room temperature of 20C to F",
45+
"Calculate BMI for someone who is 1.75 meters tall and weighs 70 kg",
46+
"What's the BMI for a person weighing 85 kg with height 1.80 meters?",
47+
"Can you tell me the BMI if I'm 1.65m tall and weigh 60kg?",
48+
"Find BMI for 75kg weight and 1.78m height",
49+
"Create me a random number please");
50+
51+
// Create agents execution environment
52+
AgentsExecutionEnvironment agentsEnv =
53+
AgentsExecutionEnvironment.getExecutionEnvironment(env);
54+
55+
// Apply agent to the DataStream and use the prompt itself as the key
56+
DataStream<Object> outputStream =
57+
agentsEnv
58+
.fromDataStream(inputStream, (KeySelector<String, String>) value -> value)
59+
.apply(new AgentWithPythonChatModel())
60+
.toDataStream();
61+
62+
// Print the results
63+
outputStream.print();
64+
65+
// Execute the pipeline
66+
agentsEnv.execute();
67+
}
68+
}

0 commit comments

Comments
 (0)