
grpc: gate Ark/Indexer RPCs with readiness interceptors #924

Merged
altafan merged 11 commits into arkade-os:master from sekulicd:rediness-interceptor
Feb 27, 2026

@sekulicd
Collaborator

@sekulicd sekulicd commented Feb 23, 2026

Summary

This PR removes the gRPC server/service swap behavior during wallet readiness transitions and replaces it with middleware-based readiness gating.

Ark and Indexer RPCs are now always registered at startup, but access is blocked until the app backend is started and the wallet is ready (initialized, unlocked, synced).

Why

Previously, arkd could restart/swap gRPC internals when the wallet became ready. That made reconnect behavior surprising for SDK clients because clients had to understand arkd internals and sometimes replace connections manually.

This PR moves the responsibility to the server side in a way that is transparent to clients:

  • keep service registration stable
  • return FailedPrecondition while wallet/app is not ready
  • let clients rely on normal gRPC reconnect/backoff behavior

Changes

gRPC readiness gating (server-side)

  • Added unary + stream readiness interceptors for protected methods:
    • /ark.v1.ArkService/*
    • /ark.v1.IndexerService/*
  • Readiness checks require:
    • app backend started (appStarted)
    • wallet status is initialized + unlocked + synced
  • Returns:
    • FailedPrecondition when app/wallet is not ready
    • Unavailable when wallet status cannot be queried
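
The decision order described in this section can be sketched as below. This is a minimal, dependency-free illustration, not the code from the PR: the `code` type stands in for `google.golang.org/grpc/codes`, and `readiness`, `statusProvider`, `fakeWallet`, `demo`, and the method name `Any` are hypothetical names chosen for the example. The code mapping follows the list above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"sync/atomic"
)

// code is a local stand-in for google.golang.org/grpc/codes.Code (assumption).
type code string

const (
	codeOK             code = "OK"
	failedPrecondition code = "FailedPrecondition"
	unavailable        code = "Unavailable"
)

// walletStatus mirrors the three wallet conditions listed above.
type walletStatus struct{ initialized, unlocked, synced bool }

// statusProvider is a hypothetical interface for querying the wallet.
type statusProvider interface {
	Status(ctx context.Context) (walletStatus, error)
}

type readiness struct {
	appStarted atomic.Bool
	wallet     statusProvider
}

// isProtected reports whether the method belongs to a gated service.
func isProtected(fullMethod string) bool {
	return strings.HasPrefix(fullMethod, "/ark.v1.ArkService/") ||
		strings.HasPrefix(fullMethod, "/ark.v1.IndexerService/")
}

// check returns the status code an interceptor would send back.
func (r *readiness) check(ctx context.Context, fullMethod string) code {
	if !isProtected(fullMethod) {
		return codeOK // non-protected methods bypass the gate
	}
	if !r.appStarted.Load() {
		return failedPrecondition // app backend not started
	}
	st, err := r.wallet.Status(ctx)
	if err != nil {
		return unavailable // wallet status cannot be queried
	}
	if !st.initialized || !st.unlocked || !st.synced {
		return failedPrecondition // wallet locked or syncing
	}
	return codeOK
}

// fakeWallet is a test double returning a fixed status or error.
type fakeWallet struct {
	st  walletStatus
	err error
}

func (f fakeWallet) Status(context.Context) (walletStatus, error) { return f.st, f.err }

var errWalletDown = errors.New("wallet unreachable")

// demo builds a readiness gate in the given state and checks one method.
func demo(appStarted bool, st walletStatus, walletErr error, method string) code {
	r := &readiness{wallet: fakeWallet{st: st, err: walletErr}}
	r.appStarted.Store(appStarted)
	return r.check(context.Background(), method)
}

func main() {
	syncing := walletStatus{initialized: true, unlocked: true, synced: false}
	fmt.Println(demo(true, syncing, nil, "/ark.v1.ArkService/Any"))
}
```

In a real server the same check would be called from both the unary and the stream interceptor, so the two paths cannot drift apart.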

gRPC service lifecycle simplification

  • Removed withAppSvc / withoutAppSvc branching in gRPC service startup/shutdown
  • onReady() now activates app services (startAppServices()) instead of stopping/restarting the gRPC servers
  • stop() uses a snapshot-under-lock pattern for app service shutdown and updates readiness state
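
The snapshot-under-lock pattern mentioned for stop() can be illustrated roughly as follows. This is a sketch under assumptions, not the PR's code: `server`, `appService`, and the field names are hypothetical. The idea is to copy and clear the shared state while holding the mutex, then run the potentially slow shutdown after releasing it.

```go
package main

import (
	"fmt"
	"sync"
)

// appService is a hypothetical stand-in for the wallet-dependent services.
type appService struct{ stopped bool }

func (a *appService) Stop() { a.stopped = true }

type server struct {
	mu      sync.Mutex
	app     *appService
	started bool // readiness state guarded by mu
}

// stop snapshots the app service under the lock and shuts it down outside it.
// It returns the service it stopped, or nil if there was nothing to stop.
func (s *server) stop() *appService {
	s.mu.Lock()
	app := s.app // snapshot
	s.app = nil  // later callers see "already stopped"
	s.started = false
	s.mu.Unlock()

	// The shutdown itself runs without the mutex held, so it cannot block
	// other goroutines that only need to read or update the state.
	if app != nil {
		app.Stop()
	}
	return app
}

func main() {
	s := &server{app: &appService{}, started: true}
	first := s.stop()
	second := s.stop()
	fmt.Println(first != nil && first.stopped, second == nil)
}
```

Because the field is cleared inside the critical section, a second stop() call finds nothing to do instead of stopping the same service twice.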

Application service startup refactor

  • Moved wallet-dependent initialization from application.NewService() to application.(*service).Start():
    • dust amount derived min values
    • forfeit pubkey
    • checkpoint tapscript
    • forfeit address
  • Added registerEventHandlers() helper and moved event handler registration to Start()
  • Stop() is now nil-safe if Start() fails before sweeperCancel is initialized
  • application.Service.Start() now returns error instead of errors.Error
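
To illustrate why Stop() needs the nil check once initialization lives in Start(), here is a minimal sketch. The `service` type, the `walletReady` flag, and `demo` are hypothetical; only the `sweeperCancel` name comes from the description above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type service struct {
	walletReady   bool // hypothetical flag standing in for the wallet calls in Start()
	sweeperCtx    context.Context
	sweeperCancel context.CancelFunc
}

// Start performs wallet-dependent setup; sweeperCancel is only set on success.
func (s *service) Start() error {
	if !s.walletReady {
		return errors.New("wallet not ready")
	}
	s.sweeperCtx, s.sweeperCancel = context.WithCancel(context.Background())
	return nil
}

// Stop is safe to call even if Start failed before sweeperCancel was set.
func (s *service) Stop() {
	if s.sweeperCancel != nil {
		s.sweeperCancel()
	}
}

// demo runs Start followed by Stop and reports what happened.
func demo(walletReady bool) (startFailed, sweeperCancelled bool) {
	s := &service{walletReady: walletReady}
	startFailed = s.Start() != nil
	s.Stop() // must not panic on the failure path
	sweeperCancelled = s.sweeperCtx != nil && s.sweeperCtx.Err() != nil
	return startFailed, sweeperCancelled
}

func main() {
	fmt.Println(demo(false))
	fmt.Println(demo(true))
}
```

Without the nil check, the failure path would call a nil function value and panic.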

Misc

  • Added .gitignore entries for local worktree directories:
    • .worktrees/
    • .worktree/

Behavior change (intended)

After arkd restart:

  • gRPC transport can reconnect normally
  • Ark/Indexer RPCs remain registered
  • while wallet is locked/syncing, Ark/Indexer RPCs return FailedPrecondition
  • once wallet is ready, the same client connection can continue working (no custom conn swap required)

Tests

Added readiness interceptor tests covering:

  • non-protected methods bypass readiness checks
  • app-not-started -> FailedPrecondition
  • wallet status error -> Unavailable
  • locked/syncing wallet -> FailedPrecondition
  • ready wallet -> success

Also verified targeted package tests locally:

  • go test ./internal/interface/grpc/...
  • go test ./internal/core/application

@altafan @louisinger please review

Summary by CodeRabbit

  • New Features

    • Service readiness checks now block protected APIs until the app and wallet are ready.
    • Client libraries: improved stream reconnection, backoff behavior, and clearer error propagation on stream/parsing failures.
  • Refactor

    • App service startup made lazy and decoupled from server boot; lifecycle hardened to prevent duplicate starts and ensure safe shutdown ordering.
  • Tests

    • Added unit tests for readiness gating (protected and unprotected paths).
  • Chores

    • Added ignore patterns for local worktree directories.

- keep Ark and Indexer gRPC services registered from startup
- replace stop/start swap flow on wallet ready with interceptor-based readiness checks
- add readiness interceptor service (app lifecycle + wallet status checks) and tests
- simplify grpc service lifecycle by removing with/without app-service startup paths
- move wallet-dependent app initialization (dust/forfeit pubkey/checkpoint tapscript) from NewService to Start
- make app Stop nil-safe when Start fails before sweeper init
- update application.Service Start() to return error
- ignore local worktree directories in .gitignore
@coderabbitai
Contributor

coderabbitai bot commented Feb 23, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Walkthrough

Adds readiness gating for protected gRPC methods via a new ReadinessService and interceptors, refactors application startup by moving runtime initialization into a new service Start() lifecycle method, and updates gRPC client reconnect/backoff behavior while removing the previous gRPC connection monitor.

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| Gitignore Configuration: ./.gitignore | Added ignore patterns: .worktrees/, .worktree/. |
| Application Service Lifecycle: internal/core/application/service.go | Added Start() method and started atomic flag; moved runtime initialization (dust/vtxo/settlement params, forfeit pubkey, checkpoint tapscript/address, event handler registration, background routines) from constructor into Start(); added vtxoMinAmount and checkpointExitDelay; Stop() made sweeper-cancellation nil-safe; introduced registerEventHandlers(). |
| gRPC Readiness Interceptors: internal/interface/grpc/interceptors/readiness.go, internal/interface/grpc/interceptors/readiness_test.go | New WalletReadinessStatusProvider and ReadinessService (tracks appStarted); Check() enforces app and wallet readiness for protected methods; unary/stream interceptors added with unit tests covering allow/block paths and wallet error states. |
| gRPC Interceptor Integration: internal/interface/grpc/interceptors/interceptor.go | Updated UnaryInterceptor and StreamInterceptor signatures to accept readiness *ReadinessService and injected readiness handlers into interceptor chains. |
| gRPC Service Wiring & Lifecycle: internal/interface/grpc/service.go | Added readinessSvc, appSvcStartMu, appSvcStarted; refactored server start/stop flow, introduced startAppServices() and simplified newServer signature; wired readiness into interceptors and adjusted unconditional registration of certain handlers. |
| Client gRPC: Events & Transactions: pkg/client-lib/client/grpc/client.go | Removed monitor-based wait-for-ready logic; added grpc.ConnectParams backoff config; introduced listener ID mutexed access, per-stream mutexing and reconnection loop using utils.ShouldReconnect; updated stream recv/reconnect/close logic and surfaced parsing errors. |
| Indexer gRPC Client: pkg/client-lib/indexer/grpc/client.go | Removed connection monitor and wait-for-server-ready logic; added grpc.ConnectParams backoff config; per-subscription stream mutexing and reconnection using utils.ShouldReconnect; simplified Close and reconnection behavior. |
| Removed gRPC Monitor: pkg/client-lib/internal/utils/grpc_monitor.go | Deleted MonitorGrpcConn utility that watched connection state and invoked onReconnect callbacks. |
| Reconnect Utility: pkg/client-lib/internal/utils/reconnect.go | Added GrpcReconnectConfig and ShouldReconnect(err) to classify errors and return reconnect decisions and backoff durations. |
| Service Options: pkg/client-lib/service_opts.go | Marked WithMonitorConnection option as backward-compatible no-op (comment only). |

Sequence Diagram(s)

sequenceDiagram
    participant Client as gRPC Client
    participant Interceptor as Readiness Interceptor
    participant ReadinessSvc as ReadinessService
    participant Wallet as Wallet Provider
    participant Handler as Service Handler

    Client->>Interceptor: RPC Call (protected method)
    Interceptor->>ReadinessSvc: Check(ctx, fullMethod)
    ReadinessSvc->>ReadinessSvc: isProtectedServiceMethod?
    alt not protected
        ReadinessSvc-->>Interceptor: nil
    else protected
        ReadinessSvc->>ReadinessSvc: appStarted?
        alt app not started
            ReadinessSvc-->>Interceptor: Unavailable
        else app started
            ReadinessSvc->>Wallet: Status(ctx)
            Wallet-->>ReadinessSvc: WalletStatus / error
            ReadinessSvc->>ReadinessSvc: validate initialized/unlocked/synced
            alt wallet not ready
                ReadinessSvc-->>Interceptor: FailedPrecondition
            else wallet ready
                ReadinessSvc-->>Interceptor: nil
            end
        end
    end
    alt check passed
        Interceptor->>Handler: invoke handler
        Handler-->>Interceptor: response
        Interceptor-->>Client: response
    else check failed
        Interceptor-->>Client: error
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Suggested reviewers

  • altafan
  • louisinger
🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 14.29% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |
✅ Passed checks (2 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title 'grpc: gate Ark/Indexer RPCs with readiness interceptors' directly and clearly describes the main change: implementing readiness-based gating for gRPC service methods using interceptors. |

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (2)
internal/interface/grpc/interceptors/readiness.go (1)

50-52: Consider a more specific error message when app hasn't started.

When appStarted is false (Line 50-51), protectedServiceUnavailableErr returns a message saying "wallet is locked or syncing", but the actual reason is that the backend hasn't started yet. A distinct message like "service starting up" would aid client-side debugging without changing the gRPC status code.

♻️ Suggested improvement
 	if !r.appStarted.Load() {
-		return protectedServiceUnavailableErr(fullMethod)
+		return status.Error(codes.FailedPrecondition, "service not yet started")
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/interface/grpc/interceptors/readiness.go` around lines 50 - 52, The
readiness check currently returns protectedServiceUnavailableErr(fullMethod)
when appStarted.Load() is false, which yields "wallet is locked or syncing";
update the branch so it returns a distinct error message indicating the service
is starting up (e.g. call a new helper like
protectedServiceStartingErr(fullMethod) or extend protectedServiceUnavailableErr
to accept a reason string) while keeping the same gRPC status code; change the
call in the readiness interceptor (the appStarted.Load() check) to use the new
error helper so clients receive "service starting up" instead of the misleading
wallet message.
internal/core/application/service.go (1)

213-217: Remove commented-out code.

This commented-out restoreWatchingVtxos block appears to be a leftover debug artifact. If it's no longer needed, remove it; if it's pending re-enablement, convert to a TODO with context on when it should be restored.

♻️ Suggested removal
-	// if err := svc.restoreWatchingVtxos(); err != nil {
-	// 	return nil, fmt.Errorf("failed to restore watching vtxos: %s", err)
-	// }
 	return svc, nil
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/core/application/service.go` around lines 213 - 217, Remove the
commented-out restoreWatchingVtxos call or convert it into a proper TODO: delete
the three commented lines starting with "// if err :=
svc.restoreWatchingVtxos();" and the subsequent return error line; if this
functionality is intended to be re-enabled later, replace the commented block
with a concise TODO above the return (mentioning restoreWatchingVtxos and the
conditions for re-enablement) so the intent is documented while keeping code
clean around svc initialization in service.go.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/core/application/service.go`:
- Around line 273-278: The goroutine currently logs "sweeper service started"
unconditionally after calling s.sweeper.start(ctx); change the control flow so
that the success log is only emitted when s.sweeper.start(ctx) returns nil —
e.g., capture the error from s.sweeper.start(ctx), if err != nil call
log.WithError(err).Warn("failed to start sweeper") and return from the
goroutine, otherwise call log.Info("sweeper service started"); this touches the
anonymous goroutine that calls s.sweeper.start(ctx) and the two log calls.

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 76f077b and f1c316f.

📒 Files selected for processing (7)
  • .gitignore
  • internal/core/application/service.go
  • internal/core/application/types.go
  • internal/interface/grpc/interceptors/interceptor.go
  • internal/interface/grpc/interceptors/readiness.go
  • internal/interface/grpc/interceptors/readiness_test.go
  • internal/interface/grpc/service.go

Contributor

@arkanaai arkanaai bot left a comment

arkanaai[bot] review — arkd #924

Solid architectural improvement. Moving wallet/forfeit initialization out of NewService() and into Start() means the service struct can now be constructed without a live wallet — and more importantly, RPCs are now gated until the service is actually ready to serve requests.

Security / correctness ✓

The readiness interceptor correctly scopes protection to /ark.v1.ArkService/ and /ark.v1.IndexerService/ method prefixes. Callers who connect while the wallet is locked or still syncing get a clear FailedPrecondition (implied by the not-ready message) rather than operating on inconsistent state. This is the right failure mode. ✓

sweeperCancel nil-check in Stop():

-    s.sweeperCancel()
+    if s.sweeperCancel != nil {
+        s.sweeperCancel()
+    }

Correct — sweeperCancel is only set inside Start(), so a Stop() call before Start() (e.g., on a startup error path) would have panicked before. ✓

⚠️ Breaking interface change — Start() return type

// types.go
-    Start() errors.Error
+    Start() error

errors.Error (Ark's rich error type) is replaced with stdlib error. Any callers that type-assert the return value against errors.Error to extract metadata will break silently. Confirm all callers of Start() (likely in cmd/ or server bootstrap) are updated accordingly.
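
One way callers could keep access to rich error metadata after the signature change is `errors.As`, which also works through wrapping. The sketch below is an assumption-laden illustration: `richError` is a hypothetical stand-in for Ark's errors.Error type, whose actual shape is not shown in this PR, and `start` and `codeOf` are invented for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// richError is a hypothetical stand-in for a project-specific error type
// that carries metadata beyond the message.
type richError struct {
	Code int
	Msg  string
}

func (e *richError) Error() string { return e.Msg }

// start returns a plain error, as the new Start() signature does.
// The rich error is wrapped, so a direct type assertion on the result would fail.
func start(fail bool) error {
	if fail {
		return fmt.Errorf("start app service: %w", &richError{Code: 42, Msg: "wallet locked"})
	}
	return nil
}

// codeOf extracts the metadata if a richError is anywhere in the chain.
func codeOf(err error) (int, bool) {
	var re *richError
	if errors.As(err, &re) {
		return re.Code, true
	}
	return 0, false
}

func main() {
	code, found := codeOf(start(true))
	fmt.Println(code, found)
}
```

Unlike a bare type assertion, `errors.As` walks the wrap chain, so it keeps working if intermediate layers add context with `%w`.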

⚠️ Lifecycle correctness — ensure MarkAppServiceStopped() is called on all exit paths

MarkAppServiceStarted() is called when the service is up. MarkAppServiceStopped() must be called on every path where the service stops — including error returns from Start() itself, panic recovery, and graceful shutdown. If Stopped() is missed on any path, the readiness flag stays true and subsequent reconnecting clients will get through the interceptor against a non-functional service.
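
A common way to cover every exit path is to clear the flag in a deferred function. The sketch below assumes a simplified model in which the service runs inside a single function call; `readiness`, `runApp`, and `demo` are hypothetical, and only the Mark* method names come from the comment above.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

type readiness struct{ appStarted atomic.Bool }

func (r *readiness) MarkAppServiceStarted() { r.appStarted.Store(true) }
func (r *readiness) MarkAppServiceStopped() { r.appStarted.Store(false) }

// runApp marks the service started, runs it, and guarantees the flag is
// cleared however run exits: normal return, error return, or panic.
func runApp(r *readiness, run func() error) (err error) {
	r.MarkAppServiceStarted()
	defer func() {
		r.MarkAppServiceStopped()
		if p := recover(); p != nil {
			err = fmt.Errorf("app service panicked: %v", p)
		}
	}()
	return run()
}

// demo reports the flag value during and after a run that ends in the given mode.
func demo(mode string) (during, after bool, err error) {
	r := &readiness{}
	err = runApp(r, func() error {
		during = r.appStarted.Load()
		switch mode {
		case "error":
			return errors.New("boom")
		case "panic":
			panic("boom")
		}
		return nil
	})
	return during, r.appStarted.Load(), err
}

func main() {
	for _, mode := range []string{"ok", "error", "panic"} {
		during, after, err := demo(mode)
		fmt.Println(mode, during, after, err)
	}
}
```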

⚠️ TODO — context.Background() in Start()

ctx := context.Background() //TODO manage context

The TODO is noted. This means the wallet calls inside Start() are not cancellable. Fine for now but should be addressed before this lifecycle management is considered complete.

Cross-repo note: go-sdk #95 (open, reviewed this cycle) explicitly handles codes.FailedPrecondition from stream reconnects, which is the error code the readiness interceptor returns. The two PRs are coordinated. ✓

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (3)
internal/interface/grpc/interceptors/readiness.go (2)

53-60: ⚠️ Potential issue | 🟠 Major

Return Unavailable when wallet status cannot be queried.

Lines 53-60 currently route wallet-provider unavailability/errors to FailedPrecondition via protectedServiceUnavailableErr, which conflicts with the PR’s intended retry signaling for status-query failures.

🔧 Proposed fix
 func (r *ReadinessService) Check(ctx context.Context, fullMethod string) error {
@@
 	if r.wallet == nil {
-		return protectedServiceUnavailableErr(fullMethod)
+		return status.Error(codes.Unavailable, "wallet status unavailable")
 	}

 	walletStatus, err := r.wallet.Status(ctx)
 	if err != nil {
-		return protectedServiceUnavailableErr(fullMethod)
+		return status.Error(codes.Unavailable, "wallet status unavailable")
 	}
#!/bin/bash
set -euo pipefail

# Verify expected status-code behavior in tests vs implementation.
rg -n -C2 'app.*started|wallet.*status|Unavailable|FailedPrecondition|codes\.' internal/interface/grpc/interceptors/readiness_test.go
rg -n -C2 'appStarted|wallet\.Status|status\.Error\(codes\.' internal/interface/grpc/interceptors/readiness.go
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/interface/grpc/interceptors/readiness.go` around lines 53 - 60, When
r.wallet.Status(ctx) fails you must return an Unavailable error (so callers can
retry) instead of routing all wallet errors to protectedServiceUnavailableErr;
update the readiness check around r.wallet == nil, r.wallet.Status(ctx) and
protectedServiceUnavailableErr(fullMethod) so that r.wallet == nil still returns
the existing precondition-style error but any non-nil err from
r.wallet.Status(ctx) returns an Unavailable gRPC error (e.g.,
status.Error(codes.Unavailable, ...)) that includes the status error details;
modify the code path that currently does "if err != nil { return
protectedServiceUnavailableErr(fullMethod) }" to return an Unavailable error
referencing the err.

50-52: ⚠️ Potential issue | 🟠 Major

Fix status code for app-not-started check to match test expectations.

Line 51 returns codes.Unavailable, but tests at lines 126–127 expect codes.FailedPrecondition when appStarted is false. Additionally, line 78 shows the codebase already uses FailedPrecondition for other precondition checks (wallet status), establishing the correct semantic pattern: FailedPrecondition for startup preconditions, Unavailable for runtime conditions. The current code will cause test failures.

🔧 Proposed fix
 	if !r.appStarted.Load() {
-		return status.Error(codes.Unavailable, "server not ready")
+		return status.Error(codes.FailedPrecondition, "server not ready")
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/interface/grpc/interceptors/readiness.go` around lines 50 - 52, The
readiness interceptor's app-start check uses r.appStarted.Load() and currently
returns status.Error(codes.Unavailable, "server not ready"); change that to
return status.Error(codes.FailedPrecondition, "server not ready") so the startup
precondition aligns with existing checks (e.g., the wallet status check) and the
tests expecting FailedPrecondition will pass.
internal/core/application/service.go (1)

276-282: Sweeper error-handling fix is correctly applied.

The early return after the warning log prevents the "sweeper service started" message from being emitted on failure.
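
The control flow being confirmed here looks roughly like the sketch below. It is an illustration only: `startSweeper` takes the start function and a `logf` callback as parameters (stand-ins for `s.sweeper.start(ctx)` and the logger), and it returns a done channel purely so the example is deterministic.

```go
package main

import (
	"errors"
	"fmt"
)

// startSweeper launches the sweeper in a goroutine and logs success only
// when start returns nil. The returned channel is closed when the goroutine exits.
func startSweeper(start func() error, logf func(string)) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		if err := start(); err != nil {
			logf("failed to start sweeper: " + err.Error())
			return // early return: the success message below is skipped
		}
		logf("sweeper service started")
	}()
	return done
}

// demo collects the log lines produced for a failing or succeeding start.
func demo(fail bool) []string {
	var logs []string
	<-startSweeper(func() error {
		if fail {
			return errors.New("no funds")
		}
		return nil
	}, func(msg string) { logs = append(logs, msg) })
	return logs
}

func main() {
	fmt.Println(demo(true))
	fmt.Println(demo(false))
}
```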

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/core/application/service.go` around lines 276 - 282, Ensure the
goroutine calling s.sweeper.start handles errors by logging the error and
returning so the success log is not emitted on failure: call
s.sweeper.start(ctx) inside the anonymous goroutine, if err != nil use
log.WithError(err).Warn("failed to start sweeper") and return, otherwise call
log.Info("sweeper service started"); this guarantees the "sweeper service
started" message only appears when s.sweeper.start succeeds.
🧹 Nitpick comments (1)
internal/interface/grpc/interceptors/readiness.go (1)

44-45: Consider fail-closed behavior for protected methods when readiness service is nil.

Currently, line 44 bypasses all gating when r == nil. While NewReadinessService() always returns a non-nil value and the service is properly instantiated at startup, the current pattern would silently allow protected RPCs if a wiring regression caused nil to be passed. For protected methods, this should fail closed (e.g., Internal error) rather than bypassing checks.

The proposed refactor separates the checks so that unprotected methods still return nil for any state, but protected methods fail explicitly when the readiness service is not available:

Suggested change
 func (r *ReadinessService) Check(ctx context.Context, fullMethod string) error {
-	if r == nil || !isProtectedServiceMethod(fullMethod) {
+	if !isProtectedServiceMethod(fullMethod) {
 		return nil
 	}
+	if r == nil {
+		return status.Error(codes.Internal, "readiness service not configured")
+	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/interface/grpc/interceptors/readiness.go` around lines 44 - 45,
Update the gating logic in the readiness interceptor so protected RPCs
fail-closed when the readiness service is nil: instead of the combined check "if
r == nil || !isProtectedServiceMethod(fullMethod) { return nil }", first
early-return nil for unprotected methods by checking
isProtectedServiceMethod(fullMethod) and then, for protected methods, return a
gRPC Internal error when r == nil (so a wiring regression doesn't bypass
checks); if r is non-nil proceed with the existing readiness checks
(NewReadinessService()/r and any subsequent logic).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/core/application/service.go`:
- Around line 218-288: Start() is not idempotent; add a start-guard to prevent
repeated initialization: introduce a boolean/atomic flag (e.g., service field
like started bool or started int32) protected by the existing mutex or atomic
ops, check it at the top of Start() and return nil (or an error) if already
started, and set it once before registering handlers/starting goroutines; this
will prevent duplicate calls to registerEventHandlers(), reassigning
s.sweeperCancel (or leaking the previous cancel), and launching a second
goroutine via s.wg.Add(1); go s.start(). Ensure the flag is referenced in
Stop/Shutdown (if present) to allow restart only after proper stop.
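
A start guard of the kind suggested here can be sketched with an atomic compare-and-swap. The `service` type below is hypothetical; the `handlers` counter stands in for side effects such as registerEventHandlers() that must run once per start, and `failInit` simulates an initialization error.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

type service struct {
	started  atomic.Bool
	handlers atomic.Int32 // counts one-time side effects (stand-in for handler registration)
	failInit bool         // hypothetical switch to simulate an init failure
}

// Start is idempotent: only the caller that flips started from false to true
// performs initialization; every other caller returns immediately.
func (s *service) Start() error {
	if !s.started.CompareAndSwap(false, true) {
		return nil
	}
	if s.failInit {
		s.started.Store(false) // allow a later retry
		return errors.New("init failed")
	}
	s.handlers.Add(1)
	return nil
}

// Stop clears the flag so the service can be started again.
func (s *service) Stop() { s.started.Store(false) }

// concurrentStarts calls Start from n goroutines and reports how many
// times initialization actually ran.
func concurrentStarts(n int) int32 {
	s := &service{}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = s.Start()
		}()
	}
	wg.Wait()
	return s.handlers.Load()
}

func main() {
	fmt.Println(concurrentStarts(16))
}
```

Whether a repeated Start() should return nil or an error is a design choice; the sketch returns nil so that callers do not need to track the state themselves.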

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f1c316f and 2d5a674.

📒 Files selected for processing (2)
  • internal/core/application/service.go
  • internal/interface/grpc/interceptors/readiness.go

@@ -0,0 +1,200 @@
package interceptors
Collaborator

please use require instead of t.Fatal

Collaborator Author

Done

}
})

t.Run("app not started returns failed precondition", func(t *testing.T) {
Collaborator

Fix the name of these tests after we switched the err codes

Suggested change
-t.Run("app not started returns failed precondition", func(t *testing.T) {
+t.Run("app not started returns unavailable", func(t *testing.T) {

Collaborator Author

Done

Contributor

@coderabbitai coderabbitai bot left a comment

🧹 Nitpick comments (4)
pkg/client-lib/client/grpc/client.go (2)

359-366: Same closeFn race consideration as indexer client.

The closeFn pattern here has the same potential race window as noted in the indexer client: cancel() is called first, then streamMu is acquired to call CloseSend(). During reconnection, the stream variable may have been updated.

This is a minor concern since context cancellation should prevent further use, but documenting the expected behavior or ensuring the goroutine exits before CloseSend() would make the lifecycle clearer.
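
One possible ordering that removes the window is for closeFn to cancel, wait for the receive goroutine to exit, and only then close the stream. The sketch below is not the client's code: `stream`, `subscription`, and `subscribe` are hypothetical, and the goroutine simulates reconnection by swapping the stream on a timer.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// stream is a hypothetical stand-in for a gRPC client stream.
type stream struct {
	id     int
	closed bool
}

func (s *stream) CloseSend() error { s.closed = true; return nil }

type subscription struct {
	mu     sync.Mutex
	stream *stream // replaced on every simulated reconnect
	cancel context.CancelFunc
	done   chan struct{} // closed when the receive goroutine exits
}

// subscribe starts a goroutine that keeps swapping the stream until cancelled.
func subscribe() *subscription {
	ctx, cancel := context.WithCancel(context.Background())
	sub := &subscription{stream: &stream{id: 1}, cancel: cancel, done: make(chan struct{})}
	go func() {
		defer close(sub.done)
		for next := 2; ; next++ {
			select {
			case <-ctx.Done():
				return
			case <-time.After(time.Millisecond):
				sub.mu.Lock()
				sub.stream = &stream{id: next} // simulated reconnect
				sub.mu.Unlock()
			}
		}
	}()
	return sub
}

// close cancels, waits for the goroutine to exit, then closes the final stream.
// After <-s.done no further swap can happen, so CloseSend hits a stable stream.
func (s *subscription) close() *stream {
	s.cancel()
	<-s.done
	s.mu.Lock()
	defer s.mu.Unlock()
	_ = s.stream.CloseSend()
	return s.stream
}

// demo reports whether the stream that was closed is the one left in place.
func demo() bool {
	sub := subscribe()
	time.Sleep(5 * time.Millisecond) // let a few simulated reconnects happen
	closed := sub.close()
	return closed.closed && closed == sub.stream
}

func main() {
	fmt.Println(demo())
}
```

The trade-off is that closeFn now blocks until the goroutine notices cancellation, which is acceptable only if every blocking call in the loop respects the context.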

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/client-lib/client/grpc/client.go` around lines 359 - 366, The closeFn
currently calls cancel() before acquiring streamMu and then calls
stream.CloseSend(), which can race with reconnection code that replaces stream;
fix by either acquiring streamMu before calling cancel() or by holding streamMu,
copying the current stream into a local variable, releasing the lock and then
cancelling (or cancelling then locking and using the captured stream) so
CloseSend() is invoked on the stable captured stream instance; update the
closeFn surrounding comments to document the chosen ordering and the lifecycle
expectations for stream, streamMu, cancel() and reconnection so future readers
understand why this avoids the race.

251-369: Consider extracting common reconnection logic.

The reconnection pattern in GetEventStream (lines 267-356) and GetTransactionsStream (lines 447-569) is nearly identical:

  • Mutex-guarded stream access
  • ShouldReconnect decision
  • Backoff delay calculation
  • Stream re-establishment with error handling

This duplication increases maintenance burden. Consider extracting a helper or using a generic stream wrapper.
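
A generic helper along these lines might look like the sketch below. It is a simplified illustration under assumptions: `recvWithReconnect` and `demo` are invented names, `shouldReconnect` is a local stand-in for utils.ShouldReconnect with made-up classification rules and a short delay, and mutex-guarded stream access is omitted because a single goroutine owns the receive function here.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"time"
)

// shouldReconnect is a hypothetical stand-in for utils.ShouldReconnect:
// cancellation is terminal, anything else is retried after a short delay.
func shouldReconnect(err error) (bool, time.Duration) {
	if errors.Is(err, context.Canceled) {
		return false, 0
	}
	return true, time.Millisecond
}

// recvWithReconnect opens a stream, passes every message to handle, and
// re-opens the stream after recoverable errors. It returns the first
// terminal error.
func recvWithReconnect[T any](
	ctx context.Context,
	open func(context.Context) (func() (T, error), error),
	handle func(T),
) error {
	recv, err := open(ctx)
	if err != nil {
		return err
	}
	for {
		msg, err := recv()
		if err == nil {
			handle(msg)
			continue
		}
		retry, delay := shouldReconnect(err)
		if !retry {
			return err
		}
		select { // back off, but stay responsive to cancellation
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
		}
		if recv, err = open(ctx); err != nil {
			return err
		}
	}
}

// demo simulates a stream that ends with io.EOF once, then with cancellation.
func demo() (got []int, opens int, err error) {
	open := func(context.Context) (func() (int, error), error) {
		opens++
		attempt, n := opens, 0
		return func() (int, error) {
			n++
			if n <= 2 {
				return attempt*10 + n, nil
			}
			if attempt == 1 {
				return 0, io.EOF // server closed the stream: reconnect
			}
			return 0, context.Canceled // terminal: stop the loop
		}, nil
	}
	err = recvWithReconnect(context.Background(), open, func(v int) { got = append(got, v) })
	return got, opens, err
}

func main() {
	fmt.Println(demo())
}
```

Each stream method would then supply only its own open function and message handler, leaving the retry policy in one place.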

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/client-lib/client/grpc/client.go` around lines 251 - 369, The
reconnection loop in GetEventStream and GetTransactionsStream is duplicated;
extract it into a reusable helper (e.g., streamWithReconnect) that encapsulates
the mutexed stream reference (streamMu), recv/recvErr handling, ShouldReconnect
checks, backoffDelay logic using utils.GrpcReconnectConfig, re-dial via
a.svc().GetEventStream / a.svc().GetTransactionsStream, and error reporting to a
channel; have GetEventStream/GetTransactionsStream provide the initial stream,
request object (req), and a small callback to handle each successful response
(e.g., a function that interprets resp, calls setListenerID, converts to events
and sends to eventsCh), and return the same eventsCh/closeFn behavior; ensure
the helper exposes cancellation support (context) and returns dial errors to the
caller channel the same way as current code.
pkg/client-lib/indexer/grpc/client.go (2)

463-465: EOF check duplicates the reconnect path and logs at a low level.

An io.EOF error reaches the ShouldReconnect call at line 454 first. Since io.EOF is not a gRPC status and doesn't contain "524", ShouldReconnect returns (true, 1s), so execution continues past line 461 and the if err == io.EOF check at line 463 still matches and logs. The branch is therefore reachable, but it only adds a log line to a reconnect decision that has already been made.

The log message says "reconnecting" but is emitted at Debug level. Consider whether that is the right level for a server-initiated close.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/client-lib/indexer/grpc/client.go` around lines 463 - 465, The io.EOF
branch is redundant after the ShouldReconnect(err) call (it is still reached, but only adds a log line); remove the
if err == io.EOF block and instead log the server-initiated close at a higher
level (Info or Warn) from the reconnect/error handling path where
ShouldReconnect is evaluated (keep the same message "indexer subscription stream
closed by server" and log it when ShouldReconnect returns true for EOF cases),
updating the log level in that central error-handling location so you don’t
duplicate EOF handling; reference ShouldReconnect and the subscription stream
error variable err in client.go.

532-539: Potential race between closeFn and reconnection loop.

The closeFn closes the stream variable captured in the closure scope. However, the reconnection loop updates stream (line 495). If closeFn is called while a reconnection is in progress:

  1. cancel() is called
  2. The goroutine may have just assigned a new stream
  3. closeFn acquires streamMu and calls CloseSend() on the newly reconnected stream

This is likely benign since cancel() should eventually cause the goroutine to exit, but there's a brief window where CloseSend() might be called on a stream that's about to be used for Recv().

Consider capturing the stream reference to close at subscription creation time, or ensuring the goroutine has exited before calling CloseSend().

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/client-lib/indexer/grpc/client.go` around lines 532 - 539, The closeFn
closure currently calls CloseSend() on the captured variable stream which can be
swapped by the reconnection goroutine (see reconnection loop around stream
assignment and streamMu); to avoid racing, either capture the stream reference
at subscription creation (e.g., s := stream inside the scope where closeFn is
defined and call s.CloseSend() instead of referencing stream) or ensure the
reconnection goroutine has exited before calling CloseSend (e.g., add a done
channel or WaitGroup the reconnection goroutine closes/signals and have closeFn
cancel() then wait for that done signal before acquiring streamMu and calling
CloseSend()); update closeFn, the reconnection goroutine, and use existing
streamMu/cancel to coordinate.

ℹ️ Review info

📥 Commits

Reviewing files that changed from the base of the PR and between 2b4e329 and 93e7626.

📒 Files selected for processing (5)
  • pkg/client-lib/client/grpc/client.go
  • pkg/client-lib/indexer/grpc/client.go
  • pkg/client-lib/internal/utils/grpc_monitor.go
  • pkg/client-lib/internal/utils/reconnect.go
  • pkg/client-lib/service_opts.go
💤 Files with no reviewable changes (1)
  • pkg/client-lib/internal/utils/grpc_monitor.go

Comment on lines +53 to +54
appSvcStartMu sync.Mutex
appSvcStarted bool
Collaborator

let's use atomic.Bool also here?
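
For illustration, a sketch of what replacing the mutex and bool pair with an atomic.Bool might look like, assuming Go 1.19+ (where `sync/atomic.Bool` was added). The `service` type and the `start`/`stop` callbacks are hypothetical stand-ins, not the actual arkd code.

```go
package main

import "sync/atomic"

// service is a stand-in for the struct that currently holds
// appSvcStartMu and appSvcStarted.
type service struct {
	appSvcStarted atomic.Bool
}

// startAppServices runs start only if the flag flips from false to true.
// CompareAndSwap makes the check-and-set a single atomic step.
func (s *service) startAppServices(start func()) bool {
	if !s.appSvcStarted.CompareAndSwap(false, true) {
		return false // already started
	}
	start()
	return true
}

// stopAppServices runs stop only if the flag flips from true to false.
func (s *service) stopAppServices(stop func()) bool {
	if !s.appSvcStarted.CompareAndSwap(true, false) {
		return false // not running
	}
	stop()
	return true
}
```

One trade-off to keep in mind: unlike a mutex held across the whole start, the atomic flag is set before `start` finishes, so a concurrent caller sees "started" while startup may still be in progress.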

Comment on lines +15 to +16
arkServiceMethodPrefix = "/ark.v1.ArkService/"
indexerServiceMethodPrefix = "/ark.v1.IndexerService/"
Collaborator

I think we can get this from proto stubs (we did it in permissions.go)
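
A rough sketch of the idea, kept free of external imports. Generated gRPC Go stubs export a `grpc.ServiceDesc` per service whose `ServiceName` field holds the fully qualified name; the local `serviceDesc` type and the two descriptor variables below are stand-ins for those generated values, and how permissions.go actually does this is not shown here.

```go
package main

import "strings"

// serviceDesc mirrors the one field of grpc.ServiceDesc used here, so the
// sketch compiles without the generated package.
type serviceDesc struct {
	ServiceName string
}

// Hypothetical stand-ins for the generated descriptors.
var (
	arkServiceDesc     = serviceDesc{ServiceName: "ark.v1.ArkService"}
	indexerServiceDesc = serviceDesc{ServiceName: "ark.v1.IndexerService"}
)

// methodPrefix derives the "/package.Service/" prefix from a descriptor
// instead of hard-coding the string.
func methodPrefix(d serviceDesc) string {
	return "/" + d.ServiceName + "/"
}

// isProtectedMethod reports whether a full gRPC method name belongs to one
// of the services gated by the readiness interceptors.
func isProtectedMethod(fullMethod string) bool {
	for _, d := range []serviceDesc{arkServiceDesc, indexerServiceDesc} {
		if strings.HasPrefix(fullMethod, methodPrefix(d)) {
			return true
		}
	}
	return false
}
```

Deriving the prefixes this way keeps them in sync with the proto definitions if a service or package is ever renamed.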

@altafan altafan merged commit cf51877 into arkade-os:master Feb 27, 2026
6 checks passed