Skip to content

Commit 57b88dc

Browse files
committed
[Feature][java] Support Python ChatModel in Java
1 parent 82a2c30 commit 57b88dc

File tree

19 files changed

+1019
-26
lines changed

19 files changed

+1019
-26
lines changed

api/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@ under the License.
5757
<version>${flink.version}</version>
5858
<scope>provided</scope>
5959
</dependency>
60+
<dependency>
61+
<groupId>com.alibaba</groupId>
62+
<artifactId>pemja</artifactId>
63+
<version>${pemja.version}</version>
64+
<scope>provided</scope>
65+
</dependency>
6066
</dependencies>
6167

6268
</project>
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.flink.agents.api.resource.python;
19+
20+
import org.apache.flink.agents.api.chat.messages.ChatMessage;
21+
import org.apache.flink.agents.api.chat.model.BaseChatModelConnection;
22+
import org.apache.flink.agents.api.resource.Resource;
23+
import org.apache.flink.agents.api.resource.ResourceDescriptor;
24+
import org.apache.flink.agents.api.resource.ResourceType;
25+
import org.apache.flink.agents.api.tools.Tool;
26+
import pemja.core.object.PyObject;
27+
28+
import java.util.List;
29+
import java.util.Map;
30+
import java.util.function.BiFunction;
31+
32+
/**
33+
* Python-based implementation of ChatModelConnection that wraps a Python chat model object. This
34+
* class serves as a bridge between Java and Python chat model environments, but unlike {@link
35+
* PythonChatModelSetup}, it does not provide direct chat functionality in Java.
36+
*/
37+
public class PythonChatModelConnection extends BaseChatModelConnection
38+
implements PythonResourceWrapper {
39+
private PyObject chatModel;
40+
41+
/**
42+
* Creates a new PythonChatModelConnection.
43+
*
44+
* @param adapter The Python resource adapter (required by PythonResourceProvider's
45+
* reflection-based instantiation but not used directly in this implementation)
46+
* @param chatModel The Python chat model object
47+
* @param descriptor The resource descriptor
48+
* @param getResource Function to retrieve resources by name and type
49+
*/
50+
public PythonChatModelConnection(
51+
PythonResourceAdapter adapter,
52+
PyObject chatModel,
53+
ResourceDescriptor descriptor,
54+
BiFunction<String, ResourceType, Resource> getResource) {
55+
super(descriptor, getResource);
56+
this.chatModel = chatModel;
57+
}
58+
59+
@Override
60+
public Object getPythonResource() {
61+
return chatModel;
62+
}
63+
64+
@Override
65+
public ChatMessage chat(
66+
List<ChatMessage> messages, List<Tool> tools, Map<String, Object> arguments) {
67+
throw new UnsupportedOperationException(
68+
"Chat method of PythonChatModelConnection cannot be called directly from Java runtime. "
69+
+ "This connection serves as a Python resource wrapper only. "
70+
+ "Chat operations should be performed on the Python side using the underlying Python chat model object.");
71+
}
72+
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.flink.agents.api.resource.python;
19+
20+
import org.apache.flink.agents.api.chat.messages.ChatMessage;
21+
import org.apache.flink.agents.api.chat.messages.MessageRole;
22+
import org.apache.flink.agents.api.chat.model.BaseChatModelSetup;
23+
import org.apache.flink.agents.api.resource.Resource;
24+
import org.apache.flink.agents.api.resource.ResourceDescriptor;
25+
import org.apache.flink.agents.api.resource.ResourceType;
26+
import pemja.core.object.PyObject;
27+
28+
import java.util.ArrayList;
29+
import java.util.HashMap;
30+
import java.util.List;
31+
import java.util.Map;
32+
import java.util.function.BiFunction;
33+
34+
/**
35+
* Python-based implementation of ChatModelSetup that bridges Java and Python chat model
36+
* functionality. This class wraps a Python chat model setup object and provides Java interface
37+
* compatibility while delegating actual chat operations to the underlying Python implementation.
38+
*/
39+
public class PythonChatModelSetup extends BaseChatModelSetup implements PythonResourceWrapper {
40+
private static final String FROM_JAVA_CHAT_MESSAGE = "python_java_utils.from_java_chat_message";
41+
42+
private static final String UPDATE_JAVA_CHAT_MESSAGE =
43+
"python_java_utils.update_java_chat_message";
44+
45+
private final PyObject chatModelSetup;
46+
private final PythonResourceAdapter adapter;
47+
48+
public PythonChatModelSetup(
49+
PythonResourceAdapter adapter,
50+
PyObject chatModelSetup,
51+
ResourceDescriptor descriptor,
52+
BiFunction<String, ResourceType, Resource> getResource) {
53+
super(descriptor, getResource);
54+
this.chatModelSetup = chatModelSetup;
55+
this.adapter = adapter;
56+
}
57+
58+
@Override
59+
public ChatMessage chat(List<ChatMessage> messages, Map<String, Object> parameters) {
60+
Map<String, Object> kwargs = new HashMap<>(parameters);
61+
62+
List<Object> pythonMessages = new ArrayList<>();
63+
for (ChatMessage message : messages) {
64+
pythonMessages.add(toPythonChatMessage(message));
65+
}
66+
67+
kwargs.put("messages", pythonMessages);
68+
69+
Object pythonMessageResponse = adapter.callMethod(chatModelSetup, "chat", kwargs);
70+
return fromPythonChatMessage(pythonMessageResponse);
71+
}
72+
73+
/**
74+
* Converts a Java ChatMessage object to its Python equivalent.
75+
*
76+
* @param message the Java ChatMessage to convert
77+
* @return the Python representation of the chat message
78+
*/
79+
private Object toPythonChatMessage(ChatMessage message) {
80+
return adapter.invoke(FROM_JAVA_CHAT_MESSAGE, message);
81+
}
82+
83+
/**
84+
* Converts a Python chat message object back to a Java ChatMessage.
85+
*
86+
* @param pythonChatMessage the Python chat message object to convert
87+
* @return the Java ChatMessage representation
88+
*/
89+
private ChatMessage fromPythonChatMessage(Object pythonChatMessage) {
90+
ChatMessage message = new ChatMessage();
91+
92+
String roleValue =
93+
(String) adapter.invoke(UPDATE_JAVA_CHAT_MESSAGE, pythonChatMessage, message);
94+
95+
message.setRole(MessageRole.fromValue(roleValue));
96+
97+
return message;
98+
}
99+
100+
@Override
101+
public Object getPythonResource() {
102+
return chatModelSetup;
103+
}
104+
105+
@Override
106+
public Map<String, Object> getParameters() {
107+
return Map.of();
108+
}
109+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.agents.api.resource.python;
20+
21+
import pemja.core.object.PyObject;
22+
23+
import java.util.Map;
24+
25+
/**
26+
* Adapter interface for managing Python resources and facilitating Java-Python interoperability.
27+
* This interface provides methods to interact with Python objects, invoke Python methods, and
28+
* handle data conversion between Java and Python environments.
29+
*/
30+
public interface PythonResourceAdapter {
31+
32+
/**
33+
* Retrieves a Python resource by name and type.
34+
*
35+
* @param resourceName the name of the resource to retrieve
36+
* @param resourceType the type of the resource
37+
* @return the retrieved resource object
38+
*/
39+
Object getResource(String resourceName, String resourceType);
40+
41+
/**
42+
* Initializes a Python resource instance from the specified module and class.
43+
*
44+
* @param module the Python module containing the target class
45+
* @param clazz the Python class name to instantiate
46+
* @param kwargs keyword arguments to pass to the Python class constructor
47+
* @return a PyObject representing the initialized Python resource
48+
*/
49+
PyObject initPythonResource(String module, String clazz, Map<String, Object> kwargs);
50+
51+
/**
52+
* Invokes a method on a Python object with the specified parameters.
53+
*
54+
* @param obj the Python object on which to call the method
55+
* @param methodName the name of the method to invoke
56+
* @param kwargs keyword arguments to pass to the method
57+
* @return the result of the method invocation
58+
*/
59+
Object callMethod(Object obj, String methodName, Map<String, Object> kwargs);
60+
61+
/**
62+
* Invokes a method with the specified name and arguments.
63+
*
64+
* @param name the name of the method to invoke
65+
* @param args the arguments to pass to the method
66+
* @return the result of the method invocation
67+
*/
68+
Object invoke(String name, Object... args);
69+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.agents.api.resource.python;
20+
21+
/**
22+
* Wrapper interface for Python resource objects. This interface provides a unified way to access
23+
* the underlying Python resource from Java objects that encapsulate Python functionality.
24+
*/
25+
public interface PythonResourceWrapper {
26+
27+
/**
28+
* Retrieves the underlying Python resource object.
29+
*
30+
* @return the wrapped Python resource object
31+
*/
32+
Object getPythonResource();
33+
}

e2e-test/integration-test/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ under the License.
7474
<artifactId>flink-agents-integrations-embedding-models-ollama</artifactId>
7575
<version>${project.version}</version>
7676
</dependency>
77+
<dependency>
78+
<groupId>org.apache.flink</groupId>
79+
<artifactId>flink-python</artifactId>
80+
<version>${flink.version}</version>
81+
</dependency>
7782
</dependencies>
7883

7984
</project>

0 commit comments

Comments
 (0)