Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 48 additions & 12 deletions src/examples/client/simpleStreamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
} from '../../types.js';
import { getDisplayName } from '../../shared/metadataUtils.js';
import { Ajv } from 'ajv';
import { InMemoryTaskStore } from '../../experimental/tasks/stores/in-memory.js';

// Create readline interface for user input
const readline = createInterface({
Expand Down Expand Up @@ -65,6 +66,7 @@ function printHelp(): void {
console.log(' greet [name] - Call the greet tool');
console.log(' multi-greet [name] - Call the multi-greet tool with notifications');
console.log(' collect-info [type] - Test form elicitation with collect-user-info tool (contact/preferences/feedback)');
console.log(' collect-info-task [type] - Test bidirectional task support (server+client tasks) with elicitation');
console.log(' start-notifications [interval] [count] - Start periodic notifications');
console.log(' run-notifications-tool-with-resumability [interval] [count] - Run notification tool with resumability');
console.log(' list-prompts - List available prompts');
Expand Down Expand Up @@ -131,6 +133,10 @@ function commandLoop(): void {
await callCollectInfoTool(args[1] || 'contact');
break;

case 'collect-info-task':
await callCollectInfoWithTask(args[1] || 'contact');
break;

case 'start-notifications': {
const interval = args[1] ? parseInt(args[1], 10) : 2000;
const count = args[2] ? parseInt(args[2], 10) : 10;
Expand Down Expand Up @@ -232,7 +238,10 @@ async function connect(url?: string): Promise<void> {
console.log(`Connecting to ${serverUrl}...`);

try {
// Create a new client with form elicitation capability
// Create task store for client-side task support
const clientTaskStore = new InMemoryTaskStore();

// Create a new client with form elicitation capability and task support
client = new Client(
{
name: 'example-client',
Expand All @@ -242,25 +251,46 @@ async function connect(url?: string): Promise<void> {
capabilities: {
elicitation: {
form: {}
},
tasks: {
requests: {
elicitation: {
create: {}
}
}
}
}
},
taskStore: clientTaskStore
}
);
client.onerror = error => {
console.error('\x1b[31mClient error:', error, '\x1b[0m');
};

// Set up elicitation request handler with proper validation
client.setRequestHandler(ElicitRequestSchema, async request => {
// Set up elicitation request handler with proper validation and task support
client.setRequestHandler(ElicitRequestSchema, async (request, extra) => {
if (request.params.mode !== 'form') {
throw new McpError(ErrorCode.InvalidParams, `Unsupported elicitation mode: ${request.params.mode}`);
}
console.log('\n🔔 Elicitation (form) Request Received:');
console.log(`Message: ${request.params.message}`);
console.log(`Related Task: ${request.params._meta?.[RELATED_TASK_META_KEY]?.taskId}`);
console.log(`Task Creation Requested: ${request.params.task ? 'yes' : 'no'}`);
console.log('Requested Schema:');
console.log(JSON.stringify(request.params.requestedSchema, null, 2));

// Helper to return result, optionally creating a task if requested
const returnResult = async (result: { action: 'accept' | 'decline' | 'cancel'; content?: Record<string, unknown> }) => {
if (request.params.task && extra.taskStore) {
// Create a task and store the result
const task = await extra.taskStore.createTask({ ttl: extra.taskRequestedTtl });
await extra.taskStore.storeTaskResult(task.taskId, 'completed', result);
console.log(`📋 Created client-side task: ${task.taskId}`);
return { task };
}
return result;
};

const schema = request.params.requestedSchema;
const properties = schema.properties;
const required = schema.required || [];
Expand Down Expand Up @@ -381,7 +411,7 @@ async function connect(url?: string): Promise<void> {
}

if (inputCancelled) {
return { action: 'cancel' };
return returnResult({ action: 'cancel' });
}

// If we didn't complete all fields due to an error, try again
Expand All @@ -394,7 +424,7 @@ async function connect(url?: string): Promise<void> {
continue;
} else {
console.log('Maximum attempts reached. Declining request.');
return { action: 'decline' };
return returnResult({ action: 'decline' });
}
}

Expand All @@ -412,7 +442,7 @@ async function connect(url?: string): Promise<void> {
continue;
} else {
console.log('Maximum attempts reached. Declining request.');
return { action: 'decline' };
return returnResult({ action: 'decline' });
}
}

Expand All @@ -427,24 +457,24 @@ async function connect(url?: string): Promise<void> {
});

if (confirmAnswer === 'yes' || confirmAnswer === 'y') {
return {
return returnResult({
action: 'accept',
content
};
});
} else if (confirmAnswer === 'cancel' || confirmAnswer === 'c') {
return { action: 'cancel' };
return returnResult({ action: 'cancel' });
} else if (confirmAnswer === 'no' || confirmAnswer === 'n') {
if (attempts < maxAttempts) {
console.log('Please re-enter the information...');
continue;
} else {
return { action: 'decline' };
return returnResult({ action: 'decline' });
}
}
}

console.log('Maximum attempts reached. Declining request.');
return { action: 'decline' };
return returnResult({ action: 'decline' });
});

transport = new StreamableHTTPClientTransport(new URL(serverUrl), {
Expand Down Expand Up @@ -641,6 +671,12 @@ async function callCollectInfoTool(infoType: string): Promise<void> {
await callTool('collect-user-info', { infoType });
}

async function callCollectInfoWithTask(infoType: string): Promise<void> {
console.log(`\n🔄 Testing bidirectional task support with collect-user-info-task tool (${infoType})...`);
console.log('This will create a task on the server, which will elicit input and create a task on the client.\n');
await callToolTask('collect-user-info-task', { infoType });
}

async function startNotifications(interval: number, count: number): Promise<void> {
console.log(`Starting notification stream: interval=${interval}ms, count=${count || 'unlimited'}`);
await callTool('start-notification-stream', { interval, count });
Expand Down
109 changes: 109 additions & 0 deletions src/examples/server/simpleStreamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { requireBearerAuth } from '../../server/auth/middleware/bearerAuth.js';
import { createMcpExpressApp } from '../../server/index.js';
import {
CallToolResult,
ElicitResult,
ElicitResultSchema,
GetPromptResult,
isInitializeRequest,
Expand Down Expand Up @@ -500,6 +501,114 @@ const getServer = () => {
}
);

// Register a tool that demonstrates bidirectional task support:
// Server creates a task, then elicits input from client using elicitInputStream
// Using the experimental tasks API - WARNING: may change without notice
server.experimental.tasks.registerToolTask(
'collect-user-info-task',
{
title: 'Collect Info with Task',
description: 'Collects user info via elicitation with task support using elicitInputStream',
inputSchema: {
infoType: z.enum(['contact', 'preferences']).describe('Type of information to collect').default('contact')
}
},
{
async createTask({ infoType }, { taskStore, taskRequestedTtl }) {
// Create the server-side task
const task = await taskStore.createTask({
ttl: taskRequestedTtl
});

// Perform async work that makes a nested elicitation request using elicitInputStream
(async () => {
try {
const message = infoType === 'contact' ? 'Please provide your contact information' : 'Please set your preferences';

// Define schemas with proper typing for PrimitiveSchemaDefinition
const contactSchema: {
type: 'object';
properties: Record<string, PrimitiveSchemaDefinition>;
required: string[];
} = {
type: 'object',
properties: {
name: { type: 'string', title: 'Full Name', description: 'Your full name' },
email: { type: 'string', title: 'Email', description: 'Your email address' }
},
required: ['name', 'email']
};

const preferencesSchema: {
type: 'object';
properties: Record<string, PrimitiveSchemaDefinition>;
required: string[];
} = {
type: 'object',
properties: {
theme: { type: 'string', title: 'Theme', enum: ['light', 'dark', 'auto'] },
notifications: { type: 'boolean', title: 'Enable Notifications', default: true }
},
required: ['theme']
};

const requestedSchema = infoType === 'contact' ? contactSchema : preferencesSchema;

// Use elicitInputStream to elicit input from client
// This demonstrates the streaming elicitation API
// Access via server.server to get the underlying Server instance
const stream = server.server.experimental.tasks.elicitInputStream({
mode: 'form',
message,
requestedSchema
});

let elicitResult: ElicitResult | undefined;
for await (const msg of stream) {
if (msg.type === 'result') {
elicitResult = msg.result as ElicitResult;
} else if (msg.type === 'error') {
throw msg.error;
}
}

if (!elicitResult) {
throw new Error('No result received from elicitation');
}

let resultText: string;
if (elicitResult.action === 'accept') {
resultText = `Collected ${infoType} info: ${JSON.stringify(elicitResult.content, null, 2)}`;
} else if (elicitResult.action === 'decline') {
resultText = `User declined to provide ${infoType} information`;
} else {
resultText = 'User cancelled the request';
}

await taskStore.storeTaskResult(task.taskId, 'completed', {
content: [{ type: 'text', text: resultText }]
});
} catch (error) {
console.error('Error in collect-user-info-task:', error);
await taskStore.storeTaskResult(task.taskId, 'failed', {
content: [{ type: 'text', text: `Error: ${error}` }],
isError: true
});
}
})();

return { task };
},
async getTask(_args, { taskId, taskStore }) {
return await taskStore.getTask(taskId);
},
async getTaskResult(_args, { taskId, taskStore }) {
const result = await taskStore.getTaskResult(taskId);
return result as CallToolResult;
}
}
);

return server;
};

Expand Down
Loading
Loading