From 7f32a304de3d20496f67e51a49ba9a285c77dfcd Mon Sep 17 00:00:00 2001 From: huijiro Date: Mon, 27 Oct 2025 12:22:35 -0300 Subject: [PATCH 1/4] Move email send to context --- src/apis/email.ts | 248 ++++++++++++++++++++++++++++++++++------------ src/io/email.ts | 167 +++++++------------------------ src/types.ts | 32 +++--- 3 files changed, 237 insertions(+), 210 deletions(-) diff --git a/src/apis/email.ts b/src/apis/email.ts index f43c8408..e5a907d0 100644 --- a/src/apis/email.ts +++ b/src/apis/email.ts @@ -1,6 +1,10 @@ import { context, SpanStatusCode, trace } from '@opentelemetry/api'; +import MailComposer from 'nodemailer/lib/mail-composer/index.js'; +import type { Attachment } from 'nodemailer/lib/mailer/index.js'; +import type { EmailReply } from '../io/email'; import { getTracer, recordException } from '../router/router'; -import type { EmailService } from '../types'; +import { fromDataType } from '../server/util'; +import type { AgentContext, AgentRequest, EmailService } from '../types'; import { POST } from './api'; export default class EmailApi implements EmailService { @@ -8,45 +12,107 @@ export default class EmailApi implements EmailService { * send an email */ async send( - agentId: string, - email: string, - authToken: string, - messageId: string - ): Promise { + req: AgentRequest, + ctx: AgentContext, + to: string[], + email: EmailReply, + from?: { + name?: string; + email?: string; + } + ): Promise { + const authToken = req.metadata?.['email-auth-token'] as string; + if (!authToken) { + throw new Error( + 'email authorization token is required but not found in metadata' + ); + } + const tracer = getTracer(); const currentContext = context.active(); - - // Create a child span using the current context const span = tracer.startSpan('agentuity.email.send', {}, currentContext); try { - // Create a new context with the child span const spanContext = trace.setSpan(currentContext, span); - // Execute the operation within the new context return await context.with(spanContext, async () => { - span.setAttribute('@agentuity/agentId', agentId); - span.setAttribute('@agentuity/emailMessageId', messageId); - - const resp = await POST( - '/email/send', - email, - { - 'Content-Type': 'message/rfc822', - 'X-Agentuity-Message-Id': messageId, - }, - undefined, - authToken - ); - if (resp.status === 200) { - span.setStatus({ code: SpanStatusCode.OK }); - return; + let attachments: Attachment[] = []; + if (email.attachments) { + attachments = await Promise.all( + email.attachments.map(async (attachment) => { + const resp = await fromDataType(attachment.data); + return { + filename: attachment.filename, + content: await resp.data.buffer(), + contentType: resp.data.contentType, + contentDisposition: + attachment.contentDisposition ?? ('attachment' as const), + }; + }) + ); + } + + const normalizedTo = to.map((addr) => addr.trim()).filter(Boolean); + if (normalizedTo.length === 0) { + throw new Error('at least one recipient email is required'); + } + + if (!from?.email) { + throw new Error('a valid from email address is required'); } - const body = await resp.response.text(); - span.setStatus({ code: SpanStatusCode.ERROR, message: body }); - throw new Error( - `error sending email: ${resp.response.statusText} (${resp.response.status})${body}` - ); + + const mail = new MailComposer({ + date: new Date(), + from: { + name: from?.name ?? ctx.agent.name, + address: from.email, + }, + to: normalizedTo.join(', '), + subject: email.subject ?? '', + text: email.text, + html: email.html, + attachments, + }); + const newemail = mail.compile(); + + return new Promise((resolve, reject) => { + newemail.build(async (err, message) => { + if (err) { + reject(err); + } else { + try { + const messageId = newemail.messageId(); + span.setAttribute('@agentuity/agentId', ctx.agent.id); + span.setAttribute('@agentuity/emailMessageId', messageId); + + const resp = await POST( + '/email/send', + message.toString(), + { + 'Content-Type': 'message/rfc822', + 'X-Agentuity-Message-Id': messageId, + }, + undefined, + authToken + ); + if (resp.status === 200) { + span.setStatus({ code: SpanStatusCode.OK }); + resolve(messageId); + } else { + const body = await resp.response.text(); + span.setStatus({ code: SpanStatusCode.ERROR, message: body }); + reject( + new Error( + `error sending email: ${resp.response.statusText} (${resp.response.status})${body}` + ) + ); + } + } catch (ex) { + reject(ex); + } + } + }); + }); }); } catch (ex) { recordException(span, ex); @@ -60,45 +126,103 @@ export default class EmailApi implements EmailService { * send an email reply to an incoming email */ async sendReply( - agentId: string, - email: string, - authToken: string, - messageId: string - ): Promise { + req: AgentRequest, + ctx: AgentContext, + inReplyTo: string, + reply: EmailReply, + from?: { + name?: string; + email?: string; + } + ): Promise { + const authToken = req.metadata?.['email-auth-token'] as string; + if (!authToken) { + throw new Error( + 'email authorization token is required but not found in metadata' + ); + } + const tracer = getTracer(); const currentContext = context.active(); - - // Create a child span using the current context const span = tracer.startSpan('agentuity.email.reply', {}, currentContext); try { - // Create a new context with the child span const spanContext = trace.setSpan(currentContext, span); - // Execute the operation within the new context return await context.with(spanContext, async () => { - span.setAttribute('@agentuity/agentId', agentId); - span.setAttribute('@agentuity/emailMessageId', messageId); - - const resp = await POST( - `/email/2025-03-17/${agentId}/reply`, - email, - { - 'Content-Type': 'message/rfc822', - 'X-Agentuity-Message-Id': messageId, - }, - undefined, - authToken - ); - if (resp.status === 200) { - span.setStatus({ code: SpanStatusCode.OK }); - return; + let attachments: Attachment[] = []; + if (reply.attachments) { + attachments = await Promise.all( + reply.attachments.map(async (attachment) => { + const resp = await fromDataType(attachment.data); + return { + filename: attachment.filename, + content: await resp.data.buffer(), + contentType: resp.data.contentType, + contentDisposition: + attachment.contentDisposition ?? ('attachment' as const), + }; + }) + ); } - const body = await resp.response.text(); - span.setStatus({ code: SpanStatusCode.ERROR, message: body }); - throw new Error( - `error sending email reply: ${resp.response.statusText} (${resp.response.status})${body}` - ); + + if (!from?.email) { + throw new Error('a valid from email address is required'); + } + + const mail = new MailComposer({ + inReplyTo: inReplyTo, + references: inReplyTo, + date: new Date(), + from: { + name: from?.name ?? ctx.agent.name, + address: from.email, + }, + subject: reply.subject ?? '', + text: reply.text, + html: reply.html, + attachments, + }); + const newemail = mail.compile(); + + return new Promise((resolve, reject) => { + newemail.build(async (err, message) => { + if (err) { + reject(err); + } else { + try { + const messageId = newemail.messageId(); + span.setAttribute('@agentuity/agentId', ctx.agent.id); + span.setAttribute('@agentuity/emailMessageId', messageId); + + const resp = await POST( + `/email/2025-03-17/${ctx.agent.id}/reply`, + message.toString(), + { + 'Content-Type': 'message/rfc822', + 'X-Agentuity-Message-Id': messageId, + }, + undefined, + authToken + ); + if (resp.status === 200) { + span.setStatus({ code: SpanStatusCode.OK }); + resolve(messageId); + } else { + const body = await resp.response.text(); + span.setStatus({ code: SpanStatusCode.ERROR, message: body }); + reject( + new Error( + `error sending email reply: ${resp.response.statusText} (${resp.response.status})${body}` + ) + ); + } + } catch (ex) { + reject(ex); + } + } + }); + }); }); } catch (ex) { recordException(span, ex); @@ -107,4 +231,4 @@ export default class EmailApi implements EmailService { span.end(); } } -} +} \ No newline at end of file diff --git a/src/io/email.ts b/src/io/email.ts index 52c0015a..373199eb 100644 --- a/src/io/email.ts +++ b/src/io/email.ts @@ -4,12 +4,9 @@ import type { ReadableStream } from 'node:stream/web'; import { inspect } from 'node:util'; import { context, SpanStatusCode, trace } from '@opentelemetry/api'; import { type Headers, type ParsedMail, simpleParser } from 'mailparser'; -import MailComposer from 'nodemailer/lib/mail-composer/index.js'; -import type { Address, Attachment } from 'nodemailer/lib/mailer/index.js'; import { send } from '../apis/api'; import { DataHandler } from '../router/data'; import { getTracer, recordException } from '../router/router'; -import { fromDataType } from '../server/util'; import type { AgentContext, AgentRequest, @@ -480,74 +477,15 @@ export class Email { email?: string; } ): Promise { - const authToken = req.metadata?.['email-auth-token'] as string; - if (!authToken) { - throw new Error( - 'email authorization token is required but not found in metadata' - ); + const fromAddress = from?.email ?? this.toEmail(); + if (!fromAddress) { + throw new Error('a valid from email address is required'); } - return (async () => { - let attachments: Attachment[] = []; - if (email.attachments) { - attachments = await Promise.all( - email.attachments.map(async (attachment) => { - const resp = await fromDataType(attachment.data); - return { - filename: attachment.filename, - content: await resp.data.buffer(), - contentType: resp.data.contentType, - contentDisposition: - attachment.contentDisposition ?? ('attachment' as const), - }; - }) - ); - } - - const normalizedTo = to.map((addr) => addr.trim()).filter(Boolean); - if (normalizedTo.length === 0) { - throw new Error('at least one recipient email is required'); - } - - const fromAddress = from?.email ?? this.toEmail(); - if (!fromAddress) { - throw new Error('a valid from email address is required'); - } - - const mail = new MailComposer({ - date: new Date(), - from: { - name: from?.name ?? context.agent.name, - address: fromAddress, - }, - to: normalizedTo.join(', '), - subject: email.subject ?? '', - text: email.text, - html: email.html, - attachments, - }); - const newemail = mail.compile(); - - return new Promise((resolve, reject) => { - newemail.build(async (err, message) => { - if (err) { - reject(err); - } else { - try { - await context.email.send( - context.agent.id, - message.toString(), - authToken, - newemail.messageId() - ); - resolve(newemail.messageId()); - } catch (ex) { - reject(ex); - } - } - }); - }); - })(); + return context.email.send(req, context, to, email, { + name: from?.name, + email: fromAddress, + }); } /** @@ -562,69 +500,36 @@ export class Email { email?: string; } ): Promise { - const authToken = req.metadata?.['email-auth-token'] as string; - if (!authToken) { - throw new Error( - 'email authorization token is required but not found in metadata' - ); + const messageId = this.messageId(); + if (!messageId) { + throw new Error('cannot reply to an email without a message ID'); } - // biome-ignore lint/suspicious/noAsyncPromiseExecutor: needed for complex async email operations - return new Promise(async (resolve, reject) => { - try { - let attachments: Attachment[] = []; - if (reply.attachments) { - attachments = await Promise.all( - reply.attachments.map(async (attachment) => { - const resp = await fromDataType(attachment.data); - return { - filename: attachment.filename, - content: await resp.data.buffer(), - contentType: resp.data.contentType, - contentDisposition: - attachment.contentDisposition ?? ('attachment' as const), - }; - }) - ); - } - const mail = new MailComposer({ - inReplyTo: this.messageId() ?? undefined, - references: this.messageId() ?? undefined, - date: new Date(), - from: { - name: from?.name ?? context.agent.name, - address: from?.email ?? this.toEmail() ?? '', - }, - to: { - name: this.fromName() ?? undefined, - address: this.fromEmail() ?? undefined, - } as Address, - subject: this.makeReplySubject(reply.subject), - text: reply.text, - html: reply.html, - attachments, - }); - const newemail = mail.compile(); - newemail.build(async (err, message) => { - if (err) { - reject(err); - } else { - try { - await context.email.sendReply( - context.agent.id, - message.toString(), - authToken, - newemail.messageId() - ); - resolve(newemail.messageId()); - } catch (ex) { - reject(ex); - } - } - }); - } catch (ex) { - reject(ex); + + const fromAddress = from?.email ?? this.toEmail(); + if (!fromAddress) { + throw new Error('a valid from email address is required'); + } + + const toAddress = this.fromEmail(); + if (!toAddress) { + throw new Error('cannot reply to an email without a sender address'); + } + + return context.email.sendReply( + req, + context, + messageId, + { + subject: this.makeReplySubject(reply.subject), + text: reply.text, + html: reply.html, + attachments: reply.attachments, + }, + { + name: from?.name, + email: fromAddress, } - }); + ); } } @@ -640,4 +545,4 @@ export async function parseEmail(data: Buffer): Promise { `Failed to parse email: ${error instanceof Error ? error.message : 'Unknown error'}` ); } -} +} \ No newline at end of file diff --git a/src/types.ts b/src/types.ts index 1c27d09a..975d0cde 100644 --- a/src/types.ts +++ b/src/types.ts @@ -673,31 +673,29 @@ export interface EmailService { * Send a new email to the specified recipients. */ send( - agentId: string, - email: string, - authToken: string, - messageId: string - ): Promise; + req: AgentRequest, + context: AgentContext, + to: string[], + email: import('./io/email').EmailReply, + from?: { + name?: string; + email?: string; + } + ): Promise; /** * send an email reply to an incoming email - * - * @param agentId - the id of the agent to send the reply to - * @param email - the email to send the reply to in RFC822 format - * @param authToken - the authorization token to use to send the reply - * @param messageId - the message id of the email - * @param from - the email address to send the reply from (NOTE: you can only override the email address if you have configured custom email sending) */ sendReply( - agentId: string, - email: string, - authToken: string, - messageId: string, + req: AgentRequest, + context: AgentContext, + inReplyTo: string, + reply: import('./io/email').EmailReply, from?: { name?: string; email?: string; } - ): Promise; + ): Promise; } /** @@ -1351,4 +1349,4 @@ export interface DataPayload { * the metadata */ metadata?: JsonObject; -} +} \ No newline at end of file From 3105709d258930edc2afe359e28be402ddc42ca7 Mon Sep 17 00:00:00 2001 From: huijiro Date: Mon, 27 Oct 2025 22:15:29 -0300 Subject: [PATCH 2/4] Implemented feedback --- src/apis/api.ts | 2 +- src/apis/email.ts | 34 +++++++++++++++++----------------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/apis/api.ts b/src/apis/api.ts index 3991f111..4ed9ccfa 100644 --- a/src/apis/api.ts +++ b/src/apis/api.ts @@ -49,7 +49,7 @@ type BaseApiRequest = ApiRequestOptions & ApiRequestBase; /** * Represents the body of an API request */ -export type Body = string | ArrayBuffer | ReadableStream | Blob | FormData; +export type Body = string | ArrayBuffer | ReadableStream | Blob | FormData | Buffer | Uint8Array; type GetApiRequest = BaseApiRequest & { method: 'GET'; diff --git a/src/apis/email.ts b/src/apis/email.ts index e5a907d0..ed892870 100644 --- a/src/apis/email.ts +++ b/src/apis/email.ts @@ -86,14 +86,14 @@ export default class EmailApi implements EmailService { span.setAttribute('@agentuity/emailMessageId', messageId); const resp = await POST( - '/email/send', - message.toString(), - { - 'Content-Type': 'message/rfc822', - 'X-Agentuity-Message-Id': messageId, - }, - undefined, - authToken + '/email/send', + message, + { + 'Content-Type': 'message/rfc822', + 'X-Agentuity-Message-Id': messageId, + }, + undefined, + authToken ); if (resp.status === 200) { span.setStatus({ code: SpanStatusCode.OK }); @@ -196,14 +196,14 @@ export default class EmailApi implements EmailService { span.setAttribute('@agentuity/emailMessageId', messageId); const resp = await POST( - `/email/2025-03-17/${ctx.agent.id}/reply`, - message.toString(), - { - 'Content-Type': 'message/rfc822', - 'X-Agentuity-Message-Id': messageId, - }, - undefined, - authToken + `/email/2025-03-17/${ctx.agent.id}/reply`, + message, + { + 'Content-Type': 'message/rfc822', + 'X-Agentuity-Message-Id': messageId, + }, + undefined, + authToken ); if (resp.status === 200) { span.setStatus({ code: SpanStatusCode.OK }); @@ -231,4 +231,4 @@ export default class EmailApi implements EmailService { span.end(); } } -} \ No newline at end of file +} From 267cdfc3e0eafffac5e59b2d8b8e76f1e035c48e Mon Sep 17 00:00:00 2001 From: huijiro Date: Mon, 27 Oct 2025 23:07:49 -0300 Subject: [PATCH 3/4] Reverted implementations --- src/apis/api.ts | 2 +- src/apis/email.ts | 32 ++++++++++++++++---------------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/src/apis/api.ts b/src/apis/api.ts index 4ed9ccfa..3991f111 100644 --- a/src/apis/api.ts +++ b/src/apis/api.ts @@ -49,7 +49,7 @@ type BaseApiRequest = ApiRequestOptions & ApiRequestBase; /** * Represents the body of an API request */ -export type Body = string | ArrayBuffer | ReadableStream | Blob | FormData | Buffer | Uint8Array; +export type Body = string | ArrayBuffer | ReadableStream | Blob | FormData; type GetApiRequest = BaseApiRequest & { method: 'GET'; diff --git a/src/apis/email.ts b/src/apis/email.ts index ed892870..664779fd 100644 --- a/src/apis/email.ts +++ b/src/apis/email.ts @@ -86,14 +86,14 @@ export default class EmailApi implements EmailService { span.setAttribute('@agentuity/emailMessageId', messageId); const resp = await POST( - '/email/send', - message, - { - 'Content-Type': 'message/rfc822', - 'X-Agentuity-Message-Id': messageId, - }, - undefined, - authToken + '/email/send', + message.toString(), + { + 'Content-Type': 'message/rfc822', + 'X-Agentuity-Message-Id': messageId, + }, + undefined, + authToken ); if (resp.status === 200) { span.setStatus({ code: SpanStatusCode.OK }); @@ -196,14 +196,14 @@ export default class EmailApi implements EmailService { span.setAttribute('@agentuity/emailMessageId', messageId); const resp = await POST( - `/email/2025-03-17/${ctx.agent.id}/reply`, - message, - { - 'Content-Type': 'message/rfc822', - 'X-Agentuity-Message-Id': messageId, - }, - undefined, - authToken + `/email/2025-03-17/${ctx.agent.id}/reply`, + message.toString(), + { + 'Content-Type': 'message/rfc822', + 'X-Agentuity-Message-Id': messageId, + }, + undefined, + authToken ); if (resp.status === 200) { span.setStatus({ code: SpanStatusCode.OK }); From 2627415655296cfdb65ff4f4030857d0482783a6 Mon Sep 17 00:00:00 2001 From: huijiro Date: Mon, 3 Nov 2025 12:39:53 -0300 Subject: [PATCH 4/4] Removed useless headers --- src/apis/email.ts | 36 +++++++----------------------------- 1 file changed, 7 insertions(+), 29 deletions(-) diff --git a/src/apis/email.ts b/src/apis/email.ts index 664779fd..7f4e1430 100644 --- a/src/apis/email.ts +++ b/src/apis/email.ts @@ -12,7 +12,7 @@ export default class EmailApi implements EmailService { * send an email */ async send( - req: AgentRequest, + _req: AgentRequest, ctx: AgentContext, to: string[], email: EmailReply, @@ -21,13 +21,6 @@ export default class EmailApi implements EmailService { email?: string; } ): Promise { - const authToken = req.metadata?.['email-auth-token'] as string; - if (!authToken) { - throw new Error( - 'email authorization token is required but not found in metadata' - ); - } - const tracer = getTracer(); const currentContext = context.active(); const span = tracer.startSpan('agentuity.email.send', {}, currentContext); @@ -85,16 +78,10 @@ export default class EmailApi implements EmailService { span.setAttribute('@agentuity/agentId', ctx.agent.id); span.setAttribute('@agentuity/emailMessageId', messageId); - const resp = await POST( - '/email/send', - message.toString(), - { - 'Content-Type': 'message/rfc822', - 'X-Agentuity-Message-Id': messageId, - }, - undefined, - authToken - ); + const resp = await POST('/email/send', message.toString(), { + 'Content-Type': 'message/rfc822', + 'X-Agentuity-Message-Id': messageId, + }); if (resp.status === 200) { span.setStatus({ code: SpanStatusCode.OK }); resolve(messageId); @@ -126,7 +113,7 @@ export default class EmailApi implements EmailService { * send an email reply to an incoming email */ async sendReply( - req: AgentRequest, + _req: AgentRequest, ctx: AgentContext, inReplyTo: string, reply: EmailReply, @@ -135,13 +122,6 @@ export default class EmailApi implements EmailService { email?: string; } ): Promise { - const authToken = req.metadata?.['email-auth-token'] as string; - if (!authToken) { - throw new Error( - 'email authorization token is required but not found in metadata' - ); - } - const tracer = getTracer(); const currentContext = context.active(); const span = tracer.startSpan('agentuity.email.reply', {}, currentContext); @@ -201,9 +181,7 @@ export default class EmailApi implements EmailService { { 'Content-Type': 'message/rfc822', 'X-Agentuity-Message-Id': messageId, - }, - undefined, - authToken + } ); if (resp.status === 200) { span.setStatus({ code: SpanStatusCode.OK });