Skip to content

Commit 8bc4c9c

Browse files
committed
[Feature][runtime] Support the use of Python ChatModel in Java
1 parent 4c6ceb8 commit 8bc4c9c

File tree

19 files changed

+1015
-26
lines changed

19 files changed

+1015
-26
lines changed

api/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@ under the License.
5757
<version>${flink.version}</version>
5858
<scope>provided</scope>
5959
</dependency>
60+
<dependency>
61+
<groupId>com.alibaba</groupId>
62+
<artifactId>pemja</artifactId>
63+
<version>${pemja.version}</version>
64+
<scope>provided</scope>
65+
</dependency>
6066
</dependencies>
6167

6268
</project>
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.flink.agents.api.chat.model.python;
19+
20+
import org.apache.flink.agents.api.chat.messages.ChatMessage;
21+
import org.apache.flink.agents.api.chat.model.BaseChatModelConnection;
22+
import org.apache.flink.agents.api.resource.Resource;
23+
import org.apache.flink.agents.api.resource.ResourceDescriptor;
24+
import org.apache.flink.agents.api.resource.ResourceType;
25+
import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
26+
import org.apache.flink.agents.api.resource.python.PythonResourceWrapper;
27+
import org.apache.flink.agents.api.tools.Tool;
28+
import pemja.core.object.PyObject;
29+
30+
import java.util.List;
31+
import java.util.Map;
32+
import java.util.function.BiFunction;
33+
34+
/**
35+
* Python-based implementation of ChatModelConnection that wraps a Python chat model object. This
36+
* class serves as a bridge between Java and Python chat model environments, but unlike {@link
37+
* PythonChatModelSetup}, it does not provide direct chat functionality in Java.
38+
*/
39+
public class PythonChatModelConnection extends BaseChatModelConnection
40+
implements PythonResourceWrapper {
41+
private PyObject chatModel;
42+
43+
/**
44+
* Creates a new PythonChatModelConnection.
45+
*
46+
* @param adapter The Python resource adapter (required by PythonResourceProvider's
47+
* reflection-based instantiation but not used directly in this implementation)
48+
* @param chatModel The Python chat model object
49+
* @param descriptor The resource descriptor
50+
* @param getResource Function to retrieve resources by name and type
51+
*/
52+
public PythonChatModelConnection(
53+
PythonResourceAdapter adapter,
54+
PyObject chatModel,
55+
ResourceDescriptor descriptor,
56+
BiFunction<String, ResourceType, Resource> getResource) {
57+
super(descriptor, getResource);
58+
this.chatModel = chatModel;
59+
}
60+
61+
@Override
62+
public Object getPythonResource() {
63+
return chatModel;
64+
}
65+
66+
@Override
67+
public ChatMessage chat(
68+
List<ChatMessage> messages, List<Tool> tools, Map<String, Object> arguments) {
69+
throw new UnsupportedOperationException(
70+
"Chat method of PythonChatModelConnection cannot be called directly from Java runtime. "
71+
+ "This connection serves as a Python resource wrapper only. "
72+
+ "Chat operations should be performed on the Python side using the underlying Python chat model object.");
73+
}
74+
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.flink.agents.api.chat.model.python;
19+
20+
import org.apache.flink.agents.api.chat.messages.ChatMessage;
21+
import org.apache.flink.agents.api.chat.model.BaseChatModelSetup;
22+
import org.apache.flink.agents.api.resource.Resource;
23+
import org.apache.flink.agents.api.resource.ResourceDescriptor;
24+
import org.apache.flink.agents.api.resource.ResourceType;
25+
import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
26+
import org.apache.flink.agents.api.resource.python.PythonResourceWrapper;
27+
import pemja.core.object.PyObject;
28+
29+
import java.util.ArrayList;
30+
import java.util.HashMap;
31+
import java.util.List;
32+
import java.util.Map;
33+
import java.util.function.BiFunction;
34+
35+
/**
36+
* Python-based implementation of ChatModelSetup that bridges Java and Python chat model
37+
* functionality. This class wraps a Python chat model setup object and provides Java interface
38+
* compatibility while delegating actual chat operations to the underlying Python implementation.
39+
*/
40+
public class PythonChatModelSetup extends BaseChatModelSetup implements PythonResourceWrapper {
41+
private static final String FROM_JAVA_CHAT_MESSAGE = "python_java_utils.from_java_chat_message";
42+
43+
private static final String TO_JAVA_CHAT_MESSAGE = "python_java_utils.to_java_chat_message";
44+
45+
private final PyObject chatModelSetup;
46+
private final PythonResourceAdapter adapter;
47+
48+
public PythonChatModelSetup(
49+
PythonResourceAdapter adapter,
50+
PyObject chatModelSetup,
51+
ResourceDescriptor descriptor,
52+
BiFunction<String, ResourceType, Resource> getResource) {
53+
super(descriptor, getResource);
54+
this.chatModelSetup = chatModelSetup;
55+
this.adapter = adapter;
56+
}
57+
58+
@Override
59+
public ChatMessage chat(List<ChatMessage> messages, Map<String, Object> parameters) {
60+
Map<String, Object> kwargs = new HashMap<>(parameters);
61+
62+
List<Object> pythonMessages = new ArrayList<>();
63+
for (ChatMessage message : messages) {
64+
pythonMessages.add(toPythonChatMessage(message));
65+
}
66+
67+
kwargs.put("messages", pythonMessages);
68+
69+
Object pythonMessageResponse = adapter.callMethod(chatModelSetup, "chat", kwargs);
70+
return fromPythonChatMessage(pythonMessageResponse);
71+
}
72+
73+
/**
74+
* Converts a Java ChatMessage object to its Python equivalent.
75+
*
76+
* @param message the Java ChatMessage to convert
77+
* @return the Python representation of the chat message
78+
*/
79+
private Object toPythonChatMessage(ChatMessage message) {
80+
return adapter.invoke(FROM_JAVA_CHAT_MESSAGE, message);
81+
}
82+
83+
/**
84+
* Converts a Python chat message object back to a Java ChatMessage.
85+
*
86+
* @param pythonChatMessage the Python chat message object to convert
87+
* @return the Java ChatMessage representation
88+
*/
89+
private ChatMessage fromPythonChatMessage(Object pythonChatMessage) {
90+
ChatMessage message = (ChatMessage) adapter.invoke(TO_JAVA_CHAT_MESSAGE, pythonChatMessage);
91+
92+
return message;
93+
}
94+
95+
@Override
96+
public Object getPythonResource() {
97+
return chatModelSetup;
98+
}
99+
100+
@Override
101+
public Map<String, Object> getParameters() {
102+
return Map.of();
103+
}
104+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.agents.api.resource.python;
20+
21+
import pemja.core.object.PyObject;
22+
23+
import java.util.Map;
24+
25+
/**
26+
* Adapter interface for managing Python resources and facilitating Java-Python interoperability.
27+
* This interface provides methods to interact with Python objects, invoke Python methods, and
28+
* handle data conversion between Java and Python environments.
29+
*/
30+
public interface PythonResourceAdapter {
31+
32+
/**
33+
* Retrieves a Python resource by name and type.
34+
*
35+
* @param resourceName the name of the resource to retrieve
36+
* @param resourceType the type of the resource
37+
* @return the retrieved resource object
38+
*/
39+
Object getResource(String resourceName, String resourceType);
40+
41+
/**
42+
* Initializes a Python resource instance from the specified module and class.
43+
*
44+
* @param module the Python module containing the target class
45+
* @param clazz the Python class name to instantiate
46+
* @param kwargs keyword arguments to pass to the Python class constructor
47+
* @return a PyObject representing the initialized Python resource
48+
*/
49+
PyObject initPythonResource(String module, String clazz, Map<String, Object> kwargs);
50+
51+
/**
52+
* Invokes a method on a Python object with the specified parameters.
53+
*
54+
* @param obj the Python object on which to call the method
55+
* @param methodName the name of the method to invoke
56+
* @param kwargs keyword arguments to pass to the method
57+
* @return the result of the method invocation
58+
*/
59+
Object callMethod(Object obj, String methodName, Map<String, Object> kwargs);
60+
61+
/**
62+
* Invokes a method with the specified name and arguments.
63+
*
64+
* @param name the name of the method to invoke
65+
* @param args the arguments to pass to the method
66+
* @return the result of the method invocation
67+
*/
68+
Object invoke(String name, Object... args);
69+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.agents.api.resource.python;
20+
21+
/**
22+
* Wrapper interface for Python resource objects. This interface provides a unified way to access
23+
* the underlying Python resource from Java objects that encapsulate Python functionality.
24+
*/
25+
public interface PythonResourceWrapper {
26+
27+
/**
28+
* Retrieves the underlying Python resource object.
29+
*
30+
* @return the wrapped Python resource object
31+
*/
32+
Object getPythonResource();
33+
}

e2e-test/integration-test/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ under the License.
7474
<artifactId>flink-agents-integrations-embedding-models-ollama</artifactId>
7575
<version>${project.version}</version>
7676
</dependency>
77+
<dependency>
78+
<groupId>org.apache.flink</groupId>
79+
<artifactId>flink-python</artifactId>
80+
<version>${flink.version}</version>
81+
</dependency>
7782
</dependencies>
7883

7984
</project>

0 commit comments

Comments
 (0)