diff --git a/docs/LocalExecutionMode.md b/docs/LocalExecutionMode.md new file mode 100644 index 000000000..49e50ca36 --- /dev/null +++ b/docs/LocalExecutionMode.md @@ -0,0 +1,469 @@ +# Local Execution Mode Design + +## Overview + +This document describes the design for a new execution mode in Halibut where RPC requests are executed locally on the worker node that dequeues work from the `IPendingRequestQueue`, rather than being proxied over TCP to a remote service. + +## Motivation + +Currently, Halibut's polling mode works as follows: +1. Client queues work in `IPendingRequestQueue` for a specific endpoint (e.g., `poll://tentacle-1`) +2. Worker (tentacle) establishes TCP connection and polls the queue +3. Worker dequeues requests and **proxies them over TCP** back to the server +4. Server executes the RPC and returns response over TCP +5. Worker receives response and applies it back to the queue + +This design introduces unnecessary TCP overhead when the goal is to distribute work execution across multiple worker nodes. Instead of using the queue for work distribution and then still doing TCP RPC, we want: + +1. Client queues work in `IPendingRequestQueue` for a logical worker pool (e.g., `local://worker-pool-a`) +2. Worker node registered as `local://worker-pool-a` dequeues requests +3. Worker **executes the RPC locally** on itself (no TCP) +4. Worker applies response back to the queue +5. Client receives response + +This enables true distributed work execution patterns like: +- Worker pools processing background jobs +- Horizontally scaled compute nodes +- Fan-out task distribution without TCP bottlenecks + +## Design + +### 1. URL Scheme: `local://` + +Use `local://` scheme to identify endpoints that should execute locally: +- `local://worker-pool-a` - worker pool A +- `local://worker-pool-b` - worker pool B +- `local://image-processor` - specialized image processing workers + +Multiple workers can register with the same `local://` identifier to form a pool. The queue (in-memory or Redis) handles load distribution. + +### 2. Service Registration and Worker Startup + +Services in Halibut are registered **globally** on the runtime, not per-endpoint. The existing service registration mechanism works unchanged: + +```csharp +var worker = new HalibutRuntime(serviceFactory); +worker.Services.AddSingleton(new MyServiceImpl()); +``` + +To start processing work for a specific `local://` endpoint, workers use a new API: + +```csharp +// Tell Halibut to start processing requests for this endpoint +await worker.PollLocalAsync("local://worker-pool-a", cancellationToken); +``` + +This is analogous to how TCP polling works today - services are registered globally, and the polling mechanism specifies which endpoint's queue to process. + +### 3. Protocol Layer Changes (Option B) + +Modify `MessageExchangeProtocol` to detect local execution mode and bypass TCP: + +#### Current Flow in `ProcessReceiverInternalAsync()`: +```csharp +// Lines 225-288 in MessageExchangeProtocol.cs +async Task ProcessReceiverInternalAsync(RequestMessageWithCancellationToken? nextRequest) +{ + if (nextRequest != null) + { + var response = await SendAndReceiveRequest(nextRequest); // TCP send/receive + await pendingRequests.ApplyResponse(response, nextRequest.ActivityId); + } + await stream.SendNext(); +} + +async Task SendAndReceiveRequest(RequestMessageWithCancellationToken request) +{ + await stream.SendAsync(request); // Serialize and send over TCP + return await stream.ReceiveResponseAsync(); // Deserialize from TCP +} +``` + +#### Proposed Flow with Local Execution: +```csharp +async Task ProcessReceiverInternalAsync(RequestMessageWithCancellationToken? nextRequest) +{ + if (nextRequest != null) + { + ResponseMessage response; + + if (isLocalExecutionMode) + { + // Execute locally using ServiceInvoker + response = await ExecuteLocallyAsync(nextRequest); + } + else + { + // Existing TCP-based execution + response = await SendAndReceiveRequest(nextRequest); + } + + await pendingRequests.ApplyResponse(response, nextRequest.ActivityId); + } + + if (!isLocalExecutionMode) + { + await stream.SendNext(); // Only needed for TCP mode + } +} + +async Task ExecuteLocallyAsync(RequestMessageWithCancellationToken request) +{ + try + { + // Use existing ServiceInvoker to execute the method locally + return await incomingRequestProcessor(request.RequestMessage, request.CancellationToken); + } + catch (Exception ex) + { + return ResponseMessage.FromException(request.RequestMessage, ex); + } +} +``` + +### 4. Detection of Local Execution Mode + +Add logic to detect `local://` scheme in the connection setup: + +#### In `MessageExchangeProtocol` constructor or initialization: +```csharp +private readonly bool isLocalExecutionMode; +private readonly Func>? incomingRequestProcessor; + +public MessageExchangeProtocol( + IMessageExchangeStream stream, + ConnectionId connectionId, + IPendingRequestQueue? pendingRequests, + Func>? incomingRequestProcessor, + bool isLocalExecutionMode = false) // New parameter +{ + this.stream = stream; + this.connectionId = connectionId; + this.pendingRequests = pendingRequests; + this.incomingRequestProcessor = incomingRequestProcessor; + this.isLocalExecutionMode = isLocalExecutionMode; +} +``` + +#### Detection in `PollingClient` or connection factory: +```csharp +// In PollingClient.ExecutePollingLoopAsync() or similar +var serviceUri = subscription.ServiceUri; +var isLocalMode = serviceUri.Scheme == "local"; + +// When creating protocol: +var protocol = new MessageExchangeProtocol( + stream, + connectionId, + pendingRequestQueue, + incomingRequestProcessor: isLocalMode ? localServiceInvoker : null, + isLocalExecutionMode: isLocalMode +); +``` + +### 5. Component Changes Summary + +#### A. `MessageExchangeProtocol.cs` +**File:** `/source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs` + +Changes: +1. Add `isLocalExecutionMode` field and constructor parameter +2. Add `incomingRequestProcessor` field to invoke services locally +3. Modify `ProcessReceiverInternalAsync()` to check mode and route accordingly +4. Add new `ExecuteLocallyAsync()` method +5. Skip `SendNext()` control message in local mode +6. Keep existing `SendAndReceiveRequest()` for TCP mode + +Lines affected: ~225-294 + +#### B. `PollingClient.cs` +**File:** `/source/Halibut/Transport/PollingClient.cs` + +Changes: +1. Detect `local://` scheme in `subscription.ServiceUri` +2. Pass local execution flag to `MessageExchangeProtocol` +3. Provide `incomingRequestProcessor` (ServiceInvoker) when in local mode +4. May need to skip actual TCP connection establishment for local mode +5. Or: Use a "null" stream implementation that throws if accidentally used + +Lines affected: ~60-101 (ExecutePollingLoopAsync) + +#### C. Connection/Transport Layer +**Files:** +- `/source/Halibut/Transport/Protocol/SecureClient.cs` or similar +- Potentially `/source/Halibut/HalibutRuntime.cs` for routing + +Changes: +1. When establishing "connection" for `local://` endpoints: + - Don't create actual TCP socket + - Create dummy/null stream or special local stream + - Pass local service invoker instead +2. Service routing already exists via `HalibutRuntime.Routes` +3. May need to ensure services are registered before polling starts + +#### D. Stream Handling in Local Mode + +Two options: + +**Option 1: Null Stream** +- Create `NullMessageExchangeStream` that throws if methods are called +- Protocol layer ensures it's never used in local mode +- Safeguard against bugs + +**Option 2: No Stream** +- Make `IMessageExchangeStream` nullable in `MessageExchangeProtocol` +- Check for null before any stream operations +- Cleaner but requires more null checks + +Recommendation: Option 1 for safety. + +### 6. Queue Behavior + +No changes needed to `IPendingRequestQueue` interface or implementations: +- `PendingRequestQueueAsync` (in-memory) works as-is +- `RedisPendingRequestQueue` works as-is +- Queue doesn't care how execution happens +- Request/response correlation remains the same + +### 7. Serialization + +In local mode: +- **Request parameters:** Still need to be serialized when queued (supports Redis queue) +- **During execution:** Parameters deserialized from `RequestMessage.Params` by `ServiceInvoker` +- **Response:** Serialized back into `ResponseMessage` +- **No change needed:** Existing serialization paths handle this + +However, there's potential for optimization: +- If using in-memory queue only, could skip serialization entirely +- Keep serialized objects in memory +- Future enhancement, not required for v1 + +### 8. Worker Pool Registration + +Example usage: + +```csharp +// Worker Node 1 +var worker1 = new HalibutRuntime(serviceFactory); +worker1.Services.AddSingleton(new ImageProcessorImpl()); + +// Start polling for work from the local://image-processor queue +await worker1.PollLocalAsync("local://image-processor", cancellationToken); + +// Worker Node 2 (same pool) - on different machine or process +var worker2 = new HalibutRuntime(serviceFactory); +worker2.Services.AddSingleton(new ImageProcessorImpl()); + +// Both workers poll the same queue, load balanced automatically +await worker2.PollLocalAsync("local://image-processor", cancellationToken); + +// Client queues work +var client = new HalibutRuntime(serviceFactory); +var imageProcessor = client.CreateClient("local://image-processor"); +var result = await imageProcessor.ProcessImageAsync(imageData); // Queued, executed by worker1 or worker2 +``` + +**Key points:** +- Services registered globally on the worker's `HalibutRuntime` +- `PollLocalAsync()` is the new API that starts processing for a specific endpoint +- No TCP connection needed - workers directly access the queue +- Multiple workers can call `PollLocalAsync()` with the same identifier to form a pool + +### 9. Control Flow Differences + +#### TCP Mode: +``` +PollingClient + → Establish TCP connection + → MessageExchangeProtocol.ExchangeAsSubscriberAsync() + → Loop: + → DequeueAsync() from queue + → SendAsync(request) over TCP to server + → ReceiveResponseAsync() from TCP + → ApplyResponse() to queue + → SendNext() control message +``` + +#### Local Mode: +``` +PollingClient + → Skip TCP connection (or use null stream) + → MessageExchangeProtocol.ExchangeAsSubscriberAsync() + → Loop: + → DequeueAsync() from queue + → ExecuteLocallyAsync(request) using ServiceInvoker + → ApplyResponse() to queue + → (no SendNext needed) +``` + +### 10. Error Handling + +Local execution errors: +- Caught in `ExecuteLocallyAsync()` +- Wrapped in `ResponseMessage.FromException()` +- Applied to queue like any other response +- Client receives error response normally + +Same semantics as TCP mode - no special handling needed. + +### 11. Cancellation + +Cancellation tokens flow through: +1. Client provides `CancellationToken` to method call +2. Token stored in queue with request +3. Dequeued as `RequestMessageWithCancellationToken` +4. Passed to `ServiceInvoker.InvokeAsync()` +5. Method can check cancellation during execution + +Local mode has better cancellation behavior: +- No TCP serialization delays +- Direct propagation to service method +- Faster response to cancellation + +### 12. Testing Strategy + +#### Unit Tests: +1. `MessageExchangeProtocol` with `isLocalExecutionMode = true` + - Mock `incomingRequestProcessor` + - Verify local execution path taken + - Verify TCP methods not called + +2. `PollingClient` with `local://` URI + - Verify local mode detected + - Verify protocol configured correctly + +#### Integration Tests: +1. End-to-end with in-memory queue + - Client queues work for `local://test` + - Worker dequeues and executes locally + - Client receives response + +2. End-to-end with Redis queue + - Multiple workers polling same `local://pool` + - Verify work distribution + - Verify no crosstalk between pools + +3. Error scenarios + - Service throws exception + - Cancellation during execution + - Worker crashes mid-execution + +4. Performance comparison + - Measure latency: local vs TCP mode + - Measure throughput with worker pool + +### 13. Configuration and Feature Flags + +Consider adding configuration options: + +```csharp +public class HalibutRuntimeConfiguration +{ + /// + /// Enable local execution mode for 'local://' URIs. + /// Default: true + /// + public bool EnableLocalExecutionMode { get; set; } = true; + + /// + /// Timeout for local method execution. + /// Default: 5 minutes (same as TCP default) + /// + public TimeSpan LocalExecutionTimeout { get; set; } = TimeSpan.FromMinutes(5); +} +``` + +### 14. Migration Path + +This feature is additive and backward compatible: +1. Existing `poll://` endpoints work unchanged +2. New `local://` endpoints opt into local execution +3. No breaking changes to APIs +4. Can incrementally adopt in applications + +### 15. Performance Considerations + +**Benefits:** +- No TCP serialization overhead +- No network latency +- No SSL/TLS handshake overhead +- Lower memory (no network buffers) +- Faster cancellation propagation + +**Tradeoffs:** +- Still requires serialization for queue (especially Redis) +- Can't execute across machines (by design) +- Worker must have all required services registered + +**Expected Improvements:** +- Latency: 10-100x faster (no network) +- Throughput: 5-10x higher (no TCP bottleneck) +- CPU: Lower (no SSL overhead) + +### 16. Security Considerations + +Local mode is inherently more secure: +- No network exposure +- No certificate validation needed +- No SSL/TLS overhead +- Requests never leave the machine + +However: +- Queue still needs securing (Redis authentication, etc.) +- Service authorization still applies +- Trust boundary is at the queue, not transport + +### 17. Open Questions + +1. **Connection management:** Should we create a fake connection for local mode, or special-case it everywhere? + - **Recommendation:** Use null/mock stream, maintain consistent abstractions + +2. **Metrics and logging:** How to differentiate local vs TCP executions in telemetry? + - **Recommendation:** Add tags/properties to logs indicating execution mode + +3. **Health checks:** How to verify workers are running and polling? + - **Recommendation:** Leverage existing Redis heartbeat mechanism + +4. **Queue selection:** Should `local://` always use Redis, or support in-memory too? + - **Recommendation:** Support both, let application choose + +5. **Backward compatibility:** What if old worker connects with `local://` before feature is implemented? + - **Recommendation:** Fail fast with clear error message + +## Implementation Plan + +### Phase 1: Core Protocol Changes +1. Modify `MessageExchangeProtocol` to support local execution mode +2. Add `isLocalExecutionMode` flag and `ExecuteLocallyAsync()` method +3. Unit tests for protocol layer + +### Phase 2: Transport Integration +1. Update `PollingClient` to detect `local://` scheme +2. Pass local service invoker to protocol +3. Handle stream creation for local mode + +### Phase 3: Runtime Integration +1. Ensure service routing works with `local://` endpoints +2. Configuration options for local execution +3. Integration tests with in-memory queue + +### Phase 4: Redis Support +1. Test local execution with Redis queue +2. Multi-worker scenarios +3. Performance benchmarks + +### Phase 5: Documentation and Examples +1. Update Halibut documentation +2. Example applications showing worker pool pattern +3. Migration guide for existing polling users + +## Summary + +Local execution mode extends Halibut's queue-based architecture to enable true distributed work processing without TCP overhead. By modifying the protocol layer to detect `local://` URIs and execute requests locally using the existing `ServiceInvoker`, we can support worker pool patterns efficiently while maintaining backward compatibility and leveraging existing queue implementations. + +Key benefits: +- 10-100x lower latency +- No TCP/SSL overhead +- True horizontal scaling via worker pools +- Queue-agnostic (works with in-memory and Redis) +- Backward compatible with existing code \ No newline at end of file diff --git a/source/Halibut.Tests/HalibutExamplesFixture.cs b/source/Halibut.Tests/HalibutExamplesFixture.cs new file mode 100644 index 000000000..83187a113 --- /dev/null +++ b/source/Halibut.Tests/HalibutExamplesFixture.cs @@ -0,0 +1,46 @@ +using System; +using System.Threading.Tasks; +using Halibut.ServiceModel; +using Halibut.Tests.Support; +using Halibut.Tests.TestServices; +using Halibut.Tests.TestServices.Async; +using Halibut.TestUtils.Contracts; +using NUnit.Framework; + +namespace Halibut.Tests +{ + public class HalibutExamplesFixture : BaseTest + { + [Test] + public async Task SimplePollingExample() + { + var services = GetDelegateServiceFactory(); + await using (var client = new HalibutRuntimeBuilder() + .WithServerCertificate(Certificates.Octopus) + .WithHalibutTimeoutsAndLimits(new HalibutTimeoutsAndLimitsForTestsBuilder().Build()) + .Build()) + await using (var pollingService = new HalibutRuntimeBuilder() + .WithServerCertificate(Certificates.TentaclePolling) + .WithServiceFactory(services) + .WithHalibutTimeoutsAndLimits(new HalibutTimeoutsAndLimitsForTestsBuilder().Build()) + .Build()) + { + var octopusPort = client.Listen(); + client.Trust(Certificates.TentaclePollingPublicThumbprint); + + pollingService.Poll(new Uri("poll://alice"), new ServiceEndPoint("https://localhost:" + octopusPort, Certificates.OctopusPublicThumbprint, client.TimeoutsAndLimits), CancellationToken); + + var echo = client.CreateAsyncClient(new ServiceEndPoint("poll://alice", null, client.TimeoutsAndLimits)); + + await echo.SayHelloAsync("World"); + } + } + + static DelegateServiceFactory GetDelegateServiceFactory() + { + var services = new DelegateServiceFactory(); + services.Register(() => new AsyncEchoService()); + return services; + } + } +} \ No newline at end of file diff --git a/source/Halibut.Tests/LocalExecutionModeFixture.cs b/source/Halibut.Tests/LocalExecutionModeFixture.cs new file mode 100644 index 000000000..9ae86d803 --- /dev/null +++ b/source/Halibut.Tests/LocalExecutionModeFixture.cs @@ -0,0 +1,111 @@ +#if NET8_0_OR_GREATER +using System; +using System.Threading; +using System.Threading.Tasks; +using FluentAssertions; +using Halibut.Diagnostics; +using Halibut.Logging; +using Halibut.Queue; +using Halibut.Queue.Redis; +using Halibut.Queue.Redis.RedisDataLossDetection; +using Halibut.Queue.Redis.RedisHelpers; +using Halibut.ServiceModel; +using Halibut.Tests.Queue.Redis.Utils; +using Halibut.Tests.Support; +using Halibut.Tests.Support.Logging; +using Halibut.Tests.TestServices; +using Halibut.Tests.TestServices.Async; +using Halibut.TestUtils.Contracts; +using NUnit.Framework; +using DisposableCollection = Halibut.Util.DisposableCollection; + +namespace Halibut.Tests +{ + public class LocalExecutionModeFixture : BaseTest + { + [RedisTest] + [Test] + public async Task SimpleLocalExecutionExample() + { + var services = GetDelegateServiceFactory(); + var timeoutsAndLimits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build(); + timeoutsAndLimits = new HalibutTimeoutsAndLimits(); + + // Use a shared queue factory so client and worker share the same queue + var queueFactory = new PendingRequestQueueFactoryAsync(timeoutsAndLimits, new LogFactory()); + + var logFactory = new CachingLogFactory(new TestContextLogCreator("", LogLevel.Trace)); + + var log = new TestContextLogCreator("Redis", LogLevel.Fatal); + + var preSharedGuid = Guid.NewGuid(); + + await using var disposables = new DisposableCollection(); + + await using var client = new HalibutRuntimeBuilder() + .WithServerCertificate(Certificates.Octopus) + .WithPendingRequestQueueFactory(RedisFactory()) + .WithHalibutTimeoutsAndLimits(timeoutsAndLimits) + .Build(); + + await using var worker = new HalibutRuntimeBuilder() + .WithServerCertificate(Certificates.TentaclePolling) + .WithServiceFactory(services) + .WithPendingRequestQueueFactory(RedisFactory()) + .WithHalibutTimeoutsAndLimits(timeoutsAndLimits) + .Build(); + + // Start worker polling for local://test-worker + using var workerCts = new CancellationTokenSource(); + var pollingTask = Task.Run(async () => + { + //await Task.Delay(TimeSpan.FromSeconds(10)); + await worker.PollLocalAsync(new Uri("local://test-worker"), workerCts.Token); + }, workerCts.Token); + + // Client creates proxy to local://test-worker and makes request + var echo = client.CreateAsyncClient( + new ServiceEndPoint("local://test-worker", null, client.TimeoutsAndLimits)); + + var result = await echo.SayHelloAsync("World"); + result.Should().Be("World..."); + + // Cleanup + workerCts.Cancel(); + // try + // { + // await pollingTask; + // } + // catch (OperationCanceledException) + // { + // // Expected + // } + + Func RedisFactory() + { + return msgSer => + { + var redisFacade = RedisFacadeBuilder.CreateRedisFacade(prefix: preSharedGuid); + disposables.AddAsyncDisposable(redisFacade); + var watchForRedisLosingAllItsData = new WatchForRedisLosingAllItsData(redisFacade, log.CreateNewForPrefix("watcher")); + disposables.AddAsyncDisposable(watchForRedisLosingAllItsData); + + return new RedisPendingRequestQueueFactory(msgSer, + new InMemoryStoreDataStreamsForDistributedQueues(), + watchForRedisLosingAllItsData, + new HalibutRedisTransport(redisFacade), + new HalibutTimeoutsAndLimitsForTestsBuilder().Build(), + logFactory); + }; + } + } + + static DelegateServiceFactory GetDelegateServiceFactory() + { + var services = new DelegateServiceFactory(); + services.Register(() => new AsyncEchoService()); + return services; + } + } +} +#endif diff --git a/source/Halibut/HalibutRuntime.cs b/source/Halibut/HalibutRuntime.cs index 368899c1b..2a529f806 100644 --- a/source/Halibut/HalibutRuntime.cs +++ b/source/Halibut/HalibutRuntime.cs @@ -195,6 +195,55 @@ public void Poll(Uri subscription, ServiceEndPoint endPoint, CancellationToken c pollingClients.Add(new PollingClient(subscription, client, HandleIncomingRequestAsync, log, cancellationToken, pollingReconnectRetryPolicy)); } + public async Task PollLocalAsync(Uri localEndpoint, CancellationToken cancellationToken) + { + if (localEndpoint.Scheme.ToLowerInvariant() != "local") + { + throw new ArgumentException($"Only 'local://' endpoints are supported. Provided: {localEndpoint.Scheme}://", nameof(localEndpoint)); + } + + var queue = GetQueue(localEndpoint); + var log = logs.ForEndpoint(localEndpoint); + + log.Write(EventType.MessageExchange, $"Starting local polling for endpoint: {localEndpoint}"); + + while (!cancellationToken.IsCancellationRequested) + { + try + { + var request = await queue.DequeueAsync(cancellationToken).ConfigureAwait(false); + + if (request != null) + { + ResponseMessage response; + try + { + response = await invoker.InvokeAsync(request.RequestMessage).ConfigureAwait(false); + } + catch (Exception ex) + { + log.WriteException(EventType.Error, $"Error executing local request for {request.RequestMessage.ServiceName}.{request.RequestMessage.MethodName}", ex); + response = ResponseMessage.FromException(request.RequestMessage, ex); + } + + await queue.ApplyResponse(response, request.RequestMessage.ActivityId).ConfigureAwait(false); + } + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + log.Write(EventType.MessageExchange, $"Local polling cancelled for endpoint: {localEndpoint}"); + break; + } + catch (Exception ex) + { + log.WriteException(EventType.Error, $"Error in local polling loop for endpoint: {localEndpoint}", ex); + await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false); + } + } + + log.Write(EventType.MessageExchange, $"Local polling stopped for endpoint: {localEndpoint}"); + } + public async Task DiscoverAsync(Uri uri, CancellationToken cancellationToken) { return await DiscoverAsync(new ServiceEndPoint(uri, null, TimeoutsAndLimits), cancellationToken); @@ -240,6 +289,9 @@ async Task SendOutgoingRequestAsync(RequestMessage request, Met case "poll": response = await SendOutgoingPollingRequestAsync(request, cancellationToken).ConfigureAwait(false); break; + case "local": + response = await SendOutgoingPollingRequestAsync(request, cancellationToken).ConfigureAwait(false); + break; default: throw new ArgumentException("Unknown endpoint type: " + endPoint.BaseUri.Scheme); } diff --git a/source/Halibut/IHalibutRuntime.cs b/source/Halibut/IHalibutRuntime.cs index 3d678db77..a4d8c7e22 100644 --- a/source/Halibut/IHalibutRuntime.cs +++ b/source/Halibut/IHalibutRuntime.cs @@ -17,6 +17,7 @@ public interface IHalibutRuntime : IAsyncDisposable, IDisposable int Listen(IPEndPoint endpoint); void ListenWebSocket(string endpoint); void Poll(Uri subscription, ServiceEndPoint endPoint, CancellationToken cancellationToken); + Task PollLocalAsync(Uri localEndpoint, CancellationToken cancellationToken); Task DiscoverAsync(Uri uri, CancellationToken cancellationToken); Task DiscoverAsync(ServiceEndPoint endpoint, CancellationToken cancellationToken);