Skip to content

Commit cad05c3

Browse files
committed
Wrap up integration with langchain4j - everything is cog
Signed-off-by: Ricardo Zanini <ricardozanini@gmail.com>
1 parent 730fe60 commit cad05c3

File tree

9 files changed

+417
-256
lines changed

9 files changed

+417
-256
lines changed

experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,28 +21,44 @@
2121
import io.serverlessworkflow.impl.WorkflowModel;
2222
import io.serverlessworkflow.impl.WorkflowModelCollection;
2323
import io.serverlessworkflow.impl.WorkflowModelFactory;
24+
import io.serverlessworkflow.impl.expressions.agentic.langchain4j.CognisphereRegistryAssessor;
2425
import io.serverlessworkflow.impl.expressions.func.JavaModel;
2526
import java.time.OffsetDateTime;
2627
import java.util.Map;
2728

2829
class AgenticModelFactory implements WorkflowModelFactory {
2930

31+
/**
32+
* Applies any change to the model after running as task. We will always set it to
33+
* a @DefaultCognisphere object since @AgentExecutor is always adding the output to the
34+
* cognisphere. We just have to make sure that cognisphere is always passed to the next input
35+
* task.
36+
*
37+
* @param prev the global Cognisphere object getting updated by the workflow context
38+
* @param obj the same Cognisphere object updated by the AgentExecutor
39+
* @return the workflow context model holding the cognisphere object.
40+
*/
3041
@Override
3142
public WorkflowModel fromAny(WorkflowModel prev, Object obj) {
32-
((AgenticModel) prev).setObject(obj);
43+
// We ignore `obj` since it's already included in `prev` within the Cognisphere instance
3344
return prev;
3445
}
3546

3647
@Override
3748
public WorkflowModel combine(Map<String, WorkflowModel> workflowVariables) {
38-
throw new UnsupportedOperationException();
49+
// TODO: create a new cognisphere object in the CognisphereRegistryAssessor per branch
50+
// TODO: Since we share the same cognisphere object, both branches are updating the same
51+
// instance, so for now we return the first key.
52+
return workflowVariables.values().iterator().next();
3953
}
4054

4155
@Override
4256
public WorkflowModelCollection createCollection() {
4357
throw new UnsupportedOperationException();
4458
}
4559

60+
// TODO: all these methods can use Cognisphere as long as we have access to the `outputName`
61+
4662
@Override
4763
public WorkflowModel from(boolean value) {
4864
return new JavaModel(value);
@@ -75,7 +91,9 @@ public WorkflowModel from(OffsetDateTime value) {
7591

7692
@Override
7793
public WorkflowModel from(Map<String, Object> map) {
78-
return new JavaModel(map);
94+
final Cognisphere cognisphere = new CognisphereRegistryAssessor().getCognisphere();
95+
cognisphere.writeStates(map);
96+
return new AgenticModel(cognisphere);
7997
}
8098

8199
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.expressions.agentic.langchain4j;
17+
18+
import dev.langchain4j.agentic.cognisphere.CognisphereRegistry;
19+
import dev.langchain4j.agentic.cognisphere.DefaultCognisphere;
20+
import dev.langchain4j.agentic.internal.CognisphereOwner;
21+
import java.util.Objects;
22+
import java.util.UUID;
23+
import java.util.concurrent.atomic.AtomicReference;
24+
25+
public class CognisphereRegistryAssessor implements CognisphereOwner {
26+
27+
private final AtomicReference<CognisphereRegistry> cognisphereRegistry = new AtomicReference<>();
28+
private final String agentId;
29+
private DefaultCognisphere cognisphere;
30+
private Object memoryId;
31+
32+
public CognisphereRegistryAssessor(String agentId) {
33+
Objects.requireNonNull(agentId, "Agent id cannot be null");
34+
this.agentId = agentId;
35+
}
36+
37+
// TODO: have access to the workflow definition and assign its name instead
38+
public CognisphereRegistryAssessor() {
39+
this.agentId = UUID.randomUUID().toString();
40+
}
41+
42+
public void setMemoryId(Object memoryId) {
43+
this.memoryId = memoryId;
44+
}
45+
46+
public DefaultCognisphere getCognisphere() {
47+
if (cognisphere != null) {
48+
return cognisphere;
49+
}
50+
51+
if (memoryId != null) {
52+
this.cognisphere = registry().getOrCreate(memoryId);
53+
} else {
54+
this.cognisphere = registry().createEphemeralCognisphere();
55+
}
56+
return this.cognisphere;
57+
}
58+
59+
@Override
60+
public CognisphereOwner withCognisphere(DefaultCognisphere cognisphere) {
61+
this.cognisphere = cognisphere;
62+
return this;
63+
}
64+
65+
@Override
66+
public CognisphereRegistry registry() {
67+
cognisphereRegistry.compareAndSet(null, new CognisphereRegistry(agentId));
68+
return cognisphereRegistry.get();
69+
}
70+
}

fluent/agentic-langchain4j/src/main/java/io/serverlessworkflow/fluent/agentic/langchain4j/AbstractAgentService.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,7 @@ public abstract class AbstractAgentService<T, S> implements WorkflowDefinitionBu
3636
protected final Class<T> agentServiceClass;
3737

3838
protected AbstractAgentService(Class<T> agentServiceClass) {
39-
this.workflowBuilder =
40-
AgentWorkflowBuilder.workflow()
41-
.outputAs(DEFAULT_OUTPUT_FUNCTION);
39+
this.workflowBuilder = AgentWorkflowBuilder.workflow().outputAs(DEFAULT_OUTPUT_FUNCTION);
4240
this.agentServiceClass = agentServiceClass;
4341
this.workflowExecBuilder = WorkflowApplication.builder();
4442
}
@@ -49,7 +47,8 @@ public T build() {
4947
Proxy.newProxyInstance(
5048
this.agentServiceClass.getClassLoader(),
5149
new Class<?>[] {agentServiceClass, AgentSpecification.class, CognisphereOwner.class},
52-
new WorkflowInvocationHandler(this.workflowBuilder.build(), this.workflowExecBuilder, this.agentServiceClass));
50+
new WorkflowInvocationHandler(
51+
this.workflowBuilder.build(), this.workflowExecBuilder, this.agentServiceClass));
5352
}
5453

5554
@SuppressWarnings("unchecked")

fluent/agentic-langchain4j/src/main/java/io/serverlessworkflow/fluent/agentic/langchain4j/WorkflowInvocationHandler.java

Lines changed: 46 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import dev.langchain4j.agentic.UntypedAgent;
1919
import dev.langchain4j.agentic.cognisphere.Cognisphere;
2020
import dev.langchain4j.agentic.cognisphere.CognisphereAccess;
21-
import dev.langchain4j.agentic.cognisphere.CognisphereKey;
2221
import dev.langchain4j.agentic.cognisphere.CognisphereRegistry;
2322
import dev.langchain4j.agentic.cognisphere.DefaultCognisphere;
2423
import dev.langchain4j.agentic.cognisphere.ResultWithCognisphere;
@@ -28,29 +27,26 @@
2827
import dev.langchain4j.service.MemoryId;
2928
import io.serverlessworkflow.api.types.Workflow;
3029
import io.serverlessworkflow.impl.WorkflowApplication;
31-
import io.serverlessworkflow.impl.WorkflowModel;
32-
30+
import io.serverlessworkflow.impl.expressions.agentic.langchain4j.CognisphereRegistryAssessor;
3331
import java.lang.reflect.InvocationHandler;
3432
import java.lang.reflect.Method;
3533
import java.lang.reflect.Parameter;
3634
import java.util.Map;
37-
import java.util.concurrent.CompletableFuture;
3835
import java.util.concurrent.ExecutionException;
39-
import java.util.concurrent.atomic.AtomicReference;
4036

4137
public class WorkflowInvocationHandler implements InvocationHandler, CognisphereOwner {
4238

4339
private final Workflow workflow;
4440
private final WorkflowApplication.Builder workflowApplicationBuilder;
45-
private DefaultCognisphere cognisphere;
46-
private Class<?> agentServiceClass;
47-
private final AtomicReference<CognisphereRegistry> cognisphereRegistry = new AtomicReference<>();
41+
private final CognisphereRegistryAssessor cognisphereRegistryAssessor;
4842

4943
WorkflowInvocationHandler(
50-
Workflow workflow, WorkflowApplication.Builder workflowApplicationBuilder, Class<?> agentServiceClass) {
44+
Workflow workflow,
45+
WorkflowApplication.Builder workflowApplicationBuilder,
46+
Class<?> agentServiceClass) {
5147
this.workflow = workflow;
5248
this.workflowApplicationBuilder = workflowApplicationBuilder;
53-
this.agentServiceClass = agentServiceClass;
49+
this.cognisphereRegistryAssessor = new CognisphereRegistryAssessor(agentServiceClass.getName());
5450
}
5551

5652
@SuppressWarnings("unchecked")
@@ -67,22 +63,26 @@ private static void writeCognisphereState(Cognisphere cognisphere, Method method
6763
}
6864
}
6965

70-
private String agentId() {
71-
return workflow.getDocument().getName();
66+
private String outputName() {
67+
Object outputName =
68+
this.workflow
69+
.getDocument()
70+
.getMetadata()
71+
.getAdditionalProperties()
72+
.get(WorkflowDefinitionBuilder.META_KEY_OUTPUTNAME);
73+
if (outputName != null) {
74+
return outputName.toString();
75+
}
76+
return null;
7277
}
7378

7479
@Override
7580
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
76-
CognisphereRegistry registry = cognisphereRegistry();
81+
CognisphereRegistry registry = registry();
7782
// outputName
7883
if (method.getDeclaringClass() == AgentSpecification.class) {
7984
return switch (method.getName()) {
80-
case "outputName" ->
81-
this.workflow
82-
.getDocument()
83-
.getMetadata()
84-
.getAdditionalProperties()
85-
.get(WorkflowDefinitionBuilder.META_KEY_OUTPUTNAME);
85+
case "outputName" -> outputName();
8686
default ->
8787
throw new UnsupportedOperationException(
8888
"Unknown method on AgentInstance class : " + method.getName());
@@ -113,39 +113,44 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
113113
}
114114

115115
// invoke
116-
return executeWorkflow(currentCognisphere(cognisphereRegistry(), method, args), method, args);
116+
return executeWorkflow(currentCognisphere(method, args), method, args);
117117
}
118118

119119
private Object executeWorkflow(DefaultCognisphere cognisphere, Method method, Object[] args) {
120120
writeCognisphereState(cognisphere, method, args);
121121

122122
try (WorkflowApplication app = workflowApplicationBuilder.build()) {
123-
CompletableFuture<WorkflowModel> workflowInstance = app.workflowDefinition(workflow).instance(cognisphere).start();
124-
Object result = workflowInstance.get().asJavaObject();
125-
return method.getReturnType().equals(ResultWithCognisphere.class) ?
126-
new ResultWithCognisphere<>(cognisphere, result) :
127-
result;
123+
// TODO improve result handling
124+
DefaultCognisphere output =
125+
app.workflowDefinition(workflow)
126+
.instance(cognisphere)
127+
.start()
128+
.get()
129+
.as(DefaultCognisphere.class)
130+
.orElseThrow(
131+
() ->
132+
new IllegalArgumentException(
133+
"Workflow hasn't returned a Cognisphere object."));
134+
Object result = output.readState(outputName());
135+
136+
return method.getReturnType().equals(ResultWithCognisphere.class)
137+
? new ResultWithCognisphere<>(output, result)
138+
: result;
128139

129140
} catch (ExecutionException | InterruptedException e) {
130141
throw new RuntimeException(
131-
"Failed to execute workflow: " + agentId() + " - Cognisphere: " + cognisphere, e);
142+
"Failed to execute workflow: "
143+
+ workflow.getDocument().getName()
144+
+ " - Cognisphere: "
145+
+ cognisphere,
146+
e);
132147
}
133148
}
134149

135-
private CognisphereRegistry cognisphereRegistry() {
136-
cognisphereRegistry.compareAndSet(null, new CognisphereRegistry(this.agentServiceClass.getName()));
137-
return cognisphereRegistry.get();
138-
}
139-
140-
private DefaultCognisphere currentCognisphere(CognisphereRegistry registry, Method method, Object[] args) {
141-
if (cognisphere != null) {
142-
return cognisphere;
143-
}
144-
150+
private DefaultCognisphere currentCognisphere(Method method, Object[] args) {
145151
Object memoryId = memoryId(method, args);
146-
return memoryId != null
147-
? registry.getOrCreate(new CognisphereKey(this.agentId(), memoryId))
148-
: registry.createEphemeralCognisphere();
152+
this.cognisphereRegistryAssessor.setMemoryId(memoryId);
153+
return this.cognisphereRegistryAssessor.getCognisphere();
149154
}
150155

151156
private Object memoryId(Method method, Object[] args) {
@@ -160,12 +165,12 @@ private Object memoryId(Method method, Object[] args) {
160165

161166
@Override
162167
public CognisphereOwner withCognisphere(DefaultCognisphere cognisphere) {
163-
this.cognisphere = cognisphere;
168+
this.cognisphereRegistryAssessor.withCognisphere(cognisphere);
164169
return this;
165170
}
166171

167172
@Override
168173
public CognisphereRegistry registry() {
169-
return this.cognisphereRegistry();
174+
return this.cognisphereRegistryAssessor.registry();
170175
}
171176
}

0 commit comments

Comments
 (0)