|
2 | 2 | package app |
3 | 3 |
|
4 | 4 | import ( |
| 5 | + "context" |
5 | 6 | "fmt" |
| 7 | + "time" |
6 | 8 |
|
7 | 9 | "github.com/spf13/cobra" |
8 | 10 | "github.com/spf13/viper" |
9 | 11 |
|
| 12 | + "github.com/stacklok/toolhive/pkg/groups" |
10 | 13 | "github.com/stacklok/toolhive/pkg/logger" |
| 14 | + "github.com/stacklok/toolhive/pkg/vmcp/aggregator" |
| 15 | + vmcpclient "github.com/stacklok/toolhive/pkg/vmcp/client" |
11 | 16 | "github.com/stacklok/toolhive/pkg/vmcp/config" |
| 17 | + vmcprouter "github.com/stacklok/toolhive/pkg/vmcp/router" |
| 18 | + vmcpserver "github.com/stacklok/toolhive/pkg/vmcp/server" |
| 19 | + "github.com/stacklok/toolhive/pkg/workloads" |
12 | 20 | ) |
13 | 21 |
|
14 | 22 | var rootCmd = &cobra.Command{ |
@@ -74,18 +82,7 @@ func newServeCmd() *cobra.Command { |
74 | 82 | The server will read the configuration file specified by --config flag and start |
75 | 83 | listening for MCP client connections. It will aggregate tools, resources, and prompts |
76 | 84 | from all configured backend MCP servers.`, |
77 | | - RunE: func(_ *cobra.Command, _ []string) error { |
78 | | - configPath := viper.GetString("config") |
79 | | - if configPath == "" { |
80 | | - return fmt.Errorf("no configuration file specified, use --config flag") |
81 | | - } |
82 | | - |
83 | | - logger.Infof("Loading configuration from: %s", configPath) |
84 | | - // TODO: Load configuration and start server |
85 | | - // This will be implemented in a future PR when pkg/vmcp is added |
86 | | - |
87 | | - return fmt.Errorf("serve command not yet implemented") |
88 | | - }, |
| 85 | + RunE: runServe, |
89 | 86 | } |
90 | 87 | } |
91 | 88 |
|
@@ -171,3 +168,116 @@ func getVersion() string { |
171 | 168 | // This will be replaced with actual version info using ldflags |
172 | 169 | return "dev" |
173 | 170 | } |
| 171 | + |
| 172 | +// runServe implements the serve command logic |
| 173 | +func runServe(cmd *cobra.Command, _ []string) error { |
| 174 | + ctx := cmd.Context() |
| 175 | + configPath := viper.GetString("config") |
| 176 | + |
| 177 | + if configPath == "" { |
| 178 | + return fmt.Errorf("no configuration file specified, use --config flag") |
| 179 | + } |
| 180 | + |
| 181 | + logger.Infof("Loading configuration from: %s", configPath) |
| 182 | + |
| 183 | + // Load configuration from YAML |
| 184 | + loader := config.NewYAMLLoader(configPath) |
| 185 | + cfg, err := loader.Load() |
| 186 | + if err != nil { |
| 187 | + logger.Errorf("Failed to load configuration: %v", err) |
| 188 | + return fmt.Errorf("configuration loading failed: %w", err) |
| 189 | + } |
| 190 | + |
| 191 | + // Validate configuration |
| 192 | + validator := config.NewValidator() |
| 193 | + if err := validator.Validate(cfg); err != nil { |
| 194 | + logger.Errorf("Configuration validation failed: %v", err) |
| 195 | + return fmt.Errorf("validation failed: %w", err) |
| 196 | + } |
| 197 | + |
| 198 | + logger.Infof("Configuration loaded and validated successfully") |
| 199 | + logger.Infof(" Name: %s", cfg.Name) |
| 200 | + logger.Infof(" Group: %s", cfg.GroupRef) |
| 201 | + logger.Infof(" Conflict Resolution: %s", cfg.Aggregation.ConflictResolution) |
| 202 | + |
| 203 | + // Initialize managers for backend discovery |
| 204 | + logger.Info("Initializing workload and group managers") |
| 205 | + workloadsManager, err := workloads.NewManager(ctx) |
| 206 | + if err != nil { |
| 207 | + return fmt.Errorf("failed to create workloads manager: %w", err) |
| 208 | + } |
| 209 | + |
| 210 | + groupsManager, err := groups.NewManager() |
| 211 | + if err != nil { |
| 212 | + return fmt.Errorf("failed to create groups manager: %w", err) |
| 213 | + } |
| 214 | + |
| 215 | + // Create backend discoverer |
| 216 | + discoverer := aggregator.NewCLIBackendDiscoverer(workloadsManager, groupsManager) |
| 217 | + |
| 218 | + // Discover backends from the configured group |
| 219 | + logger.Infof("Discovering backends in group: %s", cfg.GroupRef) |
| 220 | + backends, err := discoverer.Discover(ctx, cfg.GroupRef) |
| 221 | + if err != nil { |
| 222 | + return fmt.Errorf("failed to discover backends: %w", err) |
| 223 | + } |
| 224 | + |
| 225 | + if len(backends) == 0 { |
| 226 | + return fmt.Errorf("no backends found in group %s", cfg.GroupRef) |
| 227 | + } |
| 228 | + |
| 229 | + logger.Infof("Discovered %d backends", len(backends)) |
| 230 | + |
| 231 | + // Create backend client |
| 232 | + backendClient := vmcpclient.NewHTTPBackendClient() |
| 233 | + |
| 234 | + // Create conflict resolver based on configuration |
| 235 | + // Use the factory method that handles all strategies |
| 236 | + conflictResolver, err := aggregator.NewConflictResolver(cfg.Aggregation) |
| 237 | + if err != nil { |
| 238 | + return fmt.Errorf("failed to create conflict resolver: %w", err) |
| 239 | + } |
| 240 | + |
| 241 | + // Create aggregator |
| 242 | + agg := aggregator.NewDefaultAggregator(backendClient, conflictResolver, cfg.Aggregation.Tools) |
| 243 | + |
| 244 | + // Aggregate capabilities from all backends with timeout |
| 245 | + logger.Info("Aggregating capabilities from backends") |
| 246 | + aggCtx, cancel := context.WithTimeout(ctx, 30*time.Second) |
| 247 | + defer cancel() |
| 248 | + |
| 249 | + capabilities, err := agg.AggregateCapabilities(aggCtx, backends) |
| 250 | + if err != nil { |
| 251 | + return fmt.Errorf("failed to aggregate capabilities: %w", err) |
| 252 | + } |
| 253 | + |
| 254 | + logger.Infof("Aggregated %d tools, %d resources, %d prompts from %d backends", |
| 255 | + capabilities.Metadata.ToolCount, |
| 256 | + capabilities.Metadata.ResourceCount, |
| 257 | + capabilities.Metadata.PromptCount, |
| 258 | + capabilities.Metadata.BackendCount) |
| 259 | + |
| 260 | + // Create router |
| 261 | + rtr := vmcprouter.NewDefaultRouter() |
| 262 | + |
| 263 | + // Create server configuration |
| 264 | + serverCfg := &vmcpserver.Config{ |
| 265 | + Name: cfg.Name, |
| 266 | + Version: getVersion(), |
| 267 | + Host: "127.0.0.1", // TODO: Make configurable |
| 268 | + Port: 4483, // TODO: Make configurable |
| 269 | + } |
| 270 | + |
| 271 | + // Create server |
| 272 | + srv := vmcpserver.New(serverCfg, rtr, backendClient) |
| 273 | + |
| 274 | + // Register capabilities |
| 275 | + logger.Info("Registering capabilities with server") |
| 276 | + if err := srv.RegisterCapabilities(ctx, capabilities); err != nil { |
| 277 | + return fmt.Errorf("failed to register capabilities: %w", err) |
| 278 | + } |
| 279 | + |
| 280 | + // Start server (blocks until shutdown signal) |
| 281 | + logger.Infof("Starting Virtual MCP Server at %s", srv.Address()) |
| 282 | + return srv.Start(ctx) |
| 283 | +} |
0 commit comments