Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/ema-ui/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"@lancedb/lancedb": "^0.23.0",
"arktype": "^2.1.29",
"ema": "workspace:*",
"mongodb-agenda": "npm:mongodb@4.17.2",
"mongodb": "^7.0.0",
"next": "16.0.10",
"pino": "^10.1.0",
Expand Down
29 changes: 15 additions & 14 deletions packages/ema-ui/src/app/chat/page.module.css
Original file line number Diff line number Diff line change
Expand Up @@ -44,26 +44,27 @@
}

.snapshotStatus {
max-width: 800px;
margin: -1rem auto 1rem;
position: fixed;
left: 50%;
top: 1rem;
transform: translateX(-50%);
max-width: 720px;
width: calc(100% - 2rem);
margin: 0;
padding: 0.75rem 1rem;
border-radius: 12px;
background: rgba(255, 255, 255, 0.08);
border: 1px solid rgba(255, 255, 255, 0.12);
color: #cdd6f7;
background: #cdd6f7;
border: 1px solid rgba(0, 0, 0, 0.12);
color: #30354a;
font-size: 0.9rem;
animation: toastFade 3s ease forwards;
text-align: center;
animation: toastFade 2.8s ease forwards;
z-index: 30;
pointer-events: none;
}

@keyframes toastFade {
0% {
opacity: 0;
transform: translateY(-4px);
}
15% {
opacity: 1;
transform: translateY(0);
}
0%,
70% {
opacity: 1;
}
Expand Down
81 changes: 54 additions & 27 deletions packages/ema-ui/src/app/chat/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@ import { useState, useEffect, useRef } from "react";
import styles from "./page.module.css";
import type { ActorAgentEvent, Message } from "ema";

let initialLoadPromise: Promise<Message[] | null> | null = null;
let initialMessagesCache: Message[] | null = null;

// todo: consider adding tests for this component to verify message state management
export default function ChatPage() {
const [messages, setMessages] = useState<Message[]>([]);
const [inputValue, setInputValue] = useState("");
const [initializing, setInitializing] = useState(true);
const [snapshotting, setSnapshotting] = useState(false);
const [snapshotStatus, setSnapshotStatus] = useState<string | null>(null);
const [notice, setNotice] = useState<string | null>(null);
const chatAreaRef = useRef<HTMLDivElement | null>(null);
const messagesEndRef = useRef<HTMLDivElement | null>(null);
const shouldAutoScrollRef = useRef(true);
Expand All @@ -21,24 +25,48 @@ export default function ChatPage() {
let isActive = true;

const init = async () => {
setInitializing(true);
try {
await fetch("/api/users/login");
} catch (error) {
console.error("Error logging in:", error);
}
if (!initialLoadPromise) {
initialLoadPromise = (async () => {
try {
await fetch("/api/users/login");
} catch (error) {
console.error("Error logging in:", error);
}

try {
const response = await fetch(
"/api/conversations/messages?conversationId=1&limit=100",
);
if (response.ok) {
const data = (await response.json()) as { messages: Message[] };
if (isActive && Array.isArray(data.messages)) {
setMessages(data.messages);
}
try {
const response = await fetch(
"/api/conversations/messages?conversationId=1&limit=100",
);
if (response.ok) {
const data = (await response.json()) as { messages: Message[] };
if (Array.isArray(data.messages)) {
return data.messages;
}
}
} catch (error) {
console.error("Error loading history:", error);
}
return null;
})().catch((error) => {
initialLoadPromise = null;
throw error;
});
}

if (!initialMessagesCache) {
initialMessagesCache = await initialLoadPromise;
}

if (isActive && Array.isArray(initialMessagesCache)) {
setMessages(initialMessagesCache);
setNotice("Conversation history loaded.");
}
} finally {
if (isActive) {
setInitializing(false);
}
} catch (error) {
console.error("Error loading history:", error);
}

if (!isActive) {
Expand Down Expand Up @@ -101,14 +129,14 @@ export default function ChatPage() {
}, [messages.length]);

useEffect(() => {
if (!snapshotStatus) {
if (!notice) {
return;
}
if (snapshotTimerRef.current) {
clearTimeout(snapshotTimerRef.current);
}
snapshotTimerRef.current = setTimeout(() => {
setSnapshotStatus(null);
setNotice(null);
snapshotTimerRef.current = null;
}, 3200);
return () => {
Expand All @@ -117,7 +145,7 @@ export default function ChatPage() {
snapshotTimerRef.current = null;
}
};
}, [snapshotStatus]);
}, [notice]);

const handleSubmit = async (e: React.FormEvent) => {
e.preventDefault();
Expand Down Expand Up @@ -172,7 +200,7 @@ export default function ChatPage() {
};

const handleSnapshot = async () => {
setSnapshotStatus(null);
setNotice(null);
setSnapshotting(true);
try {
const response = await fetch("/api/snapshot", {
Expand All @@ -182,20 +210,20 @@ export default function ChatPage() {
});
if (!response.ok) {
const text = await response.text();
setSnapshotStatus(text || "Snapshot failed.");
setNotice(text || "Snapshot failed.");
return;
}
const data = (await response.json().catch(() => null)) as {
fileName?: string;
} | null;
setSnapshotStatus(
setNotice(
data?.fileName
? `Snapshot saved: ${data.fileName}`
: "Snapshot created.",
);
} catch (error) {
console.error("Snapshot error:", error);
setSnapshotStatus("Snapshot failed.");
setNotice("Snapshot failed.");
} finally {
setSnapshotting(false);
}
Expand All @@ -215,9 +243,7 @@ export default function ChatPage() {
{snapshotting ? "Snapshotting..." : "Snapshot"}
</button>
</div>
{snapshotStatus ? (
<div className={styles.snapshotStatus}>{snapshotStatus}</div>
) : null}
{notice ? <div className={styles.snapshotStatus}>{notice}</div> : null}

<div
className={styles.chatArea}
Expand Down Expand Up @@ -270,13 +296,14 @@ export default function ChatPage() {
placeholder="Enter message..."
value={inputValue}
onChange={(e) => setInputValue(e.target.value)}
disabled={initializing}
/>
<div className={styles.buttonGroup}>
<button
type="submit"
aria-label="Send message"
className={styles.sendButton}
disabled={!inputValue.trim()}
disabled={initializing || !inputValue.trim()}
>
<svg
width="16"
Expand Down
2 changes: 2 additions & 0 deletions packages/ema/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
],
"dependencies": {
"@google/genai": "^1.34.0",
"@hokify/agenda": "^6.3.0",
"@lancedb/lancedb": "^0.23.0",
"@modelcontextprotocol/sdk": "^1.25.1",
"apache-arrow": "^18.1.0",
Expand All @@ -36,6 +37,7 @@
"js-tiktoken": "^1.0.21",
"js-yaml": "^4.1.0",
"mongodb": "^7.0.0",
"mongodb-agenda": "npm:mongodb@4.17.2",
"mongodb-memory-server": "^11.0.0",
"openai": "^6.13.0",
"pino": "^10.1.0",
Expand Down
14 changes: 14 additions & 0 deletions packages/ema/src/actor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ import type { Content } from "./schema";
import { LLMClient } from "./llm";
import { type AgentState } from "./agent";

/** The scope information for the actor. */
export interface ActorScope {
actorId: number;
userId: number;
conversationId?: number;
}

/**
* A facade of the actor functionalities between the server (system) and the agent (actor).
*/
Expand Down Expand Up @@ -293,6 +300,13 @@ export class ActorWorker implements ActorStateStorage, ActorMemory {
),
messages: batches.map((item) => bufferMessageToUserMessage(item)),
tools: this.config.baseTools,
toolContext: {
actorScope: {
actorId: this.actorId,
userId: this.userId,
conversationId: this.conversationId,
},
},
};
}
this.resumeStateAfterAbort = false;
Expand Down
14 changes: 6 additions & 8 deletions packages/ema/src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
isToolMessage,
isUserMessage,
} from "./schema";
import type { Tool, ToolResult } from "./tools/base";
import type { Tool, ToolResult, ToolContext } from "./tools/base";
import type { EmaReply } from "./tools/ema_reply_tool";

/** Event emitted when the agent finishes a run. */
Expand Down Expand Up @@ -71,6 +71,7 @@ export type AgentState = {
systemPrompt: string;
messages: Message[];
tools: Tool[];
toolContext?: ToolContext;
};

/** Callback type for running the agent with a given state. */
Expand Down Expand Up @@ -314,13 +315,10 @@ export class Agent {
};
} else {
try {
const props = (
tool.parameters as { properties?: Record<string, unknown> }
).properties;
const positionalArgs = props
? Object.keys(props).map((key) => callArgs[key])
: Object.values(callArgs);
result = await tool.execute(...positionalArgs);
result = await tool.execute(
callArgs,
this.contextManager.state.toolContext,
);
} catch (err) {
const errorDetail = `${(err as Error).name}: ${(err as Error).message}`;
const errorTrace = (err as Error).stack ?? "";
Expand Down
6 changes: 6 additions & 0 deletions packages/ema/src/db/mongo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ export abstract class Mongo {
*/
abstract getClient(): MongoClient;

/**
* Gets the MongoDB connection URI.
* @returns The MongoDB connection URI
*/
abstract getUri(): string;

/**
* Connects to the MongoDB database
* @returns Promise resolving when connection is established
Expand Down
18 changes: 17 additions & 1 deletion packages/ema/src/db/mongo/memory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ export class MemoryMongo extends Mongo {
let client: MongoClient | undefined;

try {
mongoServer = await MongoMemoryServer.create();
mongoServer = await MongoMemoryServer.create({
instance: {
port: 0,
},
});
const uri = mongoServer.getUri();
client = new MongoClient(uri);
await client.connect();
Expand Down Expand Up @@ -83,6 +87,18 @@ export class MemoryMongo extends Mongo {
return this.client;
}

/**
* Gets the MongoDB connection URI.
* @returns The MongoDB connection URI
* @throws Error if not connected
*/
getUri(): string {
if (!this.mongoServer) {
throw new Error("MongoDB not connected. Call connect() first.");
}
return this.mongoServer.getUri(this.dbName);
}

/**
* Closes the MongoDB connection and stops the in-memory server
* @returns Promise resolving when connection is closed and server is stopped
Expand Down
36 changes: 35 additions & 1 deletion packages/ema/src/db/mongo/remote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,28 @@
* Connects to an actual MongoDB instance using connection string.
*/

import { MongoClient, type Db } from "mongodb";
import { createRequire } from "node:module";
import { MongoClient } from "mongodb";
import type { CreateMongoArgs } from "../mongo";
import { Mongo } from "../mongo";

type ConnectionString = {
pathname: string;
toString(): string;
};

type ConnectionStringConstructor = new (
uri: string,
options?: { looseValidation?: boolean },
) => ConnectionString;

const requireFromMongo = createRequire(
createRequire(import.meta.url).resolve("mongodb/package.json"),
);
const { default: MongoConnectionString } = requireFromMongo(
"mongodb-connection-string-url",
) as { default: ConnectionStringConstructor };

/**
* Remote MongoDB implementation
* Connects to an actual MongoDB instance for production environments
Expand Down Expand Up @@ -62,6 +80,14 @@ export class RemoteMongo extends Mongo {
return this.client;
}

/**
* Gets the MongoDB connection URI.
* @returns The MongoDB connection URI
*/
getUri(): string {
return this.buildUriWithDb();
}

/**
* Closes the MongoDB connection
* @returns Promise resolving when connection is closed
Expand All @@ -72,4 +98,12 @@ export class RemoteMongo extends Mongo {
this.client = undefined;
}
}

private buildUriWithDb(): string {
const connectionString = new MongoConnectionString(this.uri);
if (connectionString.pathname === "" || connectionString.pathname === "/") {
connectionString.pathname = `/${this.dbName}`;
}
return connectionString.toString();
}
}
Loading