Conversation
@coderabbitai review
✅ Actions performed
Review triggered.
View Vercel preview at instant-www-js-streams-jsv.vercel.app.
📝 Walkthrough

This pull request introduces a comprehensive bidirectional streaming system for Instant. It adds client-side stream creation and consumption APIs (WritableStream and ReadableStream) coordinated through a new InstantStream orchestrator. On the server, a gRPC-based streaming infrastructure is established with in-process clients, bidirectional stream handlers, and persistent state management. The system includes buffer-to-file flushing for write streams, offset tracking for reliability, error handling, and seamless reconnection. Integration points span the reactive session system, storage layer, cache eviction, and type serialization via Nippy. UI components are updated to reflect the new $streams system namespace as read-only.

Sequence Diagram(s)

sequenceDiagram
participant Client
participant Reactor
participant InstantStream
participant gRPCServer
participant StreamModel
participant Storage
Client->>Reactor: createWriteStream(opts)
Reactor->>InstantStream: createWriteStream(opts)
InstantStream-->>Reactor: WritableStream
Reactor-->>Client: WritableStream
Client->>WritableStream: write(chunk)
WritableStream->>InstantStream: trySend(create-stream)
InstantStream->>gRPCServer: send StreamRequest
gRPCServer->>StreamModel: create!(app-id, client-id, machine-id)
StreamModel-->>gRPCServer: stream-id, reconnect-token
gRPCServer->>InstantStream: create-stream-ok
InstantStream->>WritableStream: stream initialized
Client->>WritableStream: write(chunk)
WritableStream->>WritableStream: buffer chunk
alt Buffer exceeds flush-limit
WritableStream->>InstantStream: trySend(append-stream)
InstantStream->>gRPCServer: send StreamAppend
gRPCServer->>StreamModel: append(chunks)
StreamModel->>Storage: flush-to-file()
Storage-->>StreamModel: file-id, size
StreamModel->>StreamModel: link-file()
StreamModel-->>gRPCServer: stream-flushed
gRPCServer->>InstantStream: stream-flushed
InstantStream->>WritableStream: buffer cleared, offset updated
end
Client->>WritableStream: close()
WritableStream->>InstantStream: trySend(append-stream, done=true)
InstantStream->>gRPCServer: send final append
gRPCServer->>StreamModel: append(chunks, done=true)
sequenceDiagram
participant Client
participant Reactor
participant InstantStream
participant gRPCClient
participant gRPCServer
participant StreamModel
participant Storage
Client->>Reactor: createReadStream({streamId})
Reactor->>InstantStream: createReadStream({streamId})
InstantStream->>InstantStream: create ReadableStream with iterator
InstantStream-->>Client: ReadableStream
Client->>ReadableStream: subscribe to async iterator
ReadableStream->>InstantStream: register sink via connect-to-stream
InstantStream->>gRPCClient: subscribe-to-instant-stream(StreamRequest)
gRPCClient->>gRPCServer: bidirectional stream open
gRPCServer->>StreamModel: handle-bidi-subscribe()
gRPCServer->>StreamModel: connect-to-stream(stream-id, offset)
StreamModel->>Storage: get-stream-files()
Storage-->>StreamModel: files with signed URLs
StreamModel->>gRPCServer: send StreamInit(files, offset)
gRPCServer->>gRPCClient: StreamInit message
gRPCClient->>InstantStream: receive init message
InstantStream->>ReadableStream: enqueue StreamInit
Client->>ReadableStream: receive initial data
par Server appends data
StreamModel->>gRPCServer: stream append occurs
gRPCServer->>gRPCClient: send StreamAppend(chunks, offset)
and Client receives
gRPCClient->>InstantStream: receive StreamAppend
InstantStream->>ReadableStream: enqueue StreamAppend
Client->>ReadableStream: read next chunk
end
Client->>ReadableStream: cancel()
ReadableStream->>InstantStream: cleanup sink
InstantStream->>gRPCClient: send cancel signal
gRPCClient->>gRPCServer: stream cancelled
gRPCServer->>StreamModel: cleanup reader subscription
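For orientation, here is a rough client-side sketch of the write and read paths shown in the diagrams above. It is illustrative only: how an app reaches the Reactor instance and every option besides streamId/offset are assumptions; only the createWriteStream/createReadStream entry points come from the walkthrough.

// Hypothetical usage of the client stream APIs from the diagrams above.
// `reactor` stands in for however an app obtains the Reactor instance.
async function produce(reactor: any): Promise<void> {
  // Write path: chunks are buffered locally; the server flushes them to files.
  const writable: WritableStream<string> = reactor.createWriteStream({});
  const writer = writable.getWriter();
  await writer.write('hello ');
  await writer.write('world');
  await writer.close(); // sends the final append with done=true
}

async function consume(reactor: any, streamId: string): Promise<void> {
  // Read path: StreamInit delivers already-flushed files, then live appends.
  const readable: ReadableStream<string> = reactor.createReadStream({
    streamId,
    offset: 0,
  });
  const reader = readable.getReader();
  for (;;) {
    const { value, done } = await reader.read();
    if (done) break;
    console.log('chunk', value);
  }
}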
Comment
Actionable comments posted: 20
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
server/test/instant/reactive/session_test.clj (1)
82-117: ⚠️ Potential issue | 🟡 Minor
Close socket-3 in fixture cleanup to avoid leaked sessions.
with-session opens socket-3 but the finally block only closes two sockets.
🧹 Suggested cleanup fix
- (session/on-close store socket)
- (session/on-close store socket-2)
+ (session/on-close store socket)
+ (session/on-close store socket-2)
+ (session/on-close store socket-3)
🤖 Fix all issues with AI agents
In `@client/packages/core/src/Reactor.js`:
- Around line 983-999: The switch is reading the wrong key name
msg['orignal-event'] so none of the error cases are matched; update the key to
msg['original-event'] (preserve the optional chaining/access pattern used, e.g.,
msg['original-event']?.op) in the switch expression in Reactor.js so the cases
route to this._syncTable.onResyncError, this._syncTable.onStartSyncError and
this._instantStream.onRecieveError correctly; verify the surrounding switch
block that references these handlers is the one to change.
- Around line 352-357: Reactor currently directly passes global
WritableStream/ReadableStream into new InstantStream which breaks in RN/old
browsers; update Reactor to accept stream constructors via config (or polyfills)
and guard their availability before initializing _instantStream: check the
config first, then fallback to typeof WritableStream/ReadableStream !==
'undefined' before passing them to InstantStream (or throw a clear error if
neither exists), ensuring you still pass trySend: this._trySendAuthed.bind(this)
and log: this._log; reference symbols: InstantStream, _instantStream,
WritableStream, ReadableStream, _trySendAuthed, _log.
In `@client/packages/core/src/Stream.ts`:
- Around line 631-647: The start/restart callback maps (startStreamCbs,
restartStreamCbs) leak entries because onCreateStreamOk and onRestartStreamOk
never remove the callback after calling it; update both handlers
(onCreateStreamOk and onRestartStreamOk) to retrieve and then delete the entry
from their respective maps (delete this.startStreamCbs[msg['client-event-id']]
and delete this.restartStreamCbs[msg['client-event-id']]) before or immediately
after invoking cb, and ensure you still handle the missing-callback case (log
and return) exactly as now to avoid calling undefined.
- Around line 181-206: Closed/aborted write streams (handled by close() and
abort(reason)) are never unregistered from writeStreams so they still get
onConnectionReconnect and leak; update close() and abort() to mark the stream as
terminal (isDone: true) and also remove/unregister the stream from the
writeStreams registry (call the same unregister/remove used elsewhere), and
ensure any terminal flush path also unregisters; additionally, make
onConnectionReconnect skip or ignore streams marked done (check stream.isDone)
to avoid restarting completed streams; locate and modify the close, abort,
appendStream usage and the writeStreams/onConnectionReconnect handlers to add
the unregister call and the isDone check.
- Around line 272-415: The unsubscribe uses a different eventId than subscribe
because runStartStream creates its own eventId; fix by tracking and reusing the
same active subscribe event-id: remove the inner const eventId in runStartStream
(or accept an eventId param) and set a shared variable (e.g., activeEventId) to
the eventId used when calling startStream inside createReadStream (or just reuse
the outer eventId), then change cancel() to call cancelStream({ eventId:
activeEventId }) so the same event-id is sent for subscribe and unsubscribe
(references: createReadStream, runStartStream, cancel, startStream,
cancelStream, eventId).
In `@server/deps.edn`:
- Around line 61-64: Upgrade to io.grpc versions (io.grpc/grpc-netty-shaded,
io.grpc/grpc-stub, io.grpc/grpc-core, io.grpc/grpc-services) at 1.79.0 requires
targeted regression testing and possibly a stepped upgrade path: run unit and
integration tests verifying JSON parsing behavior (ensure duplicate keys are
rejected where expected), validate large header/trailer handling to exercise
metadata size soft-limits, execute load/soak tests to catch Netty-related
corruption changes introduced by grpc-netty-shaded, and check code paths
referencing deprecated APIs (e.g., ProtoReflectionService) for removals; if any
issues surface, perform incremental upgrades through intermediate versions
(1.69.x, 1.75.x, etc.) to isolate behavioral changes and update tests or code
accordingly.
In `@server/src/instant/db/model/triple.clj`:
- Line 430: Remove the debug invocation tool/def-locals from the code (it
captures local bindings for development and can affect production); locate the
occurrence of (tool/def-locals) in triple.clj (the debug helper was added near
the top-level of the file) and delete that call so no debug tooling remains in
the merged codebase.
In `@server/src/instant/grpc.clj`:
- Around line 55-61: The MethodDescriptor named subscribe-method currently sets
(.setType MethodDescriptor$MethodType/SERVER_STREAMING) but the server and
client use bidi calls (ServerCalls/asyncBidiStreamingCall and
ClientCalls/asyncBidiStreamingCall); change the setType to
MethodDescriptor$MethodType/BIDI_STREAMING so subscribe-method's descriptor
matches the bidirectional streaming handlers.
In `@server/src/instant/jdbc/cache_evict.clj`:
- Around line 61-70: The function notify-stream-machine-id-changed (and the
similar code at lines 87-92) currently uses nth on the columns vector without a
default which can throw if columns is shorter or nil; change to guarded access
(e.g., use nth with a default nil or lookup by column name) and only parse and
call app-stream-model/notify-machine-id-changed when both stream-id and
machine-id are present and successfully parsed; ensure
parse-uuid/parse-json-uuid are only invoked on non-nil values to avoid
exceptions.
In `@server/src/instant/model/app_stream.clj`:
- Around line 467-500: The proxy currently creates a per-call executor that is
only shut down in onError/onCompleted, which can leak if neither occurs; modify
server-ify-observer to ensure executor is always cleaned up by either using a
shared executor pool (replace Executors/newSingleThreadExecutor with a shared
executor) or by wrapping the cancel handler to shut it down: when implementing
setOnCancelHandler in the proxy, store the provided runnable and replace it with
a wrapper that calls the provided runnable and then (.shutdown executor) and
also ensure the wrapper is invoked where onError/onCompleted currently set/reset
the on-cancel atom; reference the executor, setOnCancelHandler, onError,
onCompleted, closed? and on-cancel-atom symbols to locate and apply this change.
- Around line 337-343: The current double deref on the flush promise in the
app_stream logic can block forever; change the blocking dereferences of the
promise/value in the (when-let [p (:flush-promise updated)] ...) path to use
timed deref calls with sensible timeouts and a clear fallback path: call deref
with a milliseconds timeout for the outer promise (flush-promise) and then for
the inner future value returned by that promise, check for a timeout result and
handle it (log via your logger, set an error flag on stream-object, or throw a
controlled exception) before proceeding to (flush-to-file stream-object nil
on-flush-to-file). Ensure you reference the same symbols (flush-promise, p,
updated, flush-to-file, stream-object, on-flush-to-file) so the change replaces
the blocking derefs with timed derefs and explicit timeout handling.
- Around line 275-282: The concurrent-flush branch currently only logs an
exception via tracer/record-exception-span! but allows the code path to
continue, risking overlapping writes; modify the synchronization in the
flush-to-file flow (the update of :flush-promise and the use of flush-promise)
so that when (= p flush-promise) is false you either fail-fast by throwing an
IllegalStateException (or return an explicit error) to abort the new flush, or
use an atomic compare-and-set pattern to ensure only one flush promise is
installed and others are rejected; update the code around update :flush-promise
and the app-stream/flush-to-file entry point to either throw on concurrent
detection or return the existing p to prevent any overlapping write operations
and ensure callers handle the failure path.
- Around line 543-555: The error branches for :writer-disconnected,
:stream-missing-on-instance and :instance-missing currently return a retry map
only when `@machine-id-changed` is true, causing nil to be returned (so on-payload
is never called); update the handling in the case dispatch (the clauses that
reference `@machine-id-changed`) to always return an error payload (e.g. {:error
true :error-type :retry} or a distinct {:error true :error-type
:pending-machine-id}) when those cases occur, or explicitly return a non-nil
notification indicating the promise is not yet delivered, so that msg is truthy
and on-payload is invoked; ensure changes are made where these keywords are
matched (the :writer-disconnected, :stream-missing-on-instance and
:instance-missing clauses) and keep the original machine-id-changed check only
to adjust the error-type, not to suppress returning any message.
In `@server/src/instant/nippy.clj`:
- Around line 118-125: The code calls the macro tool/def-locals (in the let
block alongside nippy/thaw-from-in!, read-file, and read-chunks) but the
namespace does not require the tool alias; add a require for the namespace that
provides def-locals (e.g. [your-tool-namespace :as tool]) in the ns declaration
or remove the debug macro call; update the namespace header so tool/def-locals
resolves, then recompile to ensure instant.grpc/->StreamInit usage remains
unchanged.
- Around line 50-115: The custom Nippy serializers are inconsistent: code uses
private helpers (#'nippy/write-long, #'nippy/write-str) to write raw bytes but
reads with nippy/thaw-from-in! (which expects Nippy-framed values), causing
mismatched serialization for StreamRequest, StreamFile, StreamInit and the
helper functions write-file, read-file, write-chunks, read-chunks; fix by making
writes and reads symmetric—replace all uses of #'nippy/write-long and
#'nippy/write-str in write-file, write-chunks and the StreamRequest/StreamInit
extend-freeze blocks with nippy/freeze-to-out! (so fields like offset, size,
location-id, combined chunk byte-size are Nippy-encoded), and ensure
corresponding extend-thaw/read helpers use nippy/thaw-from-in! for those same
fields (or conversely switch to raw DataInput/.readLong/.readUTF and use
.writeLong/.writeUTF), but pick one consistent approach across functions (prefer
nippy/freeze-to-out! / nippy/thaw-from-in!) and update read-file and read-chunks
to match that chosen encoding.
In `@server/src/instant/reactive/session.clj`:
- Around line 739-741: The call to the macro tool/def-locals inside
handle-create-stream! is failing because the namespace alias tool is not
required; either add a require for the namespace that provides def-locals (e.g.,
require it as tool) in the ns declaration or remove/replace the debug macro
call; locate the handle-create-stream! function and the use of tool/def-locals
and update the ns requires to include the correct namespace alias (or delete the
tool/def-locals line) so the code compiles.
- Around line 950-967: The handler cancels the reader but never removes the
reader entity, leaving stale entries; after calling the cancel function
(reader-ent → :stream-reader/reader-object → :cancel with
rs/stream-unsubscribe-reason) you must delete the reader entity from the store
so resubscribe by event-id succeeds. Add a call to a store helper (e.g. a new
rs/remove-stream-reader! or store/remove-stream-reader! helper) immediately
after the cancel, passing the reader entity identifier (use reader-ent's id
field such as :db/id or :stream-reader/id) and sess-id/app-id as needed; keep
the validation/error flow (get-auth!, rs/get-stream-reader) and ensure the
delete is invoked even if cancel succeeds (consider wrapping cancel in
try/finally if necessary).
- Around line 1451-1454: The combine implementation for defmethod combine
[:append-stream :append-stream] currently merges chunks and offset but can drop
a done/abort signal from the earlier event; update the merge to preserve these
flags by carrying over :done and :abort from event1 into the resulting event
(e.g., set :done to (or (:done event1) (:done event2)) and likewise for :abort),
and if there is an abort reason on event1 propagate that as well (e.g., prefer
event1's :abort/:abort-reason when present) while keeping the existing update of
:chunks and assoc of :offset.
In `@server/src/instant/reactive/store.clj`:
- Around line 367-387: The tx-cleanup function uses the macro tool/def-locals
but the namespace alias tool is not required; add the proper require for the
namespace that defines def-locals (e.g., add the corresponding [<tool-namespace>
:as tool] entry to the ns declaration) so tool/def-locals resolves, or
remove/replace the debug macro call inside tx-cleanup if you don't want the
dependency; ensure the change references the tx-cleanup function and the
tool/def-locals symbol.
In `@server/test/instant/reactive/session_test.clj`:
- Around line 1081-1115: Remove the debug-only references to tool/* which are
unresolved: delete the call to tool/def-locals inside with-file-mock's
get-metadata and remove tool/inspect from read-full-stream; instead directly
dereference the result of (rs/get-stream-object-for-subscribe store app-id
stream-id) to access :buffer. Specifically, edit with-file-mock -> get-metadata
to just return {:size (alength ^bytes (get `@files` key))} without
tool/def-locals, and update read-full-stream to use (some->> (deref
(rs/get-stream-object-for-subscribe store app-id stream-id)) :buffer ...) or
equivalent direct deref/access so no tool/inspect is referenced.
🧹 Nitpick comments (7)
server/src/instant/db/permissioned_transaction.clj (1)
61-66: LGTM! The logic correctly extends the $files restriction to $streams. The conditional properly disallows attribute changes for both system namespaces, even for admins.
Minor suggestions:
Docstring update: The prevent-system-column-updates docstring (lines 93-95) only mentions $files as the special case. Consider updating it to also mention $streams for consistency.
Optional: Use a set for clarity:
💡 Optional: Use a set for multiple etype checks
- (if (or (= etype "$files")
-         (= etype "$streams"))
+ (if (#{"$files" "$streams"} etype)
    ;; under no circumstances do we let you change $files or $streams attrs
    false

This makes it easier to add more restricted namespaces in the future.
server/src/instant/model/app_stream.clj (3)
118-132: Remove debug artifact tool/def-locals.
tool/def-locals appears to be a debugging helper that should be removed before merging. It's present in multiple places throughout this file (lines 120, 283, 318, 324, 422, 449, 524, 593, 617).
🔧 Suggested fix
  (defn extract-part-num [stream-id file-path]
    (let [prefix (stream-file-name-prefix stream-id)
-         _ (tool/def-locals)
          _ (when-not (clojure.string/starts-with? file-path prefix)
448-449: Remove debug artifact tool/inspect.
tool/inspect appears to be a debugging helper that should be removed before merging.
🔧 Suggested fix
   (if (instance? StreamRequest v)
     (deliver cleanup (handle-subscribe store credits v observer))
-    ;; XXX: Where are our credits going?
-    (tool/inspect (swap! credits inc))))
+    (swap! credits inc))))
596-601: Clean up debug toggle comments.
The true ;false ;true ;false comments appear to be leftover debug toggles and should be cleaned up.
🔧 Suggested fix
   use-local? (or (not channel)
                  (and (= machine-id config/machine-id)
-                      true ;false ;true ;false
                       ;; In dev, hit the grpc server half the time to exercise the route
                       (or (not (config/dev?))
                           (= 0 (rand-int 2)))))

client/sandbox/react-nextjs/pages/play/streams.tsx (2)
9-11: Add a fallback for crypto.randomUUID in older browsers.
This page will crash where the API is missing (e.g., older Safari/embedded WebViews). Consider a guarded fallback.
🔧 Example fallback
-function randomUUID() {
-  return crypto.randomUUID();
-}
+function randomUUID() {
+  return typeof crypto?.randomUUID === 'function'
+    ? crypto.randomUUID()
+    : `${Date.now()}-${Math.random().toString(16).slice(2)}`;
+}
22-23: Avoid exposing the database on globalThis in non-dev builds.
This is handy for debugging, but it can leak internals if this page ships. Consider gating behind a dev flag or removing.
client/packages/core/src/Stream.ts (1)
71-77: Remove or guard debug globals/logs.
globalThis._stuff and console logs will leak internals and spam logs in production. Consider gating by a dev flag or removing (see the sketch below).
Also applies to: 175-176
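A minimal sketch of one way to gate this, assuming a NODE_ENV-style flag (the flag, helper name, and wiring are illustrative, not part of this PR):

// Hypothetical guard: only attach debug state outside production builds.
const isDev =
  ((globalThis as any).process?.env?.NODE_ENV ?? 'development') !== 'production';

export function exposeDebugState(state: unknown): void {
  if (!isDev) return; // no-op when shipped
  (globalThis as any)._stuff = state;
  console.log('debug state attached to globalThis._stuff');
}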
    this._instantStream = new InstantStream({
      WStream: WritableStream, // XXX: needs to be passed in from above
      RStream: ReadableStream, // XXX: needs to be passed in from above
      trySend: this._trySendAuthed.bind(this),
      log: this._log,
    });
🧩 Analysis chain
🏁 Scripts executed:
cd client/packages/core/src && wc -l Reactor.js
cd client/packages/core/src && sed -n '340,370p' Reactor.js
rg -n "WritableStream|ReadableStream" client/packages/core/src/ -A 2 -B 2
cd client/packages/core/src && head -n 50 Reactor.js | cat -n
rg "typeof.*Stream|WritableStream.*undefined|polyfill" client/packages/core/src/ -i
cd client/packages/core && cat package.json | grep -A 10 -B 2 "browser\|targets\|engines"
rg "new Reactor\|Reactor\.create" client/packages/ -A 5 -B 2 | head -n 100
rg "WritableStream|ReadableStream" client/packages/ --type ts --type js -B 1 | head -n 60
fd -e "ts" -e "js" . client/packages/core/src/ | grep -iE "(init|entry|index)" | head -n 10
cd client/packages/core/src && cat -n index.ts | head -n 150
rg "export.*init\|export.*Reactor\|export.*db" client/packages/core/src/index.ts -A 3
rg "WritableStream\|ReadableStream" client/packages/ -l --type ts --type js
rg "new Reactor|Reactor\.init" client/packages/core/src/ -B 3 -A 5
cat -n client/packages/core/src/Reactor.js | sed -n '1,100p' | grep -i "class\|constructor\|export"
grep -n "class Reactor\|constructor\|export default" client/packages/core/src/Reactor.js | head -n 20
sed -n '274,350p' client/packages/core/src/Reactor.js | cat -n
sed -n '207,280p' client/packages/core/src/Reactor.js | cat -n
rg "class Reactor" client/packages/core/src/Reactor.js -A 80 | head -n 100
sed -n '200,280p' client/packages/core/src/index.ts | cat -n
rg "new Reactor" client/packages/core/src/index.ts -B 5 -A 15
rg "isClient" client/packages/core/src/ -B 2 -A 2 | head -n 40
rg "EventSourceImpl" client/packages/core/src/index.ts -B 10 -A 2
sed -n '150,220p' client/packages/core/src/index.ts | cat -n
sed -n '2550,2595p' client/packages/core/src/Reactor.js | cat -n
rg "typeof.*WritableStream\|typeof.*ReadableStream" client/ -i
rg "polyfill\|ponyfill" client/packages/core/ -i
fd -e "package.json" client/packages/core | xargs cat | grep -A 5 -B 5 "stream\|polyfill" -i
fd "package.json" client/packages/core --type f
cat client/packages/core/package.json
rg "React Native\|ReactNative\|react-native" client/packages/core -i
grep -n "XXX.*pass" client/packages/core/src/Reactor.js
rg "isClient" client/packages/core/src/Reactor.js -A 5
sed -n '300,330p' client/packages/core/src/Reactor.js | cat -n
rg "Web Streams\|WritableStream\|ReadableStream" . --type md --type txt -i
Add runtime guard for WritableStream/ReadableStream before InstantStream initialization.
WritableStream/ReadableStream are unavailable in React Native and older browsers, causing Reactor initialization to fail. Currently these are passed as globals without any guard, despite the XXX comments acknowledging they should be injected. Guard the availability or pass constructors via config/polyfill:
🔧 Suggested fix
- this._instantStream = new InstantStream({
- WStream: WritableStream, // XXX: needs to be passed in from above
- RStream: ReadableStream, // XXX: needs to be passed in from above
+ const WStreamCtor =
+ typeof WritableStream !== 'undefined' ? WritableStream : null;
+ const RStreamCtor =
+ typeof ReadableStream !== 'undefined' ? ReadableStream : null;
+ if (!WStreamCtor || !RStreamCtor) {
+ throw new Error(
+ 'WritableStream/ReadableStream not available; provide constructors or a polyfill.',
+ );
+ }
+ this._instantStream = new InstantStream({
+ WStream: WStreamCtor,
+ RStream: RStreamCtor,
trySend: this._trySendAuthed.bind(this),
log: this._log,
});
🤖 Prompt for AI Agents
In `@client/packages/core/src/Reactor.js` around lines 352 - 357, Reactor
currently directly passes global WritableStream/ReadableStream into new
InstantStream which breaks in RN/old browsers; update Reactor to accept stream
constructors via config (or polyfills) and guard their availability before
initializing _instantStream: check the config first, then fallback to typeof
WritableStream/ReadableStream !== 'undefined' before passing them to
InstantStream (or throw a clear error if neither exists), ensuring you still
pass trySend: this._trySendAuthed.bind(this) and log: this._log; reference
symbols: InstantStream, _instantStream, WritableStream, ReadableStream,
_trySendAuthed, _log.
      switch (msg['orignal-event']?.op) {
        case 'resync-table': {
          this._syncTable.onResyncError(msg);
          return;
        }
        case 'start-sync': {
          this._syncTable.onStartSyncError(msg);
          return;
        }
        case 'create-stream':
        case 'restart-stream':
        case 'append-stream':
        case 'subscribe-stream':
        case 'unsubscribe-stream': {
          this._instantStream.onRecieveError(msg);
          return;
        }
Typo in original-event prevents error routing.
msg['orignal-event'] never matches, so sync/stream errors won’t be routed.
🐛 Fix the key name
- switch (msg['orignal-event']?.op) {
+ switch (msg['original-event']?.op) {
🤖 Prompt for AI Agents
In `@client/packages/core/src/Reactor.js` around lines 983 - 999, The switch is
reading the wrong key name msg['orignal-event'] so none of the error cases are
matched; update the key to msg['original-event'] (preserve the optional
chaining/access pattern used, e.g., msg['original-event']?.op) in the switch
expression in Reactor.js so the cases route to this._syncTable.onResyncError,
this._syncTable.onStartSyncError and this._instantStream.onRecieveError
correctly; verify the surrounding switch block that references these handlers is
the one to change.
    close() {
      if (streamId_) {
        appendStream({
          streamId: streamId_,
          chunks: [],
          offset: bufferOffset + bufferByteSize,
          isDone: true,
        });
      }
    },
    abort(reason) {
      // XXX: handle abortReason on the server
      // Should store something on the $stream (add new field to stream)
      console.log('abort', reason);
      if (streamId_) {
        // Probably needs to be slightly changed...
        // 1. Delay sending if we're not connected
        // 2. Send the unsent chunks
        appendStream({
          streamId: streamId_,
          chunks: [],
          offset: bufferOffset + bufferByteSize,
          isDone: true,
          abortReason: reason,
        });
      }
Closed/aborted write streams stay registered and will reconnect.
writeStreams is never unregistered, so closed streams will still receive onConnectionReconnect, leading to unexpected restarts and leaks. Mark streams as done and unregister on close/abort (and ideally on terminal flush).
💡 Sketch of a fix (adds unregister callback)
function createWriteStream({
WStream,
opts,
startStream,
restartStream,
appendStream,
registerStream,
+ unregisterStream,
}: {
...
registerStream: (streamId: string, cbs: WriteStreamCbs) => void;
+ unregisterStream: (streamId: string) => void;
}): WritableStream<string> {
...
close() {
if (streamId_) {
+ isDone = true;
+ unregisterStream(streamId_);
appendStream({
streamId: streamId_,
chunks: [],
offset: bufferOffset + bufferByteSize,
isDone: true,
});
}
},
abort(reason) {
...
if (streamId_) {
+ isDone = true;
+ unregisterStream(streamId_);
appendStream({
streamId: streamId_,
chunks: [],
offset: bufferOffset + bufferByteSize,
isDone: true,
abortReason: reason,
});
}
},
🤖 Prompt for AI Agents
In `@client/packages/core/src/Stream.ts` around lines 181 - 206, Closed/aborted
write streams (handled by close() and abort(reason)) are never unregistered from
writeStreams so they still get onConnectionReconnect and leak; update close()
and abort() to mark the stream as terminal (isDone: true) and also
remove/unregister the stream from the writeStreams registry (call the same
unregister/remove used elsewhere), and ensure any terminal flush path also
unregisters; additionally, make onConnectionReconnect skip or ignore streams
marked done (check stream.isDone) to avoid restarting completed streams; locate
and modify the close, abort, appendStream usage and the
writeStreams/onConnectionReconnect handlers to add the unregister call and the
isDone check.
/// XXX: on cancel, send unsubscribe-stream
function createReadStream({
  RStream,
  opts,
  startStream,
  cancelStream,
}: {
  RStream: ReadableStreamCtor;
  opts: {
    clientId?: string;
    streamId?: string;
    offset?: number;
  };
  startStream: (opts: {
    eventId: string;
    clientId?: string;
    streamId?: string;
    offset?: number;
  }) => StreamIterator<ReadStreamUpdate>;
  cancelStream: (opts: { eventId: string }) => void;
}): ReadableStream<string> {
  let seenOffset = opts.offset || 0;
  let canceled = false;
  const decoder = new TextDecoder('utf-8');
  const encoder = new TextEncoder();
  const eventId = uuid();

  async function runStartStream(
    opts: {
      clientId?: string;
      streamId?: string;
      offset?: number;
    },
    controller: ReadableStreamDefaultController<string>,
  ): Promise<{ retryAfter: number } | undefined> {
    const eventId = uuid();
    const streamOpts = { ...(opts || {}), eventId };
    for await (const item of startStream(streamOpts)) {
      if (canceled) {
        return;
      }
      if (item.type === 'error') {
        console.log('got the error');
        return { retryAfter: 0 };
      }

      if (item.type === 'reconnect') {
        return { retryAfter: 0 };
      }

      if (item.offset > seenOffset) {
        // XXX: We should try to resubscribe from the offset we know if this
        // happens instead of throwing an error
        console.error('corrupted stream', { item, seenOffset });
        controller.error(new Error('Stream is corrupted.'));
        canceled = true;
        return;
      }

      let discardLen = seenOffset - item.offset;

      if (item.files && item.files.length) {
        const fetchAbort = new AbortController();
        let nextFetch = fetch(item.files[0].url, {
          signal: fetchAbort.signal,
        });
        for (let i = 0; i < item.files.length; i++) {
          const nextFile = item.files[i + 1];
          const thisFetch = nextFetch;
          const res = await thisFetch;
          if (nextFile) {
            nextFetch = fetch(nextFile.url, { signal: fetchAbort.signal });
          }

          // XXX: error handling
          if (res.body) {
            for await (const bodyChunk of res.body) {
              if (canceled) {
                fetchAbort.abort();
                return;
              }
              let chunk = bodyChunk;
              if (discardLen > 0) {
                chunk = bodyChunk.subarray(discardLen);
                discardLen -= bodyChunk.length - chunk.length;
              }
              if (!chunk.length) {
                continue;
              }
              seenOffset += chunk.length;
              const s = decoder.decode(chunk);

              controller.enqueue(s);
            }
          }
        }
      }
      if (item.content) {
        let content = item.content;
        let encoded = encoder.encode(item.content);
        if (discardLen > 0) {
          const remaining = encoded.subarray(discardLen);
          discardLen -= encoded.length - remaining.length;
          if (!remaining.length) {
            continue;
          }
          encoded = remaining;
          content = decoder.decode(remaining);
        }
        seenOffset += encoded.length;
        controller.enqueue(content);
      }
    }
  }

  async function start(controller: ReadableStreamDefaultController<string>) {
    let lastStart = Date.now();
    let retry = true;
    while (retry) {
      retry = false;
      const res = await runStartStream(
        { ...opts, offset: seenOffset },
        controller,
      );
      console.log('res', res);
      if (typeof res?.retryAfter !== 'undefined') {
        retry = true;
        await new Promise((resolve) => {
          setTimeout(resolve, res.retryAfter);
        });
      }
    }
    if (!canceled) {
      controller.close();
    }
  }
  return new RStream<string>({
    start(controller) {
      start(controller);
    },
    cancel(reason) {
      canceled = true;
      cancelStream({ eventId });
    },
Unsubscribe uses a different event-id than subscribe.
cancel() sends the outer eventId, but subscribe-stream uses a new event id inside runStartStream. The server never receives the correct subscribe-event-id.
🐛 Track the active subscribe event-id
- const eventId = uuid();
+ let currentEventId: string | null = null;
...
- const eventId = uuid();
- const streamOpts = { ...(opts || {}), eventId };
+ const eventId = uuid();
+ currentEventId = eventId;
+ const streamOpts = { ...(opts || {}), eventId };
...
cancel(reason) {
canceled = true;
- cancelStream({ eventId });
+ if (currentEventId) cancelStream({ eventId: currentEventId });
},
🤖 Prompt for AI Agents
In `@client/packages/core/src/Stream.ts` around lines 272 - 415, The unsubscribe
uses a different eventId than subscribe because runStartStream creates its own
eventId; fix by tracking and reusing the same active subscribe event-id: remove
the inner const eventId in runStartStream (or accept an eventId param) and set a
shared variable (e.g., activeEventId) to the eventId used when calling
startStream inside createReadStream (or just reuse the outer eventId), then
change cancel() to call cancelStream({ eventId: activeEventId }) so the same
event-id is sent for subscribe and unsubscribe (references: createReadStream,
runStartStream, cancel, startStream, cancelStream, eventId).
  onCreateStreamOk(msg: CreateStreamOkMsg) {
    const cb = this.startStreamCbs[msg['client-event-id']];
    if (!cb) {
      this.log.info('No stream for start-stream-ok', msg);
      return;
    }
    cb({ streamId: msg['stream-id'], reconnectToken: msg['reconnect-token'] });
  }

  onRestartStreamOk(msg: RestartStreamOkMsg) {
    const cb = this.restartStreamCbs[msg['client-event-id']];
    if (!cb) {
      this.log.info('No stream for start-stream-ok', msg);
      return;
    }
    cb({ offset: msg.offset });
  }
Clean up start/restart callbacks after resolve.
startStreamCbs and restartStreamCbs never delete entries, so they can leak over time.
🧹 Cleanup after resolve
onCreateStreamOk(msg: CreateStreamOkMsg) {
const cb = this.startStreamCbs[msg['client-event-id']];
if (!cb) {
this.log.info('No stream for start-stream-ok', msg);
return;
}
+ delete this.startStreamCbs[msg['client-event-id']];
cb({ streamId: msg['stream-id'], reconnectToken: msg['reconnect-token'] });
}
onRestartStreamOk(msg: RestartStreamOkMsg) {
const cb = this.restartStreamCbs[msg['client-event-id']];
if (!cb) {
this.log.info('No stream for start-stream-ok', msg);
return;
}
+ delete this.restartStreamCbs[msg['client-event-id']];
cb({ offset: msg.offset });
}
🤖 Prompt for AI Agents
In `@client/packages/core/src/Stream.ts` around lines 631 - 647, The start/restart
callback maps (startStreamCbs, restartStreamCbs) leak entries because
onCreateStreamOk and onRestartStreamOk never remove the callback after calling
it; update both handlers (onCreateStreamOk and onRestartStreamOk) to retrieve
and then delete the entry from their respective maps (delete
this.startStreamCbs[msg['client-event-id']] and delete
this.restartStreamCbs[msg['client-event-id']]) before or immediately after
invoking cb, and ensure you still handle the missing-callback case (log and
return) exactly as now to avoid calling undefined.
(defn handle-create-stream! [store sess-id {:keys [client-id client-event-id] :as _event}]
  (tool/def-locals)
  ;; XXX: add a listener to the wal that will do something if the machine-id changes
tool/def-locals is used but tool isn’t required.
This will fail compilation unless the alias already exists. Please add the require or remove debug macros.
🔧 Add missing require
(:require
...
- [instant.util.tracer :as tracer]
+ [instant.util.tracer :as tracer]
+ [instant.util.tool :as tool]
[instant.util.uuid :as uuid-util]
  [lambdaisland.uri :as uri])
🤖 Prompt for AI Agents
In `@server/src/instant/reactive/session.clj` around lines 739 - 741, The call to
the macro tool/def-locals inside handle-create-stream! is failing because the
namespace alias tool is not required; either add a require for the namespace
that provides def-locals (e.g., require it as tool) in the ns declaration or
remove/replace the debug macro call; locate the handle-create-stream! function
and the use of tool/def-locals and update the ns requires to include the correct
namespace alias (or delete the tool/def-locals line) so the code compiles.
(defn handle-unsubscribe-stream! [store sess-id event]
  (let [{:keys [app]} (get-auth! store sess-id)
        app-id (:id app)
        subscribe-event-id (ex/get-optional-param! event
                                                   [:subscribe-event-id]
                                                   string-util/coerce-non-blank-str)
        reader-ent (rs/get-stream-reader store app-id sess-id subscribe-event-id)
        _ (when-not reader-ent
            (ex/throw-validation-err! :unsubscribe-stream
                                      {:sess-id sess-id
                                       :subscribe-event-id subscribe-event-id}
                                      [{:message "Stream is missing."}]))
        cancel (-> reader-ent :stream-reader/reader-object :cancel)]
    ;; XXX Might need to catch if the cancel happens twice??
    (tool/def-locals)
    (cancel rs/stream-unsubscribe-reason)
    ;; XXX: Remove from store
    ))
Unsubscribe doesn’t remove the stream-reader entity.
Canceling without removing leaves stale reader entries and can block resubscribe by event-id.
💡 Possible fix (requires a store helper)
- (cancel rs/stream-unsubscribe-reason)
- ;; XXX: Remove from store
- ))
+ (cancel rs/stream-unsubscribe-reason)
+ (rs/remove-stream-reader store app-id sess-id subscribe-event-id))
🤖 Prompt for AI Agents
In `@server/src/instant/reactive/session.clj` around lines 950 - 967, The handler
cancels the reader but never removes the reader entity, leaving stale entries;
after calling the cancel function (reader-ent → :stream-reader/reader-object →
:cancel with rs/stream-unsubscribe-reason) you must delete the reader entity
from the store so resubscribe by event-id succeeds. Add a call to a store helper
(e.g. a new rs/remove-stream-reader! or store/remove-stream-reader! helper)
immediately after the cancel, passing the reader entity identifier (use
reader-ent's id field such as :db/id or :stream-reader/id) and sess-id/app-id as
needed; keep the validation/error flow (get-auth!, rs/get-stream-reader) and
ensure the delete is invoked even if cancel succeeds (consider wrapping cancel
in try/finally if necessary).
(defmethod combine [:append-stream :append-stream] [event1 event2]
  (-> event2
      (update :chunks into (:chunks event1))
      (assoc :offset (:offset event1))))
Combine should preserve :done (and abort) when merging append events.
If an earlier event marks the stream done, combining can drop that signal.
🧩 Merge done/abort flags
(defmethod combine [:append-stream :append-stream] [event1 event2]
(-> event2
(update :chunks into (:chunks event1))
- (assoc :offset (:offset event1))))
+ (assoc :offset (:offset event1))
+ (update :done #(or % (:done event1)))
+ (update :abort-reason #(or % (:abort-reason event1)))))
🤖 Prompt for AI Agents
In `@server/src/instant/reactive/session.clj` around lines 1451 - 1454, The
combine implementation for defmethod combine [:append-stream :append-stream]
currently merges chunks and offset but can drop a done/abort signal from the
earlier event; update the merge to preserve these flags by carrying over :done
and :abort from event1 into the resulting event (e.g., set :done to (or (:done
event1) (:done event2)) and likewise for :abort), and if there is an abort
reason on event1 propagate that as well (e.g., prefer event1's
:abort/:abort-reason when present) while keeping the existing update of :chunks
and assoc of :offset.
(defn tx-cleanup
  "Runs cleanup on deleted entities for streams.
   Returns the set of stale datalog-query-eids that may need to be removed."
  [report]
  (keep (fn [datom]
          (when (and (= (:a datom)
                        :subscription/datalog-query)
                     (not (:added datom))
                     (d/entity (:db-after report) (:v datom)))
            (:v datom)))
          (when-not (:added datom)
            (case (:a datom)
              :subscription/datalog-query
              (when (d/entity (:db-after report) (:v datom))
                (:v datom))

              :stream/stream-object
              (when-let [cleanup (:cleanup (meta (:v datom)))]
                (tool/def-locals)
                (cleanup)
                nil)

              :stream-reader/reader-object
              (when-let [cancel (:cancel (:v datom))]
                (cancel stream-unsubscribe-reason)
                nil)
tool/def-locals used without a tool require.
Unless the alias already exists, this won’t compile. Add the require or remove debug macros.
🔧 Add missing require
(:require
...
- [instant.util.tracer :as tracer])
+ [instant.util.tracer :as tracer]
+ [instant.util.tool :as tool])
🤖 Prompt for AI Agents
In `@server/src/instant/reactive/store.clj` around lines 367 - 387, The tx-cleanup
function uses the macro tool/def-locals but the namespace alias tool is not
required; add the proper require for the namespace that defines def-locals
(e.g., add the corresponding [<tool-namespace> :as tool] entry to the ns
declaration) so tool/def-locals resolves, or remove/replace the debug macro call
inside tx-cleanup if you don't want the dependency; ensure the change references
the tx-cleanup function and the tool/def-locals symbol.
(defn with-file-mock [f]
  (let [files (atom {})
        upload (fn [_bucket {:keys [object-key]} file]
                 (let [ba (with-open [_in ^InputStream file
                                      out (ByteArrayOutputStream.)]
                            (io/copy file out)
                            (.toByteArray out))]
                   (swap! files assoc object-key ba)))
        get-metadata (fn [_bucket key]
                       (tool/def-locals)
                       {:size (alength ^bytes (get @files key))})]
    (test-util/with-s3-mock {:upload upload
                             :get-object-metadata get-metadata
                             :location-id-url (fn [app-id location-id]
                                                (str "https://example.com/" app-id "/" location-id))}
      (f {:slurp-file (fn [app-id location-id]
                        (-> @files
                            (get (s3/->object-key app-id location-id))
                            slurp))}))))

(defn read-full-stream
  "Reads the full stream, including any unflushed bytes from the buffer"
  [store app-id stream-id slurp-file]
  (let [stream-files (app-stream-model/get-stream-files {:app-id app-id
                                                         :stream-id stream-id})
        file-content (apply str (map (fn [{:keys [location-id]}]
                                       (slurp-file app-id location-id))
                                     stream-files))
        buffer-content (some->> (tool/inspect (rs/get-stream-object-for-subscribe store app-id stream-id))
                                deref
                                :buffer
                                (map (fn [^bytes ba]
                                       (String. ba "UTF-8")))
                                (apply str))]
    (str file-content buffer-content)))
Remove debug-only tool/* calls (unresolved symbol).
These calls will not compile because tool isn’t required in this namespace.
🛠️ Suggested removal
-      get-metadata (fn [_bucket key]
-                     (tool/def-locals)
-                     {:size (alength ^bytes (get @files key))})]
+      get-metadata (fn [_bucket key]
+                     {:size (alength ^bytes (get @files key))})]

-      buffer-content (some->> (tool/inspect (rs/get-stream-object-for-subscribe store app-id stream-id))
+      buffer-content (some->> (rs/get-stream-object-for-subscribe store app-id stream-id)
                               deref
                               :buffer
🤖 Prompt for AI Agents
In `@server/test/instant/reactive/session_test.clj` around lines 1081 - 1115,
Remove the debug-only references to tool/* which are unresolved: delete the call
to tool/def-locals inside with-file-mock's get-metadata and remove tool/inspect
from read-full-stream; instead directly dereference the result of
(rs/get-stream-object-for-subscribe store app-id stream-id) to access :buffer.
Specifically, edit with-file-mock -> get-metadata to just return {:size (alength
^bytes (get `@files` key))} without tool/def-locals, and update read-full-stream
to use (some->> (deref (rs/get-stream-object-for-subscribe store app-id
stream-id)) :buffer ...) or equivalent direct deref/access so no tool/inspect is
referenced.
Adds support for streams.
Still TODO: