@@ -22,6 +22,7 @@ use tokio_tungstenite::tungstenite::{
2222 Message ,
2323 protocol:: frame:: { CloseFrame , coding:: CloseCode } ,
2424} ;
25+ use universaldb:: utils:: IsolationLevel :: * ;
2526
2627use crate :: shared_state:: { InFlightRequestHandle , SharedState } ;
2728
@@ -46,7 +47,7 @@ pub struct WebsocketPendingLimitReached;
4647
4748#[ derive( Debug ) ]
4849enum LifecycleResult {
49- ServerClose ( protocol:: ToServerWebSocketClose ) ,
50+ ServerClose ( protocol:: mk2 :: ToServerWebSocketClose ) ,
5051 ClientClose ( Option < CloseFrame > ) ,
5152 Aborted ,
5253}
@@ -153,10 +154,22 @@ impl CustomServeTrait for PegboardGateway {
153154 . context ( "failed to read body" ) ?
154155 . to_bytes ( ) ;
155156
156- let mut stopped_sub = self
157- . ctx
158- . subscribe :: < pegboard:: workflows:: actor:: Stopped > ( ( "actor_id" , self . actor_id ) )
159- . await ?;
157+ let udb = self . ctx . udb ( ) ?;
158+ let runner_id = self . runner_id ;
159+ let ( mut stopped_sub, runner_protocol_version) = tokio:: try_join!(
160+ self . ctx
161+ . subscribe:: <pegboard:: workflows:: actor:: Stopped >( ( "actor_id" , self . actor_id) ) ,
162+ // Read runner protocol version
163+ udb. run( |tx| async move {
164+ tx. with_subspace( pegboard:: keys:: subspace( ) ) ;
165+
166+ tx. read(
167+ & pegboard:: keys:: runner:: ProtocolVersionKey :: new( runner_id) ,
168+ Serializable ,
169+ )
170+ . await
171+ } )
172+ ) ?;
160173
161174 // Build subject to publish to
162175 let tunnel_subject =
@@ -169,12 +182,12 @@ impl CustomServeTrait for PegboardGateway {
169182 ..
170183 } = self
171184 . shared_state
172- . start_in_flight_request ( tunnel_subject, request_id)
185+ . start_in_flight_request ( tunnel_subject, runner_protocol_version , request_id)
173186 . await ;
174187
175188 // Start request
176- let message = protocol:: ToClientTunnelMessageKind :: ToClientRequestStart (
177- protocol:: ToClientRequestStart {
189+ let message = protocol:: mk2 :: ToClientTunnelMessageKind :: ToClientRequestStart (
190+ protocol:: mk2 :: ToClientRequestStart {
178191 actor_id : actor_id. clone ( ) ,
179192 method,
180193 path : self . path . clone ( ) ,
@@ -197,12 +210,12 @@ impl CustomServeTrait for PegboardGateway {
197210 res = msg_rx. recv( ) => {
198211 if let Some ( msg) = res {
199212 match msg {
200- protocol:: ToServerTunnelMessageKind :: ToServerResponseStart (
213+ protocol:: mk2 :: ToServerTunnelMessageKind :: ToServerResponseStart (
201214 response_start,
202215 ) => {
203216 return anyhow:: Ok ( response_start) ;
204217 }
205- protocol:: ToServerTunnelMessageKind :: ToServerResponseAbort => {
218+ protocol:: mk2 :: ToServerTunnelMessageKind :: ToServerResponseAbort => {
206219 tracing:: warn!( "request aborted" ) ;
207220 return Err ( ServiceUnavailable . build( ) ) ;
208221 }
@@ -277,9 +290,6 @@ impl CustomServeTrait for PegboardGateway {
277290 request_id : protocol:: RequestId ,
278291 after_hibernation : bool ,
279292 ) -> Result < Option < CloseFrame > > {
280- // Use the actor ID from the gateway instance
281- let actor_id = self . actor_id . to_string ( ) ;
282-
283293 // Extract headers
284294 let mut request_headers = HashableMap :: new ( ) ;
285295 for ( name, value) in headers {
@@ -288,10 +298,22 @@ impl CustomServeTrait for PegboardGateway {
288298 }
289299 }
290300
291- let mut stopped_sub = self
292- . ctx
293- . subscribe :: < pegboard:: workflows:: actor:: Stopped > ( ( "actor_id" , self . actor_id ) )
294- . await ?;
301+ let udb = self . ctx . udb ( ) ?;
302+ let runner_id = self . runner_id ;
303+ let ( mut stopped_sub, runner_protocol_version) = tokio:: try_join!(
304+ self . ctx
305+ . subscribe:: <pegboard:: workflows:: actor:: Stopped >( ( "actor_id" , self . actor_id) ) ,
306+ // Read runner protocol version
307+ udb. run( |tx| async move {
308+ tx. with_subspace( pegboard:: keys:: subspace( ) ) ;
309+
310+ tx. read(
311+ & pegboard:: keys:: runner:: ProtocolVersionKey :: new( runner_id) ,
312+ Serializable ,
313+ )
314+ . await
315+ } )
316+ ) ?;
295317
296318 // Build subject to publish to
297319 let tunnel_subject =
@@ -304,7 +326,7 @@ impl CustomServeTrait for PegboardGateway {
304326 new,
305327 } = self
306328 . shared_state
307- . start_in_flight_request ( tunnel_subject. clone ( ) , request_id)
329+ . start_in_flight_request ( tunnel_subject. clone ( ) , runner_protocol_version , request_id)
308330 . await ;
309331
310332 ensure ! (
@@ -317,9 +339,9 @@ impl CustomServeTrait for PegboardGateway {
317339 true
318340 } else {
319341 // Send WebSocket open message
320- let open_message = protocol:: ToClientTunnelMessageKind :: ToClientWebSocketOpen (
321- protocol:: ToClientWebSocketOpen {
322- actor_id : actor_id. clone ( ) ,
342+ let open_message = protocol:: mk2 :: ToClientTunnelMessageKind :: ToClientWebSocketOpen (
343+ protocol:: mk2 :: ToClientWebSocketOpen {
344+ actor_id : self . actor_id . to_string ( ) ,
323345 path : self . path . clone ( ) ,
324346 headers : request_headers,
325347 } ,
@@ -338,10 +360,10 @@ impl CustomServeTrait for PegboardGateway {
338360 res = msg_rx. recv( ) => {
339361 if let Some ( msg) = res {
340362 match msg {
341- protocol:: ToServerTunnelMessageKind :: ToServerWebSocketOpen ( msg) => {
363+ protocol:: mk2 :: ToServerTunnelMessageKind :: ToServerWebSocketOpen ( msg) => {
342364 return anyhow:: Ok ( msg) ;
343365 }
344- protocol:: ToServerTunnelMessageKind :: ToServerWebSocketClose ( close) => {
366+ protocol:: mk2 :: ToServerTunnelMessageKind :: ToServerWebSocketClose ( close) => {
345367 tracing:: warn!( ?close, "websocket closed before opening" ) ;
346368 return Err ( WebSocketServiceUnavailable . build( ) ) ;
347369 }
@@ -538,8 +560,8 @@ impl CustomServeTrait for PegboardGateway {
538560 Ok ( _) => ( CloseCode :: Normal . into ( ) , None ) ,
539561 Err ( _) => ( CloseCode :: Error . into ( ) , Some ( "ws.downstream_closed" . into ( ) ) ) ,
540562 } ;
541- let close_message = protocol:: ToClientTunnelMessageKind :: ToClientWebSocketClose (
542- protocol:: ToClientWebSocketClose {
563+ let close_message = protocol:: mk2 :: ToClientTunnelMessageKind :: ToClientWebSocketClose (
564+ protocol:: mk2 :: ToClientWebSocketClose {
543565 code : Some ( close_code. into ( ) ) ,
544566 reason : close_reason. map ( |x| x. as_str ( ) . to_string ( ) ) ,
545567 } ,
0 commit comments