From a3b836df1cd4bae74fa7e33bfc5765eef253c2ea Mon Sep 17 00:00:00 2001 From: Mohammad Aziz Date: Tue, 28 Apr 2026 08:32:34 +0530 Subject: [PATCH] feat(ws): add task.deliver receive, task.started emission, and queue integration --- app/jobs/taskjob/result_channel_test.go | 15 ++ app/jobs/taskjob/taskjob.go | 37 +++- app/services/localtaskstore/store.go | 1 + app/services/wsclient/client.go | 116 +++++++++++- app/services/wsclient/client_test.go | 239 ++++++++++++++++++++++++ internal/wsprotocol/message.go | 12 ++ main.go | 8 +- 7 files changed, 415 insertions(+), 13 deletions(-) diff --git a/app/jobs/taskjob/result_channel_test.go b/app/jobs/taskjob/result_channel_test.go index 477fc51..0d2c317 100644 --- a/app/jobs/taskjob/result_channel_test.go +++ b/app/jobs/taskjob/result_channel_test.go @@ -26,6 +26,12 @@ func TestTaskJobStreamsOutputAndFinalOverResultChannel(t *testing.T) { job.processTask(context.Background(), fetcher.tasks[0], reporter, channel) + if len(channel.started) != 1 { + t.Fatalf("started len = %d, want 1", len(channel.started)) + } + if channel.started[0].TaskID != "task-1" || channel.started[0].ExecutionAttemptID != "attempt-1" { + t.Fatalf("started = %#v", channel.started[0]) + } if len(channel.outputs) != 2 { t.Fatalf("outputs len = %d, want 2", len(channel.outputs)) } @@ -189,12 +195,21 @@ func (f *fakeTaskReporter) Report(taskID string, result *taskreporter.TaskResult type fakeResultChannel struct { mu sync.Mutex + started []localtaskstore.TaskReceipt outputs []localtaskstore.OutputChunk finals []localtaskstore.FinalResult + startedErr error outputErrs []error finalErr error } +func (f *fakeResultChannel) SendStarted(ctx context.Context, receipt localtaskstore.TaskReceipt) error { + f.mu.Lock() + defer f.mu.Unlock() + f.started = append(f.started, receipt) + return f.startedErr +} + func (f *fakeResultChannel) SendOutput(ctx context.Context, chunk localtaskstore.OutputChunk) error { f.mu.Lock() defer f.mu.Unlock() diff --git a/app/jobs/taskjob/taskjob.go b/app/jobs/taskjob/taskjob.go index 6df56c2..c376dbd 100644 --- a/app/jobs/taskjob/taskjob.go +++ b/app/jobs/taskjob/taskjob.go @@ -31,14 +31,16 @@ type TaskJobConfig struct { } type ResultChannel interface { + SendStarted(context.Context, localtaskstore.TaskReceipt) error SendOutput(context.Context, localtaskstore.OutputChunk) error SendFinal(context.Context, localtaskstore.FinalResult) error } type TaskJob struct { - config TaskJobConfig - cancel context.CancelFunc - wg sync.WaitGroup + config TaskJobConfig + enqueueCh chan task.Task + cancel context.CancelFunc + wg sync.WaitGroup } func New() *TaskJob { @@ -59,7 +61,8 @@ func NewJobWithConf(cfg TaskJobConfig) *TaskJob { } return &TaskJob{ - config: cfg, + config: cfg, + enqueueCh: make(chan task.Task, 16), } } @@ -71,6 +74,18 @@ func (tj *TaskJob) Register(ctx context.Context, tf taskfetcher.TaskFetcher, tr channel = channels[0] } tj.wg.Add(1) + go func() { + defer tj.wg.Done() + for { + select { + case queued := <-tj.enqueueCh: + tj.processTask(ctx, queued, tr, channel) + case <-ctx.Done(): + return + } + } + }() + tj.wg.Add(1) go func() { defer tj.wg.Done() tj.config.Trigger(ctx, func() error { @@ -93,6 +108,15 @@ func (tj *TaskJob) Register(ctx context.Context, tf taskfetcher.TaskFetcher, tr return cancel } +func (tj *TaskJob) Enqueue(ctx context.Context, t task.Task) error { + select { + case tj.enqueueCh <- t: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + func (tj *TaskJob) processTask(ctx context.Context, t task.Task, tr taskreporter.TaskReporter, channel ResultChannel) { tempFile, err := os.CreateTemp("", "*_script.sh") if err != nil { @@ -184,6 +208,11 @@ func (tj *TaskJob) processTaskWithResultChannel(ctx context.Context, t task.Task tj.reportHTTPResult(t, tr, "failed", "", err.Error(), 1) return } + if err := channel.SendStarted(ctx, localtaskstore.TaskReceipt{TaskID: t.ID, ExecutionAttemptID: t.ExecutionAttemptID}); err != nil { + _ = execCmd.Process.Kill() + tj.reportHTTPResult(t, tr, "failed", "", fmt.Sprintf("failed to report task start: %v", err), 1) + return + } var stdoutBuf bytes.Buffer var stderrBuf bytes.Buffer diff --git a/app/services/localtaskstore/store.go b/app/services/localtaskstore/store.go index 460206e..b117a22 100644 --- a/app/services/localtaskstore/store.go +++ b/app/services/localtaskstore/store.go @@ -87,6 +87,7 @@ type Config struct { type ReceiptStore interface { RecordReceived(TaskReceipt) (TaskState, error) + RecordStarted(taskID, executionAttemptID string) error TaskState(taskID, executionAttemptID string) (TaskState, error) } diff --git a/app/services/wsclient/client.go b/app/services/wsclient/client.go index 5b9a16a..457a01a 100644 --- a/app/services/wsclient/client.go +++ b/app/services/wsclient/client.go @@ -13,6 +13,7 @@ import ( "hostlink/app/services/agentstate" "hostlink/app/services/requestsigner" + "hostlink/domain/task" "hostlink/internal/wsprotocol" ) @@ -31,6 +32,10 @@ type Conn interface { type SleepFunc func(context.Context, time.Duration) error +type TaskEnqueuer interface { + Enqueue(context.Context, task.Task) error +} + type Config struct { URL string AgentState *agentstate.AgentState @@ -41,6 +46,8 @@ type Config struct { PingInterval time.Duration SleepFunc SleepFunc ResultOutbox localtaskstore.ResultOutbox + ReceiptStore localtaskstore.ReceiptStore + TaskEnqueuer TaskEnqueuer } type Client struct { @@ -53,12 +60,14 @@ type Client struct { pingInterval time.Duration sleep SleepFunc - mu sync.RWMutex - writeMu sync.Mutex - active bool - lastAck *wsprotocol.AckPayload - conn Conn - outbox localtaskstore.ResultOutbox + mu sync.RWMutex + writeMu sync.Mutex + active bool + lastAck *wsprotocol.AckPayload + conn Conn + outbox localtaskstore.ResultOutbox + receipts localtaskstore.ReceiptStore + enqueuer TaskEnqueuer } func New(cfg Config) (*Client, error) { @@ -99,6 +108,8 @@ func New(cfg Config) (*Client, error) { pingInterval: cfg.PingInterval, sleep: cfg.SleepFunc, outbox: cfg.ResultOutbox, + receipts: cfg.ReceiptStore, + enqueuer: cfg.TaskEnqueuer, }, nil } @@ -234,12 +245,82 @@ func (c *Client) readLoop(ctx context.Context, conn Conn, helloMessageID string) continue } return fmt.Errorf("websocket protocol error: %s", env.MessageID) + case wsprotocol.TypeTaskDeliver: + if err := c.receiveTaskDeliver(ctx, conn, env); err != nil { + return err + } default: return fmt.Errorf("unsupported inbound websocket message type: %s", env.Type) } } } +func (c *Client) receiveTaskDeliver(ctx context.Context, conn Conn, env wsprotocol.Envelope) error { + if c.receipts == nil { + return fmt.Errorf("receipt store is not configured") + } + payload, err := wsprotocol.DecodePayload[wsprotocol.TaskDeliverPayload](env) + if err != nil { + return err + } + if err := payload.Validate(); err != nil { + return err + } + + previous, err := c.receipts.TaskState(env.TaskID, env.ExecutionAttemptID) + if err != nil { + return err + } + state, err := c.receipts.RecordReceived(localtaskstore.TaskReceipt{ + TaskID: env.TaskID, + ExecutionAttemptID: env.ExecutionAttemptID, + }) + if err != nil { + return err + } + if previous.Exists && previous.Status == localtaskstore.TaskStatusRunning { + return c.writeEnvelope(ctx, conn, c.buildTaskStateEnvelope(wsprotocol.TypeTaskStarted, env.TaskID, env.ExecutionAttemptID)) + } + if previous.Exists && (previous.Status == localtaskstore.TaskStatusFinal || previous.Status == localtaskstore.TaskStatusInterrupted) { + replayed, err := c.replayTaskFinal(ctx, conn, env.TaskID, env.ExecutionAttemptID) + if err != nil { + return err + } + if replayed { + return nil + } + } + if err := c.writeEnvelope(ctx, conn, c.buildTaskStateEnvelope(wsprotocol.TypeTaskReceived, env.TaskID, env.ExecutionAttemptID)); err != nil { + return err + } + if !previous.Exists && state.Status == localtaskstore.TaskStatusReceived && c.enqueuer != nil { + return c.enqueuer.Enqueue(ctx, task.Task{ + ID: env.TaskID, + ExecutionAttemptID: env.ExecutionAttemptID, + Command: payload.Command, + Status: "pending", + Priority: payload.Priority, + }) + } + return nil +} + +func (c *Client) replayTaskFinal(ctx context.Context, conn Conn, taskID, executionAttemptID string) (bool, error) { + if c.outbox == nil { + return false, nil + } + messages, err := c.outbox.UnackedMessages() + if err != nil { + return false, err + } + for _, message := range messages { + if message.TaskID == taskID && message.ExecutionAttemptID == executionAttemptID && message.Type == localtaskstore.OutboxMessageTypeFinal { + return true, c.writeEnvelope(ctx, conn, envelopeFromOutboxMessage(c.agentID, message)) + } + } + return false, nil +} + func (c *Client) SendOutput(ctx context.Context, chunk localtaskstore.OutputChunk) error { if c.outbox == nil { return fmt.Errorf("result outbox is not configured") @@ -283,6 +364,16 @@ func (c *Client) SendFinal(ctx context.Context, result localtaskstore.FinalResul return nil } +func (c *Client) SendStarted(ctx context.Context, receipt localtaskstore.TaskReceipt) error { + if c.receipts == nil { + return fmt.Errorf("receipt store cannot record started state") + } + if err := c.receipts.RecordStarted(receipt.TaskID, receipt.ExecutionAttemptID); err != nil { + return err + } + return c.sendIfActive(ctx, c.buildTaskStateEnvelope(wsprotocol.TypeTaskStarted, receipt.TaskID, receipt.ExecutionAttemptID)) +} + func (c *Client) buildHello() wsprotocol.Envelope { return wsprotocol.Envelope{ ProtocolVersion: wsprotocol.ProtocolVersion, @@ -294,6 +385,19 @@ func (c *Client) buildHello() wsprotocol.Envelope { } } +func (c *Client) buildTaskStateEnvelope(messageType wsprotocol.MessageType, taskID, executionAttemptID string) wsprotocol.Envelope { + return wsprotocol.Envelope{ + ProtocolVersion: wsprotocol.ProtocolVersion, + MessageID: fmt.Sprintf("msg_%s_%s_%d", taskID, messageType, time.Now().UnixNano()), + Type: messageType, + AgentID: c.agentID, + TaskID: taskID, + ExecutionAttemptID: executionAttemptID, + SentAt: time.Now().UTC().Format(time.RFC3339), + Payload: map[string]any{}, + } +} + func (c *Client) setActive(active bool) { c.mu.Lock() defer c.mu.Unlock() diff --git a/app/services/wsclient/client_test.go b/app/services/wsclient/client_test.go index 62bc4ea..f96bdc9 100644 --- a/app/services/wsclient/client_test.go +++ b/app/services/wsclient/client_test.go @@ -9,6 +9,7 @@ import ( "encoding/pem" "errors" "hostlink/app/services/localtaskstore" + "hostlink/domain/task" "net/http" "os" "path/filepath" @@ -118,6 +119,178 @@ func TestClientHandlesAckWithoutTaskSideEffects(t *testing.T) { } } +func TestClientReceivesTaskDeliverStoresAcksAndQueues(t *testing.T) { + store := newClientTestStore(t) + enqueuer := &fakeTaskEnqueuer{} + conn := newFakeConn() + dialer := &fakeDialer{conn: conn} + client := newTestClient(t, dialer, WithReceiptStore(store), WithTaskEnqueuer(enqueuer)) + + runCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + done := make(chan error, 1) + go func() { done <- client.Start(runCtx) }() + + hello := conn.waitForWrite(t) + conn.readCh <- helloAckEnvelope(hello.MessageID) + conn.readCh <- deliverEnvelope("msg_deliver", "task-1", "attempt-1", "printf hi", 2) + + received := conn.waitForWrite(t) + if received.Type != wsprotocol.TypeTaskReceived { + t.Fatalf("received type = %q, want %q", received.Type, wsprotocol.TypeTaskReceived) + } + if received.TaskID != "task-1" || received.ExecutionAttemptID != "attempt-1" { + t.Fatalf("received envelope = %#v", received) + } + state, err := store.TaskState("task-1", "attempt-1") + requireNoError(t, err) + if !state.Exists || state.Status != localtaskstore.TaskStatusReceived { + t.Fatalf("state = %#v", state) + } + waitFor(t, func() bool { return len(enqueuer.tasks()) == 1 }, "task to be queued") + queued := enqueuer.tasks()[0] + if queued.ID != "task-1" || queued.ExecutionAttemptID != "attempt-1" || queued.Command != "printf hi" || queued.Priority != 2 { + t.Fatalf("queued task = %#v", queued) + } + cancel() + if err := <-done; err != nil { + t.Fatalf("Start returned error: %v", err) + } +} + +func TestClientSendStartedPersistsRunningStateAndSendsStarted(t *testing.T) { + store := newClientTestStore(t) + conn := newFakeConn() + dialer := &fakeDialer{conn: conn} + client := newTestClient(t, dialer, WithReceiptStore(store)) + + runCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + done := make(chan error, 1) + go func() { done <- client.Start(runCtx) }() + + hello := conn.waitForWrite(t) + conn.readCh <- helloAckEnvelope(hello.MessageID) + waitFor(t, func() bool { return client.IsActive() }, "client to become active") + requireNoError(t, client.SendStarted(context.Background(), localtaskstore.TaskReceipt{TaskID: "task-1", ExecutionAttemptID: "attempt-1"})) + + started := conn.waitForWrite(t) + if started.Type != wsprotocol.TypeTaskStarted { + t.Fatalf("started type = %q", started.Type) + } + state, err := store.TaskState("task-1", "attempt-1") + requireNoError(t, err) + if state.Status != localtaskstore.TaskStatusRunning { + t.Fatalf("state = %#v", state) + } + cancel() + if err := <-done; err != nil { + t.Fatalf("Start returned error: %v", err) + } +} + +func TestClientDuplicateTaskDeliverReacksWithoutDuplicateQueue(t *testing.T) { + store := newClientTestStore(t) + requireNoError(t, store.RecordStarted("task-1", "attempt-1")) + enqueuer := &fakeTaskEnqueuer{} + conn := newFakeConn() + dialer := &fakeDialer{conn: conn} + client := newTestClient(t, dialer, WithReceiptStore(store), WithTaskEnqueuer(enqueuer)) + + runCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + done := make(chan error, 1) + go func() { done <- client.Start(runCtx) }() + + hello := conn.waitForWrite(t) + conn.readCh <- helloAckEnvelope(hello.MessageID) + conn.readCh <- deliverEnvelope("msg_deliver", "task-1", "attempt-1", "printf hi", 2) + + started := conn.waitForWrite(t) + if started.Type != wsprotocol.TypeTaskStarted { + t.Fatalf("started type = %q", started.Type) + } + if len(enqueuer.tasks()) != 0 { + t.Fatalf("queued tasks = %#v, want none", enqueuer.tasks()) + } + cancel() + if err := <-done; err != nil { + t.Fatalf("Start returned error: %v", err) + } +} + +func TestClientReceivedDuplicateTaskDeliverReacksWithoutDuplicateQueue(t *testing.T) { + store := newClientTestStore(t) + _, err := store.RecordReceived(localtaskstore.TaskReceipt{TaskID: "task-1", ExecutionAttemptID: "attempt-1"}) + requireNoError(t, err) + enqueuer := &fakeTaskEnqueuer{} + conn := newFakeConn() + dialer := &fakeDialer{conn: conn} + client := newTestClient(t, dialer, WithReceiptStore(store), WithTaskEnqueuer(enqueuer)) + + runCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + done := make(chan error, 1) + go func() { done <- client.Start(runCtx) }() + + hello := conn.waitForWrite(t) + conn.readCh <- helloAckEnvelope(hello.MessageID) + conn.readCh <- deliverEnvelope("msg_deliver", "task-1", "attempt-1", "printf hi", 2) + + received := conn.waitForWrite(t) + if received.Type != wsprotocol.TypeTaskReceived { + t.Fatalf("received type = %q", received.Type) + } + if len(enqueuer.tasks()) != 0 { + t.Fatalf("queued tasks = %#v, want none", enqueuer.tasks()) + } + cancel() + if err := <-done; err != nil { + t.Fatalf("Start returned error: %v", err) + } +} + +func TestClientFinalDuplicateTaskDeliverResendsUnackedFinalWithoutQueue(t *testing.T) { + store := newClientTestStore(t) + requireNoError(t, store.RecordFinal(localtaskstore.FinalResult{ + MessageID: "msg-final-1", + TaskID: "task-1", + ExecutionAttemptID: "attempt-1", + Status: "completed", + ExitCode: 0, + Payload: `{"status":"completed","exit_code":0,"output_truncated":false,"error_truncated":false}`, + })) + enqueuer := &fakeTaskEnqueuer{} + conn := newFakeConn() + dialer := &fakeDialer{conn: conn} + client := newTestClient(t, dialer, WithReceiptStore(store), WithResultOutbox(store), WithTaskEnqueuer(enqueuer)) + + runCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + done := make(chan error, 1) + go func() { done <- client.Start(runCtx) }() + + hello := conn.waitForWrite(t) + conn.readCh <- helloAckEnvelope(hello.MessageID) + replayed := conn.waitForWrite(t) + if replayed.Type != wsprotocol.TypeTaskFinal { + t.Fatalf("hello replay type = %q", replayed.Type) + } + conn.readCh <- deliverEnvelope("msg_deliver", "task-1", "attempt-1", "printf hi", 2) + + final := conn.waitForWrite(t) + if final.Type != wsprotocol.TypeTaskFinal || final.MessageID != "msg-final-1" { + t.Fatalf("final duplicate response = %#v", final) + } + if len(enqueuer.tasks()) != 0 { + t.Fatalf("queued tasks = %#v, want none", enqueuer.tasks()) + } + cancel() + if err := <-done; err != nil { + t.Fatalf("Start returned error: %v", err) + } +} + func TestClientAckRemovesResultMessageFromOutbox(t *testing.T) { store := newClientTestStore(t) requireNoError(t, store.AppendOutputChunk(localtaskstore.OutputChunk{ @@ -419,6 +592,14 @@ func WithResultOutbox(outbox localtaskstore.ResultOutbox) clientOption { return func(cfg *Config) { cfg.ResultOutbox = outbox } } +func WithReceiptStore(store localtaskstore.ReceiptStore) clientOption { + return func(cfg *Config) { cfg.ReceiptStore = store } +} + +func WithTaskEnqueuer(enqueuer TaskEnqueuer) clientOption { + return func(cfg *Config) { cfg.TaskEnqueuer = enqueuer } +} + type fakeDialer struct { mu sync.Mutex conn *fakeConn @@ -544,6 +725,64 @@ func ackEnvelope(messageID, ackedMessageID string, ackedType wsprotocol.MessageT } } +func helloAckEnvelope(ackedMessageID string) wsprotocol.Envelope { + return wsprotocol.Envelope{ + ProtocolVersion: wsprotocol.ProtocolVersion, + MessageID: "msg_hello_ack", + Type: wsprotocol.TypeAgentHelloAck, + AgentID: "agent_ws_test", + SentAt: time.Now().UTC().Format(time.RFC3339), + Payload: payloadMapForTest(wsprotocol.BuildAck(wsprotocol.AckOptions{ + AckedMessageID: ackedMessageID, + AckedType: wsprotocol.TypeAgentHello, + })), + } +} + +func deliverEnvelope(messageID, taskID, attemptID, command string, priority int) wsprotocol.Envelope { + return wsprotocol.Envelope{ + ProtocolVersion: wsprotocol.ProtocolVersion, + MessageID: messageID, + Type: wsprotocol.TypeTaskDeliver, + AgentID: "agent_ws_test", + TaskID: taskID, + ExecutionAttemptID: attemptID, + SentAt: time.Now().UTC().Format(time.RFC3339), + Payload: map[string]any{ + "command": command, + "priority": priority, + }, + } +} + +func payloadMapForTest(value any) map[string]any { + data, _ := json.Marshal(value) + var payload map[string]any + _ = json.Unmarshal(data, &payload) + return payload +} + +type fakeTaskEnqueuer struct { + mu sync.Mutex + queued []task.Task + enqueue error +} + +func (f *fakeTaskEnqueuer) Enqueue(ctx context.Context, t task.Task) error { + f.mu.Lock() + defer f.mu.Unlock() + f.queued = append(f.queued, t) + return f.enqueue +} + +func (f *fakeTaskEnqueuer) tasks() []task.Task { + f.mu.Lock() + defer f.mu.Unlock() + tasks := make([]task.Task, len(f.queued)) + copy(tasks, f.queued) + return tasks +} + func newClientTestStore(t *testing.T) *localtaskstore.Store { t.Helper() store, err := localtaskstore.New(localtaskstore.Config{ diff --git a/internal/wsprotocol/message.go b/internal/wsprotocol/message.go index 80f375d..c1878bc 100644 --- a/internal/wsprotocol/message.go +++ b/internal/wsprotocol/message.go @@ -65,6 +65,11 @@ type FinalPayload struct { ErrorTruncated bool `json:"error_truncated"` } +type TaskDeliverPayload struct { + Command string `json:"command"` + Priority int `json:"priority"` +} + func (e Envelope) Validate(authenticatedAgentID string) error { if e.ProtocolVersion != ProtocolVersion { return fmt.Errorf("unsupported protocol_version: %d", e.ProtocolVersion) @@ -120,6 +125,13 @@ func (p FinalPayload) Validate() error { } } +func (p TaskDeliverPayload) Validate() error { + if p.Command == "" { + return fmt.Errorf("command is required") + } + return nil +} + func DecodePayload[T any](e Envelope) (T, error) { var payload T diff --git a/main.go b/main.go index 7208b8a..039fe9c 100644 --- a/main.go +++ b/main.go @@ -268,8 +268,9 @@ func runServer(ctx context.Context, cmd *cli.Command) error { <-registeredChan log.Println("Agent registered, starting task job...") var resultChannel taskjob.ResultChannel + taskJob := taskjob.New() startWebSocketClientIfEnabled(ctx, func() (webSocketRuntime, error) { - runtime, err := newDefaultWebSocketRuntime(localStore) + runtime, err := newDefaultWebSocketRuntime(localStore, taskJob) if err == nil { resultChannel = runtime.(taskjob.ResultChannel) } @@ -286,7 +287,6 @@ func runServer(ctx context.Context, cmd *cli.Command) error { log.Printf("failed to initialize task reporter: %v", err) return } - taskJob := taskjob.New() taskJob.Register(ctx, fetcher, reporter, resultChannel) metricsReporter, err := metrics.New() @@ -335,7 +335,7 @@ func startWebSocketClientIfEnabled(ctx context.Context, constructor func() (webS return true } -func newDefaultWebSocketRuntime(localStore *localtaskstore.Store) (webSocketRuntime, error) { +func newDefaultWebSocketRuntime(localStore *localtaskstore.Store, enqueuer wsclient.TaskEnqueuer) (webSocketRuntime, error) { state := agentstate.New(appconf.AgentStatePath()) if err := state.Load(); err != nil { return nil, fmt.Errorf("failed to load agent state: %w", err) @@ -351,6 +351,8 @@ func newDefaultWebSocketRuntime(localStore *localtaskstore.Store) (webSocketRunt ReconnectMax: appconf.WebSocketReconnectMax(), PingInterval: appconf.WebSocketPingInterval(), ResultOutbox: localStore, + ReceiptStore: localStore, + TaskEnqueuer: enqueuer, }) }