From cd505a0199d376c17a4948cdc2c7461969c7c4d2 Mon Sep 17 00:00:00 2001 From: actbit <57023457+actbit@users.noreply.github.com> Date: Sat, 28 Feb 2026 13:06:05 +0900 Subject: [PATCH 1/4] [fix] replys --- .../WebRtcChatInterface.cs | 175 +++++++++++++----- Clawleash/Configuration/ClawleashSettings.cs | 6 + Clawleash/Program.cs | 6 +- Clawleash/Services/ChatInterfaceManager.cs | 72 ++++++- Clawleash/appsettings.json | 1 + 5 files changed, 207 insertions(+), 53 deletions(-) diff --git a/Clawleash.Interfaces.WebRTC/WebRtcChatInterface.cs b/Clawleash.Interfaces.WebRTC/WebRtcChatInterface.cs index 09a8c05..e235aa6 100644 --- a/Clawleash.Interfaces.WebRTC/WebRtcChatInterface.cs +++ b/Clawleash.Interfaces.WebRTC/WebRtcChatInterface.cs @@ -28,6 +28,7 @@ public class WebRtcChatInterface : IChatInterface // Lucid.Rtc high-level API private RtcConnection? _rtcConnection; private readonly ConcurrentDictionary _peers = new(); + private bool _nativeClientAvailable; // 接続状態 private int _activeConnections; @@ -59,8 +60,16 @@ public async Task StartAsync(CancellationToken cancellationToken = default) try { - // Initialize Lucid.Rtc connection - InitializeRtcConnection(); + // Initialize Lucid.Rtc connection (if native client is enabled) + if (_settings.TryUseNativeClient) + { + InitializeRtcConnection(); + } + else + { + _logger?.LogInformation("Native WebRTC client disabled, using simulation mode"); + _nativeClientAvailable = false; + } // SignalRハブ接続を構築 _hubConnection = new HubConnectionBuilder() @@ -105,8 +114,9 @@ public async Task StartAsync(CancellationToken cancellationToken = default) // 既存のピアを取得して接続開始 await DiscoverAndConnectPeersAsync(); - _logger?.LogInformation("WebRTC interface started. E2EE: {E2ee}, Backend: Lucid.Rtc", - _settings.EnableE2ee ? "Enabled" : "Disabled"); + _logger?.LogInformation("WebRTC interface started. E2EE: {E2ee}, Mode: {Mode}", + _settings.EnableE2ee ? "Enabled" : "Disabled", + _nativeClientAvailable ? "Native (Lucid.Rtc)" : "SignalR (Fallback)"); } catch (Exception ex) { @@ -117,43 +127,59 @@ public async Task StartAsync(CancellationToken cancellationToken = default) private void InitializeRtcConnection() { - var builder = new RtcConnectionBuilder(); - - // STUN servers - foreach (var stunServer in _settings.StunServers) + try { - builder.WithStunServer(stunServer); - } + var builder = new RtcConnectionBuilder(); - // TURN server (optional) - if (!string.IsNullOrEmpty(_settings.TurnServerUrl)) - { - builder.WithTurnServer( - _settings.TurnServerUrl, - _settings.TurnUsername ?? "", - _settings.TurnPassword ?? ""); - } + // STUN servers + foreach (var stunServer in _settings.StunServers) + { + builder.WithStunServer(stunServer); + } - // Other settings - builder - .WithIceConnectionTimeout(_settings.IceConnectionTimeoutMs) - .WithDataChannelReliable(_settings.DataChannelReliable); + // TURN server (optional) + if (!string.IsNullOrEmpty(_settings.TurnServerUrl)) + { + builder.WithTurnServer( + _settings.TurnServerUrl, + _settings.TurnUsername ?? "", + _settings.TurnPassword ?? ""); + } + + // Other settings + builder + .WithIceConnectionTimeout(_settings.IceConnectionTimeoutMs) + .WithDataChannelReliable(_settings.DataChannelReliable); - _rtcConnection = builder.Build(); + _rtcConnection = builder.Build(); - // Register event handlers with method chaining - _rtcConnection - .On(e => OnPeerConnected(e.PeerId, e.Peer)) - .On(e => OnPeerDisconnected(e.PeerId)) - .On(e => HandleMessage(e.PeerId, e.Data)) - .On(e => SendIceCandidate(e.PeerId, e.Candidate)) - .On(e => SendOffer(e.PeerId, e.Sdp)) - .On(e => SendAnswer(e.PeerId, e.Sdp)) - .On(e => OnDataChannelOpen(e.PeerId, e.Peer)) - .On(e => OnDataChannelClosed(e.PeerId, e.Peer)) - .On(e => _logger?.LogError("Lucid.Rtc error: {Message}", e.Message)); + // Register event handlers with method chaining + _rtcConnection + .On(e => OnPeerConnected(e.PeerId, e.Peer)) + .On(e => OnPeerDisconnected(e.PeerId)) + .On(e => HandleMessage(e.PeerId, e.Data)) + .On(e => SendIceCandidate(e.PeerId, e.Candidate)) + .On(e => SendOffer(e.PeerId, e.Sdp)) + .On(e => SendAnswer(e.PeerId, e.Sdp)) + .On(e => OnDataChannelOpen(e.PeerId, e.Peer)) + .On(e => OnDataChannelClosed(e.PeerId, e.Peer)) + .On(e => _logger?.LogError("Lucid.Rtc error: {Message}", e.Message)); - _logger?.LogInformation("Lucid.Rtc connection initialized"); + _nativeClientAvailable = true; + _logger?.LogInformation("Lucid.Rtc connection initialized"); + } + catch (DllNotFoundException ex) + { + _logger?.LogWarning(ex, "Native WebRTC library not found, falling back to SignalR mode"); + _nativeClientAvailable = false; + _rtcConnection = null; + } + catch (Exception ex) + { + _logger?.LogWarning(ex, "Failed to initialize native WebRTC, falling back to SignalR mode"); + _nativeClientAvailable = false; + _rtcConnection = null; + } } private void OnPeerConnected(string peerId, Peer peer) @@ -381,6 +407,16 @@ private void SetupSignalREventHandlers() { await HandleE2eeKeyExchangeAsync(data.FromPeerId, data.SessionId, data.PublicKey); }); + + // DataChannelメッセージ受信(SignalRフォールバック用) + _hubConnection!.On("datachannel-message", async data => + { + // ネイティブクライアントが利用できない場合のみ処理 + if (!_nativeClientAvailable) + { + await HandleDataChannelMessageAsync(data.FromPeerId, Convert.FromBase64String(data.Payload)); + } + }); } private async Task RegisterAsync() @@ -690,7 +726,7 @@ public async Task StopAsync(CancellationToken cancellationToken = default) public async Task SendMessageAsync(string message, string? replyToMessageId = null, CancellationToken cancellationToken = default) { - if (_rtcConnection == null || string.IsNullOrEmpty(_localPeerId)) + if (_hubConnection == null || string.IsNullOrEmpty(_localPeerId)) { _logger?.LogWarning("WebRTC not connected"); return; @@ -707,33 +743,70 @@ public async Task SendMessageAsync(string message, string? replyToMessageId = nu payload = Encoding.UTF8.GetBytes(message); } - // 特定のピアに送信(返信の場合) - if (!string.IsNullOrEmpty(replyToMessageId) && _channelTracking.TryGetValue(replyToMessageId, out var targetPeerId)) + // ネイティブクライアントが利用可能な場合はLucid.Rtcを使用 + if (_nativeClientAvailable && _rtcConnection != null) { - if (_peers.TryGetValue(targetPeerId, out var peer)) + // 特定のピアに送信(返信の場合) + if (!string.IsNullOrEmpty(replyToMessageId) && _channelTracking.TryGetValue(replyToMessageId, out var targetPeerId)) + { + if (_peers.TryGetValue(targetPeerId, out var peer)) + { + try + { + peer.Send(payload); + _logger?.LogDebug("Sent message to peer {PeerId} via native", targetPeerId); + } + catch (Exception ex) + { + _logger?.LogWarning(ex, "Failed to send message to peer {PeerId}", targetPeerId); + } + } + } + else { + // 全ピアにブロードキャスト try { - peer.Send(payload); - _logger?.LogDebug("Sent message to peer {PeerId}", targetPeerId); + _rtcConnection.Broadcast(payload); + _logger?.LogDebug("Broadcast message to all peers via native"); } catch (Exception ex) { - _logger?.LogWarning(ex, "Failed to send message to peer {PeerId}", targetPeerId); + _logger?.LogWarning(ex, "Failed to broadcast message"); } } } else { - // 全ピアにブロードキャスト - try + // SignalRフォールバックモード + var payloadBase64 = Convert.ToBase64String(payload); + + if (!string.IsNullOrEmpty(replyToMessageId) && _channelTracking.TryGetValue(replyToMessageId, out var targetPeerId)) { - _rtcConnection.Broadcast(payload); - _logger?.LogDebug("Broadcast message to all peers"); + try + { + await _hubConnection.InvokeAsync("SendDataChannelMessage", targetPeerId, payloadBase64, cancellationToken); + _logger?.LogDebug("Sent message to peer {PeerId} via SignalR", targetPeerId); + } + catch (Exception ex) + { + _logger?.LogWarning(ex, "Failed to send message to peer {PeerId} via SignalR", targetPeerId); + } } - catch (Exception ex) + else { - _logger?.LogWarning(ex, "Failed to broadcast message"); + // 全ピアに送信 + foreach (var peerId in _peers.Keys) + { + try + { + await _hubConnection.InvokeAsync("SendDataChannelMessage", peerId, payloadBase64, cancellationToken); + } + catch (Exception ex) + { + _logger?.LogWarning(ex, "Failed to send message to peer {PeerId} via SignalR", peerId); + } + } } } } @@ -875,3 +948,9 @@ internal class E2eeKeyExchangeEvent public string SessionId { get; set; } = string.Empty; public string PublicKey { get; set; } = string.Empty; } + +internal class DataChannelMessageEvent +{ + public string FromPeerId { get; set; } = string.Empty; + public string Payload { get; set; } = string.Empty; +} diff --git a/Clawleash/Configuration/ClawleashSettings.cs b/Clawleash/Configuration/ClawleashSettings.cs index 74ddf26..7fd5bf2 100644 --- a/Clawleash/Configuration/ClawleashSettings.cs +++ b/Clawleash/Configuration/ClawleashSettings.cs @@ -111,6 +111,12 @@ public class ChatInterfaceSettings /// public bool EnableHotReload { get; set; } = true; + /// + /// 返信を全インターフェースにブロードキャストするか + /// falseの場合は送信元のみに返信 + /// + public bool BroadcastReplies { get; set; } = false; + /// /// Discord設定 /// diff --git a/Clawleash/Program.cs b/Clawleash/Program.cs index e494a0b..f70588b 100644 --- a/Clawleash/Program.cs +++ b/Clawleash/Program.cs @@ -348,7 +348,11 @@ async Task HandleMessage(ChatMessageReceivedEventArgs e) return await ProcessMessageAsync(agent, e.Content); } - var manager = new ChatInterfaceManager(HandleMessage, logger); + var managerSettings = new ChatInterfaceManagerSettings + { + BroadcastReplies = settings.ChatInterface.BroadcastReplies + }; + var manager = new ChatInterfaceManager(HandleMessage, managerSettings, logger); // CLIを追加(ビルトイン) if (settings.ChatInterface.EnableCli) diff --git a/Clawleash/Services/ChatInterfaceManager.cs b/Clawleash/Services/ChatInterfaceManager.cs index 6ff4857..6f5704b 100644 --- a/Clawleash/Services/ChatInterfaceManager.cs +++ b/Clawleash/Services/ChatInterfaceManager.cs @@ -3,6 +3,18 @@ namespace Clawleash.Services; +/// +/// ChatInterfaceManagerの設定 +/// +public class ChatInterfaceManagerSettings +{ + /// + /// 返信を全インターフェースにブロードキャストするかどうか + /// falseの場合は送信元のみに返信 + /// + public bool BroadcastReplies { get; set; } = false; +} + /// /// チャットインターフェースを管理するサービス /// 複数のインターフェースを統合し、メッセージをエージェントに振り分ける @@ -13,6 +25,7 @@ public class ChatInterfaceManager : IAsyncDisposable private readonly List _interfaces = new(); private readonly Func> _messageHandler; private readonly ILogger? _logger; + private readonly ChatInterfaceManagerSettings _settings; private readonly object _lock = new(); private bool _disposed; @@ -46,9 +59,11 @@ public int TotalInterfaceCount public ChatInterfaceManager( Func> messageHandler, + ChatInterfaceManagerSettings? settings = null, ILogger? logger = null) { _messageHandler = messageHandler ?? throw new ArgumentNullException(nameof(messageHandler)); + _settings = settings ?? new ChatInterfaceManagerSettings(); _logger = logger; } @@ -227,17 +242,31 @@ private async void OnMessageReceived(object? sender, ChatMessageReceivedEventArg // エージェントにメッセージを処理させる var response = await _messageHandler(e); - // 必要に応じて返信 - if (e.RequiresReply && sender is IChatInterface iface) + // 返信の送信先を決定 + if (e.RequiresReply) { - await iface.SendMessageAsync(response, e.MessageId); + if (_settings.BroadcastReplies) + { + // 全インターフェースにブロードキャスト + await BroadcastMessageAsync(response, e.MessageId); + } + else if (sender is IChatInterface iface) + { + // 送信元のみに返信 + await iface.SendMessageAsync(response, e.MessageId); + } } } catch (Exception ex) { _logger?.LogError(ex, "Error processing message from {InterfaceName}", e.InterfaceName); - if (sender is IChatInterface iface) + // エラー通知の送信先を決定 + if (_settings.BroadcastReplies) + { + await BroadcastMessageAsync($"エラーが発生しました: {ex.Message}", e.MessageId); + } + else if (sender is IChatInterface iface) { try { @@ -251,6 +280,41 @@ private async void OnMessageReceived(object? sender, ChatMessageReceivedEventArg } } + /// + /// 全インターフェースにメッセージをブロードキャスト + /// + public async Task BroadcastMessageAsync(string message, string? replyToMessageId = null) + { + List interfacesToBroadcast; + + lock (_lock) + { + interfacesToBroadcast = _interfaces.Where(i => i.IsConnected).ToList(); + } + + if (interfacesToBroadcast.Count == 0) + { + _logger?.LogWarning("No connected interfaces to broadcast message"); + return; + } + + _logger?.LogDebug("Broadcasting message to {Count} interfaces", interfacesToBroadcast.Count); + + var tasks = interfacesToBroadcast.Select(async iface => + { + try + { + await iface.SendMessageAsync(message, replyToMessageId); + } + catch (Exception ex) + { + _logger?.LogWarning(ex, "Failed to send message to interface {Name}", iface.Name); + } + }); + + await Task.WhenAll(tasks); + } + public async ValueTask DisposeAsync() { if (_disposed) diff --git a/Clawleash/appsettings.json b/Clawleash/appsettings.json index ec89b65..e150c93 100644 --- a/Clawleash/appsettings.json +++ b/Clawleash/appsettings.json @@ -30,6 +30,7 @@ "EnableCli": true, "InterfacesDirectory": null, "EnableHotReload": true, + "BroadcastReplies": false, "Discord": { "Enabled": false, "Token": "", From e277b0c2137f334ccfa586a81f1e17abe79d351f Mon Sep 17 00:00:00 2001 From: actbit <57023457+actbit@users.noreply.github.com> Date: Sat, 28 Feb 2026 13:24:25 +0900 Subject: [PATCH 2/4] [fix] stdio --- Clawleash/Mcp/McpPluginFactory.cs | 207 ++++++++++++++++++++++++++++++ Clawleash/Program.cs | 11 +- 2 files changed, 215 insertions(+), 3 deletions(-) create mode 100644 Clawleash/Mcp/McpPluginFactory.cs diff --git a/Clawleash/Mcp/McpPluginFactory.cs b/Clawleash/Mcp/McpPluginFactory.cs new file mode 100644 index 0000000..7a7bcfd --- /dev/null +++ b/Clawleash/Mcp/McpPluginFactory.cs @@ -0,0 +1,207 @@ +using System.Text.Json; +using Microsoft.SemanticKernel; + +namespace Clawleash.Mcp; + +/// +/// MCPツールからSemantic KernelのKernelFunctionを動的生成するファクトリ +/// +public static class McpPluginFactory +{ + /// + /// MCPツール情報からKernelFunctionを生成 + /// + public static KernelFunction CreateKernelFunction( + McpClientManager clientManager, + string serverName, + McpToolInfo tool) + { + // パラメータメタデータを生成 + var parameters = ParseInputSchema(tool.InputSchema); + + // デリゲートを作成 + Func> executeAsync = async (kernel, args) => + { + // 引数を辞書に変換 + var arguments = new Dictionary(); + foreach (var param in parameters) + { + if (args.TryGetValue(param.Name, out var value)) + { + arguments[param.Name] = value; + } + } + + return await clientManager.ExecuteToolAsync(serverName, tool.ToolName, arguments); + }; + + // KernelFunctionを作成 + var function = KernelFunctionFactory.CreateFromMethod( + method: executeAsync, + functionName: tool.ToolName, + description: tool.Description, + parameters: parameters); + + return function; + } + + /// + /// MCPサーバー全体をKernelPluginとして生成 + /// + public static KernelPlugin CreateKernelPlugin( + McpClientManager clientManager, + string serverName, + IEnumerable tools) + { + var functions = new List(); + + foreach (var tool in tools) + { + var function = CreateKernelFunction(clientManager, serverName, tool); + functions.Add(function); + } + + return KernelPluginFactory.CreateFromFunctions( + pluginName: $"Mcp_{serverName}", + description: $"MCP Server: {serverName}", + functions: functions); + } + + /// + /// JSON SchemaからKernelParameterMetadataを生成 + /// + private static List ParseInputSchema(JsonElement? inputSchema) + { + var parameters = new List(); + + if (inputSchema == null) + { + return parameters; + } + + try + { + var schema = inputSchema.Value; + + // JSON Schemaのpropertiesからパラメータを抽出 + if (schema.TryGetProperty("properties", out var properties)) + { + // requiredフィールドを取得 + var requiredParams = new HashSet(); + if (schema.TryGetProperty("required", out var required) && required.ValueKind == JsonValueKind.Array) + { + foreach (var req in required.EnumerateArray()) + { + requiredParams.Add(req.GetString() ?? ""); + } + } + + foreach (var prop in properties.EnumerateObject()) + { + var paramName = prop.Name; + var paramSchema = prop.Value; + + // 説明を取得 + var description = ""; + if (paramSchema.TryGetProperty("description", out var desc)) + { + description = desc.GetString() ?? ""; + } + + // 型を推測 + var type = InferParameterType(paramSchema); + + // デフォルト値 + object? defaultValue = null; + if (paramSchema.TryGetProperty("default", out var defaultVal)) + { + defaultValue = GetDefaultValue(defaultVal, type); + } + + var metadata = new KernelParameterMetadata(paramName) + { + Description = description, + ParameterType = type, + IsRequired = requiredParams.Contains(paramName), + DefaultValue = defaultValue + }; + + parameters.Add(metadata); + } + } + } + catch (Exception) + { + // パースエラーは無視(空のパラメータリストを返す) + } + + return parameters; + } + + /// + /// JSON Schemaから.NET型を推測 + /// + private static Type InferParameterType(JsonElement schema) + { + if (schema.TryGetProperty("type", out var typeProp)) + { + var typeStr = typeProp.GetString() ?? ""; + + return typeStr switch + { + "string" => typeof(string), + "integer" => typeof(int), + "number" => typeof(double), + "boolean" => typeof(bool), + "array" => typeof(object[]), + "object" => typeof(object), + _ => typeof(object) + }; + } + + // typeがない場合、他のヒントから推測 + if (schema.TryGetProperty("enum", out _)) + { + return typeof(string); + } + + return typeof(object); + } + + /// + /// デフォルト値を取得 + /// + private static object? GetDefaultValue(JsonElement defaultVal, Type type) + { + if (defaultVal.ValueKind == JsonValueKind.Null || defaultVal.ValueKind == JsonValueKind.Undefined) + { + return null; + } + + try + { + if (type == typeof(string)) + { + return defaultVal.GetString(); + } + if (type == typeof(int)) + { + return defaultVal.GetInt32(); + } + if (type == typeof(double)) + { + return defaultVal.GetDouble(); + } + if (type == typeof(bool)) + { + return defaultVal.GetBoolean(); + } + } + catch + { + // 変換エラーは無視 + } + + return null; + } +} diff --git a/Clawleash/Program.cs b/Clawleash/Program.cs index f70588b..c7e3333 100644 --- a/Clawleash/Program.cs +++ b/Clawleash/Program.cs @@ -203,13 +203,18 @@ private static Kernel BuildKernel(IServiceProvider serviceProvider, ClawleashSet var toolCount = mcpManager.GetAllTools().Count(); Console.WriteLine($"MCPツールを {toolCount} 件ロードしました"); - // 各MCPサーバーをプラグインとして登録 + // 各MCPサーバーのツールを個別のKernelFunctionとして登録 foreach (var (serverName, server) in mcpManager.Servers) { if (server.IsConnected && server.Tools.Count > 0) { - var mcpPlugin = new McpServerPlugin(mcpManager, serverName, server.Tools); - kernel.Plugins.AddFromObject(mcpPlugin, $"Mcp_{serverName}"); + // 動的に各ツールをKernelFunctionとして生成 + var plugin = McpPluginFactory.CreateKernelPlugin( + mcpManager, serverName, server.Tools); + kernel.Plugins.Add(plugin); + + Console.WriteLine($" {serverName}: {server.Tools.Count} ツール " + + $"({string.Join(", ", server.Tools.Select(t => t.ToolName))})"); } } } From 3a13c1b1a4c6f6db4039acee2a35edb542bc63cd Mon Sep 17 00:00:00 2001 From: actbit <57023457+actbit@users.noreply.github.com> Date: Sat, 28 Feb 2026 13:28:33 +0900 Subject: [PATCH 3/4] [add] support sse --- Clawleash/Mcp/McpClientManager.cs | 262 +++++++++++++++++++++++++++++- 1 file changed, 257 insertions(+), 5 deletions(-) diff --git a/Clawleash/Mcp/McpClientManager.cs b/Clawleash/Mcp/McpClientManager.cs index edc98f9..90cb09b 100644 --- a/Clawleash/Mcp/McpClientManager.cs +++ b/Clawleash/Mcp/McpClientManager.cs @@ -1,4 +1,7 @@ +using System.Collections.Concurrent; using System.Diagnostics; +using System.Net.Http; +using System.Text; using System.Text.Json; using Microsoft.Extensions.Logging; using Clawleash.Sandbox; @@ -22,10 +25,19 @@ public class McpToolInfo public class ConnectedServer { public McpServerConfig Config { get; set; } = null!; + + // stdio用 public Process? Process { get; set; } public StreamWriter? StdIn { get; set; } public StreamReader? StdOut { get; set; } public StreamReader? StdErr { get; set; } + + // SSE用 + public HttpClient? HttpClient { get; set; } + public CancellationTokenSource? SseCts { get; set; } + public Task? SseListenerTask { get; set; } + public ConcurrentDictionary> PendingRequests { get; } = new(); + public List Tools { get; set; } = new(); public bool IsConnected { get; set; } public int RequestId { get; set; } @@ -213,9 +225,9 @@ private async Task ConnectStdioAsync(ConnectedServer server) } /// - /// SSE トランスポートで接続(プレースホルダー) + /// SSE トランスポートで接続 /// - private Task ConnectSseAsync(ConnectedServer server) + private async Task ConnectSseAsync(ConnectedServer server) { var config = server.Config; @@ -225,8 +237,221 @@ private Task ConnectSseAsync(ConnectedServer server) } _logger.LogInformation("SSE接続: {Url}", config.Url); - // TODO: HttpClient + SSE実装 - throw new NotImplementedException("SSE接続は今後実装予定です"); + + // HttpClientを作成 + var handler = new HttpClientHandler + { + AutomaticDecompression = System.Net.DecompressionMethods.GZip | System.Net.DecompressionMethods.Deflate + }; + + server.HttpClient = new HttpClient(handler) + { + Timeout = TimeSpan.FromMilliseconds(config.TimeoutMs) + }; + + // カスタムヘッダーを設定 + if (config.Headers != null) + { + foreach (var (key, value) in config.Headers) + { + server.HttpClient.DefaultRequestHeaders.Add(key, value); + } + } + + // SSEストリームを開始 + server.SseCts = new CancellationTokenSource(); + server.SseListenerTask = Task.Run(() => ListenToSseStreamAsync(server), server.SseCts.Token); + + // SSE接続が確立されるまで少し待機 + await Task.Delay(100); + + _logger.LogInformation("SSE接続確立: {Url}", config.Url); + } + + /// + /// SSEストリームをリッスン + /// + private async Task ListenToSseStreamAsync(ConnectedServer server) + { + var baseUrl = server.Config.Url!.TrimEnd('/'); + var sseUrl = $"{baseUrl}/sse"; + + _logger.LogDebug("SSEストリーム開始: {Url}", sseUrl); + + while (server.SseCts?.Token.IsCancellationRequested == false) + { + try + { + var request = new HttpRequestMessage(HttpMethod.Get, sseUrl); + request.Headers.Add("Accept", "text/event-stream"); + + using var response = await server.HttpClient!.SendAsync( + request, + HttpCompletionOption.ResponseHeadersRead, + server.SseCts.Token); + + response.EnsureSuccessStatusCode(); + + using var stream = await response.Content.ReadAsStreamAsync(server.SseCts.Token); + using var reader = new StreamReader(stream); + + string? eventType = null; + var dataBuilder = new StringBuilder(); + + while (server.SseCts.Token.IsCancellationRequested == false) + { + var line = await reader.ReadLineAsync(server.SseCts.Token); + + if (line == null) + break; + + if (line.StartsWith("event:")) + { + eventType = line[6..].Trim(); + } + else if (line.StartsWith("data:")) + { + dataBuilder.AppendLine(line[5..].Trim()); + } + else if (string.IsNullOrEmpty(line)) + { + // 空行 = イベント終了 + if (dataBuilder.Length > 0) + { + await ProcessSseEventAsync(server, eventType, dataBuilder.ToString()); + } + eventType = null; + dataBuilder.Clear(); + } + } + } + catch (OperationCanceledException) + { + break; + } + catch (Exception ex) + { + _logger.LogWarning(ex, "SSEストリームエラー、再接続中..."); + + if (server.SseCts.Token.IsCancellationRequested) + break; + + await Task.Delay(1000, server.SseCts.Token); + } + } + + _logger.LogDebug("SSEストリーム終了: {Name}", server.Config.Name); + } + + /// + /// SSEイベントを処理 + /// + private Task ProcessSseEventAsync(ConnectedServer server, string? eventType, string data) + { + _logger.LogDebug("SSEイベント受信: type={Type}, data={Data}", eventType, data[..Math.Min(100, data.Length)]); + + try + { + using var doc = JsonDocument.Parse(data.Trim()); + var root = doc.RootElement; + + // JSON-RPCレスポンスの場合 + if (root.TryGetProperty("id", out var idElement)) + { + var id = idElement.GetInt32(); + + if (server.PendingRequests.TryRemove(id, out var tcs)) + { + if (root.TryGetProperty("error", out var error)) + { + var message = error.TryGetProperty("message", out var msg) + ? msg.GetString() + : "Unknown error"; + _logger.LogError("MCP SSEエラー: {Message}", message); + tcs.TrySetResult(null); + } + else if (root.TryGetProperty("result", out var result)) + { + var resultJson = result.GetRawText(); + tcs.TrySetResult(JsonDocument.Parse(resultJson).RootElement); + } + else + { + tcs.TrySetResult(null); + } + } + } + } + catch (Exception ex) + { + _logger.LogWarning(ex, "SSEイベント解析エラー"); + } + + return Task.CompletedTask; + } + + /// + /// SSE経由でJSON-RPCリクエストを送信 + /// + private async Task SendSseRequestAsync(ConnectedServer server, string method, object? parameters = null) + { + if (server.HttpClient == null) + { + return null; + } + + var requestId = ++server.RequestId; + var request = new + { + jsonrpc = "2.0", + id = requestId, + method, + @params = parameters + }; + + var json = JsonSerializer.Serialize(request); + _logger.LogDebug("MCP SSE Request: {Json}", json); + + // TaskCompletionSourceを作成して待機 + var tcs = new TaskCompletionSource(); + server.PendingRequests[requestId] = tcs; + + try + { + var baseUrl = server.Config.Url!.TrimEnd('/'); + var messageUrl = $"{baseUrl}/message"; + + var content = new StringContent(json, Encoding.UTF8, "application/json"); + var response = await server.HttpClient.PostAsync(messageUrl, content, server.SseCts?.Token ?? CancellationToken.None); + + if (!response.IsSuccessStatusCode) + { + _logger.LogError("MCP SSE POST失敗: {Status}", response.StatusCode); + server.PendingRequests.TryRemove(requestId, out _); + return null; + } + + // タイムアウト付きでレスポンスを待機 + using var cts = new CancellationTokenSource(server.Config.TimeoutMs); + var completedTask = await Task.WhenAny( + tcs.Task, + Task.Delay(server.Config.TimeoutMs, cts.Token)); + + if (completedTask != tcs.Task) + { + _logger.LogError("MCP SSE リクエストがタイムアウト: {Method}", method); + server.PendingRequests.TryRemove(requestId, out _); + return null; + } + + return await tcs.Task; + } + catch (Exception ex) + { + _logger.LogError(ex, "MCP SSE リクエストエラー: {Method}", method); + server.PendingRequests.TryRemove(requestId, out _); + return null; + } } /// @@ -301,9 +526,23 @@ private async Task LoadToolsAsync(ConnectedServer server) } /// - /// MCPリクエストを送信 + /// MCPリクエストを送信(トランスポート自動選択) /// private async Task SendRequestAsync(ConnectedServer server, string method, object? parameters = null) + { + // トランスポートに応じて適切なメソッドを選択 + if (server.HttpClient != null) + { + return await SendSseRequestAsync(server, method, parameters); + } + + return await SendStdioRequestAsync(server, method, parameters); + } + + /// + /// stdio経由でJSON-RPCリクエストを送信 + /// + private async Task SendStdioRequestAsync(ConnectedServer server, string method, object? parameters = null) { if (server.StdIn == null || server.StdOut == null) { @@ -464,11 +703,24 @@ private void CleanupServer(ConnectedServer server) { try { + // stdio クリーンアップ server.StdIn?.Close(); server.StdOut?.Close(); server.StdErr?.Close(); server.Process?.Kill(entireProcessTree: true); server.Process?.Dispose(); + + // SSE クリーンアップ + server.SseCts?.Cancel(); + server.SseCts?.Dispose(); + server.HttpClient?.Dispose(); + + // 待機中のリクエストをキャンセル + foreach (var tcs in server.PendingRequests.Values) + { + tcs.TrySetCanceled(); + } + server.PendingRequests.Clear(); } catch { } } From 3a386e45e8273bed7a5c1d2f33ee0b319fdee01f Mon Sep 17 00:00:00 2001 From: actbit <57023457+actbit@users.noreply.github.com> Date: Sat, 28 Feb 2026 13:39:40 +0900 Subject: [PATCH 4/4] [add] mcp test --- Clawleash.Tests/Mcp/McpPluginFactoryTests.cs | 234 +++++++++++++++++++ Clawleash.Tests/Mcp/McpSettingsTests.cs | 98 +++++++- Clawleash/Mcp/McpPluginFactory.cs | 5 +- 3 files changed, 335 insertions(+), 2 deletions(-) create mode 100644 Clawleash.Tests/Mcp/McpPluginFactoryTests.cs diff --git a/Clawleash.Tests/Mcp/McpPluginFactoryTests.cs b/Clawleash.Tests/Mcp/McpPluginFactoryTests.cs new file mode 100644 index 0000000..31ca07f --- /dev/null +++ b/Clawleash.Tests/Mcp/McpPluginFactoryTests.cs @@ -0,0 +1,234 @@ +using FluentAssertions; +using Moq; +using Microsoft.Extensions.Logging; +using Microsoft.SemanticKernel; +using Clawleash.Mcp; +using System.Text.Json; + +namespace Clawleash.Tests.Mcp; + +public class McpPluginFactoryTests +{ + private readonly Mock _loggerFactoryMock; + private readonly Mock> _loggerMock; + + public McpPluginFactoryTests() + { + _loggerMock = new Mock>(); + _loggerFactoryMock = new Mock(); + _loggerFactoryMock.Setup(x => x.CreateLogger(It.IsAny())) + .Returns(_loggerMock.Object); + } + + [Fact] + public void CreateKernelFunction_ShouldCreateFunctionWithCorrectName() + { + // Arrange + var manager = new McpClientManager(_loggerFactoryMock.Object); + var tool = new McpToolInfo + { + ServerName = "test-server", + ToolName = "get_weather", + Description = "Get weather information" + }; + + // Act + var function = McpPluginFactory.CreateKernelFunction(manager, "test-server", tool); + + // Assert + function.Should().NotBeNull(); + function.Name.Should().Be("get_weather"); + function.Description.Should().Be("Get weather information"); + } + + [Fact] + public void CreateKernelFunction_WithInputSchema_ShouldParseParameters() + { + // Arrange + var manager = new McpClientManager(_loggerFactoryMock.Object); + var schemaJson = @"{ + ""type"": ""object"", + ""properties"": { + ""location"": { + ""type"": ""string"", + ""description"": ""City name"" + }, + ""unit"": { + ""type"": ""string"", + ""description"": ""Temperature unit"", + ""enum"": [""celsius"", ""fahrenheit""] + } + }, + ""required"": [""location""] + }"; + + var tool = new McpToolInfo + { + ServerName = "test-server", + ToolName = "get_weather", + Description = "Get weather", + InputSchema = JsonDocument.Parse(schemaJson).RootElement + }; + + // Act + var function = McpPluginFactory.CreateKernelFunction(manager, "test-server", tool); + + // Assert + function.Should().NotBeNull(); + function.Name.Should().Be("get_weather"); + // パラメータメタデータはKernelFunction内部に保持される + } + + [Fact] + public void CreateKernelPlugin_ShouldCreatePluginWithMultipleFunctions() + { + // Arrange + var manager = new McpClientManager(_loggerFactoryMock.Object); + var tools = new List + { + new() { ServerName = "test-server", ToolName = "read_file", Description = "Read a file" }, + new() { ServerName = "test-server", ToolName = "write_file", Description = "Write a file" }, + new() { ServerName = "test-server", ToolName = "list_files", Description = "List files" } + }; + + // Act + var plugin = McpPluginFactory.CreateKernelPlugin(manager, "test-server", tools); + + // Assert + plugin.Should().NotBeNull(); + plugin.Name.Should().Be("Mcp_test_server"); + plugin.FunctionCount.Should().Be(3); + plugin.Should().Contain(f => f.Name == "read_file"); + plugin.Should().Contain(f => f.Name == "write_file"); + plugin.Should().Contain(f => f.Name == "list_files"); + } + + [Fact] + public void CreateKernelPlugin_WithEmptyTools_ShouldCreateEmptyPlugin() + { + // Arrange + var manager = new McpClientManager(_loggerFactoryMock.Object); + var tools = new List(); + + // Act + var plugin = McpPluginFactory.CreateKernelPlugin(manager, "test-server", tools); + + // Assert + plugin.Should().NotBeNull(); + plugin.Name.Should().Be("Mcp_test_server"); + plugin.FunctionCount.Should().Be(0); + } + + [Theory] + [InlineData("string")] + [InlineData("integer")] + [InlineData("number")] + [InlineData("boolean")] + [InlineData("array")] + [InlineData("object")] + public void CreateKernelFunction_ShouldInferCorrectTypes(string jsonType) + { + // Arrange + var manager = new McpClientManager(_loggerFactoryMock.Object); + var schemaJson = @"{ + ""type"": ""object"", + ""properties"": { + ""param"": { + ""type"": """ + jsonType + @""" + } + } + }"; + + var tool = new McpToolInfo + { + ServerName = "test-server", + ToolName = "test_tool", + Description = "Test tool", + InputSchema = JsonDocument.Parse(schemaJson).RootElement + }; + + // Act + var function = McpPluginFactory.CreateKernelFunction(manager, "test-server", tool); + + // Assert + function.Should().NotBeNull(); + } + + [Fact] + public void CreateKernelFunction_WithComplexSchema_ShouldNotThrow() + { + // Arrange + var manager = new McpClientManager(_loggerFactoryMock.Object); + var schemaJson = @"{ + ""type"": ""object"", + ""properties"": { + ""nested"": { + ""type"": ""object"", + ""properties"": { + ""inner"": { ""type"": ""string"" } + } + }, + ""array"": { + ""type"": ""array"", + ""items"": { ""type"": ""string"" } + } + }, + ""required"": [""nested""] + }"; + + var tool = new McpToolInfo + { + ServerName = "test-server", + ToolName = "complex_tool", + Description = "Complex tool", + InputSchema = JsonDocument.Parse(schemaJson).RootElement + }; + + // Act + var act = () => McpPluginFactory.CreateKernelFunction(manager, "test-server", tool); + + // Assert + act.Should().NotThrow(); + } + + [Fact] + public void CreateKernelFunction_WithNullSchema_ShouldCreateFunctionWithoutParameters() + { + // Arrange + var manager = new McpClientManager(_loggerFactoryMock.Object); + var tool = new McpToolInfo + { + ServerName = "test-server", + ToolName = "no_params_tool", + Description = "Tool without parameters", + InputSchema = null + }; + + // Act + var function = McpPluginFactory.CreateKernelFunction(manager, "test-server", tool); + + // Assert + function.Should().NotBeNull(); + function.Name.Should().Be("no_params_tool"); + } + + [Fact] + public void CreateKernelFunction_WithInvalidSchema_ShouldNotThrow() + { + // Arrange + var manager = new McpClientManager(_loggerFactoryMock.Object); + var tool = new McpToolInfo + { + ServerName = "test-server", + ToolName = "invalid_schema_tool", + Description = "Tool with invalid schema", + InputSchema = JsonDocument.Parse("\"invalid\"").RootElement + }; + + // Act + var act = () => McpPluginFactory.CreateKernelFunction(manager, "test-server", tool); + + // Assert + act.Should().NotThrow(); + } +} diff --git a/Clawleash.Tests/Mcp/McpSettingsTests.cs b/Clawleash.Tests/Mcp/McpSettingsTests.cs index c112a3f..8185969 100644 --- a/Clawleash.Tests/Mcp/McpSettingsTests.cs +++ b/Clawleash.Tests/Mcp/McpSettingsTests.cs @@ -3,6 +3,7 @@ using Microsoft.Extensions.Logging; using Clawleash.Mcp; using System.Text.Json; +using System.Threading.Tasks; namespace Clawleash.Tests.Mcp; @@ -102,6 +103,73 @@ public void McpSettings_ShouldDeserializeFromJson() settings.Servers[0].Name.Should().Be("github"); settings.Servers[1].Name.Should().Be("filesystem"); } + + [Fact] + public void McpServerConfig_Sse_ShouldDeserializeFromJson() + { + // Arrange + var json = @"{ + ""name"": ""sse-server"", + ""transport"": ""sse"", + ""url"": ""http://localhost:3000"", + ""headers"": { + ""Authorization"": ""Bearer token123"" + }, + ""enabled"": true, + ""timeoutMs"": 45000 + }"; + + // Act + var config = JsonSerializer.Deserialize(json, new JsonSerializerOptions + { + PropertyNameCaseInsensitive = true + }); + + // Assert + config.Should().NotBeNull(); + config!.Name.Should().Be("sse-server"); + config.Transport.Should().Be("sse"); + config.Url.Should().Be("http://localhost:3000"); + config.Headers.Should().ContainKey("Authorization"); + config.Headers!["Authorization"].Should().Be("Bearer token123"); + config.TimeoutMs.Should().Be(45000); + } + + [Fact] + public void McpSettings_WithMixedTransports_ShouldDeserializeFromJson() + { + // Arrange + var json = @"{ + ""enabled"": true, + ""servers"": [ + { + ""name"": ""stdio-tool"", + ""transport"": ""stdio"", + ""command"": ""node"", + ""args"": [""server.js""] + }, + { + ""name"": ""sse-tool"", + ""transport"": ""sse"", + ""url"": ""http://api.example.com/mcp"" + } + ] + }"; + + // Act + var settings = JsonSerializer.Deserialize(json, new JsonSerializerOptions + { + PropertyNameCaseInsensitive = true + }); + + // Assert + settings.Should().NotBeNull(); + settings!.Servers.Should().HaveCount(2); + settings.Servers[0].Transport.Should().Be("stdio"); + settings.Servers[0].Command.Should().Be("node"); + settings.Servers[1].Transport.Should().Be("sse"); + settings.Servers[1].Url.Should().Be("http://api.example.com/mcp"); + } } public class McpToolInfoTests @@ -128,15 +196,43 @@ public void ConnectedServer_DefaultValues_ShouldBeCorrect() // Arrange & Act var server = new ConnectedServer(); - // Assert + // Assert - stdio fields server.Process.Should().BeNull(); server.StdIn.Should().BeNull(); server.StdOut.Should().BeNull(); server.StdErr.Should().BeNull(); + + // Assert - SSE fields + server.HttpClient.Should().BeNull(); + server.SseCts.Should().BeNull(); + server.SseListenerTask.Should().BeNull(); + server.PendingRequests.Should().BeEmpty(); + + // Assert - common fields server.Tools.Should().BeEmpty(); server.IsConnected.Should().BeFalse(); server.RequestId.Should().Be(0); } + + [Fact] + public void ConnectedServer_PendingRequests_ShouldBeThreadSafe() + { + // Arrange + var server = new ConnectedServer(); + var tcs = new TaskCompletionSource(); + + // Act + server.PendingRequests[1] = tcs; + var exists = server.PendingRequests.TryGetValue(1, out var retrieved); + var removed = server.PendingRequests.TryRemove(1, out _); + + // Assert + exists.Should().BeTrue(); + removed.Should().BeTrue(); + // retrieved and tcs should be the same reference + (retrieved == tcs).Should().BeTrue("retrieved should be the same instance as tcs"); + server.PendingRequests.Should().BeEmpty(); + } } public class McpClientManagerTests : IDisposable diff --git a/Clawleash/Mcp/McpPluginFactory.cs b/Clawleash/Mcp/McpPluginFactory.cs index 7a7bcfd..ca982e5 100644 --- a/Clawleash/Mcp/McpPluginFactory.cs +++ b/Clawleash/Mcp/McpPluginFactory.cs @@ -61,8 +61,11 @@ public static KernelPlugin CreateKernelPlugin( functions.Add(function); } + // プラグイン名を正規化(ハイフン等をアンダースコアに変換) + var normalizedServerName = serverName.Replace("-", "_").Replace(" ", "_"); + return KernelPluginFactory.CreateFromFunctions( - pluginName: $"Mcp_{serverName}", + pluginName: $"Mcp_{normalizedServerName}", description: $"MCP Server: {serverName}", functions: functions); }